LCOV - code coverage report
Current view: top level - pageserver/page_api/src - model.rs (source / functions) Coverage Total Hit
Test: 1e20c4f2b28aa592527961bb32170ebbd2c9172f.info Lines: 0.0 % 368 0
Test Date: 2025-07-16 12:29:03 Functions: 0.0 % 66 0

            Line data    Source code
       1              : //! Structs representing the canonical page service API.
       2              : //!
       3              : //! These mirror the autogenerated Protobuf types. The differences are:
       4              : //!
       5              : //! - Types that are in fact required by the API are not Options. The protobuf "required"
       6              : //!   attribute is deprecated and 'prost' marks a lot of members as optional because of that.
       7              : //!   (See <https://github.com/tokio-rs/prost/issues/800> for a gripe on this)
       8              : //!
       9              : //! - Use more precise datatypes, e.g. Lsn and uints shorter than 32 bits.
      10              : //!
      11              : //! - Validate protocol invariants, via try_from() and try_into().
      12              : //!
      13              : //! Validation only happens on the receiver side, i.e. when converting from Protobuf to domain
      14              : //! types. This is where it matters -- the Protobuf types are less strict than the domain types, and
      15              : //! receivers should expect all sorts of junk from senders. This also allows the sender to use e.g.
      16              : //! stream combinators without dealing with errors, and avoids validating the same message twice.
      17              : 
      18              : use std::fmt::Display;
      19              : use std::time::{Duration, SystemTime, UNIX_EPOCH};
      20              : 
      21              : use bytes::Bytes;
      22              : use postgres_ffi_types::Oid;
      23              : // TODO: split out Lsn, RelTag, SlruKind and other basic types to a separate crate, to avoid
      24              : // pulling in all of their other crate dependencies when building the client.
      25              : use utils::lsn::Lsn;
      26              : 
      27              : use crate::proto;
      28              : 
      29              : /// A protocol error. Typically returned via try_from() or try_into().
      30              : #[derive(thiserror::Error, Clone, Debug)]
      31              : pub enum ProtocolError {
      32              :     #[error("field '{0}' has invalid value '{1}'")]
      33              :     Invalid(&'static str, String),
      34              :     #[error("required field '{0}' is missing")]
      35              :     Missing(&'static str),
      36              : }
      37              : 
      38              : impl ProtocolError {
      39              :     /// Helper to generate a new ProtocolError::Invalid for the given field and value.
      40            0 :     pub fn invalid(field: &'static str, value: impl std::fmt::Debug) -> Self {
      41            0 :         Self::Invalid(field, format!("{value:?}"))
      42            0 :     }
      43              : }
      44              : 
      45              : impl From<ProtocolError> for tonic::Status {
      46            0 :     fn from(err: ProtocolError) -> Self {
      47            0 :         tonic::Status::invalid_argument(format!("{err}"))
      48            0 :     }
      49              : }
      50              : 
      51              : /// The LSN a request should read at.
      52              : #[derive(Clone, Copy, Debug, Default)]
      53              : pub struct ReadLsn {
      54              :     /// The request's read LSN.
      55              :     pub request_lsn: Lsn,
      56              :     /// If given, the caller guarantees that the page has not been modified since this LSN. Must be
      57              :     /// smaller than or equal to request_lsn. This allows the Pageserver to serve an old page
      58              :     /// without waiting for the request LSN to arrive. If not given, the request will read at the
      59              :     /// request_lsn and wait for it to arrive if necessary. Valid for all request types.
      60              :     ///
      61              :     /// It is undefined behaviour to make a request such that the page was, in fact, modified
      62              :     /// between request_lsn and not_modified_since_lsn. The Pageserver might detect it and return an
      63              :     /// error, or it might return the old page version or the new page version. Setting
      64              :     /// not_modified_since_lsn equal to request_lsn is always safe, but can lead to unnecessary
      65              :     /// waiting.
      66              :     pub not_modified_since_lsn: Option<Lsn>,
      67              : }
      68              : 
      69              : impl Display for ReadLsn {
      70            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
      71            0 :         let req_lsn = self.request_lsn;
      72            0 :         if let Some(mod_lsn) = self.not_modified_since_lsn {
      73            0 :             write!(f, "{req_lsn}>={mod_lsn}")
      74              :         } else {
      75            0 :             req_lsn.fmt(f)
      76              :         }
      77            0 :     }
      78              : }
      79              : 
      80              : impl TryFrom<proto::ReadLsn> for ReadLsn {
      81              :     type Error = ProtocolError;
      82              : 
      83            0 :     fn try_from(pb: proto::ReadLsn) -> Result<Self, Self::Error> {
      84            0 :         if pb.request_lsn == 0 {
      85            0 :             return Err(ProtocolError::invalid("request_lsn", pb.request_lsn));
      86            0 :         }
      87            0 :         if pb.not_modified_since_lsn > pb.request_lsn {
      88            0 :             return Err(ProtocolError::invalid(
      89            0 :                 "not_modified_since_lsn",
      90            0 :                 pb.not_modified_since_lsn,
      91            0 :             ));
      92            0 :         }
      93              :         Ok(Self {
      94            0 :             request_lsn: Lsn(pb.request_lsn),
      95            0 :             not_modified_since_lsn: match pb.not_modified_since_lsn {
      96            0 :                 0 => None,
      97            0 :                 lsn => Some(Lsn(lsn)),
      98              :             },
      99              :         })
     100            0 :     }
     101              : }
     102              : 
     103              : impl From<ReadLsn> for proto::ReadLsn {
     104            0 :     fn from(read_lsn: ReadLsn) -> Self {
     105            0 :         Self {
     106            0 :             request_lsn: read_lsn.request_lsn.0,
     107            0 :             not_modified_since_lsn: read_lsn.not_modified_since_lsn.unwrap_or_default().0,
     108            0 :         }
     109            0 :     }
     110              : }
     111              : 
     112              : // RelTag is defined in pageserver_api::reltag.
     113              : pub type RelTag = pageserver_api::reltag::RelTag;
     114              : 
     115              : impl TryFrom<proto::RelTag> for RelTag {
     116              :     type Error = ProtocolError;
     117              : 
     118            0 :     fn try_from(pb: proto::RelTag) -> Result<Self, Self::Error> {
     119              :         Ok(Self {
     120            0 :             spcnode: pb.spc_oid,
     121            0 :             dbnode: pb.db_oid,
     122            0 :             relnode: pb.rel_number,
     123            0 :             forknum: pb
     124            0 :                 .fork_number
     125            0 :                 .try_into()
     126            0 :                 .map_err(|_| ProtocolError::invalid("fork_number", pb.fork_number))?,
     127              :         })
     128            0 :     }
     129              : }
     130              : 
     131              : impl From<RelTag> for proto::RelTag {
     132            0 :     fn from(rel_tag: RelTag) -> Self {
     133            0 :         Self {
     134            0 :             spc_oid: rel_tag.spcnode,
     135            0 :             db_oid: rel_tag.dbnode,
     136            0 :             rel_number: rel_tag.relnode,
     137            0 :             fork_number: rel_tag.forknum as u32,
     138            0 :         }
     139            0 :     }
     140              : }
     141              : 
     142              : /// Checks whether a relation exists, at the given LSN. Only valid on shard 0, other shards error.
     143              : #[derive(Clone, Copy, Debug)]
     144              : pub struct CheckRelExistsRequest {
     145              :     pub read_lsn: ReadLsn,
     146              :     pub rel: RelTag,
     147              : }
     148              : 
     149              : impl TryFrom<proto::CheckRelExistsRequest> for CheckRelExistsRequest {
     150              :     type Error = ProtocolError;
     151              : 
     152            0 :     fn try_from(pb: proto::CheckRelExistsRequest) -> Result<Self, Self::Error> {
     153              :         Ok(Self {
     154            0 :             read_lsn: pb
     155            0 :                 .read_lsn
     156            0 :                 .ok_or(ProtocolError::Missing("read_lsn"))?
     157            0 :                 .try_into()?,
     158            0 :             rel: pb.rel.ok_or(ProtocolError::Missing("rel"))?.try_into()?,
     159              :         })
     160            0 :     }
     161              : }
     162              : 
     163              : impl From<CheckRelExistsRequest> for proto::CheckRelExistsRequest {
     164            0 :     fn from(request: CheckRelExistsRequest) -> Self {
     165            0 :         Self {
     166            0 :             read_lsn: Some(request.read_lsn.into()),
     167            0 :             rel: Some(request.rel.into()),
     168            0 :         }
     169            0 :     }
     170              : }
     171              : 
     172              : pub type CheckRelExistsResponse = bool;
     173              : 
     174              : impl From<proto::CheckRelExistsResponse> for CheckRelExistsResponse {
     175            0 :     fn from(pb: proto::CheckRelExistsResponse) -> Self {
     176            0 :         pb.exists
     177            0 :     }
     178              : }
     179              : 
     180              : impl From<CheckRelExistsResponse> for proto::CheckRelExistsResponse {
     181            0 :     fn from(exists: CheckRelExistsResponse) -> Self {
     182            0 :         Self { exists }
     183            0 :     }
     184              : }
     185              : 
     186              : /// Requests a base backup.
     187              : #[derive(Clone, Copy, Debug)]
     188              : pub struct GetBaseBackupRequest {
     189              :     /// The LSN to fetch a base backup at. If None, uses the latest LSN known to the Pageserver.
     190              :     pub lsn: Option<Lsn>,
     191              :     /// If true, logical replication slots will not be created.
     192              :     pub replica: bool,
     193              :     /// If true, include relation files in the base backup. Mainly for debugging and tests.
     194              :     pub full: bool,
     195              :     /// Compression algorithm to use. Base backups send a compressed payload instead of using gRPC
     196              :     /// compression, so that we can cache compressed backups on the server.
     197              :     pub compression: BaseBackupCompression,
     198              : }
     199              : 
     200              : impl TryFrom<proto::GetBaseBackupRequest> for GetBaseBackupRequest {
     201              :     type Error = ProtocolError;
     202              : 
     203            0 :     fn try_from(pb: proto::GetBaseBackupRequest) -> Result<Self, Self::Error> {
     204              :         Ok(Self {
     205            0 :             lsn: (pb.lsn != 0).then_some(Lsn(pb.lsn)),
     206            0 :             replica: pb.replica,
     207            0 :             full: pb.full,
     208            0 :             compression: pb.compression.try_into()?,
     209              :         })
     210            0 :     }
     211              : }
     212              : 
     213              : impl From<GetBaseBackupRequest> for proto::GetBaseBackupRequest {
     214            0 :     fn from(request: GetBaseBackupRequest) -> Self {
     215            0 :         Self {
     216            0 :             lsn: request.lsn.unwrap_or_default().0,
     217            0 :             replica: request.replica,
     218            0 :             full: request.full,
     219            0 :             compression: request.compression.into(),
     220            0 :         }
     221            0 :     }
     222              : }
     223              : 
     224              : /// Base backup compression algorithm.
     225              : #[derive(Clone, Copy, Debug)]
     226              : pub enum BaseBackupCompression {
     227              :     None,
     228              :     Gzip,
     229              : }
     230              : 
     231              : impl TryFrom<proto::BaseBackupCompression> for BaseBackupCompression {
     232              :     type Error = ProtocolError;
     233              : 
     234            0 :     fn try_from(pb: proto::BaseBackupCompression) -> Result<Self, Self::Error> {
     235            0 :         match pb {
     236            0 :             proto::BaseBackupCompression::Unknown => Err(ProtocolError::invalid("compression", pb)),
     237            0 :             proto::BaseBackupCompression::None => Ok(Self::None),
     238            0 :             proto::BaseBackupCompression::Gzip => Ok(Self::Gzip),
     239              :         }
     240            0 :     }
     241              : }
     242              : 
     243              : impl TryFrom<i32> for BaseBackupCompression {
     244              :     type Error = ProtocolError;
     245              : 
     246            0 :     fn try_from(compression: i32) -> Result<Self, Self::Error> {
     247            0 :         proto::BaseBackupCompression::try_from(compression)
     248            0 :             .map_err(|_| ProtocolError::invalid("compression", compression))
     249            0 :             .and_then(Self::try_from)
     250            0 :     }
     251              : }
     252              : 
     253              : impl From<BaseBackupCompression> for proto::BaseBackupCompression {
     254            0 :     fn from(compression: BaseBackupCompression) -> Self {
     255            0 :         match compression {
     256            0 :             BaseBackupCompression::None => Self::None,
     257            0 :             BaseBackupCompression::Gzip => Self::Gzip,
     258              :         }
     259            0 :     }
     260              : }
     261              : 
     262              : impl From<BaseBackupCompression> for i32 {
     263            0 :     fn from(compression: BaseBackupCompression) -> Self {
     264            0 :         proto::BaseBackupCompression::from(compression).into()
     265            0 :     }
     266              : }
     267              : 
     268              : pub type GetBaseBackupResponseChunk = Bytes;
     269              : 
     270              : impl TryFrom<proto::GetBaseBackupResponseChunk> for GetBaseBackupResponseChunk {
     271              :     type Error = ProtocolError;
     272              : 
     273            0 :     fn try_from(pb: proto::GetBaseBackupResponseChunk) -> Result<Self, Self::Error> {
     274            0 :         if pb.chunk.is_empty() {
     275            0 :             return Err(ProtocolError::Missing("chunk"));
     276            0 :         }
     277            0 :         Ok(pb.chunk)
     278            0 :     }
     279              : }
     280              : 
     281              : impl From<GetBaseBackupResponseChunk> for proto::GetBaseBackupResponseChunk {
     282            0 :     fn from(chunk: GetBaseBackupResponseChunk) -> Self {
     283            0 :         Self { chunk }
     284            0 :     }
     285              : }
     286              : 
     287              : /// Requests the size of a database, as # of bytes. Only valid on shard 0, other shards will error.
     288              : #[derive(Clone, Copy, Debug)]
     289              : pub struct GetDbSizeRequest {
     290              :     pub read_lsn: ReadLsn,
     291              :     pub db_oid: Oid,
     292              : }
     293              : 
     294              : impl TryFrom<proto::GetDbSizeRequest> for GetDbSizeRequest {
     295              :     type Error = ProtocolError;
     296              : 
     297            0 :     fn try_from(pb: proto::GetDbSizeRequest) -> Result<Self, Self::Error> {
     298              :         Ok(Self {
     299            0 :             read_lsn: pb
     300            0 :                 .read_lsn
     301            0 :                 .ok_or(ProtocolError::Missing("read_lsn"))?
     302            0 :                 .try_into()?,
     303            0 :             db_oid: pb.db_oid,
     304              :         })
     305            0 :     }
     306              : }
     307              : 
     308              : impl From<GetDbSizeRequest> for proto::GetDbSizeRequest {
     309            0 :     fn from(request: GetDbSizeRequest) -> Self {
     310            0 :         Self {
     311            0 :             read_lsn: Some(request.read_lsn.into()),
     312            0 :             db_oid: request.db_oid,
     313            0 :         }
     314            0 :     }
     315              : }
     316              : 
     317              : pub type GetDbSizeResponse = u64;
     318              : 
     319              : impl From<proto::GetDbSizeResponse> for GetDbSizeResponse {
     320            0 :     fn from(pb: proto::GetDbSizeResponse) -> Self {
     321            0 :         pb.num_bytes
     322            0 :     }
     323              : }
     324              : 
     325              : impl From<GetDbSizeResponse> for proto::GetDbSizeResponse {
     326            0 :     fn from(num_bytes: GetDbSizeResponse) -> Self {
     327            0 :         Self { num_bytes }
     328            0 :     }
     329              : }
     330              : 
     331              : /// Requests one or more pages.
     332              : #[derive(Clone, Debug, Default)]
     333              : pub struct GetPageRequest {
     334              :     /// A request ID. Will be included in the response. Should be unique for in-flight requests on
     335              :     /// the stream.
     336              :     pub request_id: RequestID,
     337              :     /// The request class.
     338              :     pub request_class: GetPageClass,
     339              :     /// The LSN to read at.
     340              :     pub read_lsn: ReadLsn,
     341              :     /// The relation to read from.
     342              :     pub rel: RelTag,
     343              :     /// Page numbers to read. Must belong to the remote shard.
     344              :     ///
     345              :     /// Multiple pages will be executed as a single batch by the Pageserver, amortizing layer access
     346              :     /// costs and parallelizing them. This may increase the latency of any individual request, but
     347              :     /// improves the overall latency and throughput of the batch as a whole.
     348              :     pub block_numbers: Vec<u32>,
     349              : }
     350              : 
     351              : impl TryFrom<proto::GetPageRequest> for GetPageRequest {
     352              :     type Error = ProtocolError;
     353              : 
     354            0 :     fn try_from(pb: proto::GetPageRequest) -> Result<Self, Self::Error> {
     355            0 :         if pb.block_number.is_empty() {
     356            0 :             return Err(ProtocolError::Missing("block_number"));
     357            0 :         }
     358              :         Ok(Self {
     359            0 :             request_id: pb
     360            0 :                 .request_id
     361            0 :                 .ok_or(ProtocolError::Missing("request_id"))?
     362            0 :                 .into(),
     363            0 :             request_class: pb.request_class.into(),
     364            0 :             read_lsn: pb
     365            0 :                 .read_lsn
     366            0 :                 .ok_or(ProtocolError::Missing("read_lsn"))?
     367            0 :                 .try_into()?,
     368            0 :             rel: pb.rel.ok_or(ProtocolError::Missing("rel"))?.try_into()?,
     369            0 :             block_numbers: pb.block_number,
     370              :         })
     371            0 :     }
     372              : }
     373              : 
     374              : impl From<GetPageRequest> for proto::GetPageRequest {
     375            0 :     fn from(request: GetPageRequest) -> Self {
     376            0 :         Self {
     377            0 :             request_id: Some(request.request_id.into()),
     378            0 :             request_class: request.request_class.into(),
     379            0 :             read_lsn: Some(request.read_lsn.into()),
     380            0 :             rel: Some(request.rel.into()),
     381            0 :             block_number: request.block_numbers,
     382            0 :         }
     383            0 :     }
     384              : }
     385              : 
     386              : /// A GetPage request ID and retry attempt. Should be unique for in-flight requests on a stream.
     387              : #[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Hash, PartialOrd, Ord)]
     388              : pub struct RequestID {
     389              :     /// The base request ID.
     390              :     pub id: u64,
     391              :     // The request attempt. Starts at 0, incremented on each retry.
     392              :     pub attempt: u32,
     393              : }
     394              : 
     395              : impl RequestID {
     396              :     /// Creates a new RequestID with the given ID and an initial attempt of 0.
     397            0 :     pub fn new(id: u64) -> Self {
     398            0 :         Self { id, attempt: 0 }
     399            0 :     }
     400              : }
     401              : 
     402              : impl Display for RequestID {
     403            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     404            0 :         write!(f, "{}.{}", self.id, self.attempt)
     405            0 :     }
     406              : }
     407              : 
     408              : impl From<proto::RequestId> for RequestID {
     409            0 :     fn from(pb: proto::RequestId) -> Self {
     410            0 :         Self {
     411            0 :             id: pb.id,
     412            0 :             attempt: pb.attempt,
     413            0 :         }
     414            0 :     }
     415              : }
     416              : 
     417              : impl From<u64> for RequestID {
     418            0 :     fn from(id: u64) -> Self {
     419            0 :         Self::new(id)
     420            0 :     }
     421              : }
     422              : 
     423              : impl From<RequestID> for proto::RequestId {
     424            0 :     fn from(request_id: RequestID) -> Self {
     425            0 :         Self {
     426            0 :             id: request_id.id,
     427            0 :             attempt: request_id.attempt,
     428            0 :         }
     429            0 :     }
     430              : }
     431              : 
     432              : /// A GetPage request class.
     433              : #[derive(Clone, Copy, Debug, Default, strum_macros::Display)]
     434              : pub enum GetPageClass {
     435              :     /// Unknown class. For backwards compatibility: used when an older client version sends a class
     436              :     /// that a newer server version has removed.
     437              :     Unknown,
     438              :     /// A normal request. This is the default.
     439              :     #[default]
     440              :     Normal,
     441              :     /// A prefetch request. NB: can only be classified on pg < 18.
     442              :     Prefetch,
     443              :     /// A background request (e.g. vacuum).
     444              :     Background,
     445              : }
     446              : 
     447              : impl From<proto::GetPageClass> for GetPageClass {
     448            0 :     fn from(pb: proto::GetPageClass) -> Self {
     449            0 :         match pb {
     450            0 :             proto::GetPageClass::Unknown => Self::Unknown,
     451            0 :             proto::GetPageClass::Normal => Self::Normal,
     452            0 :             proto::GetPageClass::Prefetch => Self::Prefetch,
     453            0 :             proto::GetPageClass::Background => Self::Background,
     454              :         }
     455            0 :     }
     456              : }
     457              : 
     458              : impl From<i32> for GetPageClass {
     459            0 :     fn from(class: i32) -> Self {
     460            0 :         proto::GetPageClass::try_from(class)
     461            0 :             .unwrap_or(proto::GetPageClass::Unknown)
     462            0 :             .into()
     463            0 :     }
     464              : }
     465              : 
     466              : impl From<GetPageClass> for proto::GetPageClass {
     467            0 :     fn from(class: GetPageClass) -> Self {
     468            0 :         match class {
     469            0 :             GetPageClass::Unknown => Self::Unknown,
     470            0 :             GetPageClass::Normal => Self::Normal,
     471            0 :             GetPageClass::Prefetch => Self::Prefetch,
     472            0 :             GetPageClass::Background => Self::Background,
     473              :         }
     474            0 :     }
     475              : }
     476              : 
     477              : impl From<GetPageClass> for i32 {
     478            0 :     fn from(class: GetPageClass) -> Self {
     479            0 :         proto::GetPageClass::from(class).into()
     480            0 :     }
     481              : }
     482              : 
     483              : /// A GetPage response.
     484              : ///
     485              : /// A batch response will contain all of the requested pages. We could eagerly emit individual pages
     486              : /// as soon as they are ready, but on a readv() Postgres holds buffer pool locks on all pages in the
     487              : /// batch and we'll only return once the entire batch is ready, so no one can make use of the
     488              : /// individual pages.
     489              : #[derive(Clone, Debug)]
     490              : pub struct GetPageResponse {
     491              :     /// The original request's ID.
     492              :     pub request_id: RequestID,
     493              :     /// The response status code. If not OK, the `rel` and `pages` fields will be empty.
     494              :     pub status_code: GetPageStatusCode,
     495              :     /// A string describing the status, if any.
     496              :     pub reason: Option<String>,
     497              :     /// The relation that the pages belong to.
     498              :     pub rel: RelTag,
     499              :     // The page(s), in the same order as the request.
     500              :     pub pages: Vec<Page>,
     501              : }
     502              : 
     503              : impl TryFrom<proto::GetPageResponse> for GetPageResponse {
     504              :     type Error = ProtocolError;
     505              : 
     506            0 :     fn try_from(pb: proto::GetPageResponse) -> Result<Self, ProtocolError> {
     507              :         Ok(Self {
     508            0 :             request_id: pb
     509            0 :                 .request_id
     510            0 :                 .ok_or(ProtocolError::Missing("request_id"))?
     511            0 :                 .into(),
     512            0 :             status_code: pb.status_code.into(),
     513            0 :             reason: Some(pb.reason).filter(|r| !r.is_empty()),
     514            0 :             rel: pb.rel.ok_or(ProtocolError::Missing("rel"))?.try_into()?,
     515            0 :             pages: pb.page.into_iter().map(Page::from).collect(),
     516              :         })
     517            0 :     }
     518              : }
     519              : 
     520              : impl From<GetPageResponse> for proto::GetPageResponse {
     521            0 :     fn from(response: GetPageResponse) -> Self {
     522            0 :         Self {
     523            0 :             request_id: Some(response.request_id.into()),
     524            0 :             status_code: response.status_code.into(),
     525            0 :             reason: response.reason.unwrap_or_default(),
     526            0 :             rel: Some(response.rel.into()),
     527            0 :             page: response.pages.into_iter().map(proto::Page::from).collect(),
     528            0 :         }
     529            0 :     }
     530              : }
     531              : 
     532              : impl GetPageResponse {
     533              :     /// Attempts to represent a tonic::Status as a GetPageResponse if appropriate. Returning a
     534              :     /// tonic::Status will terminate the GetPage stream, so per-request errors are emitted as a
     535              :     /// GetPageResponse with a non-OK status code instead.
     536              :     #[allow(clippy::result_large_err)]
     537            0 :     pub fn try_from_status(
     538            0 :         status: tonic::Status,
     539            0 :         request_id: RequestID,
     540            0 :     ) -> Result<Self, tonic::Status> {
     541              :         // We shouldn't see an OK status here, because we're emitting an error.
     542            0 :         debug_assert_ne!(status.code(), tonic::Code::Ok);
     543            0 :         if status.code() == tonic::Code::Ok {
     544            0 :             return Err(tonic::Status::internal(format!(
     545            0 :                 "unexpected OK status: {status:?}",
     546            0 :             )));
     547            0 :         }
     548              : 
     549              :         // If we can't convert the tonic::Code to a GetPageStatusCode, this is not a per-request
     550              :         // error and we should return a tonic::Status to terminate the stream.
     551            0 :         let Ok(status_code) = status.code().try_into() else {
     552            0 :             return Err(status);
     553              :         };
     554              : 
     555              :         // Return a GetPageResponse for the status.
     556            0 :         Ok(Self {
     557            0 :             request_id,
     558            0 :             status_code,
     559            0 :             reason: Some(status.message().to_string()),
     560            0 :             rel: RelTag::default(),
     561            0 :             pages: Vec::new(),
     562            0 :         })
     563            0 :     }
     564              : }
     565              : 
     566              : // A page.
     567              : #[derive(Clone, Debug)]
     568              : pub struct Page {
     569              :     /// The page number.
     570              :     pub block_number: u32,
     571              :     /// The materialized page image, as an 8KB byte vector.
     572              :     pub image: Bytes,
     573              : }
     574              : 
     575              : impl From<proto::Page> for Page {
     576            0 :     fn from(pb: proto::Page) -> Self {
     577            0 :         Self {
     578            0 :             block_number: pb.block_number,
     579            0 :             image: pb.image,
     580            0 :         }
     581            0 :     }
     582              : }
     583              : 
     584              : impl From<Page> for proto::Page {
     585            0 :     fn from(page: Page) -> Self {
     586            0 :         Self {
     587            0 :             block_number: page.block_number,
     588            0 :             image: page.image,
     589            0 :         }
     590            0 :     }
     591              : }
     592              : 
     593              : /// A GetPage response status code.
     594              : ///
     595              : /// These are effectively equivalent to gRPC statuses. However, we use a bidirectional stream
     596              : /// (potentially shared by many backends), and a gRPC status response would terminate the stream so
     597              : /// we send GetPageResponse messages with these codes instead.
     598              : #[derive(Clone, Copy, Debug, PartialEq, strum_macros::Display)]
     599              : pub enum GetPageStatusCode {
     600              :     /// Unknown status. For forwards compatibility: used when an older client version receives a new
     601              :     /// status code from a newer server version.
     602              :     Unknown,
     603              :     /// The request was successful.
     604              :     Ok,
     605              :     /// The page did not exist. The tenant/timeline/shard has already been validated during stream
     606              :     /// setup.
     607              :     NotFound,
     608              :     /// The request was invalid.
     609              :     InvalidRequest,
     610              :     /// The request failed due to an internal server error.
     611              :     InternalError,
     612              :     /// The tenant is rate limited. Slow down and retry later.
     613              :     SlowDown,
     614              : }
     615              : 
     616              : impl From<proto::GetPageStatusCode> for GetPageStatusCode {
     617            0 :     fn from(pb: proto::GetPageStatusCode) -> Self {
     618            0 :         match pb {
     619            0 :             proto::GetPageStatusCode::Unknown => Self::Unknown,
     620            0 :             proto::GetPageStatusCode::Ok => Self::Ok,
     621            0 :             proto::GetPageStatusCode::NotFound => Self::NotFound,
     622            0 :             proto::GetPageStatusCode::InvalidRequest => Self::InvalidRequest,
     623            0 :             proto::GetPageStatusCode::InternalError => Self::InternalError,
     624            0 :             proto::GetPageStatusCode::SlowDown => Self::SlowDown,
     625              :         }
     626            0 :     }
     627              : }
     628              : 
     629              : impl From<i32> for GetPageStatusCode {
     630            0 :     fn from(status_code: i32) -> Self {
     631            0 :         proto::GetPageStatusCode::try_from(status_code)
     632            0 :             .unwrap_or(proto::GetPageStatusCode::Unknown)
     633            0 :             .into()
     634            0 :     }
     635              : }
     636              : 
     637              : impl From<GetPageStatusCode> for proto::GetPageStatusCode {
     638            0 :     fn from(status_code: GetPageStatusCode) -> Self {
     639            0 :         match status_code {
     640            0 :             GetPageStatusCode::Unknown => Self::Unknown,
     641            0 :             GetPageStatusCode::Ok => Self::Ok,
     642            0 :             GetPageStatusCode::NotFound => Self::NotFound,
     643            0 :             GetPageStatusCode::InvalidRequest => Self::InvalidRequest,
     644            0 :             GetPageStatusCode::InternalError => Self::InternalError,
     645            0 :             GetPageStatusCode::SlowDown => Self::SlowDown,
     646              :         }
     647            0 :     }
     648              : }
     649              : 
     650              : impl From<GetPageStatusCode> for i32 {
     651            0 :     fn from(status_code: GetPageStatusCode) -> Self {
     652            0 :         proto::GetPageStatusCode::from(status_code).into()
     653            0 :     }
     654              : }
     655              : 
     656              : impl TryFrom<tonic::Code> for GetPageStatusCode {
     657              :     type Error = tonic::Code;
     658              : 
     659            0 :     fn try_from(code: tonic::Code) -> Result<Self, Self::Error> {
     660              :         use tonic::Code;
     661              : 
     662            0 :         let status_code = match code {
     663            0 :             Code::Ok => Self::Ok,
     664              : 
     665              :             // These are per-request errors, which should be returned as GetPageResponses.
     666            0 :             Code::AlreadyExists => Self::InvalidRequest,
     667            0 :             Code::DataLoss => Self::InternalError,
     668            0 :             Code::FailedPrecondition => Self::InvalidRequest,
     669            0 :             Code::InvalidArgument => Self::InvalidRequest,
     670            0 :             Code::Internal => Self::InternalError,
     671            0 :             Code::NotFound => Self::NotFound,
     672            0 :             Code::OutOfRange => Self::InvalidRequest,
     673            0 :             Code::ResourceExhausted => Self::SlowDown,
     674              : 
     675              :             // These should terminate the stream by returning a tonic::Status.
     676              :             Code::Aborted
     677              :             | Code::Cancelled
     678              :             | Code::DeadlineExceeded
     679              :             | Code::PermissionDenied
     680              :             | Code::Unauthenticated
     681              :             | Code::Unavailable
     682              :             | Code::Unimplemented
     683            0 :             | Code::Unknown => return Err(code),
     684              :         };
     685            0 :         Ok(status_code)
     686            0 :     }
     687              : }
     688              : 
     689              : impl From<GetPageStatusCode> for tonic::Code {
     690            0 :     fn from(status_code: GetPageStatusCode) -> Self {
     691              :         use tonic::Code;
     692              : 
     693            0 :         match status_code {
     694            0 :             GetPageStatusCode::Unknown => Code::Unknown,
     695            0 :             GetPageStatusCode::Ok => Code::Ok,
     696            0 :             GetPageStatusCode::NotFound => Code::NotFound,
     697            0 :             GetPageStatusCode::InvalidRequest => Code::InvalidArgument,
     698            0 :             GetPageStatusCode::InternalError => Code::Internal,
     699            0 :             GetPageStatusCode::SlowDown => Code::ResourceExhausted,
     700              :         }
     701            0 :     }
     702              : }
     703              : 
     704              : // Fetches the size of a relation at a given LSN, as # of blocks. Only valid on shard 0, other
     705              : // shards will error.
     706              : #[derive(Clone, Copy, Debug)]
     707              : pub struct GetRelSizeRequest {
     708              :     pub read_lsn: ReadLsn,
     709              :     pub rel: RelTag,
     710              : }
     711              : 
     712              : impl TryFrom<proto::GetRelSizeRequest> for GetRelSizeRequest {
     713              :     type Error = ProtocolError;
     714              : 
     715            0 :     fn try_from(proto: proto::GetRelSizeRequest) -> Result<Self, Self::Error> {
     716              :         Ok(Self {
     717            0 :             read_lsn: proto
     718            0 :                 .read_lsn
     719            0 :                 .ok_or(ProtocolError::Missing("read_lsn"))?
     720            0 :                 .try_into()?,
     721            0 :             rel: proto.rel.ok_or(ProtocolError::Missing("rel"))?.try_into()?,
     722              :         })
     723            0 :     }
     724              : }
     725              : 
     726              : impl From<GetRelSizeRequest> for proto::GetRelSizeRequest {
     727            0 :     fn from(request: GetRelSizeRequest) -> Self {
     728            0 :         Self {
     729            0 :             read_lsn: Some(request.read_lsn.into()),
     730            0 :             rel: Some(request.rel.into()),
     731            0 :         }
     732            0 :     }
     733              : }
     734              : 
     735              : pub type GetRelSizeResponse = u32;
     736              : 
     737              : impl From<proto::GetRelSizeResponse> for GetRelSizeResponse {
     738            0 :     fn from(proto: proto::GetRelSizeResponse) -> Self {
     739            0 :         proto.num_blocks
     740            0 :     }
     741              : }
     742              : 
     743              : impl From<GetRelSizeResponse> for proto::GetRelSizeResponse {
     744            0 :     fn from(num_blocks: GetRelSizeResponse) -> Self {
     745            0 :         Self { num_blocks }
     746            0 :     }
     747              : }
     748              : 
     749              : /// Requests an SLRU segment. Only valid on shard 0, other shards will error.
     750              : #[derive(Clone, Copy, Debug)]
     751              : pub struct GetSlruSegmentRequest {
     752              :     pub read_lsn: ReadLsn,
     753              :     pub kind: SlruKind,
     754              :     pub segno: u32,
     755              : }
     756              : 
     757              : impl TryFrom<proto::GetSlruSegmentRequest> for GetSlruSegmentRequest {
     758              :     type Error = ProtocolError;
     759              : 
     760            0 :     fn try_from(pb: proto::GetSlruSegmentRequest) -> Result<Self, Self::Error> {
     761              :         Ok(Self {
     762            0 :             read_lsn: pb
     763            0 :                 .read_lsn
     764            0 :                 .ok_or(ProtocolError::Missing("read_lsn"))?
     765            0 :                 .try_into()?,
     766            0 :             kind: u8::try_from(pb.kind)
     767            0 :                 .ok()
     768            0 :                 .and_then(SlruKind::from_repr)
     769            0 :                 .ok_or_else(|| ProtocolError::invalid("slru_kind", pb.kind))?,
     770            0 :             segno: pb.segno,
     771              :         })
     772            0 :     }
     773              : }
     774              : 
     775              : impl From<GetSlruSegmentRequest> for proto::GetSlruSegmentRequest {
     776            0 :     fn from(request: GetSlruSegmentRequest) -> Self {
     777            0 :         Self {
     778            0 :             read_lsn: Some(request.read_lsn.into()),
     779            0 :             kind: request.kind as u32,
     780            0 :             segno: request.segno,
     781            0 :         }
     782            0 :     }
     783              : }
     784              : 
     785              : pub type GetSlruSegmentResponse = Bytes;
     786              : 
     787              : impl TryFrom<proto::GetSlruSegmentResponse> for GetSlruSegmentResponse {
     788              :     type Error = ProtocolError;
     789              : 
     790            0 :     fn try_from(pb: proto::GetSlruSegmentResponse) -> Result<Self, Self::Error> {
     791            0 :         if pb.segment.is_empty() {
     792            0 :             return Err(ProtocolError::Missing("segment"));
     793            0 :         }
     794            0 :         Ok(pb.segment)
     795            0 :     }
     796              : }
     797              : 
     798              : impl From<GetSlruSegmentResponse> for proto::GetSlruSegmentResponse {
     799            0 :     fn from(segment: GetSlruSegmentResponse) -> Self {
     800            0 :         Self { segment }
     801            0 :     }
     802              : }
     803              : 
     804              : // SlruKind is defined in pageserver_api::reltag.
     805              : pub type SlruKind = pageserver_api::reltag::SlruKind;
     806              : 
     807              : /// Acquires or extends a lease on the given LSN. This guarantees that the Pageserver won't garbage
     808              : /// collect the LSN until the lease expires.
     809              : pub struct LeaseLsnRequest {
     810              :     /// The LSN to lease.
     811              :     pub lsn: Lsn,
     812              : }
     813              : 
     814              : impl TryFrom<proto::LeaseLsnRequest> for LeaseLsnRequest {
     815              :     type Error = ProtocolError;
     816              : 
     817            0 :     fn try_from(pb: proto::LeaseLsnRequest) -> Result<Self, Self::Error> {
     818            0 :         if pb.lsn == 0 {
     819            0 :             return Err(ProtocolError::Missing("lsn"));
     820            0 :         }
     821            0 :         Ok(Self { lsn: Lsn(pb.lsn) })
     822            0 :     }
     823              : }
     824              : 
     825              : impl From<LeaseLsnRequest> for proto::LeaseLsnRequest {
     826            0 :     fn from(request: LeaseLsnRequest) -> Self {
     827            0 :         Self { lsn: request.lsn.0 }
     828            0 :     }
     829              : }
     830              : 
     831              : /// Lease expiration time. If the lease could not be granted because the LSN has already been
     832              : /// garbage collected, a FailedPrecondition status will be returned instead.
     833              : pub type LeaseLsnResponse = SystemTime;
     834              : 
     835              : impl TryFrom<proto::LeaseLsnResponse> for LeaseLsnResponse {
     836              :     type Error = ProtocolError;
     837              : 
     838            0 :     fn try_from(pb: proto::LeaseLsnResponse) -> Result<Self, Self::Error> {
     839            0 :         let expires = pb.expires.ok_or(ProtocolError::Missing("expires"))?;
     840            0 :         UNIX_EPOCH
     841            0 :             .checked_add(Duration::new(expires.seconds as u64, expires.nanos as u32))
     842            0 :             .ok_or_else(|| ProtocolError::invalid("expires", expires))
     843            0 :     }
     844              : }
     845              : 
     846              : impl From<LeaseLsnResponse> for proto::LeaseLsnResponse {
     847            0 :     fn from(response: LeaseLsnResponse) -> Self {
     848            0 :         let expires = response.duration_since(UNIX_EPOCH).unwrap_or_default();
     849            0 :         Self {
     850            0 :             expires: Some(prost_types::Timestamp {
     851            0 :                 seconds: expires.as_secs() as i64,
     852            0 :                 nanos: expires.subsec_nanos() as i32,
     853            0 :             }),
     854            0 :         }
     855            0 :     }
     856              : }
        

Generated by: LCOV version 2.1-beta