Line data Source code
1 : use bytes::{BufMut, Bytes, BytesMut};
2 : use pageserver_api::key::CompactKey;
3 : use prost::{DecodeError, EncodeError, Message};
4 : use tokio::io::AsyncWriteExt;
5 : use utils::bin_ser::{BeSer, DeserializeError, SerializeError};
6 : use utils::lsn::Lsn;
7 : use utils::postgres_client::{Compression, InterpretedFormat};
8 :
9 : use crate::models::{
10 : FlushUncommittedRecords, InterpretedWalRecord, InterpretedWalRecords, MetadataRecord, proto,
11 : };
12 : use crate::serialized_batch::{
13 : ObservedValueMeta, SerializedValueBatch, SerializedValueMeta, ValueMeta,
14 : };
15 :
16 : #[derive(Debug, thiserror::Error)]
17 : pub enum ToWireFormatError {
18 : #[error("{0}")]
19 : Bincode(#[from] SerializeError),
20 : #[error("{0}")]
21 : Protobuf(#[from] ProtobufSerializeError),
22 : #[error("{0}")]
23 : Compression(#[from] std::io::Error),
24 : }
25 :
26 : #[derive(Debug, thiserror::Error)]
27 : pub enum ProtobufSerializeError {
28 : #[error("{0}")]
29 : MetadataRecord(#[from] SerializeError),
30 : #[error("{0}")]
31 : Encode(#[from] EncodeError),
32 : }
33 :
34 : #[derive(Debug, thiserror::Error)]
35 : pub enum FromWireFormatError {
36 : #[error("{0}")]
37 : Bincode(#[from] DeserializeError),
38 : #[error("{0}")]
39 : Protobuf(#[from] ProtobufDeserializeError),
40 : #[error("{0}")]
41 : Decompress(#[from] std::io::Error),
42 : }
43 :
44 : #[derive(Debug, thiserror::Error)]
45 : pub enum ProtobufDeserializeError {
46 : #[error("{0}")]
47 : Transcode(#[from] TranscodeError),
48 : #[error("{0}")]
49 : Decode(#[from] DecodeError),
50 : }
51 :
52 : #[derive(Debug, thiserror::Error)]
53 : pub enum TranscodeError {
54 : #[error("{0}")]
55 : BadInput(String),
56 : #[error("{0}")]
57 : MetadataRecord(#[from] DeserializeError),
58 : }
59 :
60 : pub trait ToWireFormat {
61 : fn to_wire(
62 : self,
63 : format: InterpretedFormat,
64 : compression: Option<Compression>,
65 : ) -> impl std::future::Future<Output = Result<Bytes, ToWireFormatError>> + Send;
66 : }
67 :
68 : pub trait FromWireFormat {
69 : type T;
70 : fn from_wire(
71 : buf: &Bytes,
72 : format: InterpretedFormat,
73 : compression: Option<Compression>,
74 : ) -> impl std::future::Future<Output = Result<Self::T, FromWireFormatError>> + Send;
75 : }
76 :
77 : impl ToWireFormat for InterpretedWalRecords {
78 0 : async fn to_wire(
79 0 : self,
80 0 : format: InterpretedFormat,
81 0 : compression: Option<Compression>,
82 0 : ) -> Result<Bytes, ToWireFormatError> {
83 : use async_compression::Level;
84 : use async_compression::tokio::write::ZstdEncoder;
85 :
86 0 : let encode_res: Result<Bytes, ToWireFormatError> = match format {
87 : InterpretedFormat::Bincode => {
88 0 : let buf = BytesMut::new();
89 0 : let mut buf = buf.writer();
90 0 : self.ser_into(&mut buf)?;
91 0 : Ok(buf.into_inner().freeze())
92 : }
93 : InterpretedFormat::Protobuf => {
94 0 : let proto: proto::InterpretedWalRecords = self.try_into()?;
95 0 : let mut buf = BytesMut::new();
96 0 : proto
97 0 : .encode(&mut buf)
98 0 : .map_err(|e| ToWireFormatError::Protobuf(e.into()))?;
99 :
100 0 : Ok(buf.freeze())
101 : }
102 : };
103 :
104 0 : let buf = encode_res?;
105 0 : let compressed_buf = match compression {
106 0 : Some(Compression::Zstd { level }) => {
107 0 : let mut encoder = ZstdEncoder::with_quality(
108 0 : Vec::with_capacity(buf.len() / 4),
109 0 : Level::Precise(level as i32),
110 0 : );
111 0 : encoder.write_all(&buf).await?;
112 0 : encoder.shutdown().await?;
113 0 : Bytes::from(encoder.into_inner())
114 : }
115 0 : None => buf,
116 : };
117 :
118 0 : Ok(compressed_buf)
119 0 : }
120 : }
121 :
122 : impl FromWireFormat for InterpretedWalRecords {
123 : type T = Self;
124 :
125 0 : async fn from_wire(
126 0 : buf: &Bytes,
127 0 : format: InterpretedFormat,
128 0 : compression: Option<Compression>,
129 0 : ) -> Result<Self, FromWireFormatError> {
130 0 : let decompressed_buf = match compression {
131 : Some(Compression::Zstd { .. }) => {
132 : use async_compression::tokio::write::ZstdDecoder;
133 0 : let mut decoded_buf = Vec::with_capacity(buf.len());
134 0 : let mut decoder = ZstdDecoder::new(&mut decoded_buf);
135 0 : decoder.write_all(buf).await?;
136 0 : decoder.flush().await?;
137 0 : Bytes::from(decoded_buf)
138 : }
139 0 : None => buf.clone(),
140 : };
141 :
142 0 : match format {
143 : InterpretedFormat::Bincode => {
144 0 : InterpretedWalRecords::des(&decompressed_buf).map_err(FromWireFormatError::Bincode)
145 : }
146 : InterpretedFormat::Protobuf => {
147 0 : let proto = proto::InterpretedWalRecords::decode(decompressed_buf)
148 0 : .map_err(|e| FromWireFormatError::Protobuf(e.into()))?;
149 0 : InterpretedWalRecords::try_from(proto)
150 0 : .map_err(|e| FromWireFormatError::Protobuf(e.into()))
151 : }
152 : }
153 0 : }
154 : }
155 :
156 : impl TryFrom<InterpretedWalRecords> for proto::InterpretedWalRecords {
157 : type Error = SerializeError;
158 :
159 0 : fn try_from(value: InterpretedWalRecords) -> Result<Self, Self::Error> {
160 0 : let records = value
161 0 : .records
162 0 : .into_iter()
163 0 : .map(proto::InterpretedWalRecord::try_from)
164 0 : .collect::<Result<Vec<_>, _>>()?;
165 0 : Ok(proto::InterpretedWalRecords {
166 0 : records,
167 0 : next_record_lsn: Some(value.next_record_lsn.0),
168 0 : raw_wal_start_lsn: value.raw_wal_start_lsn.map(|l| l.0),
169 0 : })
170 0 : }
171 : }
172 :
173 : impl TryFrom<InterpretedWalRecord> for proto::InterpretedWalRecord {
174 : type Error = SerializeError;
175 :
176 0 : fn try_from(value: InterpretedWalRecord) -> Result<Self, Self::Error> {
177 0 : let metadata_record = value
178 0 : .metadata_record
179 0 : .map(|meta_rec| -> Result<Vec<u8>, Self::Error> {
180 0 : let mut buf = Vec::new();
181 0 : meta_rec.ser_into(&mut buf)?;
182 0 : Ok(buf)
183 0 : })
184 0 : .transpose()?;
185 :
186 : Ok(proto::InterpretedWalRecord {
187 0 : metadata_record,
188 0 : batch: Some(proto::SerializedValueBatch::from(value.batch)),
189 0 : next_record_lsn: value.next_record_lsn.0,
190 0 : flush_uncommitted: matches!(value.flush_uncommitted, FlushUncommittedRecords::Yes),
191 0 : xid: value.xid,
192 : })
193 0 : }
194 : }
195 :
196 : impl From<SerializedValueBatch> for proto::SerializedValueBatch {
197 0 : fn from(value: SerializedValueBatch) -> Self {
198 0 : proto::SerializedValueBatch {
199 0 : raw: value.raw,
200 0 : metadata: value
201 0 : .metadata
202 0 : .into_iter()
203 0 : .map(proto::ValueMeta::from)
204 0 : .collect(),
205 0 : max_lsn: value.max_lsn.0,
206 0 : len: value.len as u64,
207 0 : }
208 0 : }
209 : }
210 :
211 : impl From<ValueMeta> for proto::ValueMeta {
212 0 : fn from(value: ValueMeta) -> Self {
213 0 : match value {
214 0 : ValueMeta::Observed(obs) => proto::ValueMeta {
215 0 : r#type: proto::ValueMetaType::Observed.into(),
216 0 : key: Some(proto::CompactKey::from(obs.key)),
217 0 : lsn: obs.lsn.0,
218 0 : batch_offset: None,
219 0 : len: None,
220 0 : will_init: None,
221 0 : },
222 0 : ValueMeta::Serialized(ser) => proto::ValueMeta {
223 0 : r#type: proto::ValueMetaType::Serialized.into(),
224 0 : key: Some(proto::CompactKey::from(ser.key)),
225 0 : lsn: ser.lsn.0,
226 0 : batch_offset: Some(ser.batch_offset),
227 0 : len: Some(ser.len as u64),
228 0 : will_init: Some(ser.will_init),
229 0 : },
230 : }
231 0 : }
232 : }
233 :
234 : impl From<CompactKey> for proto::CompactKey {
235 5 : fn from(value: CompactKey) -> Self {
236 5 : proto::CompactKey {
237 5 : high: (value.raw() >> 64) as u64,
238 5 : low: value.raw() as u64,
239 5 : }
240 5 : }
241 : }
242 :
243 : impl TryFrom<proto::InterpretedWalRecords> for InterpretedWalRecords {
244 : type Error = TranscodeError;
245 :
246 0 : fn try_from(value: proto::InterpretedWalRecords) -> Result<Self, Self::Error> {
247 0 : let records = value
248 0 : .records
249 0 : .into_iter()
250 0 : .map(InterpretedWalRecord::try_from)
251 0 : .collect::<Result<_, _>>()?;
252 :
253 0 : Ok(InterpretedWalRecords {
254 0 : records,
255 0 : next_record_lsn: value
256 0 : .next_record_lsn
257 0 : .map(Lsn::from)
258 0 : .expect("Always provided"),
259 0 : raw_wal_start_lsn: value.raw_wal_start_lsn.map(Lsn::from),
260 0 : })
261 0 : }
262 : }
263 :
264 : impl TryFrom<proto::InterpretedWalRecord> for InterpretedWalRecord {
265 : type Error = TranscodeError;
266 :
267 0 : fn try_from(value: proto::InterpretedWalRecord) -> Result<Self, Self::Error> {
268 0 : let metadata_record = value
269 0 : .metadata_record
270 0 : .map(|mrec| -> Result<_, DeserializeError> { MetadataRecord::des(&mrec) })
271 0 : .transpose()?;
272 :
273 0 : let batch = {
274 0 : let batch = value.batch.ok_or_else(|| {
275 0 : TranscodeError::BadInput("InterpretedWalRecord::batch missing".to_string())
276 0 : })?;
277 :
278 0 : SerializedValueBatch::try_from(batch)?
279 : };
280 :
281 : Ok(InterpretedWalRecord {
282 0 : metadata_record,
283 0 : batch,
284 0 : next_record_lsn: Lsn(value.next_record_lsn),
285 0 : flush_uncommitted: if value.flush_uncommitted {
286 0 : FlushUncommittedRecords::Yes
287 : } else {
288 0 : FlushUncommittedRecords::No
289 : },
290 0 : xid: value.xid,
291 : })
292 0 : }
293 : }
294 :
295 : impl TryFrom<proto::SerializedValueBatch> for SerializedValueBatch {
296 : type Error = TranscodeError;
297 :
298 0 : fn try_from(value: proto::SerializedValueBatch) -> Result<Self, Self::Error> {
299 0 : let metadata = value
300 0 : .metadata
301 0 : .into_iter()
302 0 : .map(ValueMeta::try_from)
303 0 : .collect::<Result<Vec<_>, _>>()?;
304 :
305 0 : Ok(SerializedValueBatch {
306 0 : raw: value.raw,
307 0 : metadata,
308 0 : max_lsn: Lsn(value.max_lsn),
309 0 : len: value.len as usize,
310 0 : })
311 0 : }
312 : }
313 :
314 : impl TryFrom<proto::ValueMeta> for ValueMeta {
315 : type Error = TranscodeError;
316 :
317 0 : fn try_from(value: proto::ValueMeta) -> Result<Self, Self::Error> {
318 0 : match proto::ValueMetaType::try_from(value.r#type) {
319 : Ok(proto::ValueMetaType::Serialized) => {
320 : Ok(ValueMeta::Serialized(SerializedValueMeta {
321 0 : key: value
322 0 : .key
323 0 : .ok_or_else(|| {
324 0 : TranscodeError::BadInput("ValueMeta::key missing".to_string())
325 0 : })?
326 0 : .into(),
327 0 : lsn: Lsn(value.lsn),
328 0 : batch_offset: value.batch_offset.ok_or_else(|| {
329 0 : TranscodeError::BadInput("ValueMeta::batch_offset missing".to_string())
330 0 : })?,
331 0 : len: value.len.ok_or_else(|| {
332 0 : TranscodeError::BadInput("ValueMeta::len missing".to_string())
333 0 : })? as usize,
334 0 : will_init: value.will_init.ok_or_else(|| {
335 0 : TranscodeError::BadInput("ValueMeta::will_init missing".to_string())
336 0 : })?,
337 : }))
338 : }
339 : Ok(proto::ValueMetaType::Observed) => Ok(ValueMeta::Observed(ObservedValueMeta {
340 0 : key: value
341 0 : .key
342 0 : .ok_or_else(|| TranscodeError::BadInput("ValueMeta::key missing".to_string()))?
343 0 : .into(),
344 0 : lsn: Lsn(value.lsn),
345 : })),
346 0 : Err(_) => Err(TranscodeError::BadInput(format!(
347 0 : "Unexpected ValueMeta::type {}",
348 0 : value.r#type
349 0 : ))),
350 : }
351 0 : }
352 : }
353 :
354 : impl From<proto::CompactKey> for CompactKey {
355 5 : fn from(value: proto::CompactKey) -> Self {
356 5 : (((value.high as i128) << 64) | (value.low as i128)).into()
357 5 : }
358 : }
359 :
360 : #[test]
361 1 : fn test_compact_key_with_large_relnode() {
362 : use pageserver_api::key::Key;
363 :
364 1 : let inputs = vec![
365 1 : Key {
366 1 : field1: 0,
367 1 : field2: 0x100,
368 1 : field3: 0x200,
369 1 : field4: 0,
370 1 : field5: 0x10,
371 1 : field6: 0x5,
372 1 : },
373 1 : Key {
374 1 : field1: 0,
375 1 : field2: 0x100,
376 1 : field3: 0x200,
377 1 : field4: 0x007FFFFF,
378 1 : field5: 0x10,
379 1 : field6: 0x5,
380 1 : },
381 1 : Key {
382 1 : field1: 0,
383 1 : field2: 0x100,
384 1 : field3: 0x200,
385 1 : field4: 0x00800000,
386 1 : field5: 0x10,
387 1 : field6: 0x5,
388 1 : },
389 1 : Key {
390 1 : field1: 0,
391 1 : field2: 0x100,
392 1 : field3: 0x200,
393 1 : field4: 0x00800001,
394 1 : field5: 0x10,
395 1 : field6: 0x5,
396 1 : },
397 1 : Key {
398 1 : field1: 0,
399 1 : field2: 0xFFFFFFFF,
400 1 : field3: 0xFFFFFFFF,
401 1 : field4: 0xFFFFFFFF,
402 1 : field5: 0x0,
403 1 : field6: 0x0,
404 1 : },
405 1 : ];
406 :
407 6 : for input in inputs {
408 5 : assert!(input.is_valid_key_on_write_path());
409 5 : let compact = input.to_compact();
410 5 : let proto: proto::CompactKey = compact.into();
411 5 : let from_proto: CompactKey = proto.into();
412 5 :
413 5 : assert_eq!(
414 : compact, from_proto,
415 0 : "Round trip failed for key with relnode={:#x}",
416 : input.field4
417 : );
418 : }
419 1 : }
|