Line data Source code
1 : use bytes::{BufMut, Bytes, BytesMut};
2 : use pageserver_api::key::CompactKey;
3 : use prost::{DecodeError, EncodeError, Message};
4 : use tokio::io::AsyncWriteExt;
5 : use utils::bin_ser::{BeSer, DeserializeError, SerializeError};
6 : use utils::lsn::Lsn;
7 : use utils::postgres_client::{Compression, InterpretedFormat};
8 :
9 : use crate::models::{
10 : FlushUncommittedRecords, InterpretedWalRecord, InterpretedWalRecords, MetadataRecord,
11 : };
12 :
13 : use crate::serialized_batch::{
14 : ObservedValueMeta, SerializedValueBatch, SerializedValueMeta, ValueMeta,
15 : };
16 :
17 : use crate::models::proto;
18 :
19 : #[derive(Debug, thiserror::Error)]
20 : pub enum ToWireFormatError {
21 : #[error("{0}")]
22 : Bincode(#[from] SerializeError),
23 : #[error("{0}")]
24 : Protobuf(#[from] ProtobufSerializeError),
25 : #[error("{0}")]
26 : Compression(#[from] std::io::Error),
27 : }
28 :
29 : #[derive(Debug, thiserror::Error)]
30 : pub enum ProtobufSerializeError {
31 : #[error("{0}")]
32 : MetadataRecord(#[from] SerializeError),
33 : #[error("{0}")]
34 : Encode(#[from] EncodeError),
35 : }
36 :
37 : #[derive(Debug, thiserror::Error)]
38 : pub enum FromWireFormatError {
39 : #[error("{0}")]
40 : Bincode(#[from] DeserializeError),
41 : #[error("{0}")]
42 : Protobuf(#[from] ProtobufDeserializeError),
43 : #[error("{0}")]
44 : Decompress(#[from] std::io::Error),
45 : }
46 :
47 : #[derive(Debug, thiserror::Error)]
48 : pub enum ProtobufDeserializeError {
49 : #[error("{0}")]
50 : Transcode(#[from] TranscodeError),
51 : #[error("{0}")]
52 : Decode(#[from] DecodeError),
53 : }
54 :
55 : #[derive(Debug, thiserror::Error)]
56 : pub enum TranscodeError {
57 : #[error("{0}")]
58 : BadInput(String),
59 : #[error("{0}")]
60 : MetadataRecord(#[from] DeserializeError),
61 : }
62 :
63 : pub trait ToWireFormat {
64 : fn to_wire(
65 : self,
66 : format: InterpretedFormat,
67 : compression: Option<Compression>,
68 : ) -> impl std::future::Future<Output = Result<Bytes, ToWireFormatError>> + Send;
69 : }
70 :
71 : pub trait FromWireFormat {
72 : type T;
73 : fn from_wire(
74 : buf: &Bytes,
75 : format: InterpretedFormat,
76 : compression: Option<Compression>,
77 : ) -> impl std::future::Future<Output = Result<Self::T, FromWireFormatError>> + Send;
78 : }
79 :
80 : impl ToWireFormat for InterpretedWalRecords {
81 0 : async fn to_wire(
82 0 : self,
83 0 : format: InterpretedFormat,
84 0 : compression: Option<Compression>,
85 0 : ) -> Result<Bytes, ToWireFormatError> {
86 : use async_compression::tokio::write::ZstdEncoder;
87 : use async_compression::Level;
88 :
89 0 : let encode_res: Result<Bytes, ToWireFormatError> = match format {
90 : InterpretedFormat::Bincode => {
91 0 : let buf = BytesMut::new();
92 0 : let mut buf = buf.writer();
93 0 : self.ser_into(&mut buf)?;
94 0 : Ok(buf.into_inner().freeze())
95 : }
96 : InterpretedFormat::Protobuf => {
97 0 : let proto: proto::InterpretedWalRecords = self.try_into()?;
98 0 : let mut buf = BytesMut::new();
99 0 : proto
100 0 : .encode(&mut buf)
101 0 : .map_err(|e| ToWireFormatError::Protobuf(e.into()))?;
102 :
103 0 : Ok(buf.freeze())
104 : }
105 : };
106 :
107 0 : let buf = encode_res?;
108 0 : let compressed_buf = match compression {
109 0 : Some(Compression::Zstd { level }) => {
110 0 : let mut encoder = ZstdEncoder::with_quality(
111 0 : Vec::with_capacity(buf.len() / 4),
112 0 : Level::Precise(level as i32),
113 0 : );
114 0 : encoder.write_all(&buf).await?;
115 0 : encoder.shutdown().await?;
116 0 : Bytes::from(encoder.into_inner())
117 : }
118 0 : None => buf,
119 : };
120 :
121 0 : Ok(compressed_buf)
122 0 : }
123 : }
124 :
125 : impl FromWireFormat for InterpretedWalRecords {
126 : type T = Self;
127 :
128 0 : async fn from_wire(
129 0 : buf: &Bytes,
130 0 : format: InterpretedFormat,
131 0 : compression: Option<Compression>,
132 0 : ) -> Result<Self, FromWireFormatError> {
133 0 : let decompressed_buf = match compression {
134 : Some(Compression::Zstd { .. }) => {
135 : use async_compression::tokio::write::ZstdDecoder;
136 0 : let mut decoded_buf = Vec::with_capacity(buf.len());
137 0 : let mut decoder = ZstdDecoder::new(&mut decoded_buf);
138 0 : decoder.write_all(buf).await?;
139 0 : decoder.flush().await?;
140 0 : Bytes::from(decoded_buf)
141 : }
142 0 : None => buf.clone(),
143 : };
144 :
145 0 : match format {
146 : InterpretedFormat::Bincode => {
147 0 : InterpretedWalRecords::des(&decompressed_buf).map_err(FromWireFormatError::Bincode)
148 : }
149 : InterpretedFormat::Protobuf => {
150 0 : let proto = proto::InterpretedWalRecords::decode(decompressed_buf)
151 0 : .map_err(|e| FromWireFormatError::Protobuf(e.into()))?;
152 0 : InterpretedWalRecords::try_from(proto)
153 0 : .map_err(|e| FromWireFormatError::Protobuf(e.into()))
154 : }
155 : }
156 0 : }
157 : }
158 :
159 : impl TryFrom<InterpretedWalRecords> for proto::InterpretedWalRecords {
160 : type Error = SerializeError;
161 :
162 0 : fn try_from(value: InterpretedWalRecords) -> Result<Self, Self::Error> {
163 0 : let records = value
164 0 : .records
165 0 : .into_iter()
166 0 : .map(proto::InterpretedWalRecord::try_from)
167 0 : .collect::<Result<Vec<_>, _>>()?;
168 0 : Ok(proto::InterpretedWalRecords {
169 0 : records,
170 0 : next_record_lsn: Some(value.next_record_lsn.0),
171 0 : raw_wal_start_lsn: value.raw_wal_start_lsn.map(|l| l.0),
172 0 : })
173 0 : }
174 : }
175 :
176 : impl TryFrom<InterpretedWalRecord> for proto::InterpretedWalRecord {
177 : type Error = SerializeError;
178 :
179 0 : fn try_from(value: InterpretedWalRecord) -> Result<Self, Self::Error> {
180 0 : let metadata_record = value
181 0 : .metadata_record
182 0 : .map(|meta_rec| -> Result<Vec<u8>, Self::Error> {
183 0 : let mut buf = Vec::new();
184 0 : meta_rec.ser_into(&mut buf)?;
185 0 : Ok(buf)
186 0 : })
187 0 : .transpose()?;
188 :
189 : Ok(proto::InterpretedWalRecord {
190 0 : metadata_record,
191 0 : batch: Some(proto::SerializedValueBatch::from(value.batch)),
192 0 : next_record_lsn: value.next_record_lsn.0,
193 0 : flush_uncommitted: matches!(value.flush_uncommitted, FlushUncommittedRecords::Yes),
194 0 : xid: value.xid,
195 : })
196 0 : }
197 : }
198 :
199 : impl From<SerializedValueBatch> for proto::SerializedValueBatch {
200 0 : fn from(value: SerializedValueBatch) -> Self {
201 0 : proto::SerializedValueBatch {
202 0 : raw: value.raw,
203 0 : metadata: value
204 0 : .metadata
205 0 : .into_iter()
206 0 : .map(proto::ValueMeta::from)
207 0 : .collect(),
208 0 : max_lsn: value.max_lsn.0,
209 0 : len: value.len as u64,
210 0 : }
211 0 : }
212 : }
213 :
214 : impl From<ValueMeta> for proto::ValueMeta {
215 0 : fn from(value: ValueMeta) -> Self {
216 0 : match value {
217 0 : ValueMeta::Observed(obs) => proto::ValueMeta {
218 0 : r#type: proto::ValueMetaType::Observed.into(),
219 0 : key: Some(proto::CompactKey::from(obs.key)),
220 0 : lsn: obs.lsn.0,
221 0 : batch_offset: None,
222 0 : len: None,
223 0 : will_init: None,
224 0 : },
225 0 : ValueMeta::Serialized(ser) => proto::ValueMeta {
226 0 : r#type: proto::ValueMetaType::Serialized.into(),
227 0 : key: Some(proto::CompactKey::from(ser.key)),
228 0 : lsn: ser.lsn.0,
229 0 : batch_offset: Some(ser.batch_offset),
230 0 : len: Some(ser.len as u64),
231 0 : will_init: Some(ser.will_init),
232 0 : },
233 : }
234 0 : }
235 : }
236 :
237 : impl From<CompactKey> for proto::CompactKey {
238 5 : fn from(value: CompactKey) -> Self {
239 5 : proto::CompactKey {
240 5 : high: (value.raw() >> 64) as u64,
241 5 : low: value.raw() as u64,
242 5 : }
243 5 : }
244 : }
245 :
246 : impl TryFrom<proto::InterpretedWalRecords> for InterpretedWalRecords {
247 : type Error = TranscodeError;
248 :
249 0 : fn try_from(value: proto::InterpretedWalRecords) -> Result<Self, Self::Error> {
250 0 : let records = value
251 0 : .records
252 0 : .into_iter()
253 0 : .map(InterpretedWalRecord::try_from)
254 0 : .collect::<Result<_, _>>()?;
255 :
256 0 : Ok(InterpretedWalRecords {
257 0 : records,
258 0 : next_record_lsn: value
259 0 : .next_record_lsn
260 0 : .map(Lsn::from)
261 0 : .expect("Always provided"),
262 0 : raw_wal_start_lsn: value.raw_wal_start_lsn.map(Lsn::from),
263 0 : })
264 0 : }
265 : }
266 :
267 : impl TryFrom<proto::InterpretedWalRecord> for InterpretedWalRecord {
268 : type Error = TranscodeError;
269 :
270 0 : fn try_from(value: proto::InterpretedWalRecord) -> Result<Self, Self::Error> {
271 0 : let metadata_record = value
272 0 : .metadata_record
273 0 : .map(|mrec| -> Result<_, DeserializeError> { MetadataRecord::des(&mrec) })
274 0 : .transpose()?;
275 :
276 0 : let batch = {
277 0 : let batch = value.batch.ok_or_else(|| {
278 0 : TranscodeError::BadInput("InterpretedWalRecord::batch missing".to_string())
279 0 : })?;
280 :
281 0 : SerializedValueBatch::try_from(batch)?
282 : };
283 :
284 : Ok(InterpretedWalRecord {
285 0 : metadata_record,
286 0 : batch,
287 0 : next_record_lsn: Lsn(value.next_record_lsn),
288 0 : flush_uncommitted: if value.flush_uncommitted {
289 0 : FlushUncommittedRecords::Yes
290 : } else {
291 0 : FlushUncommittedRecords::No
292 : },
293 0 : xid: value.xid,
294 : })
295 0 : }
296 : }
297 :
298 : impl TryFrom<proto::SerializedValueBatch> for SerializedValueBatch {
299 : type Error = TranscodeError;
300 :
301 0 : fn try_from(value: proto::SerializedValueBatch) -> Result<Self, Self::Error> {
302 0 : let metadata = value
303 0 : .metadata
304 0 : .into_iter()
305 0 : .map(ValueMeta::try_from)
306 0 : .collect::<Result<Vec<_>, _>>()?;
307 :
308 0 : Ok(SerializedValueBatch {
309 0 : raw: value.raw,
310 0 : metadata,
311 0 : max_lsn: Lsn(value.max_lsn),
312 0 : len: value.len as usize,
313 0 : })
314 0 : }
315 : }
316 :
317 : impl TryFrom<proto::ValueMeta> for ValueMeta {
318 : type Error = TranscodeError;
319 :
320 0 : fn try_from(value: proto::ValueMeta) -> Result<Self, Self::Error> {
321 0 : match proto::ValueMetaType::try_from(value.r#type) {
322 : Ok(proto::ValueMetaType::Serialized) => {
323 : Ok(ValueMeta::Serialized(SerializedValueMeta {
324 0 : key: value
325 0 : .key
326 0 : .ok_or_else(|| {
327 0 : TranscodeError::BadInput("ValueMeta::key missing".to_string())
328 0 : })?
329 0 : .into(),
330 0 : lsn: Lsn(value.lsn),
331 0 : batch_offset: value.batch_offset.ok_or_else(|| {
332 0 : TranscodeError::BadInput("ValueMeta::batch_offset missing".to_string())
333 0 : })?,
334 0 : len: value.len.ok_or_else(|| {
335 0 : TranscodeError::BadInput("ValueMeta::len missing".to_string())
336 0 : })? as usize,
337 0 : will_init: value.will_init.ok_or_else(|| {
338 0 : TranscodeError::BadInput("ValueMeta::will_init missing".to_string())
339 0 : })?,
340 : }))
341 : }
342 : Ok(proto::ValueMetaType::Observed) => Ok(ValueMeta::Observed(ObservedValueMeta {
343 0 : key: value
344 0 : .key
345 0 : .ok_or_else(|| TranscodeError::BadInput("ValueMeta::key missing".to_string()))?
346 0 : .into(),
347 0 : lsn: Lsn(value.lsn),
348 : })),
349 0 : Err(_) => Err(TranscodeError::BadInput(format!(
350 0 : "Unexpected ValueMeta::type {}",
351 0 : value.r#type
352 0 : ))),
353 : }
354 0 : }
355 : }
356 :
357 : impl From<proto::CompactKey> for CompactKey {
358 5 : fn from(value: proto::CompactKey) -> Self {
359 5 : (((value.high as i128) << 64) | (value.low as i128)).into()
360 5 : }
361 : }
362 :
363 : #[test]
364 1 : fn test_compact_key_with_large_relnode() {
365 : use pageserver_api::key::Key;
366 :
367 1 : let inputs = vec![
368 1 : Key {
369 1 : field1: 0,
370 1 : field2: 0x100,
371 1 : field3: 0x200,
372 1 : field4: 0,
373 1 : field5: 0x10,
374 1 : field6: 0x5,
375 1 : },
376 1 : Key {
377 1 : field1: 0,
378 1 : field2: 0x100,
379 1 : field3: 0x200,
380 1 : field4: 0x007FFFFF,
381 1 : field5: 0x10,
382 1 : field6: 0x5,
383 1 : },
384 1 : Key {
385 1 : field1: 0,
386 1 : field2: 0x100,
387 1 : field3: 0x200,
388 1 : field4: 0x00800000,
389 1 : field5: 0x10,
390 1 : field6: 0x5,
391 1 : },
392 1 : Key {
393 1 : field1: 0,
394 1 : field2: 0x100,
395 1 : field3: 0x200,
396 1 : field4: 0x00800001,
397 1 : field5: 0x10,
398 1 : field6: 0x5,
399 1 : },
400 1 : Key {
401 1 : field1: 0,
402 1 : field2: 0xFFFFFFFF,
403 1 : field3: 0xFFFFFFFF,
404 1 : field4: 0xFFFFFFFF,
405 1 : field5: 0x0,
406 1 : field6: 0x0,
407 1 : },
408 1 : ];
409 :
410 6 : for input in inputs {
411 5 : assert!(input.is_valid_key_on_write_path());
412 5 : let compact = input.to_compact();
413 5 : let proto: proto::CompactKey = compact.into();
414 5 : let from_proto: CompactKey = proto.into();
415 5 :
416 5 : assert_eq!(
417 : compact, from_proto,
418 0 : "Round trip failed for key with relnode={:#x}",
419 : input.field4
420 : );
421 : }
422 1 : }
|