Line data Source code
1 : use bytes::{BufMut, Bytes, BytesMut};
2 : use pageserver_api::key::CompactKey;
3 : use prost::{DecodeError, EncodeError, Message};
4 : use tokio::io::AsyncWriteExt;
5 : use utils::bin_ser::{BeSer, DeserializeError, SerializeError};
6 : use utils::lsn::Lsn;
7 : use utils::postgres_client::{Compression, InterpretedFormat};
8 :
9 : use crate::models::{
10 : FlushUncommittedRecords, InterpretedWalRecord, InterpretedWalRecords, MetadataRecord,
11 : };
12 :
13 : use crate::serialized_batch::{
14 : ObservedValueMeta, SerializedValueBatch, SerializedValueMeta, ValueMeta,
15 : };
16 :
17 : use crate::models::proto::CompactKey as ProtoCompactKey;
18 : use crate::models::proto::InterpretedWalRecord as ProtoInterpretedWalRecord;
19 : use crate::models::proto::InterpretedWalRecords as ProtoInterpretedWalRecords;
20 : use crate::models::proto::SerializedValueBatch as ProtoSerializedValueBatch;
21 : use crate::models::proto::ValueMeta as ProtoValueMeta;
22 : use crate::models::proto::ValueMetaType as ProtoValueMetaType;
23 :
24 0 : #[derive(Debug, thiserror::Error)]
25 : pub enum ToWireFormatError {
26 : #[error("{0}")]
27 : Bincode(#[from] SerializeError),
28 : #[error("{0}")]
29 : Protobuf(#[from] ProtobufSerializeError),
30 : #[error("{0}")]
31 : Compression(#[from] std::io::Error),
32 : }
33 :
34 0 : #[derive(Debug, thiserror::Error)]
35 : pub enum ProtobufSerializeError {
36 : #[error("{0}")]
37 : MetadataRecord(#[from] SerializeError),
38 : #[error("{0}")]
39 : Encode(#[from] EncodeError),
40 : }
41 :
42 0 : #[derive(Debug, thiserror::Error)]
43 : pub enum FromWireFormatError {
44 : #[error("{0}")]
45 : Bincode(#[from] DeserializeError),
46 : #[error("{0}")]
47 : Protobuf(#[from] ProtobufDeserializeError),
48 : #[error("{0}")]
49 : Decompress(#[from] std::io::Error),
50 : }
51 :
52 0 : #[derive(Debug, thiserror::Error)]
53 : pub enum ProtobufDeserializeError {
54 : #[error("{0}")]
55 : Transcode(#[from] TranscodeError),
56 : #[error("{0}")]
57 : Decode(#[from] DecodeError),
58 : }
59 :
60 0 : #[derive(Debug, thiserror::Error)]
61 : pub enum TranscodeError {
62 : #[error("{0}")]
63 : BadInput(String),
64 : #[error("{0}")]
65 : MetadataRecord(#[from] DeserializeError),
66 : }
67 :
68 : pub trait ToWireFormat {
69 : fn to_wire(
70 : self,
71 : format: InterpretedFormat,
72 : compression: Option<Compression>,
73 : ) -> impl std::future::Future<Output = Result<Bytes, ToWireFormatError>> + Send;
74 : }
75 :
76 : pub trait FromWireFormat {
77 : type T;
78 : fn from_wire(
79 : buf: &Bytes,
80 : format: InterpretedFormat,
81 : compression: Option<Compression>,
82 : ) -> impl std::future::Future<Output = Result<Self::T, FromWireFormatError>> + Send;
83 : }
84 :
85 : impl ToWireFormat for InterpretedWalRecords {
86 0 : async fn to_wire(
87 0 : self,
88 0 : format: InterpretedFormat,
89 0 : compression: Option<Compression>,
90 0 : ) -> Result<Bytes, ToWireFormatError> {
91 : use async_compression::tokio::write::ZstdEncoder;
92 : use async_compression::Level;
93 :
94 0 : let encode_res: Result<Bytes, ToWireFormatError> = match format {
95 : InterpretedFormat::Bincode => {
96 0 : let buf = BytesMut::new();
97 0 : let mut buf = buf.writer();
98 0 : self.ser_into(&mut buf)?;
99 0 : Ok(buf.into_inner().freeze())
100 : }
101 : InterpretedFormat::Protobuf => {
102 0 : let proto: ProtoInterpretedWalRecords = self.try_into()?;
103 0 : let mut buf = BytesMut::new();
104 0 : proto
105 0 : .encode(&mut buf)
106 0 : .map_err(|e| ToWireFormatError::Protobuf(e.into()))?;
107 :
108 0 : Ok(buf.freeze())
109 : }
110 : };
111 :
112 0 : let buf = encode_res?;
113 0 : let compressed_buf = match compression {
114 0 : Some(Compression::Zstd { level }) => {
115 0 : let mut encoder = ZstdEncoder::with_quality(
116 0 : Vec::with_capacity(buf.len() / 4),
117 0 : Level::Precise(level as i32),
118 0 : );
119 0 : encoder.write_all(&buf).await?;
120 0 : encoder.shutdown().await?;
121 0 : Bytes::from(encoder.into_inner())
122 : }
123 0 : None => buf,
124 : };
125 :
126 0 : Ok(compressed_buf)
127 0 : }
128 : }
129 :
130 : impl FromWireFormat for InterpretedWalRecords {
131 : type T = Self;
132 :
133 0 : async fn from_wire(
134 0 : buf: &Bytes,
135 0 : format: InterpretedFormat,
136 0 : compression: Option<Compression>,
137 0 : ) -> Result<Self, FromWireFormatError> {
138 0 : let decompressed_buf = match compression {
139 : Some(Compression::Zstd { .. }) => {
140 : use async_compression::tokio::write::ZstdDecoder;
141 0 : let mut decoded_buf = Vec::with_capacity(buf.len());
142 0 : let mut decoder = ZstdDecoder::new(&mut decoded_buf);
143 0 : decoder.write_all(buf).await?;
144 0 : decoder.flush().await?;
145 0 : Bytes::from(decoded_buf)
146 : }
147 0 : None => buf.clone(),
148 : };
149 :
150 0 : match format {
151 : InterpretedFormat::Bincode => {
152 0 : InterpretedWalRecords::des(&decompressed_buf).map_err(FromWireFormatError::Bincode)
153 : }
154 : InterpretedFormat::Protobuf => {
155 0 : let proto = ProtoInterpretedWalRecords::decode(decompressed_buf)
156 0 : .map_err(|e| FromWireFormatError::Protobuf(e.into()))?;
157 0 : InterpretedWalRecords::try_from(proto)
158 0 : .map_err(|e| FromWireFormatError::Protobuf(e.into()))
159 : }
160 : }
161 0 : }
162 : }
163 :
164 : impl TryFrom<InterpretedWalRecords> for ProtoInterpretedWalRecords {
165 : type Error = SerializeError;
166 :
167 0 : fn try_from(value: InterpretedWalRecords) -> Result<Self, Self::Error> {
168 0 : let records = value
169 0 : .records
170 0 : .into_iter()
171 0 : .map(ProtoInterpretedWalRecord::try_from)
172 0 : .collect::<Result<Vec<_>, _>>()?;
173 0 : Ok(ProtoInterpretedWalRecords {
174 0 : records,
175 0 : next_record_lsn: value.next_record_lsn.map(|l| l.0),
176 0 : })
177 0 : }
178 : }
179 :
180 : impl TryFrom<InterpretedWalRecord> for ProtoInterpretedWalRecord {
181 : type Error = SerializeError;
182 :
183 0 : fn try_from(value: InterpretedWalRecord) -> Result<Self, Self::Error> {
184 0 : let metadata_record = value
185 0 : .metadata_record
186 0 : .map(|meta_rec| -> Result<Vec<u8>, Self::Error> {
187 0 : let mut buf = Vec::new();
188 0 : meta_rec.ser_into(&mut buf)?;
189 0 : Ok(buf)
190 0 : })
191 0 : .transpose()?;
192 :
193 : Ok(ProtoInterpretedWalRecord {
194 0 : metadata_record,
195 0 : batch: Some(ProtoSerializedValueBatch::from(value.batch)),
196 0 : next_record_lsn: value.next_record_lsn.0,
197 0 : flush_uncommitted: matches!(value.flush_uncommitted, FlushUncommittedRecords::Yes),
198 0 : xid: value.xid,
199 : })
200 0 : }
201 : }
202 :
203 : impl From<SerializedValueBatch> for ProtoSerializedValueBatch {
204 0 : fn from(value: SerializedValueBatch) -> Self {
205 0 : ProtoSerializedValueBatch {
206 0 : raw: value.raw,
207 0 : metadata: value
208 0 : .metadata
209 0 : .into_iter()
210 0 : .map(ProtoValueMeta::from)
211 0 : .collect(),
212 0 : max_lsn: value.max_lsn.0,
213 0 : len: value.len as u64,
214 0 : }
215 0 : }
216 : }
217 :
218 : impl From<ValueMeta> for ProtoValueMeta {
219 0 : fn from(value: ValueMeta) -> Self {
220 0 : match value {
221 0 : ValueMeta::Observed(obs) => ProtoValueMeta {
222 0 : r#type: ProtoValueMetaType::Observed.into(),
223 0 : key: Some(ProtoCompactKey::from(obs.key)),
224 0 : lsn: obs.lsn.0,
225 0 : batch_offset: None,
226 0 : len: None,
227 0 : will_init: None,
228 0 : },
229 0 : ValueMeta::Serialized(ser) => ProtoValueMeta {
230 0 : r#type: ProtoValueMetaType::Serialized.into(),
231 0 : key: Some(ProtoCompactKey::from(ser.key)),
232 0 : lsn: ser.lsn.0,
233 0 : batch_offset: Some(ser.batch_offset),
234 0 : len: Some(ser.len as u64),
235 0 : will_init: Some(ser.will_init),
236 0 : },
237 : }
238 0 : }
239 : }
240 :
241 : impl From<CompactKey> for ProtoCompactKey {
242 0 : fn from(value: CompactKey) -> Self {
243 0 : ProtoCompactKey {
244 0 : high: (value.raw() >> 64) as i64,
245 0 : low: value.raw() as i64,
246 0 : }
247 0 : }
248 : }
249 :
250 : impl TryFrom<ProtoInterpretedWalRecords> for InterpretedWalRecords {
251 : type Error = TranscodeError;
252 :
253 0 : fn try_from(value: ProtoInterpretedWalRecords) -> Result<Self, Self::Error> {
254 0 : let records = value
255 0 : .records
256 0 : .into_iter()
257 0 : .map(InterpretedWalRecord::try_from)
258 0 : .collect::<Result<_, _>>()?;
259 :
260 0 : Ok(InterpretedWalRecords {
261 0 : records,
262 0 : next_record_lsn: value.next_record_lsn.map(Lsn::from),
263 0 : })
264 0 : }
265 : }
266 :
267 : impl TryFrom<ProtoInterpretedWalRecord> for InterpretedWalRecord {
268 : type Error = TranscodeError;
269 :
270 0 : fn try_from(value: ProtoInterpretedWalRecord) -> Result<Self, Self::Error> {
271 0 : let metadata_record = value
272 0 : .metadata_record
273 0 : .map(|mrec| -> Result<_, DeserializeError> { MetadataRecord::des(&mrec) })
274 0 : .transpose()?;
275 :
276 0 : let batch = {
277 0 : let batch = value.batch.ok_or_else(|| {
278 0 : TranscodeError::BadInput("InterpretedWalRecord::batch missing".to_string())
279 0 : })?;
280 :
281 0 : SerializedValueBatch::try_from(batch)?
282 : };
283 :
284 : Ok(InterpretedWalRecord {
285 0 : metadata_record,
286 0 : batch,
287 0 : next_record_lsn: Lsn(value.next_record_lsn),
288 0 : flush_uncommitted: if value.flush_uncommitted {
289 0 : FlushUncommittedRecords::Yes
290 : } else {
291 0 : FlushUncommittedRecords::No
292 : },
293 0 : xid: value.xid,
294 : })
295 0 : }
296 : }
297 :
298 : impl TryFrom<ProtoSerializedValueBatch> for SerializedValueBatch {
299 : type Error = TranscodeError;
300 :
301 0 : fn try_from(value: ProtoSerializedValueBatch) -> Result<Self, Self::Error> {
302 0 : let metadata = value
303 0 : .metadata
304 0 : .into_iter()
305 0 : .map(ValueMeta::try_from)
306 0 : .collect::<Result<Vec<_>, _>>()?;
307 :
308 0 : Ok(SerializedValueBatch {
309 0 : raw: value.raw,
310 0 : metadata,
311 0 : max_lsn: Lsn(value.max_lsn),
312 0 : len: value.len as usize,
313 0 : })
314 0 : }
315 : }
316 :
317 : impl TryFrom<ProtoValueMeta> for ValueMeta {
318 : type Error = TranscodeError;
319 :
320 0 : fn try_from(value: ProtoValueMeta) -> Result<Self, Self::Error> {
321 0 : match ProtoValueMetaType::try_from(value.r#type) {
322 : Ok(ProtoValueMetaType::Serialized) => Ok(ValueMeta::Serialized(SerializedValueMeta {
323 0 : key: value
324 0 : .key
325 0 : .ok_or_else(|| TranscodeError::BadInput("ValueMeta::key missing".to_string()))?
326 0 : .into(),
327 0 : lsn: Lsn(value.lsn),
328 0 : batch_offset: value.batch_offset.ok_or_else(|| {
329 0 : TranscodeError::BadInput("ValueMeta::batch_offset missing".to_string())
330 0 : })?,
331 0 : len: value
332 0 : .len
333 0 : .ok_or_else(|| TranscodeError::BadInput("ValueMeta::len missing".to_string()))?
334 : as usize,
335 0 : will_init: value.will_init.ok_or_else(|| {
336 0 : TranscodeError::BadInput("ValueMeta::will_init missing".to_string())
337 0 : })?,
338 : })),
339 : Ok(ProtoValueMetaType::Observed) => Ok(ValueMeta::Observed(ObservedValueMeta {
340 0 : key: value
341 0 : .key
342 0 : .ok_or_else(|| TranscodeError::BadInput("ValueMeta::key missing".to_string()))?
343 0 : .into(),
344 0 : lsn: Lsn(value.lsn),
345 : })),
346 0 : Err(_) => Err(TranscodeError::BadInput(format!(
347 0 : "Unexpected ValueMeta::type {}",
348 0 : value.r#type
349 0 : ))),
350 : }
351 0 : }
352 : }
353 :
354 : impl From<ProtoCompactKey> for CompactKey {
355 0 : fn from(value: ProtoCompactKey) -> Self {
356 0 : (((value.high as i128) << 64) | (value.low as i128)).into()
357 0 : }
358 : }
|