Line data Source code
1 : //! This module implements a batch type for serialized [`pageserver_api::value::Value`]
2 : //! instances. Each batch contains a raw buffer (serialized values)
3 : //! and a list of metadata for each (key, LSN) tuple present in the batch.
4 : //!
5 : //! Such batches are created from decoded PG WAL records and ingested
6 : //! by the pageserver, which writes them directly to the ephemeral file.
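//!
//! A rough sketch of the intended flow (the ingest loop and the write helper named below
//! are assumptions for illustration, not part of this module):
//!
//! ```ignore
//! let mut pending = SerializedValueBatch::default();
//! for record in interpreted_records {
//!     // One batch is produced per decoded WAL record; merge them into larger writes.
//!     pending.extend(record.batch);
//! }
//! if pending.has_data() {
//!     // `write_to_ephemeral_file` stands in for the pageserver's actual write path.
//!     write_to_ephemeral_file(&pending.raw, &pending.metadata);
//! }
//! ```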
7 :
8 : use std::collections::{BTreeSet, HashMap};
9 :
10 : use bytes::{Bytes, BytesMut};
11 : use pageserver_api::key::{CompactKey, Key, rel_block_to_key};
12 : use pageserver_api::keyspace::KeySpace;
13 : use pageserver_api::record::NeonWalRecord;
14 : use pageserver_api::reltag::RelTag;
15 : use pageserver_api::shard::ShardIdentity;
16 : use pageserver_api::value::Value;
17 : use postgres_ffi::walrecord::{DecodedBkpBlock, DecodedWALRecord};
18 : use postgres_ffi::{BLCKSZ, page_is_new, page_set_lsn, pg_constants};
19 : use serde::{Deserialize, Serialize};
20 : use utils::bin_ser::BeSer;
21 : use utils::lsn::Lsn;
22 :
23 : use crate::models::InterpretedWalRecord;
24 :
25 : static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; BLCKSZ as usize]);
26 :
27 : /// Accompanying metadata for the batch.
28 : /// A value may be serialized and stored in the batch or just "observed".
29 : /// Shard 0 currently "observes" all values in order to accurately track
30 : /// relation sizes. In the case of "observed" values, we only need to know
31 : /// the key and LSN, so two types of metadata are supported to save on network
32 : /// bandwidth.
33 0 : #[derive(Serialize, Deserialize, Clone)]
34 : pub enum ValueMeta {
35 : Serialized(SerializedValueMeta),
36 : Observed(ObservedValueMeta),
37 : }
38 :
39 : impl ValueMeta {
40 57091945 : pub fn key(&self) -> CompactKey {
41 57091945 : match self {
42 57091945 : Self::Serialized(ser) => ser.key,
43 0 : Self::Observed(obs) => obs.key,
44 : }
45 57091945 : }
46 :
47 56800661 : pub fn lsn(&self) -> Lsn {
48 56800661 : match self {
49 56800661 : Self::Serialized(ser) => ser.lsn,
50 0 : Self::Observed(obs) => obs.lsn,
51 : }
52 56800661 : }
53 : }
54 :
55 : /// Wrapper around [`ValueMeta`] that implements ordering by
56 : /// (key, LSN) tuples
57 : struct OrderedValueMeta(ValueMeta);
58 :
59 : impl Ord for OrderedValueMeta {
60 59 : fn cmp(&self, other: &Self) -> std::cmp::Ordering {
61 59 : (self.0.key(), self.0.lsn()).cmp(&(other.0.key(), other.0.lsn()))
62 59 : }
63 : }
64 :
65 : impl PartialOrd for OrderedValueMeta {
66 9 : fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
67 9 : Some(self.cmp(other))
68 9 : }
69 : }
70 :
71 : impl PartialEq for OrderedValueMeta {
72 8 : fn eq(&self, other: &Self) -> bool {
73 8 : (self.0.key(), self.0.lsn()) == (other.0.key(), other.0.lsn())
74 8 : }
75 : }
76 :
77 : impl Eq for OrderedValueMeta {}
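// Illustrative note (not part of the original source): collecting `OrderedValueMeta`
// wrappers into a `BTreeSet` hands the metadata back in (key, LSN) order, which is what
// `zero_gaps` below relies on when it rebuilds `SerializedValueBatch::metadata`.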
78 :
79 : /// Metadata for a [`Value`] serialized into the batch.
80 0 : #[derive(Serialize, Deserialize, Clone)]
81 : pub struct SerializedValueMeta {
82 : pub key: CompactKey,
83 : pub lsn: Lsn,
84 : /// Starting offset of the value for the (key, LSN) tuple
85 : /// in [`SerializedValueBatch::raw`]
86 : pub batch_offset: u64,
87 : pub len: usize,
88 : pub will_init: bool,
89 : }
90 :
91 : /// Metadata for a [`Value`] observed by the batch
92 0 : #[derive(Serialize, Deserialize, Clone)]
93 : pub struct ObservedValueMeta {
94 : pub key: CompactKey,
95 : pub lsn: Lsn,
96 : }
97 :
98 : /// Batch of serialized [`Value`]s.
99 0 : #[derive(Serialize, Deserialize, Clone)]
100 : pub struct SerializedValueBatch {
101 : /// [`Value`]s serialized in EphemeralFile's native format,
102 : /// ready for disk write by the pageserver
103 : pub raw: Vec<u8>,
104 :
105 : /// Metadata to make sense of the bytes in [`Self::raw`]
106 : /// and represent "observed" values.
107 : ///
108 : /// Invariant: Metadata entries for any given key are ordered
109 : /// by LSN. Note that entries for a key do not have to be contiguous.
110 : pub metadata: Vec<ValueMeta>,
111 :
112 : /// The highest LSN of any value in the batch
113 : pub max_lsn: Lsn,
114 :
115 : /// Number of values encoded by [`Self::raw`]
116 : pub len: usize,
117 : }
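// Illustrative sketch (not part of the original module): each `SerializedValueMeta` entry
// indexes into `raw`, so a value can be read back by slicing the buffer and deserializing
// it, exactly as the tests at the bottom of this file do.
#[allow(dead_code)]
fn read_serialized_value(batch: &SerializedValueBatch, meta: &SerializedValueMeta) -> Value {
    let start = meta.batch_offset as usize;
    let end = start + meta.len;
    // Values are written with `ser_into`, so `Value::des` reverses the encoding.
    Value::des(&batch.raw[start..end]).expect("serialized batch values deserialize cleanly")
}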
118 :
119 : impl Default for SerializedValueBatch {
120 816886 : fn default() -> Self {
121 816886 : Self {
122 816886 : raw: Default::default(),
123 816886 : metadata: Default::default(),
124 816886 : max_lsn: Lsn(0),
125 816886 : len: 0,
126 816886 : }
127 816886 : }
128 : }
129 :
130 : impl SerializedValueBatch {
131 : /// Populates the given `shard_records` with value batches from this WAL record, if any,
132 : /// discarding those belonging to other shards.
133 : ///
134 : /// The batch will only contain values for keys targeting the specified
135 : /// shard. Shard 0 is a special case, where any keys that don't belong to
136 : /// it are "observed" by the batch (i.e. present in [`SerializedValueBatch::metadata`],
137 : /// but absent from the raw buffer [`SerializedValueBatch::raw`]).
138 292310 : pub(crate) fn from_decoded_filtered(
139 292310 : decoded: DecodedWALRecord,
140 292310 : shard_records: &mut HashMap<ShardIdentity, InterpretedWalRecord>,
141 292310 : next_record_lsn: Lsn,
142 292310 : pg_version: u32,
143 292310 : ) -> anyhow::Result<()> {
144 : // First determine how big the buffers need to be and allocate them up-front.
145 : // This duplicates some of the work below, but it's empirically much faster.
146 292510 : for (shard, record) in shard_records.iter_mut() {
147 292510 : assert!(record.batch.is_empty());
148 :
149 292510 : let estimate = Self::estimate_buffer_size(&decoded, shard, pg_version);
150 292510 : record.batch.raw = Vec::with_capacity(estimate);
151 : }
152 :
153 292310 : for blk in decoded.blocks.iter() {
154 291284 : let rel = RelTag {
155 291284 : spcnode: blk.rnode_spcnode,
156 291284 : dbnode: blk.rnode_dbnode,
157 291284 : relnode: blk.rnode_relnode,
158 291284 : forknum: blk.forknum,
159 291284 : };
160 291284 :
161 291284 : let key = rel_block_to_key(rel, blk.blkno);
162 291284 :
163 291284 : if !key.is_valid_key_on_write_path() {
164 0 : anyhow::bail!(
165 0 : "Unsupported key decoded at LSN {}: {}",
166 0 : next_record_lsn,
167 0 : key
168 0 : );
169 291284 : }
170 :
171 291284 : for (shard, record) in shard_records.iter_mut() {
172 291284 : let key_is_local = shard.is_key_local(&key);
173 291284 :
174 291284 : tracing::debug!(
175 : lsn=%next_record_lsn,
176 : key=%key,
177 0 : "ingest: shard decision {}",
178 0 : if !key_is_local { "drop" } else { "keep" },
179 : );
180 :
181 291284 : if !key_is_local {
182 0 : if shard.is_shard_zero() {
183 : // Shard 0 tracks relation sizes. Although we will not store this block, we will observe
184 : // its blkno in case it implicitly extends a relation.
185 0 : record
186 0 : .batch
187 0 : .metadata
188 0 : .push(ValueMeta::Observed(ObservedValueMeta {
189 0 : key: key.to_compact(),
190 0 : lsn: next_record_lsn,
191 0 : }))
192 0 : }
193 :
194 0 : continue;
195 291284 : }
196 :
197 : // Instead of storing the full-page-image WAL record,
198 : // it is better to store the extracted image: we can skip WAL redo
199 : // in this case. Also, some FPI records may contain multiple (up to 32) pages,
200 : // so they would have to be copied multiple times.
201 : //
202 291284 : let val = if Self::block_is_image(&decoded, blk, pg_version) {
203 : // Extract page image from FPI record
204 48 : let img_len = blk.bimg_len as usize;
205 48 : let img_offs = blk.bimg_offset as usize;
206 48 : let mut image = BytesMut::with_capacity(BLCKSZ as usize);
207 48 : // TODO(vlad): skip the copy
208 48 : image.extend_from_slice(&decoded.record[img_offs..img_offs + img_len]);
209 48 :
210 48 : if blk.hole_length != 0 {
211 0 : let tail = image.split_off(blk.hole_offset as usize);
212 0 : image.resize(image.len() + blk.hole_length as usize, 0u8);
213 0 : image.unsplit(tail);
214 48 : }
215 : //
216 : // Match the logic of XLogReadBufferForRedoExtended:
217 : // The page may be uninitialized. If so, we can't set the LSN because
218 : // that would corrupt the page.
219 : //
220 48 : if !page_is_new(&image) {
221 36 : page_set_lsn(&mut image, next_record_lsn)
222 12 : }
223 48 : assert_eq!(image.len(), BLCKSZ as usize);
224 :
225 48 : Value::Image(image.freeze())
226 : } else {
227 : Value::WalRecord(NeonWalRecord::Postgres {
228 291236 : will_init: blk.will_init || blk.apply_image,
229 291236 : rec: decoded.record.clone(),
230 : })
231 : };
232 :
233 291284 : let relative_off = record.batch.raw.len() as u64;
234 291284 :
235 291284 : val.ser_into(&mut record.batch.raw)
236 291284 : .expect("Writing into in-memory buffer is infallible");
237 291284 :
238 291284 : let val_ser_size = record.batch.raw.len() - relative_off as usize;
239 291284 :
240 291284 : record
241 291284 : .batch
242 291284 : .metadata
243 291284 : .push(ValueMeta::Serialized(SerializedValueMeta {
244 291284 : key: key.to_compact(),
245 291284 : lsn: next_record_lsn,
246 291284 : batch_offset: relative_off,
247 291284 : len: val_ser_size,
248 291284 : will_init: val.will_init(),
249 291284 : }));
250 291284 : record.batch.max_lsn = std::cmp::max(record.batch.max_lsn, next_record_lsn);
251 291284 : record.batch.len += 1;
252 : }
253 : }
254 :
255 292310 : if cfg!(any(debug_assertions, test)) {
256 : // Validate that the batches are correct
257 292510 : for record in shard_records.values() {
258 292510 : record.batch.validate_lsn_order();
259 292510 : }
260 0 : }
261 :
262 292310 : Ok(())
263 292310 : }
264 :
265 : /// Look into the decoded PG WAL record and determine
266 : /// roughly how large the buffer for serialized values needs to be.
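///
/// For instance (hypothetical numbers): a 120-byte record touching one non-FPI block owned
/// by this shard contributes 8 + 1 + 8 + 120 = 137 bytes to the estimate, while an FPI
/// block contributes 4 + 8 + BLCKSZ = 8204 bytes regardless of the record length.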
267 292510 : fn estimate_buffer_size(
268 292510 : decoded: &DecodedWALRecord,
269 292510 : shard: &ShardIdentity,
270 292510 : pg_version: u32,
271 292510 : ) -> usize {
272 292510 : let mut estimate: usize = 0;
273 :
274 292510 : for blk in decoded.blocks.iter() {
275 291284 : let rel = RelTag {
276 291284 : spcnode: blk.rnode_spcnode,
277 291284 : dbnode: blk.rnode_dbnode,
278 291284 : relnode: blk.rnode_relnode,
279 291284 : forknum: blk.forknum,
280 291284 : };
281 291284 :
282 291284 : let key = rel_block_to_key(rel, blk.blkno);
283 291284 :
284 291284 : if !shard.is_key_local(&key) {
285 0 : continue;
286 291284 : }
287 291284 :
288 291284 : if Self::block_is_image(decoded, blk, pg_version) {
289 48 : // 4 bytes for the Value::Image discriminator
290 48 : // 8 bytes for encoding the size of the buffer
291 48 : // BLCKSZ for the raw image
292 48 : estimate += (4 + 8 + BLCKSZ) as usize;
293 291236 : } else {
294 291236 : // 4 bytes for the Value::WalRecord discriminator
295 291236 : // 4 bytes for the NeonWalRecord::Postgres discriminator
296 291236 : // 1 byte for NeonWalRecord::Postgres::will_init
297 291236 : // 8 bytes for encoding the size of the buffer
298 291236 : // length of the raw record
299 291236 : estimate += 8 + 1 + 8 + decoded.record.len();
300 291236 : }
301 : }
302 :
303 292510 : estimate
304 292510 : }
305 :
306 582568 : fn block_is_image(decoded: &DecodedWALRecord, blk: &DecodedBkpBlock, pg_version: u32) -> bool {
307 582568 : blk.apply_image
308 240 : && blk.has_image
309 240 : && decoded.xl_rmid == pg_constants::RM_XLOG_ID
310 96 : && (decoded.xl_info == pg_constants::XLOG_FPI
311 0 : || decoded.xl_info == pg_constants::XLOG_FPI_FOR_HINT)
312 : // compression of WAL is not yet supported: fall back to storing the original WAL record
313 96 : && !postgres_ffi::bkpimage_is_compressed(blk.bimg_info, pg_version)
314 : // do not materialize null pages because they will most likely soon be replaced with real data
315 96 : && blk.bimg_len != 0
316 582568 : }
317 :
318 : /// Encode a list of values and metadata into a serialized batch
319 : ///
320 : /// This is used by the pageserver ingest code to conveniently generate
321 : /// batches for metadata writes.
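///
/// A minimal usage sketch, mirroring the tests below (the key, LSN and record payload are
/// illustrative only):
///
/// ```ignore
/// let value = Value::WalRecord(NeonWalRecord::wal_append("foo"));
/// let size = value.serialized_size().unwrap() as usize;
/// let batch = SerializedValueBatch::from_values(vec![(key.to_compact(), lsn, size, value)]);
/// assert_eq!(batch.len(), 1);
/// ```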
322 9321961 : pub fn from_values(batch: Vec<(CompactKey, Lsn, usize, Value)>) -> Self {
323 9321961 : // Pre-allocate a big flat buffer to write into. This should be large but not huge: it is soft-limited in practice by
324 9321961 : // [`crate::pgdatadir_mapping::DatadirModification::MAX_PENDING_BYTES`]
325 9328501 : let buffer_size = batch.iter().map(|i| i.2).sum::<usize>();
326 9321961 : let mut buf = Vec::<u8>::with_capacity(buffer_size);
327 9321961 :
328 9321961 : let mut metadata: Vec<ValueMeta> = Vec::with_capacity(batch.len());
329 9321961 : let mut max_lsn: Lsn = Lsn(0);
330 9321961 : let len = batch.len();
331 18650462 : for (key, lsn, val_ser_size, val) in batch {
332 9328501 : let relative_off = buf.len() as u64;
333 9328501 :
334 9328501 : val.ser_into(&mut buf)
335 9328501 : .expect("Writing into in-memory buffer is infallible");
336 9328501 :
337 9328501 : metadata.push(ValueMeta::Serialized(SerializedValueMeta {
338 9328501 : key,
339 9328501 : lsn,
340 9328501 : batch_offset: relative_off,
341 9328501 : len: val_ser_size,
342 9328501 : will_init: val.will_init(),
343 9328501 : }));
344 9328501 : max_lsn = std::cmp::max(max_lsn, lsn);
345 9328501 : }
346 :
347 : // Assert that we didn't do any extra allocations while building the buffer.
348 9321961 : debug_assert!(buf.len() <= buffer_size);
349 :
350 9321961 : if cfg!(any(debug_assertions, test)) {
351 9321961 : let batch = Self {
352 9321961 : raw: buf,
353 9321961 : metadata,
354 9321961 : max_lsn,
355 9321961 : len,
356 9321961 : };
357 9321961 :
358 9321961 : batch.validate_lsn_order();
359 9321961 :
360 9321961 : return batch;
361 0 : }
362 0 :
363 0 : Self {
364 0 : raw: buf,
365 0 : metadata,
366 0 : max_lsn,
367 0 : len,
368 0 : }
369 9321961 : }
370 :
371 : /// Add one value to the batch
372 : ///
373 : /// This is used by the pageserver ingest code to include metadata block
374 : /// updates for a single key.
375 561734 : pub fn put(&mut self, key: CompactKey, value: Value, lsn: Lsn) {
376 561734 : let relative_off = self.raw.len() as u64;
377 561734 : value.ser_into(&mut self.raw).unwrap();
378 561734 :
379 561734 : let val_ser_size = self.raw.len() - relative_off as usize;
380 561734 : self.metadata
381 561734 : .push(ValueMeta::Serialized(SerializedValueMeta {
382 561734 : key,
383 561734 : lsn,
384 561734 : batch_offset: relative_off,
385 561734 : len: val_ser_size,
386 561734 : will_init: value.will_init(),
387 561734 : }));
388 561734 :
389 561734 : self.max_lsn = std::cmp::max(self.max_lsn, lsn);
390 561734 : self.len += 1;
391 561734 :
392 561734 : if cfg!(any(debug_assertions, test)) {
393 561734 : self.validate_lsn_order();
394 561734 : }
395 561734 : }
396 :
397 : /// Extend with the contents of another batch
398 : ///
399 : /// One batch is generated for each decoded PG WAL record.
400 : /// They are then merged to accumulate reasonably sized writes.
401 529153 : pub fn extend(&mut self, mut other: SerializedValueBatch) {
402 529153 : let extend_batch_start_offset = self.raw.len() as u64;
403 529153 :
404 529153 : self.raw.extend(other.raw);
405 529153 :
406 529153 : // Shift the offsets in the batch we are extending with
407 533015 : other.metadata.iter_mut().for_each(|meta| match meta {
408 533015 : ValueMeta::Serialized(ser) => {
409 533015 : ser.batch_offset += extend_batch_start_offset;
410 533015 : if cfg!(debug_assertions) {
411 533015 : let value_end = ser.batch_offset + ser.len as u64;
412 533015 : assert!((value_end as usize) <= self.raw.len());
413 0 : }
414 : }
415 0 : ValueMeta::Observed(_) => {}
416 533015 : });
417 529153 : self.metadata.extend(other.metadata);
418 529153 :
419 529153 : self.max_lsn = std::cmp::max(self.max_lsn, other.max_lsn);
420 529153 :
421 529153 : self.len += other.len;
422 529153 :
423 529153 : if cfg!(any(debug_assertions, test)) {
424 529153 : self.validate_lsn_order();
425 529153 : }
426 529153 : }
427 :
428 : /// Add zero images for the (key, LSN) tuples specified
429 : ///
430 : /// PG versions below 16 do not zero out pages before extending
431 : /// a relation and may leave gaps. Such gaps need to be identified
432 : /// by the pageserver ingest logic and get patched up here.
433 : ///
434 : /// Note that this function does not validate that the gaps have been
435 : /// identified correctly (it does not know relation sizes), so it's up
436 : /// to the call-site to do it properly.
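///
/// A minimal sketch (the gap keyspace below is illustrative; see `test_gap_zeroing` at the
/// bottom of this file for a complete example):
///
/// ```ignore
/// let gaps = vec![(KeySpace { ranges: vec![gap_start..gap_end] }, lsn)];
/// batch.zero_gaps(gaps);
/// ```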
437 1 : pub fn zero_gaps(&mut self, gaps: Vec<(KeySpace, Lsn)>) {
438 1 : // Implementation note:
439 1 : //
440 1 : // Values within [`SerializedValueBatch::raw`] do not have any ordering requirements,
441 1 : // but the metadata entries should be ordered properly (see
442 1 : // [`SerializedValueBatch::metadata`]).
443 1 : //
444 1 : // Exploiting this observation we do:
445 1 : // 1. Drain all the metadata entries into an ordered set.
446 1 : // The use of a BTreeSet keyed by (Key, Lsn) relies on the observation that Postgres never
447 1 : // includes more than one update to the same block in the same WAL record.
448 1 : // 2. For each (key, LSN) gap tuple, append a zero image to the raw buffer
449 1 : // and add an index entry to the ordered metadata set.
450 1 : // 3. Drain the ordered set back into a metadata vector
451 1 :
452 1 : let mut ordered_metas = self
453 1 : .metadata
454 1 : .drain(..)
455 1 : .map(OrderedValueMeta)
456 1 : .collect::<BTreeSet<_>>();
457 3 : for (keyspace, lsn) in gaps {
458 2 : self.max_lsn = std::cmp::max(self.max_lsn, lsn);
459 :
460 5 : for gap_range in keyspace.ranges {
461 3 : let mut key = gap_range.start;
462 13 : while key != gap_range.end {
463 10 : let relative_off = self.raw.len() as u64;
464 10 :
465 10 : // TODO(vlad): Can we be cheeky and write only one zero image, and
466 10 : // make all index entries requiring a zero page point to it?
467 10 : // Alternatively, we can change the index entry format to represent zero pages
468 10 : // without writing them at all.
469 10 : Value::Image(ZERO_PAGE.clone())
470 10 : .ser_into(&mut self.raw)
471 10 : .unwrap();
472 10 : let val_ser_size = self.raw.len() - relative_off as usize;
473 10 :
474 10 : ordered_metas.insert(OrderedValueMeta(ValueMeta::Serialized(
475 10 : SerializedValueMeta {
476 10 : key: key.to_compact(),
477 10 : lsn,
478 10 : batch_offset: relative_off,
479 10 : len: val_ser_size,
480 10 : will_init: true,
481 10 : },
482 10 : )));
483 10 :
484 10 : self.len += 1;
485 10 :
486 10 : key = key.next();
487 10 : }
488 : }
489 : }
490 :
491 19 : self.metadata = ordered_metas.into_iter().map(|ord| ord.0).collect();
492 1 :
493 1 : if cfg!(any(debug_assertions, test)) {
494 1 : self.validate_lsn_order();
495 1 : }
496 1 : }
497 :
498 : /// Checks if the batch contains any serialized or observed values
499 292711 : pub fn is_empty(&self) -> bool {
500 292711 : !self.has_data() && self.metadata.is_empty()
501 292711 : }
502 :
503 : /// Checks if the batch contains only observed values
504 0 : pub fn is_observed(&self) -> bool {
505 0 : !self.has_data() && !self.metadata.is_empty()
506 0 : }
507 :
508 : /// Checks if the batch contains data
509 : ///
510 : /// Note that if this returns false, it may still contain observed values or
511 : /// a metadata record.
512 10192847 : pub fn has_data(&self) -> bool {
513 10192847 : let empty = self.raw.is_empty();
514 10192847 :
515 10192847 : if cfg!(debug_assertions) && empty {
516 293114 : assert!(
517 293114 : self.metadata
518 293114 : .iter()
519 293114 : .all(|meta| matches!(meta, ValueMeta::Observed(_)))
520 293114 : );
521 9899733 : }
522 :
523 10192847 : !empty
524 10192847 : }
525 :
526 : /// Returns the number of values serialized in the batch
527 291306 : pub fn len(&self) -> usize {
528 291306 : self.len
529 291306 : }
530 :
531 : /// Returns the size of the buffer wrapped by the batch
532 9608478 : pub fn buffer_size(&self) -> usize {
533 9608478 : self.raw.len()
534 9608478 : }
535 :
536 0 : pub fn updates_key(&self, key: &Key) -> bool {
537 0 : self.metadata.iter().any(|meta| match meta {
538 0 : ValueMeta::Serialized(ser) => key.to_compact() == ser.key,
539 0 : ValueMeta::Observed(_) => false,
540 0 : })
541 0 : }
542 :
543 10705365 : pub fn validate_lsn_order(&self) {
544 : use std::collections::HashMap;
545 :
546 10705365 : let mut last_seen_lsn_per_key: HashMap<CompactKey, Lsn> = HashMap::default();
547 :
548 56800287 : for meta in self.metadata.iter() {
549 56800287 : let lsn = meta.lsn();
550 56800287 : let key = meta.key();
551 :
552 56800287 : if let Some(prev_lsn) = last_seen_lsn_per_key.insert(key, lsn) {
553 15 : assert!(
554 15 : lsn >= prev_lsn,
555 0 : "Ordering violated by {}: {} < {}",
556 0 : Key::from_compact(key),
557 : lsn,
558 : prev_lsn
559 : );
560 56800272 : }
561 : }
562 10705365 : }
563 : }
564 :
565 : #[cfg(all(test, feature = "testing"))]
566 : mod tests {
567 : use super::*;
568 :
569 6 : fn validate_batch(
570 6 : batch: &SerializedValueBatch,
571 6 : values: &[(CompactKey, Lsn, usize, Value)],
572 6 : gaps: Option<&Vec<(KeySpace, Lsn)>>,
573 6 : ) {
574 : // Invariant 1: The metadata for a given entry in the batch
575 : // is correct and can be used to deserialize back to the original value.
576 28 : for (key, lsn, size, value) in values.iter() {
577 28 : let meta = batch
578 28 : .metadata
579 28 : .iter()
580 136 : .find(|meta| (meta.key(), meta.lsn()) == (*key, *lsn))
581 28 : .unwrap();
582 28 : let meta = match meta {
583 28 : ValueMeta::Serialized(ser) => ser,
584 0 : ValueMeta::Observed(_) => unreachable!(),
585 : };
586 :
587 28 : assert_eq!(meta.len, *size);
588 28 : assert_eq!(meta.will_init, value.will_init());
589 :
590 28 : let start = meta.batch_offset as usize;
591 28 : let end = meta.batch_offset as usize + meta.len;
592 28 : let value_from_batch = Value::des(&batch.raw[start..end]).unwrap();
593 28 : assert_eq!(&value_from_batch, value);
594 : }
595 :
596 28 : let mut expected_buffer_size: usize = values.iter().map(|(_, _, size, _)| size).sum();
597 6 : let mut gap_pages_count: usize = 0;
598 :
599 : // Invariant 2: Zero pages were added for identified gaps and their metadata
600 : // is correct.
601 6 : if let Some(gaps) = gaps {
602 3 : for (gap_keyspace, lsn) in gaps {
603 5 : for gap_range in &gap_keyspace.ranges {
604 3 : let mut gap_key = gap_range.start;
605 13 : while gap_key != gap_range.end {
606 10 : let meta = batch
607 10 : .metadata
608 10 : .iter()
609 104 : .find(|meta| (meta.key(), meta.lsn()) == (gap_key.to_compact(), *lsn))
610 10 : .unwrap();
611 10 : let meta = match meta {
612 10 : ValueMeta::Serialized(ser) => ser,
613 0 : ValueMeta::Observed(_) => unreachable!(),
614 : };
615 :
616 10 : let zero_value = Value::Image(ZERO_PAGE.clone());
617 10 : let zero_value_size = zero_value.serialized_size().unwrap() as usize;
618 10 :
619 10 : assert_eq!(meta.len, zero_value_size);
620 10 : assert_eq!(meta.will_init, zero_value.will_init());
621 :
622 10 : let start = meta.batch_offset as usize;
623 10 : let end = meta.batch_offset as usize + meta.len;
624 10 : let value_from_batch = Value::des(&batch.raw[start..end]).unwrap();
625 10 : assert_eq!(value_from_batch, zero_value);
626 :
627 10 : gap_pages_count += 1;
628 10 : expected_buffer_size += zero_value_size;
629 10 : gap_key = gap_key.next();
630 : }
631 : }
632 : }
633 5 : }
634 :
635 : // Invariant 3: The length of the batch is equal to the number
636 : // of values inserted, plus the number of gap pages. This extends
637 : // to the raw buffer size.
638 6 : assert_eq!(batch.len(), values.len() + gap_pages_count);
639 6 : assert_eq!(expected_buffer_size, batch.buffer_size());
640 :
641 : // Invariant 4: Metadata entries for any given key are sorted in LSN order.
642 6 : batch.validate_lsn_order();
643 6 : }
644 :
645 : #[test]
646 1 : fn test_creation_from_values() {
647 : const LSN: Lsn = Lsn(0x10);
648 1 : let key = Key::from_hex("110000000033333333444444445500000001").unwrap();
649 1 :
650 1 : let values = vec![
651 1 : (
652 1 : key.to_compact(),
653 1 : LSN,
654 1 : Value::WalRecord(NeonWalRecord::wal_append("foo")),
655 1 : ),
656 1 : (
657 1 : key.next().to_compact(),
658 1 : LSN,
659 1 : Value::WalRecord(NeonWalRecord::wal_append("bar")),
660 1 : ),
661 1 : (
662 1 : key.to_compact(),
663 1 : Lsn(LSN.0 + 0x10),
664 1 : Value::WalRecord(NeonWalRecord::wal_append("baz")),
665 1 : ),
666 1 : (
667 1 : key.next().next().to_compact(),
668 1 : LSN,
669 1 : Value::WalRecord(NeonWalRecord::wal_append("taz")),
670 1 : ),
671 1 : ];
672 1 :
673 1 : let values = values
674 1 : .into_iter()
675 4 : .map(|(key, lsn, value)| (key, lsn, value.serialized_size().unwrap() as usize, value))
676 1 : .collect::<Vec<_>>();
677 1 : let batch = SerializedValueBatch::from_values(values.clone());
678 1 :
679 1 : validate_batch(&batch, &values, None);
680 1 :
681 1 : assert!(!batch.is_empty());
682 1 : }
683 :
684 : #[test]
685 1 : fn test_put() {
686 : const LSN: Lsn = Lsn(0x10);
687 1 : let key = Key::from_hex("110000000033333333444444445500000001").unwrap();
688 1 :
689 1 : let values = vec![
690 1 : (
691 1 : key.to_compact(),
692 1 : LSN,
693 1 : Value::WalRecord(NeonWalRecord::wal_append("foo")),
694 1 : ),
695 1 : (
696 1 : key.next().to_compact(),
697 1 : LSN,
698 1 : Value::WalRecord(NeonWalRecord::wal_append("bar")),
699 1 : ),
700 1 : ];
701 1 :
702 1 : let mut values = values
703 1 : .into_iter()
704 2 : .map(|(key, lsn, value)| (key, lsn, value.serialized_size().unwrap() as usize, value))
705 1 : .collect::<Vec<_>>();
706 1 : let mut batch = SerializedValueBatch::from_values(values.clone());
707 1 :
708 1 : validate_batch(&batch, &values, None);
709 1 :
710 1 : let value = (
711 1 : key.to_compact(),
712 1 : Lsn(LSN.0 + 0x10),
713 1 : Value::WalRecord(NeonWalRecord::wal_append("baz")),
714 1 : );
715 1 : let serialized_size = value.2.serialized_size().unwrap() as usize;
716 1 : let value = (value.0, value.1, serialized_size, value.2);
717 1 : values.push(value.clone());
718 1 : batch.put(value.0, value.3, value.1);
719 1 :
720 1 : validate_batch(&batch, &values, None);
721 1 :
722 1 : let value = (
723 1 : key.next().next().to_compact(),
724 1 : LSN,
725 1 : Value::WalRecord(NeonWalRecord::wal_append("taz")),
726 1 : );
727 1 : let serialized_size = value.2.serialized_size().unwrap() as usize;
728 1 : let value = (value.0, value.1, serialized_size, value.2);
729 1 : values.push(value.clone());
730 1 : batch.put(value.0, value.3, value.1);
731 1 :
732 1 : validate_batch(&batch, &values, None);
733 1 : }
734 :
735 : #[test]
736 1 : fn test_extension() {
737 : const LSN: Lsn = Lsn(0x10);
738 1 : let key = Key::from_hex("110000000033333333444444445500000001").unwrap();
739 1 :
740 1 : let values = vec![
741 1 : (
742 1 : key.to_compact(),
743 1 : LSN,
744 1 : Value::WalRecord(NeonWalRecord::wal_append("foo")),
745 1 : ),
746 1 : (
747 1 : key.next().to_compact(),
748 1 : LSN,
749 1 : Value::WalRecord(NeonWalRecord::wal_append("bar")),
750 1 : ),
751 1 : (
752 1 : key.next().next().to_compact(),
753 1 : LSN,
754 1 : Value::WalRecord(NeonWalRecord::wal_append("taz")),
755 1 : ),
756 1 : ];
757 1 :
758 1 : let mut values = values
759 1 : .into_iter()
760 3 : .map(|(key, lsn, value)| (key, lsn, value.serialized_size().unwrap() as usize, value))
761 1 : .collect::<Vec<_>>();
762 1 : let mut batch = SerializedValueBatch::from_values(values.clone());
763 1 :
764 1 : let other_values = vec![
765 1 : (
766 1 : key.to_compact(),
767 1 : Lsn(LSN.0 + 0x10),
768 1 : Value::WalRecord(NeonWalRecord::wal_append("foo")),
769 1 : ),
770 1 : (
771 1 : key.next().to_compact(),
772 1 : Lsn(LSN.0 + 0x10),
773 1 : Value::WalRecord(NeonWalRecord::wal_append("bar")),
774 1 : ),
775 1 : (
776 1 : key.next().next().to_compact(),
777 1 : Lsn(LSN.0 + 0x10),
778 1 : Value::WalRecord(NeonWalRecord::wal_append("taz")),
779 1 : ),
780 1 : ];
781 1 :
782 1 : let other_values = other_values
783 1 : .into_iter()
784 3 : .map(|(key, lsn, value)| (key, lsn, value.serialized_size().unwrap() as usize, value))
785 1 : .collect::<Vec<_>>();
786 1 : let other_batch = SerializedValueBatch::from_values(other_values.clone());
787 1 :
788 1 : values.extend(other_values);
789 1 : batch.extend(other_batch);
790 1 :
791 1 : validate_batch(&batch, &values, None);
792 1 : }
793 :
794 : #[test]
795 1 : fn test_gap_zeroing() {
796 : const LSN: Lsn = Lsn(0x10);
797 1 : let rel_foo_base_key = Key::from_hex("110000000033333333444444445500000001").unwrap();
798 1 :
799 1 : let rel_bar_base_key = {
800 1 : let mut key = rel_foo_base_key;
801 1 : key.field4 += 1;
802 1 : key
803 1 : };
804 1 :
805 1 : let values = vec![
806 1 : (
807 1 : rel_foo_base_key.to_compact(),
808 1 : LSN,
809 1 : Value::WalRecord(NeonWalRecord::wal_append("foo1")),
810 1 : ),
811 1 : (
812 1 : rel_foo_base_key.add(1).to_compact(),
813 1 : LSN,
814 1 : Value::WalRecord(NeonWalRecord::wal_append("foo2")),
815 1 : ),
816 1 : (
817 1 : rel_foo_base_key.add(5).to_compact(),
818 1 : LSN,
819 1 : Value::WalRecord(NeonWalRecord::wal_append("foo3")),
820 1 : ),
821 1 : (
822 1 : rel_foo_base_key.add(1).to_compact(),
823 1 : Lsn(LSN.0 + 0x10),
824 1 : Value::WalRecord(NeonWalRecord::wal_append("foo4")),
825 1 : ),
826 1 : (
827 1 : rel_foo_base_key.add(10).to_compact(),
828 1 : Lsn(LSN.0 + 0x10),
829 1 : Value::WalRecord(NeonWalRecord::wal_append("foo5")),
830 1 : ),
831 1 : (
832 1 : rel_foo_base_key.add(11).to_compact(),
833 1 : Lsn(LSN.0 + 0x10),
834 1 : Value::WalRecord(NeonWalRecord::wal_append("foo6")),
835 1 : ),
836 1 : (
837 1 : rel_foo_base_key.add(12).to_compact(),
838 1 : Lsn(LSN.0 + 0x10),
839 1 : Value::WalRecord(NeonWalRecord::wal_append("foo7")),
840 1 : ),
841 1 : (
842 1 : rel_bar_base_key.to_compact(),
843 1 : LSN,
844 1 : Value::WalRecord(NeonWalRecord::wal_append("bar1")),
845 1 : ),
846 1 : (
847 1 : rel_bar_base_key.add(4).to_compact(),
848 1 : LSN,
849 1 : Value::WalRecord(NeonWalRecord::wal_append("bar2")),
850 1 : ),
851 1 : ];
852 1 :
853 1 : let values = values
854 1 : .into_iter()
855 9 : .map(|(key, lsn, value)| (key, lsn, value.serialized_size().unwrap() as usize, value))
856 1 : .collect::<Vec<_>>();
857 1 :
858 1 : let mut batch = SerializedValueBatch::from_values(values.clone());
859 1 :
860 1 : let gaps = vec![
861 1 : (
862 1 : KeySpace {
863 1 : ranges: vec![
864 1 : rel_foo_base_key.add(2)..rel_foo_base_key.add(5),
865 1 : rel_bar_base_key.add(1)..rel_bar_base_key.add(4),
866 1 : ],
867 1 : },
868 1 : LSN,
869 1 : ),
870 1 : (
871 1 : KeySpace {
872 1 : ranges: vec![rel_foo_base_key.add(6)..rel_foo_base_key.add(10)],
873 1 : },
874 1 : Lsn(LSN.0 + 0x10),
875 1 : ),
876 1 : ];
877 1 :
878 1 : batch.zero_gaps(gaps.clone());
879 1 : validate_batch(&batch, &values, Some(&gaps));
880 1 : }
881 : }