LCOV - code coverage report
Current view: top level - libs/wal_decoder/src - wire_format.rs (source / functions) Coverage Total Hit
Test: 07bee600374ccd486c69370d0972d9035964fe68.info Lines: 25.4 % 236 60
Test Date: 2025-02-20 13:11:02 Functions: 11.1 % 27 3

            Line data    Source code
       1              : use bytes::{BufMut, Bytes, BytesMut};
       2              : use pageserver_api::key::CompactKey;
       3              : use prost::{DecodeError, EncodeError, Message};
       4              : use tokio::io::AsyncWriteExt;
       5              : use utils::bin_ser::{BeSer, DeserializeError, SerializeError};
       6              : use utils::lsn::Lsn;
       7              : use utils::postgres_client::{Compression, InterpretedFormat};
       8              : 
       9              : use crate::models::{
      10              :     FlushUncommittedRecords, InterpretedWalRecord, InterpretedWalRecords, MetadataRecord,
      11              : };
      12              : 
      13              : use crate::serialized_batch::{
      14              :     ObservedValueMeta, SerializedValueBatch, SerializedValueMeta, ValueMeta,
      15              : };
      16              : 
      17              : use crate::models::proto;
      18              : 
      19              : #[derive(Debug, thiserror::Error)]
      20              : pub enum ToWireFormatError {
      21              :     #[error("{0}")]
      22              :     Bincode(#[from] SerializeError),
      23              :     #[error("{0}")]
      24              :     Protobuf(#[from] ProtobufSerializeError),
      25              :     #[error("{0}")]
      26              :     Compression(#[from] std::io::Error),
      27              : }
      28              : 
      29              : #[derive(Debug, thiserror::Error)]
      30              : pub enum ProtobufSerializeError {
      31              :     #[error("{0}")]
      32              :     MetadataRecord(#[from] SerializeError),
      33              :     #[error("{0}")]
      34              :     Encode(#[from] EncodeError),
      35              : }
      36              : 
      37              : #[derive(Debug, thiserror::Error)]
      38              : pub enum FromWireFormatError {
      39              :     #[error("{0}")]
      40              :     Bincode(#[from] DeserializeError),
      41              :     #[error("{0}")]
      42              :     Protobuf(#[from] ProtobufDeserializeError),
      43              :     #[error("{0}")]
      44              :     Decompress(#[from] std::io::Error),
      45              : }
      46              : 
      47              : #[derive(Debug, thiserror::Error)]
      48              : pub enum ProtobufDeserializeError {
      49              :     #[error("{0}")]
      50              :     Transcode(#[from] TranscodeError),
      51              :     #[error("{0}")]
      52              :     Decode(#[from] DecodeError),
      53              : }
      54              : 
      55              : #[derive(Debug, thiserror::Error)]
      56              : pub enum TranscodeError {
      57              :     #[error("{0}")]
      58              :     BadInput(String),
      59              :     #[error("{0}")]
      60              :     MetadataRecord(#[from] DeserializeError),
      61              : }
      62              : 
      63              : pub trait ToWireFormat {
      64              :     fn to_wire(
      65              :         self,
      66              :         format: InterpretedFormat,
      67              :         compression: Option<Compression>,
      68              :     ) -> impl std::future::Future<Output = Result<Bytes, ToWireFormatError>> + Send;
      69              : }
      70              : 
      71              : pub trait FromWireFormat {
      72              :     type T;
      73              :     fn from_wire(
      74              :         buf: &Bytes,
      75              :         format: InterpretedFormat,
      76              :         compression: Option<Compression>,
      77              :     ) -> impl std::future::Future<Output = Result<Self::T, FromWireFormatError>> + Send;
      78              : }
      79              : 
      80              : impl ToWireFormat for InterpretedWalRecords {
      81            0 :     async fn to_wire(
      82            0 :         self,
      83            0 :         format: InterpretedFormat,
      84            0 :         compression: Option<Compression>,
      85            0 :     ) -> Result<Bytes, ToWireFormatError> {
      86              :         use async_compression::tokio::write::ZstdEncoder;
      87              :         use async_compression::Level;
      88              : 
      89            0 :         let encode_res: Result<Bytes, ToWireFormatError> = match format {
      90              :             InterpretedFormat::Bincode => {
      91            0 :                 let buf = BytesMut::new();
      92            0 :                 let mut buf = buf.writer();
      93            0 :                 self.ser_into(&mut buf)?;
      94            0 :                 Ok(buf.into_inner().freeze())
      95              :             }
      96              :             InterpretedFormat::Protobuf => {
      97            0 :                 let proto: proto::InterpretedWalRecords = self.try_into()?;
      98            0 :                 let mut buf = BytesMut::new();
      99            0 :                 proto
     100            0 :                     .encode(&mut buf)
     101            0 :                     .map_err(|e| ToWireFormatError::Protobuf(e.into()))?;
     102              : 
     103            0 :                 Ok(buf.freeze())
     104              :             }
     105              :         };
     106              : 
     107            0 :         let buf = encode_res?;
     108            0 :         let compressed_buf = match compression {
     109            0 :             Some(Compression::Zstd { level }) => {
     110            0 :                 let mut encoder = ZstdEncoder::with_quality(
     111            0 :                     Vec::with_capacity(buf.len() / 4),
     112            0 :                     Level::Precise(level as i32),
     113            0 :                 );
     114            0 :                 encoder.write_all(&buf).await?;
     115            0 :                 encoder.shutdown().await?;
     116            0 :                 Bytes::from(encoder.into_inner())
     117              :             }
     118            0 :             None => buf,
     119              :         };
     120              : 
     121            0 :         Ok(compressed_buf)
     122            0 :     }
     123              : }
     124              : 
     125              : impl FromWireFormat for InterpretedWalRecords {
     126              :     type T = Self;
     127              : 
     128            0 :     async fn from_wire(
     129            0 :         buf: &Bytes,
     130            0 :         format: InterpretedFormat,
     131            0 :         compression: Option<Compression>,
     132            0 :     ) -> Result<Self, FromWireFormatError> {
     133            0 :         let decompressed_buf = match compression {
     134              :             Some(Compression::Zstd { .. }) => {
     135              :                 use async_compression::tokio::write::ZstdDecoder;
     136            0 :                 let mut decoded_buf = Vec::with_capacity(buf.len());
     137            0 :                 let mut decoder = ZstdDecoder::new(&mut decoded_buf);
     138            0 :                 decoder.write_all(buf).await?;
     139            0 :                 decoder.flush().await?;
     140            0 :                 Bytes::from(decoded_buf)
     141              :             }
     142            0 :             None => buf.clone(),
     143              :         };
     144              : 
     145            0 :         match format {
     146              :             InterpretedFormat::Bincode => {
     147            0 :                 InterpretedWalRecords::des(&decompressed_buf).map_err(FromWireFormatError::Bincode)
     148              :             }
     149              :             InterpretedFormat::Protobuf => {
     150            0 :                 let proto = proto::InterpretedWalRecords::decode(decompressed_buf)
     151            0 :                     .map_err(|e| FromWireFormatError::Protobuf(e.into()))?;
     152            0 :                 InterpretedWalRecords::try_from(proto)
     153            0 :                     .map_err(|e| FromWireFormatError::Protobuf(e.into()))
     154              :             }
     155              :         }
     156            0 :     }
     157              : }
     158              : 
     159              : impl TryFrom<InterpretedWalRecords> for proto::InterpretedWalRecords {
     160              :     type Error = SerializeError;
     161              : 
     162            0 :     fn try_from(value: InterpretedWalRecords) -> Result<Self, Self::Error> {
     163            0 :         let records = value
     164            0 :             .records
     165            0 :             .into_iter()
     166            0 :             .map(proto::InterpretedWalRecord::try_from)
     167            0 :             .collect::<Result<Vec<_>, _>>()?;
     168            0 :         Ok(proto::InterpretedWalRecords {
     169            0 :             records,
     170            0 :             next_record_lsn: value.next_record_lsn.map(|l| l.0),
     171            0 :         })
     172            0 :     }
     173              : }
     174              : 
     175              : impl TryFrom<InterpretedWalRecord> for proto::InterpretedWalRecord {
     176              :     type Error = SerializeError;
     177              : 
     178            0 :     fn try_from(value: InterpretedWalRecord) -> Result<Self, Self::Error> {
     179            0 :         let metadata_record = value
     180            0 :             .metadata_record
     181            0 :             .map(|meta_rec| -> Result<Vec<u8>, Self::Error> {
     182            0 :                 let mut buf = Vec::new();
     183            0 :                 meta_rec.ser_into(&mut buf)?;
     184            0 :                 Ok(buf)
     185            0 :             })
     186            0 :             .transpose()?;
     187              : 
     188              :         Ok(proto::InterpretedWalRecord {
     189            0 :             metadata_record,
     190            0 :             batch: Some(proto::SerializedValueBatch::from(value.batch)),
     191            0 :             next_record_lsn: value.next_record_lsn.0,
     192            0 :             flush_uncommitted: matches!(value.flush_uncommitted, FlushUncommittedRecords::Yes),
     193            0 :             xid: value.xid,
     194              :         })
     195            0 :     }
     196              : }
     197              : 
     198              : impl From<SerializedValueBatch> for proto::SerializedValueBatch {
     199            0 :     fn from(value: SerializedValueBatch) -> Self {
     200            0 :         proto::SerializedValueBatch {
     201            0 :             raw: value.raw,
     202            0 :             metadata: value
     203            0 :                 .metadata
     204            0 :                 .into_iter()
     205            0 :                 .map(proto::ValueMeta::from)
     206            0 :                 .collect(),
     207            0 :             max_lsn: value.max_lsn.0,
     208            0 :             len: value.len as u64,
     209            0 :         }
     210            0 :     }
     211              : }
     212              : 
     213              : impl From<ValueMeta> for proto::ValueMeta {
     214            0 :     fn from(value: ValueMeta) -> Self {
     215            0 :         match value {
     216            0 :             ValueMeta::Observed(obs) => proto::ValueMeta {
     217            0 :                 r#type: proto::ValueMetaType::Observed.into(),
     218            0 :                 key: Some(proto::CompactKey::from(obs.key)),
     219            0 :                 lsn: obs.lsn.0,
     220            0 :                 batch_offset: None,
     221            0 :                 len: None,
     222            0 :                 will_init: None,
     223            0 :             },
     224            0 :             ValueMeta::Serialized(ser) => proto::ValueMeta {
     225            0 :                 r#type: proto::ValueMetaType::Serialized.into(),
     226            0 :                 key: Some(proto::CompactKey::from(ser.key)),
     227            0 :                 lsn: ser.lsn.0,
     228            0 :                 batch_offset: Some(ser.batch_offset),
     229            0 :                 len: Some(ser.len as u64),
     230            0 :                 will_init: Some(ser.will_init),
     231            0 :             },
     232              :         }
     233            0 :     }
     234              : }
     235              : 
     236              : impl From<CompactKey> for proto::CompactKey {
     237            5 :     fn from(value: CompactKey) -> Self {
     238            5 :         proto::CompactKey {
     239            5 :             high: (value.raw() >> 64) as u64,
     240            5 :             low: value.raw() as u64,
     241            5 :         }
     242            5 :     }
     243              : }
     244              : 
     245              : impl TryFrom<proto::InterpretedWalRecords> for InterpretedWalRecords {
     246              :     type Error = TranscodeError;
     247              : 
     248            0 :     fn try_from(value: proto::InterpretedWalRecords) -> Result<Self, Self::Error> {
     249            0 :         let records = value
     250            0 :             .records
     251            0 :             .into_iter()
     252            0 :             .map(InterpretedWalRecord::try_from)
     253            0 :             .collect::<Result<_, _>>()?;
     254              : 
     255            0 :         Ok(InterpretedWalRecords {
     256            0 :             records,
     257            0 :             next_record_lsn: value.next_record_lsn.map(Lsn::from),
     258            0 :         })
     259            0 :     }
     260              : }
     261              : 
     262              : impl TryFrom<proto::InterpretedWalRecord> for InterpretedWalRecord {
     263              :     type Error = TranscodeError;
     264              : 
     265            0 :     fn try_from(value: proto::InterpretedWalRecord) -> Result<Self, Self::Error> {
     266            0 :         let metadata_record = value
     267            0 :             .metadata_record
     268            0 :             .map(|mrec| -> Result<_, DeserializeError> { MetadataRecord::des(&mrec) })
     269            0 :             .transpose()?;
     270              : 
     271            0 :         let batch = {
     272            0 :             let batch = value.batch.ok_or_else(|| {
     273            0 :                 TranscodeError::BadInput("InterpretedWalRecord::batch missing".to_string())
     274            0 :             })?;
     275              : 
     276            0 :             SerializedValueBatch::try_from(batch)?
     277              :         };
     278              : 
     279              :         Ok(InterpretedWalRecord {
     280            0 :             metadata_record,
     281            0 :             batch,
     282            0 :             next_record_lsn: Lsn(value.next_record_lsn),
     283            0 :             flush_uncommitted: if value.flush_uncommitted {
     284            0 :                 FlushUncommittedRecords::Yes
     285              :             } else {
     286            0 :                 FlushUncommittedRecords::No
     287              :             },
     288            0 :             xid: value.xid,
     289              :         })
     290            0 :     }
     291              : }
     292              : 
     293              : impl TryFrom<proto::SerializedValueBatch> for SerializedValueBatch {
     294              :     type Error = TranscodeError;
     295              : 
     296            0 :     fn try_from(value: proto::SerializedValueBatch) -> Result<Self, Self::Error> {
     297            0 :         let metadata = value
     298            0 :             .metadata
     299            0 :             .into_iter()
     300            0 :             .map(ValueMeta::try_from)
     301            0 :             .collect::<Result<Vec<_>, _>>()?;
     302              : 
     303            0 :         Ok(SerializedValueBatch {
     304            0 :             raw: value.raw,
     305            0 :             metadata,
     306            0 :             max_lsn: Lsn(value.max_lsn),
     307            0 :             len: value.len as usize,
     308            0 :         })
     309            0 :     }
     310              : }
     311              : 
     312              : impl TryFrom<proto::ValueMeta> for ValueMeta {
     313              :     type Error = TranscodeError;
     314              : 
     315            0 :     fn try_from(value: proto::ValueMeta) -> Result<Self, Self::Error> {
     316            0 :         match proto::ValueMetaType::try_from(value.r#type) {
     317              :             Ok(proto::ValueMetaType::Serialized) => {
     318              :                 Ok(ValueMeta::Serialized(SerializedValueMeta {
     319            0 :                     key: value
     320            0 :                         .key
     321            0 :                         .ok_or_else(|| {
     322            0 :                             TranscodeError::BadInput("ValueMeta::key missing".to_string())
     323            0 :                         })?
     324            0 :                         .into(),
     325            0 :                     lsn: Lsn(value.lsn),
     326            0 :                     batch_offset: value.batch_offset.ok_or_else(|| {
     327            0 :                         TranscodeError::BadInput("ValueMeta::batch_offset missing".to_string())
     328            0 :                     })?,
     329            0 :                     len: value.len.ok_or_else(|| {
     330            0 :                         TranscodeError::BadInput("ValueMeta::len missing".to_string())
     331            0 :                     })? as usize,
     332            0 :                     will_init: value.will_init.ok_or_else(|| {
     333            0 :                         TranscodeError::BadInput("ValueMeta::will_init missing".to_string())
     334            0 :                     })?,
     335              :                 }))
     336              :             }
     337              :             Ok(proto::ValueMetaType::Observed) => Ok(ValueMeta::Observed(ObservedValueMeta {
     338            0 :                 key: value
     339            0 :                     .key
     340            0 :                     .ok_or_else(|| TranscodeError::BadInput("ValueMeta::key missing".to_string()))?
     341            0 :                     .into(),
     342            0 :                 lsn: Lsn(value.lsn),
     343              :             })),
     344            0 :             Err(_) => Err(TranscodeError::BadInput(format!(
     345            0 :                 "Unexpected ValueMeta::type {}",
     346            0 :                 value.r#type
     347            0 :             ))),
     348              :         }
     349            0 :     }
     350              : }
     351              : 
     352              : impl From<proto::CompactKey> for CompactKey {
     353            5 :     fn from(value: proto::CompactKey) -> Self {
     354            5 :         (((value.high as i128) << 64) | (value.low as i128)).into()
     355            5 :     }
     356              : }
     357              : 
     358              : #[test]
     359            1 : fn test_compact_key_with_large_relnode() {
     360              :     use pageserver_api::key::Key;
     361              : 
     362            1 :     let inputs = vec![
     363            1 :         Key {
     364            1 :             field1: 0,
     365            1 :             field2: 0x100,
     366            1 :             field3: 0x200,
     367            1 :             field4: 0,
     368            1 :             field5: 0x10,
     369            1 :             field6: 0x5,
     370            1 :         },
     371            1 :         Key {
     372            1 :             field1: 0,
     373            1 :             field2: 0x100,
     374            1 :             field3: 0x200,
     375            1 :             field4: 0x007FFFFF,
     376            1 :             field5: 0x10,
     377            1 :             field6: 0x5,
     378            1 :         },
     379            1 :         Key {
     380            1 :             field1: 0,
     381            1 :             field2: 0x100,
     382            1 :             field3: 0x200,
     383            1 :             field4: 0x00800000,
     384            1 :             field5: 0x10,
     385            1 :             field6: 0x5,
     386            1 :         },
     387            1 :         Key {
     388            1 :             field1: 0,
     389            1 :             field2: 0x100,
     390            1 :             field3: 0x200,
     391            1 :             field4: 0x00800001,
     392            1 :             field5: 0x10,
     393            1 :             field6: 0x5,
     394            1 :         },
     395            1 :         Key {
     396            1 :             field1: 0,
     397            1 :             field2: 0xFFFFFFFF,
     398            1 :             field3: 0xFFFFFFFF,
     399            1 :             field4: 0xFFFFFFFF,
     400            1 :             field5: 0x0,
     401            1 :             field6: 0x0,
     402            1 :         },
     403            1 :     ];
     404              : 
     405            6 :     for input in inputs {
     406            5 :         assert!(input.is_valid_key_on_write_path());
     407            5 :         let compact = input.to_compact();
     408            5 :         let proto: proto::CompactKey = compact.into();
     409            5 :         let from_proto: CompactKey = proto.into();
     410            5 : 
     411            5 :         assert_eq!(
     412              :             compact, from_proto,
     413            0 :             "Round trip failed for key with relnode={:#x}",
     414              :             input.field4
     415              :         );
     416              :     }
     417            1 : }
        

Generated by: LCOV version 2.1-beta