Line data Source code
1 : //! This module implements a batch type for serialized [`pageserver_api::value::Value`]
2 : //! instances. Each batch contains a raw buffer (serialized values)
3 : //! and a list of metadata for each (key, LSN) tuple present in the batch.
4 : //!
5 : //! Such batches are created from decoded PG WAL records and ingested
6 : //! by the pageserver by writing directly to the ephemeral file.
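//!
//! The sketch below is illustrative only (the `batch` variable and its contents
//! are hypothetical): it shows how the metadata entries index into the raw
//! buffer, mirroring what the unit tests at the bottom of this file do when
//! validating a batch.
//!
//! ```ignore
//! use pageserver_api::value::Value;
//! use utils::bin_ser::BeSer;
//!
//! for meta in batch.metadata.iter() {
//!     if let ValueMeta::Serialized(ser) = meta {
//!         let start = ser.batch_offset as usize;
//!         let end = start + ser.len;
//!         // Each slice of the raw buffer deserializes back into the original Value.
//!         let value = Value::des(&batch.raw[start..end]).unwrap();
//!     }
//! }
//! ```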
7 :
8 : use std::collections::BTreeSet;
9 :
10 : use bytes::{Bytes, BytesMut};
11 : use pageserver_api::key::rel_block_to_key;
12 : use pageserver_api::keyspace::KeySpace;
13 : use pageserver_api::record::NeonWalRecord;
14 : use pageserver_api::reltag::RelTag;
15 : use pageserver_api::shard::ShardIdentity;
16 : use pageserver_api::{key::CompactKey, value::Value};
17 : use postgres_ffi::walrecord::{DecodedBkpBlock, DecodedWALRecord};
18 : use postgres_ffi::{page_is_new, page_set_lsn, pg_constants, BLCKSZ};
19 : use serde::{Deserialize, Serialize};
20 : use utils::bin_ser::BeSer;
21 : use utils::lsn::Lsn;
22 :
23 : use pageserver_api::key::Key;
24 :
25 : static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; BLCKSZ as usize]);
26 :
27 : /// Accompanying metadata for the batch.
28 : /// A value may be serialized and stored in the batch or just "observed".
29 : /// Shard 0 currently "observes" all values in order to accurately track
30 : /// relation sizes. In the case of "observed" values, we only need to know
31 : /// the key and LSN, so two types of metadata are supported to save on network
32 : /// bandwidth.
33 0 : #[derive(Serialize, Deserialize)]
34 : pub enum ValueMeta {
35 : Serialized(SerializedValueMeta),
36 : Observed(ObservedValueMeta),
37 : }
38 :
39 : impl ValueMeta {
40 28545945 : pub fn key(&self) -> CompactKey {
41 28545945 : match self {
42 28545945 : Self::Serialized(ser) => ser.key,
43 0 : Self::Observed(obs) => obs.key,
44 : }
45 28545945 : }
46 :
47 28400303 : pub fn lsn(&self) -> Lsn {
48 28400303 : match self {
49 28400303 : Self::Serialized(ser) => ser.lsn,
50 0 : Self::Observed(obs) => obs.lsn,
51 : }
52 28400303 : }
53 : }
54 :
55 : /// Wrapper around [`ValueMeta`] that implements ordering by
56 : /// (key, LSN) tuples
57 : struct OrderedValueMeta(ValueMeta);
58 :
59 : impl Ord for OrderedValueMeta {
60 59 : fn cmp(&self, other: &Self) -> std::cmp::Ordering {
61 59 : (self.0.key(), self.0.lsn()).cmp(&(other.0.key(), other.0.lsn()))
62 59 : }
63 : }
64 :
65 : impl PartialOrd for OrderedValueMeta {
66 9 : fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
67 9 : Some(self.cmp(other))
68 9 : }
69 : }
70 :
71 : impl PartialEq for OrderedValueMeta {
72 8 : fn eq(&self, other: &Self) -> bool {
73 8 : (self.0.key(), self.0.lsn()) == (other.0.key(), other.0.lsn())
74 8 : }
75 : }
76 :
77 : impl Eq for OrderedValueMeta {}
78 :
79 : /// Metadata for a [`Value`] serialized into the batch.
80 0 : #[derive(Serialize, Deserialize)]
81 : pub struct SerializedValueMeta {
82 : pub key: CompactKey,
83 : pub lsn: Lsn,
84 : /// Starting offset of the value for the (key, LSN) tuple
85 : /// in [`SerializedValueBatch::raw`]
86 : pub batch_offset: u64,
87 : pub len: usize,
88 : pub will_init: bool,
89 : }
90 :
91 : /// Metadata for a [`Value`] observed by the batch
92 0 : #[derive(Serialize, Deserialize)]
93 : pub struct ObservedValueMeta {
94 : pub key: CompactKey,
95 : pub lsn: Lsn,
96 : }
97 :
98 : /// Batch of serialized [`Value`]s.
99 0 : #[derive(Serialize, Deserialize)]
100 : pub struct SerializedValueBatch {
101 : /// [`Value`]s serialized in EphemeralFile's native format,
102 : /// ready for disk write by the pageserver
103 : pub raw: Vec<u8>,
104 :
105 : /// Metadata to make sense of the bytes in [`Self::raw`]
106 : /// and represent "observed" values.
107 : ///
108 : /// Invariant: Metadata entries for any given key are ordered
109 : /// by LSN. Note that entries for a key do not have to be contiguous.
110 : pub metadata: Vec<ValueMeta>,
111 :
112 : /// The highest LSN of any value in the batch
113 : pub max_lsn: Lsn,
114 :
115 : /// Number of values encoded by [`Self::raw`]
116 : pub len: usize,
117 : }
118 :
119 : impl Default for SerializedValueBatch {
120 262188 : fn default() -> Self {
121 262188 : Self {
122 262188 : raw: Default::default(),
123 262188 : metadata: Default::default(),
124 262188 : max_lsn: Lsn(0),
125 262188 : len: 0,
126 262188 : }
127 262188 : }
128 : }
129 :
130 : impl SerializedValueBatch {
131 : /// Build a batch of serialized values from a decoded PG WAL record
132 : ///
133 : /// The batch will only contain values for keys targeting the specified
134 : /// shard. Shard 0 is a special case, where any keys that don't belong to
135 : /// it are "observed" by the batch (i.e. present in [`SerializedValueBatch::metadata`],
136 : /// but absent from the raw buffer [`SerializedValueBatch::raw`]).
137 145852 : pub(crate) fn from_decoded_filtered(
138 145852 : decoded: DecodedWALRecord,
139 145852 : shard: &ShardIdentity,
140 145852 : next_record_lsn: Lsn,
141 145852 : pg_version: u32,
142 145852 : ) -> anyhow::Result<SerializedValueBatch> {
143 145852 : // First determine how big the buffer needs to be and allocate it up-front.
144 145852 : // This duplicates some of the work below, but it's empirically much faster.
145 145852 : let estimated_buffer_size = Self::estimate_buffer_size(&decoded, shard, pg_version);
146 145852 : let mut buf = Vec::<u8>::with_capacity(estimated_buffer_size);
147 145852 :
148 145852 : let mut metadata: Vec<ValueMeta> = Vec::with_capacity(decoded.blocks.len());
149 145852 : let mut max_lsn: Lsn = Lsn(0);
150 145852 : let mut len: usize = 0;
151 145852 : for blk in decoded.blocks.iter() {
152 145642 : let relative_off = buf.len() as u64;
153 145642 :
154 145642 : let rel = RelTag {
155 145642 : spcnode: blk.rnode_spcnode,
156 145642 : dbnode: blk.rnode_dbnode,
157 145642 : relnode: blk.rnode_relnode,
158 145642 : forknum: blk.forknum,
159 145642 : };
160 145642 :
161 145642 : let key = rel_block_to_key(rel, blk.blkno);
162 145642 :
163 145642 : if !key.is_valid_key_on_write_path() {
164 0 : anyhow::bail!(
165 0 : "Unsupported key decoded at LSN {}: {}",
166 0 : next_record_lsn,
167 0 : key
168 0 : );
169 145642 : }
170 145642 :
171 145642 : let key_is_local = shard.is_key_local(&key);
172 145642 :
173 145642 : tracing::debug!(
174 : lsn=%next_record_lsn,
175 : key=%key,
176 0 : "ingest: shard decision {}",
177 0 : if !key_is_local { "drop" } else { "keep" },
178 : );
179 :
180 145642 : if !key_is_local {
181 0 : if shard.is_shard_zero() {
182 : // Shard 0 tracks relation sizes. Although we will not store this block, we will observe
183 : // its blkno in case it implicitly extends a relation.
184 0 : metadata.push(ValueMeta::Observed(ObservedValueMeta {
185 0 : key: key.to_compact(),
186 0 : lsn: next_record_lsn,
187 0 : }))
188 0 : }
189 :
190 0 : continue;
191 145642 : }
192 :
193 : // Instead of storing the full-page-image WAL record,
194 : // it is better to store the extracted image: we can skip wal-redo
195 : // in this case. Also, some FPI records may contain multiple (up to 32) pages,
196 : // so they would have to be copied multiple times.
197 : //
198 145642 : let val = if Self::block_is_image(&decoded, blk, pg_version) {
199 : // Extract page image from FPI record
200 24 : let img_len = blk.bimg_len as usize;
201 24 : let img_offs = blk.bimg_offset as usize;
202 24 : let mut image = BytesMut::with_capacity(BLCKSZ as usize);
203 24 : // TODO(vlad): skip the copy
204 24 : image.extend_from_slice(&decoded.record[img_offs..img_offs + img_len]);
205 24 :
206 24 : if blk.hole_length != 0 {
207 0 : let tail = image.split_off(blk.hole_offset as usize);
208 0 : image.resize(image.len() + blk.hole_length as usize, 0u8);
209 0 : image.unsplit(tail);
210 24 : }
211 : //
212 : // Match the logic of XLogReadBufferForRedoExtended:
213 : // The page may be uninitialized. If so, we can't set the LSN because
214 : // that would corrupt the page.
215 : //
216 24 : if !page_is_new(&image) {
217 18 : page_set_lsn(&mut image, next_record_lsn)
218 6 : }
219 24 : assert_eq!(image.len(), BLCKSZ as usize);
220 :
221 24 : Value::Image(image.freeze())
222 : } else {
223 : Value::WalRecord(NeonWalRecord::Postgres {
224 145618 : will_init: blk.will_init || blk.apply_image,
225 145618 : rec: decoded.record.clone(),
226 : })
227 : };
228 :
229 145642 : val.ser_into(&mut buf)
230 145642 : .expect("Writing into in-memory buffer is infallible");
231 145642 :
232 145642 : let val_ser_size = buf.len() - relative_off as usize;
233 145642 :
234 145642 : metadata.push(ValueMeta::Serialized(SerializedValueMeta {
235 145642 : key: key.to_compact(),
236 145642 : lsn: next_record_lsn,
237 145642 : batch_offset: relative_off,
238 145642 : len: val_ser_size,
239 145642 : will_init: val.will_init(),
240 145642 : }));
241 145642 : max_lsn = std::cmp::max(max_lsn, next_record_lsn);
242 145642 : len += 1;
243 : }
244 :
245 145852 : if cfg!(any(debug_assertions, test)) {
246 145852 : let batch = Self {
247 145852 : raw: buf,
248 145852 : metadata,
249 145852 : max_lsn,
250 145852 : len,
251 145852 : };
252 145852 :
253 145852 : batch.validate_lsn_order();
254 145852 :
255 145852 : return Ok(batch);
256 0 : }
257 0 :
258 0 : Ok(Self {
259 0 : raw: buf,
260 0 : metadata,
261 0 : max_lsn,
262 0 : len,
263 0 : })
264 145852 : }
265 :
266 : /// Look into the decoded PG WAL record and determine
267 : /// roughly how large the buffer for serialized values needs to be.
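///
/// As a rough worked illustration (the record size is hypothetical): a non-FPI
/// record of 100 bytes touching one locally-owned block contributes
/// 8 + 1 + 8 + 100 = 117 bytes to the estimate, while a block stored as a
/// full-page image contributes 4 + 8 + BLCKSZ = 8204 bytes.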
268 145852 : fn estimate_buffer_size(
269 145852 : decoded: &DecodedWALRecord,
270 145852 : shard: &ShardIdentity,
271 145852 : pg_version: u32,
272 145852 : ) -> usize {
273 145852 : let mut estimate: usize = 0;
274 :
275 145852 : for blk in decoded.blocks.iter() {
276 145642 : let rel = RelTag {
277 145642 : spcnode: blk.rnode_spcnode,
278 145642 : dbnode: blk.rnode_dbnode,
279 145642 : relnode: blk.rnode_relnode,
280 145642 : forknum: blk.forknum,
281 145642 : };
282 145642 :
283 145642 : let key = rel_block_to_key(rel, blk.blkno);
284 145642 :
285 145642 : if !shard.is_key_local(&key) {
286 0 : continue;
287 145642 : }
288 145642 :
289 145642 : if Self::block_is_image(decoded, blk, pg_version) {
290 24 : // 4 bytes for the Value::Image discriminator
291 24 : // 8 bytes for encoding the size of the buffer
292 24 : // BLCKSZ for the raw image
293 24 : estimate += (4 + 8 + BLCKSZ) as usize;
294 145618 : } else {
295 145618 : // 4 bytes for the Value::WalRecord discriminator
296 145618 : // 4 bytes for the NeonWalRecord::Postgres discriminator
297 145618 : // 1 byte for NeonWalRecord::Postgres::will_init
298 145618 : // 8 bytes for encoding the size of the buffer
299 145618 : // length of the raw record
300 145618 : estimate += 8 + 1 + 8 + decoded.record.len();
301 145618 : }
302 : }
303 :
304 145852 : estimate
305 145852 : }
306 :
307 291284 : fn block_is_image(decoded: &DecodedWALRecord, blk: &DecodedBkpBlock, pg_version: u32) -> bool {
308 291284 : blk.apply_image
309 120 : && blk.has_image
310 120 : && decoded.xl_rmid == pg_constants::RM_XLOG_ID
311 48 : && (decoded.xl_info == pg_constants::XLOG_FPI
312 0 : || decoded.xl_info == pg_constants::XLOG_FPI_FOR_HINT)
313 : // compression of WAL is not yet supported: fall back to storing the original WAL record
314 48 : && !postgres_ffi::bkpimage_is_compressed(blk.bimg_info, pg_version)
315 : // do not materialize null pages because they will most likely be replaced with real data soon
316 48 : && blk.bimg_len != 0
317 291284 : }
318 :
319 : /// Encode a list of values and metadata into a serialized batch
320 : ///
321 : /// This is used by the pageserver ingest code to conveniently generate
322 : /// batches for metadata writes.
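///
/// An illustrative sketch, mirroring the unit tests at the bottom of this file
/// (the key and LSN are made-up values):
///
/// ```ignore
/// let key = Key::from_hex("110000000033333333444444445500000001").unwrap();
/// let value = Value::WalRecord(NeonWalRecord::wal_append("foo"));
/// let size = value.serialized_size().unwrap() as usize;
///
/// let batch =
///     SerializedValueBatch::from_values(vec![(key.to_compact(), Lsn(0x10), size, value)]);
/// assert_eq!(batch.len(), 1);
/// assert_eq!(batch.buffer_size(), size);
/// ```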
323 4660945 : pub fn from_values(batch: Vec<(CompactKey, Lsn, usize, Value)>) -> Self {
324 4660945 : // Pre-allocate a big flat buffer to write into. This should be large but not huge: it is soft-limited in practice by
325 4660945 : // [`crate::pgdatadir_mapping::DatadirModification::MAX_PENDING_BYTES`]
326 4664001 : let buffer_size = batch.iter().map(|i| i.2).sum::<usize>();
327 4660945 : let mut buf = Vec::<u8>::with_capacity(buffer_size);
328 4660945 :
329 4660945 : let mut metadata: Vec<ValueMeta> = Vec::with_capacity(batch.len());
330 4660945 : let mut max_lsn: Lsn = Lsn(0);
331 4660945 : let len = batch.len();
332 9324946 : for (key, lsn, val_ser_size, val) in batch {
333 4664001 : let relative_off = buf.len() as u64;
334 4664001 :
335 4664001 : val.ser_into(&mut buf)
336 4664001 : .expect("Writing into in-memory buffer is infallible");
337 4664001 :
338 4664001 : metadata.push(ValueMeta::Serialized(SerializedValueMeta {
339 4664001 : key,
340 4664001 : lsn,
341 4664001 : batch_offset: relative_off,
342 4664001 : len: val_ser_size,
343 4664001 : will_init: val.will_init(),
344 4664001 : }));
345 4664001 : max_lsn = std::cmp::max(max_lsn, lsn);
346 4664001 : }
347 :
348 : // Assert that we didn't do any extra allocations while building the buffer.
349 4660945 : debug_assert!(buf.len() <= buffer_size);
350 :
351 4660945 : if cfg!(any(debug_assertions, test)) {
352 4660945 : let batch = Self {
353 4660945 : raw: buf,
354 4660945 : metadata,
355 4660945 : max_lsn,
356 4660945 : len,
357 4660945 : };
358 4660945 :
359 4660945 : batch.validate_lsn_order();
360 4660945 :
361 4660945 : return batch;
362 0 : }
363 0 :
364 0 : Self {
365 0 : raw: buf,
366 0 : metadata,
367 0 : max_lsn,
368 0 : len,
369 0 : }
370 4660945 : }
371 :
372 : /// Add one value to the batch
373 : ///
374 : /// This is used by the pageserver ingest code to include metadata block
375 : /// updates for a single key.
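///
/// An illustrative sketch (hypothetical key, value and LSN); unlike
/// [`Self::from_values`], the serialized size is computed internally:
///
/// ```ignore
/// let mut batch = SerializedValueBatch::default();
/// let key = Key::from_hex("110000000033333333444444445500000001").unwrap();
/// let value = Value::WalRecord(NeonWalRecord::wal_append("foo"));
///
/// batch.put(key.to_compact(), value, Lsn(0x10));
/// assert_eq!(batch.len(), 1);
/// ```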
376 280868 : pub fn put(&mut self, key: CompactKey, value: Value, lsn: Lsn) {
377 280868 : let relative_off = self.raw.len() as u64;
378 280868 : value.ser_into(&mut self.raw).unwrap();
379 280868 :
380 280868 : let val_ser_size = self.raw.len() - relative_off as usize;
381 280868 : self.metadata
382 280868 : .push(ValueMeta::Serialized(SerializedValueMeta {
383 280868 : key,
384 280868 : lsn,
385 280868 : batch_offset: relative_off,
386 280868 : len: val_ser_size,
387 280868 : will_init: value.will_init(),
388 280868 : }));
389 280868 :
390 280868 : self.max_lsn = std::cmp::max(self.max_lsn, lsn);
391 280868 : self.len += 1;
392 280868 :
393 280868 : if cfg!(any(debug_assertions, test)) {
394 280868 : self.validate_lsn_order();
395 280868 : }
396 280868 : }
397 :
398 : /// Extend with the contents of another batch
399 : ///
400 : /// One batch is generated for each decoded PG WAL record.
401 : /// They are then merged to accumulate reasonably sized writes.
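///
/// An illustrative sketch (the `per_record_batches` iterator is hypothetical,
/// standing in for one batch per decoded WAL record):
///
/// ```ignore
/// let mut pending = SerializedValueBatch::default();
/// for record_batch in per_record_batches {
///     pending.extend(record_batch);
/// }
/// // `pending` now holds the concatenated raw buffer and the merged metadata.
/// ```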
402 264577 : pub fn extend(&mut self, mut other: SerializedValueBatch) {
403 264577 : let extend_batch_start_offset = self.raw.len() as u64;
404 264577 :
405 264577 : self.raw.extend(other.raw);
406 264577 :
407 264577 : // Shift the offsets in the batch we are extending with
408 266509 : other.metadata.iter_mut().for_each(|meta| match meta {
409 266509 : ValueMeta::Serialized(ser) => {
410 266509 : ser.batch_offset += extend_batch_start_offset;
411 266509 : if cfg!(debug_assertions) {
412 266509 : let value_end = ser.batch_offset + ser.len as u64;
413 266509 : assert!((value_end as usize) <= self.raw.len());
414 0 : }
415 : }
416 0 : ValueMeta::Observed(_) => {}
417 266509 : });
418 264577 : self.metadata.extend(other.metadata);
419 264577 :
420 264577 : self.max_lsn = std::cmp::max(self.max_lsn, other.max_lsn);
421 264577 :
422 264577 : self.len += other.len;
423 264577 :
424 264577 : if cfg!(any(debug_assertions, test)) {
425 264577 : self.validate_lsn_order();
426 264577 : }
427 264577 : }
428 :
429 : /// Add zero images for the (key, LSN) tuples specified
430 : ///
431 : /// PG versions below 16 do not zero out pages before extending
432 : /// a relation and may leave gaps. Such gaps need to be identified
433 : /// by the pageserver ingest logic and get patched up here.
434 : ///
435 : /// Note that this function does not validate that the gaps have been
436 : /// identified correctly (it does not know relation sizes), so it's up
437 : /// to the call-site to do it properly.
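///
/// An illustrative sketch (hypothetical keys and LSN): assuming `batch` already
/// holds values for `base_key` and `base_key.add(3)` at `lsn`, the two blocks
/// in between are patched with zero images:
///
/// ```ignore
/// let gaps = vec![(
///     KeySpace {
///         ranges: vec![base_key.add(1)..base_key.add(3)],
///     },
///     lsn,
/// )];
/// batch.zero_gaps(gaps);
/// // The batch now also contains zero images for base_key.add(1) and base_key.add(2).
/// ```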
438 1 : pub fn zero_gaps(&mut self, gaps: Vec<(KeySpace, Lsn)>) {
439 1 : // Implementation note:
440 1 : //
441 1 : // Values within [`SerializedValueBatch::raw`] do not have any ordering requirements,
442 1 : // but the metadata entries should be ordered properly (see
443 1 : // [`SerializedValueBatch::metadata`]).
444 1 : //
445 1 : // Exploiting this observation we do:
446 1 : // 1. Drain all the metadata entries into an ordered set.
447 1 : // The use of a BTreeSet keyed by (Key, Lsn) relies on the observation that Postgres never
448 1 : // includes more than one update to the same block in the same WAL record.
449 1 : // 2. For each (key, LSN) gap tuple, append a zero image to the raw buffer
450 1 : // and add an index entry to the ordered metadata set.
451 1 : // 3. Drain the ordered set back into a metadata vector
452 1 :
453 1 : let mut ordered_metas = self
454 1 : .metadata
455 1 : .drain(..)
456 1 : .map(OrderedValueMeta)
457 1 : .collect::<BTreeSet<_>>();
458 3 : for (keyspace, lsn) in gaps {
459 2 : self.max_lsn = std::cmp::max(self.max_lsn, lsn);
460 :
461 5 : for gap_range in keyspace.ranges {
462 3 : let mut key = gap_range.start;
463 13 : while key != gap_range.end {
464 10 : let relative_off = self.raw.len() as u64;
465 10 :
466 10 : // TODO(vlad): Can we be cheeky and write only one zero image, and
467 10 : // make all index entries requiring a zero page point to it?
468 10 : // Alternatively, we can change the index entry format to represent zero pages
469 10 : // without writing them at all.
470 10 : Value::Image(ZERO_PAGE.clone())
471 10 : .ser_into(&mut self.raw)
472 10 : .unwrap();
473 10 : let val_ser_size = self.raw.len() - relative_off as usize;
474 10 :
475 10 : ordered_metas.insert(OrderedValueMeta(ValueMeta::Serialized(
476 10 : SerializedValueMeta {
477 10 : key: key.to_compact(),
478 10 : lsn,
479 10 : batch_offset: relative_off,
480 10 : len: val_ser_size,
481 10 : will_init: true,
482 10 : },
483 10 : )));
484 10 :
485 10 : self.len += 1;
486 10 :
487 10 : key = key.next();
488 10 : }
489 : }
490 : }
491 :
492 19 : self.metadata = ordered_metas.into_iter().map(|ord| ord.0).collect();
493 1 :
494 1 : if cfg!(any(debug_assertions, test)) {
495 1 : self.validate_lsn_order();
496 1 : }
497 1 : }
498 :
499 : /// Checks if the batch contains any serialized or observed values
500 1 : pub fn is_empty(&self) -> bool {
501 1 : !self.has_data() && self.metadata.is_empty()
502 1 : }
503 :
504 : /// Checks if the batch contains data
505 : ///
506 : /// Note that if this returns false, it may still contain observed values or
507 : /// a metadata record.
508 4950035 : pub fn has_data(&self) -> bool {
509 4950035 : let empty = self.raw.is_empty();
510 4950035 :
511 4950035 : if cfg!(debug_assertions) && empty {
512 202 : assert!(self
513 202 : .metadata
514 202 : .iter()
515 202 : .all(|meta| matches!(meta, ValueMeta::Observed(_))));
516 4949833 : }
517 :
518 4950035 : !empty
519 4950035 : }
520 :
521 : /// Returns the number of values serialized in the batch
522 145656 : pub fn len(&self) -> usize {
523 145656 : self.len
524 145656 : }
525 :
526 : /// Returns the size of the buffer wrapped by the batch
527 4804208 : pub fn buffer_size(&self) -> usize {
528 4804208 : self.raw.len()
529 4804208 : }
530 :
531 0 : pub fn updates_key(&self, key: &Key) -> bool {
532 0 : self.metadata.iter().any(|meta| match meta {
533 0 : ValueMeta::Serialized(ser) => key.to_compact() == ser.key,
534 0 : ValueMeta::Observed(_) => false,
535 0 : })
536 0 : }
537 :
538 5352249 : pub fn validate_lsn_order(&self) {
539 : use std::collections::HashMap;
540 :
541 5352249 : let mut last_seen_lsn_per_key: HashMap<CompactKey, Lsn> = HashMap::default();
542 :
543 28399929 : for meta in self.metadata.iter() {
544 28399929 : let lsn = meta.lsn();
545 28399929 : let key = meta.key();
546 :
547 28399929 : if let Some(prev_lsn) = last_seen_lsn_per_key.insert(key, lsn) {
548 15 : assert!(
549 15 : lsn >= prev_lsn,
550 0 : "Ordering violated by {}: {} < {}",
551 0 : Key::from_compact(key),
552 : lsn,
553 : prev_lsn
554 : );
555 28399914 : }
556 : }
557 5352249 : }
558 : }
559 :
560 : #[cfg(all(test, feature = "testing"))]
561 : mod tests {
562 : use super::*;
563 :
564 6 : fn validate_batch(
565 6 : batch: &SerializedValueBatch,
566 6 : values: &[(CompactKey, Lsn, usize, Value)],
567 6 : gaps: Option<&Vec<(KeySpace, Lsn)>>,
568 6 : ) {
569 : // Invariant 1: The metadata for a given entry in the batch
570 : // is correct and can be used to deserialize back to the original value.
571 28 : for (key, lsn, size, value) in values.iter() {
572 28 : let meta = batch
573 28 : .metadata
574 28 : .iter()
575 136 : .find(|meta| (meta.key(), meta.lsn()) == (*key, *lsn))
576 28 : .unwrap();
577 28 : let meta = match meta {
578 28 : ValueMeta::Serialized(ser) => ser,
579 0 : ValueMeta::Observed(_) => unreachable!(),
580 : };
581 :
582 28 : assert_eq!(meta.len, *size);
583 28 : assert_eq!(meta.will_init, value.will_init());
584 :
585 28 : let start = meta.batch_offset as usize;
586 28 : let end = meta.batch_offset as usize + meta.len;
587 28 : let value_from_batch = Value::des(&batch.raw[start..end]).unwrap();
588 28 : assert_eq!(&value_from_batch, value);
589 : }
590 :
591 28 : let mut expected_buffer_size: usize = values.iter().map(|(_, _, size, _)| size).sum();
592 6 : let mut gap_pages_count: usize = 0;
593 :
594 : // Invariant 2: Zero pages were added for identified gaps and their metadata
595 : // is correct.
596 6 : if let Some(gaps) = gaps {
597 3 : for (gap_keyspace, lsn) in gaps {
598 5 : for gap_range in &gap_keyspace.ranges {
599 3 : let mut gap_key = gap_range.start;
600 13 : while gap_key != gap_range.end {
601 10 : let meta = batch
602 10 : .metadata
603 10 : .iter()
604 104 : .find(|meta| (meta.key(), meta.lsn()) == (gap_key.to_compact(), *lsn))
605 10 : .unwrap();
606 10 : let meta = match meta {
607 10 : ValueMeta::Serialized(ser) => ser,
608 0 : ValueMeta::Observed(_) => unreachable!(),
609 : };
610 :
611 10 : let zero_value = Value::Image(ZERO_PAGE.clone());
612 10 : let zero_value_size = zero_value.serialized_size().unwrap() as usize;
613 10 :
614 10 : assert_eq!(meta.len, zero_value_size);
615 10 : assert_eq!(meta.will_init, zero_value.will_init());
616 :
617 10 : let start = meta.batch_offset as usize;
618 10 : let end = meta.batch_offset as usize + meta.len;
619 10 : let value_from_batch = Value::des(&batch.raw[start..end]).unwrap();
620 10 : assert_eq!(value_from_batch, zero_value);
621 :
622 10 : gap_pages_count += 1;
623 10 : expected_buffer_size += zero_value_size;
624 10 : gap_key = gap_key.next();
625 : }
626 : }
627 : }
628 5 : }
629 :
630 : // Invariant 3: The length of the batch is equal to the number
631 : // of values inserted, plus the number of gap pages. This extends
632 : // to the raw buffer size.
633 6 : assert_eq!(batch.len(), values.len() + gap_pages_count);
634 6 : assert_eq!(expected_buffer_size, batch.buffer_size());
635 :
636 : // Invariant 4: Metadata entries for any given key are sorted in LSN order.
637 6 : batch.validate_lsn_order();
638 6 : }
639 :
640 : #[test]
641 1 : fn test_creation_from_values() {
642 : const LSN: Lsn = Lsn(0x10);
643 1 : let key = Key::from_hex("110000000033333333444444445500000001").unwrap();
644 1 :
645 1 : let values = vec![
646 1 : (
647 1 : key.to_compact(),
648 1 : LSN,
649 1 : Value::WalRecord(NeonWalRecord::wal_append("foo")),
650 1 : ),
651 1 : (
652 1 : key.next().to_compact(),
653 1 : LSN,
654 1 : Value::WalRecord(NeonWalRecord::wal_append("bar")),
655 1 : ),
656 1 : (
657 1 : key.to_compact(),
658 1 : Lsn(LSN.0 + 0x10),
659 1 : Value::WalRecord(NeonWalRecord::wal_append("baz")),
660 1 : ),
661 1 : (
662 1 : key.next().next().to_compact(),
663 1 : LSN,
664 1 : Value::WalRecord(NeonWalRecord::wal_append("taz")),
665 1 : ),
666 1 : ];
667 1 :
668 1 : let values = values
669 1 : .into_iter()
670 4 : .map(|(key, lsn, value)| (key, lsn, value.serialized_size().unwrap() as usize, value))
671 1 : .collect::<Vec<_>>();
672 1 : let batch = SerializedValueBatch::from_values(values.clone());
673 1 :
674 1 : validate_batch(&batch, &values, None);
675 1 :
676 1 : assert!(!batch.is_empty());
677 1 : }
678 :
679 : #[test]
680 1 : fn test_put() {
681 : const LSN: Lsn = Lsn(0x10);
682 1 : let key = Key::from_hex("110000000033333333444444445500000001").unwrap();
683 1 :
684 1 : let values = vec![
685 1 : (
686 1 : key.to_compact(),
687 1 : LSN,
688 1 : Value::WalRecord(NeonWalRecord::wal_append("foo")),
689 1 : ),
690 1 : (
691 1 : key.next().to_compact(),
692 1 : LSN,
693 1 : Value::WalRecord(NeonWalRecord::wal_append("bar")),
694 1 : ),
695 1 : ];
696 1 :
697 1 : let mut values = values
698 1 : .into_iter()
699 2 : .map(|(key, lsn, value)| (key, lsn, value.serialized_size().unwrap() as usize, value))
700 1 : .collect::<Vec<_>>();
701 1 : let mut batch = SerializedValueBatch::from_values(values.clone());
702 1 :
703 1 : validate_batch(&batch, &values, None);
704 1 :
705 1 : let value = (
706 1 : key.to_compact(),
707 1 : Lsn(LSN.0 + 0x10),
708 1 : Value::WalRecord(NeonWalRecord::wal_append("baz")),
709 1 : );
710 1 : let serialized_size = value.2.serialized_size().unwrap() as usize;
711 1 : let value = (value.0, value.1, serialized_size, value.2);
712 1 : values.push(value.clone());
713 1 : batch.put(value.0, value.3, value.1);
714 1 :
715 1 : validate_batch(&batch, &values, None);
716 1 :
717 1 : let value = (
718 1 : key.next().next().to_compact(),
719 1 : LSN,
720 1 : Value::WalRecord(NeonWalRecord::wal_append("taz")),
721 1 : );
722 1 : let serialized_size = value.2.serialized_size().unwrap() as usize;
723 1 : let value = (value.0, value.1, serialized_size, value.2);
724 1 : values.push(value.clone());
725 1 : batch.put(value.0, value.3, value.1);
726 1 :
727 1 : validate_batch(&batch, &values, None);
728 1 : }
729 :
730 : #[test]
731 1 : fn test_extension() {
732 : const LSN: Lsn = Lsn(0x10);
733 1 : let key = Key::from_hex("110000000033333333444444445500000001").unwrap();
734 1 :
735 1 : let values = vec![
736 1 : (
737 1 : key.to_compact(),
738 1 : LSN,
739 1 : Value::WalRecord(NeonWalRecord::wal_append("foo")),
740 1 : ),
741 1 : (
742 1 : key.next().to_compact(),
743 1 : LSN,
744 1 : Value::WalRecord(NeonWalRecord::wal_append("bar")),
745 1 : ),
746 1 : (
747 1 : key.next().next().to_compact(),
748 1 : LSN,
749 1 : Value::WalRecord(NeonWalRecord::wal_append("taz")),
750 1 : ),
751 1 : ];
752 1 :
753 1 : let mut values = values
754 1 : .into_iter()
755 3 : .map(|(key, lsn, value)| (key, lsn, value.serialized_size().unwrap() as usize, value))
756 1 : .collect::<Vec<_>>();
757 1 : let mut batch = SerializedValueBatch::from_values(values.clone());
758 1 :
759 1 : let other_values = vec![
760 1 : (
761 1 : key.to_compact(),
762 1 : Lsn(LSN.0 + 0x10),
763 1 : Value::WalRecord(NeonWalRecord::wal_append("foo")),
764 1 : ),
765 1 : (
766 1 : key.next().to_compact(),
767 1 : Lsn(LSN.0 + 0x10),
768 1 : Value::WalRecord(NeonWalRecord::wal_append("bar")),
769 1 : ),
770 1 : (
771 1 : key.next().next().to_compact(),
772 1 : Lsn(LSN.0 + 0x10),
773 1 : Value::WalRecord(NeonWalRecord::wal_append("taz")),
774 1 : ),
775 1 : ];
776 1 :
777 1 : let other_values = other_values
778 1 : .into_iter()
779 3 : .map(|(key, lsn, value)| (key, lsn, value.serialized_size().unwrap() as usize, value))
780 1 : .collect::<Vec<_>>();
781 1 : let other_batch = SerializedValueBatch::from_values(other_values.clone());
782 1 :
783 1 : values.extend(other_values);
784 1 : batch.extend(other_batch);
785 1 :
786 1 : validate_batch(&batch, &values, None);
787 1 : }
788 :
789 : #[test]
790 1 : fn test_gap_zeroing() {
791 : const LSN: Lsn = Lsn(0x10);
792 1 : let rel_foo_base_key = Key::from_hex("110000000033333333444444445500000001").unwrap();
793 1 :
794 1 : let rel_bar_base_key = {
795 1 : let mut key = rel_foo_base_key;
796 1 : key.field4 += 1;
797 1 : key
798 1 : };
799 1 :
800 1 : let values = vec![
801 1 : (
802 1 : rel_foo_base_key.to_compact(),
803 1 : LSN,
804 1 : Value::WalRecord(NeonWalRecord::wal_append("foo1")),
805 1 : ),
806 1 : (
807 1 : rel_foo_base_key.add(1).to_compact(),
808 1 : LSN,
809 1 : Value::WalRecord(NeonWalRecord::wal_append("foo2")),
810 1 : ),
811 1 : (
812 1 : rel_foo_base_key.add(5).to_compact(),
813 1 : LSN,
814 1 : Value::WalRecord(NeonWalRecord::wal_append("foo3")),
815 1 : ),
816 1 : (
817 1 : rel_foo_base_key.add(1).to_compact(),
818 1 : Lsn(LSN.0 + 0x10),
819 1 : Value::WalRecord(NeonWalRecord::wal_append("foo4")),
820 1 : ),
821 1 : (
822 1 : rel_foo_base_key.add(10).to_compact(),
823 1 : Lsn(LSN.0 + 0x10),
824 1 : Value::WalRecord(NeonWalRecord::wal_append("foo5")),
825 1 : ),
826 1 : (
827 1 : rel_foo_base_key.add(11).to_compact(),
828 1 : Lsn(LSN.0 + 0x10),
829 1 : Value::WalRecord(NeonWalRecord::wal_append("foo6")),
830 1 : ),
831 1 : (
832 1 : rel_foo_base_key.add(12).to_compact(),
833 1 : Lsn(LSN.0 + 0x10),
834 1 : Value::WalRecord(NeonWalRecord::wal_append("foo7")),
835 1 : ),
836 1 : (
837 1 : rel_bar_base_key.to_compact(),
838 1 : LSN,
839 1 : Value::WalRecord(NeonWalRecord::wal_append("bar1")),
840 1 : ),
841 1 : (
842 1 : rel_bar_base_key.add(4).to_compact(),
843 1 : LSN,
844 1 : Value::WalRecord(NeonWalRecord::wal_append("bar2")),
845 1 : ),
846 1 : ];
847 1 :
848 1 : let values = values
849 1 : .into_iter()
850 9 : .map(|(key, lsn, value)| (key, lsn, value.serialized_size().unwrap() as usize, value))
851 1 : .collect::<Vec<_>>();
852 1 :
853 1 : let mut batch = SerializedValueBatch::from_values(values.clone());
854 1 :
855 1 : let gaps = vec![
856 1 : (
857 1 : KeySpace {
858 1 : ranges: vec![
859 1 : rel_foo_base_key.add(2)..rel_foo_base_key.add(5),
860 1 : rel_bar_base_key.add(1)..rel_bar_base_key.add(4),
861 1 : ],
862 1 : },
863 1 : LSN,
864 1 : ),
865 1 : (
866 1 : KeySpace {
867 1 : ranges: vec![rel_foo_base_key.add(6)..rel_foo_base_key.add(10)],
868 1 : },
869 1 : Lsn(LSN.0 + 0x10),
870 1 : ),
871 1 : ];
872 1 :
873 1 : batch.zero_gaps(gaps.clone());
874 1 : validate_batch(&batch, &values, Some(&gaps));
875 1 : }
876 : }