LCOV - code coverage report
Current view: top level - libs/pageserver_api/src - pagestream_api.rs (source / functions) Coverage Total Hit
Test: 1e20c4f2b28aa592527961bb32170ebbd2c9172f.info Lines: 32.7 % 468 153
Test Date: 2025-07-16 12:29:03 Functions: 33.3 % 12 4

            Line data    Source code
       1              : //! Rust definitions of the libpq-based pagestream API
       2              : //!
       3              : //! See also the C implementation of the same API in pgxn/neon/pagestore_client.h
       4              : 
       5              : use std::io::{BufRead, Read};
       6              : 
       7              : use crate::reltag::RelTag;
       8              : 
       9              : use byteorder::{BigEndian, ReadBytesExt};
      10              : use bytes::{Buf, BufMut, Bytes, BytesMut};
      11              : use utils::lsn::Lsn;
      12              : 
      13              : /// Block size.
      14              : ///
      15              : /// XXX: We assume 8k block size in the SLRU fetch API. It's not great to hardcode
      16              : /// that in the protocol, because Postgres supports different block sizes as a compile
      17              : /// time option.
      18              : const BLCKSZ: usize = 8192;
      19              : 
      20              : // Wrapped in libpq CopyData
      21              : #[derive(PartialEq, Eq, Debug)]
      22              : pub enum PagestreamFeMessage {
      23              :     Exists(PagestreamExistsRequest),
      24              :     Nblocks(PagestreamNblocksRequest),
      25              :     GetPage(PagestreamGetPageRequest),
      26              :     DbSize(PagestreamDbSizeRequest),
      27              :     GetSlruSegment(PagestreamGetSlruSegmentRequest),
      28              :     #[cfg(feature = "testing")]
      29              :     Test(PagestreamTestRequest),
      30              : }
      31              : 
      32              : // Wrapped in libpq CopyData
      33              : #[derive(Debug, strum_macros::EnumProperty)]
      34              : pub enum PagestreamBeMessage {
      35              :     Exists(PagestreamExistsResponse),
      36              :     Nblocks(PagestreamNblocksResponse),
      37              :     GetPage(PagestreamGetPageResponse),
      38              :     Error(PagestreamErrorResponse),
      39              :     DbSize(PagestreamDbSizeResponse),
      40              :     GetSlruSegment(PagestreamGetSlruSegmentResponse),
      41              :     #[cfg(feature = "testing")]
      42              :     Test(PagestreamTestResponse),
      43              : }
      44              : 
      45              : // Keep in sync with `pagestore_client.h`
      46              : #[repr(u8)]
      47              : enum PagestreamFeMessageTag {
      48              :     Exists = 0,
      49              :     Nblocks = 1,
      50              :     GetPage = 2,
      51              :     DbSize = 3,
      52              :     GetSlruSegment = 4,
      53              :     /* future tags above this line */
      54              :     /// For testing purposes, not available in production.
      55              :     #[cfg(feature = "testing")]
      56              :     Test = 99,
      57              : }
      58              : 
      59              : // Keep in sync with `pagestore_client.h`
      60              : #[repr(u8)]
      61              : enum PagestreamBeMessageTag {
      62              :     Exists = 100,
      63              :     Nblocks = 101,
      64              :     GetPage = 102,
      65              :     Error = 103,
      66              :     DbSize = 104,
      67              :     GetSlruSegment = 105,
      68              :     /* future tags above this line */
      69              :     /// For testing purposes, not available in production.
      70              :     #[cfg(feature = "testing")]
      71              :     Test = 199,
      72              : }
      73              : 
      74              : impl TryFrom<u8> for PagestreamFeMessageTag {
      75              :     type Error = u8;
      76            4 :     fn try_from(value: u8) -> Result<Self, u8> {
      77            4 :         match value {
      78            1 :             0 => Ok(PagestreamFeMessageTag::Exists),
      79            1 :             1 => Ok(PagestreamFeMessageTag::Nblocks),
      80            1 :             2 => Ok(PagestreamFeMessageTag::GetPage),
      81            1 :             3 => Ok(PagestreamFeMessageTag::DbSize),
      82            0 :             4 => Ok(PagestreamFeMessageTag::GetSlruSegment),
      83              :             #[cfg(feature = "testing")]
      84            0 :             99 => Ok(PagestreamFeMessageTag::Test),
      85            0 :             _ => Err(value),
      86              :         }
      87            4 :     }
      88              : }
      89              : 
      90              : impl TryFrom<u8> for PagestreamBeMessageTag {
      91              :     type Error = u8;
      92            0 :     fn try_from(value: u8) -> Result<Self, u8> {
      93            0 :         match value {
      94            0 :             100 => Ok(PagestreamBeMessageTag::Exists),
      95            0 :             101 => Ok(PagestreamBeMessageTag::Nblocks),
      96            0 :             102 => Ok(PagestreamBeMessageTag::GetPage),
      97            0 :             103 => Ok(PagestreamBeMessageTag::Error),
      98            0 :             104 => Ok(PagestreamBeMessageTag::DbSize),
      99            0 :             105 => Ok(PagestreamBeMessageTag::GetSlruSegment),
     100              :             #[cfg(feature = "testing")]
     101            0 :             199 => Ok(PagestreamBeMessageTag::Test),
     102            0 :             _ => Err(value),
     103              :         }
     104            0 :     }
     105              : }
     106              : 
     107              : // A GetPage request contains two LSN values:
     108              : //
     109              : // request_lsn: Get the page version at this point in time.  Lsn::Max is a special value that means
     110              : // "get the latest version present". It's used by the primary server, which knows that no one else
     111              : // is writing WAL. 'not_modified_since' must be set to a proper value even if request_lsn is
     112              : // Lsn::Max. Standby servers use the current replay LSN as the request LSN.
     113              : //
     114              : // not_modified_since: Hint to the pageserver that the client knows that the page has not been
     115              : // modified between 'not_modified_since' and the request LSN. It's always correct to set
     116              : // 'not_modified_since equal' to 'request_lsn' (unless Lsn::Max is used as the 'request_lsn'), but
     117              : // passing an earlier LSN can speed up the request, by allowing the pageserver to process the
     118              : // request without waiting for 'request_lsn' to arrive.
     119              : //
     120              : // The now-defunct V1 interface contained only one LSN, and a boolean 'latest' flag. The V1 interface was
     121              : // sufficient for the primary; the 'lsn' was equivalent to the 'not_modified_since' value, and
     122              : // 'latest' was set to true. The V2 interface was added because there was no correct way for a
     123              : // standby to request a page at a particular non-latest LSN, and also include the
     124              : // 'not_modified_since' hint. That led to an awkward choice of either using an old LSN in the
     125              : // request, if the standby knows that the page hasn't been modified since, and risk getting an error
     126              : // if that LSN has fallen behind the GC horizon, or requesting the current replay LSN, which could
     127              : // require the pageserver unnecessarily to wait for the WAL to arrive up to that point. The new V2
     128              : // interface allows sending both LSNs, and let the pageserver do the right thing. There was no
     129              : // difference in the responses between V1 and V2.
     130              : //
     131              : // V3 version of protocol adds request ID to all requests. This request ID is also included in response
     132              : // as well as other fields from requests, which allows to verify that we receive response for our request.
     133              : // We copy fields from request to response to make checking more reliable: request ID is formed from process ID
     134              : // and local counter, so in principle there can be duplicated requests IDs if process PID is reused.
     135              : //
     136              : #[derive(Debug, PartialEq, Eq, Clone, Copy)]
     137              : pub enum PagestreamProtocolVersion {
     138              :     V2,
     139              :     V3,
     140              : }
     141              : 
     142              : pub type RequestId = u64;
     143              : 
     144              : #[derive(Debug, Default, PartialEq, Eq, Clone, Copy)]
     145              : pub struct PagestreamRequest {
     146              :     pub reqid: RequestId,
     147              :     pub request_lsn: Lsn,
     148              :     pub not_modified_since: Lsn,
     149              : }
     150              : 
     151              : #[derive(Debug, PartialEq, Eq, Clone, Copy)]
     152              : pub struct PagestreamExistsRequest {
     153              :     pub hdr: PagestreamRequest,
     154              :     pub rel: RelTag,
     155              : }
     156              : 
     157              : #[derive(Debug, PartialEq, Eq, Clone, Copy)]
     158              : pub struct PagestreamNblocksRequest {
     159              :     pub hdr: PagestreamRequest,
     160              :     pub rel: RelTag,
     161              : }
     162              : 
     163              : #[derive(Debug, Default, PartialEq, Eq, Clone, Copy)]
     164              : pub struct PagestreamGetPageRequest {
     165              :     pub hdr: PagestreamRequest,
     166              :     pub rel: RelTag,
     167              :     pub blkno: u32,
     168              : }
     169              : 
     170              : #[derive(Debug, PartialEq, Eq, Clone, Copy)]
     171              : pub struct PagestreamDbSizeRequest {
     172              :     pub hdr: PagestreamRequest,
     173              :     pub dbnode: u32,
     174              : }
     175              : 
     176              : #[derive(Debug, PartialEq, Eq, Clone, Copy)]
     177              : pub struct PagestreamGetSlruSegmentRequest {
     178              :     pub hdr: PagestreamRequest,
     179              :     pub kind: u8,
     180              :     pub segno: u32,
     181              : }
     182              : 
     183              : #[derive(Debug)]
     184              : pub struct PagestreamExistsResponse {
     185              :     pub req: PagestreamExistsRequest,
     186              :     pub exists: bool,
     187              : }
     188              : 
     189              : #[derive(Debug)]
     190              : pub struct PagestreamNblocksResponse {
     191              :     pub req: PagestreamNblocksRequest,
     192              :     pub n_blocks: u32,
     193              : }
     194              : 
     195              : #[derive(Debug)]
     196              : pub struct PagestreamGetPageResponse {
     197              :     pub req: PagestreamGetPageRequest,
     198              :     pub page: Bytes,
     199              : }
     200              : 
     201              : #[derive(Debug)]
     202              : pub struct PagestreamGetSlruSegmentResponse {
     203              :     pub req: PagestreamGetSlruSegmentRequest,
     204              :     pub segment: Bytes,
     205              : }
     206              : 
     207              : #[derive(Debug)]
     208              : pub struct PagestreamErrorResponse {
     209              :     pub req: PagestreamRequest,
     210              :     pub message: String,
     211              : }
     212              : 
     213              : #[derive(Debug)]
     214              : pub struct PagestreamDbSizeResponse {
     215              :     pub req: PagestreamDbSizeRequest,
     216              :     pub db_size: i64,
     217              : }
     218              : 
     219              : #[cfg(feature = "testing")]
     220              : #[derive(Debug, PartialEq, Eq, Clone)]
     221              : pub struct PagestreamTestRequest {
     222              :     pub hdr: PagestreamRequest,
     223              :     pub batch_key: u64,
     224              :     pub message: String,
     225              : }
     226              : 
     227              : #[cfg(feature = "testing")]
     228              : #[derive(Debug)]
     229              : pub struct PagestreamTestResponse {
     230              :     pub req: PagestreamTestRequest,
     231              : }
     232              : 
     233              : impl PagestreamFeMessage {
     234              :     /// Serialize a compute -> pageserver message. This is currently only used in testing
     235              :     /// tools. Always uses protocol version 3.
     236            4 :     pub fn serialize(&self) -> Bytes {
     237            4 :         let mut bytes = BytesMut::new();
     238              : 
     239            4 :         match self {
     240            1 :             Self::Exists(req) => {
     241            1 :                 bytes.put_u8(PagestreamFeMessageTag::Exists as u8);
     242            1 :                 bytes.put_u64(req.hdr.reqid);
     243            1 :                 bytes.put_u64(req.hdr.request_lsn.0);
     244            1 :                 bytes.put_u64(req.hdr.not_modified_since.0);
     245            1 :                 bytes.put_u32(req.rel.spcnode);
     246            1 :                 bytes.put_u32(req.rel.dbnode);
     247            1 :                 bytes.put_u32(req.rel.relnode);
     248            1 :                 bytes.put_u8(req.rel.forknum);
     249            1 :             }
     250              : 
     251            1 :             Self::Nblocks(req) => {
     252            1 :                 bytes.put_u8(PagestreamFeMessageTag::Nblocks as u8);
     253            1 :                 bytes.put_u64(req.hdr.reqid);
     254            1 :                 bytes.put_u64(req.hdr.request_lsn.0);
     255            1 :                 bytes.put_u64(req.hdr.not_modified_since.0);
     256            1 :                 bytes.put_u32(req.rel.spcnode);
     257            1 :                 bytes.put_u32(req.rel.dbnode);
     258            1 :                 bytes.put_u32(req.rel.relnode);
     259            1 :                 bytes.put_u8(req.rel.forknum);
     260            1 :             }
     261              : 
     262            1 :             Self::GetPage(req) => {
     263            1 :                 bytes.put_u8(PagestreamFeMessageTag::GetPage as u8);
     264            1 :                 bytes.put_u64(req.hdr.reqid);
     265            1 :                 bytes.put_u64(req.hdr.request_lsn.0);
     266            1 :                 bytes.put_u64(req.hdr.not_modified_since.0);
     267            1 :                 bytes.put_u32(req.rel.spcnode);
     268            1 :                 bytes.put_u32(req.rel.dbnode);
     269            1 :                 bytes.put_u32(req.rel.relnode);
     270            1 :                 bytes.put_u8(req.rel.forknum);
     271            1 :                 bytes.put_u32(req.blkno);
     272            1 :             }
     273              : 
     274            1 :             Self::DbSize(req) => {
     275            1 :                 bytes.put_u8(PagestreamFeMessageTag::DbSize as u8);
     276            1 :                 bytes.put_u64(req.hdr.reqid);
     277            1 :                 bytes.put_u64(req.hdr.request_lsn.0);
     278            1 :                 bytes.put_u64(req.hdr.not_modified_since.0);
     279            1 :                 bytes.put_u32(req.dbnode);
     280            1 :             }
     281              : 
     282            0 :             Self::GetSlruSegment(req) => {
     283            0 :                 bytes.put_u8(PagestreamFeMessageTag::GetSlruSegment as u8);
     284            0 :                 bytes.put_u64(req.hdr.reqid);
     285            0 :                 bytes.put_u64(req.hdr.request_lsn.0);
     286            0 :                 bytes.put_u64(req.hdr.not_modified_since.0);
     287            0 :                 bytes.put_u8(req.kind);
     288            0 :                 bytes.put_u32(req.segno);
     289            0 :             }
     290              :             #[cfg(feature = "testing")]
     291            0 :             Self::Test(req) => {
     292            0 :                 bytes.put_u8(PagestreamFeMessageTag::Test as u8);
     293            0 :                 bytes.put_u64(req.hdr.reqid);
     294            0 :                 bytes.put_u64(req.hdr.request_lsn.0);
     295            0 :                 bytes.put_u64(req.hdr.not_modified_since.0);
     296            0 :                 bytes.put_u64(req.batch_key);
     297            0 :                 let message = req.message.as_bytes();
     298            0 :                 bytes.put_u64(message.len() as u64);
     299            0 :                 bytes.put_slice(message);
     300            0 :             }
     301              :         }
     302              : 
     303            4 :         bytes.into()
     304            4 :     }
     305              : 
     306            4 :     pub fn parse<R: std::io::Read>(
     307            4 :         body: &mut R,
     308            4 :         protocol_version: PagestreamProtocolVersion,
     309            4 :     ) -> anyhow::Result<PagestreamFeMessage> {
     310              :         // these correspond to the NeonMessageTag enum in pagestore_client.h
     311              :         //
     312              :         // TODO: consider using protobuf or serde bincode for less error prone
     313              :         // serialization.
     314            4 :         let msg_tag = body.read_u8()?;
     315            4 :         let (reqid, request_lsn, not_modified_since) = match protocol_version {
     316              :             PagestreamProtocolVersion::V2 => (
     317              :                 0,
     318            0 :                 Lsn::from(body.read_u64::<BigEndian>()?),
     319            0 :                 Lsn::from(body.read_u64::<BigEndian>()?),
     320              :             ),
     321              :             PagestreamProtocolVersion::V3 => (
     322            4 :                 body.read_u64::<BigEndian>()?,
     323            4 :                 Lsn::from(body.read_u64::<BigEndian>()?),
     324            4 :                 Lsn::from(body.read_u64::<BigEndian>()?),
     325              :             ),
     326              :         };
     327              : 
     328            4 :         match PagestreamFeMessageTag::try_from(msg_tag)
     329            4 :             .map_err(|tag: u8| anyhow::anyhow!("invalid tag {tag}"))?
     330              :         {
     331              :             PagestreamFeMessageTag::Exists => {
     332              :                 Ok(PagestreamFeMessage::Exists(PagestreamExistsRequest {
     333            1 :                     hdr: PagestreamRequest {
     334            1 :                         reqid,
     335            1 :                         request_lsn,
     336            1 :                         not_modified_since,
     337            1 :                     },
     338              :                     rel: RelTag {
     339            1 :                         spcnode: body.read_u32::<BigEndian>()?,
     340            1 :                         dbnode: body.read_u32::<BigEndian>()?,
     341            1 :                         relnode: body.read_u32::<BigEndian>()?,
     342            1 :                         forknum: body.read_u8()?,
     343              :                     },
     344              :                 }))
     345              :             }
     346              :             PagestreamFeMessageTag::Nblocks => {
     347              :                 Ok(PagestreamFeMessage::Nblocks(PagestreamNblocksRequest {
     348            1 :                     hdr: PagestreamRequest {
     349            1 :                         reqid,
     350            1 :                         request_lsn,
     351            1 :                         not_modified_since,
     352            1 :                     },
     353              :                     rel: RelTag {
     354            1 :                         spcnode: body.read_u32::<BigEndian>()?,
     355            1 :                         dbnode: body.read_u32::<BigEndian>()?,
     356            1 :                         relnode: body.read_u32::<BigEndian>()?,
     357            1 :                         forknum: body.read_u8()?,
     358              :                     },
     359              :                 }))
     360              :             }
     361              :             PagestreamFeMessageTag::GetPage => {
     362              :                 Ok(PagestreamFeMessage::GetPage(PagestreamGetPageRequest {
     363            1 :                     hdr: PagestreamRequest {
     364            1 :                         reqid,
     365            1 :                         request_lsn,
     366            1 :                         not_modified_since,
     367            1 :                     },
     368              :                     rel: RelTag {
     369            1 :                         spcnode: body.read_u32::<BigEndian>()?,
     370            1 :                         dbnode: body.read_u32::<BigEndian>()?,
     371            1 :                         relnode: body.read_u32::<BigEndian>()?,
     372            1 :                         forknum: body.read_u8()?,
     373              :                     },
     374            1 :                     blkno: body.read_u32::<BigEndian>()?,
     375              :                 }))
     376              :             }
     377              :             PagestreamFeMessageTag::DbSize => {
     378              :                 Ok(PagestreamFeMessage::DbSize(PagestreamDbSizeRequest {
     379            1 :                     hdr: PagestreamRequest {
     380            1 :                         reqid,
     381            1 :                         request_lsn,
     382            1 :                         not_modified_since,
     383            1 :                     },
     384            1 :                     dbnode: body.read_u32::<BigEndian>()?,
     385              :                 }))
     386              :             }
     387              :             PagestreamFeMessageTag::GetSlruSegment => Ok(PagestreamFeMessage::GetSlruSegment(
     388              :                 PagestreamGetSlruSegmentRequest {
     389            0 :                     hdr: PagestreamRequest {
     390            0 :                         reqid,
     391            0 :                         request_lsn,
     392            0 :                         not_modified_since,
     393            0 :                     },
     394            0 :                     kind: body.read_u8()?,
     395            0 :                     segno: body.read_u32::<BigEndian>()?,
     396              :                 },
     397              :             )),
     398              :             #[cfg(feature = "testing")]
     399              :             PagestreamFeMessageTag::Test => Ok(PagestreamFeMessage::Test(PagestreamTestRequest {
     400            0 :                 hdr: PagestreamRequest {
     401            0 :                     reqid,
     402            0 :                     request_lsn,
     403            0 :                     not_modified_since,
     404            0 :                 },
     405            0 :                 batch_key: body.read_u64::<BigEndian>()?,
     406              :                 message: {
     407            0 :                     let len = body.read_u64::<BigEndian>()?;
     408            0 :                     let mut buf = vec![0; len as usize];
     409            0 :                     body.read_exact(&mut buf)?;
     410            0 :                     String::from_utf8(buf)?
     411              :                 },
     412              :             })),
     413              :         }
     414            4 :     }
     415              : }
     416              : 
     417              : impl PagestreamBeMessage {
     418            0 :     pub fn serialize(&self, protocol_version: PagestreamProtocolVersion) -> Bytes {
     419            0 :         let mut bytes = BytesMut::new();
     420              : 
     421              :         use PagestreamBeMessageTag as Tag;
     422            0 :         match protocol_version {
     423              :             PagestreamProtocolVersion::V2 => {
     424            0 :                 match self {
     425            0 :                     Self::Exists(resp) => {
     426            0 :                         bytes.put_u8(Tag::Exists as u8);
     427            0 :                         bytes.put_u8(resp.exists as u8);
     428            0 :                     }
     429              : 
     430            0 :                     Self::Nblocks(resp) => {
     431            0 :                         bytes.put_u8(Tag::Nblocks as u8);
     432            0 :                         bytes.put_u32(resp.n_blocks);
     433            0 :                     }
     434              : 
     435            0 :                     Self::GetPage(resp) => {
     436            0 :                         bytes.put_u8(Tag::GetPage as u8);
     437            0 :                         bytes.put(&resp.page[..])
     438              :                     }
     439              : 
     440            0 :                     Self::Error(resp) => {
     441            0 :                         bytes.put_u8(Tag::Error as u8);
     442            0 :                         bytes.put(resp.message.as_bytes());
     443            0 :                         bytes.put_u8(0); // null terminator
     444            0 :                     }
     445            0 :                     Self::DbSize(resp) => {
     446            0 :                         bytes.put_u8(Tag::DbSize as u8);
     447            0 :                         bytes.put_i64(resp.db_size);
     448            0 :                     }
     449              : 
     450            0 :                     Self::GetSlruSegment(resp) => {
     451            0 :                         bytes.put_u8(Tag::GetSlruSegment as u8);
     452            0 :                         bytes.put_u32((resp.segment.len() / BLCKSZ) as u32);
     453            0 :                         bytes.put(&resp.segment[..]);
     454            0 :                     }
     455              : 
     456              :                     #[cfg(feature = "testing")]
     457            0 :                     Self::Test(resp) => {
     458            0 :                         bytes.put_u8(Tag::Test as u8);
     459            0 :                         bytes.put_u64(resp.req.batch_key);
     460            0 :                         let message = resp.req.message.as_bytes();
     461            0 :                         bytes.put_u64(message.len() as u64);
     462            0 :                         bytes.put_slice(message);
     463            0 :                     }
     464              :                 }
     465              :             }
     466              :             PagestreamProtocolVersion::V3 => {
     467            0 :                 match self {
     468            0 :                     Self::Exists(resp) => {
     469            0 :                         bytes.put_u8(Tag::Exists as u8);
     470            0 :                         bytes.put_u64(resp.req.hdr.reqid);
     471            0 :                         bytes.put_u64(resp.req.hdr.request_lsn.0);
     472            0 :                         bytes.put_u64(resp.req.hdr.not_modified_since.0);
     473            0 :                         bytes.put_u32(resp.req.rel.spcnode);
     474            0 :                         bytes.put_u32(resp.req.rel.dbnode);
     475            0 :                         bytes.put_u32(resp.req.rel.relnode);
     476            0 :                         bytes.put_u8(resp.req.rel.forknum);
     477            0 :                         bytes.put_u8(resp.exists as u8);
     478            0 :                     }
     479              : 
     480            0 :                     Self::Nblocks(resp) => {
     481            0 :                         bytes.put_u8(Tag::Nblocks as u8);
     482            0 :                         bytes.put_u64(resp.req.hdr.reqid);
     483            0 :                         bytes.put_u64(resp.req.hdr.request_lsn.0);
     484            0 :                         bytes.put_u64(resp.req.hdr.not_modified_since.0);
     485            0 :                         bytes.put_u32(resp.req.rel.spcnode);
     486            0 :                         bytes.put_u32(resp.req.rel.dbnode);
     487            0 :                         bytes.put_u32(resp.req.rel.relnode);
     488            0 :                         bytes.put_u8(resp.req.rel.forknum);
     489            0 :                         bytes.put_u32(resp.n_blocks);
     490            0 :                     }
     491              : 
     492            0 :                     Self::GetPage(resp) => {
     493            0 :                         bytes.put_u8(Tag::GetPage as u8);
     494            0 :                         bytes.put_u64(resp.req.hdr.reqid);
     495            0 :                         bytes.put_u64(resp.req.hdr.request_lsn.0);
     496            0 :                         bytes.put_u64(resp.req.hdr.not_modified_since.0);
     497            0 :                         bytes.put_u32(resp.req.rel.spcnode);
     498            0 :                         bytes.put_u32(resp.req.rel.dbnode);
     499            0 :                         bytes.put_u32(resp.req.rel.relnode);
     500            0 :                         bytes.put_u8(resp.req.rel.forknum);
     501            0 :                         bytes.put_u32(resp.req.blkno);
     502            0 :                         bytes.put(&resp.page[..])
     503              :                     }
     504              : 
     505            0 :                     Self::Error(resp) => {
     506            0 :                         bytes.put_u8(Tag::Error as u8);
     507            0 :                         bytes.put_u64(resp.req.reqid);
     508            0 :                         bytes.put_u64(resp.req.request_lsn.0);
     509            0 :                         bytes.put_u64(resp.req.not_modified_since.0);
     510            0 :                         bytes.put(resp.message.as_bytes());
     511            0 :                         bytes.put_u8(0); // null terminator
     512            0 :                     }
     513            0 :                     Self::DbSize(resp) => {
     514            0 :                         bytes.put_u8(Tag::DbSize as u8);
     515            0 :                         bytes.put_u64(resp.req.hdr.reqid);
     516            0 :                         bytes.put_u64(resp.req.hdr.request_lsn.0);
     517            0 :                         bytes.put_u64(resp.req.hdr.not_modified_since.0);
     518            0 :                         bytes.put_u32(resp.req.dbnode);
     519            0 :                         bytes.put_i64(resp.db_size);
     520            0 :                     }
     521              : 
     522            0 :                     Self::GetSlruSegment(resp) => {
     523            0 :                         bytes.put_u8(Tag::GetSlruSegment as u8);
     524            0 :                         bytes.put_u64(resp.req.hdr.reqid);
     525            0 :                         bytes.put_u64(resp.req.hdr.request_lsn.0);
     526            0 :                         bytes.put_u64(resp.req.hdr.not_modified_since.0);
     527            0 :                         bytes.put_u8(resp.req.kind);
     528            0 :                         bytes.put_u32(resp.req.segno);
     529            0 :                         bytes.put_u32((resp.segment.len() / BLCKSZ) as u32);
     530            0 :                         bytes.put(&resp.segment[..]);
     531            0 :                     }
     532              : 
     533              :                     #[cfg(feature = "testing")]
     534            0 :                     Self::Test(resp) => {
     535            0 :                         bytes.put_u8(Tag::Test as u8);
     536            0 :                         bytes.put_u64(resp.req.hdr.reqid);
     537            0 :                         bytes.put_u64(resp.req.hdr.request_lsn.0);
     538            0 :                         bytes.put_u64(resp.req.hdr.not_modified_since.0);
     539            0 :                         bytes.put_u64(resp.req.batch_key);
     540            0 :                         let message = resp.req.message.as_bytes();
     541            0 :                         bytes.put_u64(message.len() as u64);
     542            0 :                         bytes.put_slice(message);
     543            0 :                     }
     544              :                 }
     545              :             }
     546              :         }
     547            0 :         bytes.into()
     548            0 :     }
     549              : 
     550            0 :     pub fn deserialize(buf: Bytes) -> anyhow::Result<Self> {
     551            0 :         let mut buf = buf.reader();
     552            0 :         let msg_tag = buf.read_u8()?;
     553              : 
     554              :         use PagestreamBeMessageTag as Tag;
     555            0 :         let ok =
     556            0 :             match Tag::try_from(msg_tag).map_err(|tag: u8| anyhow::anyhow!("invalid tag {tag}"))? {
     557              :                 Tag::Exists => {
     558            0 :                     let reqid = buf.read_u64::<BigEndian>()?;
     559            0 :                     let request_lsn = Lsn(buf.read_u64::<BigEndian>()?);
     560            0 :                     let not_modified_since = Lsn(buf.read_u64::<BigEndian>()?);
     561            0 :                     let rel = RelTag {
     562            0 :                         spcnode: buf.read_u32::<BigEndian>()?,
     563            0 :                         dbnode: buf.read_u32::<BigEndian>()?,
     564            0 :                         relnode: buf.read_u32::<BigEndian>()?,
     565            0 :                         forknum: buf.read_u8()?,
     566              :                     };
     567            0 :                     let exists = buf.read_u8()? != 0;
     568            0 :                     Self::Exists(PagestreamExistsResponse {
     569            0 :                         req: PagestreamExistsRequest {
     570            0 :                             hdr: PagestreamRequest {
     571            0 :                                 reqid,
     572            0 :                                 request_lsn,
     573            0 :                                 not_modified_since,
     574            0 :                             },
     575            0 :                             rel,
     576            0 :                         },
     577            0 :                         exists,
     578            0 :                     })
     579              :                 }
     580              :                 Tag::Nblocks => {
     581            0 :                     let reqid = buf.read_u64::<BigEndian>()?;
     582            0 :                     let request_lsn = Lsn(buf.read_u64::<BigEndian>()?);
     583            0 :                     let not_modified_since = Lsn(buf.read_u64::<BigEndian>()?);
     584            0 :                     let rel = RelTag {
     585            0 :                         spcnode: buf.read_u32::<BigEndian>()?,
     586            0 :                         dbnode: buf.read_u32::<BigEndian>()?,
     587            0 :                         relnode: buf.read_u32::<BigEndian>()?,
     588            0 :                         forknum: buf.read_u8()?,
     589              :                     };
     590            0 :                     let n_blocks = buf.read_u32::<BigEndian>()?;
     591            0 :                     Self::Nblocks(PagestreamNblocksResponse {
     592            0 :                         req: PagestreamNblocksRequest {
     593            0 :                             hdr: PagestreamRequest {
     594            0 :                                 reqid,
     595            0 :                                 request_lsn,
     596            0 :                                 not_modified_since,
     597            0 :                             },
     598            0 :                             rel,
     599            0 :                         },
     600            0 :                         n_blocks,
     601            0 :                     })
     602              :                 }
     603              :                 Tag::GetPage => {
     604            0 :                     let reqid = buf.read_u64::<BigEndian>()?;
     605            0 :                     let request_lsn = Lsn(buf.read_u64::<BigEndian>()?);
     606            0 :                     let not_modified_since = Lsn(buf.read_u64::<BigEndian>()?);
     607            0 :                     let rel = RelTag {
     608            0 :                         spcnode: buf.read_u32::<BigEndian>()?,
     609            0 :                         dbnode: buf.read_u32::<BigEndian>()?,
     610            0 :                         relnode: buf.read_u32::<BigEndian>()?,
     611            0 :                         forknum: buf.read_u8()?,
     612              :                     };
     613            0 :                     let blkno = buf.read_u32::<BigEndian>()?;
     614            0 :                     let mut page = vec![0; 8192]; // TODO: use MaybeUninit
     615            0 :                     buf.read_exact(&mut page)?;
     616            0 :                     Self::GetPage(PagestreamGetPageResponse {
     617            0 :                         req: PagestreamGetPageRequest {
     618            0 :                             hdr: PagestreamRequest {
     619            0 :                                 reqid,
     620            0 :                                 request_lsn,
     621            0 :                                 not_modified_since,
     622            0 :                             },
     623            0 :                             rel,
     624            0 :                             blkno,
     625            0 :                         },
     626            0 :                         page: page.into(),
     627            0 :                     })
     628              :                 }
     629              :                 Tag::Error => {
     630            0 :                     let reqid = buf.read_u64::<BigEndian>()?;
     631            0 :                     let request_lsn = Lsn(buf.read_u64::<BigEndian>()?);
     632            0 :                     let not_modified_since = Lsn(buf.read_u64::<BigEndian>()?);
     633            0 :                     let mut msg = Vec::new();
     634            0 :                     buf.read_until(0, &mut msg)?;
     635            0 :                     let cstring = std::ffi::CString::from_vec_with_nul(msg)?;
     636            0 :                     let rust_str = cstring.to_str()?;
     637            0 :                     Self::Error(PagestreamErrorResponse {
     638            0 :                         req: PagestreamRequest {
     639            0 :                             reqid,
     640            0 :                             request_lsn,
     641            0 :                             not_modified_since,
     642            0 :                         },
     643            0 :                         message: rust_str.to_owned(),
     644            0 :                     })
     645              :                 }
     646              :                 Tag::DbSize => {
     647            0 :                     let reqid = buf.read_u64::<BigEndian>()?;
     648            0 :                     let request_lsn = Lsn(buf.read_u64::<BigEndian>()?);
     649            0 :                     let not_modified_since = Lsn(buf.read_u64::<BigEndian>()?);
     650            0 :                     let dbnode = buf.read_u32::<BigEndian>()?;
     651            0 :                     let db_size = buf.read_i64::<BigEndian>()?;
     652            0 :                     Self::DbSize(PagestreamDbSizeResponse {
     653            0 :                         req: PagestreamDbSizeRequest {
     654            0 :                             hdr: PagestreamRequest {
     655            0 :                                 reqid,
     656            0 :                                 request_lsn,
     657            0 :                                 not_modified_since,
     658            0 :                             },
     659            0 :                             dbnode,
     660            0 :                         },
     661            0 :                         db_size,
     662            0 :                     })
     663              :                 }
     664              :                 Tag::GetSlruSegment => {
     665            0 :                     let reqid = buf.read_u64::<BigEndian>()?;
     666            0 :                     let request_lsn = Lsn(buf.read_u64::<BigEndian>()?);
     667            0 :                     let not_modified_since = Lsn(buf.read_u64::<BigEndian>()?);
     668            0 :                     let kind = buf.read_u8()?;
     669            0 :                     let segno = buf.read_u32::<BigEndian>()?;
     670            0 :                     let n_blocks = buf.read_u32::<BigEndian>()?;
     671            0 :                     let mut segment = vec![0; n_blocks as usize * BLCKSZ];
     672            0 :                     buf.read_exact(&mut segment)?;
     673            0 :                     Self::GetSlruSegment(PagestreamGetSlruSegmentResponse {
     674            0 :                         req: PagestreamGetSlruSegmentRequest {
     675            0 :                             hdr: PagestreamRequest {
     676            0 :                                 reqid,
     677            0 :                                 request_lsn,
     678            0 :                                 not_modified_since,
     679            0 :                             },
     680            0 :                             kind,
     681            0 :                             segno,
     682            0 :                         },
     683            0 :                         segment: segment.into(),
     684            0 :                     })
     685              :                 }
     686              :                 #[cfg(feature = "testing")]
     687              :                 Tag::Test => {
     688            0 :                     let reqid = buf.read_u64::<BigEndian>()?;
     689            0 :                     let request_lsn = Lsn(buf.read_u64::<BigEndian>()?);
     690            0 :                     let not_modified_since = Lsn(buf.read_u64::<BigEndian>()?);
     691            0 :                     let batch_key = buf.read_u64::<BigEndian>()?;
     692            0 :                     let len = buf.read_u64::<BigEndian>()?;
     693            0 :                     let mut msg = vec![0; len as usize];
     694            0 :                     buf.read_exact(&mut msg)?;
     695            0 :                     let message = String::from_utf8(msg)?;
     696            0 :                     Self::Test(PagestreamTestResponse {
     697            0 :                         req: PagestreamTestRequest {
     698            0 :                             hdr: PagestreamRequest {
     699            0 :                                 reqid,
     700            0 :                                 request_lsn,
     701            0 :                                 not_modified_since,
     702            0 :                             },
     703            0 :                             batch_key,
     704            0 :                             message,
     705            0 :                         },
     706            0 :                     })
     707              :                 }
     708              :             };
     709            0 :         let remaining = buf.into_inner();
     710            0 :         if !remaining.is_empty() {
     711            0 :             anyhow::bail!(
     712            0 :                 "remaining bytes in msg with tag={msg_tag}: {}",
     713            0 :                 remaining.len()
     714              :             );
     715            0 :         }
     716            0 :         Ok(ok)
     717            0 :     }
     718              : 
     719            0 :     pub fn kind(&self) -> &'static str {
     720            0 :         match self {
     721            0 :             Self::Exists(_) => "Exists",
     722            0 :             Self::Nblocks(_) => "Nblocks",
     723            0 :             Self::GetPage(_) => "GetPage",
     724            0 :             Self::Error(_) => "Error",
     725            0 :             Self::DbSize(_) => "DbSize",
     726            0 :             Self::GetSlruSegment(_) => "GetSlruSegment",
     727              :             #[cfg(feature = "testing")]
     728            0 :             Self::Test(_) => "Test",
     729              :         }
     730            0 :     }
     731              : }
     732              : 
     733              : #[cfg(test)]
     734              : mod tests {
     735              :     use super::*;
     736              : 
     737              :     #[test]
     738            1 :     fn test_pagestream() {
     739              :         // Test serialization/deserialization of PagestreamFeMessage
     740            1 :         let messages = vec![
     741            1 :             PagestreamFeMessage::Exists(PagestreamExistsRequest {
     742            1 :                 hdr: PagestreamRequest {
     743            1 :                     reqid: 0,
     744            1 :                     request_lsn: Lsn(4),
     745            1 :                     not_modified_since: Lsn(3),
     746            1 :                 },
     747            1 :                 rel: RelTag {
     748            1 :                     forknum: 1,
     749            1 :                     spcnode: 2,
     750            1 :                     dbnode: 3,
     751            1 :                     relnode: 4,
     752            1 :                 },
     753            1 :             }),
     754            1 :             PagestreamFeMessage::Nblocks(PagestreamNblocksRequest {
     755            1 :                 hdr: PagestreamRequest {
     756            1 :                     reqid: 0,
     757            1 :                     request_lsn: Lsn(4),
     758            1 :                     not_modified_since: Lsn(4),
     759            1 :                 },
     760            1 :                 rel: RelTag {
     761            1 :                     forknum: 1,
     762            1 :                     spcnode: 2,
     763            1 :                     dbnode: 3,
     764            1 :                     relnode: 4,
     765            1 :                 },
     766            1 :             }),
     767            1 :             PagestreamFeMessage::GetPage(PagestreamGetPageRequest {
     768            1 :                 hdr: PagestreamRequest {
     769            1 :                     reqid: 0,
     770            1 :                     request_lsn: Lsn(4),
     771            1 :                     not_modified_since: Lsn(3),
     772            1 :                 },
     773            1 :                 rel: RelTag {
     774            1 :                     forknum: 1,
     775            1 :                     spcnode: 2,
     776            1 :                     dbnode: 3,
     777            1 :                     relnode: 4,
     778            1 :                 },
     779            1 :                 blkno: 7,
     780            1 :             }),
     781            1 :             PagestreamFeMessage::DbSize(PagestreamDbSizeRequest {
     782            1 :                 hdr: PagestreamRequest {
     783            1 :                     reqid: 0,
     784            1 :                     request_lsn: Lsn(4),
     785            1 :                     not_modified_since: Lsn(3),
     786            1 :                 },
     787            1 :                 dbnode: 7,
     788            1 :             }),
     789              :         ];
     790            5 :         for msg in messages {
     791            4 :             let bytes = msg.serialize();
     792            4 :             let reconstructed =
     793            4 :                 PagestreamFeMessage::parse(&mut bytes.reader(), PagestreamProtocolVersion::V3)
     794            4 :                     .unwrap();
     795            4 :             assert!(msg == reconstructed);
     796              :         }
     797            1 :     }
     798              : }
        

Generated by: LCOV version 2.1-beta