LCOV - code coverage report
Current view: top level - libs/wal_decoder/src - serialized_batch.rs (source / functions) Coverage Total Hit
Test: 20b6afc7b7f34578dcaab2b3acdaecfe91cd8bf1.info Lines: 91.7 % 635 582
Test Date: 2024-11-25 17:48:16 Functions: 49.3 % 67 33

            Line data    Source code
       1              : //! This module implements batch type for serialized [`pageserver_api::value::Value`]
       2              : //! instances. Each batch contains a raw buffer (serialized values)
       3              : //! and a list of metadata for each (key, LSN) tuple present in the batch.
       4              : //!
       5              : //! Such batches are created from decoded PG wal records and ingested
       6              : //! by the pageserver by writing directly to the ephemeral file.
       7              : 
       8              : use std::collections::BTreeSet;
       9              : 
      10              : use bytes::{Bytes, BytesMut};
      11              : use pageserver_api::key::rel_block_to_key;
      12              : use pageserver_api::keyspace::KeySpace;
      13              : use pageserver_api::record::NeonWalRecord;
      14              : use pageserver_api::reltag::RelTag;
      15              : use pageserver_api::shard::ShardIdentity;
      16              : use pageserver_api::{key::CompactKey, value::Value};
      17              : use postgres_ffi::walrecord::{DecodedBkpBlock, DecodedWALRecord};
      18              : use postgres_ffi::{page_is_new, page_set_lsn, pg_constants, BLCKSZ};
      19              : use serde::{Deserialize, Serialize};
      20              : use utils::bin_ser::BeSer;
      21              : use utils::lsn::Lsn;
      22              : 
      23              : use pageserver_api::key::Key;
      24              : 
      25              : static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; BLCKSZ as usize]);
      26              : 
      27              : /// Accompanying metadata for the batch
      28              : /// A value may be serialized and stored into the batch or just "observed".
      29              : /// Shard 0 currently "observes" all values in order to accurately track
      30              : /// relation sizes. In the case of "observed" values, we only need to know
      31              : /// the key and LSN, so two types of metadata are supported to save on network
      32              : /// bandwidth.
      33            0 : #[derive(Serialize, Deserialize)]
      34              : pub enum ValueMeta {
      35              :     Serialized(SerializedValueMeta),
      36              :     Observed(ObservedValueMeta),
      37              : }
      38              : 
      39              : impl ValueMeta {
      40     28545945 :     pub fn key(&self) -> CompactKey {
      41     28545945 :         match self {
      42     28545945 :             Self::Serialized(ser) => ser.key,
      43            0 :             Self::Observed(obs) => obs.key,
      44              :         }
      45     28545945 :     }
      46              : 
      47     28400303 :     pub fn lsn(&self) -> Lsn {
      48     28400303 :         match self {
      49     28400303 :             Self::Serialized(ser) => ser.lsn,
      50            0 :             Self::Observed(obs) => obs.lsn,
      51              :         }
      52     28400303 :     }
      53              : }
      54              : 
      55              : /// Wrapper around [`ValueMeta`] that implements ordering by
      56              : /// (key, LSN) tuples
      57              : struct OrderedValueMeta(ValueMeta);
      58              : 
      59              : impl Ord for OrderedValueMeta {
      60           59 :     fn cmp(&self, other: &Self) -> std::cmp::Ordering {
      61           59 :         (self.0.key(), self.0.lsn()).cmp(&(other.0.key(), other.0.lsn()))
      62           59 :     }
      63              : }
      64              : 
      65              : impl PartialOrd for OrderedValueMeta {
      66            9 :     fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
      67            9 :         Some(self.cmp(other))
      68            9 :     }
      69              : }
      70              : 
      71              : impl PartialEq for OrderedValueMeta {
      72            8 :     fn eq(&self, other: &Self) -> bool {
      73            8 :         (self.0.key(), self.0.lsn()) == (other.0.key(), other.0.lsn())
      74            8 :     }
      75              : }
      76              : 
      77              : impl Eq for OrderedValueMeta {}
      78              : 
      79              : /// Metadata for a [`Value`] serialized into the batch.
      80            0 : #[derive(Serialize, Deserialize)]
      81              : pub struct SerializedValueMeta {
      82              :     pub key: CompactKey,
      83              :     pub lsn: Lsn,
      84              :     /// Starting offset of the value for the (key, LSN) tuple
      85              :     /// in [`SerializedValueBatch::raw`]
      86              :     pub batch_offset: u64,
      87              :     pub len: usize,
      88              :     pub will_init: bool,
      89              : }
      90              : 
      91              : /// Metadata for a [`Value`] observed by the batch
      92            0 : #[derive(Serialize, Deserialize)]
      93              : pub struct ObservedValueMeta {
      94              :     pub key: CompactKey,
      95              :     pub lsn: Lsn,
      96              : }
      97              : 
      98              : /// Batch of serialized [`Value`]s.
      99            0 : #[derive(Serialize, Deserialize)]
     100              : pub struct SerializedValueBatch {
     101              :     /// [`Value`]s serialized in EphemeralFile's native format,
     102              :     /// ready for disk write by the pageserver
     103              :     pub raw: Vec<u8>,
     104              : 
     105              :     /// Metadata to make sense of the bytes in [`Self::raw`]
     106              :     /// and represent "observed" values.
     107              :     ///
     108              :     /// Invariant: Metadata entries for any given key are ordered
     109              :     /// by LSN. Note that entries for a key do not have to be contiguous.
     110              :     pub metadata: Vec<ValueMeta>,
     111              : 
     112              :     /// The highest LSN of any value in the batch
     113              :     pub max_lsn: Lsn,
     114              : 
     115              :     /// Number of values encoded by [`Self::raw`]
     116              :     pub len: usize,
     117              : }
     118              : 
     119              : impl Default for SerializedValueBatch {
     120       262188 :     fn default() -> Self {
     121       262188 :         Self {
     122       262188 :             raw: Default::default(),
     123       262188 :             metadata: Default::default(),
     124       262188 :             max_lsn: Lsn(0),
     125       262188 :             len: 0,
     126       262188 :         }
     127       262188 :     }
     128              : }
     129              : 
     130              : impl SerializedValueBatch {
     131              :     /// Build a batch of serialized values from a decoded PG WAL record
     132              :     ///
     133              :     /// The batch will only contain values for keys targeting the specifiec
     134              :     /// shard. Shard 0 is a special case, where any keys that don't belong to
     135              :     /// it are "observed" by the batch (i.e. present in [`SerializedValueBatch::metadata`],
     136              :     /// but absent from the raw buffer [`SerializedValueBatch::raw`]).
     137       145852 :     pub(crate) fn from_decoded_filtered(
     138       145852 :         decoded: DecodedWALRecord,
     139       145852 :         shard: &ShardIdentity,
     140       145852 :         next_record_lsn: Lsn,
     141       145852 :         pg_version: u32,
     142       145852 :     ) -> anyhow::Result<SerializedValueBatch> {
     143       145852 :         // First determine how big the buffer needs to be and allocate it up-front.
     144       145852 :         // This duplicates some of the work below, but it's empirically much faster.
     145       145852 :         let estimated_buffer_size = Self::estimate_buffer_size(&decoded, shard, pg_version);
     146       145852 :         let mut buf = Vec::<u8>::with_capacity(estimated_buffer_size);
     147       145852 : 
     148       145852 :         let mut metadata: Vec<ValueMeta> = Vec::with_capacity(decoded.blocks.len());
     149       145852 :         let mut max_lsn: Lsn = Lsn(0);
     150       145852 :         let mut len: usize = 0;
     151       145852 :         for blk in decoded.blocks.iter() {
     152       145642 :             let relative_off = buf.len() as u64;
     153       145642 : 
     154       145642 :             let rel = RelTag {
     155       145642 :                 spcnode: blk.rnode_spcnode,
     156       145642 :                 dbnode: blk.rnode_dbnode,
     157       145642 :                 relnode: blk.rnode_relnode,
     158       145642 :                 forknum: blk.forknum,
     159       145642 :             };
     160       145642 : 
     161       145642 :             let key = rel_block_to_key(rel, blk.blkno);
     162       145642 : 
     163       145642 :             if !key.is_valid_key_on_write_path() {
     164            0 :                 anyhow::bail!(
     165            0 :                     "Unsupported key decoded at LSN {}: {}",
     166            0 :                     next_record_lsn,
     167            0 :                     key
     168            0 :                 );
     169       145642 :             }
     170       145642 : 
     171       145642 :             let key_is_local = shard.is_key_local(&key);
     172       145642 : 
     173       145642 :             tracing::debug!(
     174              :                 lsn=%next_record_lsn,
     175              :                 key=%key,
     176            0 :                 "ingest: shard decision {}",
     177            0 :                 if !key_is_local { "drop" } else { "keep" },
     178              :             );
     179              : 
     180       145642 :             if !key_is_local {
     181            0 :                 if shard.is_shard_zero() {
     182              :                     // Shard 0 tracks relation sizes.  Although we will not store this block, we will observe
     183              :                     // its blkno in case it implicitly extends a relation.
     184            0 :                     metadata.push(ValueMeta::Observed(ObservedValueMeta {
     185            0 :                         key: key.to_compact(),
     186            0 :                         lsn: next_record_lsn,
     187            0 :                     }))
     188            0 :                 }
     189              : 
     190            0 :                 continue;
     191       145642 :             }
     192              : 
     193              :             // Instead of storing full-page-image WAL record,
     194              :             // it is better to store extracted image: we can skip wal-redo
     195              :             // in this case. Also some FPI records may contain multiple (up to 32) pages,
     196              :             // so them have to be copied multiple times.
     197              :             //
     198       145642 :             let val = if Self::block_is_image(&decoded, blk, pg_version) {
     199              :                 // Extract page image from FPI record
     200           24 :                 let img_len = blk.bimg_len as usize;
     201           24 :                 let img_offs = blk.bimg_offset as usize;
     202           24 :                 let mut image = BytesMut::with_capacity(BLCKSZ as usize);
     203           24 :                 // TODO(vlad): skip the copy
     204           24 :                 image.extend_from_slice(&decoded.record[img_offs..img_offs + img_len]);
     205           24 : 
     206           24 :                 if blk.hole_length != 0 {
     207            0 :                     let tail = image.split_off(blk.hole_offset as usize);
     208            0 :                     image.resize(image.len() + blk.hole_length as usize, 0u8);
     209            0 :                     image.unsplit(tail);
     210           24 :                 }
     211              :                 //
     212              :                 // Match the logic of XLogReadBufferForRedoExtended:
     213              :                 // The page may be uninitialized. If so, we can't set the LSN because
     214              :                 // that would corrupt the page.
     215              :                 //
     216           24 :                 if !page_is_new(&image) {
     217           18 :                     page_set_lsn(&mut image, next_record_lsn)
     218            6 :                 }
     219           24 :                 assert_eq!(image.len(), BLCKSZ as usize);
     220              : 
     221           24 :                 Value::Image(image.freeze())
     222              :             } else {
     223              :                 Value::WalRecord(NeonWalRecord::Postgres {
     224       145618 :                     will_init: blk.will_init || blk.apply_image,
     225       145618 :                     rec: decoded.record.clone(),
     226              :                 })
     227              :             };
     228              : 
     229       145642 :             val.ser_into(&mut buf)
     230       145642 :                 .expect("Writing into in-memory buffer is infallible");
     231       145642 : 
     232       145642 :             let val_ser_size = buf.len() - relative_off as usize;
     233       145642 : 
     234       145642 :             metadata.push(ValueMeta::Serialized(SerializedValueMeta {
     235       145642 :                 key: key.to_compact(),
     236       145642 :                 lsn: next_record_lsn,
     237       145642 :                 batch_offset: relative_off,
     238       145642 :                 len: val_ser_size,
     239       145642 :                 will_init: val.will_init(),
     240       145642 :             }));
     241       145642 :             max_lsn = std::cmp::max(max_lsn, next_record_lsn);
     242       145642 :             len += 1;
     243              :         }
     244              : 
     245       145852 :         if cfg!(any(debug_assertions, test)) {
     246       145852 :             let batch = Self {
     247       145852 :                 raw: buf,
     248       145852 :                 metadata,
     249       145852 :                 max_lsn,
     250       145852 :                 len,
     251       145852 :             };
     252       145852 : 
     253       145852 :             batch.validate_lsn_order();
     254       145852 : 
     255       145852 :             return Ok(batch);
     256            0 :         }
     257            0 : 
     258            0 :         Ok(Self {
     259            0 :             raw: buf,
     260            0 :             metadata,
     261            0 :             max_lsn,
     262            0 :             len,
     263            0 :         })
     264       145852 :     }
     265              : 
     266              :     /// Look into the decoded PG WAL record and determine
     267              :     /// roughly how large the buffer for serialized values needs to be.
     268       145852 :     fn estimate_buffer_size(
     269       145852 :         decoded: &DecodedWALRecord,
     270       145852 :         shard: &ShardIdentity,
     271       145852 :         pg_version: u32,
     272       145852 :     ) -> usize {
     273       145852 :         let mut estimate: usize = 0;
     274              : 
     275       145852 :         for blk in decoded.blocks.iter() {
     276       145642 :             let rel = RelTag {
     277       145642 :                 spcnode: blk.rnode_spcnode,
     278       145642 :                 dbnode: blk.rnode_dbnode,
     279       145642 :                 relnode: blk.rnode_relnode,
     280       145642 :                 forknum: blk.forknum,
     281       145642 :             };
     282       145642 : 
     283       145642 :             let key = rel_block_to_key(rel, blk.blkno);
     284       145642 : 
     285       145642 :             if !shard.is_key_local(&key) {
     286            0 :                 continue;
     287       145642 :             }
     288       145642 : 
     289       145642 :             if Self::block_is_image(decoded, blk, pg_version) {
     290           24 :                 // 4 bytes for the Value::Image discriminator
     291           24 :                 // 8 bytes for encoding the size of the buffer
     292           24 :                 // BLCKSZ for the raw image
     293           24 :                 estimate += (4 + 8 + BLCKSZ) as usize;
     294       145618 :             } else {
     295       145618 :                 // 4 bytes for the Value::WalRecord discriminator
     296       145618 :                 // 4 bytes for the NeonWalRecord::Postgres discriminator
     297       145618 :                 // 1 bytes for NeonWalRecord::Postgres::will_init
     298       145618 :                 // 8 bytes for encoding the size of the buffer
     299       145618 :                 // length of the raw record
     300       145618 :                 estimate += 8 + 1 + 8 + decoded.record.len();
     301       145618 :             }
     302              :         }
     303              : 
     304       145852 :         estimate
     305       145852 :     }
     306              : 
     307       291284 :     fn block_is_image(decoded: &DecodedWALRecord, blk: &DecodedBkpBlock, pg_version: u32) -> bool {
     308       291284 :         blk.apply_image
     309          120 :             && blk.has_image
     310          120 :             && decoded.xl_rmid == pg_constants::RM_XLOG_ID
     311           48 :             && (decoded.xl_info == pg_constants::XLOG_FPI
     312            0 :             || decoded.xl_info == pg_constants::XLOG_FPI_FOR_HINT)
     313              :             // compression of WAL is not yet supported: fall back to storing the original WAL record
     314           48 :             && !postgres_ffi::bkpimage_is_compressed(blk.bimg_info, pg_version)
     315              :             // do not materialize null pages because them most likely be soon replaced with real data
     316           48 :             && blk.bimg_len != 0
     317       291284 :     }
     318              : 
     319              :     /// Encode a list of values and metadata into a serialized batch
     320              :     ///
     321              :     /// This is used by the pageserver ingest code to conveniently generate
     322              :     /// batches for metadata writes.
     323      4660945 :     pub fn from_values(batch: Vec<(CompactKey, Lsn, usize, Value)>) -> Self {
     324      4660945 :         // Pre-allocate a big flat buffer to write into. This should be large but not huge: it is soft-limited in practice by
     325      4660945 :         // [`crate::pgdatadir_mapping::DatadirModification::MAX_PENDING_BYTES`]
     326      4664001 :         let buffer_size = batch.iter().map(|i| i.2).sum::<usize>();
     327      4660945 :         let mut buf = Vec::<u8>::with_capacity(buffer_size);
     328      4660945 : 
     329      4660945 :         let mut metadata: Vec<ValueMeta> = Vec::with_capacity(batch.len());
     330      4660945 :         let mut max_lsn: Lsn = Lsn(0);
     331      4660945 :         let len = batch.len();
     332      9324946 :         for (key, lsn, val_ser_size, val) in batch {
     333      4664001 :             let relative_off = buf.len() as u64;
     334      4664001 : 
     335      4664001 :             val.ser_into(&mut buf)
     336      4664001 :                 .expect("Writing into in-memory buffer is infallible");
     337      4664001 : 
     338      4664001 :             metadata.push(ValueMeta::Serialized(SerializedValueMeta {
     339      4664001 :                 key,
     340      4664001 :                 lsn,
     341      4664001 :                 batch_offset: relative_off,
     342      4664001 :                 len: val_ser_size,
     343      4664001 :                 will_init: val.will_init(),
     344      4664001 :             }));
     345      4664001 :             max_lsn = std::cmp::max(max_lsn, lsn);
     346      4664001 :         }
     347              : 
     348              :         // Assert that we didn't do any extra allocations while building buffer.
     349      4660945 :         debug_assert!(buf.len() <= buffer_size);
     350              : 
     351      4660945 :         if cfg!(any(debug_assertions, test)) {
     352      4660945 :             let batch = Self {
     353      4660945 :                 raw: buf,
     354      4660945 :                 metadata,
     355      4660945 :                 max_lsn,
     356      4660945 :                 len,
     357      4660945 :             };
     358      4660945 : 
     359      4660945 :             batch.validate_lsn_order();
     360      4660945 : 
     361      4660945 :             return batch;
     362            0 :         }
     363            0 : 
     364            0 :         Self {
     365            0 :             raw: buf,
     366            0 :             metadata,
     367            0 :             max_lsn,
     368            0 :             len,
     369            0 :         }
     370      4660945 :     }
     371              : 
     372              :     /// Add one value to the batch
     373              :     ///
     374              :     /// This is used by the pageserver ingest code to include metadata block
     375              :     /// updates for a single key.
     376       280868 :     pub fn put(&mut self, key: CompactKey, value: Value, lsn: Lsn) {
     377       280868 :         let relative_off = self.raw.len() as u64;
     378       280868 :         value.ser_into(&mut self.raw).unwrap();
     379       280868 : 
     380       280868 :         let val_ser_size = self.raw.len() - relative_off as usize;
     381       280868 :         self.metadata
     382       280868 :             .push(ValueMeta::Serialized(SerializedValueMeta {
     383       280868 :                 key,
     384       280868 :                 lsn,
     385       280868 :                 batch_offset: relative_off,
     386       280868 :                 len: val_ser_size,
     387       280868 :                 will_init: value.will_init(),
     388       280868 :             }));
     389       280868 : 
     390       280868 :         self.max_lsn = std::cmp::max(self.max_lsn, lsn);
     391       280868 :         self.len += 1;
     392       280868 : 
     393       280868 :         if cfg!(any(debug_assertions, test)) {
     394       280868 :             self.validate_lsn_order();
     395       280868 :         }
     396       280868 :     }
     397              : 
     398              :     /// Extend with the contents of another batch
     399              :     ///
     400              :     /// One batch is generated for each decoded PG WAL record.
     401              :     /// They are then merged to accumulate reasonably sized writes.
     402       264577 :     pub fn extend(&mut self, mut other: SerializedValueBatch) {
     403       264577 :         let extend_batch_start_offset = self.raw.len() as u64;
     404       264577 : 
     405       264577 :         self.raw.extend(other.raw);
     406       264577 : 
     407       264577 :         // Shift the offsets in the batch we are extending with
     408       266509 :         other.metadata.iter_mut().for_each(|meta| match meta {
     409       266509 :             ValueMeta::Serialized(ser) => {
     410       266509 :                 ser.batch_offset += extend_batch_start_offset;
     411       266509 :                 if cfg!(debug_assertions) {
     412       266509 :                     let value_end = ser.batch_offset + ser.len as u64;
     413       266509 :                     assert!((value_end as usize) <= self.raw.len());
     414            0 :                 }
     415              :             }
     416            0 :             ValueMeta::Observed(_) => {}
     417       266509 :         });
     418       264577 :         self.metadata.extend(other.metadata);
     419       264577 : 
     420       264577 :         self.max_lsn = std::cmp::max(self.max_lsn, other.max_lsn);
     421       264577 : 
     422       264577 :         self.len += other.len;
     423       264577 : 
     424       264577 :         if cfg!(any(debug_assertions, test)) {
     425       264577 :             self.validate_lsn_order();
     426       264577 :         }
     427       264577 :     }
     428              : 
     429              :     /// Add zero images for the (key, LSN) tuples specified
     430              :     ///
     431              :     /// PG versions below 16 do not zero out pages before extending
     432              :     /// a relation and may leave gaps. Such gaps need to be identified
     433              :     /// by the pageserver ingest logic and get patched up here.
     434              :     ///
     435              :     /// Note that this function does not validate that the gaps have been
     436              :     /// identified correctly (it does not know relation sizes), so it's up
     437              :     /// to the call-site to do it properly.
     438            1 :     pub fn zero_gaps(&mut self, gaps: Vec<(KeySpace, Lsn)>) {
     439            1 :         // Implementation note:
     440            1 :         //
     441            1 :         // Values within [`SerializedValueBatch::raw`] do not have any ordering requirements,
     442            1 :         // but the metadata entries should be ordered properly (see
     443            1 :         // [`SerializedValueBatch::metadata`]).
     444            1 :         //
     445            1 :         // Exploiting this observation we do:
     446            1 :         // 1. Drain all the metadata entries into an ordered set.
     447            1 :         // The use of a BTreeSet keyed by (Key, Lsn) relies on the observation that Postgres never
     448            1 :         // includes more than one update to the same block in the same WAL record.
     449            1 :         // 2. For each (key, LSN) gap tuple, append a zero image to the raw buffer
     450            1 :         // and add an index entry to the ordered metadata set.
     451            1 :         // 3. Drain the ordered set back into a metadata vector
     452            1 : 
     453            1 :         let mut ordered_metas = self
     454            1 :             .metadata
     455            1 :             .drain(..)
     456            1 :             .map(OrderedValueMeta)
     457            1 :             .collect::<BTreeSet<_>>();
     458            3 :         for (keyspace, lsn) in gaps {
     459            2 :             self.max_lsn = std::cmp::max(self.max_lsn, lsn);
     460              : 
     461            5 :             for gap_range in keyspace.ranges {
     462            3 :                 let mut key = gap_range.start;
     463           13 :                 while key != gap_range.end {
     464           10 :                     let relative_off = self.raw.len() as u64;
     465           10 : 
     466           10 :                     // TODO(vlad): Can we be cheeky and write only one zero image, and
     467           10 :                     // make all index entries requiring a zero page point to it?
     468           10 :                     // Alternatively, we can change the index entry format to represent zero pages
     469           10 :                     // without writing them at all.
     470           10 :                     Value::Image(ZERO_PAGE.clone())
     471           10 :                         .ser_into(&mut self.raw)
     472           10 :                         .unwrap();
     473           10 :                     let val_ser_size = self.raw.len() - relative_off as usize;
     474           10 : 
     475           10 :                     ordered_metas.insert(OrderedValueMeta(ValueMeta::Serialized(
     476           10 :                         SerializedValueMeta {
     477           10 :                             key: key.to_compact(),
     478           10 :                             lsn,
     479           10 :                             batch_offset: relative_off,
     480           10 :                             len: val_ser_size,
     481           10 :                             will_init: true,
     482           10 :                         },
     483           10 :                     )));
     484           10 : 
     485           10 :                     self.len += 1;
     486           10 : 
     487           10 :                     key = key.next();
     488           10 :                 }
     489              :             }
     490              :         }
     491              : 
     492           19 :         self.metadata = ordered_metas.into_iter().map(|ord| ord.0).collect();
     493            1 : 
     494            1 :         if cfg!(any(debug_assertions, test)) {
     495            1 :             self.validate_lsn_order();
     496            1 :         }
     497            1 :     }
     498              : 
     499              :     /// Checks if the batch is empty
     500              :     ///
     501              :     /// A batch is empty when it contains no serialized values.
     502              :     /// Note that it may still contain observed values.
     503      4950035 :     pub fn is_empty(&self) -> bool {
     504      4950035 :         let empty = self.raw.is_empty();
     505      4950035 : 
     506      4950035 :         if cfg!(debug_assertions) && empty {
     507          202 :             assert!(self
     508          202 :                 .metadata
     509          202 :                 .iter()
     510          202 :                 .all(|meta| matches!(meta, ValueMeta::Observed(_))));
     511      4949833 :         }
     512              : 
     513      4950035 :         empty
     514      4950035 :     }
     515              : 
     516              :     /// Returns the number of values serialized in the batch
     517       145656 :     pub fn len(&self) -> usize {
     518       145656 :         self.len
     519       145656 :     }
     520              : 
     521              :     /// Returns the size of the buffer wrapped by the batch
     522      4804208 :     pub fn buffer_size(&self) -> usize {
     523      4804208 :         self.raw.len()
     524      4804208 :     }
     525              : 
     526            0 :     pub fn updates_key(&self, key: &Key) -> bool {
     527            0 :         self.metadata.iter().any(|meta| match meta {
     528            0 :             ValueMeta::Serialized(ser) => key.to_compact() == ser.key,
     529            0 :             ValueMeta::Observed(_) => false,
     530            0 :         })
     531            0 :     }
     532              : 
     533      5352249 :     pub fn validate_lsn_order(&self) {
     534              :         use std::collections::HashMap;
     535              : 
     536      5352249 :         let mut last_seen_lsn_per_key: HashMap<CompactKey, Lsn> = HashMap::default();
     537              : 
     538     28399929 :         for meta in self.metadata.iter() {
     539     28399929 :             let lsn = meta.lsn();
     540     28399929 :             let key = meta.key();
     541              : 
     542     28399929 :             if let Some(prev_lsn) = last_seen_lsn_per_key.insert(key, lsn) {
     543           15 :                 assert!(
     544           15 :                     lsn >= prev_lsn,
     545            0 :                     "Ordering violated by {}: {} < {}",
     546            0 :                     Key::from_compact(key),
     547              :                     lsn,
     548              :                     prev_lsn
     549              :                 );
     550     28399914 :             }
     551              :         }
     552      5352249 :     }
     553              : }
     554              : 
     555              : #[cfg(all(test, feature = "testing"))]
     556              : mod tests {
     557              :     use super::*;
     558              : 
     559            6 :     fn validate_batch(
     560            6 :         batch: &SerializedValueBatch,
     561            6 :         values: &[(CompactKey, Lsn, usize, Value)],
     562            6 :         gaps: Option<&Vec<(KeySpace, Lsn)>>,
     563            6 :     ) {
     564              :         // Invariant 1: The metadata for a given entry in the batch
     565              :         // is correct and can be used to deserialize back to the original value.
     566           28 :         for (key, lsn, size, value) in values.iter() {
     567           28 :             let meta = batch
     568           28 :                 .metadata
     569           28 :                 .iter()
     570          136 :                 .find(|meta| (meta.key(), meta.lsn()) == (*key, *lsn))
     571           28 :                 .unwrap();
     572           28 :             let meta = match meta {
     573           28 :                 ValueMeta::Serialized(ser) => ser,
     574            0 :                 ValueMeta::Observed(_) => unreachable!(),
     575              :             };
     576              : 
     577           28 :             assert_eq!(meta.len, *size);
     578           28 :             assert_eq!(meta.will_init, value.will_init());
     579              : 
     580           28 :             let start = meta.batch_offset as usize;
     581           28 :             let end = meta.batch_offset as usize + meta.len;
     582           28 :             let value_from_batch = Value::des(&batch.raw[start..end]).unwrap();
     583           28 :             assert_eq!(&value_from_batch, value);
     584              :         }
     585              : 
     586           28 :         let mut expected_buffer_size: usize = values.iter().map(|(_, _, size, _)| size).sum();
     587            6 :         let mut gap_pages_count: usize = 0;
     588              : 
     589              :         // Invariant 2: Zero pages were added for identified gaps and their metadata
     590              :         // is correct.
     591            6 :         if let Some(gaps) = gaps {
     592            3 :             for (gap_keyspace, lsn) in gaps {
     593            5 :                 for gap_range in &gap_keyspace.ranges {
     594            3 :                     let mut gap_key = gap_range.start;
     595           13 :                     while gap_key != gap_range.end {
     596           10 :                         let meta = batch
     597           10 :                             .metadata
     598           10 :                             .iter()
     599          104 :                             .find(|meta| (meta.key(), meta.lsn()) == (gap_key.to_compact(), *lsn))
     600           10 :                             .unwrap();
     601           10 :                         let meta = match meta {
     602           10 :                             ValueMeta::Serialized(ser) => ser,
     603            0 :                             ValueMeta::Observed(_) => unreachable!(),
     604              :                         };
     605              : 
     606           10 :                         let zero_value = Value::Image(ZERO_PAGE.clone());
     607           10 :                         let zero_value_size = zero_value.serialized_size().unwrap() as usize;
     608           10 : 
     609           10 :                         assert_eq!(meta.len, zero_value_size);
     610           10 :                         assert_eq!(meta.will_init, zero_value.will_init());
     611              : 
     612           10 :                         let start = meta.batch_offset as usize;
     613           10 :                         let end = meta.batch_offset as usize + meta.len;
     614           10 :                         let value_from_batch = Value::des(&batch.raw[start..end]).unwrap();
     615           10 :                         assert_eq!(value_from_batch, zero_value);
     616              : 
     617           10 :                         gap_pages_count += 1;
     618           10 :                         expected_buffer_size += zero_value_size;
     619           10 :                         gap_key = gap_key.next();
     620              :                     }
     621              :                 }
     622              :             }
     623            5 :         }
     624              : 
     625              :         // Invariant 3: The length of the batch is equal to the number
     626              :         // of values inserted, plus the number of gap pages. This extends
     627              :         // to the raw buffer size.
     628            6 :         assert_eq!(batch.len(), values.len() + gap_pages_count);
     629            6 :         assert_eq!(expected_buffer_size, batch.buffer_size());
     630              : 
     631              :         // Invariant 4: Metadata entries for any given key are sorted in LSN order.
     632            6 :         batch.validate_lsn_order();
     633            6 :     }
     634              : 
     635              :     #[test]
     636            1 :     fn test_creation_from_values() {
     637              :         const LSN: Lsn = Lsn(0x10);
     638            1 :         let key = Key::from_hex("110000000033333333444444445500000001").unwrap();
     639            1 : 
     640            1 :         let values = vec![
     641            1 :             (
     642            1 :                 key.to_compact(),
     643            1 :                 LSN,
     644            1 :                 Value::WalRecord(NeonWalRecord::wal_append("foo")),
     645            1 :             ),
     646            1 :             (
     647            1 :                 key.next().to_compact(),
     648            1 :                 LSN,
     649            1 :                 Value::WalRecord(NeonWalRecord::wal_append("bar")),
     650            1 :             ),
     651            1 :             (
     652            1 :                 key.to_compact(),
     653            1 :                 Lsn(LSN.0 + 0x10),
     654            1 :                 Value::WalRecord(NeonWalRecord::wal_append("baz")),
     655            1 :             ),
     656            1 :             (
     657            1 :                 key.next().next().to_compact(),
     658            1 :                 LSN,
     659            1 :                 Value::WalRecord(NeonWalRecord::wal_append("taz")),
     660            1 :             ),
     661            1 :         ];
     662            1 : 
     663            1 :         let values = values
     664            1 :             .into_iter()
     665            4 :             .map(|(key, lsn, value)| (key, lsn, value.serialized_size().unwrap() as usize, value))
     666            1 :             .collect::<Vec<_>>();
     667            1 :         let batch = SerializedValueBatch::from_values(values.clone());
     668            1 : 
     669            1 :         validate_batch(&batch, &values, None);
     670            1 : 
     671            1 :         assert!(!batch.is_empty());
     672            1 :     }
     673              : 
     674              :     #[test]
     675            1 :     fn test_put() {
     676              :         const LSN: Lsn = Lsn(0x10);
     677            1 :         let key = Key::from_hex("110000000033333333444444445500000001").unwrap();
     678            1 : 
     679            1 :         let values = vec![
     680            1 :             (
     681            1 :                 key.to_compact(),
     682            1 :                 LSN,
     683            1 :                 Value::WalRecord(NeonWalRecord::wal_append("foo")),
     684            1 :             ),
     685            1 :             (
     686            1 :                 key.next().to_compact(),
     687            1 :                 LSN,
     688            1 :                 Value::WalRecord(NeonWalRecord::wal_append("bar")),
     689            1 :             ),
     690            1 :         ];
     691            1 : 
     692            1 :         let mut values = values
     693            1 :             .into_iter()
     694            2 :             .map(|(key, lsn, value)| (key, lsn, value.serialized_size().unwrap() as usize, value))
     695            1 :             .collect::<Vec<_>>();
     696            1 :         let mut batch = SerializedValueBatch::from_values(values.clone());
     697            1 : 
     698            1 :         validate_batch(&batch, &values, None);
     699            1 : 
     700            1 :         let value = (
     701            1 :             key.to_compact(),
     702            1 :             Lsn(LSN.0 + 0x10),
     703            1 :             Value::WalRecord(NeonWalRecord::wal_append("baz")),
     704            1 :         );
     705            1 :         let serialized_size = value.2.serialized_size().unwrap() as usize;
     706            1 :         let value = (value.0, value.1, serialized_size, value.2);
     707            1 :         values.push(value.clone());
     708            1 :         batch.put(value.0, value.3, value.1);
     709            1 : 
     710            1 :         validate_batch(&batch, &values, None);
     711            1 : 
     712            1 :         let value = (
     713            1 :             key.next().next().to_compact(),
     714            1 :             LSN,
     715            1 :             Value::WalRecord(NeonWalRecord::wal_append("taz")),
     716            1 :         );
     717            1 :         let serialized_size = value.2.serialized_size().unwrap() as usize;
     718            1 :         let value = (value.0, value.1, serialized_size, value.2);
     719            1 :         values.push(value.clone());
     720            1 :         batch.put(value.0, value.3, value.1);
     721            1 : 
     722            1 :         validate_batch(&batch, &values, None);
     723            1 :     }
     724              : 
     725              :     #[test]
     726            1 :     fn test_extension() {
     727              :         const LSN: Lsn = Lsn(0x10);
     728            1 :         let key = Key::from_hex("110000000033333333444444445500000001").unwrap();
     729            1 : 
     730            1 :         let values = vec![
     731            1 :             (
     732            1 :                 key.to_compact(),
     733            1 :                 LSN,
     734            1 :                 Value::WalRecord(NeonWalRecord::wal_append("foo")),
     735            1 :             ),
     736            1 :             (
     737            1 :                 key.next().to_compact(),
     738            1 :                 LSN,
     739            1 :                 Value::WalRecord(NeonWalRecord::wal_append("bar")),
     740            1 :             ),
     741            1 :             (
     742            1 :                 key.next().next().to_compact(),
     743            1 :                 LSN,
     744            1 :                 Value::WalRecord(NeonWalRecord::wal_append("taz")),
     745            1 :             ),
     746            1 :         ];
     747            1 : 
     748            1 :         let mut values = values
     749            1 :             .into_iter()
     750            3 :             .map(|(key, lsn, value)| (key, lsn, value.serialized_size().unwrap() as usize, value))
     751            1 :             .collect::<Vec<_>>();
     752            1 :         let mut batch = SerializedValueBatch::from_values(values.clone());
     753            1 : 
     754            1 :         let other_values = vec![
     755            1 :             (
     756            1 :                 key.to_compact(),
     757            1 :                 Lsn(LSN.0 + 0x10),
     758            1 :                 Value::WalRecord(NeonWalRecord::wal_append("foo")),
     759            1 :             ),
     760            1 :             (
     761            1 :                 key.next().to_compact(),
     762            1 :                 Lsn(LSN.0 + 0x10),
     763            1 :                 Value::WalRecord(NeonWalRecord::wal_append("bar")),
     764            1 :             ),
     765            1 :             (
     766            1 :                 key.next().next().to_compact(),
     767            1 :                 Lsn(LSN.0 + 0x10),
     768            1 :                 Value::WalRecord(NeonWalRecord::wal_append("taz")),
     769            1 :             ),
     770            1 :         ];
     771            1 : 
     772            1 :         let other_values = other_values
     773            1 :             .into_iter()
     774            3 :             .map(|(key, lsn, value)| (key, lsn, value.serialized_size().unwrap() as usize, value))
     775            1 :             .collect::<Vec<_>>();
     776            1 :         let other_batch = SerializedValueBatch::from_values(other_values.clone());
     777            1 : 
     778            1 :         values.extend(other_values);
     779            1 :         batch.extend(other_batch);
     780            1 : 
     781            1 :         validate_batch(&batch, &values, None);
     782            1 :     }
     783              : 
     784              :     #[test]
     785            1 :     fn test_gap_zeroing() {
     786              :         const LSN: Lsn = Lsn(0x10);
     787            1 :         let rel_foo_base_key = Key::from_hex("110000000033333333444444445500000001").unwrap();
     788            1 : 
     789            1 :         let rel_bar_base_key = {
     790            1 :             let mut key = rel_foo_base_key;
     791            1 :             key.field4 += 1;
     792            1 :             key
     793            1 :         };
     794            1 : 
     795            1 :         let values = vec![
     796            1 :             (
     797            1 :                 rel_foo_base_key.to_compact(),
     798            1 :                 LSN,
     799            1 :                 Value::WalRecord(NeonWalRecord::wal_append("foo1")),
     800            1 :             ),
     801            1 :             (
     802            1 :                 rel_foo_base_key.add(1).to_compact(),
     803            1 :                 LSN,
     804            1 :                 Value::WalRecord(NeonWalRecord::wal_append("foo2")),
     805            1 :             ),
     806            1 :             (
     807            1 :                 rel_foo_base_key.add(5).to_compact(),
     808            1 :                 LSN,
     809            1 :                 Value::WalRecord(NeonWalRecord::wal_append("foo3")),
     810            1 :             ),
     811            1 :             (
     812            1 :                 rel_foo_base_key.add(1).to_compact(),
     813            1 :                 Lsn(LSN.0 + 0x10),
     814            1 :                 Value::WalRecord(NeonWalRecord::wal_append("foo4")),
     815            1 :             ),
     816            1 :             (
     817            1 :                 rel_foo_base_key.add(10).to_compact(),
     818            1 :                 Lsn(LSN.0 + 0x10),
     819            1 :                 Value::WalRecord(NeonWalRecord::wal_append("foo5")),
     820            1 :             ),
     821            1 :             (
     822            1 :                 rel_foo_base_key.add(11).to_compact(),
     823            1 :                 Lsn(LSN.0 + 0x10),
     824            1 :                 Value::WalRecord(NeonWalRecord::wal_append("foo6")),
     825            1 :             ),
     826            1 :             (
     827            1 :                 rel_foo_base_key.add(12).to_compact(),
     828            1 :                 Lsn(LSN.0 + 0x10),
     829            1 :                 Value::WalRecord(NeonWalRecord::wal_append("foo7")),
     830            1 :             ),
     831            1 :             (
     832            1 :                 rel_bar_base_key.to_compact(),
     833            1 :                 LSN,
     834            1 :                 Value::WalRecord(NeonWalRecord::wal_append("bar1")),
     835            1 :             ),
     836            1 :             (
     837            1 :                 rel_bar_base_key.add(4).to_compact(),
     838            1 :                 LSN,
     839            1 :                 Value::WalRecord(NeonWalRecord::wal_append("bar2")),
     840            1 :             ),
     841            1 :         ];
     842            1 : 
     843            1 :         let values = values
     844            1 :             .into_iter()
     845            9 :             .map(|(key, lsn, value)| (key, lsn, value.serialized_size().unwrap() as usize, value))
     846            1 :             .collect::<Vec<_>>();
     847            1 : 
     848            1 :         let mut batch = SerializedValueBatch::from_values(values.clone());
     849            1 : 
     850            1 :         let gaps = vec![
     851            1 :             (
     852            1 :                 KeySpace {
     853            1 :                     ranges: vec![
     854            1 :                         rel_foo_base_key.add(2)..rel_foo_base_key.add(5),
     855            1 :                         rel_bar_base_key.add(1)..rel_bar_base_key.add(4),
     856            1 :                     ],
     857            1 :                 },
     858            1 :                 LSN,
     859            1 :             ),
     860            1 :             (
     861            1 :                 KeySpace {
     862            1 :                     ranges: vec![rel_foo_base_key.add(6)..rel_foo_base_key.add(10)],
     863            1 :                 },
     864            1 :                 Lsn(LSN.0 + 0x10),
     865            1 :             ),
     866            1 :         ];
     867            1 : 
     868            1 :         batch.zero_gaps(gaps.clone());
     869            1 :         validate_batch(&batch, &values, Some(&gaps));
     870            1 :     }
     871              : }
        

Generated by: LCOV version 2.1-beta