LCOV - code coverage report
Current view: top level - pageserver/src/tenant/storage_layer - inmemory_layer.rs (source / functions) Coverage Total Hit
Test: b4ae4c4857f9ef3e144e982a35ee23bc84c71983.info Lines: 88.4 % 541 478
Test Date: 2024-10-22 22:13:45 Functions: 77.6 % 49 38

            Line data    Source code
       1              : //! An in-memory layer stores recently received key-value pairs.
       2              : //!
       3              : //! The "in-memory" part of the name is a bit misleading: the actual page versions are
       4              : //! held in an ephemeral file, not in memory. The metadata for each page version, i.e.
       5              : //! its position in the file, is kept in memory, though.
       6              : //!
       7              : use crate::assert_u64_eq_usize::{u64_to_usize, U64IsUsize, UsizeIsU64};
       8              : use crate::config::PageServerConf;
       9              : use crate::context::{PageContentKind, RequestContext, RequestContextBuilder};
      10              : use crate::repository::{Key, Value};
      11              : use crate::tenant::ephemeral_file::EphemeralFile;
      12              : use crate::tenant::timeline::GetVectoredError;
      13              : use crate::tenant::PageReconstructError;
      14              : use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt;
      15              : use crate::{l0_flush, page_cache};
      16              : use anyhow::{anyhow, Context, Result};
      17              : use camino::Utf8PathBuf;
      18              : use pageserver_api::key::CompactKey;
      19              : use pageserver_api::keyspace::KeySpace;
      20              : use pageserver_api::models::InMemoryLayerInfo;
      21              : use pageserver_api::shard::TenantShardId;
      22              : use std::collections::{BTreeMap, HashMap};
      23              : use std::sync::{Arc, OnceLock};
      24              : use std::time::Instant;
      25              : use tracing::*;
      26              : use utils::{bin_ser::BeSer, id::TimelineId, lsn::Lsn, vec_map::VecMap};
      27              : // avoid binding to Write (conflicts with std::io::Write)
      28              : // while being able to use std::fmt::Write's methods
      29              : use crate::metrics::TIMELINE_EPHEMERAL_BYTES;
      30              : use std::cmp::Ordering;
      31              : use std::fmt::Write;
      32              : use std::ops::Range;
      33              : use std::sync::atomic::Ordering as AtomicOrdering;
      34              : use std::sync::atomic::{AtomicU64, AtomicUsize};
      35              : use tokio::sync::RwLock;
      36              : 
      37              : use super::{
      38              :     DeltaLayerWriter, PersistentLayerDesc, ValueReconstructSituation, ValuesReconstructState,
      39              : };
      40              : 
      41              : pub(crate) mod vectored_dio_read;
      42              : 
      43              : #[derive(Debug, PartialEq, Eq, Clone, Copy, Hash)]
      44              : pub(crate) struct InMemoryLayerFileId(page_cache::FileId);
      45              : 
      46              : pub struct InMemoryLayer {
      47              :     conf: &'static PageServerConf,
      48              :     tenant_shard_id: TenantShardId,
      49              :     timeline_id: TimelineId,
      50              :     file_id: InMemoryLayerFileId,
      51              : 
      52              :     /// This layer contains all the changes from 'start_lsn'. The
      53              :     /// start is inclusive.
      54              :     start_lsn: Lsn,
      55              : 
      56              :     /// Frozen layers have an exclusive end LSN.
      57              :     /// Writes are only allowed when this is `None`.
      58              :     pub(crate) end_lsn: OnceLock<Lsn>,
      59              : 
      60              :     /// Used for traversal path. Cached representation of the in-memory layer after frozen.
      61              :     frozen_local_path_str: OnceLock<Arc<str>>,
      62              : 
      63              :     opened_at: Instant,
      64              : 
      65              :     /// The above fields never change, except for `end_lsn`, which is only set once.
      66              :     /// All other changing parts are in `inner`, and protected by a mutex.
      67              :     inner: RwLock<InMemoryLayerInner>,
      68              : }
      69              : 
      70              : impl std::fmt::Debug for InMemoryLayer {
      71            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
      72            0 :         f.debug_struct("InMemoryLayer")
      73            0 :             .field("start_lsn", &self.start_lsn)
      74            0 :             .field("end_lsn", &self.end_lsn)
      75            0 :             .field("inner", &self.inner)
      76            0 :             .finish()
      77            0 :     }
      78              : }
      79              : 
      80              : pub struct InMemoryLayerInner {
      81              :     /// All versions of all pages in the layer are kept here. Indexed
      82              :     /// by block number and LSN. The [`IndexEntry`] is an offset into the
      83              :     /// ephemeral file where the page version is stored.
      84              :     index: BTreeMap<CompactKey, VecMap<Lsn, IndexEntry>>,
      85              : 
      86              :     /// The values are stored in a serialized format in this file.
      87              :     /// Each serialized Value is preceded by a 'u32' length field.
      88              :     /// PerSeg::page_versions map stores offsets into this file.
      89              :     file: EphemeralFile,
      90              : 
      91              :     resource_units: GlobalResourceUnits,
      92              : }
      93              : 
      94              : /// Support the same max blob length as blob_io, because ultimately
      95              : /// all the InMemoryLayer contents end up being written into a delta layer,
      96              : /// using the [`crate::tenant::blob_io`].
      97              : const MAX_SUPPORTED_BLOB_LEN: usize = crate::tenant::blob_io::MAX_SUPPORTED_BLOB_LEN;
      98              : const MAX_SUPPORTED_BLOB_LEN_BITS: usize = {
      99              :     let trailing_ones = MAX_SUPPORTED_BLOB_LEN.trailing_ones() as usize;
     100              :     let leading_zeroes = MAX_SUPPORTED_BLOB_LEN.leading_zeros() as usize;
     101              :     assert!(trailing_ones + leading_zeroes == std::mem::size_of::<usize>() * 8);
     102              :     trailing_ones
     103              : };
     104              : 
     105              : /// See [`InMemoryLayerInner::index`].
     106              : ///
     107              : /// For memory efficiency, the data is packed into a u64.
     108              : ///
     109              : /// Layout:
     110              : /// - 1 bit: `will_init`
     111              : /// - [`MAX_SUPPORTED_BLOB_LEN_BITS`]: `len`
     112              : /// - [`MAX_SUPPORTED_POS_BITS`]: `pos`
     113              : #[derive(Debug, Clone, Copy, PartialEq, Eq)]
     114              : pub struct IndexEntry(u64);
     115              : 
     116              : impl IndexEntry {
     117              :     /// See [`Self::MAX_SUPPORTED_POS`].
     118              :     const MAX_SUPPORTED_POS_BITS: usize = {
     119              :         let remainder = 64 - 1 - MAX_SUPPORTED_BLOB_LEN_BITS;
     120              :         if remainder < 32 {
     121              :             panic!("pos can be u32 as per type system, support that");
     122              :         }
     123              :         remainder
     124              :     };
     125              :     /// The maximum supported blob offset that can be represented by [`Self`].
     126              :     /// See also [`Self::validate_checkpoint_distance`].
     127              :     const MAX_SUPPORTED_POS: usize = (1 << Self::MAX_SUPPORTED_POS_BITS) - 1;
     128              : 
     129              :     // Layout
     130              :     const WILL_INIT_RANGE: Range<usize> = 0..1;
     131              :     const LEN_RANGE: Range<usize> =
     132              :         Self::WILL_INIT_RANGE.end..Self::WILL_INIT_RANGE.end + MAX_SUPPORTED_BLOB_LEN_BITS;
     133              :     const POS_RANGE: Range<usize> =
     134              :         Self::LEN_RANGE.end..Self::LEN_RANGE.end + Self::MAX_SUPPORTED_POS_BITS;
     135              :     const _ASSERT: () = {
     136              :         if Self::POS_RANGE.end != 64 {
     137              :             panic!("we don't want undefined bits for our own sanity")
     138              :         }
     139              :     };
     140              : 
     141              :     /// Fails if and only if the offset or length encoded in `arg` is too large to be represented by [`Self`].
     142              :     ///
     143              :     /// The only reason why that can happen in the system is if the [`InMemoryLayer`] grows too long.
     144              :     /// The [`InMemoryLayer`] size is determined by the checkpoint distance, enforced by [`crate::tenant::Timeline::should_roll`].
     145              :     ///
     146              :     /// Thus, to avoid failure of this function, whenever we start up and/or change checkpoint distance,
     147              :     /// call [`Self::validate_checkpoint_distance`] with the new checkpoint distance value.
     148              :     ///
     149              :     /// TODO: this check should happen ideally at config parsing time (and in the request handler when a change to checkpoint distance is requested)
     150              :     /// When cleaning this up, also look into the s3 max file size check that is performed in delta layer writer.
     151              :     #[inline(always)]
     152     10180942 :     fn new(arg: IndexEntryNewArgs) -> anyhow::Result<Self> {
     153     10180942 :         let IndexEntryNewArgs {
     154     10180942 :             base_offset,
     155     10180942 :             batch_offset,
     156     10180942 :             len,
     157     10180942 :             will_init,
     158     10180942 :         } = arg;
     159              : 
     160     10180942 :         let pos = base_offset
     161     10180942 :             .checked_add(batch_offset)
     162     10180942 :             .ok_or_else(|| anyhow::anyhow!("base_offset + batch_offset overflows u64: base_offset={base_offset} batch_offset={batch_offset}"))?;
     163              : 
     164     10180942 :         if pos.into_usize() > Self::MAX_SUPPORTED_POS {
     165            8 :             anyhow::bail!(
     166            8 :                 "base_offset+batch_offset exceeds the maximum supported value: base_offset={base_offset} batch_offset={batch_offset} (+)={pos} max={max}",
     167            8 :                 max = Self::MAX_SUPPORTED_POS
     168            8 :             );
     169     10180934 :         }
     170     10180934 : 
     171     10180934 :         if len > MAX_SUPPORTED_BLOB_LEN {
     172            2 :             anyhow::bail!(
     173            2 :                 "len exceeds the maximum supported length: len={len} max={MAX_SUPPORTED_BLOB_LEN}",
     174            2 :             );
     175     10180932 :         }
     176     10180932 : 
     177     10180932 :         let mut data: u64 = 0;
     178              :         use bit_field::BitField;
     179     10180932 :         data.set_bits(Self::WILL_INIT_RANGE, if will_init { 1 } else { 0 });
     180     10180932 :         data.set_bits(Self::LEN_RANGE, len.into_u64());
     181     10180932 :         data.set_bits(Self::POS_RANGE, pos);
     182     10180932 : 
     183     10180932 :         Ok(Self(data))
     184     10180942 :     }
     185              : 
     186              :     #[inline(always)]
     187      9974917 :     fn unpack(&self) -> IndexEntryUnpacked {
     188              :         use bit_field::BitField;
     189      9974917 :         IndexEntryUnpacked {
     190      9974917 :             will_init: self.0.get_bits(Self::WILL_INIT_RANGE) != 0,
     191      9974917 :             len: self.0.get_bits(Self::LEN_RANGE),
     192      9974917 :             pos: self.0.get_bits(Self::POS_RANGE),
     193      9974917 :         }
     194      9974917 :     }
     195              : 
     196              :     /// See [`Self::new`].
     197          204 :     pub(crate) const fn validate_checkpoint_distance(
     198          204 :         checkpoint_distance: u64,
     199          204 :     ) -> Result<(), &'static str> {
     200          204 :         if checkpoint_distance > Self::MAX_SUPPORTED_POS as u64 {
     201            0 :             return Err("exceeds the maximum supported value");
     202          204 :         }
     203          204 :         let res = u64_to_usize(checkpoint_distance).checked_add(MAX_SUPPORTED_BLOB_LEN);
     204          204 :         if res.is_none() {
     205            0 :             return Err(
     206            0 :                 "checkpoint distance + max supported blob len overflows in-memory addition",
     207            0 :             );
     208          204 :         }
     209          204 : 
     210          204 :         // NB: it is ok for the result of the addition to be larger than MAX_SUPPORTED_POS
     211          204 : 
     212          204 :         Ok(())
     213          204 :     }
     214              : 
     215              :     const _ASSERT_DEFAULT_CHECKPOINT_DISTANCE_IS_VALID: () = {
     216              :         let res = Self::validate_checkpoint_distance(
     217              :             pageserver_api::config::tenant_conf_defaults::DEFAULT_CHECKPOINT_DISTANCE,
     218              :         );
     219              :         if res.is_err() {
     220              :             panic!("default checkpoint distance is valid")
     221              :         }
     222              :     };
     223              : }
     224              : 
     225              : /// Args to [`IndexEntry::new`].
     226              : #[derive(Clone, Copy)]
     227              : struct IndexEntryNewArgs {
     228              :     base_offset: u64,
     229              :     batch_offset: u64,
     230              :     len: usize,
     231              :     will_init: bool,
     232              : }
     233              : 
     234              : /// Unpacked representation of the bitfielded [`IndexEntry`].
     235              : #[derive(Clone, Copy, PartialEq, Eq, Debug)]
     236              : struct IndexEntryUnpacked {
     237              :     will_init: bool,
     238              :     len: u64,
     239              :     pos: u64,
     240              : }
     241              : 
     242              : impl std::fmt::Debug for InMemoryLayerInner {
     243            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     244            0 :         f.debug_struct("InMemoryLayerInner").finish()
     245            0 :     }
     246              : }
     247              : 
     248              : /// State shared by all in-memory (ephemeral) layers.  Updated infrequently during background ticks in Timeline,
     249              : /// to minimize contention.
     250              : ///
     251              : /// This global state is used to implement behaviors that require a global view of the system, e.g.
     252              : /// rolling layers proactively to limit the total amount of dirty data.
     253              : pub(crate) struct GlobalResources {
     254              :     // Limit on how high dirty_bytes may grow before we start freezing layers to reduce it.
     255              :     // Zero means unlimited.
     256              :     pub(crate) max_dirty_bytes: AtomicU64,
     257              :     // How many bytes are in all EphemeralFile objects
     258              :     dirty_bytes: AtomicU64,
     259              :     // How many layers are contributing to dirty_bytes
     260              :     dirty_layers: AtomicUsize,
     261              : }
     262              : 
     263              : // Per-timeline RAII struct for its contribution to [`GlobalResources`]
     264              : struct GlobalResourceUnits {
     265              :     // How many dirty bytes have I added to the global dirty_bytes: this guard object is responsible
     266              :     // for decrementing the global counter by this many bytes when dropped.
     267              :     dirty_bytes: u64,
     268              : }
     269              : 
     270              : impl GlobalResourceUnits {
     271              :     // Hint for the layer append path to update us when the layer size differs from the last
     272              :     // call to update_size by this much.  If we don't reach this threshold, we'll still get
     273              :     // updated when the Timeline "ticks" in the background.
     274              :     const MAX_SIZE_DRIFT: u64 = 10 * 1024 * 1024;
     275              : 
     276         1262 :     fn new() -> Self {
     277         1262 :         GLOBAL_RESOURCES
     278         1262 :             .dirty_layers
     279         1262 :             .fetch_add(1, AtomicOrdering::Relaxed);
     280         1262 :         Self { dirty_bytes: 0 }
     281         1262 :     }
     282              : 
     283              :     /// Do not call this frequently: all timelines will write to these same global atomics,
     284              :     /// so this is a relatively expensive operation.  Wait at least a few seconds between calls.
     285              :     ///
     286              :     /// Returns the effective layer size limit that should be applied, if any, to keep
     287              :     /// the total number of dirty bytes below the configured maximum.
     288         1148 :     fn publish_size(&mut self, size: u64) -> Option<u64> {
     289         1148 :         let new_global_dirty_bytes = match size.cmp(&self.dirty_bytes) {
     290         1138 :             Ordering::Equal => GLOBAL_RESOURCES.dirty_bytes.load(AtomicOrdering::Relaxed),
     291              :             Ordering::Greater => {
     292            8 :                 let delta = size - self.dirty_bytes;
     293            8 :                 let old = GLOBAL_RESOURCES
     294            8 :                     .dirty_bytes
     295            8 :                     .fetch_add(delta, AtomicOrdering::Relaxed);
     296            8 :                 old + delta
     297              :             }
     298              :             Ordering::Less => {
     299            2 :                 let delta = self.dirty_bytes - size;
     300            2 :                 let old = GLOBAL_RESOURCES
     301            2 :                     .dirty_bytes
     302            2 :                     .fetch_sub(delta, AtomicOrdering::Relaxed);
     303            2 :                 old - delta
     304              :             }
     305              :         };
     306              : 
     307              :         // This is a sloppy update: concurrent updates to the counter will race, and the exact
     308              :         // value of the metric might not be the exact latest value of GLOBAL_RESOURCES::dirty_bytes.
     309              :         // That's okay: as long as the metric contains some recent value, it doesn't have to always
     310              :         // be literally the last update.
     311         1148 :         TIMELINE_EPHEMERAL_BYTES.set(new_global_dirty_bytes);
     312         1148 : 
     313         1148 :         self.dirty_bytes = size;
     314         1148 : 
     315         1148 :         let max_dirty_bytes = GLOBAL_RESOURCES
     316         1148 :             .max_dirty_bytes
     317         1148 :             .load(AtomicOrdering::Relaxed);
     318         1148 :         if max_dirty_bytes > 0 && new_global_dirty_bytes > max_dirty_bytes {
     319              :             // Set the layer file limit to the average layer size: this implies that all above-average
     320              :             // sized layers will be elegible for freezing.  They will be frozen in the order they
     321              :             // next enter publish_size.
     322            0 :             Some(
     323            0 :                 new_global_dirty_bytes
     324            0 :                     / GLOBAL_RESOURCES.dirty_layers.load(AtomicOrdering::Relaxed) as u64,
     325            0 :             )
     326              :         } else {
     327         1148 :             None
     328              :         }
     329         1148 :     }
     330              : 
     331              :     // Call publish_size if the input size differs from last published size by more than
     332              :     // the drift limit
     333      4804196 :     fn maybe_publish_size(&mut self, size: u64) {
     334      4804196 :         let publish = match size.cmp(&self.dirty_bytes) {
     335            0 :             Ordering::Equal => false,
     336      4804196 :             Ordering::Greater => size - self.dirty_bytes > Self::MAX_SIZE_DRIFT,
     337            0 :             Ordering::Less => self.dirty_bytes - size > Self::MAX_SIZE_DRIFT,
     338              :         };
     339              : 
     340      4804196 :         if publish {
     341            8 :             self.publish_size(size);
     342      4804188 :         }
     343      4804196 :     }
     344              : }
     345              : 
     346              : impl Drop for GlobalResourceUnits {
     347         1140 :     fn drop(&mut self) {
     348         1140 :         GLOBAL_RESOURCES
     349         1140 :             .dirty_layers
     350         1140 :             .fetch_sub(1, AtomicOrdering::Relaxed);
     351         1140 : 
     352         1140 :         // Subtract our contribution to the global total dirty bytes
     353         1140 :         self.publish_size(0);
     354         1140 :     }
     355              : }
     356              : 
     357              : pub(crate) static GLOBAL_RESOURCES: GlobalResources = GlobalResources {
     358              :     max_dirty_bytes: AtomicU64::new(0),
     359              :     dirty_bytes: AtomicU64::new(0),
     360              :     dirty_layers: AtomicUsize::new(0),
     361              : };
     362              : 
     363              : impl InMemoryLayer {
     364       606250 :     pub(crate) fn file_id(&self) -> InMemoryLayerFileId {
     365       606250 :         self.file_id
     366       606250 :     }
     367              : 
     368         1138 :     pub(crate) fn get_timeline_id(&self) -> TimelineId {
     369         1138 :         self.timeline_id
     370         1138 :     }
     371              : 
     372            0 :     pub(crate) fn info(&self) -> InMemoryLayerInfo {
     373            0 :         let lsn_start = self.start_lsn;
     374              : 
     375            0 :         if let Some(&lsn_end) = self.end_lsn.get() {
     376            0 :             InMemoryLayerInfo::Frozen { lsn_start, lsn_end }
     377              :         } else {
     378            0 :             InMemoryLayerInfo::Open { lsn_start }
     379              :         }
     380            0 :     }
     381              : 
     382            0 :     pub(crate) fn try_len(&self) -> Option<u64> {
     383            0 :         self.inner.try_read().map(|i| i.file.len()).ok()
     384            0 :     }
     385              : 
     386      4804196 :     pub(crate) fn assert_writable(&self) {
     387      4804196 :         assert!(self.end_lsn.get().is_none());
     388      4804196 :     }
     389              : 
     390      1521172 :     pub(crate) fn end_lsn_or_max(&self) -> Lsn {
     391      1521172 :         self.end_lsn.get().copied().unwrap_or(Lsn::MAX)
     392      1521172 :     }
     393              : 
     394      1520034 :     pub(crate) fn get_lsn_range(&self) -> Range<Lsn> {
     395      1520034 :         self.start_lsn..self.end_lsn_or_max()
     396      1520034 :     }
     397              : 
     398              :     /// debugging function to print out the contents of the layer
     399              :     ///
     400              :     /// this is likely completly unused
     401            0 :     pub async fn dump(&self, _verbose: bool, _ctx: &RequestContext) -> Result<()> {
     402            0 :         let end_str = self.end_lsn_or_max();
     403            0 : 
     404            0 :         println!(
     405            0 :             "----- in-memory layer for tli {} LSNs {}-{} ----",
     406            0 :             self.timeline_id, self.start_lsn, end_str,
     407            0 :         );
     408            0 : 
     409            0 :         Ok(())
     410            0 :     }
     411              : 
     412              :     // Look up the keys in the provided keyspace and update
     413              :     // the reconstruct state with whatever is found.
     414              :     //
     415              :     // If the key is cached, go no further than the cached Lsn.
     416       606250 :     pub(crate) async fn get_values_reconstruct_data(
     417       606250 :         &self,
     418       606250 :         keyspace: KeySpace,
     419       606250 :         end_lsn: Lsn,
     420       606250 :         reconstruct_state: &mut ValuesReconstructState,
     421       606250 :         ctx: &RequestContext,
     422       606250 :     ) -> Result<(), GetVectoredError> {
     423       606250 :         let ctx = RequestContextBuilder::extend(ctx)
     424       606250 :             .page_content_kind(PageContentKind::InMemoryLayer)
     425       606250 :             .build();
     426              : 
     427       606250 :         let inner = self.inner.read().await;
     428              : 
     429              :         struct ValueRead {
     430              :             entry_lsn: Lsn,
     431              :             read: vectored_dio_read::LogicalRead<Vec<u8>>,
     432              :         }
     433       606250 :         let mut reads: HashMap<Key, Vec<ValueRead>> = HashMap::new();
     434              : 
     435       606250 :         for range in keyspace.ranges.iter() {
     436       606250 :             for (key, vec_map) in inner
     437       606250 :                 .index
     438       606250 :                 .range(range.start.to_compact()..range.end.to_compact())
     439              :             {
     440       498919 :                 let key = Key::from_compact(*key);
     441       498919 :                 let lsn_range = match reconstruct_state.get_cached_lsn(&key) {
     442            0 :                     Some(cached_lsn) => (cached_lsn + 1)..end_lsn,
     443       498919 :                     None => self.start_lsn..end_lsn,
     444              :                 };
     445              : 
     446       498919 :                 let slice = vec_map.slice_range(lsn_range);
     447              : 
     448       498919 :                 for (entry_lsn, index_entry) in slice.iter().rev() {
     449              :                     let IndexEntryUnpacked {
     450       498915 :                         pos,
     451       498915 :                         len,
     452       498915 :                         will_init,
     453       498915 :                     } = index_entry.unpack();
     454       498915 :                     reads.entry(key).or_default().push(ValueRead {
     455       498915 :                         entry_lsn: *entry_lsn,
     456       498915 :                         read: vectored_dio_read::LogicalRead::new(
     457       498915 :                             pos,
     458       498915 :                             Vec::with_capacity(len as usize),
     459       498915 :                         ),
     460       498915 :                     });
     461       498915 :                     if will_init {
     462       498915 :                         break;
     463            0 :                     }
     464              :                 }
     465              :             }
     466              :         }
     467              : 
     468              :         // Execute the reads.
     469              : 
     470       606250 :         let f = vectored_dio_read::execute(
     471       606250 :             &inner.file,
     472       606250 :             reads
     473       606250 :                 .iter()
     474       606250 :                 .flat_map(|(_, value_reads)| value_reads.iter().map(|v| &v.read)),
     475       606250 :             &ctx,
     476       606250 :         );
     477       606250 :         send_future::SendFuture::send(f) // https://github.com/rust-lang/rust/issues/96865
     478        85692 :             .await;
     479              : 
     480              :         // Process results into the reconstruct state
     481      1105165 :         'next_key: for (key, value_reads) in reads {
     482       498915 :             for ValueRead { entry_lsn, read } in value_reads {
     483       498915 :                 match read.into_result().expect("we run execute() above") {
     484            0 :                     Err(e) => {
     485            0 :                         reconstruct_state.on_key_error(key, PageReconstructError::from(anyhow!(e)));
     486            0 :                         continue 'next_key;
     487              :                     }
     488       498915 :                     Ok(value_buf) => {
     489       498915 :                         let value = Value::des(&value_buf);
     490       498915 :                         if let Err(e) = value {
     491            0 :                             reconstruct_state
     492            0 :                                 .on_key_error(key, PageReconstructError::from(anyhow!(e)));
     493            0 :                             continue 'next_key;
     494       498915 :                         }
     495       498915 : 
     496       498915 :                         let key_situation =
     497       498915 :                             reconstruct_state.update_key(&key, entry_lsn, value.unwrap());
     498       498915 :                         if key_situation == ValueReconstructSituation::Complete {
     499              :                             // TODO: metric to see if we fetched more values than necessary
     500       498915 :                             continue 'next_key;
     501            0 :                         }
     502              : 
     503              :                         // process the next value in the next iteration of the loop
     504              :                     }
     505              :                 }
     506              :             }
     507              :         }
     508              : 
     509       606250 :         reconstruct_state.on_lsn_advanced(&keyspace, self.start_lsn);
     510       606250 : 
     511       606250 :         Ok(())
     512       606250 :     }
     513              : }
     514              : 
     515              : /// Offset of a particular Value within a serialized batch.
     516              : struct SerializedBatchOffset {
     517              :     key: CompactKey,
     518              :     lsn: Lsn,
     519              :     // TODO: separate type when we start serde-serializing this value, to avoid coupling
     520              :     // in-memory representation to serialization format.
     521              :     index_entry: IndexEntry,
     522              : }
     523              : 
     524              : pub struct SerializedBatch {
     525              :     /// Blobs serialized in EphemeralFile's native format, ready for passing to [`EphemeralFile::write_raw`].
     526              :     pub(crate) raw: Vec<u8>,
     527              : 
     528              :     /// Index of values in [`Self::raw`], using offsets relative to the start of the buffer.
     529              :     offsets: Vec<SerializedBatchOffset>,
     530              : 
     531              :     /// The highest LSN of any value in the batch
     532              :     pub(crate) max_lsn: Lsn,
     533              : }
     534              : 
     535              : impl SerializedBatch {
     536      4804196 :     pub fn from_values(batch: Vec<(CompactKey, Lsn, usize, Value)>) -> anyhow::Result<Self> {
     537      4804196 :         // Pre-allocate a big flat buffer to write into. This should be large but not huge: it is soft-limited in practice by
     538      4804196 :         // [`crate::pgdatadir_mapping::DatadirModification::MAX_PENDING_BYTES`]
     539      5090446 :         let buffer_size = batch.iter().map(|i| i.2).sum::<usize>();
     540      4804196 :         let mut cursor = std::io::Cursor::new(Vec::<u8>::with_capacity(buffer_size));
     541      4804196 : 
     542      4804196 :         let mut offsets: Vec<SerializedBatchOffset> = Vec::with_capacity(batch.len());
     543      4804196 :         let mut max_lsn: Lsn = Lsn(0);
     544      9894642 :         for (key, lsn, val_ser_size, val) in batch {
     545      5090446 :             let relative_off = cursor.position();
     546      5090446 : 
     547      5090446 :             val.ser_into(&mut cursor)
     548      5090446 :                 .expect("Writing into in-memory buffer is infallible");
     549      5090446 : 
     550      5090446 :             offsets.push(SerializedBatchOffset {
     551      5090446 :                 key,
     552      5090446 :                 lsn,
     553      5090446 :                 index_entry: IndexEntry::new(IndexEntryNewArgs {
     554      5090446 :                     base_offset: 0,
     555      5090446 :                     batch_offset: relative_off,
     556      5090446 :                     len: val_ser_size,
     557      5090446 :                     will_init: val.will_init(),
     558      5090446 :                 })
     559      5090446 :                 .context("higher-level code ensures that values are within supported ranges")?,
     560              :             });
     561      5090446 :             max_lsn = std::cmp::max(max_lsn, lsn);
     562              :         }
     563              : 
     564      4804196 :         let buffer = cursor.into_inner();
     565      4804196 : 
     566      4804196 :         // Assert that we didn't do any extra allocations while building buffer.
     567      4804196 :         debug_assert!(buffer.len() <= buffer_size);
     568              : 
     569      4804196 :         Ok(Self {
     570      4804196 :             raw: buffer,
     571      4804196 :             offsets,
     572      4804196 :             max_lsn,
     573      4804196 :         })
     574      4804196 :     }
     575              : }
     576              : 
     577         2276 : fn inmem_layer_display(mut f: impl Write, start_lsn: Lsn, end_lsn: Lsn) -> std::fmt::Result {
     578         2276 :     write!(f, "inmem-{:016X}-{:016X}", start_lsn.0, end_lsn.0)
     579         2276 : }
     580              : 
     581         1138 : fn inmem_layer_log_display(
     582         1138 :     mut f: impl Write,
     583         1138 :     timeline: TimelineId,
     584         1138 :     start_lsn: Lsn,
     585         1138 :     end_lsn: Lsn,
     586         1138 : ) -> std::fmt::Result {
     587         1138 :     write!(f, "timeline {} in-memory ", timeline)?;
     588         1138 :     inmem_layer_display(f, start_lsn, end_lsn)
     589         1138 : }
     590              : 
     591              : impl std::fmt::Display for InMemoryLayer {
     592         1138 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     593         1138 :         let end_lsn = self.end_lsn_or_max();
     594         1138 :         inmem_layer_display(f, self.start_lsn, end_lsn)
     595         1138 :     }
     596              : }
     597              : 
     598              : impl InMemoryLayer {
     599              :     /// Get layer size.
     600         1262 :     pub async fn size(&self) -> Result<u64> {
     601         1262 :         let inner = self.inner.read().await;
     602         1262 :         Ok(inner.file.len())
     603         1262 :     }
     604              : 
     605              :     /// Create a new, empty, in-memory layer
     606         1262 :     pub async fn create(
     607         1262 :         conf: &'static PageServerConf,
     608         1262 :         timeline_id: TimelineId,
     609         1262 :         tenant_shard_id: TenantShardId,
     610         1262 :         start_lsn: Lsn,
     611         1262 :         gate_guard: utils::sync::gate::GateGuard,
     612         1262 :         ctx: &RequestContext,
     613         1262 :     ) -> Result<InMemoryLayer> {
     614         1262 :         trace!("initializing new empty InMemoryLayer for writing on timeline {timeline_id} at {start_lsn}");
     615              : 
     616         1262 :         let file =
     617         1262 :             EphemeralFile::create(conf, tenant_shard_id, timeline_id, gate_guard, ctx).await?;
     618         1262 :         let key = InMemoryLayerFileId(file.page_cache_file_id());
     619         1262 : 
     620         1262 :         Ok(InMemoryLayer {
     621         1262 :             file_id: key,
     622         1262 :             frozen_local_path_str: OnceLock::new(),
     623         1262 :             conf,
     624         1262 :             timeline_id,
     625         1262 :             tenant_shard_id,
     626         1262 :             start_lsn,
     627         1262 :             end_lsn: OnceLock::new(),
     628         1262 :             opened_at: Instant::now(),
     629         1262 :             inner: RwLock::new(InMemoryLayerInner {
     630         1262 :                 index: BTreeMap::new(),
     631         1262 :                 file,
     632         1262 :                 resource_units: GlobalResourceUnits::new(),
     633         1262 :             }),
     634         1262 :         })
     635         1262 :     }
     636              : 
     637              :     /// Write path.
     638              :     ///
     639              :     /// Errors are not retryable, the [`InMemoryLayer`] must be discarded, and not be read from.
     640              :     /// The reason why it's not retryable is that the [`EphemeralFile`] writes are not retryable.
     641              :     /// TODO: it can be made retryable if we aborted the process on EphemeralFile write errors.
     642      4804196 :     pub async fn put_batch(
     643      4804196 :         &self,
     644      4804196 :         serialized_batch: SerializedBatch,
     645      4804196 :         ctx: &RequestContext,
     646      4804196 :     ) -> anyhow::Result<()> {
     647      4804196 :         let mut inner = self.inner.write().await;
     648      4804196 :         self.assert_writable();
     649      4804196 : 
     650      4804196 :         let base_offset = inner.file.len();
     651      4804196 : 
     652      4804196 :         let SerializedBatch {
     653      4804196 :             raw,
     654      4804196 :             mut offsets,
     655      4804196 :             max_lsn: _,
     656      4804196 :         } = serialized_batch;
     657              : 
     658              :         // Add the base_offset to the batch's index entries which are relative to the batch start.
     659      9894642 :         for offset in &mut offsets {
     660              :             let IndexEntryUnpacked {
     661      5090446 :                 will_init,
     662      5090446 :                 len,
     663      5090446 :                 pos,
     664      5090446 :             } = offset.index_entry.unpack();
     665      5090446 :             offset.index_entry = IndexEntry::new(IndexEntryNewArgs {
     666      5090446 :                 base_offset,
     667      5090446 :                 batch_offset: pos,
     668      5090446 :                 len: len.into_usize(),
     669      5090446 :                 will_init,
     670      5090446 :             })?;
     671              :         }
     672              : 
     673              :         // Write the batch to the file
     674      4804196 :         inner.file.write_raw(&raw, ctx).await?;
     675      4804196 :         let new_size = inner.file.len();
     676      4804196 :         let expected_new_len = base_offset
     677      4804196 :             .checked_add(raw.len().into_u64())
     678      4804196 :             // write_raw would error if we were to overflow u64.
     679      4804196 :             // also IndexEntry and higher levels in
     680      4804196 :             //the code don't allow the file to grow that large
     681      4804196 :             .unwrap();
     682      4804196 :         assert_eq!(new_size, expected_new_len);
     683              : 
     684              :         // Update the index with the new entries
     685              :         for SerializedBatchOffset {
     686      5090446 :             key,
     687      5090446 :             lsn,
     688      5090446 :             index_entry,
     689      9894642 :         } in offsets
     690              :         {
     691      5090446 :             let vec_map = inner.index.entry(key).or_default();
     692      5090446 :             let old = vec_map.append_or_update_last(lsn, index_entry).unwrap().0;
     693      5090446 :             if old.is_some() {
     694              :                 // This should not break anything, but is unexpected: ingestion code aims to filter out
     695              :                 // multiple writes to the same key at the same LSN.  This happens in cases where our
     696              :                 // ingenstion code generates some write like an empty page, and we see a write from postgres
     697              :                 // to the same key in the same wal record.  If one such write makes it through, we
     698              :                 // index the most recent write, implicitly ignoring the earlier write.  We log a warning
     699              :                 // because this case is unexpected, and we would like tests to fail if this happens.
     700            0 :                 warn!("Key {} at {} written twice at same LSN", key, lsn);
     701      5090446 :             }
     702              :         }
     703              : 
     704      4804196 :         inner.resource_units.maybe_publish_size(new_size);
     705      4804196 : 
     706      4804196 :         Ok(())
     707      4804196 :     }
     708              : 
     709      4803010 :     pub(crate) fn get_opened_at(&self) -> Instant {
     710      4803010 :         self.opened_at
     711      4803010 :     }
     712              : 
     713            0 :     pub(crate) async fn tick(&self) -> Option<u64> {
     714            0 :         let mut inner = self.inner.write().await;
     715            0 :         let size = inner.file.len();
     716            0 :         inner.resource_units.publish_size(size)
     717            0 :     }
     718              : 
     719            2 :     pub(crate) async fn put_tombstones(&self, _key_ranges: &[(Range<Key>, Lsn)]) -> Result<()> {
     720            2 :         // TODO: Currently, we just leak the storage for any deleted keys
     721            2 :         Ok(())
     722            2 :     }
     723              : 
     724              :     /// Records the end_lsn for non-dropped layers.
     725              :     /// `end_lsn` is exclusive
     726         1138 :     pub async fn freeze(&self, end_lsn: Lsn) {
     727         1138 :         assert!(
     728         1138 :             self.start_lsn < end_lsn,
     729            0 :             "{} >= {}",
     730              :             self.start_lsn,
     731              :             end_lsn
     732              :         );
     733         1138 :         self.end_lsn.set(end_lsn).expect("end_lsn set only once");
     734         1138 : 
     735         1138 :         self.frozen_local_path_str
     736         1138 :             .set({
     737         1138 :                 let mut buf = String::new();
     738         1138 :                 inmem_layer_log_display(&mut buf, self.get_timeline_id(), self.start_lsn, end_lsn)
     739         1138 :                     .unwrap();
     740         1138 :                 buf.into()
     741         1138 :             })
     742         1138 :             .expect("frozen_local_path_str set only once");
     743              : 
     744              :         #[cfg(debug_assertions)]
     745              :         {
     746         1138 :             let inner = self.inner.write().await;
     747      4255886 :             for vec_map in inner.index.values() {
     748      4386706 :                 for (lsn, _) in vec_map.as_slice() {
     749      4386706 :                     assert!(*lsn < end_lsn);
     750              :                 }
     751              :             }
     752              :         }
     753         1138 :     }
     754              : 
     755              :     /// Write this frozen in-memory layer to disk. If `key_range` is set, the delta
     756              :     /// layer will only contain the key range the user specifies, and may return `None`
     757              :     /// if there are no matching keys.
     758              :     ///
     759              :     /// Returns a new delta layer with all the same data as this in-memory layer
     760          968 :     pub async fn write_to_disk(
     761          968 :         &self,
     762          968 :         ctx: &RequestContext,
     763          968 :         key_range: Option<Range<Key>>,
     764          968 :         l0_flush_global_state: &l0_flush::Inner,
     765          968 :     ) -> Result<Option<(PersistentLayerDesc, Utf8PathBuf)>> {
     766              :         // Grab the lock in read-mode. We hold it over the I/O, but because this
     767              :         // layer is not writeable anymore, no one should be trying to acquire the
     768              :         // write lock on it, so we shouldn't block anyone. There's one exception
     769              :         // though: another thread might have grabbed a reference to this layer
     770              :         // in `get_layer_for_write' just before the checkpointer called
     771              :         // `freeze`, and then `write_to_disk` on it. When the thread gets the
     772              :         // lock, it will see that it's not writeable anymore and retry, but it
     773              :         // would have to wait until we release it. That race condition is very
     774              :         // rare though, so we just accept the potential latency hit for now.
     775          968 :         let inner = self.inner.read().await;
     776              : 
     777              :         use l0_flush::Inner;
     778          968 :         let _concurrency_permit = match l0_flush_global_state {
     779          968 :             Inner::Direct { semaphore, .. } => Some(semaphore.acquire().await),
     780              :         };
     781              : 
     782          968 :         let end_lsn = *self.end_lsn.get().unwrap();
     783              : 
     784          968 :         let key_count = if let Some(key_range) = key_range {
     785            0 :             let key_range = key_range.start.to_compact()..key_range.end.to_compact();
     786            0 : 
     787            0 :             inner
     788            0 :                 .index
     789            0 :                 .iter()
     790            0 :                 .filter(|(k, _)| key_range.contains(k))
     791            0 :                 .count()
     792              :         } else {
     793          968 :             inner.index.len()
     794              :         };
     795          968 :         if key_count == 0 {
     796            0 :             return Ok(None);
     797          968 :         }
     798              : 
     799          968 :         let mut delta_layer_writer = DeltaLayerWriter::new(
     800          968 :             self.conf,
     801          968 :             self.timeline_id,
     802          968 :             self.tenant_shard_id,
     803          968 :             Key::MIN,
     804          968 :             self.start_lsn..end_lsn,
     805          968 :             ctx,
     806          968 :         )
     807          499 :         .await?;
     808              : 
     809          968 :         match l0_flush_global_state {
     810              :             l0_flush::Inner::Direct { .. } => {
     811          968 :                 let file_contents = inner.file.load_to_io_buf(ctx).await?;
     812          968 :                 let file_contents = file_contents.freeze();
     813              : 
     814      4254696 :                 for (key, vec_map) in inner.index.iter() {
     815              :                     // Write all page versions
     816      4385516 :                     for (lsn, entry) in vec_map
     817      4254696 :                         .as_slice()
     818      4254696 :                         .iter()
     819      4385516 :                         .map(|(lsn, entry)| (lsn, entry.unpack()))
     820              :                     {
     821              :                         let IndexEntryUnpacked {
     822      4385516 :                             pos,
     823      4385516 :                             len,
     824      4385516 :                             will_init,
     825      4385516 :                         } = entry;
     826      4385516 :                         let buf = file_contents.slice(pos as usize..(pos + len) as usize);
     827      4385516 :                         let (_buf, res) = delta_layer_writer
     828      4385516 :                             .put_value_bytes(
     829      4385516 :                                 Key::from_compact(*key),
     830      4385516 :                                 *lsn,
     831      4385516 :                                 buf.slice_len(),
     832      4385516 :                                 will_init,
     833      4385516 :                                 ctx,
     834      4385516 :                             )
     835         2730 :                             .await;
     836      4385516 :                         res?;
     837              :                     }
     838              :                 }
     839              :             }
     840              :         }
     841              : 
     842              :         // MAX is used here because we identify L0 layers by full key range
     843         6636 :         let (desc, path) = delta_layer_writer.finish(Key::MAX, ctx).await?;
     844              : 
     845              :         // Hold the permit until all the IO is done, including the fsync in `delta_layer_writer.finish()``.
     846              :         //
     847              :         // If we didn't and our caller drops this future, tokio-epoll-uring would extend the lifetime of
     848              :         // the `file_contents: Vec<u8>` until the IO is done, but not the permit's lifetime.
     849              :         // Thus, we'd have more concurrenct `Vec<u8>` in existence than the semaphore allows.
     850              :         //
     851              :         // We hold across the fsync so that on ext4 mounted with data=ordered, all the kernel page cache pages
     852              :         // we dirtied when writing to the filesystem have been flushed and marked !dirty.
     853          968 :         drop(_concurrency_permit);
     854          968 : 
     855          968 :         Ok(Some((desc, path)))
     856          968 :     }
     857              : }
     858              : 
     859              : #[cfg(test)]
     860              : mod tests {
     861              :     use super::*;
     862              : 
     863              :     #[test]
     864            2 :     fn test_index_entry() {
     865              :         const MAX_SUPPORTED_POS: usize = IndexEntry::MAX_SUPPORTED_POS;
     866              :         use IndexEntryNewArgs as Args;
     867              :         use IndexEntryUnpacked as Unpacked;
     868              : 
     869           40 :         let roundtrip = |args, expect: Unpacked| {
     870           40 :             let res = IndexEntry::new(args).expect("this tests expects no errors");
     871           40 :             let IndexEntryUnpacked {
     872           40 :                 will_init,
     873           40 :                 len,
     874           40 :                 pos,
     875           40 :             } = res.unpack();
     876           40 :             assert_eq!(will_init, expect.will_init);
     877           40 :             assert_eq!(len, expect.len);
     878           40 :             assert_eq!(pos, expect.pos);
     879           40 :         };
     880              : 
     881              :         // basic roundtrip
     882            6 :         for pos in [0, MAX_SUPPORTED_POS] {
     883           12 :             for len in [0, MAX_SUPPORTED_BLOB_LEN] {
     884           24 :                 for will_init in [true, false] {
     885           16 :                     let expect = Unpacked {
     886           16 :                         will_init,
     887           16 :                         len: len.into_u64(),
     888           16 :                         pos: pos.into_u64(),
     889           16 :                     };
     890           16 :                     roundtrip(
     891           16 :                         Args {
     892           16 :                             will_init,
     893           16 :                             base_offset: pos.into_u64(),
     894           16 :                             batch_offset: 0,
     895           16 :                             len,
     896           16 :                         },
     897           16 :                         expect,
     898           16 :                     );
     899           16 :                     roundtrip(
     900           16 :                         Args {
     901           16 :                             will_init,
     902           16 :                             base_offset: 0,
     903           16 :                             batch_offset: pos.into_u64(),
     904           16 :                             len,
     905           16 :                         },
     906           16 :                         expect,
     907           16 :                     );
     908           16 :                 }
     909              :             }
     910              :         }
     911              : 
     912              :         // too-large len
     913            2 :         let too_large = Args {
     914            2 :             will_init: false,
     915            2 :             len: MAX_SUPPORTED_BLOB_LEN + 1,
     916            2 :             base_offset: 0,
     917            2 :             batch_offset: 0,
     918            2 :         };
     919            2 :         assert!(IndexEntry::new(too_large).is_err());
     920              : 
     921              :         // too-large pos
     922              :         {
     923            2 :             let too_large = Args {
     924            2 :                 will_init: false,
     925            2 :                 len: 0,
     926            2 :                 base_offset: MAX_SUPPORTED_POS.into_u64() + 1,
     927            2 :                 batch_offset: 0,
     928            2 :             };
     929            2 :             assert!(IndexEntry::new(too_large).is_err());
     930            2 :             let too_large = Args {
     931            2 :                 will_init: false,
     932            2 :                 len: 0,
     933            2 :                 base_offset: 0,
     934            2 :                 batch_offset: MAX_SUPPORTED_POS.into_u64() + 1,
     935            2 :             };
     936            2 :             assert!(IndexEntry::new(too_large).is_err());
     937              :         }
     938              : 
     939              :         // too large (base_offset + batch_offset)
     940              :         {
     941            2 :             let too_large = Args {
     942            2 :                 will_init: false,
     943            2 :                 len: 0,
     944            2 :                 base_offset: MAX_SUPPORTED_POS.into_u64(),
     945            2 :                 batch_offset: 1,
     946            2 :             };
     947            2 :             assert!(IndexEntry::new(too_large).is_err());
     948            2 :             let too_large = Args {
     949            2 :                 will_init: false,
     950            2 :                 len: 0,
     951            2 :                 base_offset: MAX_SUPPORTED_POS.into_u64() - 1,
     952            2 :                 batch_offset: MAX_SUPPORTED_POS.into_u64() - 1,
     953            2 :             };
     954            2 :             assert!(IndexEntry::new(too_large).is_err());
     955              :         }
     956              : 
     957              :         // valid special cases
     958              :         // - area past the max supported pos that is accessible by len
     959            6 :         for len in [1, MAX_SUPPORTED_BLOB_LEN] {
     960            4 :             roundtrip(
     961            4 :                 Args {
     962            4 :                     will_init: false,
     963            4 :                     len,
     964            4 :                     base_offset: MAX_SUPPORTED_POS.into_u64(),
     965            4 :                     batch_offset: 0,
     966            4 :                 },
     967            4 :                 Unpacked {
     968            4 :                     will_init: false,
     969            4 :                     len: len as u64,
     970            4 :                     pos: MAX_SUPPORTED_POS.into_u64(),
     971            4 :                 },
     972            4 :             );
     973            4 :             roundtrip(
     974            4 :                 Args {
     975            4 :                     will_init: false,
     976            4 :                     len,
     977            4 :                     base_offset: 0,
     978            4 :                     batch_offset: MAX_SUPPORTED_POS.into_u64(),
     979            4 :                 },
     980            4 :                 Unpacked {
     981            4 :                     will_init: false,
     982            4 :                     len: len as u64,
     983            4 :                     pos: MAX_SUPPORTED_POS.into_u64(),
     984            4 :                 },
     985            4 :             );
     986            4 :         }
     987            2 :     }
     988              : }
        

Generated by: LCOV version 2.1-beta