|             Line data    Source code 
       1              : //! This module implements batch type for serialized [`crate::models::value::Value`]
       2              : //! instances. Each batch contains a raw buffer (serialized values)
       3              : //! and a list of metadata for each (key, LSN) tuple present in the batch.
       4              : //!
       5              : //! Such batches are created from decoded PG wal records and ingested
       6              : //! by the pageserver by writing directly to the ephemeral file.
       7              : 
       8              : use std::collections::{BTreeSet, HashMap};
       9              : 
      10              : use bytes::{Bytes, BytesMut};
      11              : use pageserver_api::key::{CompactKey, Key, rel_block_to_key};
      12              : use pageserver_api::keyspace::KeySpace;
      13              : use pageserver_api::reltag::RelTag;
      14              : use pageserver_api::shard::ShardIdentity;
      15              : use postgres_ffi::walrecord::{DecodedBkpBlock, DecodedWALRecord};
      16              : use postgres_ffi::{BLCKSZ, PgMajorVersion, page_is_new, page_set_lsn, pg_constants};
      17              : use serde::{Deserialize, Serialize};
      18              : use utils::bin_ser::BeSer;
      19              : use utils::lsn::Lsn;
      20              : 
      21              : use crate::models::InterpretedWalRecord;
      22              : use crate::models::record::NeonWalRecord;
      23              : use crate::models::value::Value;
      24              : 
      25              : static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; BLCKSZ as usize]);
      26              : 
      27              : /// Accompanying metadata for the batch
      28              : /// A value may be serialized and stored into the batch or just "observed".
      29              : /// Shard 0 currently "observes" all values in order to accurately track
      30              : /// relation sizes. In the case of "observed" values, we only need to know
      31              : /// the key and LSN, so two types of metadata are supported to save on network
      32              : /// bandwidth.
      33            0 : #[derive(Serialize, Deserialize, Clone)]
      34              : pub enum ValueMeta {
      35              :     Serialized(SerializedValueMeta),
      36              :     Observed(ObservedValueMeta),
      37              : }
      38              : 
      39              : impl ValueMeta {
      40     14293182 :     pub fn key(&self) -> CompactKey {
      41     14293182 :         match self {
      42     14293182 :             Self::Serialized(ser) => ser.key,
      43            0 :             Self::Observed(obs) => obs.key,
      44              :         }
      45     14293182 :     }
      46              : 
      47     14220361 :     pub fn lsn(&self) -> Lsn {
      48     14220361 :         match self {
      49     14220361 :             Self::Serialized(ser) => ser.lsn,
      50            0 :             Self::Observed(obs) => obs.lsn,
      51              :         }
      52     14220361 :     }
      53              : }
      54              : 
      55              : /// Wrapper around [`ValueMeta`] that implements ordering by
      56              : /// (key, LSN) tuples
      57              : struct OrderedValueMeta(ValueMeta);
      58              : 
      59              : impl Ord for OrderedValueMeta {
      60           59 :     fn cmp(&self, other: &Self) -> std::cmp::Ordering {
      61           59 :         (self.0.key(), self.0.lsn()).cmp(&(other.0.key(), other.0.lsn()))
      62           59 :     }
      63              : }
      64              : 
      65              : impl PartialOrd for OrderedValueMeta {
      66            9 :     fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
      67            9 :         Some(self.cmp(other))
      68            9 :     }
      69              : }
      70              : 
      71              : impl PartialEq for OrderedValueMeta {
      72            8 :     fn eq(&self, other: &Self) -> bool {
      73            8 :         (self.0.key(), self.0.lsn()) == (other.0.key(), other.0.lsn())
      74            8 :     }
      75              : }
      76              : 
      77              : impl Eq for OrderedValueMeta {}
      78              : 
      79              : /// Metadata for a [`Value`] serialized into the batch.
      80            0 : #[derive(Serialize, Deserialize, Clone)]
      81              : pub struct SerializedValueMeta {
      82              :     pub key: CompactKey,
      83              :     pub lsn: Lsn,
      84              :     /// Starting offset of the value for the (key, LSN) tuple
      85              :     /// in [`SerializedValueBatch::raw`]
      86              :     pub batch_offset: u64,
      87              :     pub len: usize,
      88              :     pub will_init: bool,
      89              : }
      90              : 
      91              : /// Metadata for a [`Value`] observed by the batch
      92            0 : #[derive(Serialize, Deserialize, Clone)]
      93              : pub struct ObservedValueMeta {
      94              :     pub key: CompactKey,
      95              :     pub lsn: Lsn,
      96              : }
      97              : 
      98              : /// Batch of serialized [`Value`]s.
      99            0 : #[derive(Serialize, Deserialize, Clone)]
     100              : pub struct SerializedValueBatch {
     101              :     /// [`Value`]s serialized in EphemeralFile's native format,
     102              :     /// ready for disk write by the pageserver
     103              :     pub raw: Vec<u8>,
     104              : 
     105              :     /// Metadata to make sense of the bytes in [`Self::raw`]
     106              :     /// and represent "observed" values.
     107              :     ///
     108              :     /// Invariant: Metadata entries for any given key are ordered
     109              :     /// by LSN. Note that entries for a key do not have to be contiguous.
     110              :     pub metadata: Vec<ValueMeta>,
     111              : 
     112              :     /// The highest LSN of any value in the batch
     113              :     pub max_lsn: Lsn,
     114              : 
     115              :     /// Number of values encoded by [`Self::raw`]
     116              :     pub len: usize,
     117              : }
     118              : 
     119              : impl Default for SerializedValueBatch {
     120       204826 :     fn default() -> Self {
     121       204826 :         Self {
     122       204826 :             raw: Default::default(),
     123       204826 :             metadata: Default::default(),
     124       204826 :             max_lsn: Lsn(0),
     125       204826 :             len: 0,
     126       204826 :         }
     127       204826 :     }
     128              : }
     129              : 
     130              : impl SerializedValueBatch {
     131              :     /// Populates the given `shard_records` with value batches from this WAL record, if any,
     132              :     /// discarding those belonging to other shards.
     133              :     ///
     134              :     /// The batch will only contain values for keys targeting the specifiec
     135              :     /// shard. Shard 0 is a special case, where any keys that don't belong to
     136              :     /// it are "observed" by the batch (i.e. present in [`SerializedValueBatch::metadata`],
     137              :     /// but absent from the raw buffer [`SerializedValueBatch::raw`]).
     138        73532 :     pub(crate) fn from_decoded_filtered(
     139        73532 :         decoded: DecodedWALRecord,
     140        73532 :         shard_records: &mut HashMap<ShardIdentity, InterpretedWalRecord>,
     141        73532 :         next_record_lsn: Lsn,
     142        73532 :         pg_version: PgMajorVersion,
     143        73532 :     ) -> anyhow::Result<()> {
     144              :         // First determine how big the buffers need to be and allocate it up-front.
     145              :         // This duplicates some of the work below, but it's empirically much faster.
     146        73732 :         for (shard, record) in shard_records.iter_mut() {
     147        73732 :             assert!(record.batch.is_empty());
     148              : 
     149        73732 :             let estimate = Self::estimate_buffer_size(&decoded, shard, pg_version);
     150        73732 :             record.batch.raw = Vec::with_capacity(estimate);
     151              :         }
     152              : 
     153        73532 :         for blk in decoded.blocks.iter() {
     154        72821 :             let rel = RelTag {
     155        72821 :                 spcnode: blk.rnode_spcnode,
     156        72821 :                 dbnode: blk.rnode_dbnode,
     157        72821 :                 relnode: blk.rnode_relnode,
     158        72821 :                 forknum: blk.forknum,
     159        72821 :             };
     160              : 
     161        72821 :             let key = rel_block_to_key(rel, blk.blkno);
     162              : 
     163        72821 :             if !key.is_valid_key_on_write_path() {
     164            0 :                 anyhow::bail!(
     165            0 :                     "Unsupported key decoded at LSN {}: {}",
     166              :                     next_record_lsn,
     167              :                     key
     168              :                 );
     169        72821 :             }
     170              : 
     171        72821 :             for (shard, record) in shard_records.iter_mut() {
     172        72821 :                 let key_is_local = shard.is_key_local(&key);
     173              : 
     174        72821 :                 tracing::debug!(
     175              :                     lsn=%next_record_lsn,
     176              :                     key=%key,
     177            0 :                     "ingest: shard decision {}",
     178            0 :                     if !key_is_local { "drop" } else { "keep" },
     179              :                 );
     180              : 
     181        72821 :                 if !key_is_local {
     182            0 :                     if shard.is_shard_zero() {
     183              :                         // Shard 0 tracks relation sizes.  Although we will not store this block, we will observe
     184              :                         // its blkno in case it implicitly extends a relation.
     185            0 :                         record
     186            0 :                             .batch
     187            0 :                             .metadata
     188            0 :                             .push(ValueMeta::Observed(ObservedValueMeta {
     189            0 :                                 key: key.to_compact(),
     190            0 :                                 lsn: next_record_lsn,
     191            0 :                             }))
     192            0 :                     }
     193              : 
     194            0 :                     continue;
     195        72821 :                 }
     196              : 
     197              :                 // Instead of storing full-page-image WAL record,
     198              :                 // it is better to store extracted image: we can skip wal-redo
     199              :                 // in this case. Also some FPI records may contain multiple (up to 32) pages,
     200              :                 // so them have to be copied multiple times.
     201              :                 //
     202        72821 :                 let val = if Self::block_is_image(&decoded, blk, pg_version) {
     203              :                     // Extract page image from FPI record
     204           12 :                     let img_len = blk.bimg_len as usize;
     205           12 :                     let img_offs = blk.bimg_offset as usize;
     206           12 :                     let mut image = BytesMut::with_capacity(BLCKSZ as usize);
     207              :                     // TODO(vlad): skip the copy
     208           12 :                     image.extend_from_slice(&decoded.record[img_offs..img_offs + img_len]);
     209              : 
     210           12 :                     if blk.hole_length != 0 {
     211            0 :                         let tail = image.split_off(blk.hole_offset as usize);
     212            0 :                         image.resize(image.len() + blk.hole_length as usize, 0u8);
     213            0 :                         image.unsplit(tail);
     214           12 :                     }
     215              :                     //
     216              :                     // Match the logic of XLogReadBufferForRedoExtended:
     217              :                     // The page may be uninitialized. If so, we can't set the LSN because
     218              :                     // that would corrupt the page.
     219              :                     //
     220           12 :                     if !page_is_new(&image) {
     221            9 :                         page_set_lsn(&mut image, next_record_lsn)
     222            3 :                     }
     223           12 :                     assert_eq!(image.len(), BLCKSZ as usize);
     224              : 
     225           12 :                     Value::Image(image.freeze())
     226              :                 } else {
     227              :                     Value::WalRecord(NeonWalRecord::Postgres {
     228        72809 :                         will_init: blk.will_init || blk.apply_image,
     229        72809 :                         rec: decoded.record.clone(),
     230              :                     })
     231              :                 };
     232              : 
     233        72821 :                 let relative_off = record.batch.raw.len() as u64;
     234              : 
     235        72821 :                 val.ser_into(&mut record.batch.raw)
     236        72821 :                     .expect("Writing into in-memory buffer is infallible");
     237              : 
     238        72821 :                 let val_ser_size = record.batch.raw.len() - relative_off as usize;
     239              : 
     240        72821 :                 record
     241        72821 :                     .batch
     242        72821 :                     .metadata
     243        72821 :                     .push(ValueMeta::Serialized(SerializedValueMeta {
     244        72821 :                         key: key.to_compact(),
     245        72821 :                         lsn: next_record_lsn,
     246        72821 :                         batch_offset: relative_off,
     247        72821 :                         len: val_ser_size,
     248        72821 :                         will_init: val.will_init(),
     249        72821 :                     }));
     250        72821 :                 record.batch.max_lsn = std::cmp::max(record.batch.max_lsn, next_record_lsn);
     251        72821 :                 record.batch.len += 1;
     252              :             }
     253              :         }
     254              : 
     255        73532 :         if cfg!(any(debug_assertions, test)) {
     256              :             // Validate that the batches are correct
     257        73732 :             for record in shard_records.values() {
     258        73732 :                 record.batch.validate_lsn_order();
     259        73732 :             }
     260            0 :         }
     261              : 
     262        73532 :         Ok(())
     263        73532 :     }
     264              : 
     265              :     /// Look into the decoded PG WAL record and determine
     266              :     /// roughly how large the buffer for serialized values needs to be.
     267        73732 :     fn estimate_buffer_size(
     268        73732 :         decoded: &DecodedWALRecord,
     269        73732 :         shard: &ShardIdentity,
     270        73732 :         pg_version: PgMajorVersion,
     271        73732 :     ) -> usize {
     272        73732 :         let mut estimate: usize = 0;
     273              : 
     274        73732 :         for blk in decoded.blocks.iter() {
     275        72821 :             let rel = RelTag {
     276        72821 :                 spcnode: blk.rnode_spcnode,
     277        72821 :                 dbnode: blk.rnode_dbnode,
     278        72821 :                 relnode: blk.rnode_relnode,
     279        72821 :                 forknum: blk.forknum,
     280        72821 :             };
     281              : 
     282        72821 :             let key = rel_block_to_key(rel, blk.blkno);
     283              : 
     284        72821 :             if !shard.is_key_local(&key) {
     285            0 :                 continue;
     286        72821 :             }
     287              : 
     288        72821 :             if Self::block_is_image(decoded, blk, pg_version) {
     289           12 :                 // 4 bytes for the Value::Image discriminator
     290           12 :                 // 8 bytes for encoding the size of the buffer
     291           12 :                 // BLCKSZ for the raw image
     292           12 :                 estimate += (4 + 8 + BLCKSZ) as usize;
     293        72809 :             } else {
     294        72809 :                 // 4 bytes for the Value::WalRecord discriminator
     295        72809 :                 // 4 bytes for the NeonWalRecord::Postgres discriminator
     296        72809 :                 // 1 bytes for NeonWalRecord::Postgres::will_init
     297        72809 :                 // 8 bytes for encoding the size of the buffer
     298        72809 :                 // length of the raw record
     299        72809 :                 estimate += 8 + 1 + 8 + decoded.record.len();
     300        72809 :             }
     301              :         }
     302              : 
     303        73732 :         estimate
     304        73732 :     }
     305              : 
     306       145642 :     fn block_is_image(
     307       145642 :         decoded: &DecodedWALRecord,
     308       145642 :         blk: &DecodedBkpBlock,
     309       145642 :         pg_version: PgMajorVersion,
     310       145642 :     ) -> bool {
     311       145642 :         blk.apply_image
     312           60 :             && blk.has_image
     313           60 :             && decoded.xl_rmid == pg_constants::RM_XLOG_ID
     314           24 :             && (decoded.xl_info == pg_constants::XLOG_FPI
     315            0 :             || decoded.xl_info == pg_constants::XLOG_FPI_FOR_HINT)
     316              :             // compression of WAL is not yet supported: fall back to storing the original WAL record
     317           24 :             && !postgres_ffi::bkpimage_is_compressed(blk.bimg_info, pg_version)
     318              :             // do not materialize null pages because them most likely be soon replaced with real data
     319           24 :             && blk.bimg_len != 0
     320       145642 :     }
     321              : 
     322              :     /// Encode a list of values and metadata into a serialized batch
     323              :     ///
     324              :     /// This is used by the pageserver ingest code to conveniently generate
     325              :     /// batches for metadata writes.
     326      2330506 :     pub fn from_values(batch: Vec<(CompactKey, Lsn, usize, Value)>) -> Self {
     327              :         // Pre-allocate a big flat buffer to write into. This should be large but not huge: it is soft-limited in practice by
     328              :         // [`crate::pgdatadir_mapping::DatadirModification::MAX_PENDING_BYTES`]
     329      2330506 :         let buffer_size = batch.iter().map(|i| i.2).sum::<usize>();
     330      2330506 :         let mut buf = Vec::<u8>::with_capacity(buffer_size);
     331              : 
     332      2330506 :         let mut metadata: Vec<ValueMeta> = Vec::with_capacity(batch.len());
     333      2330506 :         let mut max_lsn: Lsn = Lsn(0);
     334      2330506 :         let len = batch.len();
     335      4682494 :         for (key, lsn, val_ser_size, val) in batch {
     336      2351988 :             let relative_off = buf.len() as u64;
     337      2351988 : 
     338      2351988 :             val.ser_into(&mut buf)
     339      2351988 :                 .expect("Writing into in-memory buffer is infallible");
     340      2351988 : 
     341      2351988 :             metadata.push(ValueMeta::Serialized(SerializedValueMeta {
     342      2351988 :                 key,
     343      2351988 :                 lsn,
     344      2351988 :                 batch_offset: relative_off,
     345      2351988 :                 len: val_ser_size,
     346      2351988 :                 will_init: val.will_init(),
     347      2351988 :             }));
     348      2351988 :             max_lsn = std::cmp::max(max_lsn, lsn);
     349      2351988 :         }
     350              : 
     351              :         // Assert that we didn't do any extra allocations while building buffer.
     352      2330506 :         debug_assert!(buf.len() <= buffer_size);
     353              : 
     354      2330506 :         if cfg!(any(debug_assertions, test)) {
     355      2330506 :             let batch = Self {
     356      2330506 :                 raw: buf,
     357      2330506 :                 metadata,
     358      2330506 :                 max_lsn,
     359      2330506 :                 len,
     360      2330506 :             };
     361              : 
     362      2330506 :             batch.validate_lsn_order();
     363              : 
     364      2330506 :             return batch;
     365            0 :         }
     366              : 
     367            0 :         Self {
     368            0 :             raw: buf,
     369            0 :             metadata,
     370            0 :             max_lsn,
     371            0 :             len,
     372            0 :         }
     373      2330506 :     }
     374              : 
     375              :     /// Add one value to the batch
     376              :     ///
     377              :     /// This is used by the pageserver ingest code to include metadata block
     378              :     /// updates for a single key.
     379       140435 :     pub fn put(&mut self, key: CompactKey, value: Value, lsn: Lsn) {
     380       140435 :         let relative_off = self.raw.len() as u64;
     381       140435 :         value.ser_into(&mut self.raw).unwrap();
     382              : 
     383       140435 :         let val_ser_size = self.raw.len() - relative_off as usize;
     384       140435 :         self.metadata
     385       140435 :             .push(ValueMeta::Serialized(SerializedValueMeta {
     386       140435 :                 key,
     387       140435 :                 lsn,
     388       140435 :                 batch_offset: relative_off,
     389       140435 :                 len: val_ser_size,
     390       140435 :                 will_init: value.will_init(),
     391       140435 :             }));
     392              : 
     393       140435 :         self.max_lsn = std::cmp::max(self.max_lsn, lsn);
     394       140435 :         self.len += 1;
     395              : 
     396       140435 :         if cfg!(any(debug_assertions, test)) {
     397       140435 :             self.validate_lsn_order();
     398       140435 :         }
     399       140435 :     }
     400              : 
     401              :     /// Extend with the contents of another batch
     402              :     ///
     403              :     /// One batch is generated for each decoded PG WAL record.
     404              :     /// They are then merged to accumulate reasonably sized writes.
     405       132289 :     pub fn extend(&mut self, mut other: SerializedValueBatch) {
     406       132289 :         let extend_batch_start_offset = self.raw.len() as u64;
     407              : 
     408       132289 :         self.raw.extend(other.raw);
     409              : 
     410              :         // Shift the offsets in the batch we are extending with
     411       133256 :         other.metadata.iter_mut().for_each(|meta| match meta {
     412       133256 :             ValueMeta::Serialized(ser) => {
     413       133256 :                 ser.batch_offset += extend_batch_start_offset;
     414       133256 :                 if cfg!(debug_assertions) {
     415       133256 :                     let value_end = ser.batch_offset + ser.len as u64;
     416       133256 :                     assert!((value_end as usize) <= self.raw.len());
     417            0 :                 }
     418              :             }
     419            0 :             ValueMeta::Observed(_) => {}
     420       133256 :         });
     421       132289 :         self.metadata.extend(other.metadata);
     422              : 
     423       132289 :         self.max_lsn = std::cmp::max(self.max_lsn, other.max_lsn);
     424              : 
     425       132289 :         self.len += other.len;
     426              : 
     427       132289 :         if cfg!(any(debug_assertions, test)) {
     428       132289 :             self.validate_lsn_order();
     429       132289 :         }
     430       132289 :     }
     431              : 
     432              :     /// Add zero images for the (key, LSN) tuples specified
     433              :     ///
     434              :     /// PG versions below 16 do not zero out pages before extending
     435              :     /// a relation and may leave gaps. Such gaps need to be identified
     436              :     /// by the pageserver ingest logic and get patched up here.
     437              :     ///
     438              :     /// Note that this function does not validate that the gaps have been
     439              :     /// identified correctly (it does not know relation sizes), so it's up
     440              :     /// to the call-site to do it properly.
     441            1 :     pub fn zero_gaps(&mut self, gaps: Vec<(KeySpace, Lsn)>) {
     442              :         // Implementation note:
     443              :         //
     444              :         // Values within [`SerializedValueBatch::raw`] do not have any ordering requirements,
     445              :         // but the metadata entries should be ordered properly (see
     446              :         // [`SerializedValueBatch::metadata`]).
     447              :         //
     448              :         // Exploiting this observation we do:
     449              :         // 1. Drain all the metadata entries into an ordered set.
     450              :         // The use of a BTreeSet keyed by (Key, Lsn) relies on the observation that Postgres never
     451              :         // includes more than one update to the same block in the same WAL record.
     452              :         // 2. For each (key, LSN) gap tuple, append a zero image to the raw buffer
     453              :         // and add an index entry to the ordered metadata set.
     454              :         // 3. Drain the ordered set back into a metadata vector
     455              : 
     456            1 :         let mut ordered_metas = self
     457            1 :             .metadata
     458            1 :             .drain(..)
     459            1 :             .map(OrderedValueMeta)
     460            1 :             .collect::<BTreeSet<_>>();
     461            3 :         for (keyspace, lsn) in gaps {
     462            2 :             self.max_lsn = std::cmp::max(self.max_lsn, lsn);
     463              : 
     464            5 :             for gap_range in keyspace.ranges {
     465            3 :                 let mut key = gap_range.start;
     466           13 :                 while key != gap_range.end {
     467           10 :                     let relative_off = self.raw.len() as u64;
     468           10 : 
     469           10 :                     // TODO(vlad): Can we be cheeky and write only one zero image, and
     470           10 :                     // make all index entries requiring a zero page point to it?
     471           10 :                     // Alternatively, we can change the index entry format to represent zero pages
     472           10 :                     // without writing them at all.
     473           10 :                     Value::Image(ZERO_PAGE.clone())
     474           10 :                         .ser_into(&mut self.raw)
     475           10 :                         .unwrap();
     476           10 :                     let val_ser_size = self.raw.len() - relative_off as usize;
     477           10 : 
     478           10 :                     ordered_metas.insert(OrderedValueMeta(ValueMeta::Serialized(
     479           10 :                         SerializedValueMeta {
     480           10 :                             key: key.to_compact(),
     481           10 :                             lsn,
     482           10 :                             batch_offset: relative_off,
     483           10 :                             len: val_ser_size,
     484           10 :                             will_init: true,
     485           10 :                         },
     486           10 :                     )));
     487           10 : 
     488           10 :                     self.len += 1;
     489           10 : 
     490           10 :                     key = key.next();
     491           10 :                 }
     492              :             }
     493              :         }
     494              : 
     495            1 :         self.metadata = ordered_metas.into_iter().map(|ord| ord.0).collect();
     496              : 
     497            1 :         if cfg!(any(debug_assertions, test)) {
     498            1 :             self.validate_lsn_order();
     499            1 :         }
     500            1 :     }
     501              : 
     502              :     /// Checks if the batch contains any serialized or observed values
     503        73933 :     pub fn is_empty(&self) -> bool {
     504        73933 :         !self.has_data() && self.metadata.is_empty()
     505        73933 :     }
     506              : 
     507              :     /// Checks if the batch contains only observed values
     508            0 :     pub fn is_observed(&self) -> bool {
     509            0 :         !self.has_data() && !self.metadata.is_empty()
     510            0 :     }
     511              : 
     512              :     /// Checks if the batch contains data
     513              :     ///
     514              :     /// Note that if this returns false, it may still contain observed values or
     515              :     /// a metadata record.
     516      2548977 :     pub fn has_data(&self) -> bool {
     517      2548977 :         let empty = self.raw.is_empty();
     518              : 
     519      2548977 :         if cfg!(debug_assertions) && empty {
     520        74033 :             assert!(
     521        74033 :                 self.metadata
     522        74033 :                     .iter()
     523        74033 :                     .all(|meta| matches!(meta, ValueMeta::Observed(_)))
     524              :             );
     525      2474944 :         }
     526              : 
     527      2548977 :         !empty
     528      2548977 :     }
     529              : 
     530              :     /// Returns the number of values serialized in the batch
     531        72831 :     pub fn len(&self) -> usize {
     532        72831 :         self.len
     533        72831 :     }
     534              : 
     535              :     /// Returns the size of the buffer wrapped by the batch
     536      2402134 :     pub fn buffer_size(&self) -> usize {
     537      2402134 :         self.raw.len()
     538      2402134 :     }
     539              : 
     540            0 :     pub fn updates_key(&self, key: &Key) -> bool {
     541            0 :         self.metadata.iter().any(|meta| match meta {
     542            0 :             ValueMeta::Serialized(ser) => key.to_compact() == ser.key,
     543            0 :             ValueMeta::Observed(_) => false,
     544            0 :         })
     545            0 :     }
     546              : 
     547      2676969 :     pub fn validate_lsn_order(&self) {
     548              :         use std::collections::HashMap;
     549              : 
     550      2676969 :         let mut last_seen_lsn_per_key: HashMap<CompactKey, Lsn> = HashMap::default();
     551              : 
     552     14219987 :         for meta in self.metadata.iter() {
     553     14219987 :             let lsn = meta.lsn();
     554     14219987 :             let key = meta.key();
     555              : 
     556     14219987 :             if let Some(prev_lsn) = last_seen_lsn_per_key.insert(key, lsn) {
     557        19617 :                 assert!(
     558        19617 :                     lsn >= prev_lsn,
     559            0 :                     "Ordering violated by {}: {} < {}",
     560            0 :                     Key::from_compact(key),
     561              :                     lsn,
     562              :                     prev_lsn
     563              :                 );
     564     14200370 :             }
     565              :         }
     566      2676969 :     }
     567              : }
     568              : 
     569              : #[cfg(all(test, feature = "testing"))]
     570              : mod tests {
     571              :     use super::*;
     572              : 
     573            6 :     fn validate_batch(
     574            6 :         batch: &SerializedValueBatch,
     575            6 :         values: &[(CompactKey, Lsn, usize, Value)],
     576            6 :         gaps: Option<&Vec<(KeySpace, Lsn)>>,
     577            6 :     ) {
     578              :         // Invariant 1: The metadata for a given entry in the batch
     579              :         // is correct and can be used to deserialize back to the original value.
     580           28 :         for (key, lsn, size, value) in values.iter() {
     581           28 :             let meta = batch
     582           28 :                 .metadata
     583           28 :                 .iter()
     584          136 :                 .find(|meta| (meta.key(), meta.lsn()) == (*key, *lsn))
     585           28 :                 .unwrap();
     586           28 :             let meta = match meta {
     587           28 :                 ValueMeta::Serialized(ser) => ser,
     588            0 :                 ValueMeta::Observed(_) => unreachable!(),
     589              :             };
     590              : 
     591           28 :             assert_eq!(meta.len, *size);
     592           28 :             assert_eq!(meta.will_init, value.will_init());
     593              : 
     594           28 :             let start = meta.batch_offset as usize;
     595           28 :             let end = meta.batch_offset as usize + meta.len;
     596           28 :             let value_from_batch = Value::des(&batch.raw[start..end]).unwrap();
     597           28 :             assert_eq!(&value_from_batch, value);
     598              :         }
     599              : 
     600            6 :         let mut expected_buffer_size: usize = values.iter().map(|(_, _, size, _)| size).sum();
     601            6 :         let mut gap_pages_count: usize = 0;
     602              : 
     603              :         // Invariant 2: Zero pages were added for identified gaps and their metadata
     604              :         // is correct.
     605            6 :         if let Some(gaps) = gaps {
     606            3 :             for (gap_keyspace, lsn) in gaps {
     607            5 :                 for gap_range in &gap_keyspace.ranges {
     608            3 :                     let mut gap_key = gap_range.start;
     609           13 :                     while gap_key != gap_range.end {
     610           10 :                         let meta = batch
     611           10 :                             .metadata
     612           10 :                             .iter()
     613          104 :                             .find(|meta| (meta.key(), meta.lsn()) == (gap_key.to_compact(), *lsn))
     614           10 :                             .unwrap();
     615           10 :                         let meta = match meta {
     616           10 :                             ValueMeta::Serialized(ser) => ser,
     617            0 :                             ValueMeta::Observed(_) => unreachable!(),
     618              :                         };
     619              : 
     620           10 :                         let zero_value = Value::Image(ZERO_PAGE.clone());
     621           10 :                         let zero_value_size = zero_value.serialized_size().unwrap() as usize;
     622              : 
     623           10 :                         assert_eq!(meta.len, zero_value_size);
     624           10 :                         assert_eq!(meta.will_init, zero_value.will_init());
     625              : 
     626           10 :                         let start = meta.batch_offset as usize;
     627           10 :                         let end = meta.batch_offset as usize + meta.len;
     628           10 :                         let value_from_batch = Value::des(&batch.raw[start..end]).unwrap();
     629           10 :                         assert_eq!(value_from_batch, zero_value);
     630              : 
     631           10 :                         gap_pages_count += 1;
     632           10 :                         expected_buffer_size += zero_value_size;
     633           10 :                         gap_key = gap_key.next();
     634              :                     }
     635              :                 }
     636              :             }
     637            5 :         }
     638              : 
     639              :         // Invariant 3: The length of the batch is equal to the number
     640              :         // of values inserted, plus the number of gap pages. This extends
     641              :         // to the raw buffer size.
     642            6 :         assert_eq!(batch.len(), values.len() + gap_pages_count);
     643            6 :         assert_eq!(expected_buffer_size, batch.buffer_size());
     644              : 
     645              :         // Invariant 4: Metadata entries for any given key are sorted in LSN order.
     646            6 :         batch.validate_lsn_order();
     647            6 :     }
     648              : 
     649              :     #[test]
     650            1 :     fn test_creation_from_values() {
     651              :         const LSN: Lsn = Lsn(0x10);
     652            1 :         let key = Key::from_hex("110000000033333333444444445500000001").unwrap();
     653              : 
     654            1 :         let values = vec![
     655            1 :             (
     656            1 :                 key.to_compact(),
     657            1 :                 LSN,
     658            1 :                 Value::WalRecord(NeonWalRecord::wal_append("foo")),
     659            1 :             ),
     660            1 :             (
     661            1 :                 key.next().to_compact(),
     662            1 :                 LSN,
     663            1 :                 Value::WalRecord(NeonWalRecord::wal_append("bar")),
     664            1 :             ),
     665            1 :             (
     666            1 :                 key.to_compact(),
     667            1 :                 Lsn(LSN.0 + 0x10),
     668            1 :                 Value::WalRecord(NeonWalRecord::wal_append("baz")),
     669            1 :             ),
     670            1 :             (
     671            1 :                 key.next().next().to_compact(),
     672            1 :                 LSN,
     673            1 :                 Value::WalRecord(NeonWalRecord::wal_append("taz")),
     674            1 :             ),
     675              :         ];
     676              : 
     677            1 :         let values = values
     678            1 :             .into_iter()
     679            4 :             .map(|(key, lsn, value)| (key, lsn, value.serialized_size().unwrap() as usize, value))
     680            1 :             .collect::<Vec<_>>();
     681            1 :         let batch = SerializedValueBatch::from_values(values.clone());
     682              : 
     683            1 :         validate_batch(&batch, &values, None);
     684              : 
     685            1 :         assert!(!batch.is_empty());
     686            1 :     }
     687              : 
     688              :     #[test]
     689            1 :     fn test_put() {
     690              :         const LSN: Lsn = Lsn(0x10);
     691            1 :         let key = Key::from_hex("110000000033333333444444445500000001").unwrap();
     692              : 
     693            1 :         let values = vec![
     694            1 :             (
     695            1 :                 key.to_compact(),
     696            1 :                 LSN,
     697            1 :                 Value::WalRecord(NeonWalRecord::wal_append("foo")),
     698            1 :             ),
     699            1 :             (
     700            1 :                 key.next().to_compact(),
     701            1 :                 LSN,
     702            1 :                 Value::WalRecord(NeonWalRecord::wal_append("bar")),
     703            1 :             ),
     704              :         ];
     705              : 
     706            1 :         let mut values = values
     707            1 :             .into_iter()
     708            2 :             .map(|(key, lsn, value)| (key, lsn, value.serialized_size().unwrap() as usize, value))
     709            1 :             .collect::<Vec<_>>();
     710            1 :         let mut batch = SerializedValueBatch::from_values(values.clone());
     711              : 
     712            1 :         validate_batch(&batch, &values, None);
     713              : 
     714            1 :         let value = (
     715            1 :             key.to_compact(),
     716            1 :             Lsn(LSN.0 + 0x10),
     717            1 :             Value::WalRecord(NeonWalRecord::wal_append("baz")),
     718            1 :         );
     719            1 :         let serialized_size = value.2.serialized_size().unwrap() as usize;
     720            1 :         let value = (value.0, value.1, serialized_size, value.2);
     721            1 :         values.push(value.clone());
     722            1 :         batch.put(value.0, value.3, value.1);
     723              : 
     724            1 :         validate_batch(&batch, &values, None);
     725              : 
     726            1 :         let value = (
     727            1 :             key.next().next().to_compact(),
     728            1 :             LSN,
     729            1 :             Value::WalRecord(NeonWalRecord::wal_append("taz")),
     730            1 :         );
     731            1 :         let serialized_size = value.2.serialized_size().unwrap() as usize;
     732            1 :         let value = (value.0, value.1, serialized_size, value.2);
     733            1 :         values.push(value.clone());
     734            1 :         batch.put(value.0, value.3, value.1);
     735              : 
     736            1 :         validate_batch(&batch, &values, None);
     737            1 :     }
     738              : 
     739              :     #[test]
     740            1 :     fn test_extension() {
     741              :         const LSN: Lsn = Lsn(0x10);
     742            1 :         let key = Key::from_hex("110000000033333333444444445500000001").unwrap();
     743              : 
     744            1 :         let values = vec![
     745            1 :             (
     746            1 :                 key.to_compact(),
     747            1 :                 LSN,
     748            1 :                 Value::WalRecord(NeonWalRecord::wal_append("foo")),
     749            1 :             ),
     750            1 :             (
     751            1 :                 key.next().to_compact(),
     752            1 :                 LSN,
     753            1 :                 Value::WalRecord(NeonWalRecord::wal_append("bar")),
     754            1 :             ),
     755            1 :             (
     756            1 :                 key.next().next().to_compact(),
     757            1 :                 LSN,
     758            1 :                 Value::WalRecord(NeonWalRecord::wal_append("taz")),
     759            1 :             ),
     760              :         ];
     761              : 
     762            1 :         let mut values = values
     763            1 :             .into_iter()
     764            3 :             .map(|(key, lsn, value)| (key, lsn, value.serialized_size().unwrap() as usize, value))
     765            1 :             .collect::<Vec<_>>();
     766            1 :         let mut batch = SerializedValueBatch::from_values(values.clone());
     767              : 
     768            1 :         let other_values = vec![
     769            1 :             (
     770            1 :                 key.to_compact(),
     771            1 :                 Lsn(LSN.0 + 0x10),
     772            1 :                 Value::WalRecord(NeonWalRecord::wal_append("foo")),
     773            1 :             ),
     774            1 :             (
     775            1 :                 key.next().to_compact(),
     776            1 :                 Lsn(LSN.0 + 0x10),
     777            1 :                 Value::WalRecord(NeonWalRecord::wal_append("bar")),
     778            1 :             ),
     779            1 :             (
     780            1 :                 key.next().next().to_compact(),
     781            1 :                 Lsn(LSN.0 + 0x10),
     782            1 :                 Value::WalRecord(NeonWalRecord::wal_append("taz")),
     783            1 :             ),
     784              :         ];
     785              : 
     786            1 :         let other_values = other_values
     787            1 :             .into_iter()
     788            3 :             .map(|(key, lsn, value)| (key, lsn, value.serialized_size().unwrap() as usize, value))
     789            1 :             .collect::<Vec<_>>();
     790            1 :         let other_batch = SerializedValueBatch::from_values(other_values.clone());
     791              : 
     792            1 :         values.extend(other_values);
     793            1 :         batch.extend(other_batch);
     794              : 
     795            1 :         validate_batch(&batch, &values, None);
     796            1 :     }
     797              : 
     798              :     #[test]
     799            1 :     fn test_gap_zeroing() {
     800              :         const LSN: Lsn = Lsn(0x10);
     801            1 :         let rel_foo_base_key = Key::from_hex("110000000033333333444444445500000001").unwrap();
     802              : 
     803            1 :         let rel_bar_base_key = {
     804            1 :             let mut key = rel_foo_base_key;
     805            1 :             key.field4 += 1;
     806            1 :             key
     807              :         };
     808              : 
     809            1 :         let values = vec![
     810            1 :             (
     811            1 :                 rel_foo_base_key.to_compact(),
     812            1 :                 LSN,
     813            1 :                 Value::WalRecord(NeonWalRecord::wal_append("foo1")),
     814            1 :             ),
     815            1 :             (
     816            1 :                 rel_foo_base_key.add(1).to_compact(),
     817            1 :                 LSN,
     818            1 :                 Value::WalRecord(NeonWalRecord::wal_append("foo2")),
     819            1 :             ),
     820            1 :             (
     821            1 :                 rel_foo_base_key.add(5).to_compact(),
     822            1 :                 LSN,
     823            1 :                 Value::WalRecord(NeonWalRecord::wal_append("foo3")),
     824            1 :             ),
     825            1 :             (
     826            1 :                 rel_foo_base_key.add(1).to_compact(),
     827            1 :                 Lsn(LSN.0 + 0x10),
     828            1 :                 Value::WalRecord(NeonWalRecord::wal_append("foo4")),
     829            1 :             ),
     830            1 :             (
     831            1 :                 rel_foo_base_key.add(10).to_compact(),
     832            1 :                 Lsn(LSN.0 + 0x10),
     833            1 :                 Value::WalRecord(NeonWalRecord::wal_append("foo5")),
     834            1 :             ),
     835            1 :             (
     836            1 :                 rel_foo_base_key.add(11).to_compact(),
     837            1 :                 Lsn(LSN.0 + 0x10),
     838            1 :                 Value::WalRecord(NeonWalRecord::wal_append("foo6")),
     839            1 :             ),
     840            1 :             (
     841            1 :                 rel_foo_base_key.add(12).to_compact(),
     842            1 :                 Lsn(LSN.0 + 0x10),
     843            1 :                 Value::WalRecord(NeonWalRecord::wal_append("foo7")),
     844            1 :             ),
     845            1 :             (
     846            1 :                 rel_bar_base_key.to_compact(),
     847            1 :                 LSN,
     848            1 :                 Value::WalRecord(NeonWalRecord::wal_append("bar1")),
     849            1 :             ),
     850            1 :             (
     851            1 :                 rel_bar_base_key.add(4).to_compact(),
     852            1 :                 LSN,
     853            1 :                 Value::WalRecord(NeonWalRecord::wal_append("bar2")),
     854            1 :             ),
     855              :         ];
     856              : 
     857            1 :         let values = values
     858            1 :             .into_iter()
     859            9 :             .map(|(key, lsn, value)| (key, lsn, value.serialized_size().unwrap() as usize, value))
     860            1 :             .collect::<Vec<_>>();
     861              : 
     862            1 :         let mut batch = SerializedValueBatch::from_values(values.clone());
     863              : 
     864            1 :         let gaps = vec![
     865            1 :             (
     866            1 :                 KeySpace {
     867            1 :                     ranges: vec![
     868            1 :                         rel_foo_base_key.add(2)..rel_foo_base_key.add(5),
     869            1 :                         rel_bar_base_key.add(1)..rel_bar_base_key.add(4),
     870            1 :                     ],
     871            1 :                 },
     872            1 :                 LSN,
     873            1 :             ),
     874            1 :             (
     875            1 :                 KeySpace {
     876            1 :                     ranges: vec![rel_foo_base_key.add(6)..rel_foo_base_key.add(10)],
     877            1 :                 },
     878            1 :                 Lsn(LSN.0 + 0x10),
     879            1 :             ),
     880              :         ];
     881              : 
     882            1 :         batch.zero_gaps(gaps.clone());
     883            1 :         validate_batch(&batch, &values, Some(&gaps));
     884            1 :     }
     885              : }
         |