Line data Source code
1 : //! This module implements a batch type for serialized [`pageserver_api::value::Value`]
2 : //! instances. Each batch contains a raw buffer (serialized values)
3 : //! and a list of metadata for each (key, LSN) tuple present in the batch.
4 : //!
5 : //! Such batches are created from decoded PG WAL records and ingested
6 : //! by the pageserver, which writes them directly to the ephemeral file.
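//!
//! A rough sketch of how the raw buffer and the metadata relate (mirroring
//! the unit tests at the bottom of this module; the key and LSN are
//! arbitrary example values):
//!
//! ```ignore
//! let key = Key::from_hex("110000000033333333444444445500000001").unwrap();
//! let value = Value::WalRecord(NeonWalRecord::wal_append("foo"));
//! let size = value.serialized_size().unwrap() as usize;
//!
//! let batch = SerializedValueBatch::from_values(vec![(key.to_compact(), Lsn(0x10), size, value.clone())]);
//!
//! // Each metadata entry records where its value lives in `batch.raw`.
//! if let ValueMeta::Serialized(meta) = &batch.metadata[0] {
//!     let start = meta.batch_offset as usize;
//!     let end = start + meta.len;
//!     assert_eq!(Value::des(&batch.raw[start..end]).unwrap(), value);
//! }
//! ```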
7 :
8 : use std::collections::BTreeSet;
9 :
10 : use bytes::{Bytes, BytesMut};
11 : use pageserver_api::key::rel_block_to_key;
12 : use pageserver_api::keyspace::KeySpace;
13 : use pageserver_api::record::NeonWalRecord;
14 : use pageserver_api::reltag::RelTag;
15 : use pageserver_api::shard::ShardIdentity;
16 : use pageserver_api::{key::CompactKey, value::Value};
17 : use postgres_ffi::walrecord::{DecodedBkpBlock, DecodedWALRecord};
18 : use postgres_ffi::{page_is_new, page_set_lsn, pg_constants, BLCKSZ};
19 : use utils::bin_ser::BeSer;
20 : use utils::lsn::Lsn;
21 :
22 : use pageserver_api::key::Key;
23 :
24 : static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; BLCKSZ as usize]);
25 :
26 : /// Accompanying metadata for the batch.
27 : /// A value may be serialized and stored into the batch, or just "observed".
28 : /// Shard 0 currently "observes" all values in order to accurately track
29 : /// relation sizes. In the case of "observed" values, we only need to know
30 : /// the key and LSN, so two types of metadata are supported to save on network
31 : /// bandwidth.
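///
/// Both variants expose the same `key()`/`lsn()` accessors. A small
/// illustration (the field values are arbitrary examples):
///
/// ```ignore
/// let key = Key::from_hex("110000000033333333444444445500000001").unwrap();
/// let meta = ValueMeta::Observed(ObservedValueMeta {
///     key: key.to_compact(),
///     lsn: Lsn(0x10),
/// });
/// assert_eq!(meta.lsn(), Lsn(0x10));
/// ```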
32 : pub enum ValueMeta {
33 : Serialized(SerializedValueMeta),
34 : Observed(ObservedValueMeta),
35 : }
36 :
37 : impl ValueMeta {
38 28545931 : pub fn key(&self) -> CompactKey {
39 28545931 : match self {
40 28545931 : Self::Serialized(ser) => ser.key,
41 0 : Self::Observed(obs) => obs.key,
42 : }
43 28545931 : }
44 :
45 28400289 : pub fn lsn(&self) -> Lsn {
46 28400289 : match self {
47 28400289 : Self::Serialized(ser) => ser.lsn,
48 0 : Self::Observed(obs) => obs.lsn,
49 : }
50 28400289 : }
51 : }
52 :
53 : /// Wrapper around [`ValueMeta`] that implements ordering by
54 : /// (key, LSN) tuples
55 : struct OrderedValueMeta(ValueMeta);
56 :
57 : impl Ord for OrderedValueMeta {
58 59 : fn cmp(&self, other: &Self) -> std::cmp::Ordering {
59 59 : (self.0.key(), self.0.lsn()).cmp(&(other.0.key(), other.0.lsn()))
60 59 : }
61 : }
62 :
63 : impl PartialOrd for OrderedValueMeta {
64 9 : fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
65 9 : Some(self.cmp(other))
66 9 : }
67 : }
68 :
69 : impl PartialEq for OrderedValueMeta {
70 8 : fn eq(&self, other: &Self) -> bool {
71 8 : (self.0.key(), self.0.lsn()) == (other.0.key(), other.0.lsn())
72 8 : }
73 : }
74 :
75 : impl Eq for OrderedValueMeta {}
76 :
77 : /// Metadata for a [`Value`] serialized into the batch.
78 : pub struct SerializedValueMeta {
79 : pub key: CompactKey,
80 : pub lsn: Lsn,
81 : /// Starting offset of the value for the (key, LSN) tuple
82 : /// in [`SerializedValueBatch::raw`]
83 : pub batch_offset: u64,
84 : pub len: usize,
85 : pub will_init: bool,
86 : }
87 :
88 : /// Metadata for a [`Value`] observed by the batch
89 : pub struct ObservedValueMeta {
90 : pub key: CompactKey,
91 : pub lsn: Lsn,
92 : }
93 :
94 : /// Batch of serialized [`Value`]s.
95 : pub struct SerializedValueBatch {
96 : /// [`Value`]s serialized in EphemeralFile's native format,
97 : /// ready for disk write by the pageserver
98 : pub raw: Vec<u8>,
99 :
100 : /// Metadata to make sense of the bytes in [`Self::raw`]
101 : /// and represent "observed" values.
102 : ///
103 : /// Invariant: Metadata entries for any given key are ordered
104 : /// by LSN. Note that entries for a key do not have to be contiguous.
105 : pub metadata: Vec<ValueMeta>,
106 :
107 : /// The highest LSN of any value in the batch
108 : pub max_lsn: Lsn,
109 :
110 : /// Number of values encoded by [`Self::raw`]
111 : pub len: usize,
112 : }
113 :
114 : impl Default for SerializedValueBatch {
115 262188 : fn default() -> Self {
116 262188 : Self {
117 262188 : raw: Default::default(),
118 262188 : metadata: Default::default(),
119 262188 : max_lsn: Lsn(0),
120 262188 : len: 0,
121 262188 : }
122 262188 : }
123 : }
124 :
125 : impl SerializedValueBatch {
126 : /// Build a batch of serialized values from a decoded PG WAL record
127 : ///
128 : /// The batch will only contain values for keys targeting the specified
129 : /// shard. Shard 0 is a special case, where any keys that don't belong to
130 : /// it are "observed" by the batch (i.e. present in [`SerializedValueBatch::metadata`],
131 : /// but absent from the raw buffer [`SerializedValueBatch::raw`]).
132 145852 : pub(crate) fn from_decoded_filtered(
133 145852 : decoded: DecodedWALRecord,
134 145852 : shard: &ShardIdentity,
135 145852 : record_end_lsn: Lsn,
136 145852 : pg_version: u32,
137 145852 : ) -> anyhow::Result<SerializedValueBatch> {
138 145852 : // First determine how big the buffer needs to be and allocate it up-front.
139 145852 : // This duplicates some of the work below, but it's empirically much faster.
140 145852 : let estimated_buffer_size = Self::estimate_buffer_size(&decoded, shard, pg_version);
141 145852 : let mut buf = Vec::<u8>::with_capacity(estimated_buffer_size);
142 145852 :
143 145852 : let mut metadata: Vec<ValueMeta> = Vec::with_capacity(decoded.blocks.len());
144 145852 : let mut max_lsn: Lsn = Lsn(0);
145 145852 : let mut len: usize = 0;
146 145852 : for blk in decoded.blocks.iter() {
147 145642 : let relative_off = buf.len() as u64;
148 145642 :
149 145642 : let rel = RelTag {
150 145642 : spcnode: blk.rnode_spcnode,
151 145642 : dbnode: blk.rnode_dbnode,
152 145642 : relnode: blk.rnode_relnode,
153 145642 : forknum: blk.forknum,
154 145642 : };
155 145642 :
156 145642 : let key = rel_block_to_key(rel, blk.blkno);
157 145642 :
158 145642 : if !key.is_valid_key_on_write_path() {
159 0 : anyhow::bail!("Unsupported key decoded at LSN {}: {}", record_end_lsn, key);
160 145642 : }
161 145642 :
162 145642 : let key_is_local = shard.is_key_local(&key);
163 145642 :
164 145642 : tracing::debug!(
165 : lsn=%record_end_lsn,
166 : key=%key,
167 0 : "ingest: shard decision {}",
168 0 : if !key_is_local { "drop" } else { "keep" },
169 : );
170 :
171 145642 : if !key_is_local {
172 0 : if shard.is_shard_zero() {
173 : // Shard 0 tracks relation sizes. Although we will not store this block, we will observe
174 : // its blkno in case it implicitly extends a relation.
175 0 : metadata.push(ValueMeta::Observed(ObservedValueMeta {
176 0 : key: key.to_compact(),
177 0 : lsn: record_end_lsn,
178 0 : }))
179 0 : }
180 :
181 0 : continue;
182 145642 : }
183 :
184 : // Instead of storing the full-page-image WAL record,
185 : // it is better to store the extracted image: we can skip wal-redo
186 : // in this case. Also, some FPI records may contain multiple (up to 32) pages,
187 : // so they would have to be copied multiple times.
188 : //
189 145642 : let val = if Self::block_is_image(&decoded, blk, pg_version) {
190 : // Extract page image from FPI record
191 24 : let img_len = blk.bimg_len as usize;
192 24 : let img_offs = blk.bimg_offset as usize;
193 24 : let mut image = BytesMut::with_capacity(BLCKSZ as usize);
194 24 : // TODO(vlad): skip the copy
195 24 : image.extend_from_slice(&decoded.record[img_offs..img_offs + img_len]);
196 24 :
197 24 : if blk.hole_length != 0 {
198 0 : let tail = image.split_off(blk.hole_offset as usize);
199 0 : image.resize(image.len() + blk.hole_length as usize, 0u8);
200 0 : image.unsplit(tail);
201 24 : }
202 : //
203 : // Match the logic of XLogReadBufferForRedoExtended:
204 : // The page may be uninitialized. If so, we can't set the LSN because
205 : // that would corrupt the page.
206 : //
207 24 : if !page_is_new(&image) {
208 18 : page_set_lsn(&mut image, record_end_lsn)
209 6 : }
210 24 : assert_eq!(image.len(), BLCKSZ as usize);
211 :
212 24 : Value::Image(image.freeze())
213 : } else {
214 : Value::WalRecord(NeonWalRecord::Postgres {
215 145618 : will_init: blk.will_init || blk.apply_image,
216 145618 : rec: decoded.record.clone(),
217 : })
218 : };
219 :
220 145642 : val.ser_into(&mut buf)
221 145642 : .expect("Writing into in-memory buffer is infallible");
222 145642 :
223 145642 : let val_ser_size = buf.len() - relative_off as usize;
224 145642 :
225 145642 : metadata.push(ValueMeta::Serialized(SerializedValueMeta {
226 145642 : key: key.to_compact(),
227 145642 : lsn: record_end_lsn,
228 145642 : batch_offset: relative_off,
229 145642 : len: val_ser_size,
230 145642 : will_init: val.will_init(),
231 145642 : }));
232 145642 : max_lsn = std::cmp::max(max_lsn, record_end_lsn);
233 145642 : len += 1;
234 : }
235 :
236 145852 : if cfg!(any(debug_assertions, test)) {
237 145852 : let batch = Self {
238 145852 : raw: buf,
239 145852 : metadata,
240 145852 : max_lsn,
241 145852 : len,
242 145852 : };
243 145852 :
244 145852 : batch.validate_lsn_order();
245 145852 :
246 145852 : return Ok(batch);
247 0 : }
248 0 :
249 0 : Ok(Self {
250 0 : raw: buf,
251 0 : metadata,
252 0 : max_lsn,
253 0 : len,
254 0 : })
255 145852 : }
256 :
257 : /// Look into the decoded PG WAL record and determine
258 : /// roughly how large the buffer for serialized values needs to be.
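///
/// For example, a shard-local block carrying a 100-byte WAL record
/// contributes 8 + 1 + 8 + 100 = 117 bytes to the estimate, while a usable
/// full-page image contributes 4 + 8 + BLCKSZ bytes.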
259 145852 : fn estimate_buffer_size(
260 145852 : decoded: &DecodedWALRecord,
261 145852 : shard: &ShardIdentity,
262 145852 : pg_version: u32,
263 145852 : ) -> usize {
264 145852 : let mut estimate: usize = 0;
265 :
266 145852 : for blk in decoded.blocks.iter() {
267 145642 : let rel = RelTag {
268 145642 : spcnode: blk.rnode_spcnode,
269 145642 : dbnode: blk.rnode_dbnode,
270 145642 : relnode: blk.rnode_relnode,
271 145642 : forknum: blk.forknum,
272 145642 : };
273 145642 :
274 145642 : let key = rel_block_to_key(rel, blk.blkno);
275 145642 :
276 145642 : if !shard.is_key_local(&key) {
277 0 : continue;
278 145642 : }
279 145642 :
280 145642 : if Self::block_is_image(decoded, blk, pg_version) {
281 24 : // 4 bytes for the Value::Image discriminator
282 24 : // 8 bytes for encoding the size of the buffer
283 24 : // BLCKSZ for the raw image
284 24 : estimate += (4 + 8 + BLCKSZ) as usize;
285 145618 : } else {
286 145618 : // 4 bytes for the Value::WalRecord discriminator
287 145618 : // 4 bytes for the NeonWalRecord::Postgres discriminator
288 145618 : // 1 byte for NeonWalRecord::Postgres::will_init
289 145618 : // 8 bytes for encoding the size of the buffer
290 145618 : // length of the raw record
291 145618 : estimate += 8 + 1 + 8 + decoded.record.len();
292 145618 : }
293 : }
294 :
295 145852 : estimate
296 145852 : }
297 :
298 291284 : fn block_is_image(decoded: &DecodedWALRecord, blk: &DecodedBkpBlock, pg_version: u32) -> bool {
299 291284 : blk.apply_image
300 120 : && blk.has_image
301 120 : && decoded.xl_rmid == pg_constants::RM_XLOG_ID
302 48 : && (decoded.xl_info == pg_constants::XLOG_FPI
303 0 : || decoded.xl_info == pg_constants::XLOG_FPI_FOR_HINT)
304 : // compression of WAL is not yet supported: fall back to storing the original WAL record
305 48 : && !postgres_ffi::bkpimage_is_compressed(blk.bimg_info, pg_version)
306 : // do not materialize null pages because they will most likely soon be replaced with real data
307 48 : && blk.bimg_len != 0
308 291284 : }
309 :
310 : /// Encode a list of values and metadata into a serialized batch
311 : ///
312 : /// This is used by the pageserver ingest code to conveniently generate
313 : /// batches for metadata writes.
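///
/// A minimal sketch (mirroring the unit tests at the bottom of this module;
/// the key and LSN are arbitrary example values):
///
/// ```ignore
/// let key = Key::from_hex("110000000033333333444444445500000001").unwrap();
/// let value = Value::WalRecord(NeonWalRecord::wal_append("foo"));
/// let size = value.serialized_size().unwrap() as usize;
///
/// let batch = SerializedValueBatch::from_values(vec![(key.to_compact(), Lsn(0x10), size, value)]);
/// assert_eq!(batch.len(), 1);
/// assert_eq!(batch.buffer_size(), size);
/// ```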
314 4660943 : pub fn from_values(batch: Vec<(CompactKey, Lsn, usize, Value)>) -> Self {
315 4660943 : // Pre-allocate a big flat buffer to write into. This should be large but not huge: it is soft-limited in practice by
316 4660943 : // [`crate::pgdatadir_mapping::DatadirModification::MAX_PENDING_BYTES`]
317 4663987 : let buffer_size = batch.iter().map(|i| i.2).sum::<usize>();
318 4660943 : let mut buf = Vec::<u8>::with_capacity(buffer_size);
319 4660943 :
320 4660943 : let mut metadata: Vec<ValueMeta> = Vec::with_capacity(batch.len());
321 4660943 : let mut max_lsn: Lsn = Lsn(0);
322 4660943 : let len = batch.len();
323 9324930 : for (key, lsn, val_ser_size, val) in batch {
324 4663987 : let relative_off = buf.len() as u64;
325 4663987 :
326 4663987 : val.ser_into(&mut buf)
327 4663987 : .expect("Writing into in-memory buffer is infallible");
328 4663987 :
329 4663987 : metadata.push(ValueMeta::Serialized(SerializedValueMeta {
330 4663987 : key,
331 4663987 : lsn,
332 4663987 : batch_offset: relative_off,
333 4663987 : len: val_ser_size,
334 4663987 : will_init: val.will_init(),
335 4663987 : }));
336 4663987 : max_lsn = std::cmp::max(max_lsn, lsn);
337 4663987 : }
338 :
339 : // Assert that we didn't do any extra allocations while building the buffer.
340 4660943 : debug_assert!(buf.len() <= buffer_size);
341 :
342 4660943 : if cfg!(any(debug_assertions, test)) {
343 4660943 : let batch = Self {
344 4660943 : raw: buf,
345 4660943 : metadata,
346 4660943 : max_lsn,
347 4660943 : len,
348 4660943 : };
349 4660943 :
350 4660943 : batch.validate_lsn_order();
351 4660943 :
352 4660943 : return batch;
353 0 : }
354 0 :
355 0 : Self {
356 0 : raw: buf,
357 0 : metadata,
358 0 : max_lsn,
359 0 : len,
360 0 : }
361 4660943 : }
362 :
363 : /// Add one value to the batch
364 : ///
365 : /// This is used by the pageserver ingest code to include metadata block
366 : /// updates for a single key.
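///
/// A small sketch, following the same pattern as the unit tests at the
/// bottom of this module (the key and LSN are arbitrary example values):
///
/// ```ignore
/// let key = Key::from_hex("110000000033333333444444445500000001").unwrap();
/// let mut batch = SerializedValueBatch::default();
///
/// batch.put(
///     key.to_compact(),
///     Value::WalRecord(NeonWalRecord::wal_append("foo")),
///     Lsn(0x10),
/// );
/// assert_eq!(batch.len(), 1);
/// assert_eq!(batch.max_lsn, Lsn(0x10));
/// ```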
367 280868 : pub fn put(&mut self, key: CompactKey, value: Value, lsn: Lsn) {
368 280868 : let relative_off = self.raw.len() as u64;
369 280868 : value.ser_into(&mut self.raw).unwrap();
370 280868 :
371 280868 : let val_ser_size = self.raw.len() - relative_off as usize;
372 280868 : self.metadata
373 280868 : .push(ValueMeta::Serialized(SerializedValueMeta {
374 280868 : key,
375 280868 : lsn,
376 280868 : batch_offset: relative_off,
377 280868 : len: val_ser_size,
378 280868 : will_init: value.will_init(),
379 280868 : }));
380 280868 :
381 280868 : self.max_lsn = std::cmp::max(self.max_lsn, lsn);
382 280868 : self.len += 1;
383 280868 :
384 280868 : if cfg!(any(debug_assertions, test)) {
385 280868 : self.validate_lsn_order();
386 280868 : }
387 280868 : }
388 :
389 : /// Extend with the contents of another batch
390 : ///
391 : /// One batch is generated for each decoded PG WAL record.
392 : /// They are then merged to accumulate reasonably sized writes.
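///
/// A sketch of merging two single-value batches (again mirroring the unit
/// tests; keys and LSNs are arbitrary example values):
///
/// ```ignore
/// let key = Key::from_hex("110000000033333333444444445500000001").unwrap();
/// let foo = Value::WalRecord(NeonWalRecord::wal_append("foo"));
/// let bar = Value::WalRecord(NeonWalRecord::wal_append("bar"));
/// let foo_size = foo.serialized_size().unwrap() as usize;
/// let bar_size = bar.serialized_size().unwrap() as usize;
///
/// let mut batch = SerializedValueBatch::from_values(vec![(key.to_compact(), Lsn(0x10), foo_size, foo)]);
/// let other = SerializedValueBatch::from_values(vec![(key.to_compact(), Lsn(0x20), bar_size, bar)]);
///
/// batch.extend(other);
/// assert_eq!(batch.len(), 2);
/// assert_eq!(batch.max_lsn, Lsn(0x20));
/// ```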
393 264577 : pub fn extend(&mut self, mut other: SerializedValueBatch) {
394 264577 : let extend_batch_start_offset = self.raw.len() as u64;
395 264577 :
396 264577 : self.raw.extend(other.raw);
397 264577 :
398 264577 : // Shift the offsets in the batch we are extending with
399 266509 : other.metadata.iter_mut().for_each(|meta| match meta {
400 266509 : ValueMeta::Serialized(ser) => {
401 266509 : ser.batch_offset += extend_batch_start_offset;
402 266509 : if cfg!(debug_assertions) {
403 266509 : let value_end = ser.batch_offset + ser.len as u64;
404 266509 : assert!((value_end as usize) <= self.raw.len());
405 0 : }
406 : }
407 0 : ValueMeta::Observed(_) => {}
408 266509 : });
409 264577 : self.metadata.extend(other.metadata);
410 264577 :
411 264577 : self.max_lsn = std::cmp::max(self.max_lsn, other.max_lsn);
412 264577 :
413 264577 : self.len += other.len;
414 264577 :
415 264577 : if cfg!(any(debug_assertions, test)) {
416 264577 : self.validate_lsn_order();
417 264577 : }
418 264577 : }
419 :
420 : /// Add zero images for the (key, LSN) tuples specified
421 : ///
422 : /// PG versions below 16 do not zero out pages before extending
423 : /// a relation and may leave gaps. Such gaps need to be identified
424 : /// by the pageserver ingest logic and get patched up here.
425 : ///
426 : /// Note that this function does not validate that the gaps have been
427 : /// identified correctly (it does not know relation sizes), so it's up
428 : /// to the call-site to do it properly.
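///
/// A sketch of patching a single-block gap, along the lines of the unit
/// tests at the bottom of this module (keys and LSNs are arbitrary example
/// values):
///
/// ```ignore
/// let base = Key::from_hex("110000000033333333444444445500000001").unwrap();
/// let value = Value::WalRecord(NeonWalRecord::wal_append("foo"));
/// let size = value.serialized_size().unwrap() as usize;
///
/// // The batch contains blocks 0 and 2 of the relation; block 1 is a gap.
/// let mut batch = SerializedValueBatch::from_values(vec![
///     (base.to_compact(), Lsn(0x10), size, value.clone()),
///     (base.add(2).to_compact(), Lsn(0x10), size, value),
/// ]);
///
/// let gaps = vec![(
///     KeySpace { ranges: vec![base.add(1)..base.add(2)] },
///     Lsn(0x10),
/// )];
/// batch.zero_gaps(gaps);
///
/// // One zero image was appended for the missing block.
/// assert_eq!(batch.len(), 3);
/// ```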
429 1 : pub fn zero_gaps(&mut self, gaps: Vec<(KeySpace, Lsn)>) {
430 1 : // Implementation note:
431 1 : //
432 1 : // Values within [`SerializedValueBatch::raw`] do not have any ordering requirements,
433 1 : // but the metadata entries should be ordered properly (see
434 1 : // [`SerializedValueBatch::metadata`]).
435 1 : //
436 1 : // Exploiting this observation we do:
437 1 : // 1. Drain all the metadata entries into an ordered set.
438 1 : // The use of a BTreeSet keyed by (Key, Lsn) relies on the observation that Postgres never
439 1 : // includes more than one update to the same block in the same WAL record.
440 1 : // 2. For each (key, LSN) gap tuple, append a zero image to the raw buffer
441 1 : // and add an index entry to the ordered metadata set.
442 1 : // 3. Drain the ordered set back into a metadata vector
443 1 :
444 1 : let mut ordered_metas = self
445 1 : .metadata
446 1 : .drain(..)
447 1 : .map(OrderedValueMeta)
448 1 : .collect::<BTreeSet<_>>();
449 3 : for (keyspace, lsn) in gaps {
450 2 : self.max_lsn = std::cmp::max(self.max_lsn, lsn);
451 :
452 5 : for gap_range in keyspace.ranges {
453 3 : let mut key = gap_range.start;
454 13 : while key != gap_range.end {
455 10 : let relative_off = self.raw.len() as u64;
456 10 :
457 10 : // TODO(vlad): Can we be cheeky and write only one zero image, and
458 10 : // make all index entries requiring a zero page point to it?
459 10 : // Alternatively, we can change the index entry format to represent zero pages
460 10 : // without writing them at all.
461 10 : Value::Image(ZERO_PAGE.clone())
462 10 : .ser_into(&mut self.raw)
463 10 : .unwrap();
464 10 : let val_ser_size = self.raw.len() - relative_off as usize;
465 10 :
466 10 : ordered_metas.insert(OrderedValueMeta(ValueMeta::Serialized(
467 10 : SerializedValueMeta {
468 10 : key: key.to_compact(),
469 10 : lsn,
470 10 : batch_offset: relative_off,
471 10 : len: val_ser_size,
472 10 : will_init: true,
473 10 : },
474 10 : )));
475 10 :
476 10 : self.len += 1;
477 10 :
478 10 : key = key.next();
479 10 : }
480 : }
481 : }
482 :
483 19 : self.metadata = ordered_metas.into_iter().map(|ord| ord.0).collect();
484 1 :
485 1 : if cfg!(any(debug_assertions, test)) {
486 1 : self.validate_lsn_order();
487 1 : }
488 1 : }
489 :
490 : /// Checks if the batch is empty
491 : ///
492 : /// A batch is empty when it contains no serialized values.
493 : /// Note that it may still contain observed values.
494 4950033 : pub fn is_empty(&self) -> bool {
495 4950033 : let empty = self.raw.is_empty();
496 4950033 :
497 4950033 : if cfg!(debug_assertions) && empty {
498 202 : assert!(self
499 202 : .metadata
500 202 : .iter()
501 202 : .all(|meta| matches!(meta, ValueMeta::Observed(_))));
502 4949831 : }
503 :
504 4950033 : empty
505 4950033 : }
506 :
507 : /// Returns the number of values serialized in the batch
508 145656 : pub fn len(&self) -> usize {
509 145656 : self.len
510 145656 : }
511 :
512 : /// Returns the size of the buffer wrapped by the batch
513 4804206 : pub fn buffer_size(&self) -> usize {
514 4804206 : self.raw.len()
515 4804206 : }
516 :
517 0 : pub fn updates_key(&self, key: &Key) -> bool {
518 0 : self.metadata.iter().any(|meta| match meta {
519 0 : ValueMeta::Serialized(ser) => key.to_compact() == ser.key,
520 0 : ValueMeta::Observed(_) => false,
521 0 : })
522 0 : }
523 :
524 5352247 : pub fn validate_lsn_order(&self) {
525 : use std::collections::HashMap;
526 :
527 5352247 : let mut last_seen_lsn_per_key: HashMap<CompactKey, Lsn> = HashMap::default();
528 :
529 28399915 : for meta in self.metadata.iter() {
530 28399915 : let lsn = meta.lsn();
531 28399915 : let key = meta.key();
532 :
533 28399915 : if let Some(prev_lsn) = last_seen_lsn_per_key.insert(key, lsn) {
534 15 : assert!(
535 15 : lsn >= prev_lsn,
536 0 : "Ordering violated by {}: {} < {}",
537 0 : Key::from_compact(key),
538 : lsn,
539 : prev_lsn
540 : );
541 28399900 : }
542 : }
543 5352247 : }
544 : }
545 :
546 : #[cfg(all(test, feature = "testing"))]
547 : mod tests {
548 : use super::*;
549 :
550 6 : fn validate_batch(
551 6 : batch: &SerializedValueBatch,
552 6 : values: &[(CompactKey, Lsn, usize, Value)],
553 6 : gaps: Option<&Vec<(KeySpace, Lsn)>>,
554 6 : ) {
555 : // Invariant 1: The metadata for a given entry in the batch
556 : // is correct and can be used to deserialize back to the original value.
557 28 : for (key, lsn, size, value) in values.iter() {
558 28 : let meta = batch
559 28 : .metadata
560 28 : .iter()
561 136 : .find(|meta| (meta.key(), meta.lsn()) == (*key, *lsn))
562 28 : .unwrap();
563 28 : let meta = match meta {
564 28 : ValueMeta::Serialized(ser) => ser,
565 0 : ValueMeta::Observed(_) => unreachable!(),
566 : };
567 :
568 28 : assert_eq!(meta.len, *size);
569 28 : assert_eq!(meta.will_init, value.will_init());
570 :
571 28 : let start = meta.batch_offset as usize;
572 28 : let end = meta.batch_offset as usize + meta.len;
573 28 : let value_from_batch = Value::des(&batch.raw[start..end]).unwrap();
574 28 : assert_eq!(&value_from_batch, value);
575 : }
576 :
577 28 : let mut expected_buffer_size: usize = values.iter().map(|(_, _, size, _)| size).sum();
578 6 : let mut gap_pages_count: usize = 0;
579 :
580 : // Invariant 2: Zero pages were added for identified gaps and their metadata
581 : // is correct.
582 6 : if let Some(gaps) = gaps {
583 3 : for (gap_keyspace, lsn) in gaps {
584 5 : for gap_range in &gap_keyspace.ranges {
585 3 : let mut gap_key = gap_range.start;
586 13 : while gap_key != gap_range.end {
587 10 : let meta = batch
588 10 : .metadata
589 10 : .iter()
590 104 : .find(|meta| (meta.key(), meta.lsn()) == (gap_key.to_compact(), *lsn))
591 10 : .unwrap();
592 10 : let meta = match meta {
593 10 : ValueMeta::Serialized(ser) => ser,
594 0 : ValueMeta::Observed(_) => unreachable!(),
595 : };
596 :
597 10 : let zero_value = Value::Image(ZERO_PAGE.clone());
598 10 : let zero_value_size = zero_value.serialized_size().unwrap() as usize;
599 10 :
600 10 : assert_eq!(meta.len, zero_value_size);
601 10 : assert_eq!(meta.will_init, zero_value.will_init());
602 :
603 10 : let start = meta.batch_offset as usize;
604 10 : let end = meta.batch_offset as usize + meta.len;
605 10 : let value_from_batch = Value::des(&batch.raw[start..end]).unwrap();
606 10 : assert_eq!(value_from_batch, zero_value);
607 :
608 10 : gap_pages_count += 1;
609 10 : expected_buffer_size += zero_value_size;
610 10 : gap_key = gap_key.next();
611 : }
612 : }
613 : }
614 5 : }
615 :
616 : // Invariant 3: The length of the batch is equal to the number
617 : // of values inserted, plus the number of gap pages. This extends
618 : // to the raw buffer size.
619 6 : assert_eq!(batch.len(), values.len() + gap_pages_count);
620 6 : assert_eq!(expected_buffer_size, batch.buffer_size());
621 :
622 : // Invariant 4: Metadata entries for any given key are sorted in LSN order.
623 6 : batch.validate_lsn_order();
624 6 : }
625 :
626 : #[test]
627 1 : fn test_creation_from_values() {
628 : const LSN: Lsn = Lsn(0x10);
629 1 : let key = Key::from_hex("110000000033333333444444445500000001").unwrap();
630 1 :
631 1 : let values = vec![
632 1 : (
633 1 : key.to_compact(),
634 1 : LSN,
635 1 : Value::WalRecord(NeonWalRecord::wal_append("foo")),
636 1 : ),
637 1 : (
638 1 : key.next().to_compact(),
639 1 : LSN,
640 1 : Value::WalRecord(NeonWalRecord::wal_append("bar")),
641 1 : ),
642 1 : (
643 1 : key.to_compact(),
644 1 : Lsn(LSN.0 + 0x10),
645 1 : Value::WalRecord(NeonWalRecord::wal_append("baz")),
646 1 : ),
647 1 : (
648 1 : key.next().next().to_compact(),
649 1 : LSN,
650 1 : Value::WalRecord(NeonWalRecord::wal_append("taz")),
651 1 : ),
652 1 : ];
653 1 :
654 1 : let values = values
655 1 : .into_iter()
656 4 : .map(|(key, lsn, value)| (key, lsn, value.serialized_size().unwrap() as usize, value))
657 1 : .collect::<Vec<_>>();
658 1 : let batch = SerializedValueBatch::from_values(values.clone());
659 1 :
660 1 : validate_batch(&batch, &values, None);
661 1 :
662 1 : assert!(!batch.is_empty());
663 1 : }
664 :
665 : #[test]
666 1 : fn test_put() {
667 : const LSN: Lsn = Lsn(0x10);
668 1 : let key = Key::from_hex("110000000033333333444444445500000001").unwrap();
669 1 :
670 1 : let values = vec![
671 1 : (
672 1 : key.to_compact(),
673 1 : LSN,
674 1 : Value::WalRecord(NeonWalRecord::wal_append("foo")),
675 1 : ),
676 1 : (
677 1 : key.next().to_compact(),
678 1 : LSN,
679 1 : Value::WalRecord(NeonWalRecord::wal_append("bar")),
680 1 : ),
681 1 : ];
682 1 :
683 1 : let mut values = values
684 1 : .into_iter()
685 2 : .map(|(key, lsn, value)| (key, lsn, value.serialized_size().unwrap() as usize, value))
686 1 : .collect::<Vec<_>>();
687 1 : let mut batch = SerializedValueBatch::from_values(values.clone());
688 1 :
689 1 : validate_batch(&batch, &values, None);
690 1 :
691 1 : let value = (
692 1 : key.to_compact(),
693 1 : Lsn(LSN.0 + 0x10),
694 1 : Value::WalRecord(NeonWalRecord::wal_append("baz")),
695 1 : );
696 1 : let serialized_size = value.2.serialized_size().unwrap() as usize;
697 1 : let value = (value.0, value.1, serialized_size, value.2);
698 1 : values.push(value.clone());
699 1 : batch.put(value.0, value.3, value.1);
700 1 :
701 1 : validate_batch(&batch, &values, None);
702 1 :
703 1 : let value = (
704 1 : key.next().next().to_compact(),
705 1 : LSN,
706 1 : Value::WalRecord(NeonWalRecord::wal_append("taz")),
707 1 : );
708 1 : let serialized_size = value.2.serialized_size().unwrap() as usize;
709 1 : let value = (value.0, value.1, serialized_size, value.2);
710 1 : values.push(value.clone());
711 1 : batch.put(value.0, value.3, value.1);
712 1 :
713 1 : validate_batch(&batch, &values, None);
714 1 : }
715 :
716 : #[test]
717 1 : fn test_extension() {
718 : const LSN: Lsn = Lsn(0x10);
719 1 : let key = Key::from_hex("110000000033333333444444445500000001").unwrap();
720 1 :
721 1 : let values = vec![
722 1 : (
723 1 : key.to_compact(),
724 1 : LSN,
725 1 : Value::WalRecord(NeonWalRecord::wal_append("foo")),
726 1 : ),
727 1 : (
728 1 : key.next().to_compact(),
729 1 : LSN,
730 1 : Value::WalRecord(NeonWalRecord::wal_append("bar")),
731 1 : ),
732 1 : (
733 1 : key.next().next().to_compact(),
734 1 : LSN,
735 1 : Value::WalRecord(NeonWalRecord::wal_append("taz")),
736 1 : ),
737 1 : ];
738 1 :
739 1 : let mut values = values
740 1 : .into_iter()
741 3 : .map(|(key, lsn, value)| (key, lsn, value.serialized_size().unwrap() as usize, value))
742 1 : .collect::<Vec<_>>();
743 1 : let mut batch = SerializedValueBatch::from_values(values.clone());
744 1 :
745 1 : let other_values = vec![
746 1 : (
747 1 : key.to_compact(),
748 1 : Lsn(LSN.0 + 0x10),
749 1 : Value::WalRecord(NeonWalRecord::wal_append("foo")),
750 1 : ),
751 1 : (
752 1 : key.next().to_compact(),
753 1 : Lsn(LSN.0 + 0x10),
754 1 : Value::WalRecord(NeonWalRecord::wal_append("bar")),
755 1 : ),
756 1 : (
757 1 : key.next().next().to_compact(),
758 1 : Lsn(LSN.0 + 0x10),
759 1 : Value::WalRecord(NeonWalRecord::wal_append("taz")),
760 1 : ),
761 1 : ];
762 1 :
763 1 : let other_values = other_values
764 1 : .into_iter()
765 3 : .map(|(key, lsn, value)| (key, lsn, value.serialized_size().unwrap() as usize, value))
766 1 : .collect::<Vec<_>>();
767 1 : let other_batch = SerializedValueBatch::from_values(other_values.clone());
768 1 :
769 1 : values.extend(other_values);
770 1 : batch.extend(other_batch);
771 1 :
772 1 : validate_batch(&batch, &values, None);
773 1 : }
774 :
775 : #[test]
776 1 : fn test_gap_zeroing() {
777 : const LSN: Lsn = Lsn(0x10);
778 1 : let rel_foo_base_key = Key::from_hex("110000000033333333444444445500000001").unwrap();
779 1 :
780 1 : let rel_bar_base_key = {
781 1 : let mut key = rel_foo_base_key;
782 1 : key.field4 += 1;
783 1 : key
784 1 : };
785 1 :
786 1 : let values = vec![
787 1 : (
788 1 : rel_foo_base_key.to_compact(),
789 1 : LSN,
790 1 : Value::WalRecord(NeonWalRecord::wal_append("foo1")),
791 1 : ),
792 1 : (
793 1 : rel_foo_base_key.add(1).to_compact(),
794 1 : LSN,
795 1 : Value::WalRecord(NeonWalRecord::wal_append("foo2")),
796 1 : ),
797 1 : (
798 1 : rel_foo_base_key.add(5).to_compact(),
799 1 : LSN,
800 1 : Value::WalRecord(NeonWalRecord::wal_append("foo3")),
801 1 : ),
802 1 : (
803 1 : rel_foo_base_key.add(1).to_compact(),
804 1 : Lsn(LSN.0 + 0x10),
805 1 : Value::WalRecord(NeonWalRecord::wal_append("foo4")),
806 1 : ),
807 1 : (
808 1 : rel_foo_base_key.add(10).to_compact(),
809 1 : Lsn(LSN.0 + 0x10),
810 1 : Value::WalRecord(NeonWalRecord::wal_append("foo5")),
811 1 : ),
812 1 : (
813 1 : rel_foo_base_key.add(11).to_compact(),
814 1 : Lsn(LSN.0 + 0x10),
815 1 : Value::WalRecord(NeonWalRecord::wal_append("foo6")),
816 1 : ),
817 1 : (
818 1 : rel_foo_base_key.add(12).to_compact(),
819 1 : Lsn(LSN.0 + 0x10),
820 1 : Value::WalRecord(NeonWalRecord::wal_append("foo7")),
821 1 : ),
822 1 : (
823 1 : rel_bar_base_key.to_compact(),
824 1 : LSN,
825 1 : Value::WalRecord(NeonWalRecord::wal_append("bar1")),
826 1 : ),
827 1 : (
828 1 : rel_bar_base_key.add(4).to_compact(),
829 1 : LSN,
830 1 : Value::WalRecord(NeonWalRecord::wal_append("bar2")),
831 1 : ),
832 1 : ];
833 1 :
834 1 : let values = values
835 1 : .into_iter()
836 9 : .map(|(key, lsn, value)| (key, lsn, value.serialized_size().unwrap() as usize, value))
837 1 : .collect::<Vec<_>>();
838 1 :
839 1 : let mut batch = SerializedValueBatch::from_values(values.clone());
840 1 :
841 1 : let gaps = vec![
842 1 : (
843 1 : KeySpace {
844 1 : ranges: vec![
845 1 : rel_foo_base_key.add(2)..rel_foo_base_key.add(5),
846 1 : rel_bar_base_key.add(1)..rel_bar_base_key.add(4),
847 1 : ],
848 1 : },
849 1 : LSN,
850 1 : ),
851 1 : (
852 1 : KeySpace {
853 1 : ranges: vec![rel_foo_base_key.add(6)..rel_foo_base_key.add(10)],
854 1 : },
855 1 : Lsn(LSN.0 + 0x10),
856 1 : ),
857 1 : ];
858 1 :
859 1 : batch.zero_gaps(gaps.clone());
860 1 : validate_batch(&batch, &values, Some(&gaps));
861 1 : }
862 : }