LCOV - code coverage report
Current view: top level - libs/pageserver_api/src - models.rs (source / functions) Coverage Total Hit
Test: 8ac049b474321fdc72ddcb56d7165153a1a900e8.info Lines: 88.3 % 386 341
Test Date: 2023-09-06 10:18:01 Functions: 25.1 % 645 162

            Line data    Source code
       1              : use std::{
       2              :     collections::HashMap,
       3              :     num::{NonZeroU64, NonZeroUsize},
       4              :     time::SystemTime,
       5              : };
       6              : 
       7              : use byteorder::{BigEndian, ReadBytesExt};
       8              : use serde::{Deserialize, Serialize};
       9              : use serde_with::{serde_as, DisplayFromStr};
      10              : use strum_macros;
      11              : use utils::{
      12              :     completion,
      13              :     history_buffer::HistoryBufferWithDropCounter,
      14              :     id::{NodeId, TenantId, TimelineId},
      15              :     lsn::Lsn,
      16              : };
      17              : 
      18              : use crate::reltag::RelTag;
      19              : use anyhow::bail;
      20              : use bytes::{BufMut, Bytes, BytesMut};
      21              : 
/// The state of a tenant in this pageserver.
///
/// ```mermaid
/// stateDiagram-v2
///
///     [*] --> Loading: spawn_load()
///     [*] --> Attaching: spawn_attach()
///
///     Loading --> Activating: activate()
///     Attaching --> Activating: activate()
///     Activating --> Active: infallible
///
///     Loading --> Broken: load() failure
///     Attaching --> Broken: attach() failure
///
///     Active --> Stopping: set_stopping(), part of shutdown & detach
///     Stopping --> Broken: late error in remove_tenant_from_memory
///
///     Broken --> [*]: ignore / detach / shutdown
///     Stopping --> [*]: remove_from_memory complete
///
///     Active --> Broken: cfg(testing)-only tenant break point
/// ```
///
/// Serialized in serde's adjacently tagged form: the variant name is stored under the
/// `"slug"` key and the variant's payload, if any, under `"data"`.
///
/// `Debug` is not derived; it is implemented by hand elsewhere in this file.
#[derive(
    Clone,
    PartialEq,
    Eq,
    serde::Serialize,
    serde::Deserialize,
    strum_macros::Display,
    strum_macros::EnumVariantNames,
    strum_macros::AsRefStr,
    strum_macros::IntoStaticStr,
)]
#[serde(tag = "slug", content = "data")]
pub enum TenantState {
    /// This tenant is being loaded from local disk.
    ///
    /// `set_stopping()` and `set_broken()` do not work in this state and wait for it to pass.
    Loading,
    /// This tenant is being attached to the pageserver.
    ///
    /// `set_stopping()` and `set_broken()` do not work in this state and wait for it to pass.
    Attaching,
    /// The tenant is transitioning from Loading/Attaching to Active.
    ///
    /// While in this state, the individual timelines are being activated.
    ///
    /// `set_stopping()` and `set_broken()` do not work in this state and wait for it to pass.
    ///
    /// The payload records which of the two states we came from.
    Activating(ActivatingFrom),
    /// The tenant has finished activating and is open for business.
    ///
    /// Transitions out of this state are possible through `set_stopping()` and `set_broken()`.
    Active,
    /// The tenant is recognized by pageserver, but it is being detached or the
    /// system is being shut down.
    ///
    /// Transitions out of this state are possible through `set_broken()`.
    Stopping {
        // Because of https://github.com/serde-rs/serde/issues/2105 this has to be a named field,
        // otherwise it will not be skipped during deserialization
        //
        // `#[serde(skip)]` keeps the barrier out of the serialized form entirely.
        #[serde(skip)]
        progress: completion::Barrier,
    },
    /// The tenant is recognized by the pageserver, but can no longer be used for
    /// any operations.
    ///
    /// If the tenant fails to load or attach, it will transition to this state
    /// and it is guaranteed that no background tasks are running in its name.
    ///
    /// The other way to transition into this state is from `Stopping` state
    /// through `set_broken()` called from `remove_tenant_from_memory()`. That happens
    /// if the cleanup future executed by `remove_tenant_from_memory()` fails.
    ///
    /// `reason` is the human-readable cause; `backtrace` is the textual backtrace
    /// (see [`TenantState::broken_from_reason`], which captures one at construction time).
    Broken { reason: String, backtrace: String },
}
      97              : 
      98              : impl TenantState {
      99              :     pub fn attachment_status(&self) -> TenantAttachmentStatus {
     100              :         use TenantAttachmentStatus::*;
     101              : 
     102              :         // Below TenantState::Activating is used as "transient" or "transparent" state for
     103              :         // attachment_status determining.
     104            0 :         match self {
     105              :             // The attach procedure writes the marker file before adding the Attaching tenant to the tenants map.
     106              :             // So, technically, we can return Attached here.
     107              :             // However, as soon as Console observes Attached, it will proceed with the Postgres-level health check.
     108              :             // But, our attach task might still be fetching the remote timelines, etc.
     109              :             // So, return `Maybe` while Attaching, making Console wait for the attach task to finish.
     110           23 :             Self::Attaching | Self::Activating(ActivatingFrom::Attaching) => Maybe,
     111              :             // tenant mgr startup distinguishes attaching from loading via marker file.
     112              :             // If it's loading, there is no attach marker file, i.e., attach had finished in the past.
     113           57 :             Self::Loading | Self::Activating(ActivatingFrom::Loading) => Attached,
     114              :             // We only reach Active after successful load / attach.
     115              :             // So, call atttachment status Attached.
     116          181 :             Self::Active => Attached,
     117              :             // If the (initial or resumed) attach procedure fails, the tenant becomes Broken.
     118              :             // However, it also becomes Broken if the regular load fails.
     119              :             // From Console's perspective there's no practical difference
     120              :             // because attachment_status is polled by console only during attach operation execution.
     121          115 :             Self::Broken { reason, .. } => Failed {
     122          115 :                 reason: reason.to_owned(),
     123          115 :             },
     124              :             // Why is Stopping a Maybe case? Because, during pageserver shutdown,
     125              :             // we set the Stopping state irrespective of whether the tenant
     126              :             // has finished attaching or not.
     127          173 :             Self::Stopping { .. } => Maybe,
     128              :         }
     129          549 :     }
     130              : 
     131           85 :     pub fn broken_from_reason(reason: String) -> Self {
     132           85 :         let backtrace_str: String = format!("{}", std::backtrace::Backtrace::force_capture());
     133           85 :         Self::Broken {
     134           85 :             reason,
     135           85 :             backtrace: backtrace_str,
     136           85 :         }
     137           85 :     }
     138              : }
     139              : 
     140              : impl std::fmt::Debug for TenantState {
     141              :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     142          135 :         match self {
     143          135 :             Self::Broken { reason, backtrace } if !reason.is_empty() => {
     144          135 :                 write!(f, "Broken due to: {reason}. Backtrace:\n{backtrace}")
     145              :             }
     146           18 :             _ => write!(f, "{self}"),
     147              :         }
     148          153 :     }
     149              : }
     150              : 
