LCOV - code coverage report
Current view: top level - libs/wal_decoder/src - serialized_batch.rs (source / functions) Coverage Total Hit
Test: 07bee600374ccd486c69370d0972d9035964fe68.info Lines: 91.7 % 630 578
Test Date: 2025-02-20 13:11:02 Functions: 54.8 % 62 34

            Line data    Source code
       1              : //! This module implements batch type for serialized [`pageserver_api::value::Value`]
       2              : //! instances. Each batch contains a raw buffer (serialized values)
       3              : //! and a list of metadata for each (key, LSN) tuple present in the batch.
       4              : //!
       5              : //! Such batches are created from decoded PG wal records and ingested
       6              : //! by the pageserver by writing directly to the ephemeral file.
       7              : 
       8              : use std::collections::{BTreeSet, HashMap};
       9              : 
      10              : use bytes::{Bytes, BytesMut};
      11              : use pageserver_api::key::rel_block_to_key;
      12              : use pageserver_api::keyspace::KeySpace;
      13              : use pageserver_api::record::NeonWalRecord;
      14              : use pageserver_api::reltag::RelTag;
      15              : use pageserver_api::shard::ShardIdentity;
      16              : use pageserver_api::{key::CompactKey, value::Value};
      17              : use postgres_ffi::walrecord::{DecodedBkpBlock, DecodedWALRecord};
      18              : use postgres_ffi::{page_is_new, page_set_lsn, pg_constants, BLCKSZ};
      19              : use serde::{Deserialize, Serialize};
      20              : use utils::bin_ser::BeSer;
      21              : use utils::lsn::Lsn;
      22              : 
      23              : use pageserver_api::key::Key;
      24              : 
      25              : use crate::models::InterpretedWalRecord;
      26              : 
      27              : static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; BLCKSZ as usize]);
      28              : 
      29              : /// Accompanying metadata for the batch
      30              : /// A value may be serialized and stored into the batch or just "observed".
      31              : /// Shard 0 currently "observes" all values in order to accurately track
      32              : /// relation sizes. In the case of "observed" values, we only need to know
      33              : /// the key and LSN, so two types of metadata are supported to save on network
      34              : /// bandwidth.
      35            0 : #[derive(Serialize, Deserialize, Clone)]
      36              : pub enum ValueMeta {
      37              :     Serialized(SerializedValueMeta),
      38              :     Observed(ObservedValueMeta),
      39              : }
      40              : 
      41              : impl ValueMeta {
      42     57091809 :     pub fn key(&self) -> CompactKey {
      43     57091809 :         match self {
      44     57091809 :             Self::Serialized(ser) => ser.key,
      45            0 :             Self::Observed(obs) => obs.key,
      46              :         }
      47     57091809 :     }
      48              : 
      49     56800525 :     pub fn lsn(&self) -> Lsn {
      50     56800525 :         match self {
      51     56800525 :             Self::Serialized(ser) => ser.lsn,
      52            0 :             Self::Observed(obs) => obs.lsn,
      53              :         }
      54     56800525 :     }
      55              : }
      56              : 
      57              : /// Wrapper around [`ValueMeta`] that implements ordering by
      58              : /// (key, LSN) tuples
      59              : struct OrderedValueMeta(ValueMeta);
      60              : 
      61              : impl Ord for OrderedValueMeta {
      62           59 :     fn cmp(&self, other: &Self) -> std::cmp::Ordering {
      63           59 :         (self.0.key(), self.0.lsn()).cmp(&(other.0.key(), other.0.lsn()))
      64           59 :     }
      65              : }
      66              : 
      67              : impl PartialOrd for OrderedValueMeta {
      68            9 :     fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
      69            9 :         Some(self.cmp(other))
      70            9 :     }
      71              : }
      72              : 
      73              : impl PartialEq for OrderedValueMeta {
      74            8 :     fn eq(&self, other: &Self) -> bool {
      75            8 :         (self.0.key(), self.0.lsn()) == (other.0.key(), other.0.lsn())
      76            8 :     }
      77              : }
      78              : 
      79              : impl Eq for OrderedValueMeta {}
      80              : 
      81              : /// Metadata for a [`Value`] serialized into the batch.
      82            0 : #[derive(Serialize, Deserialize, Clone)]
      83              : pub struct SerializedValueMeta {
      84              :     pub key: CompactKey,
      85              :     pub lsn: Lsn,
      86              :     /// Starting offset of the value for the (key, LSN) tuple
      87              :     /// in [`SerializedValueBatch::raw`]
      88              :     pub batch_offset: u64,
      89              :     pub len: usize,
      90              :     pub will_init: bool,
      91              : }
      92              : 
      93              : /// Metadata for a [`Value`] observed by the batch
      94            0 : #[derive(Serialize, Deserialize, Clone)]
      95              : pub struct ObservedValueMeta {
      96              :     pub key: CompactKey,
      97              :     pub lsn: Lsn,
      98              : }
      99              : 
     100              : /// Batch of serialized [`Value`]s.
     101            0 : #[derive(Serialize, Deserialize, Clone)]
     102              : pub struct SerializedValueBatch {
     103              :     /// [`Value`]s serialized in EphemeralFile's native format,
     104              :     /// ready for disk write by the pageserver
     105              :     pub raw: Vec<u8>,
     106              : 
     107              :     /// Metadata to make sense of the bytes in [`Self::raw`]
     108              :     /// and represent "observed" values.
     109              :     ///
     110              :     /// Invariant: Metadata entries for any given key are ordered
     111              :     /// by LSN. Note that entries for a key do not have to be contiguous.
     112              :     pub metadata: Vec<ValueMeta>,
     113              : 
     114              :     /// The highest LSN of any value in the batch
     115              :     pub max_lsn: Lsn,
     116              : 
     117              :     /// Number of values encoded by [`Self::raw`]
     118              :     pub len: usize,
     119              : }
     120              : 
     121              : impl Default for SerializedValueBatch {
     122       816875 :     fn default() -> Self {
     123       816875 :         Self {
     124       816875 :             raw: Default::default(),
     125       816875 :             metadata: Default::default(),
     126       816875 :             max_lsn: Lsn(0),
     127       816875 :             len: 0,
     128       816875 :         }
     129       816875 :     }
     130              : }
     131              : 
     132              : impl SerializedValueBatch {
     133              :     /// Populates the given `shard_records` with value batches from this WAL record, if any,
     134              :     /// discarding those belonging to other shards.
     135              :     ///
     136              :     /// The batch will only contain values for keys targeting the specified
     137              :     /// shard. Shard 0 is a special case, where any keys that don't belong to
     138              :     /// it are "observed" by the batch (i.e. present in [`SerializedValueBatch::metadata`],
     139              :     /// but absent from the raw buffer [`SerializedValueBatch::raw`]).
     140       292300 :     pub(crate) fn from_decoded_filtered(
     141       292300 :         decoded: DecodedWALRecord,
     142       292300 :         shard_records: &mut HashMap<ShardIdentity, InterpretedWalRecord>,
     143       292300 :         next_record_lsn: Lsn,
     144       292300 :         pg_version: u32,
     145       292300 :     ) -> anyhow::Result<()> {
     146              :         // First determine how big the buffers need to be and allocate them up-front.
     147              :         // This duplicates some of the work below, but it's empirically much faster.
     148       292499 :         for (shard, record) in shard_records.iter_mut() {
     149       292499 :             assert!(record.batch.is_empty());
     150              : 
     151       292499 :             let estimate = Self::estimate_buffer_size(&decoded, shard, pg_version);
     152       292499 :             record.batch.raw = Vec::with_capacity(estimate);
     153              :         }
     154              : 
     155       292300 :         for blk in decoded.blocks.iter() {
     156       291284 :             let rel = RelTag {
     157       291284 :                 spcnode: blk.rnode_spcnode,
     158       291284 :                 dbnode: blk.rnode_dbnode,
     159       291284 :                 relnode: blk.rnode_relnode,
     160       291284 :                 forknum: blk.forknum,
     161       291284 :             };
     162       291284 : 
     163       291284 :             let key = rel_block_to_key(rel, blk.blkno);
     164       291284 : 
     165       291284 :             if !key.is_valid_key_on_write_path() {
     166            0 :                 anyhow::bail!(
     167            0 :                     "Unsupported key decoded at LSN {}: {}",
     168            0 :                     next_record_lsn,
     169            0 :                     key
     170            0 :                 );
     171       291284 :             }
     172              : 
     173       291284 :             for (shard, record) in shard_records.iter_mut() {
     174       291284 :                 let key_is_local = shard.is_key_local(&key);
     175       291284 : 
     176       291284 :                 tracing::debug!(
     177              :                     lsn=%next_record_lsn,
     178              :                     key=%key,
     179            0 :                     "ingest: shard decision {}",
     180            0 :                     if !key_is_local { "drop" } else { "keep" },
     181              :                 );
     182              : 
     183       291284 :                 if !key_is_local {
     184            0 :                     if shard.is_shard_zero() {
     185              :                         // Shard 0 tracks relation sizes.  Although we will not store this block, we will observe
     186              :                         // its blkno in case it implicitly extends a relation.
     187            0 :                         record
     188            0 :                             .batch
     189            0 :                             .metadata
     190            0 :                             .push(ValueMeta::Observed(ObservedValueMeta {
     191            0 :                                 key: key.to_compact(),
     192            0 :                                 lsn: next_record_lsn,
     193            0 :                             }))
     194            0 :                     }
     195              : 
     196            0 :                     continue;
     197       291284 :                 }
     198              : 
     199              :                 // Instead of storing the full-page-image WAL record,
     200              :                 // it is better to store the extracted image: we can skip wal-redo
     201              :                 // in this case. Also, some FPI records may contain multiple (up to 32) pages,
     202              :                 // so they would otherwise have to be copied multiple times.
     203              :                 //
     204       291284 :                 let val = if Self::block_is_image(&decoded, blk, pg_version) {
     205              :                     // Extract page image from FPI record
     206           48 :                     let img_len = blk.bimg_len as usize;
     207           48 :                     let img_offs = blk.bimg_offset as usize;
     208           48 :                     let mut image = BytesMut::with_capacity(BLCKSZ as usize);
     209           48 :                     // TODO(vlad): skip the copy
     210           48 :                     image.extend_from_slice(&decoded.record[img_offs..img_offs + img_len]);
     211           48 : 
     212           48 :                     if blk.hole_length != 0 {
     213            0 :                         let tail = image.split_off(blk.hole_offset as usize);
     214            0 :                         image.resize(image.len() + blk.hole_length as usize, 0u8);
     215            0 :                         image.unsplit(tail);
     216           48 :                     }
     217              :                     //
     218              :                     // Match the logic of XLogReadBufferForRedoExtended:
     219              :                     // The page may be uninitialized. If so, we can't set the LSN because
     220              :                     // that would corrupt the page.
     221              :                     //
     222           48 :                     if !page_is_new(&image) {
     223           36 :                         page_set_lsn(&mut image, next_record_lsn)
     224           12 :                     }
     225           48 :                     assert_eq!(image.len(), BLCKSZ as usize);
     226              : 
     227           48 :                     Value::Image(image.freeze())
     228              :                 } else {
     229              :                     Value::WalRecord(NeonWalRecord::Postgres {
     230       291236 :                         will_init: blk.will_init || blk.apply_image,
     231       291236 :                         rec: decoded.record.clone(),
     232              :                     })
     233              :                 };
     234              : 
     235       291284 :                 let relative_off = record.batch.raw.len() as u64;
     236       291284 : 
     237       291284 :                 val.ser_into(&mut record.batch.raw)
     238       291284 :                     .expect("Writing into in-memory buffer is infallible");
     239       291284 : 
     240       291284 :                 let val_ser_size = record.batch.raw.len() - relative_off as usize;
     241       291284 : 
     242       291284 :                 record
     243       291284 :                     .batch
     244       291284 :                     .metadata
     245       291284 :                     .push(ValueMeta::Serialized(SerializedValueMeta {
     246       291284 :                         key: key.to_compact(),
     247       291284 :                         lsn: next_record_lsn,
     248       291284 :                         batch_offset: relative_off,
     249       291284 :                         len: val_ser_size,
     250       291284 :                         will_init: val.will_init(),
     251       291284 :                     }));
     252       291284 :                 record.batch.max_lsn = std::cmp::max(record.batch.max_lsn, next_record_lsn);
     253       291284 :                 record.batch.len += 1;
     254              :             }
     255              :         }
     256              : 
     257       292300 :         if cfg!(any(debug_assertions, test)) {
     258              :             // Validate that the batches are correct
     259       292499 :             for record in shard_records.values() {
     260       292499 :                 record.batch.validate_lsn_order();
     261       292499 :             }
     262            0 :         }
     263              : 
     264       292300 :         Ok(())
     265       292300 :     }
     266              : 
     267              :     /// Look into the decoded PG WAL record and determine
     268              :     /// roughly how large the buffer for serialized values needs to be.
     269       292499 :     fn estimate_buffer_size(
     270       292499 :         decoded: &DecodedWALRecord,
     271       292499 :         shard: &ShardIdentity,
     272       292499 :         pg_version: u32,
     273       292499 :     ) -> usize {
     274       292499 :         let mut estimate: usize = 0;
     275              : 
     276       292499 :         for blk in decoded.blocks.iter() {
     277       291284 :             let rel = RelTag {
     278       291284 :                 spcnode: blk.rnode_spcnode,
     279       291284 :                 dbnode: blk.rnode_dbnode,
     280       291284 :                 relnode: blk.rnode_relnode,
     281       291284 :                 forknum: blk.forknum,
     282       291284 :             };
     283       291284 : 
     284       291284 :             let key = rel_block_to_key(rel, blk.blkno);
     285       291284 : 
     286       291284 :             if !shard.is_key_local(&key) {
     287            0 :                 continue;
     288       291284 :             }
     289       291284 : 
     290       291284 :             if Self::block_is_image(decoded, blk, pg_version) {
     291           48 :                 // 4 bytes for the Value::Image discriminator
     292           48 :                 // 8 bytes for encoding the size of the buffer
     293           48 :                 // BLCKSZ for the raw image
     294           48 :                 estimate += (4 + 8 + BLCKSZ) as usize;
     295       291236 :             } else {
     296       291236 :                 // 4 bytes for the Value::WalRecord discriminator
     297       291236 :                 // 4 bytes for the NeonWalRecord::Postgres discriminator
     298       291236 :                 // 1 byte for NeonWalRecord::Postgres::will_init
     299       291236 :                 // 8 bytes for encoding the size of the buffer
     300       291236 :                 // length of the raw record
     301       291236 :                 estimate += 8 + 1 + 8 + decoded.record.len();
     302       291236 :             }
     303              :         }
     304              : 
     305       292499 :         estimate
     306       292499 :     }
     307              : 
     308       582568 :     fn block_is_image(decoded: &DecodedWALRecord, blk: &DecodedBkpBlock, pg_version: u32) -> bool {
     309       582568 :         blk.apply_image
     310          240 :             && blk.has_image
     311          240 :             && decoded.xl_rmid == pg_constants::RM_XLOG_ID
     312           96 :             && (decoded.xl_info == pg_constants::XLOG_FPI
     313            0 :             || decoded.xl_info == pg_constants::XLOG_FPI_FOR_HINT)
     314              :             // compression of WAL is not yet supported: fall back to storing the original WAL record
     315           96 :             && !postgres_ffi::bkpimage_is_compressed(blk.bimg_info, pg_version)
     316              :             // do not materialize null pages because they will most likely soon be replaced with real data
     317           96 :             && blk.bimg_len != 0
     318       582568 :     }
     319              : 
     320              :     /// Encode a list of values and metadata into a serialized batch
     321              :     ///
     322              :     /// This is used by the pageserver ingest code to conveniently generate
     323              :     /// batches for metadata writes.
     324      9321945 :     pub fn from_values(batch: Vec<(CompactKey, Lsn, usize, Value)>) -> Self {
     325      9321945 :         // Pre-allocate a big flat buffer to write into. This should be large but not huge: it is soft-limited in practice by
     326      9321945 :         // [`crate::pgdatadir_mapping::DatadirModification::MAX_PENDING_BYTES`]
     327      9328365 :         let buffer_size = batch.iter().map(|i| i.2).sum::<usize>();
     328      9321945 :         let mut buf = Vec::<u8>::with_capacity(buffer_size);
     329      9321945 : 
     330      9321945 :         let mut metadata: Vec<ValueMeta> = Vec::with_capacity(batch.len());
     331      9321945 :         let mut max_lsn: Lsn = Lsn(0);
     332      9321945 :         let len = batch.len();
     333     18650310 :         for (key, lsn, val_ser_size, val) in batch {
     334      9328365 :             let relative_off = buf.len() as u64;
     335      9328365 : 
     336      9328365 :             val.ser_into(&mut buf)
     337      9328365 :                 .expect("Writing into in-memory buffer is infallible");
     338      9328365 : 
     339      9328365 :             metadata.push(ValueMeta::Serialized(SerializedValueMeta {
     340      9328365 :                 key,
     341      9328365 :                 lsn,
     342      9328365 :                 batch_offset: relative_off,
     343      9328365 :                 len: val_ser_size,
     344      9328365 :                 will_init: val.will_init(),
     345      9328365 :             }));
     346      9328365 :             max_lsn = std::cmp::max(max_lsn, lsn);
     347      9328365 :         }
     348              : 
     349              :         // Assert that we didn't do any extra allocations while building buffer.
     350      9321945 :         debug_assert!(buf.len() <= buffer_size);
     351              : 
     352      9321945 :         if cfg!(any(debug_assertions, test)) {
     353      9321945 :             let batch = Self {
     354      9321945 :                 raw: buf,
     355      9321945 :                 metadata,
     356      9321945 :                 max_lsn,
     357      9321945 :                 len,
     358      9321945 :             };
     359      9321945 : 
     360      9321945 :             batch.validate_lsn_order();
     361      9321945 : 
     362      9321945 :             return batch;
     363            0 :         }
     364            0 : 
     365            0 :         Self {
     366            0 :             raw: buf,
     367            0 :             metadata,
     368            0 :             max_lsn,
     369            0 :             len,
     370            0 :         }
     371      9321945 :     }
     372              : 
     373              :     /// Add one value to the batch
     374              :     ///
     375              :     /// This is used by the pageserver ingest code to include metadata block
     376              :     /// updates for a single key.
     377       561734 :     pub fn put(&mut self, key: CompactKey, value: Value, lsn: Lsn) {
     378       561734 :         let relative_off = self.raw.len() as u64;
     379       561734 :         value.ser_into(&mut self.raw).unwrap();
     380       561734 : 
     381       561734 :         let val_ser_size = self.raw.len() - relative_off as usize;
     382       561734 :         self.metadata
     383       561734 :             .push(ValueMeta::Serialized(SerializedValueMeta {
     384       561734 :                 key,
     385       561734 :                 lsn,
     386       561734 :                 batch_offset: relative_off,
     387       561734 :                 len: val_ser_size,
     388       561734 :                 will_init: value.will_init(),
     389       561734 :             }));
     390       561734 : 
     391       561734 :         self.max_lsn = std::cmp::max(self.max_lsn, lsn);
     392       561734 :         self.len += 1;
     393       561734 : 
     394       561734 :         if cfg!(any(debug_assertions, test)) {
     395       561734 :             self.validate_lsn_order();
     396       561734 :         }
     397       561734 :     }
     398              : 
     399              :     /// Extend with the contents of another batch
     400              :     ///
     401              :     /// One batch is generated for each decoded PG WAL record.
     402              :     /// They are then merged to accumulate reasonably sized writes.
     403       529153 :     pub fn extend(&mut self, mut other: SerializedValueBatch) {
     404       529153 :         let extend_batch_start_offset = self.raw.len() as u64;
     405       529153 : 
     406       529153 :         self.raw.extend(other.raw);
     407       529153 : 
     408       529153 :         // Shift the offsets in the batch we are extending with
     409       533015 :         other.metadata.iter_mut().for_each(|meta| match meta {
     410       533015 :             ValueMeta::Serialized(ser) => {
     411       533015 :                 ser.batch_offset += extend_batch_start_offset;
     412       533015 :                 if cfg!(debug_assertions) {
     413       533015 :                     let value_end = ser.batch_offset + ser.len as u64;
     414       533015 :                     assert!((value_end as usize) <= self.raw.len());
     415            0 :                 }
     416              :             }
     417            0 :             ValueMeta::Observed(_) => {}
     418       533015 :         });
     419       529153 :         self.metadata.extend(other.metadata);
     420       529153 : 
     421       529153 :         self.max_lsn = std::cmp::max(self.max_lsn, other.max_lsn);
     422       529153 : 
     423       529153 :         self.len += other.len;
     424       529153 : 
     425       529153 :         if cfg!(any(debug_assertions, test)) {
     426       529153 :             self.validate_lsn_order();
     427       529153 :         }
     428       529153 :     }
     429              : 
     430              :     /// Add zero images for the (key, LSN) tuples specified
     431              :     ///
     432              :     /// PG versions below 16 do not zero out pages before extending
     433              :     /// a relation and may leave gaps. Such gaps need to be identified
     434              :     /// by the pageserver ingest logic and get patched up here.
     435              :     ///
     436              :     /// Note that this function does not validate that the gaps have been
     437              :     /// identified correctly (it does not know relation sizes), so it's up
     438              :     /// to the call-site to do it properly.
     439            1 :     pub fn zero_gaps(&mut self, gaps: Vec<(KeySpace, Lsn)>) {
     440            1 :         // Implementation note:
     441            1 :         //
     442            1 :         // Values within [`SerializedValueBatch::raw`] do not have any ordering requirements,
     443            1 :         // but the metadata entries should be ordered properly (see
     444            1 :         // [`SerializedValueBatch::metadata`]).
     445            1 :         //
     446            1 :         // Exploiting this observation we do:
     447            1 :         // 1. Drain all the metadata entries into an ordered set.
     448            1 :         // The use of a BTreeSet keyed by (Key, Lsn) relies on the observation that Postgres never
     449            1 :         // includes more than one update to the same block in the same WAL record.
     450            1 :         // 2. For each (key, LSN) gap tuple, append a zero image to the raw buffer
     451            1 :         // and add an index entry to the ordered metadata set.
     452            1 :         // 3. Drain the ordered set back into a metadata vector
     453            1 : 
     454            1 :         let mut ordered_metas = self
     455            1 :             .metadata
     456            1 :             .drain(..)
     457            1 :             .map(OrderedValueMeta)
     458            1 :             .collect::<BTreeSet<_>>();
     459            3 :         for (keyspace, lsn) in gaps {
     460            2 :             self.max_lsn = std::cmp::max(self.max_lsn, lsn);
     461              : 
     462            5 :             for gap_range in keyspace.ranges {
     463            3 :                 let mut key = gap_range.start;
     464           13 :                 while key != gap_range.end {
     465           10 :                     let relative_off = self.raw.len() as u64;
     466           10 : 
     467           10 :                     // TODO(vlad): Can we be cheeky and write only one zero image, and
     468           10 :                     // make all index entries requiring a zero page point to it?
     469           10 :                     // Alternatively, we can change the index entry format to represent zero pages
     470           10 :                     // without writing them at all.
     471           10 :                     Value::Image(ZERO_PAGE.clone())
     472           10 :                         .ser_into(&mut self.raw)
     473           10 :                         .unwrap();
     474           10 :                     let val_ser_size = self.raw.len() - relative_off as usize;
     475           10 : 
     476           10 :                     ordered_metas.insert(OrderedValueMeta(ValueMeta::Serialized(
     477           10 :                         SerializedValueMeta {
     478           10 :                             key: key.to_compact(),
     479           10 :                             lsn,
     480           10 :                             batch_offset: relative_off,
     481           10 :                             len: val_ser_size,
     482           10 :                             will_init: true,
     483           10 :                         },
     484           10 :                     )));
     485           10 : 
     486           10 :                     self.len += 1;
     487           10 : 
     488           10 :                     key = key.next();
     489           10 :                 }
     490              :             }
     491              :         }
     492              : 
     493           19 :         self.metadata = ordered_metas.into_iter().map(|ord| ord.0).collect();
     494            1 : 
     495            1 :         if cfg!(any(debug_assertions, test)) {
     496            1 :             self.validate_lsn_order();
     497            1 :         }
     498            1 :     }
     499              : 
     500              :     /// Checks if the batch contains any serialized or observed values
     501       293295 :     pub fn is_empty(&self) -> bool {
     502       293295 :         !self.has_data() && self.metadata.is_empty()
     503       293295 :     }
     504              : 
     505              :     /// Checks if the batch contains only observed values
     506            0 :     pub fn is_observed(&self) -> bool {
     507            0 :         !self.has_data() && !self.metadata.is_empty()
     508            0 :     }
     509              : 
     510              :     /// Checks if the batch contains data
     511              :     ///
     512              :     /// Note that if this returns false, it may still contain observed values or
     513              :     /// a metadata record.
     514     10193423 :     pub fn has_data(&self) -> bool {
     515     10193423 :         let empty = self.raw.is_empty();
     516     10193423 : 
     517     10193423 :         if cfg!(debug_assertions) && empty {
     518       293698 :             assert!(self
     519       293698 :                 .metadata
     520       293698 :                 .iter()
     521       293698 :                 .all(|meta| matches!(meta, ValueMeta::Observed(_))));
     522      9899725 :         }
     523              : 
     524     10193423 :         !empty
     525     10193423 :     }
     526              : 
     527              :     /// Returns the number of values serialized in the batch
     528       291306 :     pub fn len(&self) -> usize {
     529       291306 :         self.len
     530       291306 :     }
     531              : 
     532              :     /// Returns the size of the buffer wrapped by the batch
     533      9608470 :     pub fn buffer_size(&self) -> usize {
     534      9608470 :         self.raw.len()
     535      9608470 :     }
     536              : 
     537            0 :     pub fn updates_key(&self, key: &Key) -> bool {
     538            0 :         self.metadata.iter().any(|meta| match meta {
     539            0 :             ValueMeta::Serialized(ser) => key.to_compact() == ser.key,
     540            0 :             ValueMeta::Observed(_) => false,
     541            0 :         })
     542            0 :     }
     543              : 
     544     10705338 :     pub fn validate_lsn_order(&self) {
     545              :         use std::collections::HashMap;
     546              : 
     547     10705338 :         let mut last_seen_lsn_per_key: HashMap<CompactKey, Lsn> = HashMap::default();
     548              : 
     549     56800151 :         for meta in self.metadata.iter() {
     550     56800151 :             let lsn = meta.lsn();
     551     56800151 :             let key = meta.key();
     552              : 
     553     56800151 :             if let Some(prev_lsn) = last_seen_lsn_per_key.insert(key, lsn) {
     554           15 :                 assert!(
     555           15 :                     lsn >= prev_lsn,
     556            0 :                     "Ordering violated by {}: {} < {}",
     557            0 :                     Key::from_compact(key),
     558              :                     lsn,
     559              :                     prev_lsn
     560              :                 );
     561     56800136 :             }
     562              :         }
     563     10705338 :     }
     564              : }
     565              : 
     566              : #[cfg(all(test, feature = "testing"))]
     567              : mod tests {
     568              :     use super::*;
     569              : 
     570            6 :     fn validate_batch(
     571            6 :         batch: &SerializedValueBatch,
     572            6 :         values: &[(CompactKey, Lsn, usize, Value)],
     573            6 :         gaps: Option<&Vec<(KeySpace, Lsn)>>,
     574            6 :     ) {
     575              :         // Invariant 1: The metadata for a given entry in the batch
     576              :         // is correct and can be used to deserialize back to the original value.
     577           28 :         for (key, lsn, size, value) in values.iter() {
     578           28 :             let meta = batch
     579           28 :                 .metadata
     580           28 :                 .iter()
     581          136 :                 .find(|meta| (meta.key(), meta.lsn()) == (*key, *lsn))
     582           28 :                 .unwrap();
     583           28 :             let meta = match meta {
     584           28 :                 ValueMeta::Serialized(ser) => ser,
     585            0 :                 ValueMeta::Observed(_) => unreachable!(),
     586              :             };
     587              : 
     588           28 :             assert_eq!(meta.len, *size);
     589           28 :             assert_eq!(meta.will_init, value.will_init());
     590              : 
     591           28 :             let start = meta.batch_offset as usize;
     592           28 :             let end = meta.batch_offset as usize + meta.len;
     593           28 :             let value_from_batch = Value::des(&batch.raw[start..end]).unwrap();
     594           28 :             assert_eq!(&value_from_batch, value);
     595              :         }
     596              : 
     597           28 :         let mut expected_buffer_size: usize = values.iter().map(|(_, _, size, _)| size).sum();
     598            6 :         let mut gap_pages_count: usize = 0;
     599              : 
     600              :         // Invariant 2: Zero pages were added for identified gaps and their metadata
     601              :         // is correct.
     602            6 :         if let Some(gaps) = gaps {
     603            3 :             for (gap_keyspace, lsn) in gaps {
     604            5 :                 for gap_range in &gap_keyspace.ranges {
     605            3 :                     let mut gap_key = gap_range.start;
     606           13 :                     while gap_key != gap_range.end {
     607           10 :                         let meta = batch
     608           10 :                             .metadata
     609           10 :                             .iter()
     610          104 :                             .find(|meta| (meta.key(), meta.lsn()) == (gap_key.to_compact(), *lsn))
     611           10 :                             .unwrap();
     612           10 :                         let meta = match meta {
     613           10 :                             ValueMeta::Serialized(ser) => ser,
     614            0 :                             ValueMeta::Observed(_) => unreachable!(),
     615              :                         };
     616              : 
     617           10 :                         let zero_value = Value::Image(ZERO_PAGE.clone());
     618           10 :                         let zero_value_size = zero_value.serialized_size().unwrap() as usize;
     619           10 : 
     620           10 :                         assert_eq!(meta.len, zero_value_size);
     621           10 :                         assert_eq!(meta.will_init, zero_value.will_init());
     622              : 
     623           10 :                         let start = meta.batch_offset as usize;
     624           10 :                         let end = meta.batch_offset as usize + meta.len;
     625           10 :                         let value_from_batch = Value::des(&batch.raw[start..end]).unwrap();
     626           10 :                         assert_eq!(value_from_batch, zero_value);
     627              : 
     628           10 :                         gap_pages_count += 1;
     629           10 :                         expected_buffer_size += zero_value_size;
     630           10 :                         gap_key = gap_key.next();
     631              :                     }
     632              :                 }
     633              :             }
     634            5 :         }
     635              : 
     636              :         // Invariant 3: The length of the batch is equal to the number
     637              :         // of values inserted, plus the number of gap pages. This extends
     638              :         // to the raw buffer size.
     639            6 :         assert_eq!(batch.len(), values.len() + gap_pages_count);
     640            6 :         assert_eq!(expected_buffer_size, batch.buffer_size());
     641              : 
     642              :         // Invariant 4: Metadata entries for any given key are sorted in LSN order.
     643            6 :         batch.validate_lsn_order();
     644            6 :     }
     645              : 
     646              :     #[test]
     647            1 :     fn test_creation_from_values() {
     648              :         const LSN: Lsn = Lsn(0x10);
     649            1 :         let key = Key::from_hex("110000000033333333444444445500000001").unwrap();
     650            1 : 
     651            1 :         let values = vec![
     652            1 :             (
     653            1 :                 key.to_compact(),
     654            1 :                 LSN,
     655            1 :                 Value::WalRecord(NeonWalRecord::wal_append("foo")),
     656            1 :             ),
     657            1 :             (
     658            1 :                 key.next().to_compact(),
     659            1 :                 LSN,
     660            1 :                 Value::WalRecord(NeonWalRecord::wal_append("bar")),
     661            1 :             ),
     662            1 :             (
     663            1 :                 key.to_compact(),
     664            1 :                 Lsn(LSN.0 + 0x10),
     665            1 :                 Value::WalRecord(NeonWalRecord::wal_append("baz")),
     666            1 :             ),
     667            1 :             (
     668            1 :                 key.next().next().to_compact(),
     669            1 :                 LSN,
     670            1 :                 Value::WalRecord(NeonWalRecord::wal_append("taz")),
     671            1 :             ),
     672            1 :         ];
     673            1 : 
     674            1 :         let values = values
     675            1 :             .into_iter()
     676            4 :             .map(|(key, lsn, value)| (key, lsn, value.serialized_size().unwrap() as usize, value))
     677            1 :             .collect::<Vec<_>>();
     678            1 :         let batch = SerializedValueBatch::from_values(values.clone());
     679            1 : 
     680            1 :         validate_batch(&batch, &values, None);
     681            1 : 
     682            1 :         assert!(!batch.is_empty());
     683            1 :     }
     684              : 
     685              :     #[test]
     686            1 :     fn test_put() {
     687              :         const LSN: Lsn = Lsn(0x10);
     688            1 :         let key = Key::from_hex("110000000033333333444444445500000001").unwrap();
     689            1 : 
     690            1 :         let values = vec![
     691            1 :             (
     692            1 :                 key.to_compact(),
     693            1 :                 LSN,
     694            1 :                 Value::WalRecord(NeonWalRecord::wal_append("foo")),
     695            1 :             ),
     696            1 :             (
     697            1 :                 key.next().to_compact(),
     698            1 :                 LSN,
     699            1 :                 Value::WalRecord(NeonWalRecord::wal_append("bar")),
     700            1 :             ),
     701            1 :         ];
     702            1 : 
     703            1 :         let mut values = values
     704            1 :             .into_iter()
     705            2 :             .map(|(key, lsn, value)| (key, lsn, value.serialized_size().unwrap() as usize, value))
     706            1 :             .collect::<Vec<_>>();
     707            1 :         let mut batch = SerializedValueBatch::from_values(values.clone());
     708            1 : 
     709            1 :         validate_batch(&batch, &values, None);
     710            1 : 
     711            1 :         let value = (
     712            1 :             key.to_compact(),
     713            1 :             Lsn(LSN.0 + 0x10),
     714            1 :             Value::WalRecord(NeonWalRecord::wal_append("baz")),
     715            1 :         );
     716            1 :         let serialized_size = value.2.serialized_size().unwrap() as usize;
     717            1 :         let value = (value.0, value.1, serialized_size, value.2);
     718            1 :         values.push(value.clone());
     719            1 :         batch.put(value.0, value.3, value.1);
     720            1 : 
     721            1 :         validate_batch(&batch, &values, None);
     722            1 : 
     723            1 :         let value = (
     724            1 :             key.next().next().to_compact(),
     725            1 :             LSN,
     726            1 :             Value::WalRecord(NeonWalRecord::wal_append("taz")),
     727            1 :         );
     728            1 :         let serialized_size = value.2.serialized_size().unwrap() as usize;
     729            1 :         let value = (value.0, value.1, serialized_size, value.2);
     730            1 :         values.push(value.clone());
     731            1 :         batch.put(value.0, value.3, value.1);
     732            1 : 
     733            1 :         validate_batch(&batch, &values, None);
     734            1 :     }
     735              : 
     736              :     #[test]
     737            1 :     fn test_extension() {
     738              :         const LSN: Lsn = Lsn(0x10);
     739            1 :         let key = Key::from_hex("110000000033333333444444445500000001").unwrap();
     740            1 : 
     741            1 :         let values = vec![
     742            1 :             (
     743            1 :                 key.to_compact(),
     744            1 :                 LSN,
     745            1 :                 Value::WalRecord(NeonWalRecord::wal_append("foo")),
     746            1 :             ),
     747            1 :             (
     748            1 :                 key.next().to_compact(),
     749            1 :                 LSN,
     750            1 :                 Value::WalRecord(NeonWalRecord::wal_append("bar")),
     751            1 :             ),
     752            1 :             (
     753            1 :                 key.next().next().to_compact(),
     754            1 :                 LSN,
     755            1 :                 Value::WalRecord(NeonWalRecord::wal_append("taz")),
     756            1 :             ),
     757            1 :         ];
     758            1 : 
     759            1 :         let mut values = values
     760            1 :             .into_iter()
     761            3 :             .map(|(key, lsn, value)| (key, lsn, value.serialized_size().unwrap() as usize, value))
     762            1 :             .collect::<Vec<_>>();
     763            1 :         let mut batch = SerializedValueBatch::from_values(values.clone());
     764            1 : 
     765            1 :         let other_values = vec![
     766            1 :             (
     767            1 :                 key.to_compact(),
     768            1 :                 Lsn(LSN.0 + 0x10),
     769            1 :                 Value::WalRecord(NeonWalRecord::wal_append("foo")),
     770            1 :             ),
     771            1 :             (
     772            1 :                 key.next().to_compact(),
     773            1 :                 Lsn(LSN.0 + 0x10),
     774            1 :                 Value::WalRecord(NeonWalRecord::wal_append("bar")),
     775            1 :             ),
     776            1 :             (
     777            1 :                 key.next().next().to_compact(),
     778            1 :                 Lsn(LSN.0 + 0x10),
     779            1 :                 Value::WalRecord(NeonWalRecord::wal_append("taz")),
     780            1 :             ),
     781            1 :         ];
     782            1 : 
     783            1 :         let other_values = other_values
     784            1 :             .into_iter()
     785            3 :             .map(|(key, lsn, value)| (key, lsn, value.serialized_size().unwrap() as usize, value))
     786            1 :             .collect::<Vec<_>>();
     787            1 :         let other_batch = SerializedValueBatch::from_values(other_values.clone());
     788            1 : 
     789            1 :         values.extend(other_values);
     790            1 :         batch.extend(other_batch);
     791            1 : 
     792            1 :         validate_batch(&batch, &values, None);
     793            1 :     }
     794              : 
     795              :     #[test]
     796            1 :     fn test_gap_zeroing() {
     797              :         const LSN: Lsn = Lsn(0x10);
     798            1 :         let rel_foo_base_key = Key::from_hex("110000000033333333444444445500000001").unwrap();
     799            1 : 
     800            1 :         let rel_bar_base_key = {
     801            1 :             let mut key = rel_foo_base_key;
     802            1 :             key.field4 += 1;
     803            1 :             key
     804            1 :         };
     805            1 : 
     806            1 :         let values = vec![
     807            1 :             (
     808            1 :                 rel_foo_base_key.to_compact(),
     809            1 :                 LSN,
     810            1 :                 Value::WalRecord(NeonWalRecord::wal_append("foo1")),
     811            1 :             ),
     812            1 :             (
     813            1 :                 rel_foo_base_key.add(1).to_compact(),
     814            1 :                 LSN,
     815            1 :                 Value::WalRecord(NeonWalRecord::wal_append("foo2")),
     816            1 :             ),
     817            1 :             (
     818            1 :                 rel_foo_base_key.add(5).to_compact(),
     819            1 :                 LSN,
     820            1 :                 Value::WalRecord(NeonWalRecord::wal_append("foo3")),
     821            1 :             ),
     822            1 :             (
     823            1 :                 rel_foo_base_key.add(1).to_compact(),
     824            1 :                 Lsn(LSN.0 + 0x10),
     825            1 :                 Value::WalRecord(NeonWalRecord::wal_append("foo4")),
     826            1 :             ),
     827            1 :             (
     828            1 :                 rel_foo_base_key.add(10).to_compact(),
     829            1 :                 Lsn(LSN.0 + 0x10),
     830            1 :                 Value::WalRecord(NeonWalRecord::wal_append("foo5")),
     831            1 :             ),
     832            1 :             (
     833            1 :                 rel_foo_base_key.add(11).to_compact(),
     834            1 :                 Lsn(LSN.0 + 0x10),
     835            1 :                 Value::WalRecord(NeonWalRecord::wal_append("foo6")),
     836            1 :             ),
     837            1 :             (
     838            1 :                 rel_foo_base_key.add(12).to_compact(),
     839            1 :                 Lsn(LSN.0 + 0x10),
     840            1 :                 Value::WalRecord(NeonWalRecord::wal_append("foo7")),
     841            1 :             ),
     842            1 :             (
     843            1 :                 rel_bar_base_key.to_compact(),
     844            1 :                 LSN,
     845            1 :                 Value::WalRecord(NeonWalRecord::wal_append("bar1")),
     846            1 :             ),
     847            1 :             (
     848            1 :                 rel_bar_base_key.add(4).to_compact(),
     849            1 :                 LSN,
     850            1 :                 Value::WalRecord(NeonWalRecord::wal_append("bar2")),
     851            1 :             ),
     852            1 :         ];
     853            1 : 
     854            1 :         let values = values
     855            1 :             .into_iter()
     856            9 :             .map(|(key, lsn, value)| (key, lsn, value.serialized_size().unwrap() as usize, value))
     857            1 :             .collect::<Vec<_>>();
     858            1 : 
     859            1 :         let mut batch = SerializedValueBatch::from_values(values.clone());
     860            1 : 
     861            1 :         let gaps = vec![
     862            1 :             (
     863            1 :                 KeySpace {
     864            1 :                     ranges: vec![
     865            1 :                         rel_foo_base_key.add(2)..rel_foo_base_key.add(5),
     866            1 :                         rel_bar_base_key.add(1)..rel_bar_base_key.add(4),
     867            1 :                     ],
     868            1 :                 },
     869            1 :                 LSN,
     870            1 :             ),
     871            1 :             (
     872            1 :                 KeySpace {
     873            1 :                     ranges: vec![rel_foo_base_key.add(6)..rel_foo_base_key.add(10)],
     874            1 :                 },
     875            1 :                 Lsn(LSN.0 + 0x10),
     876            1 :             ),
     877            1 :         ];
     878            1 : 
     879            1 :         batch.zero_gaps(gaps.clone());
     880            1 :         validate_batch(&batch, &values, Some(&gaps));
     881            1 :     }
     882              : }
        

Generated by: LCOV version 2.1-beta