Line data Source code
1 : //! This module implements a batch type for serialized [`pageserver_api::value::Value`]
2 : //! instances. Each batch contains a raw buffer (serialized values)
3 : //! and a list of metadata for each (key, LSN) tuple present in the batch.
4 : //!
5 : //! Such batches are created from decoded PG WAL records and ingested
6 : //! by the pageserver by writing directly to the ephemeral file.
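//!
//! A minimal usage sketch (illustrative only; `key` and `lsn` are placeholders and
//! `NeonWalRecord::wal_append` is the test-only helper used in the tests at the
//! bottom of this file):
//!
//! ```ignore
//! let value = Value::WalRecord(NeonWalRecord::wal_append("foo"));
//! let size = value.serialized_size().unwrap() as usize;
//! let batch = SerializedValueBatch::from_values(vec![(key.to_compact(), lsn, size, value)]);
//! assert_eq!(batch.len(), 1);
//! ```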
7 :
8 : use std::collections::{BTreeSet, HashMap};
9 :
10 : use bytes::{Bytes, BytesMut};
11 : use pageserver_api::key::rel_block_to_key;
12 : use pageserver_api::keyspace::KeySpace;
13 : use pageserver_api::record::NeonWalRecord;
14 : use pageserver_api::reltag::RelTag;
15 : use pageserver_api::shard::ShardIdentity;
16 : use pageserver_api::{key::CompactKey, value::Value};
17 : use postgres_ffi::walrecord::{DecodedBkpBlock, DecodedWALRecord};
18 : use postgres_ffi::{page_is_new, page_set_lsn, pg_constants, BLCKSZ};
19 : use serde::{Deserialize, Serialize};
20 : use utils::bin_ser::BeSer;
21 : use utils::lsn::Lsn;
22 :
23 : use pageserver_api::key::Key;
24 :
25 : use crate::models::InterpretedWalRecord;
26 :
27 : static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; BLCKSZ as usize]);
28 :
29 : /// Accompanying metadata for the batch.
30 : /// A value may be serialized and stored into the batch or just "observed".
31 : /// Shard 0 currently "observes" all values in order to accurately track
32 : /// relation sizes. In the case of "observed" values, we only need to know
33 : /// the key and LSN, so two types of metadata are supported to save on network
34 : /// bandwidth.
35 0 : #[derive(Serialize, Deserialize, Clone)]
36 : pub enum ValueMeta {
37 : Serialized(SerializedValueMeta),
38 : Observed(ObservedValueMeta),
39 : }
40 :
41 : impl ValueMeta {
42 57091809 : pub fn key(&self) -> CompactKey {
43 57091809 : match self {
44 57091809 : Self::Serialized(ser) => ser.key,
45 0 : Self::Observed(obs) => obs.key,
46 : }
47 57091809 : }
48 :
49 56800525 : pub fn lsn(&self) -> Lsn {
50 56800525 : match self {
51 56800525 : Self::Serialized(ser) => ser.lsn,
52 0 : Self::Observed(obs) => obs.lsn,
53 : }
54 56800525 : }
55 : }
56 :
57 : /// Wrapper around [`ValueMeta`] that implements ordering by
58 : /// (key, LSN) tuples
59 : struct OrderedValueMeta(ValueMeta);
60 :
61 : impl Ord for OrderedValueMeta {
62 59 : fn cmp(&self, other: &Self) -> std::cmp::Ordering {
63 59 : (self.0.key(), self.0.lsn()).cmp(&(other.0.key(), other.0.lsn()))
64 59 : }
65 : }
66 :
67 : impl PartialOrd for OrderedValueMeta {
68 9 : fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
69 9 : Some(self.cmp(other))
70 9 : }
71 : }
72 :
73 : impl PartialEq for OrderedValueMeta {
74 8 : fn eq(&self, other: &Self) -> bool {
75 8 : (self.0.key(), self.0.lsn()) == (other.0.key(), other.0.lsn())
76 8 : }
77 : }
78 :
79 : impl Eq for OrderedValueMeta {}
80 :
81 : /// Metadata for a [`Value`] serialized into the batch.
82 0 : #[derive(Serialize, Deserialize, Clone)]
83 : pub struct SerializedValueMeta {
84 : pub key: CompactKey,
85 : pub lsn: Lsn,
86 : /// Starting offset of the value for the (key, LSN) tuple
87 : /// in [`SerializedValueBatch::raw`]
88 : pub batch_offset: u64,
89 : pub len: usize,
90 : pub will_init: bool,
91 : }
92 :
93 : /// Metadata for a [`Value`] observed by the batch
94 0 : #[derive(Serialize, Deserialize, Clone)]
95 : pub struct ObservedValueMeta {
96 : pub key: CompactKey,
97 : pub lsn: Lsn,
98 : }
99 :
100 : /// Batch of serialized [`Value`]s.
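///
/// Each [`ValueMeta::Serialized`] entry indexes into [`Self::raw`]; a value can be
/// recovered by slicing the buffer, as the tests below do. A hedged sketch:
///
/// ```ignore
/// if let ValueMeta::Serialized(ser) = &batch.metadata[0] {
///     let start = ser.batch_offset as usize;
///     let end = start + ser.len;
///     let value = Value::des(&batch.raw[start..end]).unwrap();
/// }
/// ```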
101 0 : #[derive(Serialize, Deserialize, Clone)]
102 : pub struct SerializedValueBatch {
103 : /// [`Value`]s serialized in EphemeralFile's native format,
104 : /// ready for disk write by the pageserver
105 : pub raw: Vec<u8>,
106 :
107 : /// Metadata to make sense of the bytes in [`Self::raw`]
108 : /// and represent "observed" values.
109 : ///
110 : /// Invariant: Metadata entries for any given key are ordered
111 : /// by LSN. Note that entries for a key do not have to be contiguous.
112 : pub metadata: Vec<ValueMeta>,
113 :
114 : /// The highest LSN of any value in the batch
115 : pub max_lsn: Lsn,
116 :
117 : /// Number of values encoded by [`Self::raw`]
118 : pub len: usize,
119 : }
120 :
121 : impl Default for SerializedValueBatch {
122 816875 : fn default() -> Self {
123 816875 : Self {
124 816875 : raw: Default::default(),
125 816875 : metadata: Default::default(),
126 816875 : max_lsn: Lsn(0),
127 816875 : len: 0,
128 816875 : }
129 816875 : }
130 : }
131 :
132 : impl SerializedValueBatch {
133 : /// Populates the given `shard_records` with value batches from this WAL record, if any,
134 : /// discarding those belonging to other shards.
135 : ///
136 : /// The batch will only contain values for keys targeting the specified
137 : /// shard. Shard 0 is a special case, where any keys that don't belong to
138 : /// it are "observed" by the batch (i.e. present in [`SerializedValueBatch::metadata`],
139 : /// but absent from the raw buffer [`SerializedValueBatch::raw`]).
140 292300 : pub(crate) fn from_decoded_filtered(
141 292300 : decoded: DecodedWALRecord,
142 292300 : shard_records: &mut HashMap<ShardIdentity, InterpretedWalRecord>,
143 292300 : next_record_lsn: Lsn,
144 292300 : pg_version: u32,
145 292300 : ) -> anyhow::Result<()> {
146 : // First determine how big the buffers need to be and allocate it up-front.
147 : // This duplicates some of the work below, but it's empirically much faster.
148 292499 : for (shard, record) in shard_records.iter_mut() {
149 292499 : assert!(record.batch.is_empty());
150 :
151 292499 : let estimate = Self::estimate_buffer_size(&decoded, shard, pg_version);
152 292499 : record.batch.raw = Vec::with_capacity(estimate);
153 : }
154 :
155 292300 : for blk in decoded.blocks.iter() {
156 291284 : let rel = RelTag {
157 291284 : spcnode: blk.rnode_spcnode,
158 291284 : dbnode: blk.rnode_dbnode,
159 291284 : relnode: blk.rnode_relnode,
160 291284 : forknum: blk.forknum,
161 291284 : };
162 291284 :
163 291284 : let key = rel_block_to_key(rel, blk.blkno);
164 291284 :
165 291284 : if !key.is_valid_key_on_write_path() {
166 0 : anyhow::bail!(
167 0 : "Unsupported key decoded at LSN {}: {}",
168 0 : next_record_lsn,
169 0 : key
170 0 : );
171 291284 : }
172 :
173 291284 : for (shard, record) in shard_records.iter_mut() {
174 291284 : let key_is_local = shard.is_key_local(&key);
175 291284 :
176 291284 : tracing::debug!(
177 : lsn=%next_record_lsn,
178 : key=%key,
179 0 : "ingest: shard decision {}",
180 0 : if !key_is_local { "drop" } else { "keep" },
181 : );
182 :
183 291284 : if !key_is_local {
184 0 : if shard.is_shard_zero() {
185 : // Shard 0 tracks relation sizes. Although we will not store this block, we will observe
186 : // its blkno in case it implicitly extends a relation.
187 0 : record
188 0 : .batch
189 0 : .metadata
190 0 : .push(ValueMeta::Observed(ObservedValueMeta {
191 0 : key: key.to_compact(),
192 0 : lsn: next_record_lsn,
193 0 : }))
194 0 : }
195 :
196 0 : continue;
197 291284 : }
198 :
199 : // Instead of storing the full-page-image WAL record,
200 : // it is better to store the extracted image: we can skip wal-redo
201 : // in this case. Also, some FPI records may contain multiple (up to 32) pages,
202 : // so they would have to be copied multiple times.
203 : //
204 291284 : let val = if Self::block_is_image(&decoded, blk, pg_version) {
205 : // Extract page image from FPI record
206 48 : let img_len = blk.bimg_len as usize;
207 48 : let img_offs = blk.bimg_offset as usize;
208 48 : let mut image = BytesMut::with_capacity(BLCKSZ as usize);
209 48 : // TODO(vlad): skip the copy
210 48 : image.extend_from_slice(&decoded.record[img_offs..img_offs + img_len]);
211 48 :
212 48 : if blk.hole_length != 0 {
213 0 : let tail = image.split_off(blk.hole_offset as usize);
214 0 : image.resize(image.len() + blk.hole_length as usize, 0u8);
215 0 : image.unsplit(tail);
216 48 : }
217 : //
218 : // Match the logic of XLogReadBufferForRedoExtended:
219 : // The page may be uninitialized. If so, we can't set the LSN because
220 : // that would corrupt the page.
221 : //
222 48 : if !page_is_new(&image) {
223 36 : page_set_lsn(&mut image, next_record_lsn)
224 12 : }
225 48 : assert_eq!(image.len(), BLCKSZ as usize);
226 :
227 48 : Value::Image(image.freeze())
228 : } else {
229 : Value::WalRecord(NeonWalRecord::Postgres {
230 291236 : will_init: blk.will_init || blk.apply_image,
231 291236 : rec: decoded.record.clone(),
232 : })
233 : };
234 :
235 291284 : let relative_off = record.batch.raw.len() as u64;
236 291284 :
237 291284 : val.ser_into(&mut record.batch.raw)
238 291284 : .expect("Writing into in-memory buffer is infallible");
239 291284 :
240 291284 : let val_ser_size = record.batch.raw.len() - relative_off as usize;
241 291284 :
242 291284 : record
243 291284 : .batch
244 291284 : .metadata
245 291284 : .push(ValueMeta::Serialized(SerializedValueMeta {
246 291284 : key: key.to_compact(),
247 291284 : lsn: next_record_lsn,
248 291284 : batch_offset: relative_off,
249 291284 : len: val_ser_size,
250 291284 : will_init: val.will_init(),
251 291284 : }));
252 291284 : record.batch.max_lsn = std::cmp::max(record.batch.max_lsn, next_record_lsn);
253 291284 : record.batch.len += 1;
254 : }
255 : }
256 :
257 292300 : if cfg!(any(debug_assertions, test)) {
258 : // Validate that the batches are correct
259 292499 : for record in shard_records.values() {
260 292499 : record.batch.validate_lsn_order();
261 292499 : }
262 0 : }
263 :
264 292300 : Ok(())
265 292300 : }
266 :
267 : /// Look into the decoded PG WAL record and determine
268 : /// roughly how large the buffer for serialized values needs to be.
269 292499 : fn estimate_buffer_size(
270 292499 : decoded: &DecodedWALRecord,
271 292499 : shard: &ShardIdentity,
272 292499 : pg_version: u32,
273 292499 : ) -> usize {
274 292499 : let mut estimate: usize = 0;
275 :
276 292499 : for blk in decoded.blocks.iter() {
277 291284 : let rel = RelTag {
278 291284 : spcnode: blk.rnode_spcnode,
279 291284 : dbnode: blk.rnode_dbnode,
280 291284 : relnode: blk.rnode_relnode,
281 291284 : forknum: blk.forknum,
282 291284 : };
283 291284 :
284 291284 : let key = rel_block_to_key(rel, blk.blkno);
285 291284 :
286 291284 : if !shard.is_key_local(&key) {
287 0 : continue;
288 291284 : }
289 291284 :
290 291284 : if Self::block_is_image(decoded, blk, pg_version) {
291 48 : // 4 bytes for the Value::Image discriminator
292 48 : // 8 bytes for encoding the size of the buffer
293 48 : // BLCKSZ for the raw image
294 48 : estimate += (4 + 8 + BLCKSZ) as usize;
295 291236 : } else {
296 291236 : // 4 bytes for the Value::WalRecord discriminator
297 291236 : // 4 bytes for the NeonWalRecord::Postgres discriminator
298 291236 : // 1 byte for NeonWalRecord::Postgres::will_init
299 291236 : // 8 bytes for encoding the size of the buffer
300 291236 : // length of the raw record
301 291236 : estimate += 8 + 1 + 8 + decoded.record.len();
302 291236 : }
303 : }
304 :
305 292499 : estimate
306 292499 : }
307 :
308 582568 : fn block_is_image(decoded: &DecodedWALRecord, blk: &DecodedBkpBlock, pg_version: u32) -> bool {
309 582568 : blk.apply_image
310 240 : && blk.has_image
311 240 : && decoded.xl_rmid == pg_constants::RM_XLOG_ID
312 96 : && (decoded.xl_info == pg_constants::XLOG_FPI
313 0 : || decoded.xl_info == pg_constants::XLOG_FPI_FOR_HINT)
314 : // compression of WAL is not yet supported: fall back to storing the original WAL record
315 96 : && !postgres_ffi::bkpimage_is_compressed(blk.bimg_info, pg_version)
316 : // do not materialize null pages because they will most likely soon be replaced with real data
317 96 : && blk.bimg_len != 0
318 582568 : }
319 :
320 : /// Encode a list of values and metadata into a serialized batch
321 : ///
322 : /// This is used by the pageserver ingest code to conveniently generate
323 : /// batches for metadata writes.
324 9321945 : pub fn from_values(batch: Vec<(CompactKey, Lsn, usize, Value)>) -> Self {
325 9321945 : // Pre-allocate a big flat buffer to write into. This should be large but not huge: it is soft-limited in practice by
326 9321945 : // [`crate::pgdatadir_mapping::DatadirModification::MAX_PENDING_BYTES`]
327 9328365 : let buffer_size = batch.iter().map(|i| i.2).sum::<usize>();
328 9321945 : let mut buf = Vec::<u8>::with_capacity(buffer_size);
329 9321945 :
330 9321945 : let mut metadata: Vec<ValueMeta> = Vec::with_capacity(batch.len());
331 9321945 : let mut max_lsn: Lsn = Lsn(0);
332 9321945 : let len = batch.len();
333 18650310 : for (key, lsn, val_ser_size, val) in batch {
334 9328365 : let relative_off = buf.len() as u64;
335 9328365 :
336 9328365 : val.ser_into(&mut buf)
337 9328365 : .expect("Writing into in-memory buffer is infallible");
338 9328365 :
339 9328365 : metadata.push(ValueMeta::Serialized(SerializedValueMeta {
340 9328365 : key,
341 9328365 : lsn,
342 9328365 : batch_offset: relative_off,
343 9328365 : len: val_ser_size,
344 9328365 : will_init: val.will_init(),
345 9328365 : }));
346 9328365 : max_lsn = std::cmp::max(max_lsn, lsn);
347 9328365 : }
348 :
349 : // Assert that we didn't do any extra allocations while building the buffer.
350 9321945 : debug_assert!(buf.len() <= buffer_size);
351 :
352 9321945 : if cfg!(any(debug_assertions, test)) {
353 9321945 : let batch = Self {
354 9321945 : raw: buf,
355 9321945 : metadata,
356 9321945 : max_lsn,
357 9321945 : len,
358 9321945 : };
359 9321945 :
360 9321945 : batch.validate_lsn_order();
361 9321945 :
362 9321945 : return batch;
363 0 : }
364 0 :
365 0 : Self {
366 0 : raw: buf,
367 0 : metadata,
368 0 : max_lsn,
369 0 : len,
370 0 : }
371 9321945 : }
372 :
373 : /// Add one value to the batch
374 : ///
375 : /// This is used by the pageserver ingest code to include metadata block
376 : /// updates for a single key.
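///
/// A hedged usage sketch (arguments are placeholders; mirrors `test_put` below):
///
/// ```ignore
/// batch.put(key.to_compact(), value, lsn);
/// assert!(batch.updates_key(&key));
/// ```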
377 561734 : pub fn put(&mut self, key: CompactKey, value: Value, lsn: Lsn) {
378 561734 : let relative_off = self.raw.len() as u64;
379 561734 : value.ser_into(&mut self.raw).unwrap();
380 561734 :
381 561734 : let val_ser_size = self.raw.len() - relative_off as usize;
382 561734 : self.metadata
383 561734 : .push(ValueMeta::Serialized(SerializedValueMeta {
384 561734 : key,
385 561734 : lsn,
386 561734 : batch_offset: relative_off,
387 561734 : len: val_ser_size,
388 561734 : will_init: value.will_init(),
389 561734 : }));
390 561734 :
391 561734 : self.max_lsn = std::cmp::max(self.max_lsn, lsn);
392 561734 : self.len += 1;
393 561734 :
394 561734 : if cfg!(any(debug_assertions, test)) {
395 561734 : self.validate_lsn_order();
396 561734 : }
397 561734 : }
398 :
399 : /// Extend with the contents of another batch
400 : ///
401 : /// One batch is generated for each decoded PG WAL record.
402 : /// They are then merged to accumulate reasonably sized writes.
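///
/// A hedged sketch of merging two batches (`values` and `other_values` are
/// placeholders; mirrors `test_extension` below):
///
/// ```ignore
/// let mut batch = SerializedValueBatch::from_values(values);
/// batch.extend(SerializedValueBatch::from_values(other_values));
/// // `batch` now holds both sets of values; the offsets in the second batch's
/// // metadata have been shifted past the first batch's raw buffer.
/// ```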
403 529153 : pub fn extend(&mut self, mut other: SerializedValueBatch) {
404 529153 : let extend_batch_start_offset = self.raw.len() as u64;
405 529153 :
406 529153 : self.raw.extend(other.raw);
407 529153 :
408 529153 : // Shift the offsets in the batch we are extending with
409 533015 : other.metadata.iter_mut().for_each(|meta| match meta {
410 533015 : ValueMeta::Serialized(ser) => {
411 533015 : ser.batch_offset += extend_batch_start_offset;
412 533015 : if cfg!(debug_assertions) {
413 533015 : let value_end = ser.batch_offset + ser.len as u64;
414 533015 : assert!((value_end as usize) <= self.raw.len());
415 0 : }
416 : }
417 0 : ValueMeta::Observed(_) => {}
418 533015 : });
419 529153 : self.metadata.extend(other.metadata);
420 529153 :
421 529153 : self.max_lsn = std::cmp::max(self.max_lsn, other.max_lsn);
422 529153 :
423 529153 : self.len += other.len;
424 529153 :
425 529153 : if cfg!(any(debug_assertions, test)) {
426 529153 : self.validate_lsn_order();
427 529153 : }
428 529153 : }
429 :
430 : /// Add zero images for the (key, LSN) tuples specified
431 : ///
432 : /// PG versions below 16 do not zero out pages before extending
433 : /// a relation and may leave gaps. Such gaps need to be identified
434 : /// by the pageserver ingest logic and get patched up here.
435 : ///
436 : /// Note that this function does not validate that the gaps have been
437 : /// identified correctly (it does not know relation sizes), so it's up
438 : /// to the call-site to do it properly.
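///
/// A hedged usage sketch (adapted from `test_gap_zeroing` below; the key range
/// and LSN are placeholders):
///
/// ```ignore
/// let gaps = vec![(
///     KeySpace { ranges: vec![gap_start_key..gap_end_key] },
///     lsn,
/// )];
/// batch.zero_gaps(gaps);
/// ```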
439 1 : pub fn zero_gaps(&mut self, gaps: Vec<(KeySpace, Lsn)>) {
440 1 : // Implementation note:
441 1 : //
442 1 : // Values within [`SerializedValueBatch::raw`] do not have any ordering requirements,
443 1 : // but the metadata entries should be ordered properly (see
444 1 : // [`SerializedValueBatch::metadata`]).
445 1 : //
446 1 : // Exploiting this observation we do:
447 1 : // 1. Drain all the metadata entries into an ordered set.
448 1 : // The use of a BTreeSet keyed by (Key, Lsn) relies on the observation that Postgres never
449 1 : // includes more than one update to the same block in the same WAL record.
450 1 : // 2. For each (key, LSN) gap tuple, append a zero image to the raw buffer
451 1 : // and add an index entry to the ordered metadata set.
452 1 : // 3. Drain the ordered set back into a metadata vector
453 1 :
454 1 : let mut ordered_metas = self
455 1 : .metadata
456 1 : .drain(..)
457 1 : .map(OrderedValueMeta)
458 1 : .collect::<BTreeSet<_>>();
459 3 : for (keyspace, lsn) in gaps {
460 2 : self.max_lsn = std::cmp::max(self.max_lsn, lsn);
461 :
462 5 : for gap_range in keyspace.ranges {
463 3 : let mut key = gap_range.start;
464 13 : while key != gap_range.end {
465 10 : let relative_off = self.raw.len() as u64;
466 10 :
467 10 : // TODO(vlad): Can we be cheeky and write only one zero image, and
468 10 : // make all index entries requiring a zero page point to it?
469 10 : // Alternatively, we can change the index entry format to represent zero pages
470 10 : // without writing them at all.
471 10 : Value::Image(ZERO_PAGE.clone())
472 10 : .ser_into(&mut self.raw)
473 10 : .unwrap();
474 10 : let val_ser_size = self.raw.len() - relative_off as usize;
475 10 :
476 10 : ordered_metas.insert(OrderedValueMeta(ValueMeta::Serialized(
477 10 : SerializedValueMeta {
478 10 : key: key.to_compact(),
479 10 : lsn,
480 10 : batch_offset: relative_off,
481 10 : len: val_ser_size,
482 10 : will_init: true,
483 10 : },
484 10 : )));
485 10 :
486 10 : self.len += 1;
487 10 :
488 10 : key = key.next();
489 10 : }
490 : }
491 : }
492 :
493 19 : self.metadata = ordered_metas.into_iter().map(|ord| ord.0).collect();
494 1 :
495 1 : if cfg!(any(debug_assertions, test)) {
496 1 : self.validate_lsn_order();
497 1 : }
498 1 : }
499 :
500 : /// Checks if the batch contains any serialized or observed values
501 293295 : pub fn is_empty(&self) -> bool {
502 293295 : !self.has_data() && self.metadata.is_empty()
503 293295 : }
504 :
505 : /// Checks if the batch contains only observed values
506 0 : pub fn is_observed(&self) -> bool {
507 0 : !self.has_data() && !self.metadata.is_empty()
508 0 : }
509 :
510 : /// Checks if the batch contains data
511 : ///
512 : /// Note that if this returns false, it may still contain observed values or
513 : /// a metadata record.
514 10193423 : pub fn has_data(&self) -> bool {
515 10193423 : let empty = self.raw.is_empty();
516 10193423 :
517 10193423 : if cfg!(debug_assertions) && empty {
518 293698 : assert!(self
519 293698 : .metadata
520 293698 : .iter()
521 293698 : .all(|meta| matches!(meta, ValueMeta::Observed(_))));
522 9899725 : }
523 :
524 10193423 : !empty
525 10193423 : }
526 :
527 : /// Returns the number of values serialized in the batch
528 291306 : pub fn len(&self) -> usize {
529 291306 : self.len
530 291306 : }
531 :
532 : /// Returns the size of the buffer wrapped by the batch
533 9608470 : pub fn buffer_size(&self) -> usize {
534 9608470 : self.raw.len()
535 9608470 : }
536 :
537 0 : pub fn updates_key(&self, key: &Key) -> bool {
538 0 : self.metadata.iter().any(|meta| match meta {
539 0 : ValueMeta::Serialized(ser) => key.to_compact() == ser.key,
540 0 : ValueMeta::Observed(_) => false,
541 0 : })
542 0 : }
543 :
544 10705338 : pub fn validate_lsn_order(&self) {
545 : use std::collections::HashMap;
546 :
547 10705338 : let mut last_seen_lsn_per_key: HashMap<CompactKey, Lsn> = HashMap::default();
548 :
549 56800151 : for meta in self.metadata.iter() {
550 56800151 : let lsn = meta.lsn();
551 56800151 : let key = meta.key();
552 :
553 56800151 : if let Some(prev_lsn) = last_seen_lsn_per_key.insert(key, lsn) {
554 15 : assert!(
555 15 : lsn >= prev_lsn,
556 0 : "Ordering violated by {}: {} < {}",
557 0 : Key::from_compact(key),
558 : lsn,
559 : prev_lsn
560 : );
561 56800136 : }
562 : }
563 10705338 : }
564 : }
565 :
566 : #[cfg(all(test, feature = "testing"))]
567 : mod tests {
568 : use super::*;
569 :
570 6 : fn validate_batch(
571 6 : batch: &SerializedValueBatch,
572 6 : values: &[(CompactKey, Lsn, usize, Value)],
573 6 : gaps: Option<&Vec<(KeySpace, Lsn)>>,
574 6 : ) {
575 : // Invariant 1: The metadata for a given entry in the batch
576 : // is correct and can be used to deserialize back to the original value.
577 28 : for (key, lsn, size, value) in values.iter() {
578 28 : let meta = batch
579 28 : .metadata
580 28 : .iter()
581 136 : .find(|meta| (meta.key(), meta.lsn()) == (*key, *lsn))
582 28 : .unwrap();
583 28 : let meta = match meta {
584 28 : ValueMeta::Serialized(ser) => ser,
585 0 : ValueMeta::Observed(_) => unreachable!(),
586 : };
587 :
588 28 : assert_eq!(meta.len, *size);
589 28 : assert_eq!(meta.will_init, value.will_init());
590 :
591 28 : let start = meta.batch_offset as usize;
592 28 : let end = meta.batch_offset as usize + meta.len;
593 28 : let value_from_batch = Value::des(&batch.raw[start..end]).unwrap();
594 28 : assert_eq!(&value_from_batch, value);
595 : }
596 :
597 28 : let mut expected_buffer_size: usize = values.iter().map(|(_, _, size, _)| size).sum();
598 6 : let mut gap_pages_count: usize = 0;
599 :
600 : // Invariant 2: Zero pages were added for identified gaps and their metadata
601 : // is correct.
602 6 : if let Some(gaps) = gaps {
603 3 : for (gap_keyspace, lsn) in gaps {
604 5 : for gap_range in &gap_keyspace.ranges {
605 3 : let mut gap_key = gap_range.start;
606 13 : while gap_key != gap_range.end {
607 10 : let meta = batch
608 10 : .metadata
609 10 : .iter()
610 104 : .find(|meta| (meta.key(), meta.lsn()) == (gap_key.to_compact(), *lsn))
611 10 : .unwrap();
612 10 : let meta = match meta {
613 10 : ValueMeta::Serialized(ser) => ser,
614 0 : ValueMeta::Observed(_) => unreachable!(),
615 : };
616 :
617 10 : let zero_value = Value::Image(ZERO_PAGE.clone());
618 10 : let zero_value_size = zero_value.serialized_size().unwrap() as usize;
619 10 :
620 10 : assert_eq!(meta.len, zero_value_size);
621 10 : assert_eq!(meta.will_init, zero_value.will_init());
622 :
623 10 : let start = meta.batch_offset as usize;
624 10 : let end = meta.batch_offset as usize + meta.len;
625 10 : let value_from_batch = Value::des(&batch.raw[start..end]).unwrap();
626 10 : assert_eq!(value_from_batch, zero_value);
627 :
628 10 : gap_pages_count += 1;
629 10 : expected_buffer_size += zero_value_size;
630 10 : gap_key = gap_key.next();
631 : }
632 : }
633 : }
634 5 : }
635 :
636 : // Invariant 3: The length of the batch is equal to the number
637 : // of values inserted, plus the number of gap pages. This extends
638 : // to the raw buffer size.
639 6 : assert_eq!(batch.len(), values.len() + gap_pages_count);
640 6 : assert_eq!(expected_buffer_size, batch.buffer_size());
641 :
642 : // Invariant 4: Metadata entries for any given key are sorted in LSN order.
643 6 : batch.validate_lsn_order();
644 6 : }
645 :
646 : #[test]
647 1 : fn test_creation_from_values() {
648 : const LSN: Lsn = Lsn(0x10);
649 1 : let key = Key::from_hex("110000000033333333444444445500000001").unwrap();
650 1 :
651 1 : let values = vec![
652 1 : (
653 1 : key.to_compact(),
654 1 : LSN,
655 1 : Value::WalRecord(NeonWalRecord::wal_append("foo")),
656 1 : ),
657 1 : (
658 1 : key.next().to_compact(),
659 1 : LSN,
660 1 : Value::WalRecord(NeonWalRecord::wal_append("bar")),
661 1 : ),
662 1 : (
663 1 : key.to_compact(),
664 1 : Lsn(LSN.0 + 0x10),
665 1 : Value::WalRecord(NeonWalRecord::wal_append("baz")),
666 1 : ),
667 1 : (
668 1 : key.next().next().to_compact(),
669 1 : LSN,
670 1 : Value::WalRecord(NeonWalRecord::wal_append("taz")),
671 1 : ),
672 1 : ];
673 1 :
674 1 : let values = values
675 1 : .into_iter()
676 4 : .map(|(key, lsn, value)| (key, lsn, value.serialized_size().unwrap() as usize, value))
677 1 : .collect::<Vec<_>>();
678 1 : let batch = SerializedValueBatch::from_values(values.clone());
679 1 :
680 1 : validate_batch(&batch, &values, None);
681 1 :
682 1 : assert!(!batch.is_empty());
683 1 : }
684 :
685 : #[test]
686 1 : fn test_put() {
687 : const LSN: Lsn = Lsn(0x10);
688 1 : let key = Key::from_hex("110000000033333333444444445500000001").unwrap();
689 1 :
690 1 : let values = vec![
691 1 : (
692 1 : key.to_compact(),
693 1 : LSN,
694 1 : Value::WalRecord(NeonWalRecord::wal_append("foo")),
695 1 : ),
696 1 : (
697 1 : key.next().to_compact(),
698 1 : LSN,
699 1 : Value::WalRecord(NeonWalRecord::wal_append("bar")),
700 1 : ),
701 1 : ];
702 1 :
703 1 : let mut values = values
704 1 : .into_iter()
705 2 : .map(|(key, lsn, value)| (key, lsn, value.serialized_size().unwrap() as usize, value))
706 1 : .collect::<Vec<_>>();
707 1 : let mut batch = SerializedValueBatch::from_values(values.clone());
708 1 :
709 1 : validate_batch(&batch, &values, None);
710 1 :
711 1 : let value = (
712 1 : key.to_compact(),
713 1 : Lsn(LSN.0 + 0x10),
714 1 : Value::WalRecord(NeonWalRecord::wal_append("baz")),
715 1 : );
716 1 : let serialized_size = value.2.serialized_size().unwrap() as usize;
717 1 : let value = (value.0, value.1, serialized_size, value.2);
718 1 : values.push(value.clone());
719 1 : batch.put(value.0, value.3, value.1);
720 1 :
721 1 : validate_batch(&batch, &values, None);
722 1 :
723 1 : let value = (
724 1 : key.next().next().to_compact(),
725 1 : LSN,
726 1 : Value::WalRecord(NeonWalRecord::wal_append("taz")),
727 1 : );
728 1 : let serialized_size = value.2.serialized_size().unwrap() as usize;
729 1 : let value = (value.0, value.1, serialized_size, value.2);
730 1 : values.push(value.clone());
731 1 : batch.put(value.0, value.3, value.1);
732 1 :
733 1 : validate_batch(&batch, &values, None);
734 1 : }
735 :
736 : #[test]
737 1 : fn test_extension() {
738 : const LSN: Lsn = Lsn(0x10);
739 1 : let key = Key::from_hex("110000000033333333444444445500000001").unwrap();
740 1 :
741 1 : let values = vec![
742 1 : (
743 1 : key.to_compact(),
744 1 : LSN,
745 1 : Value::WalRecord(NeonWalRecord::wal_append("foo")),
746 1 : ),
747 1 : (
748 1 : key.next().to_compact(),
749 1 : LSN,
750 1 : Value::WalRecord(NeonWalRecord::wal_append("bar")),
751 1 : ),
752 1 : (
753 1 : key.next().next().to_compact(),
754 1 : LSN,
755 1 : Value::WalRecord(NeonWalRecord::wal_append("taz")),
756 1 : ),
757 1 : ];
758 1 :
759 1 : let mut values = values
760 1 : .into_iter()
761 3 : .map(|(key, lsn, value)| (key, lsn, value.serialized_size().unwrap() as usize, value))
762 1 : .collect::<Vec<_>>();
763 1 : let mut batch = SerializedValueBatch::from_values(values.clone());
764 1 :
765 1 : let other_values = vec![
766 1 : (
767 1 : key.to_compact(),
768 1 : Lsn(LSN.0 + 0x10),
769 1 : Value::WalRecord(NeonWalRecord::wal_append("foo")),
770 1 : ),
771 1 : (
772 1 : key.next().to_compact(),
773 1 : Lsn(LSN.0 + 0x10),
774 1 : Value::WalRecord(NeonWalRecord::wal_append("bar")),
775 1 : ),
776 1 : (
777 1 : key.next().next().to_compact(),
778 1 : Lsn(LSN.0 + 0x10),
779 1 : Value::WalRecord(NeonWalRecord::wal_append("taz")),
780 1 : ),
781 1 : ];
782 1 :
783 1 : let other_values = other_values
784 1 : .into_iter()
785 3 : .map(|(key, lsn, value)| (key, lsn, value.serialized_size().unwrap() as usize, value))
786 1 : .collect::<Vec<_>>();
787 1 : let other_batch = SerializedValueBatch::from_values(other_values.clone());
788 1 :
789 1 : values.extend(other_values);
790 1 : batch.extend(other_batch);
791 1 :
792 1 : validate_batch(&batch, &values, None);
793 1 : }
794 :
795 : #[test]
796 1 : fn test_gap_zeroing() {
797 : const LSN: Lsn = Lsn(0x10);
798 1 : let rel_foo_base_key = Key::from_hex("110000000033333333444444445500000001").unwrap();
799 1 :
800 1 : let rel_bar_base_key = {
801 1 : let mut key = rel_foo_base_key;
802 1 : key.field4 += 1;
803 1 : key
804 1 : };
805 1 :
806 1 : let values = vec![
807 1 : (
808 1 : rel_foo_base_key.to_compact(),
809 1 : LSN,
810 1 : Value::WalRecord(NeonWalRecord::wal_append("foo1")),
811 1 : ),
812 1 : (
813 1 : rel_foo_base_key.add(1).to_compact(),
814 1 : LSN,
815 1 : Value::WalRecord(NeonWalRecord::wal_append("foo2")),
816 1 : ),
817 1 : (
818 1 : rel_foo_base_key.add(5).to_compact(),
819 1 : LSN,
820 1 : Value::WalRecord(NeonWalRecord::wal_append("foo3")),
821 1 : ),
822 1 : (
823 1 : rel_foo_base_key.add(1).to_compact(),
824 1 : Lsn(LSN.0 + 0x10),
825 1 : Value::WalRecord(NeonWalRecord::wal_append("foo4")),
826 1 : ),
827 1 : (
828 1 : rel_foo_base_key.add(10).to_compact(),
829 1 : Lsn(LSN.0 + 0x10),
830 1 : Value::WalRecord(NeonWalRecord::wal_append("foo5")),
831 1 : ),
832 1 : (
833 1 : rel_foo_base_key.add(11).to_compact(),
834 1 : Lsn(LSN.0 + 0x10),
835 1 : Value::WalRecord(NeonWalRecord::wal_append("foo6")),
836 1 : ),
837 1 : (
838 1 : rel_foo_base_key.add(12).to_compact(),
839 1 : Lsn(LSN.0 + 0x10),
840 1 : Value::WalRecord(NeonWalRecord::wal_append("foo7")),
841 1 : ),
842 1 : (
843 1 : rel_bar_base_key.to_compact(),
844 1 : LSN,
845 1 : Value::WalRecord(NeonWalRecord::wal_append("bar1")),
846 1 : ),
847 1 : (
848 1 : rel_bar_base_key.add(4).to_compact(),
849 1 : LSN,
850 1 : Value::WalRecord(NeonWalRecord::wal_append("bar2")),
851 1 : ),
852 1 : ];
853 1 :
854 1 : let values = values
855 1 : .into_iter()
856 9 : .map(|(key, lsn, value)| (key, lsn, value.serialized_size().unwrap() as usize, value))
857 1 : .collect::<Vec<_>>();
858 1 :
859 1 : let mut batch = SerializedValueBatch::from_values(values.clone());
860 1 :
861 1 : let gaps = vec![
862 1 : (
863 1 : KeySpace {
864 1 : ranges: vec![
865 1 : rel_foo_base_key.add(2)..rel_foo_base_key.add(5),
866 1 : rel_bar_base_key.add(1)..rel_bar_base_key.add(4),
867 1 : ],
868 1 : },
869 1 : LSN,
870 1 : ),
871 1 : (
872 1 : KeySpace {
873 1 : ranges: vec![rel_foo_base_key.add(6)..rel_foo_base_key.add(10)],
874 1 : },
875 1 : Lsn(LSN.0 + 0x10),
876 1 : ),
877 1 : ];
878 1 :
879 1 : batch.zero_gaps(gaps.clone());
880 1 : validate_batch(&batch, &values, Some(&gaps));
881 1 : }
882 : }