LCOV - code coverage report
Current view: top level - libs/wal_decoder/src - wire_format.rs (source / functions) Coverage Total Hit
Test: a2f0f8a80fbf1089336086fa360ce27fa555cb1a.info Lines: 0.0 % 187 0
Test Date: 2024-11-20 17:59:39 Functions: 0.0 % 47 0

            Line data    Source code
       1              : use bytes::{BufMut, Bytes, BytesMut};
       2              : use pageserver_api::key::CompactKey;
       3              : use prost::{DecodeError, EncodeError, Message};
       4              : use tokio::io::AsyncWriteExt;
       5              : use utils::bin_ser::{BeSer, DeserializeError, SerializeError};
       6              : use utils::lsn::Lsn;
       7              : use utils::postgres_client::{Compression, InterpretedFormat};
       8              : 
       9              : use crate::models::{
      10              :     FlushUncommittedRecords, InterpretedWalRecord, InterpretedWalRecords, MetadataRecord,
      11              : };
      12              : 
      13              : use crate::serialized_batch::{
      14              :     ObservedValueMeta, SerializedValueBatch, SerializedValueMeta, ValueMeta,
      15              : };
      16              : 
      17              : use crate::models::proto::CompactKey as ProtoCompactKey;
      18              : use crate::models::proto::InterpretedWalRecord as ProtoInterpretedWalRecord;
      19              : use crate::models::proto::InterpretedWalRecords as ProtoInterpretedWalRecords;
      20              : use crate::models::proto::SerializedValueBatch as ProtoSerializedValueBatch;
      21              : use crate::models::proto::ValueMeta as ProtoValueMeta;
      22              : use crate::models::proto::ValueMetaType as ProtoValueMetaType;
      23              : 