/// The only [`TenantState`] variants we could be `TenantState::Activating` from.
///
/// Carried as the payload of [`TenantState::Activating`]; [`TenantState::attachment_status`]
/// uses it to report the same status as the originating state.
#[derive(Clone, Copy, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub enum ActivatingFrom {
    /// Arrived at [`TenantState::Activating`] from [`TenantState::Loading`]
    Loading,
    /// Arrived at [`TenantState::Activating`] from [`TenantState::Attaching`]
    Attaching,
}
     159              : 
/// A state of a timeline in pageserver's memory.
///
/// Serialized with serde's default (externally tagged) enum representation, since no
/// `#[serde(tag = ...)]` attribute is present.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub enum TimelineState {
    /// The timeline is recognized by the pageserver but is not yet operational.
    /// In particular, the walreceiver connection loop is not running for this timeline.
    /// It will eventually transition to state Active or Broken.
    Loading,
    /// The timeline is fully operational.
    /// It can be queried, and the walreceiver connection loop is running.
    Active,
    /// The timeline was previously Loading or Active but is shutting down.
    /// It cannot transition back into any other state.
    Stopping,
    /// The timeline is broken and not operational (previous states: Loading or Active).
    ///
    /// NOTE(review): `reason` / `backtrace` presumably mirror the fields of
    /// `TenantState::Broken` — confirm against the code that constructs this variant.
    Broken { reason: String, backtrace: String },
}
     176              : 
/// Request payload for creating a timeline.
///
/// The id and LSN fields are (de)serialized through their `Display` / `FromStr`
/// string form (`DisplayFromStr`).
#[serde_as]
#[derive(Serialize, Deserialize)]
pub struct TimelineCreateRequest {
    /// Id of the timeline to create.
    #[serde_as(as = "DisplayFromStr")]
    pub new_timeline_id: TimelineId,
    /// Optional ancestor timeline; `#[serde(default)]` makes it `None` when absent from the input.
    #[serde(default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    pub ancestor_timeline_id: Option<TimelineId>,
    /// Optional LSN on the ancestor; `None` when absent from the input.
    // NOTE(review): presumably the branch point on the ancestor timeline — confirm with the handler.
    #[serde(default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    pub ancestor_start_lsn: Option<Lsn>,
    /// Optional Postgres version, as a plain integer.
    pub pg_version: Option<u32>,
}
     190              : 
/// Request payload for creating a tenant: the new tenant's id plus its configuration.
///
/// The [`TenantConfig`] fields are flattened into the same object as `new_tenant_id`
/// rather than nested under a `config` key.
#[serde_as]
#[derive(Serialize, Deserialize, Debug)]
#[serde(deny_unknown_fields)]
pub struct TenantCreateRequest {
    /// Id of the tenant to create, in its string form.
    #[serde_as(as = "DisplayFromStr")]
    pub new_tenant_id: TenantId,
    // NOTE(review): serde's documentation states that `deny_unknown_fields` is not supported
    // together with `flatten`; this relies on the combination nevertheless rejecting unknown
    // keys here — keep it covered by a test.
    #[serde(flatten)]
    pub config: TenantConfig, // as we have a flattened field, we should reject all unknown fields in it
}
     200              : 
     201              : impl std::ops::Deref for TenantCreateRequest {
     202              :     type Target = TenantConfig;
     203              : 
     204            0 :     fn deref(&self) -> &Self::Target {
     205            0 :         &self.config
     206            0 :     }
     207              : }
     208              : 
/// Per-tenant configuration as it appears in API payloads.
///
/// Every field is optional, and `Default` yields a value with all fields `None`.
// NOTE(review): duration-like settings (`*_timeout`, `*_period`, `pitr_interval`, ...) are
// carried as plain strings here; presumably they are parsed by the consumer — confirm.
#[derive(Serialize, Deserialize, Debug, Default)]
pub struct TenantConfig {
    pub checkpoint_distance: Option<u64>,
    pub checkpoint_timeout: Option<String>,
    pub compaction_target_size: Option<u64>,
    pub compaction_period: Option<String>,
    pub compaction_threshold: Option<usize>,
    pub gc_horizon: Option<u64>,
    pub gc_period: Option<String>,
    pub image_creation_threshold: Option<usize>,
    pub pitr_interval: Option<String>,
    pub walreceiver_connect_timeout: Option<String>,
    pub lagging_wal_timeout: Option<String>,
    pub max_lsn_wal_lag: Option<NonZeroU64>,
    pub trace_read_requests: Option<bool>,
    // We defer the parsing of the eviction_policy field to the request handler.
    // Otherwise we'd have to move the types for eviction policy into this package.
    // We might do that once the eviction feature has stabilized.
    // For now, this field is not even documented in the openapi_spec.yml.
    pub eviction_policy: Option<serde_json::Value>,
    pub min_resident_size_override: Option<u64>,
    pub evictions_low_residence_duration_metric_threshold: Option<String>,
    pub gc_feedback: Option<bool>,
}
     233              : 
