LCOV - code coverage report
Current view: top level - libs/wal_decoder/src - serialized_batch.rs (source / functions) Coverage Total Hit
Test: 49aa928ec5b4b510172d8b5c6d154da28e70a46c.info Lines: 92.8 % 627 582
Test Date: 2024-11-13 18:23:39 Functions: 91.7 % 36 33

            Line data    Source code
       1              : //! This module implements a batch type for serialized [`pageserver_api::value::Value`]
       2              : //! instances. Each batch contains a raw buffer (serialized values)
       3              : //! and a list of metadata for each (key, LSN) tuple present in the batch.
       4              : //!
       5              : //! Such batches are created from decoded PG wal records and ingested
       6              : //! by the pageserver by writing directly to the ephemeral file.
       7              : 
       8              : use std::collections::BTreeSet;
       9              : 
      10              : use bytes::{Bytes, BytesMut};
      11              : use pageserver_api::key::rel_block_to_key;
      12              : use pageserver_api::keyspace::KeySpace;
      13              : use pageserver_api::record::NeonWalRecord;
      14              : use pageserver_api::reltag::RelTag;
      15              : use pageserver_api::shard::ShardIdentity;
      16              : use pageserver_api::{key::CompactKey, value::Value};
      17              : use postgres_ffi::walrecord::{DecodedBkpBlock, DecodedWALRecord};
      18              : use postgres_ffi::{page_is_new, page_set_lsn, pg_constants, BLCKSZ};
      19              : use utils::bin_ser::BeSer;
      20              : use utils::lsn::Lsn;
      21              : 
      22              : use pageserver_api::key::Key;
      23              : 
      24              : static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; BLCKSZ as usize]);
      25              : 
      26              : /// Accompanying metadata for the batch
      27              : /// A value may be serialized and stored into the batch or just "observed".
      28              : /// Shard 0 currently "observes" all values in order to accurately track
      29              : /// relation sizes. In the case of "observed" values, we only need to know
      30              : /// the key and LSN, so two types of metadata are supported to save on network
      31              : /// bandwidth.
      32              : pub enum ValueMeta {
      33              :     Serialized(SerializedValueMeta),
      34              :     Observed(ObservedValueMeta),
      35              : }
      36              : 
      37              : impl ValueMeta {
      38     28545931 :     pub fn key(&self) -> CompactKey {
      39     28545931 :         match self {
      40     28545931 :             Self::Serialized(ser) => ser.key,
      41            0 :             Self::Observed(obs) => obs.key,
      42              :         }
      43     28545931 :     }
      44              : 
      45     28400289 :     pub fn lsn(&self) -> Lsn {
      46     28400289 :         match self {
      47     28400289 :             Self::Serialized(ser) => ser.lsn,
      48            0 :             Self::Observed(obs) => obs.lsn,
      49              :         }
      50     28400289 :     }
      51              : }
      52              : 
      53              : /// Wrapper around [`ValueMeta`] that implements ordering by
      54              : /// (key, LSN) tuples
      55              : struct OrderedValueMeta(ValueMeta);
      56              : 
      57              : impl Ord for OrderedValueMeta {
      58           59 :     fn cmp(&self, other: &Self) -> std::cmp::Ordering {
      59           59 :         (self.0.key(), self.0.lsn()).cmp(&(other.0.key(), other.0.lsn()))
      60           59 :     }
      61              : }
      62              : 
      63              : impl PartialOrd for OrderedValueMeta {
      64            9 :     fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
      65            9 :         Some(self.cmp(other))
      66            9 :     }
      67              : }
      68              : 
      69              : impl PartialEq for OrderedValueMeta {
      70            8 :     fn eq(&self, other: &Self) -> bool {
      71            8 :         (self.0.key(), self.0.lsn()) == (other.0.key(), other.0.lsn())
      72            8 :     }
      73              : }
      74              : 
      75              : impl Eq for OrderedValueMeta {}
      76              : 
      77              : /// Metadata for a [`Value`] serialized into the batch.
      78              : pub struct SerializedValueMeta {
      79              :     pub key: CompactKey,
      80              :     pub lsn: Lsn,
      81              :     /// Starting offset of the value for the (key, LSN) tuple
      82              :     /// in [`SerializedValueBatch::raw`]
      83              :     pub batch_offset: u64,
      84              :     pub len: usize,
      85              :     pub will_init: bool,
      86              : }
      87              : 
      88              : /// Metadata for a [`Value`] observed by the batch
      89              : pub struct ObservedValueMeta {
      90              :     pub key: CompactKey,
      91              :     pub lsn: Lsn,
      92              : }
      93              : 
      94              : /// Batch of serialized [`Value`]s.
      95              : pub struct SerializedValueBatch {
      96              :     /// [`Value`]s serialized in EphemeralFile's native format,
      97              :     /// ready for disk write by the pageserver
      98              :     pub raw: Vec<u8>,
      99              : 
     100              :     /// Metadata to make sense of the bytes in [`Self::raw`]
     101              :     /// and represent "observed" values.
     102              :     ///
     103              :     /// Invariant: Metadata entries for any given key are ordered
     104              :     /// by LSN. Note that entries for a key do not have to be contiguous.
     105              :     pub metadata: Vec<ValueMeta>,
     106              : 
     107              :     /// The highest LSN of any value in the batch
     108              :     pub max_lsn: Lsn,
     109              : 
     110              :     /// Number of values encoded by [`Self::raw`]
     111              :     pub len: usize,
     112              : }
     113              : 
     114              : impl Default for SerializedValueBatch {
     115       262188 :     fn default() -> Self {
     116       262188 :         Self {
     117       262188 :             raw: Default::default(),
     118       262188 :             metadata: Default::default(),
     119       262188 :             max_lsn: Lsn(0),
     120       262188 :             len: 0,
     121       262188 :         }
     122       262188 :     }
     123              : }
     124              : 
     125              : impl SerializedValueBatch {
     126              :     /// Build a batch of serialized values from a decoded PG WAL record
     127              :     ///
     128              :     /// The batch will only contain values for keys targeting the specified
     129              :     /// shard. Shard 0 is a special case, where any keys that don't belong to
     130              :     /// it are "observed" by the batch (i.e. present in [`SerializedValueBatch::metadata`],
     131              :     /// but absent from the raw buffer [`SerializedValueBatch::raw`]).
     132       145852 :     pub(crate) fn from_decoded_filtered(
     133       145852 :         decoded: DecodedWALRecord,
     134       145852 :         shard: &ShardIdentity,
     135       145852 :         record_end_lsn: Lsn,
     136       145852 :         pg_version: u32,
     137       145852 :     ) -> anyhow::Result<SerializedValueBatch> {
     138       145852 :         // First determine how big the buffer needs to be and allocate it up-front.
     139       145852 :         // This duplicates some of the work below, but it's empirically much faster.
     140       145852 :         let estimated_buffer_size = Self::estimate_buffer_size(&decoded, shard, pg_version);
     141       145852 :         let mut buf = Vec::<u8>::with_capacity(estimated_buffer_size);
     142       145852 : 
     143       145852 :         let mut metadata: Vec<ValueMeta> = Vec::with_capacity(decoded.blocks.len());
     144       145852 :         let mut max_lsn: Lsn = Lsn(0);
     145       145852 :         let mut len: usize = 0;
     146       145852 :         for blk in decoded.blocks.iter() {
     147       145642 :             let relative_off = buf.len() as u64;
     148       145642 : 
     149       145642 :             let rel = RelTag {
     150       145642 :                 spcnode: blk.rnode_spcnode,
     151       145642 :                 dbnode: blk.rnode_dbnode,
     152       145642 :                 relnode: blk.rnode_relnode,
     153       145642 :                 forknum: blk.forknum,
     154       145642 :             };
     155       145642 : 
     156       145642 :             let key = rel_block_to_key(rel, blk.blkno);
     157       145642 : 
     158       145642 :             if !key.is_valid_key_on_write_path() {
     159            0 :                 anyhow::bail!("Unsupported key decoded at LSN {}: {}", record_end_lsn, key);
     160       145642 :             }
     161       145642 : 
     162       145642 :             let key_is_local = shard.is_key_local(&key);
     163       145642 : 
     164       145642 :             tracing::debug!(
     165              :                 lsn=%record_end_lsn,
     166              :                 key=%key,
     167            0 :                 "ingest: shard decision {}",
     168            0 :                 if !key_is_local { "drop" } else { "keep" },
     169              :             );
     170              : 
     171       145642 :             if !key_is_local {
     172            0 :                 if shard.is_shard_zero() {
     173              :                     // Shard 0 tracks relation sizes.  Although we will not store this block, we will observe
     174              :                     // its blkno in case it implicitly extends a relation.
     175            0 :                     metadata.push(ValueMeta::Observed(ObservedValueMeta {
     176            0 :                         key: key.to_compact(),
     177            0 :                         lsn: record_end_lsn,
     178            0 :                     }))
     179            0 :                 }
     180              : 
     181            0 :                 continue;
     182       145642 :             }
     183              : 
     184              :             // Instead of storing the full-page-image WAL record,
     185              :             // it is better to store the extracted image: we can skip wal-redo
     186              :             // in this case. Also, some FPI records may contain multiple (up to 32) pages,
     187              :             // so they would otherwise have to be copied multiple times.
     188              :             //
     189       145642 :             let val = if Self::block_is_image(&decoded, blk, pg_version) {
     190              :                 // Extract page image from FPI record
     191           24 :                 let img_len = blk.bimg_len as usize;
     192           24 :                 let img_offs = blk.bimg_offset as usize;
     193           24 :                 let mut image = BytesMut::with_capacity(BLCKSZ as usize);
     194           24 :                 // TODO(vlad): skip the copy
     195           24 :                 image.extend_from_slice(&decoded.record[img_offs..img_offs + img_len]);
     196           24 : 
     197           24 :                 if blk.hole_length != 0 {
     198            0 :                     let tail = image.split_off(blk.hole_offset as usize);
     199            0 :                     image.resize(image.len() + blk.hole_length as usize, 0u8);
     200            0 :                     image.unsplit(tail);
     201           24 :                 }
     202              :                 //
     203              :                 // Match the logic of XLogReadBufferForRedoExtended:
     204              :                 // The page may be uninitialized. If so, we can't set the LSN because
     205              :                 // that would corrupt the page.
     206              :                 //
     207           24 :                 if !page_is_new(&image) {
     208           18 :                     page_set_lsn(&mut image, record_end_lsn)
     209            6 :                 }
     210           24 :                 assert_eq!(image.len(), BLCKSZ as usize);
     211              : 
     212           24 :                 Value::Image(image.freeze())
     213              :             } else {
     214              :                 Value::WalRecord(NeonWalRecord::Postgres {
     215       145618 :                     will_init: blk.will_init || blk.apply_image,
     216       145618 :                     rec: decoded.record.clone(),
     217              :                 })
     218              :             };
     219              : 
     220       145642 :             val.ser_into(&mut buf)
     221       145642 :                 .expect("Writing into in-memory buffer is infallible");
     222       145642 : 
     223       145642 :             let val_ser_size = buf.len() - relative_off as usize;
     224       145642 : 
     225       145642 :             metadata.push(ValueMeta::Serialized(SerializedValueMeta {
     226       145642 :                 key: key.to_compact(),
     227       145642 :                 lsn: record_end_lsn,
     228       145642 :                 batch_offset: relative_off,
     229       145642 :                 len: val_ser_size,
     230       145642 :                 will_init: val.will_init(),
     231       145642 :             }));
     232       145642 :             max_lsn = std::cmp::max(max_lsn, record_end_lsn);
     233       145642 :             len += 1;
     234              :         }
     235              : 
     236       145852 :         if cfg!(any(debug_assertions, test)) {
     237       145852 :             let batch = Self {
     238       145852 :                 raw: buf,
     239       145852 :                 metadata,
     240       145852 :                 max_lsn,
     241       145852 :                 len,
     242       145852 :             };
     243       145852 : 
     244       145852 :             batch.validate_lsn_order();
     245       145852 : 
     246       145852 :             return Ok(batch);
     247            0 :         }
     248            0 : 
     249            0 :         Ok(Self {
     250            0 :             raw: buf,
     251            0 :             metadata,
     252            0 :             max_lsn,
     253            0 :             len,
     254            0 :         })
     255       145852 :     }
     256              : 
     257              :     /// Look into the decoded PG WAL record and determine
     258              :     /// roughly how large the buffer for serialized values needs to be.
     259       145852 :     fn estimate_buffer_size(
     260       145852 :         decoded: &DecodedWALRecord,
     261       145852 :         shard: &ShardIdentity,
     262       145852 :         pg_version: u32,
     263       145852 :     ) -> usize {
     264       145852 :         let mut estimate: usize = 0;
     265              : 
     266       145852 :         for blk in decoded.blocks.iter() {
     267       145642 :             let rel = RelTag {
     268       145642 :                 spcnode: blk.rnode_spcnode,
     269       145642 :                 dbnode: blk.rnode_dbnode,
     270       145642 :                 relnode: blk.rnode_relnode,
     271       145642 :                 forknum: blk.forknum,
     272       145642 :             };
     273       145642 : 
     274       145642 :             let key = rel_block_to_key(rel, blk.blkno);
     275       145642 : 
     276       145642 :             if !shard.is_key_local(&key) {
     277            0 :                 continue;
     278       145642 :             }
     279       145642 : 
     280       145642 :             if Self::block_is_image(decoded, blk, pg_version) {
     281           24 :                 // 4 bytes for the Value::Image discriminator
     282           24 :                 // 8 bytes for encoding the size of the buffer
     283           24 :                 // BLCKSZ for the raw image
     284           24 :                 estimate += (4 + 8 + BLCKSZ) as usize;
     285       145618 :             } else {
     286       145618 :                 // 4 bytes for the Value::WalRecord discriminator
     287       145618 :                 // 4 bytes for the NeonWalRecord::Postgres discriminator
     288       145618 :                 // 1 byte for NeonWalRecord::Postgres::will_init
     289       145618 :                 // 8 bytes for encoding the size of the buffer
     290       145618 :                 // length of the raw record
     291       145618 :                 estimate += 8 + 1 + 8 + decoded.record.len();
     292       145618 :             }
     293              :         }
     294              : 
     295       145852 :         estimate
     296       145852 :     }
     297              : 
     298       291284 :     fn block_is_image(decoded: &DecodedWALRecord, blk: &DecodedBkpBlock, pg_version: u32) -> bool {
     299       291284 :         blk.apply_image
     300          120 :             && blk.has_image
     301          120 :             && decoded.xl_rmid == pg_constants::RM_XLOG_ID
     302           48 :             && (decoded.xl_info == pg_constants::XLOG_FPI
     303            0 :             || decoded.xl_info == pg_constants::XLOG_FPI_FOR_HINT)
     304              :             // compression of WAL is not yet supported: fall back to storing the original WAL record
     305           48 :             && !postgres_ffi::bkpimage_is_compressed(blk.bimg_info, pg_version)
     306              :             // do not materialize null pages because they will most likely soon be replaced with real data
     307           48 :             && blk.bimg_len != 0
     308       291284 :     }
     309              : 
     310              :     /// Encode a list of values and metadata into a serialized batch
     311              :     ///
     312              :     /// This is used by the pageserver ingest code to conveniently generate
     313              :     /// batches for metadata writes.
     314      4660943 :     pub fn from_values(batch: Vec<(CompactKey, Lsn, usize, Value)>) -> Self {
     315      4660943 :         // Pre-allocate a big flat buffer to write into. This should be large but not huge: it is soft-limited in practice by
     316      4660943 :         // [`crate::pgdatadir_mapping::DatadirModification::MAX_PENDING_BYTES`]
     317      4663987 :         let buffer_size = batch.iter().map(|i| i.2).sum::<usize>();
     318      4660943 :         let mut buf = Vec::<u8>::with_capacity(buffer_size);
     319      4660943 : 
     320      4660943 :         let mut metadata: Vec<ValueMeta> = Vec::with_capacity(batch.len());
     321      4660943 :         let mut max_lsn: Lsn = Lsn(0);
     322      4660943 :         let len = batch.len();
     323      9324930 :         for (key, lsn, val_ser_size, val) in batch {
     324      4663987 :             let relative_off = buf.len() as u64;
     325      4663987 : 
     326      4663987 :             val.ser_into(&mut buf)
     327      4663987 :                 .expect("Writing into in-memory buffer is infallible");
     328      4663987 : 
     329      4663987 :             metadata.push(ValueMeta::Serialized(SerializedValueMeta {
     330      4663987 :                 key,
     331      4663987 :                 lsn,
     332      4663987 :                 batch_offset: relative_off,
     333      4663987 :                 len: val_ser_size,
     334      4663987 :                 will_init: val.will_init(),
     335      4663987 :             }));
     336      4663987 :             max_lsn = std::cmp::max(max_lsn, lsn);
     337      4663987 :         }
     338              : 
     339              :         // Assert that we didn't do any extra allocations while building the buffer.
     340      4660943 :         debug_assert!(buf.len() <= buffer_size);
     341              : 
     342      4660943 :         if cfg!(any(debug_assertions, test)) {
     343      4660943 :             let batch = Self {
     344      4660943 :                 raw: buf,
     345      4660943 :                 metadata,
     346      4660943 :                 max_lsn,
     347      4660943 :                 len,
     348      4660943 :             };
     349      4660943 : 
     350      4660943 :             batch.validate_lsn_order();
     351      4660943 : 
     352      4660943 :             return batch;
     353            0 :         }
     354            0 : 
     355            0 :         Self {
     356            0 :             raw: buf,
     357            0 :             metadata,
     358            0 :             max_lsn,
     359            0 :             len,
     360            0 :         }
     361      4660943 :     }
     362              : 
     363              :     /// Add one value to the batch
     364              :     ///
     365              :     /// This is used by the pageserver ingest code to include metadata block
     366              :     /// updates for a single key.
     367       280868 :     pub fn put(&mut self, key: CompactKey, value: Value, lsn: Lsn) {
     368       280868 :         let relative_off = self.raw.len() as u64;
     369       280868 :         value.ser_into(&mut self.raw).unwrap();
     370       280868 : 
     371       280868 :         let val_ser_size = self.raw.len() - relative_off as usize;
     372       280868 :         self.metadata
     373       280868 :             .push(ValueMeta::Serialized(SerializedValueMeta {
     374       280868 :                 key,
     375       280868 :                 lsn,
     376       280868 :                 batch_offset: relative_off,
     377       280868 :                 len: val_ser_size,
     378       280868 :                 will_init: value.will_init(),
     379       280868 :             }));
     380       280868 : 
     381       280868 :         self.max_lsn = std::cmp::max(self.max_lsn, lsn);
     382       280868 :         self.len += 1;
     383       280868 : 
     384       280868 :         if cfg!(any(debug_assertions, test)) {
     385       280868 :             self.validate_lsn_order();
     386       280868 :         }
     387       280868 :     }
     388              : 
     389              :     /// Extend with the contents of another batch
     390              :     ///
     391              :     /// One batch is generated for each decoded PG WAL record.
     392              :     /// They are then merged to accumulate reasonably sized writes.
     393       264577 :     pub fn extend(&mut self, mut other: SerializedValueBatch) {
     394       264577 :         let extend_batch_start_offset = self.raw.len() as u64;
     395       264577 : 
     396       264577 :         self.raw.extend(other.raw);
     397       264577 : 
     398       264577 :         // Shift the offsets in the batch we are extending with
     399       266509 :         other.metadata.iter_mut().for_each(|meta| match meta {
     400       266509 :             ValueMeta::Serialized(ser) => {
     401       266509 :                 ser.batch_offset += extend_batch_start_offset;
     402       266509 :                 if cfg!(debug_assertions) {
     403       266509 :                     let value_end = ser.batch_offset + ser.len as u64;
     404       266509 :                     assert!((value_end as usize) <= self.raw.len());
     405            0 :                 }
     406              :             }
     407            0 :             ValueMeta::Observed(_) => {}
     408       266509 :         });
     409       264577 :         self.metadata.extend(other.metadata);
     410       264577 : 
     411       264577 :         self.max_lsn = std::cmp::max(self.max_lsn, other.max_lsn);
     412       264577 : 
     413       264577 :         self.len += other.len;
     414       264577 : 
     415       264577 :         if cfg!(any(debug_assertions, test)) {
     416       264577 :             self.validate_lsn_order();
     417       264577 :         }
     418       264577 :     }
     419              : 
     420              :     /// Add zero images for the (key, LSN) tuples specified
     421              :     ///
     422              :     /// PG versions below 16 do not zero out pages before extending
     423              :     /// a relation and may leave gaps. Such gaps need to be identified
     424              :     /// by the pageserver ingest logic and get patched up here.
     425              :     ///
     426              :     /// Note that this function does not validate that the gaps have been
     427              :     /// identified correctly (it does not know relation sizes), so it's up
     428              :     /// to the call-site to do it properly.
     429            1 :     pub fn zero_gaps(&mut self, gaps: Vec<(KeySpace, Lsn)>) {
     430            1 :         // Implementation note:
     431            1 :         //
     432            1 :         // Values within [`SerializedValueBatch::raw`] do not have any ordering requirements,
     433            1 :         // but the metadata entries should be ordered properly (see
     434            1 :         // [`SerializedValueBatch::metadata`]).
     435            1 :         //
     436            1 :         // Exploiting this observation we do:
     437            1 :         // 1. Drain all the metadata entries into an ordered set.
     438            1 :         // The use of a BTreeSet keyed by (Key, Lsn) relies on the observation that Postgres never
     439            1 :         // includes more than one update to the same block in the same WAL record.
     440            1 :         // 2. For each (key, LSN) gap tuple, append a zero image to the raw buffer
     441            1 :         // and add an index entry to the ordered metadata set.
     442            1 :         // 3. Drain the ordered set back into a metadata vector
     443            1 : 
     444            1 :         let mut ordered_metas = self
     445            1 :             .metadata
     446            1 :             .drain(..)
     447            1 :             .map(OrderedValueMeta)
     448            1 :             .collect::<BTreeSet<_>>();
     449            3 :         for (keyspace, lsn) in gaps {
     450            2 :             self.max_lsn = std::cmp::max(self.max_lsn, lsn);
     451              : 
     452            5 :             for gap_range in keyspace.ranges {
     453            3 :                 let mut key = gap_range.start;
     454           13 :                 while key != gap_range.end {
     455           10 :                     let relative_off = self.raw.len() as u64;
     456           10 : 
     457           10 :                     // TODO(vlad): Can we be cheeky and write only one zero image, and
     458           10 :                     // make all index entries requiring a zero page point to it?
     459           10 :                     // Alternatively, we can change the index entry format to represent zero pages
     460           10 :                     // without writing them at all.
     461           10 :                     Value::Image(ZERO_PAGE.clone())
     462           10 :                         .ser_into(&mut self.raw)
     463           10 :                         .unwrap();
     464           10 :                     let val_ser_size = self.raw.len() - relative_off as usize;
     465           10 : 
     466           10 :                     ordered_metas.insert(OrderedValueMeta(ValueMeta::Serialized(
     467           10 :                         SerializedValueMeta {
     468           10 :                             key: key.to_compact(),
     469           10 :                             lsn,
     470           10 :                             batch_offset: relative_off,
     471           10 :                             len: val_ser_size,
     472           10 :                             will_init: true,
     473           10 :                         },
     474           10 :                     )));
     475           10 : 
     476           10 :                     self.len += 1;
     477           10 : 
     478           10 :                     key = key.next();
     479           10 :                 }
     480              :             }
     481              :         }
     482              : 
     483           19 :         self.metadata = ordered_metas.into_iter().map(|ord| ord.0).collect();
     484            1 : 
     485            1 :         if cfg!(any(debug_assertions, test)) {
     486            1 :             self.validate_lsn_order();
     487            1 :         }
     488            1 :     }
     489              : 
     490              :     /// Checks if the batch is empty
     491              :     ///
     492              :     /// A batch is empty when it contains no serialized values.
     493              :     /// Note that it may still contain observed values.
     494      4950033 :     pub fn is_empty(&self) -> bool {
     495      4950033 :         let empty = self.raw.is_empty();
     496      4950033 : 
     497      4950033 :         if cfg!(debug_assertions) && empty {
     498          202 :             assert!(self
     499          202 :                 .metadata
     500          202 :                 .iter()
     501          202 :                 .all(|meta| matches!(meta, ValueMeta::Observed(_))));
     502      4949831 :         }
     503              : 
     504      4950033 :         empty
     505      4950033 :     }
     506              : 
     507              :     /// Returns the number of values serialized in the batch
     508       145656 :     pub fn len(&self) -> usize {
     509       145656 :         self.len
     510       145656 :     }
     511              : 
     512              :     /// Returns the size of the buffer wrapped by the batch
     513      4804206 :     pub fn buffer_size(&self) -> usize {
     514      4804206 :         self.raw.len()
     515      4804206 :     }
     516              : 
     517            0 :     pub fn updates_key(&self, key: &Key) -> bool {
     518            0 :         self.metadata.iter().any(|meta| match meta {
     519            0 :             ValueMeta::Serialized(ser) => key.to_compact() == ser.key,
     520            0 :             ValueMeta::Observed(_) => false,
     521            0 :         })
     522            0 :     }
     523              : 
     524      5352247 :     pub fn validate_lsn_order(&self) {
     525              :         use std::collections::HashMap;
     526              : 
     527      5352247 :         let mut last_seen_lsn_per_key: HashMap<CompactKey, Lsn> = HashMap::default();
     528              : 
     529     28399915 :         for meta in self.metadata.iter() {
     530     28399915 :             let lsn = meta.lsn();
     531     28399915 :             let key = meta.key();
     532              : 
     533     28399915 :             if let Some(prev_lsn) = last_seen_lsn_per_key.insert(key, lsn) {
     534           15 :                 assert!(
     535           15 :                     lsn >= prev_lsn,
     536            0 :                     "Ordering violated by {}: {} < {}",
     537            0 :                     Key::from_compact(key),
     538              :                     lsn,
     539              :                     prev_lsn
     540              :                 );
     541     28399900 :             }
     542              :         }
     543      5352247 :     }
     544              : }
     545              : 
     546              : #[cfg(all(test, feature = "testing"))]
     547              : mod tests {
     548              :     use super::*;
     549              : 
     550            6 :     fn validate_batch(
     551            6 :         batch: &SerializedValueBatch,
     552            6 :         values: &[(CompactKey, Lsn, usize, Value)],
     553            6 :         gaps: Option<&Vec<(KeySpace, Lsn)>>,
     554            6 :     ) {
     555              :         // Invariant 1: The metadata for a given entry in the batch
     556              :         // is correct and can be used to deserialize back to the original value.
     557           28 :         for (key, lsn, size, value) in values.iter() {
     558           28 :             let meta = batch
     559           28 :                 .metadata
     560           28 :                 .iter()
     561          136 :                 .find(|meta| (meta.key(), meta.lsn()) == (*key, *lsn))
     562           28 :                 .unwrap();
     563           28 :             let meta = match meta {
     564           28 :                 ValueMeta::Serialized(ser) => ser,
     565            0 :                 ValueMeta::Observed(_) => unreachable!(),
     566              :             };
     567              : 
     568           28 :             assert_eq!(meta.len, *size);
     569           28 :             assert_eq!(meta.will_init, value.will_init());
     570              : 
     571           28 :             let start = meta.batch_offset as usize;
     572           28 :             let end = meta.batch_offset as usize + meta.len;
     573           28 :             let value_from_batch = Value::des(&batch.raw[start..end]).unwrap();
     574           28 :             assert_eq!(&value_from_batch, value);
     575              :         }
     576              : 
     577           28 :         let mut expected_buffer_size: usize = values.iter().map(|(_, _, size, _)| size).sum();
     578            6 :         let mut gap_pages_count: usize = 0;
     579              : 
     580              :         // Invariant 2: Zero pages were added for identified gaps and their metadata
     581              :         // is correct.
     582            6 :         if let Some(gaps) = gaps {
     583            3 :             for (gap_keyspace, lsn) in gaps {
     584            5 :                 for gap_range in &gap_keyspace.ranges {
     585            3 :                     let mut gap_key = gap_range.start;
     586           13 :                     while gap_key != gap_range.end {
     587           10 :                         let meta = batch
     588           10 :                             .metadata
     589           10 :                             .iter()
     590          104 :                             .find(|meta| (meta.key(), meta.lsn()) == (gap_key.to_compact(), *lsn))
     591           10 :                             .unwrap();
     592           10 :                         let meta = match meta {
     593           10 :                             ValueMeta::Serialized(ser) => ser,
     594            0 :                             ValueMeta::Observed(_) => unreachable!(),
     595              :                         };
     596              : 
     597           10 :                         let zero_value = Value::Image(ZERO_PAGE.clone());
     598           10 :                         let zero_value_size = zero_value.serialized_size().unwrap() as usize;
     599           10 : 
     600           10 :                         assert_eq!(meta.len, zero_value_size);
     601           10 :                         assert_eq!(meta.will_init, zero_value.will_init());
     602              : 
     603           10 :                         let start = meta.batch_offset as usize;
     604           10 :                         let end = meta.batch_offset as usize + meta.len;
     605           10 :                         let value_from_batch = Value::des(&batch.raw[start..end]).unwrap();
     606           10 :                         assert_eq!(value_from_batch, zero_value);
     607              : 
     608           10 :                         gap_pages_count += 1;
     609           10 :                         expected_buffer_size += zero_value_size;
     610           10 :                         gap_key = gap_key.next();
     611              :                     }
     612              :                 }
     613              :             }
     614            5 :         }
     615              : 
     616              :         // Invariant 3: The length of the batch is equal to the number
     617              :         // of values inserted, plus the number of gap pages. This extends
     618              :         // to the raw buffer size.
     619            6 :         assert_eq!(batch.len(), values.len() + gap_pages_count);
     620            6 :         assert_eq!(expected_buffer_size, batch.buffer_size());
     621              : 
     622              :         // Invariant 4: Metadata entries for any given key are sorted in LSN order.
     623            6 :         batch.validate_lsn_order();
     624            6 :     }
     625              : 
     626              :     #[test]
     627            1 :     fn test_creation_from_values() {
     628              :         const LSN: Lsn = Lsn(0x10);
     629            1 :         let key = Key::from_hex("110000000033333333444444445500000001").unwrap();
     630            1 : 
     631            1 :         let values = vec![
     632            1 :             (
     633            1 :                 key.to_compact(),
     634            1 :                 LSN,
     635            1 :                 Value::WalRecord(NeonWalRecord::wal_append("foo")),
     636            1 :             ),
     637            1 :             (
     638            1 :                 key.next().to_compact(),
     639            1 :                 LSN,
     640            1 :                 Value::WalRecord(NeonWalRecord::wal_append("bar")),
     641            1 :             ),
     642            1 :             (
     643            1 :                 key.to_compact(),
     644            1 :                 Lsn(LSN.0 + 0x10),
     645            1 :                 Value::WalRecord(NeonWalRecord::wal_append("baz")),
     646            1 :             ),
     647            1 :             (
     648            1 :                 key.next().next().to_compact(),
     649            1 :                 LSN,
     650            1 :                 Value::WalRecord(NeonWalRecord::wal_append("taz")),
     651            1 :             ),
     652            1 :         ];
     653            1 : 
     654            1 :         let values = values
     655            1 :             .into_iter()
     656            4 :             .map(|(key, lsn, value)| (key, lsn, value.serialized_size().unwrap() as usize, value))
     657            1 :             .collect::<Vec<_>>();
     658            1 :         let batch = SerializedValueBatch::from_values(values.clone());
     659            1 : 
     660            1 :         validate_batch(&batch, &values, None);
     661            1 : 
     662            1 :         assert!(!batch.is_empty());
     663            1 :     }
     664              : 
     665              :     #[test]
     666            1 :     fn test_put() {
     667              :         const LSN: Lsn = Lsn(0x10);
     668            1 :         let key = Key::from_hex("110000000033333333444444445500000001").unwrap();
     669            1 : 
     670            1 :         let values = vec![
     671            1 :             (
     672            1 :                 key.to_compact(),
     673            1 :                 LSN,
     674            1 :                 Value::WalRecord(NeonWalRecord::wal_append("foo")),
     675            1 :             ),
     676            1 :             (
     677            1 :                 key.next().to_compact(),
     678            1 :                 LSN,
     679            1 :                 Value::WalRecord(NeonWalRecord::wal_append("bar")),
     680            1 :             ),
     681            1 :         ];
     682            1 : 
     683            1 :         let mut values = values
     684            1 :             .into_iter()
     685            2 :             .map(|(key, lsn, value)| (key, lsn, value.serialized_size().unwrap() as usize, value))
     686            1 :             .collect::<Vec<_>>();
     687            1 :         let mut batch = SerializedValueBatch::from_values(values.clone());
     688            1 : 
     689            1 :         validate_batch(&batch, &values, None);
     690            1 : 
     691            1 :         let value = (
     692            1 :             key.to_compact(),
     693            1 :             Lsn(LSN.0 + 0x10),
     694            1 :             Value::WalRecord(NeonWalRecord::wal_append("baz")),
     695            1 :         );
     696            1 :         let serialized_size = value.2.serialized_size().unwrap() as usize;
     697            1 :         let value = (value.0, value.1, serialized_size, value.2);
     698            1 :         values.push(value.clone());
     699            1 :         batch.put(value.0, value.3, value.1);
     700            1 : 
     701            1 :         validate_batch(&batch, &values, None);
     702            1 : 
     703            1 :         let value = (
     704            1 :             key.next().next().to_compact(),
     705            1 :             LSN,
     706            1 :             Value::WalRecord(NeonWalRecord::wal_append("taz")),
     707            1 :         );
     708            1 :         let serialized_size = value.2.serialized_size().unwrap() as usize;
     709            1 :         let value = (value.0, value.1, serialized_size, value.2);
     710            1 :         values.push(value.clone());
     711            1 :         batch.put(value.0, value.3, value.1);
     712            1 : 
     713            1 :         validate_batch(&batch, &values, None);
     714            1 :     }
     715              : 
     716              :     #[test]
     717            1 :     fn test_extension() {
     718              :         const LSN: Lsn = Lsn(0x10);
     719            1 :         let key = Key::from_hex("110000000033333333444444445500000001").unwrap();
     720            1 : 
     721            1 :         let values = vec![
     722            1 :             (
     723            1 :                 key.to_compact(),
     724            1 :                 LSN,
     725            1 :                 Value::WalRecord(NeonWalRecord::wal_append("foo")),
     726            1 :             ),
     727            1 :             (
     728            1 :                 key.next().to_compact(),
     729            1 :                 LSN,
     730            1 :                 Value::WalRecord(NeonWalRecord::wal_append("bar")),
     731            1 :             ),
     732            1 :             (
     733            1 :                 key.next().next().to_compact(),
     734            1 :                 LSN,
     735            1 :                 Value::WalRecord(NeonWalRecord::wal_append("taz")),
     736            1 :             ),
     737            1 :         ];
     738            1 : 
     739            1 :         let mut values = values
     740            1 :             .into_iter()
     741            3 :             .map(|(key, lsn, value)| (key, lsn, value.serialized_size().unwrap() as usize, value))
     742            1 :             .collect::<Vec<_>>();
     743            1 :         let mut batch = SerializedValueBatch::from_values(values.clone());
     744            1 : 
     745            1 :         let other_values = vec![
     746            1 :             (
     747            1 :                 key.to_compact(),
     748            1 :                 Lsn(LSN.0 + 0x10),
     749            1 :                 Value::WalRecord(NeonWalRecord::wal_append("foo")),
     750            1 :             ),
     751            1 :             (
     752            1 :                 key.next().to_compact(),
     753            1 :                 Lsn(LSN.0 + 0x10),
     754            1 :                 Value::WalRecord(NeonWalRecord::wal_append("bar")),
     755            1 :             ),
     756            1 :             (
     757            1 :                 key.next().next().to_compact(),
     758            1 :                 Lsn(LSN.0 + 0x10),
     759            1 :                 Value::WalRecord(NeonWalRecord::wal_append("taz")),
     760            1 :             ),
     761            1 :         ];
     762            1 : 
     763            1 :         let other_values = other_values
     764            1 :             .into_iter()
     765            3 :             .map(|(key, lsn, value)| (key, lsn, value.serialized_size().unwrap() as usize, value))
     766            1 :             .collect::<Vec<_>>();
     767            1 :         let other_batch = SerializedValueBatch::from_values(other_values.clone());
     768            1 : 
     769            1 :         values.extend(other_values);
     770            1 :         batch.extend(other_batch);
     771            1 : 
     772            1 :         validate_batch(&batch, &values, None);
     773            1 :     }
     774              : 
     775              :     #[test]
     776            1 :     fn test_gap_zeroing() {
     777              :         const LSN: Lsn = Lsn(0x10);
     778            1 :         let rel_foo_base_key = Key::from_hex("110000000033333333444444445500000001").unwrap();
     779            1 : 
     780            1 :         let rel_bar_base_key = {
     781            1 :             let mut key = rel_foo_base_key;
     782            1 :             key.field4 += 1;
     783            1 :             key
     784            1 :         };
     785            1 : 
     786            1 :         let values = vec![
     787            1 :             (
     788            1 :                 rel_foo_base_key.to_compact(),
     789            1 :                 LSN,
     790            1 :                 Value::WalRecord(NeonWalRecord::wal_append("foo1")),
     791            1 :             ),
     792            1 :             (
     793            1 :                 rel_foo_base_key.add(1).to_compact(),
     794            1 :                 LSN,
     795            1 :                 Value::WalRecord(NeonWalRecord::wal_append("foo2")),
     796            1 :             ),
     797            1 :             (
     798            1 :                 rel_foo_base_key.add(5).to_compact(),
     799            1 :                 LSN,
     800            1 :                 Value::WalRecord(NeonWalRecord::wal_append("foo3")),
     801            1 :             ),
     802            1 :             (
     803            1 :                 rel_foo_base_key.add(1).to_compact(),
     804            1 :                 Lsn(LSN.0 + 0x10),
     805            1 :                 Value::WalRecord(NeonWalRecord::wal_append("foo4")),
     806            1 :             ),
     807            1 :             (
     808            1 :                 rel_foo_base_key.add(10).to_compact(),
     809            1 :                 Lsn(LSN.0 + 0x10),
     810            1 :                 Value::WalRecord(NeonWalRecord::wal_append("foo5")),
     811            1 :             ),
     812            1 :             (
     813            1 :                 rel_foo_base_key.add(11).to_compact(),
     814            1 :                 Lsn(LSN.0 + 0x10),
     815            1 :                 Value::WalRecord(NeonWalRecord::wal_append("foo6")),
     816            1 :             ),
     817            1 :             (
     818            1 :                 rel_foo_base_key.add(12).to_compact(),
     819            1 :                 Lsn(LSN.0 + 0x10),
     820            1 :                 Value::WalRecord(NeonWalRecord::wal_append("foo7")),
     821            1 :             ),
     822            1 :             (
     823            1 :                 rel_bar_base_key.to_compact(),
     824            1 :                 LSN,
     825            1 :                 Value::WalRecord(NeonWalRecord::wal_append("bar1")),
     826            1 :             ),
     827            1 :             (
     828            1 :                 rel_bar_base_key.add(4).to_compact(),
     829            1 :                 LSN,
     830            1 :                 Value::WalRecord(NeonWalRecord::wal_append("bar2")),
     831            1 :             ),
     832            1 :         ];
     833            1 : 
     834            1 :         let values = values
     835            1 :             .into_iter()
     836            9 :             .map(|(key, lsn, value)| (key, lsn, value.serialized_size().unwrap() as usize, value))
     837            1 :             .collect::<Vec<_>>();
     838            1 : 
     839            1 :         let mut batch = SerializedValueBatch::from_values(values.clone());
     840            1 : 
     841            1 :         let gaps = vec![
     842            1 :             (
     843            1 :                 KeySpace {
     844            1 :                     ranges: vec![
     845            1 :                         rel_foo_base_key.add(2)..rel_foo_base_key.add(5),
     846            1 :                         rel_bar_base_key.add(1)..rel_bar_base_key.add(4),
     847            1 :                     ],
     848            1 :                 },
     849            1 :                 LSN,
     850            1 :             ),
     851            1 :             (
     852            1 :                 KeySpace {
     853            1 :                     ranges: vec![rel_foo_base_key.add(6)..rel_foo_base_key.add(10)],
     854            1 :                 },
     855            1 :                 Lsn(LSN.0 + 0x10),
     856            1 :             ),
     857            1 :         ];
     858            1 : 
     859            1 :         batch.zero_gaps(gaps.clone());
     860            1 :         validate_batch(&batch, &values, Some(&gaps));
     861            1 :     }
     862              : }
        

Generated by: LCOV version 2.1-beta