LCOV - code coverage report
Current view: top level - libs/wal_decoder/src - wire_format.rs (source / functions) Coverage Total Hit
Test: 45c9170b95180e9ecfad9a53e031030abf2a178c.info Lines: 24.9 % 241 60
Test Date: 2025-02-21 15:51:08 Functions: 11.1 % 27 3

            Line data    Source code
       1              : use bytes::{BufMut, Bytes, BytesMut};
       2              : use pageserver_api::key::CompactKey;
       3              : use prost::{DecodeError, EncodeError, Message};
       4              : use tokio::io::AsyncWriteExt;
       5              : use utils::bin_ser::{BeSer, DeserializeError, SerializeError};
       6              : use utils::lsn::Lsn;
       7              : use utils::postgres_client::{Compression, InterpretedFormat};
       8              : 
       9              : use crate::models::{
      10              :     FlushUncommittedRecords, InterpretedWalRecord, InterpretedWalRecords, MetadataRecord,
      11              : };
      12              : 
      13              : use crate::serialized_batch::{
      14              :     ObservedValueMeta, SerializedValueBatch, SerializedValueMeta, ValueMeta,
      15              : };
      16              : 
      17              : use crate::models::proto;
      18              : 
      19              : #[derive(Debug, thiserror::Error)]
      20              : pub enum ToWireFormatError {
      21              :     #[error("{0}")]
      22              :     Bincode(#[from] SerializeError),
      23              :     #[error("{0}")]
      24              :     Protobuf(#[from] ProtobufSerializeError),
      25              :     #[error("{0}")]
      26              :     Compression(#[from] std::io::Error),
      27              : }
      28              : 
      29              : #[derive(Debug, thiserror::Error)]
      30              : pub enum ProtobufSerializeError {
      31              :     #[error("{0}")]
      32              :     MetadataRecord(#[from] SerializeError),
      33              :     #[error("{0}")]
      34              :     Encode(#[from] EncodeError),
      35              : }
      36              : 
      37              : #[derive(Debug, thiserror::Error)]
      38              : pub enum FromWireFormatError {
      39              :     #[error("{0}")]
      40              :     Bincode(#[from] DeserializeError),
      41              :     #[error("{0}")]
      42              :     Protobuf(#[from] ProtobufDeserializeError),
      43              :     #[error("{0}")]
      44              :     Decompress(#[from] std::io::Error),
      45              : }
      46              : 
      47              : #[derive(Debug, thiserror::Error)]
      48              : pub enum ProtobufDeserializeError {
      49              :     #[error("{0}")]
      50              :     Transcode(#[from] TranscodeError),
      51              :     #[error("{0}")]
      52              :     Decode(#[from] DecodeError),
      53              : }
      54              : 
      55              : #[derive(Debug, thiserror::Error)]
      56              : pub enum TranscodeError {
      57              :     #[error("{0}")]
      58              :     BadInput(String),
      59              :     #[error("{0}")]
      60              :     MetadataRecord(#[from] DeserializeError),
      61              : }
      62              : 
      63              : pub trait ToWireFormat {
      64              :     fn to_wire(
      65              :         self,
      66              :         format: InterpretedFormat,
      67              :         compression: Option<Compression>,
      68              :     ) -> impl std::future::Future<Output = Result<Bytes, ToWireFormatError>> + Send;
      69              : }
      70              : 
      71              : pub trait FromWireFormat {
      72              :     type T;
      73              :     fn from_wire(
      74              :         buf: &Bytes,
      75              :         format: InterpretedFormat,
      76              :         compression: Option<Compression>,
      77              :     ) -> impl std::future::Future<Output = Result<Self::T, FromWireFormatError>> + Send;
      78              : }
      79              : 
      80              : impl ToWireFormat for InterpretedWalRecords {
      81            0 :     async fn to_wire(
      82            0 :         self,
      83            0 :         format: InterpretedFormat,
      84            0 :         compression: Option<Compression>,
      85            0 :     ) -> Result<Bytes, ToWireFormatError> {
      86              :         use async_compression::tokio::write::ZstdEncoder;
      87              :         use async_compression::Level;
      88              : 
      89            0 :         let encode_res: Result<Bytes, ToWireFormatError> = match format {
      90              :             InterpretedFormat::Bincode => {
      91            0 :                 let buf = BytesMut::new();
      92            0 :                 let mut buf = buf.writer();
      93            0 :                 self.ser_into(&mut buf)?;
      94            0 :                 Ok(buf.into_inner().freeze())
      95              :             }
      96              :             InterpretedFormat::Protobuf => {
      97            0 :                 let proto: proto::InterpretedWalRecords = self.try_into()?;
      98            0 :                 let mut buf = BytesMut::new();
      99            0 :                 proto
     100            0 :                     .encode(&mut buf)
     101            0 :                     .map_err(|e| ToWireFormatError::Protobuf(e.into()))?;
     102              : 
     103            0 :                 Ok(buf.freeze())
     104              :             }
     105              :         };
     106              : 
     107            0 :         let buf = encode_res?;
     108            0 :         let compressed_buf = match compression {
     109            0 :             Some(Compression::Zstd { level }) => {
     110            0 :                 let mut encoder = ZstdEncoder::with_quality(
     111            0 :                     Vec::with_capacity(buf.len() / 4),
     112            0 :                     Level::Precise(level as i32),
     113            0 :                 );
     114            0 :                 encoder.write_all(&buf).await?;
     115            0 :                 encoder.shutdown().await?;
     116            0 :                 Bytes::from(encoder.into_inner())
     117              :             }
     118            0 :             None => buf,
     119              :         };
     120              : 
     121            0 :         Ok(compressed_buf)
     122            0 :     }
     123              : }
     124              : 
     125              : impl FromWireFormat for InterpretedWalRecords {
     126              :     type T = Self;
     127              : 
     128            0 :     async fn from_wire(
     129            0 :         buf: &Bytes,
     130            0 :         format: InterpretedFormat,
     131            0 :         compression: Option<Compression>,
     132            0 :     ) -> Result<Self, FromWireFormatError> {
     133            0 :         let decompressed_buf = match compression {
     134              :             Some(Compression::Zstd { .. }) => {
     135              :                 use async_compression::tokio::write::ZstdDecoder;
     136            0 :                 let mut decoded_buf = Vec::with_capacity(buf.len());
     137            0 :                 let mut decoder = ZstdDecoder::new(&mut decoded_buf);
     138            0 :                 decoder.write_all(buf).await?;
     139            0 :                 decoder.flush().await?;
     140            0 :                 Bytes::from(decoded_buf)
     141              :             }
     142            0 :             None => buf.clone(),
     143              :         };
     144              : 
     145            0 :         match format {
     146              :             InterpretedFormat::Bincode => {
     147            0 :                 InterpretedWalRecords::des(&decompressed_buf).map_err(FromWireFormatError::Bincode)
     148              :             }
     149              :             InterpretedFormat::Protobuf => {
     150            0 :                 let proto = proto::InterpretedWalRecords::decode(decompressed_buf)
     151            0 :                     .map_err(|e| FromWireFormatError::Protobuf(e.into()))?;
     152            0 :                 InterpretedWalRecords::try_from(proto)
     153            0 :                     .map_err(|e| FromWireFormatError::Protobuf(e.into()))
     154              :             }
     155              :         }
     156            0 :     }
     157              : }
     158              : 
     159              : impl TryFrom<InterpretedWalRecords> for proto::InterpretedWalRecords {
     160              :     type Error = SerializeError;
     161              : 
     162            0 :     fn try_from(value: InterpretedWalRecords) -> Result<Self, Self::Error> {
     163            0 :         let records = value
     164            0 :             .records
     165            0 :             .into_iter()
     166            0 :             .map(proto::InterpretedWalRecord::try_from)
     167            0 :             .collect::<Result<Vec<_>, _>>()?;
     168            0 :         Ok(proto::InterpretedWalRecords {
     169            0 :             records,
     170            0 :             next_record_lsn: Some(value.next_record_lsn.0),
     171            0 :             raw_wal_start_lsn: value.raw_wal_start_lsn.map(|l| l.0),
     172            0 :         })
     173            0 :     }
     174              : }
     175              : 
     176              : impl TryFrom<InterpretedWalRecord> for proto::InterpretedWalRecord {
     177              :     type Error = SerializeError;
     178              : 
     179            0 :     fn try_from(value: InterpretedWalRecord) -> Result<Self, Self::Error> {
     180            0 :         let metadata_record = value
     181            0 :             .metadata_record
     182            0 :             .map(|meta_rec| -> Result<Vec<u8>, Self::Error> {
     183            0 :                 let mut buf = Vec::new();
     184            0 :                 meta_rec.ser_into(&mut buf)?;
     185            0 :                 Ok(buf)
     186            0 :             })
     187            0 :             .transpose()?;
     188              : 
     189              :         Ok(proto::InterpretedWalRecord {
     190            0 :             metadata_record,
     191            0 :             batch: Some(proto::SerializedValueBatch::from(value.batch)),
     192            0 :             next_record_lsn: value.next_record_lsn.0,
     193            0 :             flush_uncommitted: matches!(value.flush_uncommitted, FlushUncommittedRecords::Yes),
     194            0 :             xid: value.xid,
     195              :         })
     196            0 :     }
     197              : }
     198              : 
     199              : impl From<SerializedValueBatch> for proto::SerializedValueBatch {
     200            0 :     fn from(value: SerializedValueBatch) -> Self {
     201            0 :         proto::SerializedValueBatch {
     202            0 :             raw: value.raw,
     203            0 :             metadata: value
     204            0 :                 .metadata
     205            0 :                 .into_iter()
     206            0 :                 .map(proto::ValueMeta::from)
     207            0 :                 .collect(),
     208            0 :             max_lsn: value.max_lsn.0,
     209            0 :             len: value.len as u64,
     210            0 :         }
     211            0 :     }
     212              : }
     213              : 
     214              : impl From<ValueMeta> for proto::ValueMeta {
     215            0 :     fn from(value: ValueMeta) -> Self {
     216            0 :         match value {
     217            0 :             ValueMeta::Observed(obs) => proto::ValueMeta {
     218            0 :                 r#type: proto::ValueMetaType::Observed.into(),
     219            0 :                 key: Some(proto::CompactKey::from(obs.key)),
     220            0 :                 lsn: obs.lsn.0,
     221            0 :                 batch_offset: None,
     222            0 :                 len: None,
     223            0 :                 will_init: None,
     224            0 :             },
     225            0 :             ValueMeta::Serialized(ser) => proto::ValueMeta {
     226            0 :                 r#type: proto::ValueMetaType::Serialized.into(),
     227            0 :                 key: Some(proto::CompactKey::from(ser.key)),
     228            0 :                 lsn: ser.lsn.0,
     229            0 :                 batch_offset: Some(ser.batch_offset),
     230            0 :                 len: Some(ser.len as u64),
     231            0 :                 will_init: Some(ser.will_init),
     232            0 :             },
     233              :         }
     234            0 :     }
     235              : }
     236              : 
     237              : impl From<CompactKey> for proto::CompactKey {
     238            5 :     fn from(value: CompactKey) -> Self {
     239            5 :         proto::CompactKey {
     240            5 :             high: (value.raw() >> 64) as u64,
     241            5 :             low: value.raw() as u64,
     242            5 :         }
     243            5 :     }
     244              : }
     245              : 
     246              : impl TryFrom<proto::InterpretedWalRecords> for InterpretedWalRecords {
     247              :     type Error = TranscodeError;
     248              : 
     249            0 :     fn try_from(value: proto::InterpretedWalRecords) -> Result<Self, Self::Error> {
     250            0 :         let records = value
     251            0 :             .records
     252            0 :             .into_iter()
     253            0 :             .map(InterpretedWalRecord::try_from)
     254            0 :             .collect::<Result<_, _>>()?;
     255              : 
     256            0 :         Ok(InterpretedWalRecords {
     257            0 :             records,
     258            0 :             next_record_lsn: value
     259            0 :                 .next_record_lsn
     260            0 :                 .map(Lsn::from)
     261            0 :                 .expect("Always provided"),
     262            0 :             raw_wal_start_lsn: value.raw_wal_start_lsn.map(Lsn::from),
     263            0 :         })
     264            0 :     }
     265              : }
     266              : 
     267              : impl TryFrom<proto::InterpretedWalRecord> for InterpretedWalRecord {
     268              :     type Error = TranscodeError;
     269              : 
     270            0 :     fn try_from(value: proto::InterpretedWalRecord) -> Result<Self, Self::Error> {
     271            0 :         let metadata_record = value
     272            0 :             .metadata_record
     273            0 :             .map(|mrec| -> Result<_, DeserializeError> { MetadataRecord::des(&mrec) })
     274            0 :             .transpose()?;
     275              : 
     276            0 :         let batch = {
     277            0 :             let batch = value.batch.ok_or_else(|| {
     278            0 :                 TranscodeError::BadInput("InterpretedWalRecord::batch missing".to_string())
     279            0 :             })?;
     280              : 
     281            0 :             SerializedValueBatch::try_from(batch)?
     282              :         };
     283              : 
     284              :         Ok(InterpretedWalRecord {
     285            0 :             metadata_record,
     286            0 :             batch,
     287            0 :             next_record_lsn: Lsn(value.next_record_lsn),
     288            0 :             flush_uncommitted: if value.flush_uncommitted {
     289            0 :                 FlushUncommittedRecords::Yes
     290              :             } else {
     291            0 :                 FlushUncommittedRecords::No
     292              :             },
     293            0 :             xid: value.xid,
     294              :         })
     295            0 :     }
     296              : }
     297              : 
     298              : impl TryFrom<proto::SerializedValueBatch> for SerializedValueBatch {
     299              :     type Error = TranscodeError;
     300              : 
     301            0 :     fn try_from(value: proto::SerializedValueBatch) -> Result<Self, Self::Error> {
     302            0 :         let metadata = value
     303            0 :             .metadata
     304            0 :             .into_iter()
     305            0 :             .map(ValueMeta::try_from)
     306            0 :             .collect::<Result<Vec<_>, _>>()?;
     307              : 
     308            0 :         Ok(SerializedValueBatch {
     309            0 :             raw: value.raw,
     310            0 :             metadata,
     311            0 :             max_lsn: Lsn(value.max_lsn),
     312            0 :             len: value.len as usize,
     313            0 :         })
     314            0 :     }
     315              : }
     316              : 
     317              : impl TryFrom<proto::ValueMeta> for ValueMeta {
     318              :     type Error = TranscodeError;
     319              : 
     320            0 :     fn try_from(value: proto::ValueMeta) -> Result<Self, Self::Error> {
     321            0 :         match proto::ValueMetaType::try_from(value.r#type) {
     322              :             Ok(proto::ValueMetaType::Serialized) => {
     323              :                 Ok(ValueMeta::Serialized(SerializedValueMeta {
     324            0 :                     key: value
     325            0 :                         .key
     326            0 :                         .ok_or_else(|| {
     327            0 :                             TranscodeError::BadInput("ValueMeta::key missing".to_string())
     328            0 :                         })?
     329            0 :                         .into(),
     330            0 :                     lsn: Lsn(value.lsn),
     331            0 :                     batch_offset: value.batch_offset.ok_or_else(|| {
     332            0 :                         TranscodeError::BadInput("ValueMeta::batch_offset missing".to_string())
     333            0 :                     })?,
     334            0 :                     len: value.len.ok_or_else(|| {
     335            0 :                         TranscodeError::BadInput("ValueMeta::len missing".to_string())
     336            0 :                     })? as usize,
     337            0 :                     will_init: value.will_init.ok_or_else(|| {
     338            0 :                         TranscodeError::BadInput("ValueMeta::will_init missing".to_string())
     339            0 :                     })?,
     340              :                 }))
     341              :             }
     342              :             Ok(proto::ValueMetaType::Observed) => Ok(ValueMeta::Observed(ObservedValueMeta {
     343            0 :                 key: value
     344            0 :                     .key
     345            0 :                     .ok_or_else(|| TranscodeError::BadInput("ValueMeta::key missing".to_string()))?
     346            0 :                     .into(),
     347            0 :                 lsn: Lsn(value.lsn),
     348              :             })),
     349            0 :             Err(_) => Err(TranscodeError::BadInput(format!(
     350            0 :                 "Unexpected ValueMeta::type {}",
     351            0 :                 value.r#type
     352            0 :             ))),
     353              :         }
     354            0 :     }
     355              : }
     356              : 
     357              : impl From<proto::CompactKey> for CompactKey {
     358            5 :     fn from(value: proto::CompactKey) -> Self {
     359            5 :         (((value.high as i128) << 64) | (value.low as i128)).into()
     360            5 :     }
     361              : }
     362              : 
     363              : #[test]
     364            1 : fn test_compact_key_with_large_relnode() {
     365              :     use pageserver_api::key::Key;
     366              : 
     367            1 :     let inputs = vec![
     368            1 :         Key {
     369            1 :             field1: 0,
     370            1 :             field2: 0x100,
     371            1 :             field3: 0x200,
     372            1 :             field4: 0,
     373            1 :             field5: 0x10,
     374            1 :             field6: 0x5,
     375            1 :         },
     376            1 :         Key {
     377            1 :             field1: 0,
     378            1 :             field2: 0x100,
     379            1 :             field3: 0x200,
     380            1 :             field4: 0x007FFFFF,
     381            1 :             field5: 0x10,
     382            1 :             field6: 0x5,
     383            1 :         },
     384            1 :         Key {
     385            1 :             field1: 0,
     386            1 :             field2: 0x100,
     387            1 :             field3: 0x200,
     388            1 :             field4: 0x00800000,
     389            1 :             field5: 0x10,
     390            1 :             field6: 0x5,
     391            1 :         },
     392            1 :         Key {
     393            1 :             field1: 0,
     394            1 :             field2: 0x100,
     395            1 :             field3: 0x200,
     396            1 :             field4: 0x00800001,
     397            1 :             field5: 0x10,
     398            1 :             field6: 0x5,
     399            1 :         },
     400            1 :         Key {
     401            1 :             field1: 0,
     402            1 :             field2: 0xFFFFFFFF,
     403            1 :             field3: 0xFFFFFFFF,
     404            1 :             field4: 0xFFFFFFFF,
     405            1 :             field5: 0x0,
     406            1 :             field6: 0x0,
     407            1 :         },
     408            1 :     ];
     409              : 
     410            6 :     for input in inputs {
     411            5 :         assert!(input.is_valid_key_on_write_path());
     412            5 :         let compact = input.to_compact();
     413            5 :         let proto: proto::CompactKey = compact.into();
     414            5 :         let from_proto: CompactKey = proto.into();
     415            5 : 
     416            5 :         assert_eq!(
     417              :             compact, from_proto,
     418            0 :             "Round trip failed for key with relnode={:#x}",
     419              :             input.field4
     420              :         );
     421              :     }
     422            1 : }
        

Generated by: LCOV version 2.1-beta