Line data Source code
1 : use bytes::{BufMut, Bytes, BytesMut};
2 : use pageserver_api::key::CompactKey;
3 : use prost::{DecodeError, EncodeError, Message};
4 : use tokio::io::AsyncWriteExt;
5 : use utils::bin_ser::{BeSer, DeserializeError, SerializeError};
6 : use utils::lsn::Lsn;
7 : use utils::postgres_client::{Compression, InterpretedFormat};
8 :
9 : use crate::models::{
10 : FlushUncommittedRecords, InterpretedWalRecord, InterpretedWalRecords, MetadataRecord,
11 : };
12 :
13 : use crate::serialized_batch::{
14 : ObservedValueMeta, SerializedValueBatch, SerializedValueMeta, ValueMeta,
15 : };
16 :
17 : use crate::models::proto;
18 :
19 : #[derive(Debug, thiserror::Error)]
20 : pub enum ToWireFormatError {
21 : #[error("{0}")]
22 : Bincode(#[from] SerializeError),
23 : #[error("{0}")]
24 : Protobuf(#[from] ProtobufSerializeError),
25 : #[error("{0}")]
26 : Compression(#[from] std::io::Error),
27 : }
28 :
29 : #[derive(Debug, thiserror::Error)]
30 : pub enum ProtobufSerializeError {
31 : #[error("{0}")]
32 : MetadataRecord(#[from] SerializeError),
33 : #[error("{0}")]
34 : Encode(#[from] EncodeError),
35 : }
36 :
37 : #[derive(Debug, thiserror::Error)]
38 : pub enum FromWireFormatError {
39 : #[error("{0}")]
40 : Bincode(#[from] DeserializeError),
41 : #[error("{0}")]
42 : Protobuf(#[from] ProtobufDeserializeError),
43 : #[error("{0}")]
44 : Decompress(#[from] std::io::Error),
45 : }
46 :
47 : #[derive(Debug, thiserror::Error)]
48 : pub enum ProtobufDeserializeError {
49 : #[error("{0}")]
50 : Transcode(#[from] TranscodeError),
51 : #[error("{0}")]
52 : Decode(#[from] DecodeError),
53 : }
54 :
55 : #[derive(Debug, thiserror::Error)]
56 : pub enum TranscodeError {
57 : #[error("{0}")]
58 : BadInput(String),
59 : #[error("{0}")]
60 : MetadataRecord(#[from] DeserializeError),
61 : }
62 :
63 : pub trait ToWireFormat {
64 : fn to_wire(
65 : self,
66 : format: InterpretedFormat,
67 : compression: Option<Compression>,
68 : ) -> impl std::future::Future<Output = Result<Bytes, ToWireFormatError>> + Send;
69 : }
70 :
71 : pub trait FromWireFormat {
72 : type T;
73 : fn from_wire(
74 : buf: &Bytes,
75 : format: InterpretedFormat,
76 : compression: Option<Compression>,
77 : ) -> impl std::future::Future<Output = Result<Self::T, FromWireFormatError>> + Send;
78 : }
79 :
80 : impl ToWireFormat for InterpretedWalRecords {
81 0 : async fn to_wire(
82 0 : self,
83 0 : format: InterpretedFormat,
84 0 : compression: Option<Compression>,
85 0 : ) -> Result<Bytes, ToWireFormatError> {
86 : use async_compression::tokio::write::ZstdEncoder;
87 : use async_compression::Level;
88 :
89 0 : let encode_res: Result<Bytes, ToWireFormatError> = match format {
90 : InterpretedFormat::Bincode => {
91 0 : let buf = BytesMut::new();
92 0 : let mut buf = buf.writer();
93 0 : self.ser_into(&mut buf)?;
94 0 : Ok(buf.into_inner().freeze())
95 : }
96 : InterpretedFormat::Protobuf => {
97 0 : let proto: proto::InterpretedWalRecords = self.try_into()?;
98 0 : let mut buf = BytesMut::new();
99 0 : proto
100 0 : .encode(&mut buf)
101 0 : .map_err(|e| ToWireFormatError::Protobuf(e.into()))?;
102 :
103 0 : Ok(buf.freeze())
104 : }
105 : };
106 :
107 0 : let buf = encode_res?;
108 0 : let compressed_buf = match compression {
109 0 : Some(Compression::Zstd { level }) => {
110 0 : let mut encoder = ZstdEncoder::with_quality(
111 0 : Vec::with_capacity(buf.len() / 4),
112 0 : Level::Precise(level as i32),
113 0 : );
114 0 : encoder.write_all(&buf).await?;
115 0 : encoder.shutdown().await?;
116 0 : Bytes::from(encoder.into_inner())
117 : }
118 0 : None => buf,
119 : };
120 :
121 0 : Ok(compressed_buf)
122 0 : }
123 : }
124 :
125 : impl FromWireFormat for InterpretedWalRecords {
126 : type T = Self;
127 :
128 0 : async fn from_wire(
129 0 : buf: &Bytes,
130 0 : format: InterpretedFormat,
131 0 : compression: Option<Compression>,
132 0 : ) -> Result<Self, FromWireFormatError> {
133 0 : let decompressed_buf = match compression {
134 : Some(Compression::Zstd { .. }) => {
135 : use async_compression::tokio::write::ZstdDecoder;
136 0 : let mut decoded_buf = Vec::with_capacity(buf.len());
137 0 : let mut decoder = ZstdDecoder::new(&mut decoded_buf);
138 0 : decoder.write_all(buf).await?;
139 0 : decoder.flush().await?;
140 0 : Bytes::from(decoded_buf)
141 : }
142 0 : None => buf.clone(),
143 : };
144 :
145 0 : match format {
146 : InterpretedFormat::Bincode => {
147 0 : InterpretedWalRecords::des(&decompressed_buf).map_err(FromWireFormatError::Bincode)
148 : }
149 : InterpretedFormat::Protobuf => {
150 0 : let proto = proto::InterpretedWalRecords::decode(decompressed_buf)
151 0 : .map_err(|e| FromWireFormatError::Protobuf(e.into()))?;
152 0 : InterpretedWalRecords::try_from(proto)
153 0 : .map_err(|e| FromWireFormatError::Protobuf(e.into()))
154 : }
155 : }
156 0 : }
157 : }
158 :
159 : impl TryFrom<InterpretedWalRecords> for proto::InterpretedWalRecords {
160 : type Error = SerializeError;
161 :
162 0 : fn try_from(value: InterpretedWalRecords) -> Result<Self, Self::Error> {
163 0 : let records = value
164 0 : .records
165 0 : .into_iter()
166 0 : .map(proto::InterpretedWalRecord::try_from)
167 0 : .collect::<Result<Vec<_>, _>>()?;
168 0 : Ok(proto::InterpretedWalRecords {
169 0 : records,
170 0 : next_record_lsn: value.next_record_lsn.map(|l| l.0),
171 0 : })
172 0 : }
173 : }
174 :
175 : impl TryFrom<InterpretedWalRecord> for proto::InterpretedWalRecord {
176 : type Error = SerializeError;
177 :
178 0 : fn try_from(value: InterpretedWalRecord) -> Result<Self, Self::Error> {
179 0 : let metadata_record = value
180 0 : .metadata_record
181 0 : .map(|meta_rec| -> Result<Vec<u8>, Self::Error> {
182 0 : let mut buf = Vec::new();
183 0 : meta_rec.ser_into(&mut buf)?;
184 0 : Ok(buf)
185 0 : })
186 0 : .transpose()?;
187 :
188 : Ok(proto::InterpretedWalRecord {
189 0 : metadata_record,
190 0 : batch: Some(proto::SerializedValueBatch::from(value.batch)),
191 0 : next_record_lsn: value.next_record_lsn.0,
192 0 : flush_uncommitted: matches!(value.flush_uncommitted, FlushUncommittedRecords::Yes),
193 0 : xid: value.xid,
194 : })
195 0 : }
196 : }
197 :
198 : impl From<SerializedValueBatch> for proto::SerializedValueBatch {
199 0 : fn from(value: SerializedValueBatch) -> Self {
200 0 : proto::SerializedValueBatch {
201 0 : raw: value.raw,
202 0 : metadata: value
203 0 : .metadata
204 0 : .into_iter()
205 0 : .map(proto::ValueMeta::from)
206 0 : .collect(),
207 0 : max_lsn: value.max_lsn.0,
208 0 : len: value.len as u64,
209 0 : }
210 0 : }
211 : }
212 :
213 : impl From<ValueMeta> for proto::ValueMeta {
214 0 : fn from(value: ValueMeta) -> Self {
215 0 : match value {
216 0 : ValueMeta::Observed(obs) => proto::ValueMeta {
217 0 : r#type: proto::ValueMetaType::Observed.into(),
218 0 : key: Some(proto::CompactKey::from(obs.key)),
219 0 : lsn: obs.lsn.0,
220 0 : batch_offset: None,
221 0 : len: None,
222 0 : will_init: None,
223 0 : },
224 0 : ValueMeta::Serialized(ser) => proto::ValueMeta {
225 0 : r#type: proto::ValueMetaType::Serialized.into(),
226 0 : key: Some(proto::CompactKey::from(ser.key)),
227 0 : lsn: ser.lsn.0,
228 0 : batch_offset: Some(ser.batch_offset),
229 0 : len: Some(ser.len as u64),
230 0 : will_init: Some(ser.will_init),
231 0 : },
232 : }
233 0 : }
234 : }
235 :
236 : impl From<CompactKey> for proto::CompactKey {
237 5 : fn from(value: CompactKey) -> Self {
238 5 : proto::CompactKey {
239 5 : high: (value.raw() >> 64) as u64,
240 5 : low: value.raw() as u64,
241 5 : }
242 5 : }
243 : }
244 :
245 : impl TryFrom<proto::InterpretedWalRecords> for InterpretedWalRecords {
246 : type Error = TranscodeError;
247 :
248 0 : fn try_from(value: proto::InterpretedWalRecords) -> Result<Self, Self::Error> {
249 0 : let records = value
250 0 : .records
251 0 : .into_iter()
252 0 : .map(InterpretedWalRecord::try_from)
253 0 : .collect::<Result<_, _>>()?;
254 :
255 0 : Ok(InterpretedWalRecords {
256 0 : records,
257 0 : next_record_lsn: value.next_record_lsn.map(Lsn::from),
258 0 : })
259 0 : }
260 : }
261 :
262 : impl TryFrom<proto::InterpretedWalRecord> for InterpretedWalRecord {
263 : type Error = TranscodeError;
264 :
265 0 : fn try_from(value: proto::InterpretedWalRecord) -> Result<Self, Self::Error> {
266 0 : let metadata_record = value
267 0 : .metadata_record
268 0 : .map(|mrec| -> Result<_, DeserializeError> { MetadataRecord::des(&mrec) })
269 0 : .transpose()?;
270 :
271 0 : let batch = {
272 0 : let batch = value.batch.ok_or_else(|| {
273 0 : TranscodeError::BadInput("InterpretedWalRecord::batch missing".to_string())
274 0 : })?;
275 :
276 0 : SerializedValueBatch::try_from(batch)?
277 : };
278 :
279 : Ok(InterpretedWalRecord {
280 0 : metadata_record,
281 0 : batch,
282 0 : next_record_lsn: Lsn(value.next_record_lsn),
283 0 : flush_uncommitted: if value.flush_uncommitted {
284 0 : FlushUncommittedRecords::Yes
285 : } else {
286 0 : FlushUncommittedRecords::No
287 : },
288 0 : xid: value.xid,
289 : })
290 0 : }
291 : }
292 :
293 : impl TryFrom<proto::SerializedValueBatch> for SerializedValueBatch {
294 : type Error = TranscodeError;
295 :
296 0 : fn try_from(value: proto::SerializedValueBatch) -> Result<Self, Self::Error> {
297 0 : let metadata = value
298 0 : .metadata
299 0 : .into_iter()
300 0 : .map(ValueMeta::try_from)
301 0 : .collect::<Result<Vec<_>, _>>()?;
302 :
303 0 : Ok(SerializedValueBatch {
304 0 : raw: value.raw,
305 0 : metadata,
306 0 : max_lsn: Lsn(value.max_lsn),
307 0 : len: value.len as usize,
308 0 : })
309 0 : }
310 : }
311 :
312 : impl TryFrom<proto::ValueMeta> for ValueMeta {
313 : type Error = TranscodeError;
314 :
315 0 : fn try_from(value: proto::ValueMeta) -> Result<Self, Self::Error> {
316 0 : match proto::ValueMetaType::try_from(value.r#type) {
317 : Ok(proto::ValueMetaType::Serialized) => {
318 : Ok(ValueMeta::Serialized(SerializedValueMeta {
319 0 : key: value
320 0 : .key
321 0 : .ok_or_else(|| {
322 0 : TranscodeError::BadInput("ValueMeta::key missing".to_string())
323 0 : })?
324 0 : .into(),
325 0 : lsn: Lsn(value.lsn),
326 0 : batch_offset: value.batch_offset.ok_or_else(|| {
327 0 : TranscodeError::BadInput("ValueMeta::batch_offset missing".to_string())
328 0 : })?,
329 0 : len: value.len.ok_or_else(|| {
330 0 : TranscodeError::BadInput("ValueMeta::len missing".to_string())
331 0 : })? as usize,
332 0 : will_init: value.will_init.ok_or_else(|| {
333 0 : TranscodeError::BadInput("ValueMeta::will_init missing".to_string())
334 0 : })?,
335 : }))
336 : }
337 : Ok(proto::ValueMetaType::Observed) => Ok(ValueMeta::Observed(ObservedValueMeta {
338 0 : key: value
339 0 : .key
340 0 : .ok_or_else(|| TranscodeError::BadInput("ValueMeta::key missing".to_string()))?
341 0 : .into(),
342 0 : lsn: Lsn(value.lsn),
343 : })),
344 0 : Err(_) => Err(TranscodeError::BadInput(format!(
345 0 : "Unexpected ValueMeta::type {}",
346 0 : value.r#type
347 0 : ))),
348 : }
349 0 : }
350 : }
351 :
352 : impl From<proto::CompactKey> for CompactKey {
353 5 : fn from(value: proto::CompactKey) -> Self {
354 5 : (((value.high as i128) << 64) | (value.low as i128)).into()
355 5 : }
356 : }
357 :
358 : #[test]
359 1 : fn test_compact_key_with_large_relnode() {
360 : use pageserver_api::key::Key;
361 :
362 1 : let inputs = vec![
363 1 : Key {
364 1 : field1: 0,
365 1 : field2: 0x100,
366 1 : field3: 0x200,
367 1 : field4: 0,
368 1 : field5: 0x10,
369 1 : field6: 0x5,
370 1 : },
371 1 : Key {
372 1 : field1: 0,
373 1 : field2: 0x100,
374 1 : field3: 0x200,
375 1 : field4: 0x007FFFFF,
376 1 : field5: 0x10,
377 1 : field6: 0x5,
378 1 : },
379 1 : Key {
380 1 : field1: 0,
381 1 : field2: 0x100,
382 1 : field3: 0x200,
383 1 : field4: 0x00800000,
384 1 : field5: 0x10,
385 1 : field6: 0x5,
386 1 : },
387 1 : Key {
388 1 : field1: 0,
389 1 : field2: 0x100,
390 1 : field3: 0x200,
391 1 : field4: 0x00800001,
392 1 : field5: 0x10,
393 1 : field6: 0x5,
394 1 : },
395 1 : Key {
396 1 : field1: 0,
397 1 : field2: 0xFFFFFFFF,
398 1 : field3: 0xFFFFFFFF,
399 1 : field4: 0xFFFFFFFF,
400 1 : field5: 0x0,
401 1 : field6: 0x0,
402 1 : },
403 1 : ];
404 :
405 6 : for input in inputs {
406 5 : assert!(input.is_valid_key_on_write_path());
407 5 : let compact = input.to_compact();
408 5 : let proto: proto::CompactKey = compact.into();
409 5 : let from_proto: CompactKey = proto.into();
410 5 :
411 5 : assert_eq!(
412 : compact, from_proto,
413 0 : "Round trip failed for key with relnode={:#x}",
414 : input.field4
415 : );
416 : }
417 1 : }
|