/// Response to a tenant creation request: the id of the tenant.
///
/// `#[serde(transparent)]` together with `DisplayFromStr` means this serializes as the
/// bare string form of the id, without any wrapping object or array.
#[serde_as]
#[derive(Serialize, Deserialize)]
#[serde(transparent)]
pub struct TenantCreateResponse(#[serde_as(as = "DisplayFromStr")] pub TenantId);
     238              : 
/// Serialize-only status payload carrying a node id.
// NOTE(review): presumably the id of the responding pageserver — confirm with the handler.
#[derive(Serialize)]
pub struct StatusResponse {
    pub id: NodeId,
}
     243              : 
     244              : impl TenantCreateRequest {
     245            0 :     pub fn new(new_tenant_id: TenantId) -> TenantCreateRequest {
     246            0 :         TenantCreateRequest {
     247            0 :             new_tenant_id,
     248            0 :             config: TenantConfig::default(),
     249            0 :         }
     250            0 :     }
     251              : }
     252              : 
/// Request payload for updating a tenant's configuration: the tenant's id plus the
/// configuration to apply.
///
/// The [`TenantConfig`] fields are flattened into the same object as `tenant_id`
/// rather than nested under a `config` key.
#[serde_as]
#[derive(Serialize, Deserialize, Debug)]
#[serde(deny_unknown_fields)]
pub struct TenantConfigRequest {
    /// Id of the tenant the configuration is for, in its string form.
    #[serde_as(as = "DisplayFromStr")]
    pub tenant_id: TenantId,
    // NOTE(review): serde's documentation states that `deny_unknown_fields` is not supported
    // together with `flatten`; this relies on the combination nevertheless rejecting unknown
    // keys here — keep it covered by a test.
    #[serde(flatten)]
    pub config: TenantConfig, // as we have a flattened field, we should reject all unknown fields in it
}
     262              : 
     263              : impl std::ops::Deref for TenantConfigRequest {
     264              :     type Target = TenantConfig;
     265              : 
     266            0 :     fn deref(&self) -> &Self::Target {
     267            0 :         &self.config
     268            0 :     }
     269              : }
     270              : 
     271              : impl TenantConfigRequest {
     272            0 :     pub fn new(tenant_id: TenantId) -> TenantConfigRequest {
     273            0 :         let config = TenantConfig {
     274            0 :             checkpoint_distance: None,
     275            0 :             checkpoint_timeout: None,
     276            0 :             compaction_target_size: None,
     277            0 :             compaction_period: None,
     278            0 :             compaction_threshold: None,
     279            0 :             gc_horizon: None,
     280            0 :             gc_period: None,
     281            0 :             image_creation_threshold: None,
     282            0 :             pitr_interval: None,
     283            0 :             walreceiver_connect_timeout: None,
     284            0 :             lagging_wal_timeout: None,
     285            0 :             max_lsn_wal_lag: None,
     286            0 :             trace_read_requests: None,
     287            0 :             eviction_policy: None,
     288            0 :             min_resident_size_override: None,
     289            0 :             evictions_low_residence_duration_metric_threshold: None,
     290            0 :             gc_feedback: None,
     291            0 :         };
     292            0 :         TenantConfigRequest { tenant_id, config }
     293            0 :     }
     294              : }
     295              : 
/// Request payload for attaching a tenant.
///
/// Unlike the create/config requests, the configuration is nested under a `config` key.
#[derive(Debug, Serialize, Deserialize)]
pub struct TenantAttachRequest {
    pub config: TenantAttachConfig,
}
     300              : 
/// Newtype to enforce deny_unknown_fields on TenantConfig for
/// its usage inside `TenantAttachRequest`.
///
/// The wrapped configuration is reachable through `Deref`.
#[derive(Debug, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct TenantAttachConfig {
    // NOTE(review): the field name reads as the opposite of the container attribute above.
    // Presumably it refers to `TenantConfig` itself, which carries no `deny_unknown_fields`
    // — confirm, and consider a clearer name. Because of `flatten` the name never appears
    // in the serialized form.
    #[serde(flatten)]
    allowing_unknown_fields: TenantConfig,
}
     309              : 
     310              : impl std::ops::Deref for TenantAttachConfig {
     311              :     type Target = TenantConfig;
     312              : 
     313           46 :     fn deref(&self) -> &Self::Target {
     314           46 :         &self.allowing_unknown_fields
     315           46 :     }
     316              : }
     317              : 
/// See [`TenantState::attachment_status`] and the OpenAPI docs for context.
///
/// Serialized adjacently tagged: the snake_case variant name under `"slug"` and the
/// variant's fields, if any, under `"data"`.
#[derive(Serialize, Deserialize, Clone)]
#[serde(tag = "slug", content = "data", rename_all = "snake_case")]
pub enum TenantAttachmentStatus {
    /// Reported while the tenant is attaching (or activating after an attach), and while it
    /// is stopping.
    Maybe,
    /// Reported while the tenant is loading (or activating after a load), and while it is active.
    Attached,
    /// Reported for a broken tenant; `reason` is copied from the broken state.
    Failed { reason: String },
}
     326              : 
/// Summary of a single tenant: its id, state, optional physical size and attachment status.
#[serde_as]
#[derive(Serialize, Deserialize, Clone)]
pub struct TenantInfo {
    /// Tenant id, in its string form.
    #[serde_as(as = "DisplayFromStr")]
    pub id: TenantId,
    // NB: intentionally not part of OpenAPI, we don't want to commit to a specific set of TenantState's
    pub state: TenantState,
    /// Sum of the size of all layer files.
    /// If a layer is present in both local FS and S3, it counts only once.
    pub current_physical_size: Option<u64>, // physical size is only included in `tenant_status` endpoint
    /// Coarse attachment status; see [`TenantState::attachment_status`].
    pub attachment_status: TenantAttachmentStatus,
}
     339              : 
