|             Line data    Source code 
       1              : use bytes::{BufMut, Bytes, BytesMut};
       2              : use pageserver_api::key::CompactKey;
       3              : use prost::{DecodeError, EncodeError, Message};
       4              : use tokio::io::AsyncWriteExt;
       5              : use utils::bin_ser::{BeSer, DeserializeError, SerializeError};
       6              : use utils::lsn::Lsn;
       7              : use utils::postgres_client::{Compression, InterpretedFormat};
       8              : 
       9              : use crate::models::{
      10              :     FlushUncommittedRecords, InterpretedWalRecord, InterpretedWalRecords, MetadataRecord, proto,
      11              : };
      12              : use crate::serialized_batch::{
      13              :     ObservedValueMeta, SerializedValueBatch, SerializedValueMeta, ValueMeta,
      14              : };
      15              : 
      16              : #[derive(Debug, thiserror::Error)]
      17              : pub enum ToWireFormatError {
      18              :     #[error("{0}")]
      19              :     Bincode(#[from] SerializeError),
      20              :     #[error("{0}")]
      21              :     Protobuf(#[from] ProtobufSerializeError),
      22              :     #[error("{0}")]
      23              :     Compression(#[from] std::io::Error),
      24              : }
      25              : 
      26              : #[derive(Debug, thiserror::Error)]
      27              : pub enum ProtobufSerializeError {
      28              :     #[error("{0}")]
      29              :     MetadataRecord(#[from] SerializeError),
      30              :     #[error("{0}")]
      31              :     Encode(#[from] EncodeError),
      32              : }
      33              : 
      34              : #[derive(Debug, thiserror::Error)]
      35              : pub enum FromWireFormatError {
      36              :     #[error("{0}")]
      37              :     Bincode(#[from] DeserializeError),
      38              :     #[error("{0}")]
      39              :     Protobuf(#[from] ProtobufDeserializeError),
      40              :     #[error("{0}")]
      41              :     Decompress(#[from] std::io::Error),
      42              : }
      43              : 
      44              : #[derive(Debug, thiserror::Error)]
      45              : pub enum ProtobufDeserializeError {
      46              :     #[error("{0}")]
      47              :     Transcode(#[from] TranscodeError),
      48              :     #[error("{0}")]
      49              :     Decode(#[from] DecodeError),
      50              : }
      51              : 
      52              : #[derive(Debug, thiserror::Error)]
      53              : pub enum TranscodeError {
      54              :     #[error("{0}")]
      55              :     BadInput(String),
      56              :     #[error("{0}")]
      57              :     MetadataRecord(#[from] DeserializeError),
      58              : }
      59              : 
      60              : pub trait ToWireFormat {
      61              :     fn to_wire(
      62              :         self,
      63              :         format: InterpretedFormat,
      64              :         compression: Option<Compression>,
      65              :     ) -> impl std::future::Future<Output = Result<Bytes, ToWireFormatError>> + Send;
      66              : }
      67              : 
      68              : pub trait FromWireFormat {
      69              :     type T;
      70              :     fn from_wire(
      71              :         buf: &Bytes,
      72              :         format: InterpretedFormat,
      73              :         compression: Option<Compression>,
      74              :     ) -> impl std::future::Future<Output = Result<Self::T, FromWireFormatError>> + Send;
      75              : }
      76              : 
      77              : impl ToWireFormat for InterpretedWalRecords {
      78            0 :     async fn to_wire(
      79            0 :         self,
      80            0 :         format: InterpretedFormat,
      81            0 :         compression: Option<Compression>,
      82            0 :     ) -> Result<Bytes, ToWireFormatError> {
      83              :         use async_compression::Level;
      84              :         use async_compression::tokio::write::ZstdEncoder;
      85              : 
      86            0 :         let encode_res: Result<Bytes, ToWireFormatError> = match format {
      87              :             InterpretedFormat::Bincode => {
      88            0 :                 let buf = BytesMut::new();
      89            0 :                 let mut buf = buf.writer();
      90            0 :                 self.ser_into(&mut buf)?;
      91            0 :                 Ok(buf.into_inner().freeze())
      92              :             }
      93              :             InterpretedFormat::Protobuf => {
      94            0 :                 let proto: proto::InterpretedWalRecords = self.try_into()?;
      95            0 :                 let mut buf = BytesMut::new();
      96            0 :                 proto
      97            0 :                     .encode(&mut buf)
      98            0 :                     .map_err(|e| ToWireFormatError::Protobuf(e.into()))?;
      99              : 
     100            0 :                 Ok(buf.freeze())
     101              :             }
     102              :         };
     103              : 
     104            0 :         let buf = encode_res?;
     105            0 :         let compressed_buf = match compression {
     106            0 :             Some(Compression::Zstd { level }) => {
     107            0 :                 let mut encoder = ZstdEncoder::with_quality(
     108            0 :                     Vec::with_capacity(buf.len() / 4),
     109            0 :                     Level::Precise(level as i32),
     110              :                 );
     111            0 :                 encoder.write_all(&buf).await?;
     112            0 :                 encoder.shutdown().await?;
     113            0 :                 Bytes::from(encoder.into_inner())
     114              :             }
     115            0 :             None => buf,
     116              :         };
     117              : 
     118            0 :         Ok(compressed_buf)
     119            0 :     }
     120              : }
     121              : 
     122              : impl FromWireFormat for InterpretedWalRecords {
     123              :     type T = Self;
     124              : 
     125            0 :     async fn from_wire(
     126            0 :         buf: &Bytes,
     127            0 :         format: InterpretedFormat,
     128            0 :         compression: Option<Compression>,
     129            0 :     ) -> Result<Self, FromWireFormatError> {
     130            0 :         let decompressed_buf = match compression {
     131              :             Some(Compression::Zstd { .. }) => {
     132              :                 use async_compression::tokio::write::ZstdDecoder;
     133            0 :                 let mut decoded_buf = Vec::with_capacity(buf.len());
     134            0 :                 let mut decoder = ZstdDecoder::new(&mut decoded_buf);
     135            0 :                 decoder.write_all(buf).await?;
     136            0 :                 decoder.flush().await?;
     137            0 :                 Bytes::from(decoded_buf)
     138              :             }
     139            0 :             None => buf.clone(),
     140              :         };
     141              : 
     142            0 :         match format {
     143              :             InterpretedFormat::Bincode => {
     144            0 :                 InterpretedWalRecords::des(&decompressed_buf).map_err(FromWireFormatError::Bincode)
     145              :             }
     146              :             InterpretedFormat::Protobuf => {
     147            0 :                 let proto = proto::InterpretedWalRecords::decode(decompressed_buf)
     148            0 :                     .map_err(|e| FromWireFormatError::Protobuf(e.into()))?;
     149            0 :                 InterpretedWalRecords::try_from(proto)
     150            0 :                     .map_err(|e| FromWireFormatError::Protobuf(e.into()))
     151              :             }
     152              :         }
     153            0 :     }
     154              : }
     155              : 
     156              : impl TryFrom<InterpretedWalRecords> for proto::InterpretedWalRecords {
     157              :     type Error = SerializeError;
     158              : 
     159            0 :     fn try_from(value: InterpretedWalRecords) -> Result<Self, Self::Error> {
     160            0 :         let records = value
     161            0 :             .records
     162            0 :             .into_iter()
     163            0 :             .map(proto::InterpretedWalRecord::try_from)
     164            0 :             .collect::<Result<Vec<_>, _>>()?;
     165              :         Ok(proto::InterpretedWalRecords {
     166            0 :             records,
     167            0 :             next_record_lsn: Some(value.next_record_lsn.0),
     168            0 :             raw_wal_start_lsn: value.raw_wal_start_lsn.map(|l| l.0),
     169              :         })
     170            0 :     }
     171              : }
     172              : 
     173              : impl TryFrom<InterpretedWalRecord> for proto::InterpretedWalRecord {
     174              :     type Error = SerializeError;
     175              : 
     176            0 :     fn try_from(value: InterpretedWalRecord) -> Result<Self, Self::Error> {
     177            0 :         let metadata_record = value
     178            0 :             .metadata_record
     179            0 :             .map(|meta_rec| -> Result<Vec<u8>, Self::Error> {
     180            0 :                 let mut buf = Vec::new();
     181            0 :                 meta_rec.ser_into(&mut buf)?;
     182            0 :                 Ok(buf)
     183            0 :             })
     184            0 :             .transpose()?;
     185              : 
     186              :         Ok(proto::InterpretedWalRecord {
     187            0 :             metadata_record,
     188            0 :             batch: Some(proto::SerializedValueBatch::from(value.batch)),
     189            0 :             next_record_lsn: value.next_record_lsn.0,
     190            0 :             flush_uncommitted: matches!(value.flush_uncommitted, FlushUncommittedRecords::Yes),
     191            0 :             xid: value.xid,
     192              :         })
     193            0 :     }
     194              : }
     195              : 
     196              : impl From<SerializedValueBatch> for proto::SerializedValueBatch {
     197            0 :     fn from(value: SerializedValueBatch) -> Self {
     198            0 :         proto::SerializedValueBatch {
     199            0 :             raw: value.raw,
     200            0 :             metadata: value
     201            0 :                 .metadata
     202            0 :                 .into_iter()
     203            0 :                 .map(proto::ValueMeta::from)
     204            0 :                 .collect(),
     205            0 :             max_lsn: value.max_lsn.0,
     206            0 :             len: value.len as u64,
     207            0 :         }
     208            0 :     }
     209              : }
     210              : 
     211              : impl From<ValueMeta> for proto::ValueMeta {
     212            0 :     fn from(value: ValueMeta) -> Self {
     213            0 :         match value {
     214            0 :             ValueMeta::Observed(obs) => proto::ValueMeta {
     215            0 :                 r#type: proto::ValueMetaType::Observed.into(),
     216            0 :                 key: Some(proto::CompactKey::from(obs.key)),
     217            0 :                 lsn: obs.lsn.0,
     218            0 :                 batch_offset: None,
     219            0 :                 len: None,
     220            0 :                 will_init: None,
     221            0 :             },
     222            0 :             ValueMeta::Serialized(ser) => proto::ValueMeta {
     223            0 :                 r#type: proto::ValueMetaType::Serialized.into(),
     224            0 :                 key: Some(proto::CompactKey::from(ser.key)),
     225            0 :                 lsn: ser.lsn.0,
     226            0 :                 batch_offset: Some(ser.batch_offset),
     227            0 :                 len: Some(ser.len as u64),
     228            0 :                 will_init: Some(ser.will_init),
     229            0 :             },
     230              :         }
     231            0 :     }
     232              : }
     233              : 
     234              : impl From<CompactKey> for proto::CompactKey {
     235            5 :     fn from(value: CompactKey) -> Self {
     236            5 :         proto::CompactKey {
     237            5 :             high: (value.raw() >> 64) as u64,
     238            5 :             low: value.raw() as u64,
     239            5 :         }
     240            5 :     }
     241              : }
     242              : 
     243              : impl TryFrom<proto::InterpretedWalRecords> for InterpretedWalRecords {
     244              :     type Error = TranscodeError;
     245              : 
     246            0 :     fn try_from(value: proto::InterpretedWalRecords) -> Result<Self, Self::Error> {
     247            0 :         let records = value
     248            0 :             .records
     249            0 :             .into_iter()
     250            0 :             .map(InterpretedWalRecord::try_from)
     251            0 :             .collect::<Result<_, _>>()?;
     252              : 
     253            0 :         Ok(InterpretedWalRecords {
     254            0 :             records,
     255            0 :             next_record_lsn: value
     256            0 :                 .next_record_lsn
     257            0 :                 .map(Lsn::from)
     258            0 :                 .expect("Always provided"),
     259            0 :             raw_wal_start_lsn: value.raw_wal_start_lsn.map(Lsn::from),
     260            0 :         })
     261            0 :     }
     262              : }
     263              : 
     264              : impl TryFrom<proto::InterpretedWalRecord> for InterpretedWalRecord {
     265              :     type Error = TranscodeError;
     266              : 
     267            0 :     fn try_from(value: proto::InterpretedWalRecord) -> Result<Self, Self::Error> {
     268            0 :         let metadata_record = value
     269            0 :             .metadata_record
     270            0 :             .map(|mrec| -> Result<_, DeserializeError> { MetadataRecord::des(&mrec) })
     271            0 :             .transpose()?;
     272              : 
     273            0 :         let batch = {
     274            0 :             let batch = value.batch.ok_or_else(|| {
     275            0 :                 TranscodeError::BadInput("InterpretedWalRecord::batch missing".to_string())
     276            0 :             })?;
     277              : 
     278            0 :             SerializedValueBatch::try_from(batch)?
     279              :         };
     280              : 
     281              :         Ok(InterpretedWalRecord {
     282            0 :             metadata_record,
     283            0 :             batch,
     284            0 :             next_record_lsn: Lsn(value.next_record_lsn),
     285            0 :             flush_uncommitted: if value.flush_uncommitted {
     286            0 :                 FlushUncommittedRecords::Yes
     287              :             } else {
     288            0 :                 FlushUncommittedRecords::No
     289              :             },
     290            0 :             xid: value.xid,
     291              :         })
     292            0 :     }
     293              : }
     294              : 
     295              : impl TryFrom<proto::SerializedValueBatch> for SerializedValueBatch {
     296              :     type Error = TranscodeError;
     297              : 
     298            0 :     fn try_from(value: proto::SerializedValueBatch) -> Result<Self, Self::Error> {
     299            0 :         let metadata = value
     300            0 :             .metadata
     301            0 :             .into_iter()
     302            0 :             .map(ValueMeta::try_from)
     303            0 :             .collect::<Result<Vec<_>, _>>()?;
     304              : 
     305            0 :         Ok(SerializedValueBatch {
     306            0 :             raw: value.raw,
     307            0 :             metadata,
     308            0 :             max_lsn: Lsn(value.max_lsn),
     309            0 :             len: value.len as usize,
     310            0 :         })
     311            0 :     }
     312              : }
     313              : 
     314              : impl TryFrom<proto::ValueMeta> for ValueMeta {
     315              :     type Error = TranscodeError;
     316              : 
     317            0 :     fn try_from(value: proto::ValueMeta) -> Result<Self, Self::Error> {
     318            0 :         match proto::ValueMetaType::try_from(value.r#type) {
     319              :             Ok(proto::ValueMetaType::Serialized) => {
     320              :                 Ok(ValueMeta::Serialized(SerializedValueMeta {
     321            0 :                     key: value
     322            0 :                         .key
     323            0 :                         .ok_or_else(|| {
     324            0 :                             TranscodeError::BadInput("ValueMeta::key missing".to_string())
     325            0 :                         })?
     326            0 :                         .into(),
     327            0 :                     lsn: Lsn(value.lsn),
     328            0 :                     batch_offset: value.batch_offset.ok_or_else(|| {
     329            0 :                         TranscodeError::BadInput("ValueMeta::batch_offset missing".to_string())
     330            0 :                     })?,
     331            0 :                     len: value.len.ok_or_else(|| {
     332            0 :                         TranscodeError::BadInput("ValueMeta::len missing".to_string())
     333            0 :                     })? as usize,
     334            0 :                     will_init: value.will_init.ok_or_else(|| {
     335            0 :                         TranscodeError::BadInput("ValueMeta::will_init missing".to_string())
     336            0 :                     })?,
     337              :                 }))
     338              :             }
     339              :             Ok(proto::ValueMetaType::Observed) => Ok(ValueMeta::Observed(ObservedValueMeta {
     340            0 :                 key: value
     341            0 :                     .key
     342            0 :                     .ok_or_else(|| TranscodeError::BadInput("ValueMeta::key missing".to_string()))?
     343            0 :                     .into(),
     344            0 :                 lsn: Lsn(value.lsn),
     345              :             })),
     346            0 :             Err(_) => Err(TranscodeError::BadInput(format!(
     347            0 :                 "Unexpected ValueMeta::type {}",
     348            0 :                 value.r#type
     349            0 :             ))),
     350              :         }
     351            0 :     }
     352              : }
     353              : 
     354              : impl From<proto::CompactKey> for CompactKey {
     355            5 :     fn from(value: proto::CompactKey) -> Self {
     356            5 :         (((value.high as i128) << 64) | (value.low as i128)).into()
     357            5 :     }
     358              : }
     359              : 
     360              : #[test]
     361            1 : fn test_compact_key_with_large_relnode() {
     362              :     use pageserver_api::key::Key;
     363              : 
     364            1 :     let inputs = vec![
     365            1 :         Key {
     366            1 :             field1: 0,
     367            1 :             field2: 0x100,
     368            1 :             field3: 0x200,
     369            1 :             field4: 0,
     370            1 :             field5: 0x10,
     371            1 :             field6: 0x5,
     372            1 :         },
     373            1 :         Key {
     374            1 :             field1: 0,
     375            1 :             field2: 0x100,
     376            1 :             field3: 0x200,
     377            1 :             field4: 0x007FFFFF,
     378            1 :             field5: 0x10,
     379            1 :             field6: 0x5,
     380            1 :         },
     381            1 :         Key {
     382            1 :             field1: 0,
     383            1 :             field2: 0x100,
     384            1 :             field3: 0x200,
     385            1 :             field4: 0x00800000,
     386            1 :             field5: 0x10,
     387            1 :             field6: 0x5,
     388            1 :         },
     389            1 :         Key {
     390            1 :             field1: 0,
     391            1 :             field2: 0x100,
     392            1 :             field3: 0x200,
     393            1 :             field4: 0x00800001,
     394            1 :             field5: 0x10,
     395            1 :             field6: 0x5,
     396            1 :         },
     397            1 :         Key {
     398            1 :             field1: 0,
     399            1 :             field2: 0xFFFFFFFF,
     400            1 :             field3: 0xFFFFFFFF,
     401            1 :             field4: 0xFFFFFFFF,
     402            1 :             field5: 0x0,
     403            1 :             field6: 0x0,
     404            1 :         },
     405              :     ];
     406              : 
     407            6 :     for input in inputs {
     408            5 :         assert!(input.is_valid_key_on_write_path());
     409            5 :         let compact = input.to_compact();
     410            5 :         let proto: proto::CompactKey = compact.into();
     411            5 :         let from_proto: CompactKey = proto.into();
     412              : 
     413            5 :         assert_eq!(
     414              :             compact, from_proto,
     415            0 :             "Round trip failed for key with relnode={:#x}",
     416              :             input.field4
     417              :         );
     418              :     }
     419            1 : }
         |