LCOV - code coverage report
Current view: top level - libs/wal_decoder/src - serialized_batch.rs (source / functions) Coverage Total Hit
Test: 1b0a6a0c05cee5a7de360813c8034804e105ce1c.info Lines: 91.8 % 631 579
Test Date: 2025-03-12 00:01:28 Functions: 54.8 % 62 34

            Line data    Source code
       1              : //! This module implements batch type for serialized [`pageserver_api::value::Value`]
       2              : //! instances. Each batch contains a raw buffer (serialized values)
       3              : //! and a list of metadata for each (key, LSN) tuple present in the batch.
       4              : //!
       5              : //! Such batches are created from decoded PG wal records and ingested
       6              : //! by the pageserver by writing directly to the ephemeral file.
       7              : 
       8              : use std::collections::{BTreeSet, HashMap};
       9              : 
      10              : use bytes::{Bytes, BytesMut};
      11              : use pageserver_api::key::{CompactKey, Key, rel_block_to_key};
      12              : use pageserver_api::keyspace::KeySpace;
      13              : use pageserver_api::record::NeonWalRecord;
      14              : use pageserver_api::reltag::RelTag;
      15              : use pageserver_api::shard::ShardIdentity;
      16              : use pageserver_api::value::Value;
      17              : use postgres_ffi::walrecord::{DecodedBkpBlock, DecodedWALRecord};
      18              : use postgres_ffi::{BLCKSZ, page_is_new, page_set_lsn, pg_constants};
      19              : use serde::{Deserialize, Serialize};
      20              : use utils::bin_ser::BeSer;
      21              : use utils::lsn::Lsn;
      22              : 
      23              : use crate::models::InterpretedWalRecord;
      24              : 
      25              : static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; BLCKSZ as usize]);
      26              : 
      27              : /// Accompanying metadata for the batch
      28              : /// A value may be serialized and stored into the batch or just "observed".
      29              : /// Shard 0 currently "observes" all values in order to accurately track
      30              : /// relation sizes. In the case of "observed" values, we only need to know
      31              : /// the key and LSN, so two types of metadata are supported to save on network
      32              : /// bandwidth.
      33            0 : #[derive(Serialize, Deserialize, Clone)]
      34              : pub enum ValueMeta {
      35              :     Serialized(SerializedValueMeta),
      36              :     Observed(ObservedValueMeta),
      37              : }
      38              : 
      39              : impl ValueMeta {
      40     57091945 :     pub fn key(&self) -> CompactKey {
      41     57091945 :         match self {
      42     57091945 :             Self::Serialized(ser) => ser.key,
      43            0 :             Self::Observed(obs) => obs.key,
      44              :         }
      45     57091945 :     }
      46              : 
      47     56800661 :     pub fn lsn(&self) -> Lsn {
      48     56800661 :         match self {
      49     56800661 :             Self::Serialized(ser) => ser.lsn,
      50            0 :             Self::Observed(obs) => obs.lsn,
      51              :         }
      52     56800661 :     }
      53              : }
      54              : 
      55              : /// Wrapper around [`ValueMeta`] that implements ordering by
      56              : /// (key, LSN) tuples
      57              : struct OrderedValueMeta(ValueMeta);
      58              : 
      59              : impl Ord for OrderedValueMeta {
      60           59 :     fn cmp(&self, other: &Self) -> std::cmp::Ordering {
      61           59 :         (self.0.key(), self.0.lsn()).cmp(&(other.0.key(), other.0.lsn()))
      62           59 :     }
      63              : }
      64              : 
      65              : impl PartialOrd for OrderedValueMeta {
      66            9 :     fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
      67            9 :         Some(self.cmp(other))
      68            9 :     }
      69              : }
      70              : 
      71              : impl PartialEq for OrderedValueMeta {
      72            8 :     fn eq(&self, other: &Self) -> bool {
      73            8 :         (self.0.key(), self.0.lsn()) == (other.0.key(), other.0.lsn())
      74            8 :     }
      75              : }
      76              : 
      77              : impl Eq for OrderedValueMeta {}
      78              : 
      79              : /// Metadata for a [`Value`] serialized into the batch.
      80            0 : #[derive(Serialize, Deserialize, Clone)]
      81              : pub struct SerializedValueMeta {
      82              :     pub key: CompactKey,
      83              :     pub lsn: Lsn,
      84              :     /// Starting offset of the value for the (key, LSN) tuple
      85              :     /// in [`SerializedValueBatch::raw`]
      86              :     pub batch_offset: u64,
      87              :     pub len: usize,
      88              :     pub will_init: bool,
      89              : }
      90              : 
      91              : /// Metadata for a [`Value`] observed by the batch
      92            0 : #[derive(Serialize, Deserialize, Clone)]
      93              : pub struct ObservedValueMeta {
      94              :     pub key: CompactKey,
      95              :     pub lsn: Lsn,
      96              : }
      97              : 
      98              : /// Batch of serialized [`Value`]s.
      99            0 : #[derive(Serialize, Deserialize, Clone)]
     100              : pub struct SerializedValueBatch {
     101              :     /// [`Value`]s serialized in EphemeralFile's native format,
     102              :     /// ready for disk write by the pageserver
     103              :     pub raw: Vec<u8>,
     104              : 
     105              :     /// Metadata to make sense of the bytes in [`Self::raw`]
     106              :     /// and represent "observed" values.
     107              :     ///
     108              :     /// Invariant: Metadata entries for any given key are ordered
     109              :     /// by LSN. Note that entries for a key do not have to be contiguous.
     110              :     pub metadata: Vec<ValueMeta>,
     111              : 
     112              :     /// The highest LSN of any value in the batch
     113              :     pub max_lsn: Lsn,
     114              : 
     115              :     /// Number of values encoded by [`Self::raw`]
     116              :     pub len: usize,
     117              : }
     118              : 
     119              : impl Default for SerializedValueBatch {
     120       816886 :     fn default() -> Self {
     121       816886 :         Self {
     122       816886 :             raw: Default::default(),
     123       816886 :             metadata: Default::default(),
     124       816886 :             max_lsn: Lsn(0),
     125       816886 :             len: 0,
     126       816886 :         }
     127       816886 :     }
     128              : }
     129              : 
     130              : impl SerializedValueBatch {
     131              :     /// Populates the given `shard_records` with value batches from this WAL record, if any,
     132              :     /// discarding those belonging to other shards.
     133              :     ///
     134              :     /// The batch will only contain values for keys targeting the specifiec
     135              :     /// shard. Shard 0 is a special case, where any keys that don't belong to
     136              :     /// it are "observed" by the batch (i.e. present in [`SerializedValueBatch::metadata`],
     137              :     /// but absent from the raw buffer [`SerializedValueBatch::raw`]).
     138       292310 :     pub(crate) fn from_decoded_filtered(
     139       292310 :         decoded: DecodedWALRecord,
     140       292310 :         shard_records: &mut HashMap<ShardIdentity, InterpretedWalRecord>,
     141       292310 :         next_record_lsn: Lsn,
     142       292310 :         pg_version: u32,
     143       292310 :     ) -> anyhow::Result<()> {
     144              :         // First determine how big the buffers need to be and allocate it up-front.
     145              :         // This duplicates some of the work below, but it's empirically much faster.
     146       292510 :         for (shard, record) in shard_records.iter_mut() {
     147       292510 :             assert!(record.batch.is_empty());
     148              : 
     149       292510 :             let estimate = Self::estimate_buffer_size(&decoded, shard, pg_version);
     150       292510 :             record.batch.raw = Vec::with_capacity(estimate);
     151              :         }
     152              : 
     153       292310 :         for blk in decoded.blocks.iter() {
     154       291284 :             let rel = RelTag {
     155       291284 :                 spcnode: blk.rnode_spcnode,
     156       291284 :                 dbnode: blk.rnode_dbnode,
     157       291284 :                 relnode: blk.rnode_relnode,
     158       291284 :                 forknum: blk.forknum,
     159       291284 :             };
     160       291284 : 
     161       291284 :             let key = rel_block_to_key(rel, blk.blkno);
     162       291284 : 
     163       291284 :             if !key.is_valid_key_on_write_path() {
     164            0 :                 anyhow::bail!(
     165            0 :                     "Unsupported key decoded at LSN {}: {}",
     166            0 :                     next_record_lsn,
     167            0 :                     key
     168            0 :                 );
     169       291284 :             }
     170              : 
     171       291284 :             for (shard, record) in shard_records.iter_mut() {
     172       291284 :                 let key_is_local = shard.is_key_local(&key);
     173       291284 : 
     174       291284 :                 tracing::debug!(
     175              :                     lsn=%next_record_lsn,
     176              :                     key=%key,
     177            0 :                     "ingest: shard decision {}",
     178            0 :                     if !key_is_local { "drop" } else { "keep" },
     179              :                 );
     180              : 
     181       291284 :                 if !key_is_local {
     182            0 :                     if shard.is_shard_zero() {
     183              :                         // Shard 0 tracks relation sizes.  Although we will not store this block, we will observe
     184              :                         // its blkno in case it implicitly extends a relation.
     185            0 :                         record
     186            0 :                             .batch
     187            0 :                             .metadata
     188            0 :                             .push(ValueMeta::Observed(ObservedValueMeta {
     189            0 :                                 key: key.to_compact(),
     190            0 :                                 lsn: next_record_lsn,
     191            0 :                             }))
     192            0 :                     }
     193              : 
     194            0 :                     continue;
     195       291284 :                 }
     196              : 
     197              :                 // Instead of storing full-page-image WAL record,
     198              :                 // it is better to store extracted image: we can skip wal-redo
     199              :                 // in this case. Also some FPI records may contain multiple (up to 32) pages,
     200              :                 // so them have to be copied multiple times.
     201              :                 //
     202       291284 :                 let val = if Self::block_is_image(&decoded, blk, pg_version) {
     203              :                     // Extract page image from FPI record
     204           48 :                     let img_len = blk.bimg_len as usize;
     205           48 :                     let img_offs = blk.bimg_offset as usize;
     206           48 :                     let mut image = BytesMut::with_capacity(BLCKSZ as usize);
     207           48 :                     // TODO(vlad): skip the copy
     208           48 :                     image.extend_from_slice(&decoded.record[img_offs..img_offs + img_len]);
     209           48 : 
     210           48 :                     if blk.hole_length != 0 {
     211            0 :                         let tail = image.split_off(blk.hole_offset as usize);
     212            0 :                         image.resize(image.len() + blk.hole_length as usize, 0u8);
     213            0 :                         image.unsplit(tail);
     214           48 :                     }
     215              :                     //
     216              :                     // Match the logic of XLogReadBufferForRedoExtended:
     217              :                     // The page may be uninitialized. If so, we can't set the LSN because
     218              :                     // that would corrupt the page.
     219              :                     //
     220           48 :                     if !page_is_new(&image) {
     221           36 :                         page_set_lsn(&mut image, next_record_lsn)
     222           12 :                     }
     223           48 :                     assert_eq!(image.len(), BLCKSZ as usize);
     224              : 
     225           48 :                     Value::Image(image.freeze())
     226              :                 } else {
     227              :                     Value::WalRecord(NeonWalRecord::Postgres {
     228       291236 :                         will_init: blk.will_init || blk.apply_image,
     229       291236 :                         rec: decoded.record.clone(),
     230              :                     })
     231              :                 };
     232              : 
     233       291284 :                 let relative_off = record.batch.raw.len() as u64;
     234       291284 : 
     235       291284 :                 val.ser_into(&mut record.batch.raw)
     236       291284 :                     .expect("Writing into in-memory buffer is infallible");
     237       291284 : 
     238       291284 :                 let val_ser_size = record.batch.raw.len() - relative_off as usize;
     239       291284 : 
     240       291284 :                 record
     241       291284 :                     .batch
     242       291284 :                     .metadata
     243       291284 :                     .push(ValueMeta::Serialized(SerializedValueMeta {
     244       291284 :                         key: key.to_compact(),
     245       291284 :                         lsn: next_record_lsn,
     246       291284 :                         batch_offset: relative_off,
     247       291284 :                         len: val_ser_size,
     248       291284 :                         will_init: val.will_init(),
     249       291284 :                     }));
     250       291284 :                 record.batch.max_lsn = std::cmp::max(record.batch.max_lsn, next_record_lsn);
     251       291284 :                 record.batch.len += 1;
     252              :             }
     253              :         }
     254              : 
     255       292310 :         if cfg!(any(debug_assertions, test)) {
     256              :             // Validate that the batches are correct
     257       292510 :             for record in shard_records.values() {
     258       292510 :                 record.batch.validate_lsn_order();
     259       292510 :             }
     260            0 :         }
     261              : 
     262       292310 :         Ok(())
     263       292310 :     }
     264              : 
     265              :     /// Look into the decoded PG WAL record and determine
     266              :     /// roughly how large the buffer for serialized values needs to be.
     267       292510 :     fn estimate_buffer_size(
     268       292510 :         decoded: &DecodedWALRecord,
     269       292510 :         shard: &ShardIdentity,
     270       292510 :         pg_version: u32,
     271       292510 :     ) -> usize {
     272       292510 :         let mut estimate: usize = 0;
     273              : 
     274       292510 :         for blk in decoded.blocks.iter() {
     275       291284 :             let rel = RelTag {
     276       291284 :                 spcnode: blk.rnode_spcnode,
     277       291284 :                 dbnode: blk.rnode_dbnode,
     278       291284 :                 relnode: blk.rnode_relnode,
     279       291284 :                 forknum: blk.forknum,
     280       291284 :             };
     281       291284 : 
     282       291284 :             let key = rel_block_to_key(rel, blk.blkno);
     283       291284 : 
     284       291284 :             if !shard.is_key_local(&key) {
     285            0 :                 continue;
     286       291284 :             }
     287       291284 : 
     288       291284 :             if Self::block_is_image(decoded, blk, pg_version) {
     289           48 :                 // 4 bytes for the Value::Image discriminator
     290           48 :                 // 8 bytes for encoding the size of the buffer
     291           48 :                 // BLCKSZ for the raw image
     292           48 :                 estimate += (4 + 8 + BLCKSZ) as usize;
     293       291236 :             } else {
     294       291236 :                 // 4 bytes for the Value::WalRecord discriminator
     295       291236 :                 // 4 bytes for the NeonWalRecord::Postgres discriminator
     296       291236 :                 // 1 bytes for NeonWalRecord::Postgres::will_init
     297       291236 :                 // 8 bytes for encoding the size of the buffer
     298       291236 :                 // length of the raw record
     299       291236 :                 estimate += 8 + 1 + 8 + decoded.record.len();
     300       291236 :             }
     301              :         }
     302              : 
     303       292510 :         estimate
     304       292510 :     }
     305              : 
     306       582568 :     fn block_is_image(decoded: &DecodedWALRecord, blk: &DecodedBkpBlock, pg_version: u32) -> bool {
     307       582568 :         blk.apply_image
     308          240 :             && blk.has_image
     309          240 :             && decoded.xl_rmid == pg_constants::RM_XLOG_ID
     310           96 :             && (decoded.xl_info == pg_constants::XLOG_FPI
     311            0 :             || decoded.xl_info == pg_constants::XLOG_FPI_FOR_HINT)
     312              :             // compression of WAL is not yet supported: fall back to storing the original WAL record
     313           96 :             && !postgres_ffi::bkpimage_is_compressed(blk.bimg_info, pg_version)
     314              :             // do not materialize null pages because them most likely be soon replaced with real data
     315           96 :             && blk.bimg_len != 0
     316       582568 :     }
     317              : 
     318              :     /// Encode a list of values and metadata into a serialized batch
     319              :     ///
     320              :     /// This is used by the pageserver ingest code to conveniently generate
     321              :     /// batches for metadata writes.
     322      9321961 :     pub fn from_values(batch: Vec<(CompactKey, Lsn, usize, Value)>) -> Self {
     323      9321961 :         // Pre-allocate a big flat buffer to write into. This should be large but not huge: it is soft-limited in practice by
     324      9321961 :         // [`crate::pgdatadir_mapping::DatadirModification::MAX_PENDING_BYTES`]
     325      9328501 :         let buffer_size = batch.iter().map(|i| i.2).sum::<usize>();
     326      9321961 :         let mut buf = Vec::<u8>::with_capacity(buffer_size);
     327      9321961 : 
     328      9321961 :         let mut metadata: Vec<ValueMeta> = Vec::with_capacity(batch.len());
     329      9321961 :         let mut max_lsn: Lsn = Lsn(0);
     330      9321961 :         let len = batch.len();
     331     18650462 :         for (key, lsn, val_ser_size, val) in batch {
     332      9328501 :             let relative_off = buf.len() as u64;
     333      9328501 : 
     334      9328501 :             val.ser_into(&mut buf)
     335      9328501 :                 .expect("Writing into in-memory buffer is infallible");
     336      9328501 : 
     337      9328501 :             metadata.push(ValueMeta::Serialized(SerializedValueMeta {
     338      9328501 :                 key,
     339      9328501 :                 lsn,
     340      9328501 :                 batch_offset: relative_off,
     341      9328501 :                 len: val_ser_size,
     342      9328501 :                 will_init: val.will_init(),
     343      9328501 :             }));
     344      9328501 :             max_lsn = std::cmp::max(max_lsn, lsn);
     345      9328501 :         }
     346              : 
     347              :         // Assert that we didn't do any extra allocations while building buffer.
     348      9321961 :         debug_assert!(buf.len() <= buffer_size);
     349              : 
     350      9321961 :         if cfg!(any(debug_assertions, test)) {
     351      9321961 :             let batch = Self {
     352      9321961 :                 raw: buf,
     353      9321961 :                 metadata,
     354      9321961 :                 max_lsn,
     355      9321961 :                 len,
     356      9321961 :             };
     357      9321961 : 
     358      9321961 :             batch.validate_lsn_order();
     359      9321961 : 
     360      9321961 :             return batch;
     361            0 :         }
     362            0 : 
     363            0 :         Self {
     364            0 :             raw: buf,
     365            0 :             metadata,
     366            0 :             max_lsn,
     367            0 :             len,
     368            0 :         }
     369      9321961 :     }
     370              : 
     371              :     /// Add one value to the batch
     372              :     ///
     373              :     /// This is used by the pageserver ingest code to include metadata block
     374              :     /// updates for a single key.
     375       561734 :     pub fn put(&mut self, key: CompactKey, value: Value, lsn: Lsn) {
     376       561734 :         let relative_off = self.raw.len() as u64;
     377       561734 :         value.ser_into(&mut self.raw).unwrap();
     378       561734 : 
     379       561734 :         let val_ser_size = self.raw.len() - relative_off as usize;
     380       561734 :         self.metadata
     381       561734 :             .push(ValueMeta::Serialized(SerializedValueMeta {
     382       561734 :                 key,
     383       561734 :                 lsn,
     384       561734 :                 batch_offset: relative_off,
     385       561734 :                 len: val_ser_size,
     386       561734 :                 will_init: value.will_init(),
     387       561734 :             }));
     388       561734 : 
     389       561734 :         self.max_lsn = std::cmp::max(self.max_lsn, lsn);
     390       561734 :         self.len += 1;
     391       561734 : 
     392       561734 :         if cfg!(any(debug_assertions, test)) {
     393       561734 :             self.validate_lsn_order();
     394       561734 :         }
     395       561734 :     }
     396              : 
     397              :     /// Extend with the contents of another batch
     398              :     ///
     399              :     /// One batch is generated for each decoded PG WAL record.
     400              :     /// They are then merged to accumulate reasonably sized writes.
     401       529153 :     pub fn extend(&mut self, mut other: SerializedValueBatch) {
     402       529153 :         let extend_batch_start_offset = self.raw.len() as u64;
     403       529153 : 
     404       529153 :         self.raw.extend(other.raw);
     405       529153 : 
     406       529153 :         // Shift the offsets in the batch we are extending with
     407       533015 :         other.metadata.iter_mut().for_each(|meta| match meta {
     408       533015 :             ValueMeta::Serialized(ser) => {
     409       533015 :                 ser.batch_offset += extend_batch_start_offset;
     410       533015 :                 if cfg!(debug_assertions) {
     411       533015 :                     let value_end = ser.batch_offset + ser.len as u64;
     412       533015 :                     assert!((value_end as usize) <= self.raw.len());
     413            0 :                 }
     414              :             }
     415            0 :             ValueMeta::Observed(_) => {}
     416       533015 :         });
     417       529153 :         self.metadata.extend(other.metadata);
     418       529153 : 
     419       529153 :         self.max_lsn = std::cmp::max(self.max_lsn, other.max_lsn);
     420       529153 : 
     421       529153 :         self.len += other.len;
     422       529153 : 
     423       529153 :         if cfg!(any(debug_assertions, test)) {
     424       529153 :             self.validate_lsn_order();
     425       529153 :         }
     426       529153 :     }
     427              : 
     428              :     /// Add zero images for the (key, LSN) tuples specified
     429              :     ///
     430              :     /// PG versions below 16 do not zero out pages before extending
     431              :     /// a relation and may leave gaps. Such gaps need to be identified
     432              :     /// by the pageserver ingest logic and get patched up here.
     433              :     ///
     434              :     /// Note that this function does not validate that the gaps have been
     435              :     /// identified correctly (it does not know relation sizes), so it's up
     436              :     /// to the call-site to do it properly.
     437            1 :     pub fn zero_gaps(&mut self, gaps: Vec<(KeySpace, Lsn)>) {
     438            1 :         // Implementation note:
     439            1 :         //
     440            1 :         // Values within [`SerializedValueBatch::raw`] do not have any ordering requirements,
     441            1 :         // but the metadata entries should be ordered properly (see
     442            1 :         // [`SerializedValueBatch::metadata`]).
     443            1 :         //
     444            1 :         // Exploiting this observation we do:
     445            1 :         // 1. Drain all the metadata entries into an ordered set.
     446            1 :         // The use of a BTreeSet keyed by (Key, Lsn) relies on the observation that Postgres never
     447            1 :         // includes more than one update to the same block in the same WAL record.
     448            1 :         // 2. For each (key, LSN) gap tuple, append a zero image to the raw buffer
     449            1 :         // and add an index entry to the ordered metadata set.
     450            1 :         // 3. Drain the ordered set back into a metadata vector
     451            1 : 
     452            1 :         let mut ordered_metas = self
     453            1 :             .metadata
     454            1 :             .drain(..)
     455            1 :             .map(OrderedValueMeta)
     456            1 :             .collect::<BTreeSet<_>>();
     457            3 :         for (keyspace, lsn) in gaps {
     458            2 :             self.max_lsn = std::cmp::max(self.max_lsn, lsn);
     459              : 
     460            5 :             for gap_range in keyspace.ranges {
     461            3 :                 let mut key = gap_range.start;
     462           13 :                 while key != gap_range.end {
     463           10 :                     let relative_off = self.raw.len() as u64;
     464           10 : 
     465           10 :                     // TODO(vlad): Can we be cheeky and write only one zero image, and
     466           10 :                     // make all index entries requiring a zero page point to it?
     467           10 :                     // Alternatively, we can change the index entry format to represent zero pages
     468           10 :                     // without writing them at all.
     469           10 :                     Value::Image(ZERO_PAGE.clone())
     470           10 :                         .ser_into(&mut self.raw)
     471           10 :                         .unwrap();
     472           10 :                     let val_ser_size = self.raw.len() - relative_off as usize;
     473           10 : 
     474           10 :                     ordered_metas.insert(OrderedValueMeta(ValueMeta::Serialized(
     475           10 :                         SerializedValueMeta {
     476           10 :                             key: key.to_compact(),
     477           10 :                             lsn,
     478           10 :                             batch_offset: relative_off,
     479           10 :                             len: val_ser_size,
     480           10 :                             will_init: true,
     481           10 :                         },
     482           10 :                     )));
     483           10 : 
     484           10 :                     self.len += 1;
     485           10 : 
     486           10 :                     key = key.next();
     487           10 :                 }
     488              :             }
     489              :         }
     490              : 
     491           19 :         self.metadata = ordered_metas.into_iter().map(|ord| ord.0).collect();
     492            1 : 
     493            1 :         if cfg!(any(debug_assertions, test)) {
     494            1 :             self.validate_lsn_order();
     495            1 :         }
     496            1 :     }
     497              : 
     498              :     /// Checks if the batch contains any serialized or observed values
     499       292711 :     pub fn is_empty(&self) -> bool {
     500       292711 :         !self.has_data() && self.metadata.is_empty()
     501       292711 :     }
     502              : 
     503              :     /// Checks if the batch contains only observed values
     504            0 :     pub fn is_observed(&self) -> bool {
     505            0 :         !self.has_data() && !self.metadata.is_empty()
     506            0 :     }
     507              : 
     508              :     /// Checks if the batch contains data
     509              :     ///
     510              :     /// Note that if this returns false, it may still contain observed values or
     511              :     /// a metadata record.
     512     10192847 :     pub fn has_data(&self) -> bool {
     513     10192847 :         let empty = self.raw.is_empty();
     514     10192847 : 
     515     10192847 :         if cfg!(debug_assertions) && empty {
     516       293114 :             assert!(
     517       293114 :                 self.metadata
     518       293114 :                     .iter()
     519       293114 :                     .all(|meta| matches!(meta, ValueMeta::Observed(_)))
     520       293114 :             );
     521      9899733 :         }
     522              : 
     523     10192847 :         !empty
     524     10192847 :     }
     525              : 
     526              :     /// Returns the number of values serialized in the batch
     527       291306 :     pub fn len(&self) -> usize {
     528       291306 :         self.len
     529       291306 :     }
     530              : 
     531              :     /// Returns the size of the buffer wrapped by the batch
     532      9608478 :     pub fn buffer_size(&self) -> usize {
     533      9608478 :         self.raw.len()
     534      9608478 :     }
     535              : 
     536            0 :     pub fn updates_key(&self, key: &Key) -> bool {
     537            0 :         self.metadata.iter().any(|meta| match meta {
     538            0 :             ValueMeta::Serialized(ser) => key.to_compact() == ser.key,
     539            0 :             ValueMeta::Observed(_) => false,
     540            0 :         })
     541            0 :     }
     542              : 
     543     10705365 :     pub fn validate_lsn_order(&self) {
     544              :         use std::collections::HashMap;
     545              : 
     546     10705365 :         let mut last_seen_lsn_per_key: HashMap<CompactKey, Lsn> = HashMap::default();
     547              : 
     548     56800287 :         for meta in self.metadata.iter() {
     549     56800287 :             let lsn = meta.lsn();
     550     56800287 :             let key = meta.key();
     551              : 
     552     56800287 :             if let Some(prev_lsn) = last_seen_lsn_per_key.insert(key, lsn) {
     553           15 :                 assert!(
     554           15 :                     lsn >= prev_lsn,
     555            0 :                     "Ordering violated by {}: {} < {}",
     556            0 :                     Key::from_compact(key),
     557              :                     lsn,
     558              :                     prev_lsn
     559              :                 );
     560     56800272 :             }
     561              :         }
     562     10705365 :     }
     563              : }
     564              : 
     565              : #[cfg(all(test, feature = "testing"))]
     566              : mod tests {
     567              :     use super::*;
     568              : 
     569            6 :     fn validate_batch(
     570            6 :         batch: &SerializedValueBatch,
     571            6 :         values: &[(CompactKey, Lsn, usize, Value)],
     572            6 :         gaps: Option<&Vec<(KeySpace, Lsn)>>,
     573            6 :     ) {
     574              :         // Invariant 1: The metadata for a given entry in the batch
     575              :         // is correct and can be used to deserialize back to the original value.
     576           28 :         for (key, lsn, size, value) in values.iter() {
     577           28 :             let meta = batch
     578           28 :                 .metadata
     579           28 :                 .iter()
     580          136 :                 .find(|meta| (meta.key(), meta.lsn()) == (*key, *lsn))
     581           28 :                 .unwrap();
     582           28 :             let meta = match meta {
     583           28 :                 ValueMeta::Serialized(ser) => ser,
     584            0 :                 ValueMeta::Observed(_) => unreachable!(),
     585              :             };
     586              : 
     587           28 :             assert_eq!(meta.len, *size);
     588           28 :             assert_eq!(meta.will_init, value.will_init());
     589              : 
     590           28 :             let start = meta.batch_offset as usize;
     591           28 :             let end = meta.batch_offset as usize + meta.len;
     592           28 :             let value_from_batch = Value::des(&batch.raw[start..end]).unwrap();
     593           28 :             assert_eq!(&value_from_batch, value);
     594              :         }
     595              : 
     596           28 :         let mut expected_buffer_size: usize = values.iter().map(|(_, _, size, _)| size).sum();
     597            6 :         let mut gap_pages_count: usize = 0;
     598              : 
     599              :         // Invariant 2: Zero pages were added for identified gaps and their metadata
     600              :         // is correct.
     601            6 :         if let Some(gaps) = gaps {
     602            3 :             for (gap_keyspace, lsn) in gaps {
     603            5 :                 for gap_range in &gap_keyspace.ranges {
     604            3 :                     let mut gap_key = gap_range.start;
     605           13 :                     while gap_key != gap_range.end {
     606           10 :                         let meta = batch
     607           10 :                             .metadata
     608           10 :                             .iter()
     609          104 :                             .find(|meta| (meta.key(), meta.lsn()) == (gap_key.to_compact(), *lsn))
     610           10 :                             .unwrap();
     611           10 :                         let meta = match meta {
     612           10 :                             ValueMeta::Serialized(ser) => ser,
     613            0 :                             ValueMeta::Observed(_) => unreachable!(),
     614              :                         };
     615              : 
     616           10 :                         let zero_value = Value::Image(ZERO_PAGE.clone());
     617           10 :                         let zero_value_size = zero_value.serialized_size().unwrap() as usize;
     618           10 : 
     619           10 :                         assert_eq!(meta.len, zero_value_size);
     620           10 :                         assert_eq!(meta.will_init, zero_value.will_init());
     621              : 
     622           10 :                         let start = meta.batch_offset as usize;
     623           10 :                         let end = meta.batch_offset as usize + meta.len;
     624           10 :                         let value_from_batch = Value::des(&batch.raw[start..end]).unwrap();
     625           10 :                         assert_eq!(value_from_batch, zero_value);
     626              : 
     627           10 :                         gap_pages_count += 1;
     628           10 :                         expected_buffer_size += zero_value_size;
     629           10 :                         gap_key = gap_key.next();
     630              :                     }
     631              :                 }
     632              :             }
     633            5 :         }
     634              : 
     635              :         // Invariant 3: The length of the batch is equal to the number
     636              :         // of values inserted, plus the number of gap pages. This extends
     637              :         // to the raw buffer size.
     638            6 :         assert_eq!(batch.len(), values.len() + gap_pages_count);
     639            6 :         assert_eq!(expected_buffer_size, batch.buffer_size());
     640              : 
     641              :         // Invariant 4: Metadata entries for any given key are sorted in LSN order.
     642            6 :         batch.validate_lsn_order();
     643            6 :     }
     644              : 
     645              :     #[test]
     646            1 :     fn test_creation_from_values() {
     647              :         const LSN: Lsn = Lsn(0x10);
     648            1 :         let key = Key::from_hex("110000000033333333444444445500000001").unwrap();
     649            1 : 
     650            1 :         let values = vec![
     651            1 :             (
     652            1 :                 key.to_compact(),
     653            1 :                 LSN,
     654            1 :                 Value::WalRecord(NeonWalRecord::wal_append("foo")),
     655            1 :             ),
     656            1 :             (
     657            1 :                 key.next().to_compact(),
     658            1 :                 LSN,
     659            1 :                 Value::WalRecord(NeonWalRecord::wal_append("bar")),
     660            1 :             ),
     661            1 :             (
     662            1 :                 key.to_compact(),
     663            1 :                 Lsn(LSN.0 + 0x10),
     664            1 :                 Value::WalRecord(NeonWalRecord::wal_append("baz")),
     665            1 :             ),
     666            1 :             (
     667            1 :                 key.next().next().to_compact(),
     668            1 :                 LSN,
     669            1 :                 Value::WalRecord(NeonWalRecord::wal_append("taz")),
     670            1 :             ),
     671            1 :         ];
     672            1 : 
     673            1 :         let values = values
     674            1 :             .into_iter()
     675            4 :             .map(|(key, lsn, value)| (key, lsn, value.serialized_size().unwrap() as usize, value))
     676            1 :             .collect::<Vec<_>>();
     677            1 :         let batch = SerializedValueBatch::from_values(values.clone());
     678            1 : 
     679            1 :         validate_batch(&batch, &values, None);
     680            1 : 
     681            1 :         assert!(!batch.is_empty());
     682            1 :     }
     683              : 
     684              :     #[test]
     685            1 :     fn test_put() {
     686              :         const LSN: Lsn = Lsn(0x10);
     687            1 :         let key = Key::from_hex("110000000033333333444444445500000001").unwrap();
     688            1 : 
     689            1 :         let values = vec![
     690            1 :             (
     691            1 :                 key.to_compact(),
     692            1 :                 LSN,
     693            1 :                 Value::WalRecord(NeonWalRecord::wal_append("foo")),
     694            1 :             ),
     695            1 :             (
     696            1 :                 key.next().to_compact(),
     697            1 :                 LSN,
     698            1 :                 Value::WalRecord(NeonWalRecord::wal_append("bar")),
     699            1 :             ),
     700            1 :         ];
     701            1 : 
     702            1 :         let mut values = values
     703            1 :             .into_iter()
     704            2 :             .map(|(key, lsn, value)| (key, lsn, value.serialized_size().unwrap() as usize, value))
     705            1 :             .collect::<Vec<_>>();
     706            1 :         let mut batch = SerializedValueBatch::from_values(values.clone());
     707            1 : 
     708            1 :         validate_batch(&batch, &values, None);
     709            1 : 
     710            1 :         let value = (
     711            1 :             key.to_compact(),
     712            1 :             Lsn(LSN.0 + 0x10),
     713            1 :             Value::WalRecord(NeonWalRecord::wal_append("baz")),
     714            1 :         );
     715            1 :         let serialized_size = value.2.serialized_size().unwrap() as usize;
     716            1 :         let value = (value.0, value.1, serialized_size, value.2);
     717            1 :         values.push(value.clone());
     718            1 :         batch.put(value.0, value.3, value.1);
     719            1 : 
     720            1 :         validate_batch(&batch, &values, None);
     721            1 : 
     722            1 :         let value = (
     723            1 :             key.next().next().to_compact(),
     724            1 :             LSN,
     725            1 :             Value::WalRecord(NeonWalRecord::wal_append("taz")),
     726            1 :         );
     727            1 :         let serialized_size = value.2.serialized_size().unwrap() as usize;
     728            1 :         let value = (value.0, value.1, serialized_size, value.2);
     729            1 :         values.push(value.clone());
     730            1 :         batch.put(value.0, value.3, value.1);
     731            1 : 
     732            1 :         validate_batch(&batch, &values, None);
     733            1 :     }
     734              : 
     735              :     #[test]
     736            1 :     fn test_extension() {
     737              :         const LSN: Lsn = Lsn(0x10);
     738            1 :         let key = Key::from_hex("110000000033333333444444445500000001").unwrap();
     739            1 : 
     740            1 :         let values = vec![
     741            1 :             (
     742            1 :                 key.to_compact(),
     743            1 :                 LSN,
     744            1 :                 Value::WalRecord(NeonWalRecord::wal_append("foo")),
     745            1 :             ),
     746            1 :             (
     747            1 :                 key.next().to_compact(),
     748            1 :                 LSN,
     749            1 :                 Value::WalRecord(NeonWalRecord::wal_append("bar")),
     750            1 :             ),
     751            1 :             (
     752            1 :                 key.next().next().to_compact(),
     753            1 :                 LSN,
     754            1 :                 Value::WalRecord(NeonWalRecord::wal_append("taz")),
     755            1 :             ),
     756            1 :         ];
     757            1 : 
     758            1 :         let mut values = values
     759            1 :             .into_iter()
     760            3 :             .map(|(key, lsn, value)| (key, lsn, value.serialized_size().unwrap() as usize, value))
     761            1 :             .collect::<Vec<_>>();
     762            1 :         let mut batch = SerializedValueBatch::from_values(values.clone());
     763            1 : 
     764            1 :         let other_values = vec![
     765            1 :             (
     766            1 :                 key.to_compact(),
     767            1 :                 Lsn(LSN.0 + 0x10),
     768            1 :                 Value::WalRecord(NeonWalRecord::wal_append("foo")),
     769            1 :             ),
     770            1 :             (
     771            1 :                 key.next().to_compact(),
     772            1 :                 Lsn(LSN.0 + 0x10),
     773            1 :                 Value::WalRecord(NeonWalRecord::wal_append("bar")),
     774            1 :             ),
     775            1 :             (
     776            1 :                 key.next().next().to_compact(),
     777            1 :                 Lsn(LSN.0 + 0x10),
     778            1 :                 Value::WalRecord(NeonWalRecord::wal_append("taz")),
     779            1 :             ),
     780            1 :         ];
     781            1 : 
     782            1 :         let other_values = other_values
     783            1 :             .into_iter()
     784            3 :             .map(|(key, lsn, value)| (key, lsn, value.serialized_size().unwrap() as usize, value))
     785            1 :             .collect::<Vec<_>>();
     786            1 :         let other_batch = SerializedValueBatch::from_values(other_values.clone());
     787            1 : 
     788            1 :         values.extend(other_values);
     789            1 :         batch.extend(other_batch);
     790            1 : 
     791            1 :         validate_batch(&batch, &values, None);
     792            1 :     }
     793              : 
     794              :     #[test]
     795            1 :     fn test_gap_zeroing() {
     796              :         const LSN: Lsn = Lsn(0x10);
     797            1 :         let rel_foo_base_key = Key::from_hex("110000000033333333444444445500000001").unwrap();
     798            1 : 
     799            1 :         let rel_bar_base_key = {
     800            1 :             let mut key = rel_foo_base_key;
     801            1 :             key.field4 += 1;
     802            1 :             key
     803            1 :         };
     804            1 : 
     805            1 :         let values = vec![
     806            1 :             (
     807            1 :                 rel_foo_base_key.to_compact(),
     808            1 :                 LSN,
     809            1 :                 Value::WalRecord(NeonWalRecord::wal_append("foo1")),
     810            1 :             ),
     811            1 :             (
     812            1 :                 rel_foo_base_key.add(1).to_compact(),
     813            1 :                 LSN,
     814            1 :                 Value::WalRecord(NeonWalRecord::wal_append("foo2")),
     815            1 :             ),
     816            1 :             (
     817            1 :                 rel_foo_base_key.add(5).to_compact(),
     818            1 :                 LSN,
     819            1 :                 Value::WalRecord(NeonWalRecord::wal_append("foo3")),
     820            1 :             ),
     821            1 :             (
     822            1 :                 rel_foo_base_key.add(1).to_compact(),
     823            1 :                 Lsn(LSN.0 + 0x10),
     824            1 :                 Value::WalRecord(NeonWalRecord::wal_append("foo4")),
     825            1 :             ),
     826            1 :             (
     827            1 :                 rel_foo_base_key.add(10).to_compact(),
     828            1 :                 Lsn(LSN.0 + 0x10),
     829            1 :                 Value::WalRecord(NeonWalRecord::wal_append("foo5")),
     830            1 :             ),
     831            1 :             (
     832            1 :                 rel_foo_base_key.add(11).to_compact(),
     833            1 :                 Lsn(LSN.0 + 0x10),
     834            1 :                 Value::WalRecord(NeonWalRecord::wal_append("foo6")),
     835            1 :             ),
     836            1 :             (
     837            1 :                 rel_foo_base_key.add(12).to_compact(),
     838            1 :                 Lsn(LSN.0 + 0x10),
     839            1 :                 Value::WalRecord(NeonWalRecord::wal_append("foo7")),
     840            1 :             ),
     841            1 :             (
     842            1 :                 rel_bar_base_key.to_compact(),
     843            1 :                 LSN,
     844            1 :                 Value::WalRecord(NeonWalRecord::wal_append("bar1")),
     845            1 :             ),
     846            1 :             (
     847            1 :                 rel_bar_base_key.add(4).to_compact(),
     848            1 :                 LSN,
     849            1 :                 Value::WalRecord(NeonWalRecord::wal_append("bar2")),
     850            1 :             ),
     851            1 :         ];
     852            1 : 
     853            1 :         let values = values
     854            1 :             .into_iter()
     855            9 :             .map(|(key, lsn, value)| (key, lsn, value.serialized_size().unwrap() as usize, value))
     856            1 :             .collect::<Vec<_>>();
     857            1 : 
     858            1 :         let mut batch = SerializedValueBatch::from_values(values.clone());
     859            1 : 
     860            1 :         let gaps = vec![
     861            1 :             (
     862            1 :                 KeySpace {
     863            1 :                     ranges: vec![
     864            1 :                         rel_foo_base_key.add(2)..rel_foo_base_key.add(5),
     865            1 :                         rel_bar_base_key.add(1)..rel_bar_base_key.add(4),
     866            1 :                     ],
     867            1 :                 },
     868            1 :                 LSN,
     869            1 :             ),
     870            1 :             (
     871            1 :                 KeySpace {
     872            1 :                     ranges: vec![rel_foo_base_key.add(6)..rel_foo_base_key.add(10)],
     873            1 :                 },
     874            1 :                 Lsn(LSN.0 + 0x10),
     875            1 :             ),
     876            1 :         ];
     877            1 : 
     878            1 :         batch.zero_gaps(gaps.clone());
     879            1 :         validate_batch(&batch, &values, Some(&gaps));
     880            1 :     }
     881              : }
        

Generated by: LCOV version 2.1-beta