/// This represents the output of the "timeline_detail" and "timeline_list" API calls.
///
/// All id and LSN fields are (de)serialized through their `Display` / `FromStr` string
/// form (`DisplayFromStr`); sizes and the Postgres version are plain integers.
#[serde_as]
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct TimelineInfo {
    #[serde_as(as = "DisplayFromStr")]
    pub tenant_id: TenantId,
    #[serde_as(as = "DisplayFromStr")]
    pub timeline_id: TimelineId,

    // Ancestor information; both are optional.
    #[serde_as(as = "Option<DisplayFromStr>")]
    pub ancestor_timeline_id: Option<TimelineId>,
    #[serde_as(as = "Option<DisplayFromStr>")]
    pub ancestor_lsn: Option<Lsn>,
    #[serde_as(as = "DisplayFromStr")]
    pub last_record_lsn: Lsn,
    #[serde_as(as = "Option<DisplayFromStr>")]
    pub prev_record_lsn: Option<Lsn>,
    #[serde_as(as = "DisplayFromStr")]
    pub latest_gc_cutoff_lsn: Lsn,
    #[serde_as(as = "DisplayFromStr")]
    pub disk_consistent_lsn: Lsn,
    #[serde_as(as = "DisplayFromStr")]
    pub remote_consistent_lsn: Lsn,
    pub current_logical_size: Option<u64>, // is None when timeline is Unloaded
    /// Sum of the size of all layer files.
    /// If a layer is present in both local FS and S3, it counts only once.
    pub current_physical_size: Option<u64>, // is None when timeline is Unloaded
    pub current_logical_size_non_incremental: Option<u64>,

    pub timeline_dir_layer_file_size_sum: Option<u64>,

    // WAL receiver related fields; all optional.
    pub wal_source_connstr: Option<String>,
    #[serde_as(as = "Option<DisplayFromStr>")]
    pub last_received_msg_lsn: Option<Lsn>,
    /// the timestamp (in microseconds) of the last received message
    pub last_received_msg_ts: Option<u128>,
    pub pg_version: u32,

    /// Current in-memory state of the timeline.
    pub state: TimelineState,
}
     380              : 
/// Serialize-only description of a layer map, split into in-memory and historic layers.
#[derive(Debug, Clone, Serialize)]
pub struct LayerMapInfo {
    pub in_memory_layers: Vec<InMemoryLayerInfo>,
    pub historic_layers: Vec<HistoricLayerInfo>,
}
     386              : 
/// The kind of access made to a layer.
///
/// Used as the key of [`LayerAccessStats::access_count_by_access_kind`] and recorded in
/// [`LayerAccessStatFullDetails`]. Derives `enum_map::Enum`, so it can also index an
/// `EnumMap`.
// NOTE(review): with `#[repr(usize)]` and `enum_map::Enum`, reordering the variants
// presumably changes their numeric mapping — keep the order stable unless verified otherwise.
#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy, Serialize, Deserialize, enum_map::Enum)]
#[repr(usize)]
pub enum LayerAccessKind {
    GetValueReconstructData,
    Iter,
    KeyIter,
    Dump,
}
     395              : 
/// Details of a single recorded layer access: when it happened, which task kind
/// performed it, and what kind of access it was.
// NOTE(review): `task_kind` is a `&'static str` while the struct derives `Deserialize`;
// deserialization presumably only works from `'static` input — confirm the derive is needed.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LayerAccessStatFullDetails {
    /// Time of the access, in milliseconds since the epoch.
    pub when_millis_since_epoch: u64,
    /// Name of the task kind that performed the access.
    pub task_kind: &'static str,
    pub access_kind: LayerAccessKind,
}
     402              : 
/// An event that impacts the layer's residence status.
#[serde_as]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LayerResidenceEvent {
    /// The time when the event occurred.
    /// NB: this timestamp is captured while the residence status changes.
    /// So, it might be behind/ahead of the actual residence change by a short amount of time.
    ///
    /// Serialized under the key `timestamp_millis_since_epoch`, as milliseconds
    /// (`serde_with::TimestampMilliSeconds`).
    #[serde(rename = "timestamp_millis_since_epoch")]
    #[serde_as(as = "serde_with::TimestampMilliSeconds")]
    pub timestamp: SystemTime,
    /// The new residence status of the layer.
    pub status: LayerResidenceStatus,
    /// The reason why we had to record this event.
    pub reason: LayerResidenceEventReason,
}
     419              : 
/// The reason for recording a given [`LayerResidenceEvent`].
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub enum LayerResidenceEventReason {
    /// The layer map is being populated, e.g. during timeline load or attach.
    /// This includes [`RemoteLayer`] objects created in [`reconcile_with_remote`].
    /// We need to record such events because there is no persistent storage for the events.
    ///
    // The two links below are spelled as relative HTML paths instead of intra-doc links;
    // see the linked rustdoc issue for the limitation that makes this necessary.
    // https://github.com/rust-lang/rust/issues/74481
    /// [`RemoteLayer`]: ../../tenant/storage_layer/struct.RemoteLayer.html
    /// [`reconcile_with_remote`]: ../../tenant/struct.Timeline.html#method.reconcile_with_remote
    LayerLoad,
    /// We just created the layer (e.g., freeze_and_flush or compaction).
    /// Such layers are always [`LayerResidenceStatus::Resident`].
    LayerCreate,
    /// We on-demand downloaded or evicted the given layer.
    ResidenceChange,
}
     437              : 
/// The residence status of the layer, after the given [`LayerResidenceEvent`].
///
/// Stored in [`LayerResidenceEvent::status`].
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub enum LayerResidenceStatus {
    /// Residence status for a layer file that exists locally.
    /// It may also exist on the remote, we don't care here.
    Resident,
    /// Residence status for a layer file that only exists on the remote.
    Evicted,
}
     447              : 
     448              : impl LayerResidenceEvent {
     449        52372 :     pub fn new(status: LayerResidenceStatus, reason: LayerResidenceEventReason) -> Self {
     450        52372 :         Self {
     451        52372 :             status,
     452        52372 :             reason,
     453        52372 :             timestamp: SystemTime::now(),
     454        52372 :         }
     455        52372 :     }
     456              : }
     457              : 
/// Serialize-only access statistics of a layer.
#[derive(Debug, Clone, Serialize)]
pub struct LayerAccessStats {
    /// Counter per [`LayerAccessKind`].
    pub access_count_by_access_kind: HashMap<LayerAccessKind, u64>,
    // NOTE(review): presumably the names of the task kinds that accessed the layer at least
    // once — confirm with the producer of this struct.
    pub task_kind_access_flag: Vec<&'static str>,
    // NOTE(review): presumably the first recorded access, `None` if there was none — confirm.
    pub first: Option<LayerAccessStatFullDetails>,
    // NOTE(review): the const parameter `16` is presumably the capacity of the history
    // buffer, with older entries being dropped and counted — confirm in `utils::history_buffer`.
    pub accesses_history: HistoryBufferWithDropCounter<LayerAccessStatFullDetails, 16>,
    pub residence_events_history: HistoryBufferWithDropCounter<LayerResidenceEvent, 16>,
}
     466              : 
