Line data Source code
1 : //! This module implements batch type for serialized [`crate::models::value::Value`]
2 : //! instances. Each batch contains a raw buffer (serialized values)
3 : //! and a list of metadata for each (key, LSN) tuple present in the batch.
4 : //!
5 : //! Such batches are created from decoded PG WAL records and are ingested
6 : //! by the pageserver, which writes them directly to the ephemeral file.
7 :
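As a quick orientation, here is a minimal usage sketch of the batch API defined below, written in the style of this file's tests (it assumes a `use super::*;` context inside this crate, and `NeonWalRecord::wal_append` may require the testing feature); keys and LSNs are arbitrary illustration values.

```rust
// Illustrative only: build a small batch from pre-sized values, then merge in a
// second batch, as the pageserver ingest path does when accumulating writes.
use super::*;

fn example_batch() -> SerializedValueBatch {
    let key = Key::from_hex("110000000033333333444444445500000001").unwrap();

    // `from_values` expects (key, lsn, serialized size, value) tuples.
    let val = Value::WalRecord(NeonWalRecord::wal_append("foo"));
    let size = val.serialized_size().unwrap() as usize;
    let mut batch =
        SerializedValueBatch::from_values(vec![(key.to_compact(), Lsn(0x10), size, val)]);

    // A batch produced for a later WAL record can be appended in place; the
    // metadata offsets of the appended batch are shifted into the merged buffer.
    let val = Value::WalRecord(NeonWalRecord::wal_append("bar"));
    let size = val.serialized_size().unwrap() as usize;
    let other =
        SerializedValueBatch::from_values(vec![(key.next().to_compact(), Lsn(0x20), size, val)]);
    batch.extend(other);

    assert_eq!(batch.len(), 2);
    batch
}
```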
8 : use std::collections::{BTreeSet, HashMap};
9 :
10 : use bytes::{Bytes, BytesMut};
11 : use pageserver_api::key::{CompactKey, Key, rel_block_to_key};
12 : use pageserver_api::keyspace::KeySpace;
13 : use pageserver_api::reltag::RelTag;
14 : use pageserver_api::shard::ShardIdentity;
15 : use postgres_ffi::walrecord::{DecodedBkpBlock, DecodedWALRecord};
16 : use postgres_ffi::{BLCKSZ, PgMajorVersion, page_is_new, page_set_lsn, pg_constants};
17 : use serde::{Deserialize, Serialize};
18 : use utils::bin_ser::BeSer;
19 : use utils::lsn::Lsn;
20 :
21 : use crate::models::InterpretedWalRecord;
22 : use crate::models::record::NeonWalRecord;
23 : use crate::models::value::Value;
24 :
25 : static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; BLCKSZ as usize]);
26 :
27 : /// Accompanying metadata for the batch
28 : /// A value may be serialized and stored into the batch or just "observed".
29 : /// Shard 0 currently "observes" all values in order to accurately track
30 : /// relation sizes. In the case of "observed" values, we only need to know
31 : /// the key and LSN, so two types of metadata are supported to save on network
32 : /// bandwidth.
33 0 : #[derive(Serialize, Deserialize, Clone)]
34 : pub enum ValueMeta {
35 : Serialized(SerializedValueMeta),
36 : Observed(ObservedValueMeta),
37 : }
38 :
39 : impl ValueMeta {
40 14293182 : pub fn key(&self) -> CompactKey {
41 14293182 : match self {
42 14293182 : Self::Serialized(ser) => ser.key,
43 0 : Self::Observed(obs) => obs.key,
44 : }
45 14293182 : }
46 :
47 14220361 : pub fn lsn(&self) -> Lsn {
48 14220361 : match self {
49 14220361 : Self::Serialized(ser) => ser.lsn,
50 0 : Self::Observed(obs) => obs.lsn,
51 : }
52 14220361 : }
53 : }
54 :
55 : /// Wrapper around [`ValueMeta`] that implements ordering by
56 : /// (key, LSN) tuples
57 : struct OrderedValueMeta(ValueMeta);
58 :
59 : impl Ord for OrderedValueMeta {
60 59 : fn cmp(&self, other: &Self) -> std::cmp::Ordering {
61 59 : (self.0.key(), self.0.lsn()).cmp(&(other.0.key(), other.0.lsn()))
62 59 : }
63 : }
64 :
65 : impl PartialOrd for OrderedValueMeta {
66 9 : fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
67 9 : Some(self.cmp(other))
68 9 : }
69 : }
70 :
71 : impl PartialEq for OrderedValueMeta {
72 8 : fn eq(&self, other: &Self) -> bool {
73 8 : (self.0.key(), self.0.lsn()) == (other.0.key(), other.0.lsn())
74 8 : }
75 : }
76 :
77 : impl Eq for OrderedValueMeta {}
78 :
79 : /// Metadata for a [`Value`] serialized into the batch.
80 0 : #[derive(Serialize, Deserialize, Clone)]
81 : pub struct SerializedValueMeta {
82 : pub key: CompactKey,
83 : pub lsn: Lsn,
84 : /// Starting offset of the value for the (key, LSN) tuple
85 : /// in [`SerializedValueBatch::raw`]
86 : pub batch_offset: u64,
87 : pub len: usize,
88 : pub will_init: bool,
89 : }
90 :
91 : /// Metadata for a [`Value`] observed by the batch
92 0 : #[derive(Serialize, Deserialize, Clone)]
93 : pub struct ObservedValueMeta {
94 : pub key: CompactKey,
95 : pub lsn: Lsn,
96 : }
97 :
98 : /// Batch of serialized [`Value`]s.
99 0 : #[derive(Serialize, Deserialize, Clone)]
100 : pub struct SerializedValueBatch {
101 : /// [`Value`]s serialized in EphemeralFile's native format,
102 : /// ready for disk write by the pageserver
103 : pub raw: Vec<u8>,
104 :
105 : /// Metadata to make sense of the bytes in [`Self::raw`]
106 : /// and represent "observed" values.
107 : ///
108 : /// Invariant: Metadata entries for any given key are ordered
109 : /// by LSN. Note that entries for a key do not have to be contiguous.
110 : pub metadata: Vec<ValueMeta>,
111 :
112 : /// The highest LSN of any value in the batch
113 : pub max_lsn: Lsn,
114 :
115 : /// Number of values encoded by [`Self::raw`]
116 : pub len: usize,
117 : }
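To illustrate the relationship between `metadata` and `raw`, here is a small, hypothetical helper (again assuming a `use super::*;` context) that slices a serialized value back out of the buffer; observed entries carry no bytes and are skipped.

```rust
// Sketch only: locate the first serialized value via its (batch_offset, len)
// metadata and deserialize it from the raw buffer.
use super::*;

fn first_serialized_value(batch: &SerializedValueBatch) -> Option<Value> {
    batch.metadata.iter().find_map(|meta| match meta {
        ValueMeta::Serialized(ser) => {
            let start = ser.batch_offset as usize;
            let end = start + ser.len;
            // `Value::des` reverses the `ser_into` encoding used when the batch was built.
            Some(Value::des(&batch.raw[start..end]).expect("valid batch"))
        }
        // Observed entries only record a (key, LSN) tuple; there is nothing to read.
        ValueMeta::Observed(_) => None,
    })
}
```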
118 :
119 : impl Default for SerializedValueBatch {
120 204826 : fn default() -> Self {
121 204826 : Self {
122 204826 : raw: Default::default(),
123 204826 : metadata: Default::default(),
124 204826 : max_lsn: Lsn(0),
125 204826 : len: 0,
126 204826 : }
127 204826 : }
128 : }
129 :
130 : impl SerializedValueBatch {
131 : /// Populates the given `shard_records` with value batches from this WAL record, if any,
132 : /// discarding those belonging to other shards.
133 : ///
134 : /// The batch will only contain values for keys targeting the specified
135 : /// shard. Shard 0 is a special case, where any keys that don't belong to
136 : /// it are "observed" by the batch (i.e. present in [`SerializedValueBatch::metadata`],
137 : /// but absent from the raw buffer [`SerializedValueBatch::raw`]).
138 73532 : pub(crate) fn from_decoded_filtered(
139 73532 : decoded: DecodedWALRecord,
140 73532 : shard_records: &mut HashMap<ShardIdentity, InterpretedWalRecord>,
141 73532 : next_record_lsn: Lsn,
142 73532 : pg_version: PgMajorVersion,
143 73532 : ) -> anyhow::Result<()> {
144 : // First determine how big the buffers need to be and allocate them up-front.
145 : // This duplicates some of the work below, but it's empirically much faster.
146 73732 : for (shard, record) in shard_records.iter_mut() {
147 73732 : assert!(record.batch.is_empty());
148 :
149 73732 : let estimate = Self::estimate_buffer_size(&decoded, shard, pg_version);
150 73732 : record.batch.raw = Vec::with_capacity(estimate);
151 : }
152 :
153 73532 : for blk in decoded.blocks.iter() {
154 72821 : let rel = RelTag {
155 72821 : spcnode: blk.rnode_spcnode,
156 72821 : dbnode: blk.rnode_dbnode,
157 72821 : relnode: blk.rnode_relnode,
158 72821 : forknum: blk.forknum,
159 72821 : };
160 :
161 72821 : let key = rel_block_to_key(rel, blk.blkno);
162 :
163 72821 : if !key.is_valid_key_on_write_path() {
164 0 : anyhow::bail!(
165 0 : "Unsupported key decoded at LSN {}: {}",
166 : next_record_lsn,
167 : key
168 : );
169 72821 : }
170 :
171 72821 : for (shard, record) in shard_records.iter_mut() {
172 72821 : let key_is_local = shard.is_key_local(&key);
173 :
174 72821 : tracing::debug!(
175 : lsn=%next_record_lsn,
176 : key=%key,
177 0 : "ingest: shard decision {}",
178 0 : if !key_is_local { "drop" } else { "keep" },
179 : );
180 :
181 72821 : if !key_is_local {
182 0 : if shard.is_shard_zero() {
183 : // Shard 0 tracks relation sizes. Although we will not store this block, we will observe
184 : // its blkno in case it implicitly extends a relation.
185 0 : record
186 0 : .batch
187 0 : .metadata
188 0 : .push(ValueMeta::Observed(ObservedValueMeta {
189 0 : key: key.to_compact(),
190 0 : lsn: next_record_lsn,
191 0 : }))
192 0 : }
193 :
194 0 : continue;
195 72821 : }
196 :
197 : // Instead of storing the full-page-image WAL record,
198 : // it is better to store the extracted image: we can skip wal-redo
199 : // in this case. Also, some FPI records may contain multiple (up to 32) pages,
200 : // so they would have to be copied multiple times.
201 : //
202 72821 : let val = if Self::block_is_image(&decoded, blk, pg_version) {
203 : // Extract page image from FPI record
204 12 : let img_len = blk.bimg_len as usize;
205 12 : let img_offs = blk.bimg_offset as usize;
206 12 : let mut image = BytesMut::with_capacity(BLCKSZ as usize);
207 : // TODO(vlad): skip the copy
208 12 : image.extend_from_slice(&decoded.record[img_offs..img_offs + img_len]);
209 :
210 12 : if blk.hole_length != 0 {
211 0 : let tail = image.split_off(blk.hole_offset as usize);
212 0 : image.resize(image.len() + blk.hole_length as usize, 0u8);
213 0 : image.unsplit(tail);
214 12 : }
215 : //
216 : // Match the logic of XLogReadBufferForRedoExtended:
217 : // The page may be uninitialized. If so, we can't set the LSN because
218 : // that would corrupt the page.
219 : //
220 12 : if !page_is_new(&image) {
221 9 : page_set_lsn(&mut image, next_record_lsn)
222 3 : }
223 12 : assert_eq!(image.len(), BLCKSZ as usize);
224 :
225 12 : Value::Image(image.freeze())
226 : } else {
227 : Value::WalRecord(NeonWalRecord::Postgres {
228 72809 : will_init: blk.will_init || blk.apply_image,
229 72809 : rec: decoded.record.clone(),
230 : })
231 : };
232 :
233 72821 : let relative_off = record.batch.raw.len() as u64;
234 :
235 72821 : val.ser_into(&mut record.batch.raw)
236 72821 : .expect("Writing into in-memory buffer is infallible");
237 :
238 72821 : let val_ser_size = record.batch.raw.len() - relative_off as usize;
239 :
240 72821 : record
241 72821 : .batch
242 72821 : .metadata
243 72821 : .push(ValueMeta::Serialized(SerializedValueMeta {
244 72821 : key: key.to_compact(),
245 72821 : lsn: next_record_lsn,
246 72821 : batch_offset: relative_off,
247 72821 : len: val_ser_size,
248 72821 : will_init: val.will_init(),
249 72821 : }));
250 72821 : record.batch.max_lsn = std::cmp::max(record.batch.max_lsn, next_record_lsn);
251 72821 : record.batch.len += 1;
252 : }
253 : }
254 :
255 73532 : if cfg!(any(debug_assertions, test)) {
256 : // Validate that the batches are correct
257 73732 : for record in shard_records.values() {
258 73732 : record.batch.validate_lsn_order();
259 73732 : }
260 0 : }
261 :
262 73532 : Ok(())
263 73532 : }
264 :
265 : /// Look into the decoded PG WAL record and determine
266 : /// roughly how large the buffer for serialized values needs to be.
267 73732 : fn estimate_buffer_size(
268 73732 : decoded: &DecodedWALRecord,
269 73732 : shard: &ShardIdentity,
270 73732 : pg_version: PgMajorVersion,
271 73732 : ) -> usize {
272 73732 : let mut estimate: usize = 0;
273 :
274 73732 : for blk in decoded.blocks.iter() {
275 72821 : let rel = RelTag {
276 72821 : spcnode: blk.rnode_spcnode,
277 72821 : dbnode: blk.rnode_dbnode,
278 72821 : relnode: blk.rnode_relnode,
279 72821 : forknum: blk.forknum,
280 72821 : };
281 :
282 72821 : let key = rel_block_to_key(rel, blk.blkno);
283 :
284 72821 : if !shard.is_key_local(&key) {
285 0 : continue;
286 72821 : }
287 :
288 72821 : if Self::block_is_image(decoded, blk, pg_version) {
289 12 : // 4 bytes for the Value::Image discriminator
290 12 : // 8 bytes for encoding the size of the buffer
291 12 : // BLCKSZ for the raw image
292 12 : estimate += (4 + 8 + BLCKSZ) as usize;
293 72809 : } else {
294 72809 : // 4 bytes for the Value::WalRecord discriminator
295 72809 : // 4 bytes for the NeonWalRecord::Postgres discriminator
296 72809 : // 1 byte for NeonWalRecord::Postgres::will_init
297 72809 : // 8 bytes for encoding the size of the buffer
298 72809 : // length of the raw record
299 72809 : estimate += 8 + 1 + 8 + decoded.record.len();
300 72809 : }
301 : }
302 :
303 73732 : estimate
304 73732 : }
305 :
306 145642 : fn block_is_image(
307 145642 : decoded: &DecodedWALRecord,
308 145642 : blk: &DecodedBkpBlock,
309 145642 : pg_version: PgMajorVersion,
310 145642 : ) -> bool {
311 145642 : blk.apply_image
312 60 : && blk.has_image
313 60 : && decoded.xl_rmid == pg_constants::RM_XLOG_ID
314 24 : && (decoded.xl_info == pg_constants::XLOG_FPI
315 0 : || decoded.xl_info == pg_constants::XLOG_FPI_FOR_HINT)
316 : // compression of WAL is not yet supported: fall back to storing the original WAL record
317 24 : && !postgres_ffi::bkpimage_is_compressed(blk.bimg_info, pg_version)
318 : // do not materialize null pages because they will most likely soon be replaced with real data
319 24 : && blk.bimg_len != 0
320 145642 : }
321 :
322 : /// Encode a list of values and metadata into a serialized batch
323 : ///
324 : /// This is used by the pageserver ingest code to conveniently generate
325 : /// batches for metadata writes.
326 2330506 : pub fn from_values(batch: Vec<(CompactKey, Lsn, usize, Value)>) -> Self {
327 : // Pre-allocate a big flat buffer to write into. This should be large but not huge: it is soft-limited in practice by
328 : // [`crate::pgdatadir_mapping::DatadirModification::MAX_PENDING_BYTES`]
329 2330506 : let buffer_size = batch.iter().map(|i| i.2).sum::<usize>();
330 2330506 : let mut buf = Vec::<u8>::with_capacity(buffer_size);
331 :
332 2330506 : let mut metadata: Vec<ValueMeta> = Vec::with_capacity(batch.len());
333 2330506 : let mut max_lsn: Lsn = Lsn(0);
334 2330506 : let len = batch.len();
335 4682494 : for (key, lsn, val_ser_size, val) in batch {
336 2351988 : let relative_off = buf.len() as u64;
337 2351988 :
338 2351988 : val.ser_into(&mut buf)
339 2351988 : .expect("Writing into in-memory buffer is infallible");
340 2351988 :
341 2351988 : metadata.push(ValueMeta::Serialized(SerializedValueMeta {
342 2351988 : key,
343 2351988 : lsn,
344 2351988 : batch_offset: relative_off,
345 2351988 : len: val_ser_size,
346 2351988 : will_init: val.will_init(),
347 2351988 : }));
348 2351988 : max_lsn = std::cmp::max(max_lsn, lsn);
349 2351988 : }
350 :
351 : // Assert that we didn't do any extra allocations while building the buffer.
352 2330506 : debug_assert!(buf.len() <= buffer_size);
353 :
354 2330506 : if cfg!(any(debug_assertions, test)) {
355 2330506 : let batch = Self {
356 2330506 : raw: buf,
357 2330506 : metadata,
358 2330506 : max_lsn,
359 2330506 : len,
360 2330506 : };
361 :
362 2330506 : batch.validate_lsn_order();
363 :
364 2330506 : return batch;
365 0 : }
366 :
367 0 : Self {
368 0 : raw: buf,
369 0 : metadata,
370 0 : max_lsn,
371 0 : len,
372 0 : }
373 2330506 : }
374 :
375 : /// Add one value to the batch
376 : ///
377 : /// This is used by the pageserver ingest code to include metadata block
378 : /// updates for a single key.
379 140435 : pub fn put(&mut self, key: CompactKey, value: Value, lsn: Lsn) {
380 140435 : let relative_off = self.raw.len() as u64;
381 140435 : value.ser_into(&mut self.raw).unwrap();
382 :
383 140435 : let val_ser_size = self.raw.len() - relative_off as usize;
384 140435 : self.metadata
385 140435 : .push(ValueMeta::Serialized(SerializedValueMeta {
386 140435 : key,
387 140435 : lsn,
388 140435 : batch_offset: relative_off,
389 140435 : len: val_ser_size,
390 140435 : will_init: value.will_init(),
391 140435 : }));
392 :
393 140435 : self.max_lsn = std::cmp::max(self.max_lsn, lsn);
394 140435 : self.len += 1;
395 :
396 140435 : if cfg!(any(debug_assertions, test)) {
397 140435 : self.validate_lsn_order();
398 140435 : }
399 140435 : }
400 :
401 : /// Extend with the contents of another batch
402 : ///
403 : /// One batch is generated for each decoded PG WAL record.
404 : /// They are then merged to accumulate reasonably sized writes.
405 132289 : pub fn extend(&mut self, mut other: SerializedValueBatch) {
406 132289 : let extend_batch_start_offset = self.raw.len() as u64;
407 :
408 132289 : self.raw.extend(other.raw);
409 :
410 : // Shift the offsets in the batch we are extending with
411 133256 : other.metadata.iter_mut().for_each(|meta| match meta {
412 133256 : ValueMeta::Serialized(ser) => {
413 133256 : ser.batch_offset += extend_batch_start_offset;
414 133256 : if cfg!(debug_assertions) {
415 133256 : let value_end = ser.batch_offset + ser.len as u64;
416 133256 : assert!((value_end as usize) <= self.raw.len());
417 0 : }
418 : }
419 0 : ValueMeta::Observed(_) => {}
420 133256 : });
421 132289 : self.metadata.extend(other.metadata);
422 :
423 132289 : self.max_lsn = std::cmp::max(self.max_lsn, other.max_lsn);
424 :
425 132289 : self.len += other.len;
426 :
427 132289 : if cfg!(any(debug_assertions, test)) {
428 132289 : self.validate_lsn_order();
429 132289 : }
430 132289 : }
431 :
432 : /// Add zero images for the (key, LSN) tuples specified
433 : ///
434 : /// PG versions below 16 do not zero out pages before extending
435 : /// a relation and may leave gaps. Such gaps need to be identified
436 : /// by the pageserver ingest logic and get patched up here.
437 : ///
438 : /// Note that this function does not validate that the gaps have been
439 : /// identified correctly (it does not know relation sizes), so it's up
440 : /// to the call-site to do it properly.
441 1 : pub fn zero_gaps(&mut self, gaps: Vec<(KeySpace, Lsn)>) {
442 : // Implementation note:
443 : //
444 : // Values within [`SerializedValueBatch::raw`] do not have any ordering requirements,
445 : // but the metadata entries should be ordered properly (see
446 : // [`SerializedValueBatch::metadata`]).
447 : //
448 : // Exploiting this observation we do:
449 : // 1. Drain all the metadata entries into an ordered set.
450 : // The use of a BTreeSet keyed by (Key, Lsn) relies on the observation that Postgres never
451 : // includes more than one update to the same block in the same WAL record.
452 : // 2. For each (key, LSN) gap tuple, append a zero image to the raw buffer
453 : // and add an index entry to the ordered metadata set.
454 : // 3. Drain the ordered set back into a metadata vector
455 :
456 1 : let mut ordered_metas = self
457 1 : .metadata
458 1 : .drain(..)
459 1 : .map(OrderedValueMeta)
460 1 : .collect::<BTreeSet<_>>();
461 3 : for (keyspace, lsn) in gaps {
462 2 : self.max_lsn = std::cmp::max(self.max_lsn, lsn);
463 :
464 5 : for gap_range in keyspace.ranges {
465 3 : let mut key = gap_range.start;
466 13 : while key != gap_range.end {
467 10 : let relative_off = self.raw.len() as u64;
468 10 :
469 10 : // TODO(vlad): Can we be cheeky and write only one zero image, and
470 10 : // make all index entries requiring a zero page point to it?
471 10 : // Alternatively, we can change the index entry format to represent zero pages
472 10 : // without writing them at all.
473 10 : Value::Image(ZERO_PAGE.clone())
474 10 : .ser_into(&mut self.raw)
475 10 : .unwrap();
476 10 : let val_ser_size = self.raw.len() - relative_off as usize;
477 10 :
478 10 : ordered_metas.insert(OrderedValueMeta(ValueMeta::Serialized(
479 10 : SerializedValueMeta {
480 10 : key: key.to_compact(),
481 10 : lsn,
482 10 : batch_offset: relative_off,
483 10 : len: val_ser_size,
484 10 : will_init: true,
485 10 : },
486 10 : )));
487 10 :
488 10 : self.len += 1;
489 10 :
490 10 : key = key.next();
491 10 : }
492 : }
493 : }
494 :
495 1 : self.metadata = ordered_metas.into_iter().map(|ord| ord.0).collect();
496 :
497 1 : if cfg!(any(debug_assertions, test)) {
498 1 : self.validate_lsn_order();
499 1 : }
500 1 : }
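As a hypothetical caller-side sketch of the contract described above: suppose a relation had three blocks and a WAL record writes block 6 at `lsn`, leaving blocks 3..6 never explicitly written. The ingest logic would describe that range as a gap and ask the batch to zero it (the block numbers and helper are illustrative, not part of this module).

```rust
// Sketch only: zero a gap left by an implicit relation extension.
use super::*;

fn zero_extension_gap(batch: &mut SerializedValueBatch, rel: RelTag, lsn: Lsn) {
    let gaps = vec![(
        KeySpace {
            // Blocks 3, 4 and 5 of `rel` were skipped over by the extension to block 6.
            ranges: vec![rel_block_to_key(rel, 3)..rel_block_to_key(rel, 6)],
        },
        lsn,
    )];
    batch.zero_gaps(gaps);
}
```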
501 :
502 : /// Checks if the batch contains any serialized or observed values
503 73933 : pub fn is_empty(&self) -> bool {
504 73933 : !self.has_data() && self.metadata.is_empty()
505 73933 : }
506 :
507 : /// Checks if the batch contains only observed values
508 0 : pub fn is_observed(&self) -> bool {
509 0 : !self.has_data() && !self.metadata.is_empty()
510 0 : }
511 :
512 : /// Checks if the batch contains data
513 : ///
514 : /// Note that if this returns false, it may still contain observed values or
515 : /// a metadata record.
516 2548977 : pub fn has_data(&self) -> bool {
517 2548977 : let empty = self.raw.is_empty();
518 :
519 2548977 : if cfg!(debug_assertions) && empty {
520 74033 : assert!(
521 74033 : self.metadata
522 74033 : .iter()
523 74033 : .all(|meta| matches!(meta, ValueMeta::Observed(_)))
524 : );
525 2474944 : }
526 :
527 2548977 : !empty
528 2548977 : }
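A brief illustration of how the three predicates above relate (assuming a `use super::*;` context): a freshly defaulted batch is empty, while a batch holding only observed metadata reports `is_observed()` without `has_data()`.

```rust
// Sketch only: the three states a batch can be in.
use super::*;

fn batch_states() {
    let empty = SerializedValueBatch::default();
    assert!(empty.is_empty() && !empty.has_data() && !empty.is_observed());

    // Shard 0 records non-local keys as observed-only entries: metadata without bytes.
    let mut observed_only = SerializedValueBatch::default();
    observed_only
        .metadata
        .push(ValueMeta::Observed(ObservedValueMeta {
            key: Key::from_hex("110000000033333333444444445500000001")
                .unwrap()
                .to_compact(),
            lsn: Lsn(0x10),
        }));
    assert!(observed_only.is_observed() && !observed_only.has_data() && !observed_only.is_empty());
}
```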
529 :
530 : /// Returns the number of values serialized in the batch
531 72831 : pub fn len(&self) -> usize {
532 72831 : self.len
533 72831 : }
534 :
535 : /// Returns the size of the buffer wrapped by the batch
536 2402134 : pub fn buffer_size(&self) -> usize {
537 2402134 : self.raw.len()
538 2402134 : }
539 :
540 0 : pub fn updates_key(&self, key: &Key) -> bool {
541 0 : self.metadata.iter().any(|meta| match meta {
542 0 : ValueMeta::Serialized(ser) => key.to_compact() == ser.key,
543 0 : ValueMeta::Observed(_) => false,
544 0 : })
545 0 : }
546 :
547 2676969 : pub fn validate_lsn_order(&self) {
548 : use std::collections::HashMap;
549 :
550 2676969 : let mut last_seen_lsn_per_key: HashMap<CompactKey, Lsn> = HashMap::default();
551 :
552 14219987 : for meta in self.metadata.iter() {
553 14219987 : let lsn = meta.lsn();
554 14219987 : let key = meta.key();
555 :
556 14219987 : if let Some(prev_lsn) = last_seen_lsn_per_key.insert(key, lsn) {
557 19617 : assert!(
558 19617 : lsn >= prev_lsn,
559 0 : "Ordering violated by {}: {} < {}",
560 0 : Key::from_compact(key),
561 : lsn,
562 : prev_lsn
563 : );
564 14200370 : }
565 : }
566 2676969 : }
567 : }
568 :
569 : #[cfg(all(test, feature = "testing"))]
570 : mod tests {
571 : use super::*;
572 :
573 6 : fn validate_batch(
574 6 : batch: &SerializedValueBatch,
575 6 : values: &[(CompactKey, Lsn, usize, Value)],
576 6 : gaps: Option<&Vec<(KeySpace, Lsn)>>,
577 6 : ) {
578 : // Invariant 1: The metadata for a given entry in the batch
579 : // is correct and can be used to deserialize back to the original value.
580 28 : for (key, lsn, size, value) in values.iter() {
581 28 : let meta = batch
582 28 : .metadata
583 28 : .iter()
584 136 : .find(|meta| (meta.key(), meta.lsn()) == (*key, *lsn))
585 28 : .unwrap();
586 28 : let meta = match meta {
587 28 : ValueMeta::Serialized(ser) => ser,
588 0 : ValueMeta::Observed(_) => unreachable!(),
589 : };
590 :
591 28 : assert_eq!(meta.len, *size);
592 28 : assert_eq!(meta.will_init, value.will_init());
593 :
594 28 : let start = meta.batch_offset as usize;
595 28 : let end = meta.batch_offset as usize + meta.len;
596 28 : let value_from_batch = Value::des(&batch.raw[start..end]).unwrap();
597 28 : assert_eq!(&value_from_batch, value);
598 : }
599 :
600 6 : let mut expected_buffer_size: usize = values.iter().map(|(_, _, size, _)| size).sum();
601 6 : let mut gap_pages_count: usize = 0;
602 :
603 : // Invariant 2: Zero pages were added for identified gaps and their metadata
604 : // is correct.
605 6 : if let Some(gaps) = gaps {
606 3 : for (gap_keyspace, lsn) in gaps {
607 5 : for gap_range in &gap_keyspace.ranges {
608 3 : let mut gap_key = gap_range.start;
609 13 : while gap_key != gap_range.end {
610 10 : let meta = batch
611 10 : .metadata
612 10 : .iter()
613 104 : .find(|meta| (meta.key(), meta.lsn()) == (gap_key.to_compact(), *lsn))
614 10 : .unwrap();
615 10 : let meta = match meta {
616 10 : ValueMeta::Serialized(ser) => ser,
617 0 : ValueMeta::Observed(_) => unreachable!(),
618 : };
619 :
620 10 : let zero_value = Value::Image(ZERO_PAGE.clone());
621 10 : let zero_value_size = zero_value.serialized_size().unwrap() as usize;
622 :
623 10 : assert_eq!(meta.len, zero_value_size);
624 10 : assert_eq!(meta.will_init, zero_value.will_init());
625 :
626 10 : let start = meta.batch_offset as usize;
627 10 : let end = meta.batch_offset as usize + meta.len;
628 10 : let value_from_batch = Value::des(&batch.raw[start..end]).unwrap();
629 10 : assert_eq!(value_from_batch, zero_value);
630 :
631 10 : gap_pages_count += 1;
632 10 : expected_buffer_size += zero_value_size;
633 10 : gap_key = gap_key.next();
634 : }
635 : }
636 : }
637 5 : }
638 :
639 : // Invariant 3: The length of the batch is equal to the number
640 : // of values inserted, plus the number of gap pages. This extends
641 : // to the raw buffer size.
642 6 : assert_eq!(batch.len(), values.len() + gap_pages_count);
643 6 : assert_eq!(expected_buffer_size, batch.buffer_size());
644 :
645 : // Invariant 4: Metadata entries for any given key are sorted in LSN order.
646 6 : batch.validate_lsn_order();
647 6 : }
648 :
649 : #[test]
650 1 : fn test_creation_from_values() {
651 : const LSN: Lsn = Lsn(0x10);
652 1 : let key = Key::from_hex("110000000033333333444444445500000001").unwrap();
653 :
654 1 : let values = vec![
655 1 : (
656 1 : key.to_compact(),
657 1 : LSN,
658 1 : Value::WalRecord(NeonWalRecord::wal_append("foo")),
659 1 : ),
660 1 : (
661 1 : key.next().to_compact(),
662 1 : LSN,
663 1 : Value::WalRecord(NeonWalRecord::wal_append("bar")),
664 1 : ),
665 1 : (
666 1 : key.to_compact(),
667 1 : Lsn(LSN.0 + 0x10),
668 1 : Value::WalRecord(NeonWalRecord::wal_append("baz")),
669 1 : ),
670 1 : (
671 1 : key.next().next().to_compact(),
672 1 : LSN,
673 1 : Value::WalRecord(NeonWalRecord::wal_append("taz")),
674 1 : ),
675 : ];
676 :
677 1 : let values = values
678 1 : .into_iter()
679 4 : .map(|(key, lsn, value)| (key, lsn, value.serialized_size().unwrap() as usize, value))
680 1 : .collect::<Vec<_>>();
681 1 : let batch = SerializedValueBatch::from_values(values.clone());
682 :
683 1 : validate_batch(&batch, &values, None);
684 :
685 1 : assert!(!batch.is_empty());
686 1 : }
687 :
688 : #[test]
689 1 : fn test_put() {
690 : const LSN: Lsn = Lsn(0x10);
691 1 : let key = Key::from_hex("110000000033333333444444445500000001").unwrap();
692 :
693 1 : let values = vec![
694 1 : (
695 1 : key.to_compact(),
696 1 : LSN,
697 1 : Value::WalRecord(NeonWalRecord::wal_append("foo")),
698 1 : ),
699 1 : (
700 1 : key.next().to_compact(),
701 1 : LSN,
702 1 : Value::WalRecord(NeonWalRecord::wal_append("bar")),
703 1 : ),
704 : ];
705 :
706 1 : let mut values = values
707 1 : .into_iter()
708 2 : .map(|(key, lsn, value)| (key, lsn, value.serialized_size().unwrap() as usize, value))
709 1 : .collect::<Vec<_>>();
710 1 : let mut batch = SerializedValueBatch::from_values(values.clone());
711 :
712 1 : validate_batch(&batch, &values, None);
713 :
714 1 : let value = (
715 1 : key.to_compact(),
716 1 : Lsn(LSN.0 + 0x10),
717 1 : Value::WalRecord(NeonWalRecord::wal_append("baz")),
718 1 : );
719 1 : let serialized_size = value.2.serialized_size().unwrap() as usize;
720 1 : let value = (value.0, value.1, serialized_size, value.2);
721 1 : values.push(value.clone());
722 1 : batch.put(value.0, value.3, value.1);
723 :
724 1 : validate_batch(&batch, &values, None);
725 :
726 1 : let value = (
727 1 : key.next().next().to_compact(),
728 1 : LSN,
729 1 : Value::WalRecord(NeonWalRecord::wal_append("taz")),
730 1 : );
731 1 : let serialized_size = value.2.serialized_size().unwrap() as usize;
732 1 : let value = (value.0, value.1, serialized_size, value.2);
733 1 : values.push(value.clone());
734 1 : batch.put(value.0, value.3, value.1);
735 :
736 1 : validate_batch(&batch, &values, None);
737 1 : }
738 :
739 : #[test]
740 1 : fn test_extension() {
741 : const LSN: Lsn = Lsn(0x10);
742 1 : let key = Key::from_hex("110000000033333333444444445500000001").unwrap();
743 :
744 1 : let values = vec![
745 1 : (
746 1 : key.to_compact(),
747 1 : LSN,
748 1 : Value::WalRecord(NeonWalRecord::wal_append("foo")),
749 1 : ),
750 1 : (
751 1 : key.next().to_compact(),
752 1 : LSN,
753 1 : Value::WalRecord(NeonWalRecord::wal_append("bar")),
754 1 : ),
755 1 : (
756 1 : key.next().next().to_compact(),
757 1 : LSN,
758 1 : Value::WalRecord(NeonWalRecord::wal_append("taz")),
759 1 : ),
760 : ];
761 :
762 1 : let mut values = values
763 1 : .into_iter()
764 3 : .map(|(key, lsn, value)| (key, lsn, value.serialized_size().unwrap() as usize, value))
765 1 : .collect::<Vec<_>>();
766 1 : let mut batch = SerializedValueBatch::from_values(values.clone());
767 :
768 1 : let other_values = vec![
769 1 : (
770 1 : key.to_compact(),
771 1 : Lsn(LSN.0 + 0x10),
772 1 : Value::WalRecord(NeonWalRecord::wal_append("foo")),
773 1 : ),
774 1 : (
775 1 : key.next().to_compact(),
776 1 : Lsn(LSN.0 + 0x10),
777 1 : Value::WalRecord(NeonWalRecord::wal_append("bar")),
778 1 : ),
779 1 : (
780 1 : key.next().next().to_compact(),
781 1 : Lsn(LSN.0 + 0x10),
782 1 : Value::WalRecord(NeonWalRecord::wal_append("taz")),
783 1 : ),
784 : ];
785 :
786 1 : let other_values = other_values
787 1 : .into_iter()
788 3 : .map(|(key, lsn, value)| (key, lsn, value.serialized_size().unwrap() as usize, value))
789 1 : .collect::<Vec<_>>();
790 1 : let other_batch = SerializedValueBatch::from_values(other_values.clone());
791 :
792 1 : values.extend(other_values);
793 1 : batch.extend(other_batch);
794 :
795 1 : validate_batch(&batch, &values, None);
796 1 : }
797 :
798 : #[test]
799 1 : fn test_gap_zeroing() {
800 : const LSN: Lsn = Lsn(0x10);
801 1 : let rel_foo_base_key = Key::from_hex("110000000033333333444444445500000001").unwrap();
802 :
803 1 : let rel_bar_base_key = {
804 1 : let mut key = rel_foo_base_key;
805 1 : key.field4 += 1;
806 1 : key
807 : };
808 :
809 1 : let values = vec![
810 1 : (
811 1 : rel_foo_base_key.to_compact(),
812 1 : LSN,
813 1 : Value::WalRecord(NeonWalRecord::wal_append("foo1")),
814 1 : ),
815 1 : (
816 1 : rel_foo_base_key.add(1).to_compact(),
817 1 : LSN,
818 1 : Value::WalRecord(NeonWalRecord::wal_append("foo2")),
819 1 : ),
820 1 : (
821 1 : rel_foo_base_key.add(5).to_compact(),
822 1 : LSN,
823 1 : Value::WalRecord(NeonWalRecord::wal_append("foo3")),
824 1 : ),
825 1 : (
826 1 : rel_foo_base_key.add(1).to_compact(),
827 1 : Lsn(LSN.0 + 0x10),
828 1 : Value::WalRecord(NeonWalRecord::wal_append("foo4")),
829 1 : ),
830 1 : (
831 1 : rel_foo_base_key.add(10).to_compact(),
832 1 : Lsn(LSN.0 + 0x10),
833 1 : Value::WalRecord(NeonWalRecord::wal_append("foo5")),
834 1 : ),
835 1 : (
836 1 : rel_foo_base_key.add(11).to_compact(),
837 1 : Lsn(LSN.0 + 0x10),
838 1 : Value::WalRecord(NeonWalRecord::wal_append("foo6")),
839 1 : ),
840 1 : (
841 1 : rel_foo_base_key.add(12).to_compact(),
842 1 : Lsn(LSN.0 + 0x10),
843 1 : Value::WalRecord(NeonWalRecord::wal_append("foo7")),
844 1 : ),
845 1 : (
846 1 : rel_bar_base_key.to_compact(),
847 1 : LSN,
848 1 : Value::WalRecord(NeonWalRecord::wal_append("bar1")),
849 1 : ),
850 1 : (
851 1 : rel_bar_base_key.add(4).to_compact(),
852 1 : LSN,
853 1 : Value::WalRecord(NeonWalRecord::wal_append("bar2")),
854 1 : ),
855 : ];
856 :
857 1 : let values = values
858 1 : .into_iter()
859 9 : .map(|(key, lsn, value)| (key, lsn, value.serialized_size().unwrap() as usize, value))
860 1 : .collect::<Vec<_>>();
861 :
862 1 : let mut batch = SerializedValueBatch::from_values(values.clone());
863 :
864 1 : let gaps = vec![
865 1 : (
866 1 : KeySpace {
867 1 : ranges: vec![
868 1 : rel_foo_base_key.add(2)..rel_foo_base_key.add(5),
869 1 : rel_bar_base_key.add(1)..rel_bar_base_key.add(4),
870 1 : ],
871 1 : },
872 1 : LSN,
873 1 : ),
874 1 : (
875 1 : KeySpace {
876 1 : ranges: vec![rel_foo_base_key.add(6)..rel_foo_base_key.add(10)],
877 1 : },
878 1 : Lsn(LSN.0 + 0x10),
879 1 : ),
880 : ];
881 :
882 1 : batch.zero_gaps(gaps.clone());
883 1 : validate_batch(&batch, &values, Some(&gaps));
884 1 : }
885 : }
|