/// Errors returned by [`ToWireFormat::to_wire`].
///
/// Every variant wraps its source error and displays it verbatim (`"{0}"`).
#[derive(Debug, thiserror::Error)]
pub enum ToWireFormatError {
    /// A [`SerializeError`] from `utils::bin_ser`; `?` on such an error
    /// converts into this variant.
    #[error("{0}")]
    Bincode(#[from] SerializeError),
    /// A failure while producing the protobuf representation.
    #[error("{0}")]
    Protobuf(#[from] ProtobufSerializeError),
    /// An I/O error; `to_wire` propagates the compressor's write/shutdown
    /// errors through this variant.
    #[error("{0}")]
    Compression(#[from] std::io::Error),
}
      33              : 
/// Failure modes of the protobuf serialization path.
///
/// Every variant wraps its source error and displays it verbatim (`"{0}"`).
#[derive(Debug, thiserror::Error)]
pub enum ProtobufSerializeError {
    /// A [`SerializeError`]; presumably raised while serializing an embedded
    /// metadata record (NOTE(review): confirm against the construction sites).
    #[error("{0}")]
    MetadataRecord(#[from] SerializeError),
    /// The `prost` encoder reported an [`EncodeError`].
    #[error("{0}")]
    Encode(#[from] EncodeError),
}
      41              : 
/// Errors returned by [`FromWireFormat::from_wire`].
///
/// Every variant wraps its source error and displays it verbatim (`"{0}"`).
#[derive(Debug, thiserror::Error)]
pub enum FromWireFormatError {
    /// A [`DeserializeError`] from `utils::bin_ser`.
    #[error("{0}")]
    Bincode(#[from] DeserializeError),
    /// A failure while decoding or transcoding the protobuf representation.
    #[error("{0}")]
    Protobuf(#[from] ProtobufDeserializeError),
    /// An I/O error; `from_wire` propagates the decompressor's write/flush
    /// errors through this variant.
    #[error("{0}")]
    Decompress(#[from] std::io::Error),
}
      51              : 
/// Failure modes of the protobuf deserialization path.
///
/// Every variant wraps its source error and displays it verbatim (`"{0}"`).
#[derive(Debug, thiserror::Error)]
pub enum ProtobufDeserializeError {
    /// The decoded protobuf message could not be converted into the in-memory
    /// model.
    #[error("{0}")]
    Transcode(#[from] TranscodeError),
    /// The `prost` decoder reported a [`DecodeError`].
    #[error("{0}")]
    Decode(#[from] DecodeError),
}
      59              : 
/// Errors raised while converting decoded protobuf messages into the
/// in-memory model types.
#[derive(Debug, thiserror::Error)]
pub enum TranscodeError {
    /// The message was structurally invalid (for example a required field was
    /// absent); the string is the full human-readable description.
    #[error("{0}")]
    BadInput(String),
    /// Deserializing the embedded metadata record bytes failed.
    #[error("{0}")]
    MetadataRecord(#[from] DeserializeError),
}
      67              : 
/// Conversion of a value into its on-the-wire byte representation.
pub trait ToWireFormat {
    /// Consumes `self` and returns a `Send` future that resolves to the
    /// encoded bytes, or to a [`ToWireFormatError`] on failure.
    ///
    /// `format` selects the encoding; `compression` optionally selects a
    /// compression scheme to apply to the encoded bytes.
    fn to_wire(
        self,
        format: InterpretedFormat,
        compression: Option<Compression>,
    ) -> impl std::future::Future<Output = Result<Bytes, ToWireFormatError>> + Send;
}
      75              : 
/// Reconstruction of a value from its on-the-wire byte representation.
pub trait FromWireFormat {
    /// The type produced by a successful decode.
    type T;
    /// Returns a `Send` future that resolves to the value decoded from `buf`,
    /// or to a [`FromWireFormatError`] on failure.
    ///
    /// `format` and `compression` describe how `buf` was produced.
    fn from_wire(
        buf: &Bytes,
        format: InterpretedFormat,
        compression: Option<Compression>,
    ) -> impl std::future::Future<Output = Result<Self::T, FromWireFormatError>> + Send;
}
      84              : 
      85              : impl ToWireFormat for InterpretedWalRecords {
      86            0 :     async fn to_wire(
      87            0 :         self,
      88            0 :         format: InterpretedFormat,
      89            0 :         compression: Option<Compression>,
      90            0 :     ) -> Result<Bytes, ToWireFormatError> {
      91              :         use async_compression::tokio::write::ZstdEncoder;
      92              :         use async_compression::Level;
      93              : 
      94            0 :         let encode_res: Result<Bytes, ToWireFormatError> = match format {
      95              :             InterpretedFormat::Bincode => {
      96            0 :                 let buf = BytesMut::new();
      97            0 :                 let mut buf = buf.writer();
      98            0 :                 self.ser_into(&mut buf)?;
      99            0 :                 Ok(buf.into_inner().freeze())
     100              :             }
     101              :             InterpretedFormat::Protobuf => {
     102            0 :                 let proto: ProtoInterpretedWalRecords = self.try_into()?;
     103            0 :                 let mut buf = BytesMut::new();
     104            0 :                 proto
     105            0 :                     .encode(&mut buf)
     106            0 :                     .map_err(|e| ToWireFormatError::Protobuf(e.into()))?;
     107              : 
     108            0 :                 Ok(buf.freeze())
     109              :             }
     110              :         };
     111              : 
     112            0 :         let buf = encode_res?;
     113            0 :         let compressed_buf = match compression {
     114            0 :             Some(Compression::Zstd { level }) => {
     115            0 :                 let mut encoder = ZstdEncoder::with_quality(
     116            0 :                     Vec::with_capacity(buf.len() / 4),
     117            0 :                     Level::Precise(level as i32),
     118            0 :                 );
     119            0 :                 encoder.write_all(&buf).await?;
     120            0 :                 encoder.shutdown().await?;
     121            0 :                 Bytes::from(encoder.into_inner())
     122              :             }
     123            0 :             None => buf,
     124              :         };
     125              : 
     126            0 :         Ok(compressed_buf)
     127            0 :     }
     128              : }
     129              : 
     130              : impl FromWireFormat for InterpretedWalRecords {
     131              :     type T = Self;
     132              : 
     133            0 :     async fn from_wire(
     134            0 :         buf: &Bytes,
     135            0 :         format: InterpretedFormat,
     136            0 :         compression: Option<Compression>,
     137            0 :     ) -> Result<Self, FromWireFormatError> {
     138            0 :         let decompressed_buf = match compression {
     139              :             Some(Compression::Zstd { .. }) => {
     140              :                 use async_compression::tokio::write::ZstdDecoder;
     141            0 :                 let mut decoded_buf = Vec::with_capacity(buf.len());
     142            0 :                 let mut decoder = ZstdDecoder::new(&mut decoded_buf);
     143            0 :                 decoder.write_all(buf).await?;
     144            0 :                 decoder.flush().await?;
     145            0 :                 Bytes::from(decoded_buf)
     146              :             }
     147            0 :             None => buf.clone(),
     148              :         };
     149              : 
     150            0 :         match format {
     151              :             InterpretedFormat::Bincode => {
     152            0 :                 InterpretedWalRecords::des(&decompressed_buf).map_err(FromWireFormatError::Bincode)
     153              :             }
     154              :             InterpretedFormat::Protobuf => {
     155            0 :                 let proto = ProtoInterpretedWalRecords::decode(decompressed_buf)
     156            0 :                     .map_err(|e| FromWireFormatError::Protobuf(e.into()))?;
     157            0 :                 InterpretedWalRecords::try_from(proto)
     158            0 :                     .map_err(|e| FromWireFormatError::Protobuf(e.into()))
     159              :             }
     160              :         }
     161            0 :     }
     162              : }
     163              : 
     164              : impl TryFrom<InterpretedWalRecords> for ProtoInterpretedWalRecords {
     165              :     type Error = SerializeError;
     166              : 
     167            0 :     fn try_from(value: InterpretedWalRecords) -> Result<Self, Self::Error> {
     168            0 :         let records = value
     169            0 :             .records
     170            0 :             .into_iter()
     171            0 :             .map(ProtoInterpretedWalRecord::try_from)
     172            0 :             .collect::<Result<Vec<_>, _>>()?;
     173            0 :         Ok(ProtoInterpretedWalRecords {
     174            0 :             records,
     175            0 :             next_record_lsn: value.next_record_lsn.map(|l| l.0),
     176            0 :         })
     177            0 :     }
     178              : }
     179              : 
     180              : impl TryFrom<InterpretedWalRecord> for ProtoInterpretedWalRecord {
     181              :     type Error = SerializeError;
     182              : 
     183            0 :     fn try_from(value: InterpretedWalRecord) -> Result<Self, Self::Error> {
     184            0 :         let metadata_record = value
     185            0 :             .metadata_record
     186            0 :             .map(|meta_rec| -> Result<Vec<u8>, Self::Error> {
     187            0 :                 let mut buf = Vec::new();
     188            0 :                 meta_rec.ser_into(&mut buf)?;
     189            0 :                 Ok(buf)
     190            0 :             })
     191            0 :             .transpose()?;
     192              : 
     193              :         Ok(ProtoInterpretedWalRecord {
     194            0 :             metadata_record,
     195            0 :             batch: Some(ProtoSerializedValueBatch::from(value.batch)),
     196            0 :             next_record_lsn: value.next_record_lsn.0,
     197            0 :             flush_uncommitted: matches!(value.flush_uncommitted, FlushUncommittedRecords::Yes),
     198            0 :             xid: value.xid,
     199              :         })
     200            0 :     }
     201              : }
     202              : 
     203              : impl From<SerializedValueBatch> for ProtoSerializedValueBatch {
     204            0 :     fn from(value: SerializedValueBatch) -> Self {
     205            0 :         ProtoSerializedValueBatch {
     206            0 :             raw: value.raw,
     207            0 :             metadata: value
     208            0 :                 .metadata
     209            0 :                 .into_iter()
     210            0 :                 .map(ProtoValueMeta::from)
     211            0 :                 .collect(),
     212            0 :             max_lsn: value.max_lsn.0,
     213            0 :             len: value.len as u64,
     214            0 :         }
     215            0 :     }
     216              : }
     217              : 
     218              : impl From<ValueMeta> for ProtoValueMeta {
     219            0 :     fn from(value: ValueMeta) -> Self {
     220            0 :         match value {
     221            0 :             ValueMeta::Observed(obs) => ProtoValueMeta {
     222            0 :                 r#type: ProtoValueMetaType::Observed.into(),
     223            0 :                 key: Some(ProtoCompactKey::from(obs.key)),
     224            0 :                 lsn: obs.lsn.0,
     225            0 :                 batch_offset: None,
     226            0 :                 len: None,
     227            0 :                 will_init: None,
     228            0 :             },
     229            0 :             ValueMeta::Serialized(ser) => ProtoValueMeta {
     230            0 :                 r#type: ProtoValueMetaType::Serialized.into(),
     231            0 :                 key: Some(ProtoCompactKey::from(ser.key)),
     232            0 :                 lsn: ser.lsn.0,
     233            0 :                 batch_offset: Some(ser.batch_offset),
     234            0 :                 len: Some(ser.len as u64),
     235            0 :                 will_init: Some(ser.will_init),
     236            0 :             },
     237              :         }
     238            0 :     }
     239              : }
     240              : 
     241              : impl From<CompactKey> for ProtoCompactKey {
     242            0 :     fn from(value: CompactKey) -> Self {
     243            0 :         ProtoCompactKey {
     244            0 :             high: (value.raw() >> 64) as i64,
     245            0 :             low: value.raw() as i64,
     246            0 :         }
     247            0 :     }
     248              : }
     249              : 
     250              : impl TryFrom<ProtoInterpretedWalRecords> for InterpretedWalRecords {
     251              :     type Error = TranscodeError;
     252              : 
     253            0 :     fn try_from(value: ProtoInterpretedWalRecords) -> Result<Self, Self::Error> {
     254            0 :         let records = value
     255            0 :             .records
     256            0 :             .into_iter()
     257            0 :             .map(InterpretedWalRecord::try_from)
     258            0 :             .collect::<Result<_, _>>()?;
     259              : 
     260            0 :         Ok(InterpretedWalRecords {
     261            0 :             records,
     262            0 :             next_record_lsn: value.next_record_lsn.map(Lsn::from),
     263            0 :         })
     264            0 :     }
     265              : }
     266              : 
     267              : impl TryFrom<ProtoInterpretedWalRecord> for InterpretedWalRecord {
     268              :     type Error = TranscodeError;
     269              : 
     270            0 :     fn try_from(value: ProtoInterpretedWalRecord) -> Result<Self, Self::Error> {
     271            0 :         let metadata_record = value
     272            0 :             .metadata_record
     273            0 :             .map(|mrec| -> Result<_, DeserializeError> { MetadataRecord::des(&mrec) })
     274            0 :             .transpose()?;
     275              : 
     276            0 :         let batch = {
     277            0 :             let batch = value.batch.ok_or_else(|| {
     278            0 :                 TranscodeError::BadInput("InterpretedWalRecord::batch missing".to_string())
     279            0 :             })?;
     280              : 
     281            0 :             SerializedValueBatch::try_from(batch)?
     282              :         };
     283              : 
     284              :         Ok(InterpretedWalRecord {
     285            0 :             metadata_record,
     286            0 :             batch,
     287            0 :             next_record_lsn: Lsn(value.next_record_lsn),
     288            0 :             flush_uncommitted: if value.flush_uncommitted {
     289            0 :                 FlushUncommittedRecords::Yes
     290              :             } else {
     291            0 :                 FlushUncommittedRecords::No
     292              :             },
     293            0 :             xid: value.xid,
     294              :         })
     295            0 :     }
     296              : }
     297              : 
     298              : impl TryFrom<ProtoSerializedValueBatch> for SerializedValueBatch {
     299              :     type Error = TranscodeError;
     300              : 
     301            0 :     fn try_from(value: ProtoSerializedValueBatch) -> Result<Self, Self::Error> {
     302            0 :         let metadata = value
     303            0 :             .metadata
     304            0 :             .into_iter()
     305            0 :             .map(ValueMeta::try_from)
     306            0 :             .collect::<Result<Vec<_>, _>>()?;
     307              : 
     308            0 :         Ok(SerializedValueBatch {
     309            0 :             raw: value.raw,
     310            0 :             metadata,
     311            0 :             max_lsn: Lsn(value.max_lsn),
     312            0 :             len: value.len as usize,
     313            0 :         })
     314            0 :     }
     315              : }
     316              : 
     317              : impl TryFrom<ProtoValueMeta> for ValueMeta {
     318              :     type Error = TranscodeError;
     319              : 
     320            0 :     fn try_from(value: ProtoValueMeta) -> Result<Self, Self::Error> {
     321            0 :         match ProtoValueMetaType::try_from(value.r#type) {
     322              :             Ok(ProtoValueMetaType::Serialized) => Ok(ValueMeta::Serialized(SerializedValueMeta {
     323            0 :                 key: value
     324            0 :                     .key
     325            0 :                     .ok_or_else(|| TranscodeError::BadInput("ValueMeta::key missing".to_string()))?
     326            0 :                     .into(),
     327            0 :                 lsn: Lsn(value.lsn),
     328            0 :                 batch_offset: value.batch_offset.ok_or_else(|| {
     329            0 :                     TranscodeError::BadInput("ValueMeta::batch_offset missing".to_string())
     330            0 :                 })?,
     331            0 :                 len: value
     332            0 :                     .len
     333            0 :                     .ok_or_else(|| TranscodeError::BadInput("ValueMeta::len missing".to_string()))?
     334              :                     as usize,
     335            0 :                 will_init: value.will_init.ok_or_else(|| {
     336            0 :                     TranscodeError::BadInput("ValueMeta::will_init missing".to_string())
     337            0 :                 })?,
     338              :             })),
     339              :             Ok(ProtoValueMetaType::Observed) => Ok(ValueMeta::Observed(ObservedValueMeta {
     340            0 :                 key: value
     341            0 :                     .key
     342            0 :                     .ok_or_else(|| TranscodeError::BadInput("ValueMeta::key missing".to_string()))?
     343            0 :                     .into(),
     344            0 :                 lsn: Lsn(value.lsn),
     345              :             })),
     346            0 :             Err(_) => Err(TranscodeError::BadInput(format!(
     347            0 :                 "Unexpected ValueMeta::type {}",
     348            0 :                 value.r#type
     349            0 :             ))),
     350              :         }
     351            0 :     }
     352              : }
     353              : 
     354              : impl From<ProtoCompactKey> for CompactKey {
     355            0 :     fn from(value: ProtoCompactKey) -> Self {
     356            0 :         (((value.high as i128) << 64) | (value.low as i128)).into()
     357            0 :     }
     358              : }
        

Generated by: LCOV version 2.1-beta