/// Serialize-only description of an in-memory layer.
///
/// Internally tagged: the variant name is stored under the `"kind"` key next to the
/// variant's own fields. LSNs are written in their string form.
#[serde_as]
#[derive(Debug, Clone, Serialize)]
#[serde(tag = "kind")]
pub enum InMemoryLayerInfo {
    /// An open layer; only its start LSN is reported.
    Open {
        #[serde_as(as = "DisplayFromStr")]
        lsn_start: Lsn,
    },
    /// A frozen layer; both start and end LSN are reported.
    Frozen {
        #[serde_as(as = "DisplayFromStr")]
        lsn_start: Lsn,
        #[serde_as(as = "DisplayFromStr")]
        lsn_end: Lsn,
    },
}
     482              : 
/// Serialize-only description of a historic layer.
///
/// Internally tagged: the variant name is stored under the `"kind"` key next to the
/// variant's own fields. LSNs are written in their string form.
#[serde_as]
#[derive(Debug, Clone, Serialize)]
#[serde(tag = "kind")]
pub enum HistoricLayerInfo {
    /// A delta layer, described by an LSN range (`lsn_start`, `lsn_end`).
    Delta {
        layer_file_name: String,
        layer_file_size: u64,

        #[serde_as(as = "DisplayFromStr")]
        lsn_start: Lsn,
        #[serde_as(as = "DisplayFromStr")]
        lsn_end: Lsn,
        // NOTE(review): presumably `true` when the layer is not resident locally — confirm.
        remote: bool,
        access_stats: LayerAccessStats,
    },
    /// An image layer, described by a single LSN (`lsn_start`); there is no `lsn_end`.
    Image {
        layer_file_name: String,
        layer_file_size: u64,

        #[serde_as(as = "DisplayFromStr")]
        lsn_start: Lsn,
        // NOTE(review): presumably `true` when the layer is not resident locally — confirm.
        remote: bool,
        access_stats: LayerAccessStats,
    },
}
     508              : 
/// Request payload for spawning a "download remote layers" task.
#[derive(Debug, Serialize, Deserialize)]
pub struct DownloadRemoteLayersTaskSpawnRequest {
    /// Upper bound on concurrent downloads; the `NonZeroUsize` type rules out zero.
    pub max_concurrent_downloads: NonZeroUsize,
}
     513              : 
/// Progress report of a "download remote layers" task: its id, state and counters.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct DownloadRemoteLayersTaskInfo {
    pub task_id: String,
    pub state: DownloadRemoteLayersTaskState,
    // The counters below may still change while the task runs.
    pub total_layer_count: u64,         // stable once `completed`
    pub successful_download_count: u64, // stable once `completed`
    pub failed_download_count: u64,     // stable once `completed`
}
     522              : 
/// State of a "download remote layers" task, as reported in
/// [`DownloadRemoteLayersTaskInfo::state`].
#[derive(Debug, Serialize, Deserialize, Clone)]
pub enum DownloadRemoteLayersTaskState {
    Running,
    Completed,
    // NOTE(review): presumably set when the task was stopped before completing — confirm.
    ShutDown,
}
     529              : 
/// Request payload for configuring fail points: a list of per-fail-point configurations.
pub type ConfigureFailpointsRequest = Vec<FailpointConfig>;

/// Information for configuring a single fail point
#[derive(Debug, Serialize, Deserialize)]
pub struct FailpointConfig {
    /// Name of the fail point
    pub name: String,
    /// List of actions to take, using the format described in `fail::cfg`
    ///
    /// We also support `actions = "exit"` to cause the fail point to immediately exit.
    pub actions: String,
}
     542              : 
/// Request parameters for garbage-collecting a timeline.
///
/// NOTE(review): purpose inferred from the type name; the handler is not visible here.
#[derive(Debug, Serialize, Deserialize)]
pub struct TimelineGcRequest {
    /// Optional GC horizon; may be omitted (`None`).
    /// NOTE(review): units and the fallback used when absent are defined by the
    /// caller, not in this file — confirm before documenting them further.
    pub gc_horizon: Option<u64>,
}
     547              : 
/// A pagestream request message, decoded by [`PagestreamFeMessage::parse`] and
/// encoded by [`PagestreamFeMessage::serialize`].
///
/// The first byte on the wire is a tag selecting the variant:
/// `0` = `Exists`, `1` = `Nblocks`, `2` = `GetPage`, `3` = `DbSize`.
// Wrapped in libpq CopyData
#[derive(PartialEq, Eq, Debug)]
pub enum PagestreamFeMessage {
    Exists(PagestreamExistsRequest),
    Nblocks(PagestreamNblocksRequest),
    GetPage(PagestreamGetPageRequest),
    DbSize(PagestreamDbSizeRequest),
}
     556              : 
/// A pagestream response message, encoded by [`PagestreamBeMessage::serialize`].
///
/// The first byte on the wire is a tag selecting the variant:
/// `100` = `Exists`, `101` = `Nblocks`, `102` = `GetPage`, `103` = `Error`,
/// `104` = `DbSize`.
// Wrapped in libpq CopyData
pub enum PagestreamBeMessage {
    Exists(PagestreamExistsResponse),
    Nblocks(PagestreamNblocksResponse),
    GetPage(PagestreamGetPageResponse),
    Error(PagestreamErrorResponse),
    DbSize(PagestreamDbSizeResponse),
}
     565              : 
/// Payload of [`PagestreamFeMessage::Exists`] (wire tag `0`).
///
/// On the wire the fields follow the tag in declaration order: `latest` as one
/// byte, `lsn` as a big-endian `u64`, then the relation tag.
#[derive(Debug, PartialEq, Eq)]
pub struct PagestreamExistsRequest {
    /// Decoded as `true` for any non-zero byte.
    pub latest: bool,
    pub lsn: Lsn,
    /// Relation the request refers to.
    pub rel: RelTag,
}
     572              : 
