LCOV - differential code coverage report
Current view: top level - libs/pageserver_api/src - models.rs (source / functions) Coverage Total Hit LBC UBC CBC
Current: f6946e90941b557c917ac98cd5a7e9506d180f3e.info Lines: 88.6 % 385 341 1 43 341
Current Date: 2023-10-19 02:04:12 Functions: 23.4 % 692 162 1 529 162
Baseline: c8637f37369098875162f194f92736355783b050.info
Baseline Date: 2023-10-18 20:25:20

           TLA  Line data    Source code
       1                 : use std::{
       2                 :     collections::HashMap,
       3                 :     num::{NonZeroU64, NonZeroUsize},
       4                 :     time::SystemTime,
       5                 : };
       6                 : 
       7                 : use byteorder::{BigEndian, ReadBytesExt};
       8                 : use serde::{Deserialize, Serialize};
       9                 : use serde_with::{serde_as, DisplayFromStr};
      10                 : use strum_macros;
      11                 : use utils::{
      12                 :     completion,
      13                 :     generation::Generation,
      14                 :     history_buffer::HistoryBufferWithDropCounter,
      15                 :     id::{NodeId, TenantId, TimelineId},
      16                 :     lsn::Lsn,
      17                 : };
      18                 : 
      19                 : use crate::reltag::RelTag;
      20                 : use anyhow::bail;
      21                 : use bytes::{BufMut, Bytes, BytesMut};
      22                 : 
      23                 : /// The state of a tenant in this pageserver.
      24                 : ///
      25                 : /// ```mermaid
      26                 : /// stateDiagram-v2
      27                 : ///
      28                 : ///     [*] --> Loading: spawn_load()
      29                 : ///     [*] --> Attaching: spawn_attach()
      30                 : ///
      31                 : ///     Loading --> Activating: activate()
      32                 : ///     Attaching --> Activating: activate()
      33                 : ///     Activating --> Active: infallible
      34                 : ///
      35                 : ///     Loading --> Broken: load() failure
      36                 : ///     Attaching --> Broken: attach() failure
      37                 : ///
      38                 : ///     Active --> Stopping: set_stopping(), part of shutdown & detach
      39                 : ///     Stopping --> Broken: late error in remove_tenant_from_memory
      40                 : ///
      41                 : ///     Broken --> [*]: ignore / detach / shutdown
      42                 : ///     Stopping --> [*]: remove_from_memory complete
      43                 : ///
      44                 : ///     Active --> Broken: cfg(testing)-only tenant break point
      45                 : /// ```
      46                 : #[derive(
      47 CBC       20385 :     Clone,
      48            3115 :     PartialEq,
      49                 :     Eq,
      50             844 :     serde::Serialize,
      51              28 :     serde::Deserialize,
      52              19 :     strum_macros::Display,
      53                 :     strum_macros::EnumVariantNames,
      54 UBC           0 :     strum_macros::AsRefStr,
      55 CBC        2834 :     strum_macros::IntoStaticStr,
      56                 : )]
      57                 : #[serde(tag = "slug", content = "data")]
      58                 : pub enum TenantState {
      59                 :     /// This tenant is being loaded from local disk.
      60                 :     ///
      61                 :     /// `set_stopping()` and `set_broken()` do not work in this state and wait for it to pass.
      62                 :     Loading,
      63                 :     /// This tenant is being attached to the pageserver.
      64                 :     ///
      65                 :     /// `set_stopping()` and `set_broken()` do not work in this state and wait for it to pass.
      66                 :     Attaching,
      67                 :     /// The tenant is transitioning from Loading/Attaching to Active.
      68                 :     ///
      69                 :     /// While in this state, the individual timelines are being activated.
      70                 :     ///
      71                 :     /// `set_stopping()` and `set_broken()` do not work in this state and wait for it to pass.
      72                 :     Activating(ActivatingFrom),
      73                 :     /// The tenant has finished activating and is open for business.
      74                 :     ///
      75                 :     /// Transitions out of this state are possible through `set_stopping()` and `set_broken()`.
      76                 :     Active,
      77                 :     /// The tenant is recognized by pageserver, but it is being detached or the
      78                 :     /// system is being shut down.
      79                 :     ///
      80                 :     /// Transitions out of this state are possible through `set_broken()`.
      81                 :     Stopping {
      82                 :         // Because of https://github.com/serde-rs/serde/issues/2105 this has to be a named field,
      83                 :         // otherwise it will not be skipped during deserialization
      84                 :         #[serde(skip)]
      85                 :         progress: completion::Barrier,
      86                 :     },
      87                 :     /// The tenant is recognized by the pageserver, but can no longer be used for
      88                 :     /// any operations.
      89                 :     ///
      90                 :     /// If the tenant fails to load or attach, it will transition to this state
      91                 :     /// and it is guaranteed that no background tasks are running in its name.
      92                 :     ///
      93                 :     /// The other way to transition into this state is from `Stopping` state
      94                 :     /// through `set_broken()` called from `remove_tenant_from_memory()`. That happens
      95                 :     /// if the cleanup future executed by `remove_tenant_from_memory()` fails.
      96                 :     Broken { reason: String, backtrace: String },
      97                 : }
      98                 : 
      99                 : impl TenantState {
     100                 :     pub fn attachment_status(&self) -> TenantAttachmentStatus {
     101                 :         use TenantAttachmentStatus::*;
     102                 : 
     103                 :         // Below TenantState::Activating is used as "transient" or "transparent" state for
     104                 :         // attachment_status determining.
     105 LBC         (1) :         match self {
     106                 :             // The attach procedure writes the marker file before adding the Attaching tenant to the tenants map.
     107                 :             // So, technically, we can return Attached here.
     108                 :             // However, as soon as Console observes Attached, it will proceed with the Postgres-level health check.
     109                 :             // But, our attach task might still be fetching the remote timelines, etc.
     110                 :             // So, return `Maybe` while Attaching, making Console wait for the attach task to finish.
     111 CBC          32 :             Self::Attaching | Self::Activating(ActivatingFrom::Attaching) => Maybe,
     112                 :             // tenant mgr startup distinguishes attaching from loading via marker file.
     113                 :             // If it's loading, there is no attach marker file, i.e., attach had finished in the past.
     114              61 :             Self::Loading | Self::Activating(ActivatingFrom::Loading) => Attached,
     115                 :             // We only reach Active after successful load / attach.
     116                 :             // So, call atttachment status Attached.
     117             164 :             Self::Active => Attached,
     118                 :             // If the (initial or resumed) attach procedure fails, the tenant becomes Broken.
     119                 :             // However, it also becomes Broken if the regular load fails.
     120                 :             // From Console's perspective there's no practical difference
     121                 :             // because attachment_status is polled by console only during attach operation execution.
     122             103 :             Self::Broken { reason, .. } => Failed {
     123             103 :                 reason: reason.to_owned(),
     124             103 :             },
     125                 :             // Why is Stopping a Maybe case? Because, during pageserver shutdown,
     126                 :             // we set the Stopping state irrespective of whether the tenant
     127                 :             // has finished attaching or not.
     128             188 :             Self::Stopping { .. } => Maybe,
     129                 :         }
     130             548 :     }
     131                 : 
     132              64 :     pub fn broken_from_reason(reason: String) -> Self {
     133              64 :         let backtrace_str: String = format!("{}", std::backtrace::Backtrace::force_capture());
     134              64 :         Self::Broken {
     135              64 :             reason,
     136              64 :             backtrace: backtrace_str,
     137              64 :         }
     138              64 :     }
     139                 : }
     140                 : 
     141                 : impl std::fmt::Debug for TenantState {
     142                 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     143              70 :         match self {
     144              70 :             Self::Broken { reason, backtrace } if !reason.is_empty() => {
     145              70 :                 write!(f, "Broken due to: {reason}. Backtrace:\n{backtrace}")
     146                 :             }
     147              19 :             _ => write!(f, "{self}"),
     148                 :         }
     149              89 :     }
     150                 : }
     151                 : 
     152                 : /// The only [`TenantState`] variants we could be `TenantState::Activating` from.
     153             245 : #[derive(Clone, Copy, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
     154                 : pub enum ActivatingFrom {
     155                 :     /// Arrived to [`TenantState::Activating`] from [`TenantState::Loading`]
     156                 :     Loading,
     157                 :     /// Arrived to [`TenantState::Activating`] from [`TenantState::Attaching`]
     158                 :     Attaching,
     159                 : }
     160                 : 
     161                 : /// A state of a timeline in pageserver's memory.
     162         3105544 : #[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
     163                 : pub enum TimelineState {
     164                 :     /// The timeline is recognized by the pageserver but is not yet operational.
     165                 :     /// In particular, the walreceiver connection loop is not running for this timeline.
     166                 :     /// It will eventually transition to state Active or Broken.
     167                 :     Loading,
     168                 :     /// The timeline is fully operational.
     169                 :     /// It can be queried, and the walreceiver connection loop is running.
     170                 :     Active,
     171                 :     /// The timeline was previously Loading or Active but is shutting down.
     172                 :     /// It cannot transition back into any other state.
     173                 :     Stopping,
     174                 :     /// The timeline is broken and not operational (previous states: Loading or Active).
     175                 :     Broken { reason: String, backtrace: String },
     176                 : }
     177                 : 
     178                 : #[serde_as]
     179           10176 : #[derive(Serialize, Deserialize)]
     180                 : pub struct TimelineCreateRequest {
     181                 :     #[serde_as(as = "DisplayFromStr")]
     182                 :     pub new_timeline_id: TimelineId,
     183                 :     #[serde(default)]
     184                 :     #[serde_as(as = "Option<DisplayFromStr>")]
     185                 :     pub ancestor_timeline_id: Option<TimelineId>,
     186                 :     #[serde(default)]
     187                 :     #[serde_as(as = "Option<DisplayFromStr>")]
     188                 :     pub ancestor_start_lsn: Option<Lsn>,
     189                 :     pub pg_version: Option<u32>,
     190                 : }
     191                 : 
     192                 : #[serde_as]
     193            9697 : #[derive(Serialize, Deserialize, Debug)]
     194                 : #[serde(deny_unknown_fields)]
     195                 : pub struct TenantCreateRequest {
     196                 :     #[serde_as(as = "DisplayFromStr")]
     197                 :     pub new_tenant_id: TenantId,
     198                 :     #[serde(default)]
     199                 :     #[serde(skip_serializing_if = "Option::is_none")]
     200                 :     pub generation: Option<u32>,
     201                 :     #[serde(flatten)]
     202                 :     pub config: TenantConfig, // as we have a flattened field, we should reject all unknown fields in it
     203                 : }
     204                 : 
     205                 : #[serde_as]
     206 UBC           0 : #[derive(Deserialize, Debug)]
     207                 : #[serde(deny_unknown_fields)]
     208                 : pub struct TenantLoadRequest {
     209                 :     #[serde(default)]
     210                 :     #[serde(skip_serializing_if = "Option::is_none")]
     211                 :     pub generation: Option<u32>,
     212                 : }
     213                 : 
     214                 : impl std::ops::Deref for TenantCreateRequest {
     215                 :     type Target = TenantConfig;
     216                 : 
     217               0 :     fn deref(&self) -> &Self::Target {
     218               0 :         &self.config
     219               0 :     }
     220                 : }
     221                 : 
     222                 : /// An alternative representation of `pageserver::tenant::TenantConf` with
     223                 : /// simpler types.
     224 CBC       15957 : #[derive(Serialize, Deserialize, Debug, Default)]
     225                 : pub struct TenantConfig {
     226                 :     pub checkpoint_distance: Option<u64>,
     227                 :     pub checkpoint_timeout: Option<String>,
     228                 :     pub compaction_target_size: Option<u64>,
     229                 :     pub compaction_period: Option<String>,
     230                 :     pub compaction_threshold: Option<usize>,
     231                 :     pub gc_horizon: Option<u64>,
     232                 :     pub gc_period: Option<String>,
     233                 :     pub image_creation_threshold: Option<usize>,
     234                 :     pub pitr_interval: Option<String>,
     235                 :     pub walreceiver_connect_timeout: Option<String>,
     236                 :     pub lagging_wal_timeout: Option<String>,
     237                 :     pub max_lsn_wal_lag: Option<NonZeroU64>,
     238                 :     pub trace_read_requests: Option<bool>,
     239                 :     // We defer the parsing of the eviction_policy field to the request handler.
     240                 :     // Otherwise we'd have to move the types for eviction policy into this package.
     241                 :     // We might do that once the eviction feature has stabilizied.
     242                 :     // For now, this field is not even documented in the openapi_spec.yml.
     243                 :     pub eviction_policy: Option<serde_json::Value>,
     244                 :     pub min_resident_size_override: Option<u64>,
     245                 :     pub evictions_low_residence_duration_metric_threshold: Option<String>,
     246                 :     pub gc_feedback: Option<bool>,
     247                 : }
     248                 : 
     249                 : /// A flattened analog of a `pagesever::tenant::LocationMode`, which
     250                 : /// lists out all possible states (and the virtual "Detached" state)
     251                 : /// in a flat form rather than using rust-style enums.
     252 UBC           0 : #[derive(Serialize, Deserialize, Debug)]
     253                 : pub enum LocationConfigMode {
     254                 :     AttachedSingle,
     255                 :     AttachedMulti,
     256                 :     AttachedStale,
     257                 :     Secondary,
     258                 :     Detached,
     259                 : }
     260                 : 
     261               0 : #[derive(Serialize, Deserialize, Debug)]
     262                 : pub struct LocationConfigSecondary {
     263                 :     pub warm: bool,
     264                 : }
     265                 : 
     266                 : /// An alternative representation of `pageserver::tenant::LocationConf`,
     267                 : /// for use in external-facing APIs.
     268               0 : #[derive(Serialize, Deserialize, Debug)]
     269                 : pub struct LocationConfig {
     270                 :     pub mode: LocationConfigMode,
     271                 :     /// If attaching, in what generation?
     272                 :     #[serde(default)]
     273                 :     pub generation: Option<Generation>,
     274                 :     #[serde(default)]
     275                 :     pub secondary_conf: Option<LocationConfigSecondary>,
     276                 : 
     277                 :     // If requesting mode `Secondary`, configuration for that.
     278                 :     // Custom storage configuration for the tenant, if any
     279                 :     pub tenant_conf: TenantConfig,
     280                 : }
     281                 : 
     282                 : #[serde_as]
     283 CBC         449 : #[derive(Serialize, Deserialize)]
     284                 : #[serde(transparent)]
     285                 : pub struct TenantCreateResponse(#[serde_as(as = "DisplayFromStr")] pub TenantId);
     286                 : 
     287             564 : #[derive(Serialize)]
     288                 : pub struct StatusResponse {
     289                 :     pub id: NodeId,
     290                 : }
     291                 : 
     292                 : #[serde_as]
     293 UBC           0 : #[derive(Serialize, Deserialize, Debug)]
     294                 : #[serde(deny_unknown_fields)]
     295                 : pub struct TenantLocationConfigRequest {
     296                 :     #[serde_as(as = "DisplayFromStr")]
     297                 :     pub tenant_id: TenantId,
     298                 :     #[serde(flatten)]
     299                 :     pub config: LocationConfig, // as we have a flattened field, we should reject all unknown fields in it
     300                 : }
     301                 : 
     302                 : #[serde_as]
     303 CBC         420 : #[derive(Serialize, Deserialize, Debug)]
     304                 : #[serde(deny_unknown_fields)]
     305                 : pub struct TenantConfigRequest {
     306                 :     #[serde_as(as = "DisplayFromStr")]
     307                 :     pub tenant_id: TenantId,
     308                 :     #[serde(flatten)]
     309                 :     pub config: TenantConfig, // as we have a flattened field, we should reject all unknown fields in it
     310                 : }
     311                 : 
     312                 : impl std::ops::Deref for TenantConfigRequest {
     313                 :     type Target = TenantConfig;
     314                 : 
     315 UBC           0 :     fn deref(&self) -> &Self::Target {
     316               0 :         &self.config
     317               0 :     }
     318                 : }
     319                 : 
     320                 : impl TenantConfigRequest {
     321               0 :     pub fn new(tenant_id: TenantId) -> TenantConfigRequest {
     322               0 :         let config = TenantConfig {
     323               0 :             checkpoint_distance: None,
     324               0 :             checkpoint_timeout: None,
     325               0 :             compaction_target_size: None,
     326               0 :             compaction_period: None,
     327               0 :             compaction_threshold: None,
     328               0 :             gc_horizon: None,
     329               0 :             gc_period: None,
     330               0 :             image_creation_threshold: None,
     331               0 :             pitr_interval: None,
     332               0 :             walreceiver_connect_timeout: None,
     333               0 :             lagging_wal_timeout: None,
     334               0 :             max_lsn_wal_lag: None,
     335               0 :             trace_read_requests: None,
     336               0 :             eviction_policy: None,
     337               0 :             min_resident_size_override: None,
     338               0 :             evictions_low_residence_duration_metric_threshold: None,
     339               0 :             gc_feedback: None,
     340               0 :         };
     341               0 :         TenantConfigRequest { tenant_id, config }
     342               0 :     }
     343                 : }
     344                 : 
     345 CBC         195 : #[derive(Debug, Deserialize)]
     346                 : pub struct TenantAttachRequest {
     347                 :     pub config: TenantAttachConfig,
     348                 :     #[serde(default)]
     349                 :     pub generation: Option<u32>,
     350                 : }
     351                 : 
     352                 : /// Newtype to enforce deny_unknown_fields on TenantConfig for
     353                 : /// its usage inside `TenantAttachRequest`.
     354             112 : #[derive(Debug, Serialize, Deserialize)]
     355                 : #[serde(deny_unknown_fields)]
     356                 : pub struct TenantAttachConfig {
     357                 :     #[serde(flatten)]
     358                 :     allowing_unknown_fields: TenantConfig,
     359                 : }
     360                 : 
     361                 : impl std::ops::Deref for TenantAttachConfig {
     362                 :     type Target = TenantConfig;
     363                 : 
     364              55 :     fn deref(&self) -> &Self::Target {
     365              55 :         &self.allowing_unknown_fields
     366              55 :     }
     367                 : }
     368                 : 
     369                 : /// See [`TenantState::attachment_status`] and the OpenAPI docs for context.
     370             653 : #[derive(Serialize, Deserialize, Clone)]
     371                 : #[serde(tag = "slug", content = "data", rename_all = "snake_case")]
     372                 : pub enum TenantAttachmentStatus {
     373                 :     Maybe,
     374                 :     Attached,
     375                 :     Failed { reason: String },
     376                 : }
     377                 : 
     378                 : #[serde_as]
     379             550 : #[derive(Serialize, Deserialize, Clone)]
     380                 : pub struct TenantInfo {
     381                 :     #[serde_as(as = "DisplayFromStr")]
     382                 :     pub id: TenantId,
     383                 :     // NB: intentionally not part of OpenAPI, we don't want to commit to a specific set of TenantState's
     384                 :     pub state: TenantState,
     385                 :     /// Sum of the size of all layer files.
     386                 :     /// If a layer is present in both local FS and S3, it counts only once.
     387                 :     pub current_physical_size: Option<u64>, // physical size is only included in `tenant_status` endpoint
     388                 :     pub attachment_status: TenantAttachmentStatus,
     389                 : }
     390                 : 
     391                 : /// This represents the output of the "timeline_detail" and "timeline_list" API calls.
     392                 : #[serde_as]
     393           43108 : #[derive(Debug, Serialize, Deserialize, Clone)]
     394                 : pub struct TimelineInfo {
     395                 :     #[serde_as(as = "DisplayFromStr")]
     396                 :     pub tenant_id: TenantId,
     397                 :     #[serde_as(as = "DisplayFromStr")]
     398                 :     pub timeline_id: TimelineId,
     399                 : 
     400                 :     #[serde_as(as = "Option<DisplayFromStr>")]
     401                 :     pub ancestor_timeline_id: Option<TimelineId>,
     402                 :     #[serde_as(as = "Option<DisplayFromStr>")]
     403                 :     pub ancestor_lsn: Option<Lsn>,
     404                 :     #[serde_as(as = "DisplayFromStr")]
     405                 :     pub last_record_lsn: Lsn,
     406                 :     #[serde_as(as = "Option<DisplayFromStr>")]
     407                 :     pub prev_record_lsn: Option<Lsn>,
     408                 :     #[serde_as(as = "DisplayFromStr")]
     409                 :     pub latest_gc_cutoff_lsn: Lsn,
     410                 :     #[serde_as(as = "DisplayFromStr")]
     411                 :     pub disk_consistent_lsn: Lsn,
     412                 : 
     413                 :     /// The LSN that we have succesfully uploaded to remote storage
     414                 :     #[serde_as(as = "DisplayFromStr")]
     415                 :     pub remote_consistent_lsn: Lsn,
     416                 : 
     417                 :     /// The LSN that we are advertizing to safekeepers
     418                 :     #[serde_as(as = "DisplayFromStr")]
     419                 :     pub remote_consistent_lsn_visible: Lsn,
     420                 : 
     421                 :     pub current_logical_size: Option<u64>, // is None when timeline is Unloaded
     422                 :     /// Sum of the size of all layer files.
     423                 :     /// If a layer is present in both local FS and S3, it counts only once.
     424                 :     pub current_physical_size: Option<u64>, // is None when timeline is Unloaded
     425                 :     pub current_logical_size_non_incremental: Option<u64>,
     426                 : 
     427                 :     pub timeline_dir_layer_file_size_sum: Option<u64>,
     428                 : 
     429                 :     pub wal_source_connstr: Option<String>,
     430                 :     #[serde_as(as = "Option<DisplayFromStr>")]
     431                 :     pub last_received_msg_lsn: Option<Lsn>,
     432                 :     /// the timestamp (in microseconds) of the last received message
     433                 :     pub last_received_msg_ts: Option<u128>,
     434                 :     pub pg_version: u32,
     435                 : 
     436                 :     pub state: TimelineState,
     437                 : 
     438                 :     pub walreceiver_status: String,
     439                 : }
     440                 : 
     441              85 : #[derive(Debug, Clone, Serialize)]
     442                 : pub struct LayerMapInfo {
     443                 :     pub in_memory_layers: Vec<InMemoryLayerInfo>,
     444                 :     pub historic_layers: Vec<HistoricLayerInfo>,
     445                 : }
     446                 : 
     447        43821128 : #[derive(Debug, Hash, PartialEq, Eq, Clone, Copy, Serialize, Deserialize, enum_map::Enum)]
     448                 : #[repr(usize)]
     449                 : pub enum LayerAccessKind {
     450                 :     GetValueReconstructData,
     451                 :     Iter,
     452                 :     KeyIter,
     453                 :     Dump,
     454                 : }
     455                 : 
     456            8532 : #[derive(Debug, Clone, Serialize, Deserialize)]
     457                 : pub struct LayerAccessStatFullDetails {
     458                 :     pub when_millis_since_epoch: u64,
     459                 :     pub task_kind: &'static str,
     460                 :     pub access_kind: LayerAccessKind,
     461                 : }
     462                 : 
     463                 : /// An event that impacts the layer's residence status.
     464                 : #[serde_as]
     465            8864 : #[derive(Debug, Clone, Serialize, Deserialize)]
     466                 : pub struct LayerResidenceEvent {
     467                 :     /// The time when the event occurred.
     468                 :     /// NB: this timestamp is captured while the residence status changes.
     469                 :     /// So, it might be behind/ahead of the actual residence change by a short amount of time.
     470                 :     ///
     471                 :     #[serde(rename = "timestamp_millis_since_epoch")]
     472                 :     #[serde_as(as = "serde_with::TimestampMilliSeconds")]
     473                 :     pub timestamp: SystemTime,
     474                 :     /// The new residence status of the layer.
     475                 :     pub status: LayerResidenceStatus,
     476                 :     /// The reason why we had to record this event.
     477                 :     pub reason: LayerResidenceEventReason,
     478                 : }
     479                 : 
     480                 : /// The reason for recording a given [`LayerResidenceEvent`].
     481            8864 : #[derive(Debug, Clone, Copy, Serialize, Deserialize)]
     482                 : pub enum LayerResidenceEventReason {
     483                 :     /// The layer map is being populated, e.g. during timeline load or attach.
     484                 :     /// This includes [`RemoteLayer`] objects created in [`reconcile_with_remote`].
     485                 :     /// We need to record such events because there is no persistent storage for the events.
     486                 :     ///
     487                 :     // https://github.com/rust-lang/rust/issues/74481
     488                 :     /// [`RemoteLayer`]: ../../tenant/storage_layer/struct.RemoteLayer.html
     489                 :     /// [`reconcile_with_remote`]: ../../tenant/struct.Timeline.html#method.reconcile_with_remote
     490                 :     LayerLoad,
     491                 :     /// We just created the layer (e.g., freeze_and_flush or compaction).
     492                 :     /// Such layers are always [`LayerResidenceStatus::Resident`].
     493                 :     LayerCreate,
     494                 :     /// We on-demand downloaded or evicted the given layer.
     495                 :     ResidenceChange,
     496                 : }
     497                 : 
     498                 : /// The residence status of the layer, after the given [`LayerResidenceEvent`].
     499            8864 : #[derive(Debug, Clone, Copy, Serialize, Deserialize)]
     500                 : pub enum LayerResidenceStatus {
     501                 :     /// Residence status for a layer file that exists locally.
     502                 :     /// It may also exist on the remote, we don't care here.
     503                 :     Resident,
     504                 :     /// Residence status for a layer file that only exists on the remote.
     505                 :     Evicted,
     506                 : }
     507                 : 
     508                 : impl LayerResidenceEvent {
     509           57278 :     pub fn new(status: LayerResidenceStatus, reason: LayerResidenceEventReason) -> Self {
     510           57278 :         Self {
     511           57278 :             status,
     512           57278 :             reason,
     513           57278 :             timestamp: SystemTime::now(),
     514           57278 :         }
     515           57278 :     }
     516                 : }
     517                 : 
     518            1775 : #[derive(Debug, Clone, Serialize)]
     519                 : pub struct LayerAccessStats {
     520                 :     pub access_count_by_access_kind: HashMap<LayerAccessKind, u64>,
     521                 :     pub task_kind_access_flag: Vec<&'static str>,
     522                 :     pub first: Option<LayerAccessStatFullDetails>,
     523                 :     pub accesses_history: HistoryBufferWithDropCounter<LayerAccessStatFullDetails, 16>,
     524                 :     pub residence_events_history: HistoryBufferWithDropCounter<LayerResidenceEvent, 16>,
     525                 : }
     526                 : 
     527                 : #[serde_as]
     528              21 : #[derive(Debug, Clone, Serialize)]
     529                 : #[serde(tag = "kind")]
     530                 : pub enum InMemoryLayerInfo {
     531                 :     Open {
     532                 :         #[serde_as(as = "DisplayFromStr")]
     533                 :         lsn_start: Lsn,
     534                 :     },
     535                 :     Frozen {
     536                 :         #[serde_as(as = "DisplayFromStr")]
     537                 :         lsn_start: Lsn,
     538                 :         #[serde_as(as = "DisplayFromStr")]
     539                 :         lsn_end: Lsn,
     540                 :     },
     541                 : }
     542                 : 
     543                 : #[serde_as]
     544            1775 : #[derive(Debug, Clone, Serialize)]
     545                 : #[serde(tag = "kind")]
     546                 : pub enum HistoricLayerInfo {
     547                 :     Delta {
     548                 :         layer_file_name: String,
     549                 :         layer_file_size: u64,
     550                 : 
     551                 :         #[serde_as(as = "DisplayFromStr")]
     552                 :         lsn_start: Lsn,
     553                 :         #[serde_as(as = "DisplayFromStr")]
     554                 :         lsn_end: Lsn,
     555                 :         remote: bool,
     556                 :         access_stats: LayerAccessStats,
     557                 :     },
     558                 :     Image {
     559                 :         layer_file_name: String,
     560                 :         layer_file_size: u64,
     561                 : 
     562                 :         #[serde_as(as = "DisplayFromStr")]
     563                 :         lsn_start: Lsn,
     564                 :         remote: bool,
     565                 :         access_stats: LayerAccessStats,
     566                 :     },
     567                 : }
     568                 : 
     569               9 : #[derive(Debug, Serialize, Deserialize)]
     570                 : pub struct DownloadRemoteLayersTaskSpawnRequest {
     571                 :     pub max_concurrent_downloads: NonZeroUsize,
     572                 : }
     573                 : 
     574              25 : #[derive(Debug, Serialize, Deserialize, Clone)]
     575                 : pub struct DownloadRemoteLayersTaskInfo {
     576                 :     pub task_id: String,
     577                 :     pub state: DownloadRemoteLayersTaskState,
     578                 :     pub total_layer_count: u64,         // stable once `completed`
     579                 :     pub successful_download_count: u64, // stable once `completed`
     580                 :     pub failed_download_count: u64,     // stable once `completed`
     581                 : }
     582                 : 
     583              25 : #[derive(Debug, Serialize, Deserialize, Clone)]
     584                 : pub enum DownloadRemoteLayersTaskState {
     585                 :     Running,
     586                 :     Completed,
     587                 :     ShutDown,
     588                 : }
     589                 : 
     590                 : pub type ConfigureFailpointsRequest = Vec<FailpointConfig>;
     591                 : 
     592                 : /// Information for configuring a single fail point
     593            1065 : #[derive(Debug, Serialize, Deserialize)]
     594                 : pub struct FailpointConfig {
     595                 :     /// Name of the fail point
     596                 :     pub name: String,
     597                 :     /// List of actions to take, using the format described in `fail::cfg`
     598                 :     ///
     599                 :     /// We also support `actions = "exit"` to cause the fail point to immediately exit.
     600                 :     pub actions: String,
     601                 : }
     602                 : 
     603            1113 : #[derive(Debug, Serialize, Deserialize)]
     604                 : pub struct TimelineGcRequest {
     605                 :     pub gc_horizon: Option<u64>,
     606                 : }
     607                 : 
     608                 : // Wrapped in libpq CopyData
     609               4 : #[derive(PartialEq, Eq, Debug)]
     610                 : pub enum PagestreamFeMessage {
     611                 :     Exists(PagestreamExistsRequest),
     612                 :     Nblocks(PagestreamNblocksRequest),
     613                 :     GetPage(PagestreamGetPageRequest),
     614                 :     DbSize(PagestreamDbSizeRequest),
     615                 : }
     616                 : 
     617                 : // Wrapped in libpq CopyData
     618                 : pub enum PagestreamBeMessage {
     619                 :     Exists(PagestreamExistsResponse),
     620                 :     Nblocks(PagestreamNblocksResponse),
     621                 :     GetPage(PagestreamGetPageResponse),
     622                 :     Error(PagestreamErrorResponse),
     623                 :     DbSize(PagestreamDbSizeResponse),
     624                 : }
     625                 : 
     626               1 : #[derive(Debug, PartialEq, Eq)]
     627                 : pub struct PagestreamExistsRequest {
     628                 :     pub latest: bool,
     629                 :     pub lsn: Lsn,
     630                 :     pub rel: RelTag,
     631                 : }
     632                 : 
     633               1 : #[derive(Debug, PartialEq, Eq)]
     634                 : pub struct PagestreamNblocksRequest {
     635                 :     pub latest: bool,
     636                 :     pub lsn: Lsn,
     637                 :     pub rel: RelTag,
     638                 : }
     639                 : 
     640               1 : #[derive(Debug, PartialEq, Eq)]
     641                 : pub struct PagestreamGetPageRequest {
     642                 :     pub latest: bool,
     643                 :     pub lsn: Lsn,
     644                 :     pub rel: RelTag,
     645                 :     pub blkno: u32,
     646                 : }
     647                 : 
     648               1 : #[derive(Debug, PartialEq, Eq)]
     649                 : pub struct PagestreamDbSizeRequest {
     650                 :     pub latest: bool,
     651                 :     pub lsn: Lsn,
     652                 :     pub dbnode: u32,
     653                 : }
     654                 : 
     655 UBC           0 : #[derive(Debug)]
     656                 : pub struct PagestreamExistsResponse {
     657                 :     pub exists: bool,
     658                 : }
     659                 : 
     660               0 : #[derive(Debug)]
     661                 : pub struct PagestreamNblocksResponse {
     662                 :     pub n_blocks: u32,
     663                 : }
     664                 : 
     665               0 : #[derive(Debug)]
     666                 : pub struct PagestreamGetPageResponse {
     667                 :     pub page: Bytes,
     668                 : }
     669                 : 
     670               0 : #[derive(Debug)]
     671                 : pub struct PagestreamErrorResponse {
     672                 :     pub message: String,
     673                 : }
     674                 : 
     675               0 : #[derive(Debug)]
     676                 : pub struct PagestreamDbSizeResponse {
     677                 :     pub db_size: i64,
     678                 : }
     679                 : 
     680                 : impl PagestreamFeMessage {
     681 CBC           4 :     pub fn serialize(&self) -> Bytes {
     682               4 :         let mut bytes = BytesMut::new();
     683               4 : 
     684               4 :         match self {
     685               1 :             Self::Exists(req) => {
     686               1 :                 bytes.put_u8(0);
     687               1 :                 bytes.put_u8(u8::from(req.latest));
     688               1 :                 bytes.put_u64(req.lsn.0);
     689               1 :                 bytes.put_u32(req.rel.spcnode);
     690               1 :                 bytes.put_u32(req.rel.dbnode);
     691               1 :                 bytes.put_u32(req.rel.relnode);
     692               1 :                 bytes.put_u8(req.rel.forknum);
     693               1 :             }
     694                 : 
     695               1 :             Self::Nblocks(req) => {
     696               1 :                 bytes.put_u8(1);
     697               1 :                 bytes.put_u8(u8::from(req.latest));
     698               1 :                 bytes.put_u64(req.lsn.0);
     699               1 :                 bytes.put_u32(req.rel.spcnode);
     700               1 :                 bytes.put_u32(req.rel.dbnode);
     701               1 :                 bytes.put_u32(req.rel.relnode);
     702               1 :                 bytes.put_u8(req.rel.forknum);
     703               1 :             }
     704                 : 
     705               1 :             Self::GetPage(req) => {
     706               1 :                 bytes.put_u8(2);
     707               1 :                 bytes.put_u8(u8::from(req.latest));
     708               1 :                 bytes.put_u64(req.lsn.0);
     709               1 :                 bytes.put_u32(req.rel.spcnode);
     710               1 :                 bytes.put_u32(req.rel.dbnode);
     711               1 :                 bytes.put_u32(req.rel.relnode);
     712               1 :                 bytes.put_u8(req.rel.forknum);
     713               1 :                 bytes.put_u32(req.blkno);
     714               1 :             }
     715                 : 
     716               1 :             Self::DbSize(req) => {
     717               1 :                 bytes.put_u8(3);
     718               1 :                 bytes.put_u8(u8::from(req.latest));
     719               1 :                 bytes.put_u64(req.lsn.0);
     720               1 :                 bytes.put_u32(req.dbnode);
     721               1 :             }
     722                 :         }
     723                 : 
     724               4 :         bytes.into()
     725               4 :     }
     726                 : 
     727         3779080 :     pub fn parse<R: std::io::Read>(body: &mut R) -> anyhow::Result<PagestreamFeMessage> {
     728                 :         // TODO these gets can fail
     729                 : 
     730                 :         // these correspond to the NeonMessageTag enum in pagestore_client.h
     731                 :         //
     732                 :         // TODO: consider using protobuf or serde bincode for less error prone
     733                 :         // serialization.
     734         3779080 :         let msg_tag = body.read_u8()?;
     735         3779080 :         match msg_tag {
     736                 :             0 => Ok(PagestreamFeMessage::Exists(PagestreamExistsRequest {
     737           46664 :                 latest: body.read_u8()? != 0,
     738           46664 :                 lsn: Lsn::from(body.read_u64::<BigEndian>()?),
     739                 :                 rel: RelTag {
     740           46664 :                     spcnode: body.read_u32::<BigEndian>()?,
     741           46664 :                     dbnode: body.read_u32::<BigEndian>()?,
     742           46664 :                     relnode: body.read_u32::<BigEndian>()?,
     743           46664 :                     forknum: body.read_u8()?,
     744                 :                 },
     745                 :             })),
     746                 :             1 => Ok(PagestreamFeMessage::Nblocks(PagestreamNblocksRequest {
     747           17068 :                 latest: body.read_u8()? != 0,
     748           17068 :                 lsn: Lsn::from(body.read_u64::<BigEndian>()?),
     749                 :                 rel: RelTag {
     750           17068 :                     spcnode: body.read_u32::<BigEndian>()?,
     751           17068 :                     dbnode: body.read_u32::<BigEndian>()?,
     752           17068 :                     relnode: body.read_u32::<BigEndian>()?,
     753           17068 :                     forknum: body.read_u8()?,
     754                 :                 },
     755                 :             })),
     756                 :             2 => Ok(PagestreamFeMessage::GetPage(PagestreamGetPageRequest {
     757         3715342 :                 latest: body.read_u8()? != 0,
     758         3715342 :                 lsn: Lsn::from(body.read_u64::<BigEndian>()?),
     759                 :                 rel: RelTag {
     760         3715342 :                     spcnode: body.read_u32::<BigEndian>()?,
     761         3715342 :                     dbnode: body.read_u32::<BigEndian>()?,
     762         3715342 :                     relnode: body.read_u32::<BigEndian>()?,
     763         3715342 :                     forknum: body.read_u8()?,
     764                 :                 },
     765         3715342 :                 blkno: body.read_u32::<BigEndian>()?,
     766                 :             })),
     767                 :             3 => Ok(PagestreamFeMessage::DbSize(PagestreamDbSizeRequest {
     768               6 :                 latest: body.read_u8()? != 0,
     769               6 :                 lsn: Lsn::from(body.read_u64::<BigEndian>()?),
     770               6 :                 dbnode: body.read_u32::<BigEndian>()?,
     771                 :             })),
     772 UBC           0 :             _ => bail!("unknown smgr message tag: {:?}", msg_tag),
     773                 :         }
     774 CBC     3779080 :     }
     775                 : }
     776                 : 
     777                 : impl PagestreamBeMessage {
     778         3779057 :     pub fn serialize(&self) -> Bytes {
     779         3779057 :         let mut bytes = BytesMut::new();
     780         3779057 : 
     781         3779057 :         match self {
     782           46663 :             Self::Exists(resp) => {
     783           46663 :                 bytes.put_u8(100); /* tag from pagestore_client.h */
     784           46663 :                 bytes.put_u8(resp.exists as u8);
     785           46663 :             }
     786                 : 
     787           17067 :             Self::Nblocks(resp) => {
     788           17067 :                 bytes.put_u8(101); /* tag from pagestore_client.h */
     789           17067 :                 bytes.put_u32(resp.n_blocks);
     790           17067 :             }
     791                 : 
     792         3715321 :             Self::GetPage(resp) => {
     793         3715321 :                 bytes.put_u8(102); /* tag from pagestore_client.h */
     794         3715321 :                 bytes.put(&resp.page[..]);
     795         3715321 :             }
     796                 : 
     797               1 :             Self::Error(resp) => {
     798               1 :                 bytes.put_u8(103); /* tag from pagestore_client.h */
     799               1 :                 bytes.put(resp.message.as_bytes());
     800               1 :                 bytes.put_u8(0); // null terminator
     801               1 :             }
     802               5 :             Self::DbSize(resp) => {
     803               5 :                 bytes.put_u8(104); /* tag from pagestore_client.h */
     804               5 :                 bytes.put_i64(resp.db_size);
     805               5 :             }
     806                 :         }
     807                 : 
     808         3779057 :         bytes.into()
     809         3779057 :     }
     810                 : }
     811                 : 
     812                 : #[cfg(test)]
     813                 : mod tests {
     814                 :     use bytes::Buf;
     815                 :     use serde_json::json;
     816                 : 
     817                 :     use super::*;
     818                 : 
     819               1 :     #[test]
     820               1 :     fn test_pagestream() {
     821               1 :         // Test serialization/deserialization of PagestreamFeMessage
     822               1 :         let messages = vec![
     823               1 :             PagestreamFeMessage::Exists(PagestreamExistsRequest {
     824               1 :                 latest: true,
     825               1 :                 lsn: Lsn(4),
     826               1 :                 rel: RelTag {
     827               1 :                     forknum: 1,
     828               1 :                     spcnode: 2,
     829               1 :                     dbnode: 3,
     830               1 :                     relnode: 4,
     831               1 :                 },
     832               1 :             }),
     833               1 :             PagestreamFeMessage::Nblocks(PagestreamNblocksRequest {
     834               1 :                 latest: false,
     835               1 :                 lsn: Lsn(4),
     836               1 :                 rel: RelTag {
     837               1 :                     forknum: 1,
     838               1 :                     spcnode: 2,
     839               1 :                     dbnode: 3,
     840               1 :                     relnode: 4,
     841               1 :                 },
     842               1 :             }),
     843               1 :             PagestreamFeMessage::GetPage(PagestreamGetPageRequest {
     844               1 :                 latest: true,
     845               1 :                 lsn: Lsn(4),
     846               1 :                 rel: RelTag {
     847               1 :                     forknum: 1,
     848               1 :                     spcnode: 2,
     849               1 :                     dbnode: 3,
     850               1 :                     relnode: 4,
     851               1 :                 },
     852               1 :                 blkno: 7,
     853               1 :             }),
     854               1 :             PagestreamFeMessage::DbSize(PagestreamDbSizeRequest {
     855               1 :                 latest: true,
     856               1 :                 lsn: Lsn(4),
     857               1 :                 dbnode: 7,
     858               1 :             }),
     859               1 :         ];
     860               5 :         for msg in messages {
     861               4 :             let bytes = msg.serialize();
     862               4 :             let reconstructed = PagestreamFeMessage::parse(&mut bytes.reader()).unwrap();
     863               4 :             assert!(msg == reconstructed);
     864                 :         }
     865               1 :     }
     866                 : 
     867               1 :     #[test]
     868               1 :     fn test_tenantinfo_serde() {
     869               1 :         // Test serialization/deserialization of TenantInfo
     870               1 :         let original_active = TenantInfo {
     871               1 :             id: TenantId::generate(),
     872               1 :             state: TenantState::Active,
     873               1 :             current_physical_size: Some(42),
     874               1 :             attachment_status: TenantAttachmentStatus::Attached,
     875               1 :         };
     876               1 :         let expected_active = json!({
     877               1 :             "id": original_active.id.to_string(),
     878               1 :             "state": {
     879               1 :                 "slug": "Active",
     880               1 :             },
     881               1 :             "current_physical_size": 42,
     882               1 :             "attachment_status": {
     883               1 :                 "slug":"attached",
     884               1 :             }
     885               1 :         });
     886               1 : 
     887               1 :         let original_broken = TenantInfo {
     888               1 :             id: TenantId::generate(),
     889               1 :             state: TenantState::Broken {
     890               1 :                 reason: "reason".into(),
     891               1 :                 backtrace: "backtrace info".into(),
     892               1 :             },
     893               1 :             current_physical_size: Some(42),
     894               1 :             attachment_status: TenantAttachmentStatus::Attached,
     895               1 :         };
     896               1 :         let expected_broken = json!({
     897               1 :             "id": original_broken.id.to_string(),
     898               1 :             "state": {
     899               1 :                 "slug": "Broken",
     900               1 :                 "data": {
     901               1 :                     "backtrace": "backtrace info",
     902               1 :                     "reason": "reason",
     903               1 :                 }
     904               1 :             },
     905               1 :             "current_physical_size": 42,
     906               1 :             "attachment_status": {
     907               1 :                 "slug":"attached",
     908               1 :             }
     909               1 :         });
     910               1 : 
     911               1 :         assert_eq!(
     912               1 :             serde_json::to_value(&original_active).unwrap(),
     913               1 :             expected_active
     914               1 :         );
     915                 : 
     916               1 :         assert_eq!(
     917               1 :             serde_json::to_value(&original_broken).unwrap(),
     918               1 :             expected_broken
     919               1 :         );
     920               1 :         assert!(format!("{:?}", &original_broken.state).contains("reason"));
     921               1 :         assert!(format!("{:?}", &original_broken.state).contains("backtrace info"));
     922               1 :     }
     923                 : 
     924               1 :     #[test]
     925               1 :     fn test_reject_unknown_field() {
     926               1 :         let id = TenantId::generate();
     927               1 :         let create_request = json!({
     928               1 :             "new_tenant_id": id.to_string(),
     929               1 :             "unknown_field": "unknown_value".to_string(),
     930               1 :         });
     931               1 :         let err = serde_json::from_value::<TenantCreateRequest>(create_request).unwrap_err();
     932               1 :         assert!(
     933               1 :             err.to_string().contains("unknown field `unknown_field`"),
     934 UBC           0 :             "expect unknown field `unknown_field` error, got: {}",
     935                 :             err
     936                 :         );
     937                 : 
     938 CBC           1 :         let id = TenantId::generate();
     939               1 :         let config_request = json!({
     940               1 :             "tenant_id": id.to_string(),
     941               1 :             "unknown_field": "unknown_value".to_string(),
     942               1 :         });
     943               1 :         let err = serde_json::from_value::<TenantConfigRequest>(config_request).unwrap_err();
     944               1 :         assert!(
     945               1 :             err.to_string().contains("unknown field `unknown_field`"),
     946 UBC           0 :             "expect unknown field `unknown_field` error, got: {}",
     947                 :             err
     948                 :         );
     949                 : 
     950 CBC           1 :         let attach_request = json!({
     951               1 :             "config": {
     952               1 :                 "unknown_field": "unknown_value".to_string(),
     953               1 :             },
     954               1 :         });
     955               1 :         let err = serde_json::from_value::<TenantAttachRequest>(attach_request).unwrap_err();
     956               1 :         assert!(
     957               1 :             err.to_string().contains("unknown field `unknown_field`"),
     958 UBC           0 :             "expect unknown field `unknown_field` error, got: {}",
     959                 :             err
     960                 :         );
     961 CBC           1 :     }
     962                 : 
     963               1 :     #[test]
     964               1 :     fn tenantstatus_activating_serde() {
     965               1 :         let states = [
     966               1 :             TenantState::Activating(ActivatingFrom::Loading),
     967               1 :             TenantState::Activating(ActivatingFrom::Attaching),
     968               1 :         ];
     969               1 :         let expected = "[{\"slug\":\"Activating\",\"data\":\"Loading\"},{\"slug\":\"Activating\",\"data\":\"Attaching\"}]";
     970               1 : 
     971               1 :         let actual = serde_json::to_string(&states).unwrap();
     972               1 : 
     973               1 :         assert_eq!(actual, expected);
     974                 : 
     975               1 :         let parsed = serde_json::from_str::<Vec<TenantState>>(&actual).unwrap();
     976               1 : 
     977               1 :         assert_eq!(states.as_slice(), &parsed);
     978               1 :     }
     979                 : 
     980               1 :     #[test]
     981               1 :     fn tenantstatus_activating_strum() {
     982               1 :         // tests added, because we use these for metrics
     983               1 :         let examples = [
     984               1 :             (line!(), TenantState::Loading, "Loading"),
     985               1 :             (line!(), TenantState::Attaching, "Attaching"),
     986               1 :             (
     987               1 :                 line!(),
     988               1 :                 TenantState::Activating(ActivatingFrom::Loading),
     989               1 :                 "Activating",
     990               1 :             ),
     991               1 :             (
     992               1 :                 line!(),
     993               1 :                 TenantState::Activating(ActivatingFrom::Attaching),
     994               1 :                 "Activating",
     995               1 :             ),
     996               1 :             (line!(), TenantState::Active, "Active"),
     997               1 :             (
     998               1 :                 line!(),
     999               1 :                 TenantState::Stopping {
    1000               1 :                     progress: utils::completion::Barrier::default(),
    1001               1 :                 },
    1002               1 :                 "Stopping",
    1003               1 :             ),
    1004               1 :             (
    1005               1 :                 line!(),
    1006               1 :                 TenantState::Broken {
    1007               1 :                     reason: "Example".into(),
    1008               1 :                     backtrace: "Looooong backtrace".into(),
    1009               1 :                 },
    1010               1 :                 "Broken",
    1011               1 :             ),
    1012               1 :         ];
    1013                 : 
    1014               8 :         for (line, rendered, expected) in examples {
    1015               7 :             let actual: &'static str = rendered.into();
    1016               7 :             assert_eq!(actual, expected, "example on {line}");
    1017                 :         }
    1018               1 :     }
    1019                 : }
        

Generated by: LCOV version 2.1-beta