/// Payload of [`PagestreamFeMessage::Nblocks`] (wire tag `1`).
///
/// On the wire the fields follow the tag in declaration order: `latest` as one
/// byte, `lsn` as a big-endian `u64`, then the relation tag.
#[derive(Debug, PartialEq, Eq)]
pub struct PagestreamNblocksRequest {
    /// Decoded as `true` for any non-zero byte.
    pub latest: bool,
    pub lsn: Lsn,
    /// Relation the request refers to.
    pub rel: RelTag,
}
     579              : 
/// Payload of [`PagestreamFeMessage::GetPage`] (wire tag `2`).
///
/// On the wire the fields follow the tag in declaration order: `latest` as one
/// byte, `lsn` as a big-endian `u64`, the relation tag, then `blkno` as a
/// big-endian `u32`.
#[derive(Debug, PartialEq, Eq)]
pub struct PagestreamGetPageRequest {
    /// Decoded as `true` for any non-zero byte.
    pub latest: bool,
    pub lsn: Lsn,
    /// Relation the requested block belongs to.
    pub rel: RelTag,
    /// Block number within the relation.
    pub blkno: u32,
}
     587              : 
/// Payload of [`PagestreamFeMessage::DbSize`] (wire tag `3`).
///
/// On the wire the fields follow the tag in declaration order: `latest` as one
/// byte, `lsn` as a big-endian `u64`, then `dbnode` as a big-endian `u32`.
#[derive(Debug, PartialEq, Eq)]
pub struct PagestreamDbSizeRequest {
    /// Decoded as `true` for any non-zero byte.
    pub latest: bool,
    pub lsn: Lsn,
    /// Database node identifier the size is requested for.
    pub dbnode: u32,
}
     594              : 
/// Payload of [`PagestreamBeMessage::Exists`] (wire tag `100`).
#[derive(Debug)]
pub struct PagestreamExistsResponse {
    /// Serialized as a single byte: `1` for `true`, `0` for `false`.
    pub exists: bool,
}
     599              : 
/// Payload of [`PagestreamBeMessage::Nblocks`] (wire tag `101`).
#[derive(Debug)]
pub struct PagestreamNblocksResponse {
    /// Block count, serialized as a `u32` right after the tag.
    pub n_blocks: u32,
}
     604              : 
/// Payload of [`PagestreamBeMessage::GetPage`] (wire tag `102`).
#[derive(Debug)]
pub struct PagestreamGetPageResponse {
    /// Page contents; copied verbatim after the tag, with no length prefix.
    pub page: Bytes,
}
     609              : 
/// Payload of [`PagestreamBeMessage::Error`] (wire tag `103`).
#[derive(Debug)]
pub struct PagestreamErrorResponse {
    /// Error text; serialized as its UTF-8 bytes followed by a single NUL terminator.
    pub message: String,
}
     614              : 
/// Payload of [`PagestreamBeMessage::DbSize`] (wire tag `104`).
#[derive(Debug)]
pub struct PagestreamDbSizeResponse {
    /// Database size, serialized as an `i64` right after the tag.
    pub db_size: i64,
}
     619              : 
     620              : impl PagestreamFeMessage {
     621            4 :     pub fn serialize(&self) -> Bytes {
     622            4 :         let mut bytes = BytesMut::new();
     623            4 : 
     624            4 :         match self {
     625            1 :             Self::Exists(req) => {
     626            1 :                 bytes.put_u8(0);
     627            1 :                 bytes.put_u8(u8::from(req.latest));
     628            1 :                 bytes.put_u64(req.lsn.0);
     629            1 :                 bytes.put_u32(req.rel.spcnode);
     630            1 :                 bytes.put_u32(req.rel.dbnode);
     631            1 :                 bytes.put_u32(req.rel.relnode);
     632            1 :                 bytes.put_u8(req.rel.forknum);
     633            1 :             }
     634              : 
     635            1 :             Self::Nblocks(req) => {
     636            1 :                 bytes.put_u8(1);
     637            1 :                 bytes.put_u8(u8::from(req.latest));
     638            1 :                 bytes.put_u64(req.lsn.0);
     639            1 :                 bytes.put_u32(req.rel.spcnode);
     640            1 :                 bytes.put_u32(req.rel.dbnode);
     641            1 :                 bytes.put_u32(req.rel.relnode);
     642            1 :                 bytes.put_u8(req.rel.forknum);
     643            1 :             }
     644              : 
     645            1 :             Self::GetPage(req) => {
     646            1 :                 bytes.put_u8(2);
     647            1 :                 bytes.put_u8(u8::from(req.latest));
     648            1 :                 bytes.put_u64(req.lsn.0);
     649            1 :                 bytes.put_u32(req.rel.spcnode);
     650            1 :                 bytes.put_u32(req.rel.dbnode);
     651            1 :                 bytes.put_u32(req.rel.relnode);
     652            1 :                 bytes.put_u8(req.rel.forknum);
     653            1 :                 bytes.put_u32(req.blkno);
     654            1 :             }
     655              : 
     656            1 :             Self::DbSize(req) => {
     657            1 :                 bytes.put_u8(3);
     658            1 :                 bytes.put_u8(u8::from(req.latest));
     659            1 :                 bytes.put_u64(req.lsn.0);
     660            1 :                 bytes.put_u32(req.dbnode);
     661            1 :             }
     662              :         }
     663              : 
     664            4 :         bytes.into()
     665            4 :     }
     666              : 
     667      4599619 :     pub fn parse<R: std::io::Read>(body: &mut R) -> anyhow::Result<PagestreamFeMessage> {
     668              :         // TODO these gets can fail
     669              : 
     670              :         // these correspond to the NeonMessageTag enum in pagestore_client.h
     671              :         //
     672              :         // TODO: consider using protobuf or serde bincode for less error prone
     673              :         // serialization.
     674      4599619 :         let msg_tag = body.read_u8()?;
     675      4599619 :         match msg_tag {
     676              :             0 => Ok(PagestreamFeMessage::Exists(PagestreamExistsRequest {
     677        47956 :                 latest: body.read_u8()? != 0,
     678        47956 :                 lsn: Lsn::from(body.read_u64::<BigEndian>()?),
     679              :                 rel: RelTag {
     680        47956 :                     spcnode: body.read_u32::<BigEndian>()?,
     681        47956 :                     dbnode: body.read_u32::<BigEndian>()?,
     682        47956 :                     relnode: body.read_u32::<BigEndian>()?,
     683        47956 :                     forknum: body.read_u8()?,
     684              :                 },
     685              :             })),
     686              :             1 => Ok(PagestreamFeMessage::Nblocks(PagestreamNblocksRequest {
     687        18196 :                 latest: body.read_u8()? != 0,
     688        18196 :                 lsn: Lsn::from(body.read_u64::<BigEndian>()?),
     689              :                 rel: RelTag {
     690        18196 :                     spcnode: body.read_u32::<BigEndian>()?,
     691        18196 :                     dbnode: body.read_u32::<BigEndian>()?,
     692        18196 :                     relnode: body.read_u32::<BigEndian>()?,
     693        18196 :                     forknum: body.read_u8()?,
     694              :                 },
     695              :             })),
     696              :             2 => Ok(PagestreamFeMessage::GetPage(PagestreamGetPageRequest {
     697      4533461 :                 latest: body.read_u8()? != 0,
     698      4533461 :                 lsn: Lsn::from(body.read_u64::<BigEndian>()?),
     699              :                 rel: RelTag {
     700      4533461 :                     spcnode: body.read_u32::<BigEndian>()?,
     701      4533461 :                     dbnode: body.read_u32::<BigEndian>()?,
     702      4533461 :                     relnode: body.read_u32::<BigEndian>()?,
     703      4533461 :                     forknum: body.read_u8()?,
     704              :                 },
     705      4533461 :                 blkno: body.read_u32::<BigEndian>()?,
     706              :             })),
     707              :             3 => Ok(PagestreamFeMessage::DbSize(PagestreamDbSizeRequest {
     708            6 :                 latest: body.read_u8()? != 0,
     709            6 :                 lsn: Lsn::from(body.read_u64::<BigEndian>()?),
     710            6 :                 dbnode: body.read_u32::<BigEndian>()?,
     711              :             })),
     712            0 :             _ => bail!("unknown smgr message tag: {:?}", msg_tag),
     713              :         }
     714      4599619 :     }
     715              : }
     716              : 
     717              : impl PagestreamBeMessage {
     718      4599585 :     pub fn serialize(&self) -> Bytes {
     719      4599585 :         let mut bytes = BytesMut::new();
     720      4599585 : 
     721      4599585 :         match self {
     722        47955 :             Self::Exists(resp) => {
     723        47955 :                 bytes.put_u8(100); /* tag from pagestore_client.h */
     724        47955 :                 bytes.put_u8(resp.exists as u8);
     725        47955 :             }
     726              : 
     727        18195 :             Self::Nblocks(resp) => {
     728        18195 :                 bytes.put_u8(101); /* tag from pagestore_client.h */
     729        18195 :                 bytes.put_u32(resp.n_blocks);
     730        18195 :             }
     731              : 
     732      4533429 :             Self::GetPage(resp) => {
     733      4533429 :                 bytes.put_u8(102); /* tag from pagestore_client.h */
     734      4533429 :                 bytes.put(&resp.page[..]);
     735      4533429 :             }
     736              : 
     737            1 :             Self::Error(resp) => {
     738            1 :                 bytes.put_u8(103); /* tag from pagestore_client.h */
     739            1 :                 bytes.put(resp.message.as_bytes());
     740            1 :                 bytes.put_u8(0); // null terminator
     741            1 :             }
     742            5 :             Self::DbSize(resp) => {
     743            5 :                 bytes.put_u8(104); /* tag from pagestore_client.h */
     744            5 :                 bytes.put_i64(resp.db_size);
     745            5 :             }
     746              :         }
     747              : 
     748      4599585 :         bytes.into()
     749      4599585 :     }
     750              : }
     751              : 
     752              : #[cfg(test)]
     753              : mod tests {
     754              :     use bytes::Buf;
     755              :     use serde_json::json;
     756              : 
     757              :     use super::*;
     758              : 
     759            1 :     #[test]
     760            1 :     fn test_pagestream() {
     761            1 :         // Test serialization/deserialization of PagestreamFeMessage
     762            1 :         let messages = vec![
     763            1 :             PagestreamFeMessage::Exists(PagestreamExistsRequest {
     764            1 :                 latest: true,
     765            1 :                 lsn: Lsn(4),
     766            1 :                 rel: RelTag {
     767            1 :                     forknum: 1,
     768            1 :                     spcnode: 2,
     769            1 :                     dbnode: 3,
     770            1 :                     relnode: 4,
     771            1 :                 },
     772            1 :             }),
     773            1 :             PagestreamFeMessage::Nblocks(PagestreamNblocksRequest {
     774            1 :                 latest: false,
     775            1 :                 lsn: Lsn(4),
     776            1 :                 rel: RelTag {
     777            1 :                     forknum: 1,
     778            1 :                     spcnode: 2,
     779            1 :                     dbnode: 3,
     780            1 :                     relnode: 4,
     781            1 :                 },
     782            1 :             }),
     783            1 :             PagestreamFeMessage::GetPage(PagestreamGetPageRequest {
     784            1 :                 latest: true,
     785            1 :                 lsn: Lsn(4),
     786            1 :                 rel: RelTag {
     787            1 :                     forknum: 1,
     788            1 :                     spcnode: 2,
     789            1 :                     dbnode: 3,
     790            1 :                     relnode: 4,
     791            1 :                 },
     792            1 :                 blkno: 7,
     793            1 :             }),
     794            1 :             PagestreamFeMessage::DbSize(PagestreamDbSizeRequest {
     795            1 :                 latest: true,
     796            1 :                 lsn: Lsn(4),
     797            1 :                 dbnode: 7,
     798            1 :             }),
     799            1 :         ];
     800            5 :         for msg in messages {
     801            4 :             let bytes = msg.serialize();
     802            4 :             let reconstructed = PagestreamFeMessage::parse(&mut bytes.reader()).unwrap();
     803            4 :             assert!(msg == reconstructed);
     804              :         }
     805            1 :     }
     806              : 
     807            1 :     #[test]
     808            1 :     fn test_tenantinfo_serde() {
     809            1 :         // Test serialization/deserialization of TenantInfo
     810            1 :         let original_active = TenantInfo {
     811            1 :             id: TenantId::generate(),
     812            1 :             state: TenantState::Active,
     813            1 :             current_physical_size: Some(42),
     814            1 :             attachment_status: TenantAttachmentStatus::Attached,
     815            1 :         };
     816            1 :         let expected_active = json!({
     817            1 :             "id": original_active.id.to_string(),
     818            1 :             "state": {
     819            1 :                 "slug": "Active",
     820            1 :             },
     821            1 :             "current_physical_size": 42,
     822            1 :             "attachment_status": {
     823            1 :                 "slug":"attached",
     824            1 :             }
     825            1 :         });
     826            1 : 
     827            1 :         let original_broken = TenantInfo {
     828            1 :             id: TenantId::generate(),
     829            1 :             state: TenantState::Broken {
     830            1 :                 reason: "reason".into(),
     831            1 :                 backtrace: "backtrace info".into(),
     832            1 :             },
     833            1 :             current_physical_size: Some(42),
     834            1 :             attachment_status: TenantAttachmentStatus::Attached,
     835            1 :         };
     836            1 :         let expected_broken = json!({
     837            1 :             "id": original_broken.id.to_string(),
     838            1 :             "state": {
     839            1 :                 "slug": "Broken",
     840            1 :                 "data": {
     841            1 :                     "backtrace": "backtrace info",
     842            1 :                     "reason": "reason",
     843            1 :                 }
     844            1 :             },
     845            1 :             "current_physical_size": 42,
     846            1 :             "attachment_status": {
     847            1 :                 "slug":"attached",
     848            1 :             }
     849            1 :         });
     850            1 : 
     851            1 :         assert_eq!(
     852            1 :             serde_json::to_value(&original_active).unwrap(),
     853            1 :             expected_active
     854            1 :         );
     855              : 
     856            1 :         assert_eq!(
     857            1 :             serde_json::to_value(&original_broken).unwrap(),
     858            1 :             expected_broken
     859            1 :         );
     860            1 :         assert!(format!("{:?}", &original_broken.state).contains("reason"));
     861            1 :         assert!(format!("{:?}", &original_broken.state).contains("backtrace info"));
     862            1 :     }
     863              : 
     864            1 :     #[test]
     865            1 :     fn test_reject_unknown_field() {
     866            1 :         let id = TenantId::generate();
     867            1 :         let create_request = json!({
     868            1 :             "new_tenant_id": id.to_string(),
     869            1 :             "unknown_field": "unknown_value".to_string(),
     870            1 :         });
     871            1 :         let err = serde_json::from_value::<TenantCreateRequest>(create_request).unwrap_err();
     872            1 :         assert!(
     873            1 :             err.to_string().contains("unknown field `unknown_field`"),
     874            0 :             "expect unknown field `unknown_field` error, got: {}",
     875              :             err
     876              :         );
     877              : 
     878            1 :         let id = TenantId::generate();
     879            1 :         let config_request = json!({
     880            1 :             "tenant_id": id.to_string(),
     881            1 :             "unknown_field": "unknown_value".to_string(),
     882            1 :         });
     883            1 :         let err = serde_json::from_value::<TenantConfigRequest>(config_request).unwrap_err();
     884            1 :         assert!(
     885            1 :             err.to_string().contains("unknown field `unknown_field`"),
     886            0 :             "expect unknown field `unknown_field` error, got: {}",
     887              :             err
     888              :         );
     889              : 
     890            1 :         let attach_request = json!({
     891            1 :             "config": {
     892            1 :                 "unknown_field": "unknown_value".to_string(),
     893            1 :             },
     894            1 :         });
     895            1 :         let err = serde_json::from_value::<TenantAttachRequest>(attach_request).unwrap_err();
     896            1 :         assert!(
     897            1 :             err.to_string().contains("unknown field `unknown_field`"),
     898            0 :             "expect unknown field `unknown_field` error, got: {}",
     899              :             err
     900              :         );
     901            1 :     }
     902              : 
     903            1 :     #[test]
     904            1 :     fn tenantstatus_activating_serde() {
     905            1 :         let states = [
     906            1 :             TenantState::Activating(ActivatingFrom::Loading),
     907            1 :             TenantState::Activating(ActivatingFrom::Attaching),
     908            1 :         ];
     909            1 :         let expected = "[{\"slug\":\"Activating\",\"data\":\"Loading\"},{\"slug\":\"Activating\",\"data\":\"Attaching\"}]";
     910            1 : 
     911            1 :         let actual = serde_json::to_string(&states).unwrap();
     912            1 : 
     913            1 :         assert_eq!(actual, expected);
     914              : 
     915            1 :         let parsed = serde_json::from_str::<Vec<TenantState>>(&actual).unwrap();
     916            1 : 
     917            1 :         assert_eq!(states.as_slice(), &parsed);
     918            1 :     }
     919              : 
     920            1 :     #[test]
     921            1 :     fn tenantstatus_activating_strum() {
     922            1 :         // tests added, because we use these for metrics
     923            1 :         let examples = [
     924            1 :             (line!(), TenantState::Loading, "Loading"),
     925            1 :             (line!(), TenantState::Attaching, "Attaching"),
     926            1 :             (
     927            1 :                 line!(),
     928            1 :                 TenantState::Activating(ActivatingFrom::Loading),
     929            1 :                 "Activating",
     930            1 :             ),
     931            1 :             (
     932            1 :                 line!(),
     933            1 :                 TenantState::Activating(ActivatingFrom::Attaching),
     934            1 :                 "Activating",
     935            1 :             ),
     936            1 :             (line!(), TenantState::Active, "Active"),
     937            1 :             (
     938            1 :                 line!(),
     939            1 :                 TenantState::Stopping {
     940            1 :                     progress: utils::completion::Barrier::default(),
     941            1 :                 },
     942            1 :                 "Stopping",
     943            1 :             ),
     944            1 :             (
     945            1 :                 line!(),
     946            1 :                 TenantState::Broken {
     947            1 :                     reason: "Example".into(),
     948            1 :                     backtrace: "Looooong backtrace".into(),
     949            1 :                 },
     950            1 :                 "Broken",
     951            1 :             ),
     952            1 :         ];
     953              : 
     954            8 :         for (line, rendered, expected) in examples {
     955            7 :             let actual: &'static str = rendered.into();
     956            7 :             assert_eq!(actual, expected, "example on {line}");
     957              :         }
     958            1 :     }
     959              : }
        

Generated by: LCOV version 2.1-beta