LCOV - code coverage report
Current view: top level - pageserver/src/tenant/storage_layer - inmemory_layer.rs (source / functions) Coverage Total Hit
Test: 49aa928ec5b4b510172d8b5c6d154da28e70a46c.info Lines: 87.7 % 520 456
Test Date: 2024-11-13 18:23:39 Functions: 77.1 % 48 37

            Line data    Source code
       1              : //! An in-memory layer stores recently received key-value pairs.
       2              : //!
       3              : //! The "in-memory" part of the name is a bit misleading: the actual page versions are
       4              : //! held in an ephemeral file, not in memory. The metadata for each page version, i.e.
       5              : //! its position in the file, is kept in memory, though.
       6              : //!
       7              : use crate::assert_u64_eq_usize::{u64_to_usize, U64IsUsize, UsizeIsU64};
       8              : use crate::config::PageServerConf;
       9              : use crate::context::{PageContentKind, RequestContext, RequestContextBuilder};
      10              : use crate::tenant::ephemeral_file::EphemeralFile;
      11              : use crate::tenant::timeline::GetVectoredError;
      12              : use crate::tenant::PageReconstructError;
      13              : use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt;
      14              : use crate::{l0_flush, page_cache};
      15              : use anyhow::{anyhow, Result};
      16              : use camino::Utf8PathBuf;
      17              : use pageserver_api::key::CompactKey;
      18              : use pageserver_api::key::Key;
      19              : use pageserver_api::keyspace::KeySpace;
      20              : use pageserver_api::models::InMemoryLayerInfo;
      21              : use pageserver_api::shard::TenantShardId;
      22              : use pageserver_api::value::Value;
      23              : use std::collections::{BTreeMap, HashMap};
      24              : use std::sync::{Arc, OnceLock};
      25              : use std::time::Instant;
      26              : use tracing::*;
      27              : use utils::{bin_ser::BeSer, id::TimelineId, lsn::Lsn, vec_map::VecMap};
      28              : use wal_decoder::serialized_batch::{SerializedValueBatch, SerializedValueMeta, ValueMeta};
      29              : // avoid binding to Write (conflicts with std::io::Write)
      30              : // while being able to use std::fmt::Write's methods
      31              : use crate::metrics::TIMELINE_EPHEMERAL_BYTES;
      32              : use std::cmp::Ordering;
      33              : use std::fmt::Write;
      34              : use std::ops::Range;
      35              : use std::sync::atomic::Ordering as AtomicOrdering;
      36              : use std::sync::atomic::{AtomicU64, AtomicUsize};
      37              : use tokio::sync::RwLock;
      38              : 
      39              : use super::{
      40              :     DeltaLayerWriter, PersistentLayerDesc, ValueReconstructSituation, ValuesReconstructState,
      41              : };
      42              : 
      43              : pub(crate) mod vectored_dio_read;
      44              : 
      45              : #[derive(Debug, PartialEq, Eq, Clone, Copy, Hash)]
      46              : pub(crate) struct InMemoryLayerFileId(page_cache::FileId);
      47              : 
      48              : pub struct InMemoryLayer {
      49              :     conf: &'static PageServerConf,
      50              :     tenant_shard_id: TenantShardId,
      51              :     timeline_id: TimelineId,
      52              :     file_id: InMemoryLayerFileId,
      53              : 
      54              :     /// This layer contains all the changes from 'start_lsn'. The
      55              :     /// start is inclusive.
      56              :     start_lsn: Lsn,
      57              : 
      58              :     /// Frozen layers have an exclusive end LSN.
      59              :     /// Writes are only allowed when this is `None`.
      60              :     pub(crate) end_lsn: OnceLock<Lsn>,
      61              : 
      62              :     /// Used for traversal path. Cached representation of the in-memory layer after frozen.
      63              :     frozen_local_path_str: OnceLock<Arc<str>>,
      64              : 
      65              :     opened_at: Instant,
      66              : 
      67              :     /// The above fields never change, except for `end_lsn`, which is only set once.
      68              :     /// All other changing parts are in `inner`, and protected by a mutex.
      69              :     inner: RwLock<InMemoryLayerInner>,
      70              : 
      71              :     estimated_in_mem_size: AtomicU64,
      72              : }
      73              : 
      74              : impl std::fmt::Debug for InMemoryLayer {
      75            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
      76            0 :         f.debug_struct("InMemoryLayer")
      77            0 :             .field("start_lsn", &self.start_lsn)
      78            0 :             .field("end_lsn", &self.end_lsn)
      79            0 :             .field("inner", &self.inner)
      80            0 :             .finish()
      81            0 :     }
      82              : }
      83              : 
      84              : pub struct InMemoryLayerInner {
      85              :     /// All versions of all pages in the layer are kept here. Indexed
      86              :     /// by block number and LSN. The [`IndexEntry`] is an offset into the
      87              :     /// ephemeral file where the page version is stored.
      88              :     index: BTreeMap<CompactKey, VecMap<Lsn, IndexEntry>>,
      89              : 
      90              :     /// The values are stored in a serialized format in this file.
      91              :     /// Each serialized Value is preceded by a 'u32' length field.
      92              :     /// PerSeg::page_versions map stores offsets into this file.
      93              :     file: EphemeralFile,
      94              : 
      95              :     resource_units: GlobalResourceUnits,
      96              : }
      97              : 
      98              : /// Support the same max blob length as blob_io, because ultimately
      99              : /// all the InMemoryLayer contents end up being written into a delta layer,
     100              : /// using the [`crate::tenant::blob_io`].
     101              : const MAX_SUPPORTED_BLOB_LEN: usize = crate::tenant::blob_io::MAX_SUPPORTED_BLOB_LEN;
     102              : const MAX_SUPPORTED_BLOB_LEN_BITS: usize = {
     103              :     let trailing_ones = MAX_SUPPORTED_BLOB_LEN.trailing_ones() as usize;
     104              :     let leading_zeroes = MAX_SUPPORTED_BLOB_LEN.leading_zeros() as usize;
     105              :     assert!(trailing_ones + leading_zeroes == std::mem::size_of::<usize>() * 8);
     106              :     trailing_ones
     107              : };
     108              : 
     109              : /// See [`InMemoryLayerInner::index`].
     110              : ///
     111              : /// For memory efficiency, the data is packed into a u64.
     112              : ///
     113              : /// Layout:
     114              : /// - 1 bit: `will_init`
     115              : /// - [`MAX_SUPPORTED_BLOB_LEN_BITS`]: `len`
     116              : /// - [`MAX_SUPPORTED_POS_BITS`]: `pos`
     117              : #[derive(Debug, Clone, Copy, PartialEq, Eq)]
     118              : pub struct IndexEntry(u64);
     119              : 
     120              : impl IndexEntry {
     121              :     /// See [`Self::MAX_SUPPORTED_POS`].
     122              :     const MAX_SUPPORTED_POS_BITS: usize = {
     123              :         let remainder = 64 - 1 - MAX_SUPPORTED_BLOB_LEN_BITS;
     124              :         if remainder < 32 {
     125              :             panic!("pos can be u32 as per type system, support that");
     126              :         }
     127              :         remainder
     128              :     };
     129              :     /// The maximum supported blob offset that can be represented by [`Self`].
     130              :     /// See also [`Self::validate_checkpoint_distance`].
     131              :     const MAX_SUPPORTED_POS: usize = (1 << Self::MAX_SUPPORTED_POS_BITS) - 1;
     132              : 
     133              :     // Layout
     134              :     const WILL_INIT_RANGE: Range<usize> = 0..1;
     135              :     const LEN_RANGE: Range<usize> =
     136              :         Self::WILL_INIT_RANGE.end..Self::WILL_INIT_RANGE.end + MAX_SUPPORTED_BLOB_LEN_BITS;
     137              :     const POS_RANGE: Range<usize> =
     138              :         Self::LEN_RANGE.end..Self::LEN_RANGE.end + Self::MAX_SUPPORTED_POS_BITS;
     139              :     const _ASSERT: () = {
     140              :         if Self::POS_RANGE.end != 64 {
     141              :             panic!("we don't want undefined bits for our own sanity")
     142              :         }
     143              :     };
     144              : 
     145              :     /// Fails if and only if the offset or length encoded in `arg` is too large to be represented by [`Self`].
     146              :     ///
     147              :     /// The only reason why that can happen in the system is if the [`InMemoryLayer`] grows too long.
     148              :     /// The [`InMemoryLayer`] size is determined by the checkpoint distance, enforced by [`crate::tenant::Timeline::should_roll`].
     149              :     ///
     150              :     /// Thus, to avoid failure of this function, whenever we start up and/or change checkpoint distance,
     151              :     /// call [`Self::validate_checkpoint_distance`] with the new checkpoint distance value.
     152              :     ///
     153              :     /// TODO: this check should happen ideally at config parsing time (and in the request handler when a change to checkpoint distance is requested)
     154              :     /// When cleaning this up, also look into the s3 max file size check that is performed in delta layer writer.
     155              :     #[inline(always)]
     156      5090524 :     fn new(arg: IndexEntryNewArgs) -> anyhow::Result<Self> {
     157      5090524 :         let IndexEntryNewArgs {
     158      5090524 :             base_offset,
     159      5090524 :             batch_offset,
     160      5090524 :             len,
     161      5090524 :             will_init,
     162      5090524 :         } = arg;
     163              : 
     164      5090524 :         let pos = base_offset
     165      5090524 :             .checked_add(batch_offset)
     166      5090524 :             .ok_or_else(|| anyhow::anyhow!("base_offset + batch_offset overflows u64: base_offset={base_offset} batch_offset={batch_offset}"))?;
     167              : 
     168      5090524 :         if pos.into_usize() > Self::MAX_SUPPORTED_POS {
     169            8 :             anyhow::bail!(
     170            8 :                 "base_offset+batch_offset exceeds the maximum supported value: base_offset={base_offset} batch_offset={batch_offset} (+)={pos} max={max}",
     171            8 :                 max = Self::MAX_SUPPORTED_POS
     172            8 :             );
     173      5090516 :         }
     174      5090516 : 
     175      5090516 :         if len > MAX_SUPPORTED_BLOB_LEN {
     176            2 :             anyhow::bail!(
     177            2 :                 "len exceeds the maximum supported length: len={len} max={MAX_SUPPORTED_BLOB_LEN}",
     178            2 :             );
     179      5090514 :         }
     180      5090514 : 
     181      5090514 :         let mut data: u64 = 0;
     182              :         use bit_field::BitField;
     183      5090514 :         data.set_bits(Self::WILL_INIT_RANGE, if will_init { 1 } else { 0 });
     184      5090514 :         data.set_bits(Self::LEN_RANGE, len.into_u64());
     185      5090514 :         data.set_bits(Self::POS_RANGE, pos);
     186      5090514 : 
     187      5090514 :         Ok(Self(data))
     188      5090524 :     }
     189              : 
     190              :     #[inline(always)]
     191      4884376 :     fn unpack(&self) -> IndexEntryUnpacked {
     192              :         use bit_field::BitField;
     193      4884376 :         IndexEntryUnpacked {
     194      4884376 :             will_init: self.0.get_bits(Self::WILL_INIT_RANGE) != 0,
     195      4884376 :             len: self.0.get_bits(Self::LEN_RANGE),
     196      4884376 :             pos: self.0.get_bits(Self::POS_RANGE),
     197      4884376 :         }
     198      4884376 :     }
     199              : 
     200              :     /// See [`Self::new`].
     201          208 :     pub(crate) const fn validate_checkpoint_distance(
     202          208 :         checkpoint_distance: u64,
     203          208 :     ) -> Result<(), &'static str> {
     204          208 :         if checkpoint_distance > Self::MAX_SUPPORTED_POS as u64 {
     205            0 :             return Err("exceeds the maximum supported value");
     206          208 :         }
     207          208 :         let res = u64_to_usize(checkpoint_distance).checked_add(MAX_SUPPORTED_BLOB_LEN);
     208          208 :         if res.is_none() {
     209            0 :             return Err(
     210            0 :                 "checkpoint distance + max supported blob len overflows in-memory addition",
     211            0 :             );
     212          208 :         }
     213          208 : 
     214          208 :         // NB: it is ok for the result of the addition to be larger than MAX_SUPPORTED_POS
     215          208 : 
     216          208 :         Ok(())
     217          208 :     }
     218              : 
     219              :     const _ASSERT_DEFAULT_CHECKPOINT_DISTANCE_IS_VALID: () = {
     220              :         let res = Self::validate_checkpoint_distance(
     221              :             pageserver_api::config::tenant_conf_defaults::DEFAULT_CHECKPOINT_DISTANCE,
     222              :         );
     223              :         if res.is_err() {
     224              :             panic!("default checkpoint distance is valid")
     225              :         }
     226              :     };
     227              : }
     228              : 
     229              : /// Args to [`IndexEntry::new`].
     230              : #[derive(Clone, Copy)]
     231              : struct IndexEntryNewArgs {
     232              :     base_offset: u64,
     233              :     batch_offset: u64,
     234              :     len: usize,
     235              :     will_init: bool,
     236              : }
     237              : 
     238              : /// Unpacked representation of the bitfielded [`IndexEntry`].
     239              : #[derive(Clone, Copy, PartialEq, Eq, Debug)]
     240              : struct IndexEntryUnpacked {
     241              :     will_init: bool,
     242              :     len: u64,
     243              :     pos: u64,
     244              : }
     245              : 
     246              : impl std::fmt::Debug for InMemoryLayerInner {
     247            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     248            0 :         f.debug_struct("InMemoryLayerInner").finish()
     249            0 :     }
     250              : }
     251              : 
     252              : /// State shared by all in-memory (ephemeral) layers.  Updated infrequently during background ticks in Timeline,
     253              : /// to minimize contention.
     254              : ///
     255              : /// This global state is used to implement behaviors that require a global view of the system, e.g.
     256              : /// rolling layers proactively to limit the total amount of dirty data.
     257              : pub(crate) struct GlobalResources {
     258              :     // Limit on how high dirty_bytes may grow before we start freezing layers to reduce it.
     259              :     // Zero means unlimited.
     260              :     pub(crate) max_dirty_bytes: AtomicU64,
     261              :     // How many bytes are in all EphemeralFile objects
     262              :     dirty_bytes: AtomicU64,
     263              :     // How many layers are contributing to dirty_bytes
     264              :     dirty_layers: AtomicUsize,
     265              : }
     266              : 
     267              : // Per-timeline RAII struct for its contribution to [`GlobalResources`]
     268              : struct GlobalResourceUnits {
     269              :     // How many dirty bytes have I added to the global dirty_bytes: this guard object is responsible
     270              :     // for decrementing the global counter by this many bytes when dropped.
     271              :     dirty_bytes: u64,
     272              : }
     273              : 
     274              : impl GlobalResourceUnits {
     275              :     // Hint for the layer append path to update us when the layer size differs from the last
     276              :     // call to update_size by this much.  If we don't reach this threshold, we'll still get
     277              :     // updated when the Timeline "ticks" in the background.
     278              :     const MAX_SIZE_DRIFT: u64 = 10 * 1024 * 1024;
     279              : 
     280         1266 :     fn new() -> Self {
     281         1266 :         GLOBAL_RESOURCES
     282         1266 :             .dirty_layers
     283         1266 :             .fetch_add(1, AtomicOrdering::Relaxed);
     284         1266 :         Self { dirty_bytes: 0 }
     285         1266 :     }
     286              : 
     287              :     /// Do not call this frequently: all timelines will write to these same global atomics,
     288              :     /// so this is a relatively expensive operation.  Wait at least a few seconds between calls.
     289              :     ///
     290              :     /// Returns the effective layer size limit that should be applied, if any, to keep
     291              :     /// the total number of dirty bytes below the configured maximum.
     292         1152 :     fn publish_size(&mut self, size: u64) -> Option<u64> {
     293         1152 :         let new_global_dirty_bytes = match size.cmp(&self.dirty_bytes) {
     294         1142 :             Ordering::Equal => GLOBAL_RESOURCES.dirty_bytes.load(AtomicOrdering::Relaxed),
     295              :             Ordering::Greater => {
     296            8 :                 let delta = size - self.dirty_bytes;
     297            8 :                 let old = GLOBAL_RESOURCES
     298            8 :                     .dirty_bytes
     299            8 :                     .fetch_add(delta, AtomicOrdering::Relaxed);
     300            8 :                 old + delta
     301              :             }
     302              :             Ordering::Less => {
     303            2 :                 let delta = self.dirty_bytes - size;
     304            2 :                 let old = GLOBAL_RESOURCES
     305            2 :                     .dirty_bytes
     306            2 :                     .fetch_sub(delta, AtomicOrdering::Relaxed);
     307            2 :                 old - delta
     308              :             }
     309              :         };
     310              : 
     311              :         // This is a sloppy update: concurrent updates to the counter will race, and the exact
     312              :         // value of the metric might not be the exact latest value of GLOBAL_RESOURCES::dirty_bytes.
     313              :         // That's okay: as long as the metric contains some recent value, it doesn't have to always
     314              :         // be literally the last update.
     315         1152 :         TIMELINE_EPHEMERAL_BYTES.set(new_global_dirty_bytes);
     316         1152 : 
     317         1152 :         self.dirty_bytes = size;
     318         1152 : 
     319         1152 :         let max_dirty_bytes = GLOBAL_RESOURCES
     320         1152 :             .max_dirty_bytes
     321         1152 :             .load(AtomicOrdering::Relaxed);
     322         1152 :         if max_dirty_bytes > 0 && new_global_dirty_bytes > max_dirty_bytes {
     323              :             // Set the layer file limit to the average layer size: this implies that all above-average
     324              :             // sized layers will be elegible for freezing.  They will be frozen in the order they
     325              :             // next enter publish_size.
     326            0 :             Some(
     327            0 :                 new_global_dirty_bytes
     328            0 :                     / GLOBAL_RESOURCES.dirty_layers.load(AtomicOrdering::Relaxed) as u64,
     329            0 :             )
     330              :         } else {
     331         1152 :             None
     332              :         }
     333         1152 :     }
     334              : 
     335              :     // Call publish_size if the input size differs from last published size by more than
     336              :     // the drift limit
     337      4804200 :     fn maybe_publish_size(&mut self, size: u64) {
     338      4804200 :         let publish = match size.cmp(&self.dirty_bytes) {
     339            0 :             Ordering::Equal => false,
     340      4804200 :             Ordering::Greater => size - self.dirty_bytes > Self::MAX_SIZE_DRIFT,
     341            0 :             Ordering::Less => self.dirty_bytes - size > Self::MAX_SIZE_DRIFT,
     342              :         };
     343              : 
     344      4804200 :         if publish {
     345            8 :             self.publish_size(size);
     346      4804192 :         }
     347      4804200 :     }
     348              : }
     349              : 
     350              : impl Drop for GlobalResourceUnits {
     351         1144 :     fn drop(&mut self) {
     352         1144 :         GLOBAL_RESOURCES
     353         1144 :             .dirty_layers
     354         1144 :             .fetch_sub(1, AtomicOrdering::Relaxed);
     355         1144 : 
     356         1144 :         // Subtract our contribution to the global total dirty bytes
     357         1144 :         self.publish_size(0);
     358         1144 :     }
     359              : }
     360              : 
     361              : pub(crate) static GLOBAL_RESOURCES: GlobalResources = GlobalResources {
     362              :     max_dirty_bytes: AtomicU64::new(0),
     363              :     dirty_bytes: AtomicU64::new(0),
     364              :     dirty_layers: AtomicUsize::new(0),
     365              : };
     366              : 
     367              : impl InMemoryLayer {
     368       606295 :     pub(crate) fn file_id(&self) -> InMemoryLayerFileId {
     369       606295 :         self.file_id
     370       606295 :     }
     371              : 
     372         1142 :     pub(crate) fn get_timeline_id(&self) -> TimelineId {
     373         1142 :         self.timeline_id
     374         1142 :     }
     375              : 
     376            0 :     pub(crate) fn info(&self) -> InMemoryLayerInfo {
     377            0 :         let lsn_start = self.start_lsn;
     378              : 
     379            0 :         if let Some(&lsn_end) = self.end_lsn.get() {
     380            0 :             InMemoryLayerInfo::Frozen { lsn_start, lsn_end }
     381              :         } else {
     382            0 :             InMemoryLayerInfo::Open { lsn_start }
     383              :         }
     384            0 :     }
     385              : 
     386            0 :     pub(crate) fn try_len(&self) -> Option<u64> {
     387            0 :         self.inner.try_read().map(|i| i.file.len()).ok()
     388            0 :     }
     389              : 
     390      4804200 :     pub(crate) fn assert_writable(&self) {
     391      4804200 :         assert!(self.end_lsn.get().is_none());
     392      4804200 :     }
     393              : 
     394      1521543 :     pub(crate) fn end_lsn_or_max(&self) -> Lsn {
     395      1521543 :         self.end_lsn.get().copied().unwrap_or(Lsn::MAX)
     396      1521543 :     }
     397              : 
     398      1520401 :     pub(crate) fn get_lsn_range(&self) -> Range<Lsn> {
     399      1520401 :         self.start_lsn..self.end_lsn_or_max()
     400      1520401 :     }
     401              : 
     402              :     /// debugging function to print out the contents of the layer
     403              :     ///
     404              :     /// this is likely completly unused
     405            0 :     pub async fn dump(&self, _verbose: bool, _ctx: &RequestContext) -> Result<()> {
     406            0 :         let end_str = self.end_lsn_or_max();
     407            0 : 
     408            0 :         println!(
     409            0 :             "----- in-memory layer for tli {} LSNs {}-{} ----",
     410            0 :             self.timeline_id, self.start_lsn, end_str,
     411            0 :         );
     412            0 : 
     413            0 :         Ok(())
     414            0 :     }
     415              : 
     416              :     // Look up the keys in the provided keyspace and update
     417              :     // the reconstruct state with whatever is found.
     418              :     //
     419              :     // If the key is cached, go no further than the cached Lsn.
     420       606295 :     pub(crate) async fn get_values_reconstruct_data(
     421       606295 :         &self,
     422       606295 :         keyspace: KeySpace,
     423       606295 :         end_lsn: Lsn,
     424       606295 :         reconstruct_state: &mut ValuesReconstructState,
     425       606295 :         ctx: &RequestContext,
     426       606295 :     ) -> Result<(), GetVectoredError> {
     427       606295 :         let ctx = RequestContextBuilder::extend(ctx)
     428       606295 :             .page_content_kind(PageContentKind::InMemoryLayer)
     429       606295 :             .build();
     430              : 
     431       606295 :         let inner = self.inner.read().await;
     432              : 
     433              :         struct ValueRead {
     434              :             entry_lsn: Lsn,
     435              :             read: vectored_dio_read::LogicalRead<Vec<u8>>,
     436              :         }
     437       606295 :         let mut reads: HashMap<Key, Vec<ValueRead>> = HashMap::new();
     438              : 
     439       606295 :         for range in keyspace.ranges.iter() {
     440       606295 :             for (key, vec_map) in inner
     441       606295 :                 .index
     442       606295 :                 .range(range.start.to_compact()..range.end.to_compact())
     443              :             {
     444       498824 :                 let key = Key::from_compact(*key);
     445       498824 :                 let lsn_range = match reconstruct_state.get_cached_lsn(&key) {
     446            0 :                     Some(cached_lsn) => (cached_lsn + 1)..end_lsn,
     447       498824 :                     None => self.start_lsn..end_lsn,
     448              :                 };
     449              : 
     450       498824 :                 let slice = vec_map.slice_range(lsn_range);
     451              : 
     452       498824 :                 for (entry_lsn, index_entry) in slice.iter().rev() {
     453              :                     let IndexEntryUnpacked {
     454       498820 :                         pos,
     455       498820 :                         len,
     456       498820 :                         will_init,
     457       498820 :                     } = index_entry.unpack();
     458       498820 : 
     459       498820 :                     reads.entry(key).or_default().push(ValueRead {
     460       498820 :                         entry_lsn: *entry_lsn,
     461       498820 :                         read: vectored_dio_read::LogicalRead::new(
     462       498820 :                             pos,
     463       498820 :                             Vec::with_capacity(len as usize),
     464       498820 :                         ),
     465       498820 :                     });
     466       498820 :                     if will_init {
     467       498820 :                         break;
     468            0 :                     }
     469              :                 }
     470              :             }
     471              :         }
     472              : 
     473              :         // Execute the reads.
     474              : 
     475       606295 :         let f = vectored_dio_read::execute(
     476       606295 :             &inner.file,
     477       606295 :             reads
     478       606295 :                 .iter()
     479       606295 :                 .flat_map(|(_, value_reads)| value_reads.iter().map(|v| &v.read)),
     480       606295 :             &ctx,
     481       606295 :         );
     482       606295 :         send_future::SendFuture::send(f) // https://github.com/rust-lang/rust/issues/96865
     483        85742 :             .await;
     484              : 
     485              :         // Process results into the reconstruct state
     486      1105115 :         'next_key: for (key, value_reads) in reads {
     487       498820 :             for ValueRead { entry_lsn, read } in value_reads {
     488       498820 :                 match read.into_result().expect("we run execute() above") {
     489            0 :                     Err(e) => {
     490            0 :                         reconstruct_state.on_key_error(key, PageReconstructError::from(anyhow!(e)));
     491            0 :                         continue 'next_key;
     492              :                     }
     493       498820 :                     Ok(value_buf) => {
     494       498820 :                         let value = Value::des(&value_buf);
     495       498820 :                         if let Err(e) = value {
     496            0 :                             reconstruct_state
     497            0 :                                 .on_key_error(key, PageReconstructError::from(anyhow!(e)));
     498            0 :                             continue 'next_key;
     499       498820 :                         }
     500       498820 : 
     501       498820 :                         let key_situation =
     502       498820 :                             reconstruct_state.update_key(&key, entry_lsn, value.unwrap());
     503       498820 :                         if key_situation == ValueReconstructSituation::Complete {
     504              :                             // TODO: metric to see if we fetched more values than necessary
     505       498820 :                             continue 'next_key;
     506            0 :                         }
     507              : 
     508              :                         // process the next value in the next iteration of the loop
     509              :                     }
     510              :                 }
     511              :             }
     512              :         }
     513              : 
     514       606295 :         reconstruct_state.on_lsn_advanced(&keyspace, self.start_lsn);
     515       606295 : 
     516       606295 :         Ok(())
     517       606295 :     }
     518              : }
     519              : 
     520         2284 : fn inmem_layer_display(mut f: impl Write, start_lsn: Lsn, end_lsn: Lsn) -> std::fmt::Result {
     521         2284 :     write!(f, "inmem-{:016X}-{:016X}", start_lsn.0, end_lsn.0)
     522         2284 : }
     523              : 
     524         1142 : fn inmem_layer_log_display(
     525         1142 :     mut f: impl Write,
     526         1142 :     timeline: TimelineId,
     527         1142 :     start_lsn: Lsn,
     528         1142 :     end_lsn: Lsn,
     529         1142 : ) -> std::fmt::Result {
     530         1142 :     write!(f, "timeline {} in-memory ", timeline)?;
     531         1142 :     inmem_layer_display(f, start_lsn, end_lsn)
     532         1142 : }
     533              : 
     534              : impl std::fmt::Display for InMemoryLayer {
     535         1142 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     536         1142 :         let end_lsn = self.end_lsn_or_max();
     537         1142 :         inmem_layer_display(f, self.start_lsn, end_lsn)
     538         1142 :     }
     539              : }
     540              : 
     541              : impl InMemoryLayer {
     542              :     /// Get layer size.
     543         1266 :     pub async fn size(&self) -> Result<u64> {
     544         1266 :         let inner = self.inner.read().await;
     545         1266 :         Ok(inner.file.len())
     546         1266 :     }
     547              : 
     548         1180 :     pub fn estimated_in_mem_size(&self) -> u64 {
     549         1180 :         self.estimated_in_mem_size.load(AtomicOrdering::Relaxed)
     550         1180 :     }
     551              : 
     552              :     /// Create a new, empty, in-memory layer
     553         1266 :     pub async fn create(
     554         1266 :         conf: &'static PageServerConf,
     555         1266 :         timeline_id: TimelineId,
     556         1266 :         tenant_shard_id: TenantShardId,
     557         1266 :         start_lsn: Lsn,
     558         1266 :         gate_guard: utils::sync::gate::GateGuard,
     559         1266 :         ctx: &RequestContext,
     560         1266 :     ) -> Result<InMemoryLayer> {
     561         1266 :         trace!("initializing new empty InMemoryLayer for writing on timeline {timeline_id} at {start_lsn}");
     562              : 
     563         1266 :         let file =
     564         1266 :             EphemeralFile::create(conf, tenant_shard_id, timeline_id, gate_guard, ctx).await?;
     565         1266 :         let key = InMemoryLayerFileId(file.page_cache_file_id());
     566         1266 : 
     567         1266 :         Ok(InMemoryLayer {
     568         1266 :             file_id: key,
     569         1266 :             frozen_local_path_str: OnceLock::new(),
     570         1266 :             conf,
     571         1266 :             timeline_id,
     572         1266 :             tenant_shard_id,
     573         1266 :             start_lsn,
     574         1266 :             end_lsn: OnceLock::new(),
     575         1266 :             opened_at: Instant::now(),
     576         1266 :             inner: RwLock::new(InMemoryLayerInner {
     577         1266 :                 index: BTreeMap::new(),
     578         1266 :                 file,
     579         1266 :                 resource_units: GlobalResourceUnits::new(),
     580         1266 :             }),
     581         1266 :             estimated_in_mem_size: AtomicU64::new(0),
     582         1266 :         })
     583         1266 :     }
     584              : 
     585              :     /// Write path.
     586              :     ///
     587              :     /// Errors are not retryable, the [`InMemoryLayer`] must be discarded, and not be read from.
     588              :     /// The reason why it's not retryable is that the [`EphemeralFile`] writes are not retryable.
     589              :     /// TODO: it can be made retryable if we aborted the process on EphemeralFile write errors.
     590      4804200 :     pub async fn put_batch(
     591      4804200 :         &self,
     592      4804200 :         serialized_batch: SerializedValueBatch,
     593      4804200 :         ctx: &RequestContext,
     594      4804200 :     ) -> anyhow::Result<()> {
     595      4804200 :         let mut inner = self.inner.write().await;
     596      4804200 :         self.assert_writable();
     597      4804200 : 
     598      4804200 :         let base_offset = inner.file.len();
     599      4804200 : 
     600      4804200 :         let SerializedValueBatch {
     601      4804200 :             raw,
     602      4804200 :             metadata,
     603      4804200 :             max_lsn: _,
     604      4804200 :             len: _,
     605      4804200 :         } = serialized_batch;
     606      4804200 : 
     607      4804200 :         // Write the batch to the file
     608      4804200 :         inner.file.write_raw(&raw, ctx).await?;
     609      4804200 :         let new_size = inner.file.len();
     610      4804200 :         let expected_new_len = base_offset
     611      4804200 :             .checked_add(raw.len().into_u64())
     612      4804200 :             // write_raw would error if we were to overflow u64.
     613      4804200 :             // also IndexEntry and higher levels in
     614      4804200 :             //the code don't allow the file to grow that large
     615      4804200 :             .unwrap();
     616      4804200 :         assert_eq!(new_size, expected_new_len);
     617              : 
     618              :         // Update the index with the new entries
     619      9894674 :         for meta in metadata {
     620              :             let SerializedValueMeta {
     621      5090474 :                 key,
     622      5090474 :                 lsn,
     623      5090474 :                 batch_offset,
     624      5090474 :                 len,
     625      5090474 :                 will_init,
     626      5090474 :             } = match meta {
     627      5090474 :                 ValueMeta::Serialized(ser) => ser,
     628              :                 ValueMeta::Observed(_) => {
     629            0 :                     continue;
     630              :                 }
     631              :             };
     632              : 
     633              :             // Add the base_offset to the batch's index entries which are relative to the batch start.
     634      5090474 :             let index_entry = IndexEntry::new(IndexEntryNewArgs {
     635      5090474 :                 base_offset,
     636      5090474 :                 batch_offset,
     637      5090474 :                 len,
     638      5090474 :                 will_init,
     639      5090474 :             })?;
     640              : 
     641      5090474 :             let vec_map = inner.index.entry(key).or_default();
     642      5090474 :             let old = vec_map.append_or_update_last(lsn, index_entry).unwrap().0;
     643      5090474 :             if old.is_some() {
     644              :                 // This should not break anything, but is unexpected: ingestion code aims to filter out
     645              :                 // multiple writes to the same key at the same LSN.  This happens in cases where our
     646              :                 // ingenstion code generates some write like an empty page, and we see a write from postgres
     647              :                 // to the same key in the same wal record.  If one such write makes it through, we
     648              :                 // index the most recent write, implicitly ignoring the earlier write.  We log a warning
     649              :                 // because this case is unexpected, and we would like tests to fail if this happens.
     650            0 :                 warn!("Key {} at {} written twice at same LSN", key, lsn);
     651      5090474 :             }
     652      5090474 :             self.estimated_in_mem_size.fetch_add(
     653      5090474 :                 (std::mem::size_of::<CompactKey>()
     654      5090474 :                     + std::mem::size_of::<Lsn>()
     655      5090474 :                     + std::mem::size_of::<IndexEntry>()) as u64,
     656      5090474 :                 AtomicOrdering::Relaxed,
     657      5090474 :             );
     658              :         }
     659              : 
     660      4804200 :         inner.resource_units.maybe_publish_size(new_size);
     661      4804200 : 
     662      4804200 :         Ok(())
     663      4804200 :     }
     664              : 
     665      4803010 :     pub(crate) fn get_opened_at(&self) -> Instant {
     666      4803010 :         self.opened_at
     667      4803010 :     }
     668              : 
     669            0 :     pub(crate) async fn tick(&self) -> Option<u64> {
     670            0 :         let mut inner = self.inner.write().await;
     671            0 :         let size = inner.file.len();
     672            0 :         inner.resource_units.publish_size(size)
     673            0 :     }
     674              : 
     675            2 :     pub(crate) async fn put_tombstones(&self, _key_ranges: &[(Range<Key>, Lsn)]) -> Result<()> {
     676            2 :         // TODO: Currently, we just leak the storage for any deleted keys
     677            2 :         Ok(())
     678            2 :     }
     679              : 
     680              :     /// Records the end_lsn for non-dropped layers.
     681              :     /// `end_lsn` is exclusive
     682         1142 :     pub async fn freeze(&self, end_lsn: Lsn) {
     683         1142 :         assert!(
     684         1142 :             self.start_lsn < end_lsn,
     685            0 :             "{} >= {}",
     686              :             self.start_lsn,
     687              :             end_lsn
     688              :         );
     689         1142 :         self.end_lsn.set(end_lsn).expect("end_lsn set only once");
     690         1142 : 
     691         1142 :         self.frozen_local_path_str
     692         1142 :             .set({
     693         1142 :                 let mut buf = String::new();
     694         1142 :                 inmem_layer_log_display(&mut buf, self.get_timeline_id(), self.start_lsn, end_lsn)
     695         1142 :                     .unwrap();
     696         1142 :                 buf.into()
     697         1142 :             })
     698         1142 :             .expect("frozen_local_path_str set only once");
     699              : 
     700              :         #[cfg(debug_assertions)]
     701              :         {
     702         1142 :             let inner = self.inner.write().await;
     703      4255904 :             for vec_map in inner.index.values() {
     704      4386734 :                 for (lsn, _) in vec_map.as_slice() {
     705      4386734 :                     assert!(*lsn < end_lsn);
     706              :                 }
     707              :             }
     708              :         }
     709         1142 :     }
     710              : 
     711              :     /// Write this frozen in-memory layer to disk. If `key_range` is set, the delta
     712              :     /// layer will only contain the key range the user specifies, and may return `None`
     713              :     /// if there are no matching keys.
     714              :     ///
     715              :     /// Returns a new delta layer with all the same data as this in-memory layer
     716          968 :     pub async fn write_to_disk(
     717          968 :         &self,
     718          968 :         ctx: &RequestContext,
     719          968 :         key_range: Option<Range<Key>>,
     720          968 :         l0_flush_global_state: &l0_flush::Inner,
     721          968 :     ) -> Result<Option<(PersistentLayerDesc, Utf8PathBuf)>> {
     722              :         // Grab the lock in read-mode. We hold it over the I/O, but because this
     723              :         // layer is not writeable anymore, no one should be trying to acquire the
     724              :         // write lock on it, so we shouldn't block anyone. There's one exception
     725              :         // though: another thread might have grabbed a reference to this layer
     726              :         // in `get_layer_for_write' just before the checkpointer called
     727              :         // `freeze`, and then `write_to_disk` on it. When the thread gets the
     728              :         // lock, it will see that it's not writeable anymore and retry, but it
     729              :         // would have to wait until we release it. That race condition is very
     730              :         // rare though, so we just accept the potential latency hit for now.
     731          968 :         let inner = self.inner.read().await;
     732              : 
     733              :         use l0_flush::Inner;
     734          968 :         let _concurrency_permit = match l0_flush_global_state {
     735          968 :             Inner::Direct { semaphore, .. } => Some(semaphore.acquire().await),
     736              :         };
     737              : 
     738          968 :         let end_lsn = *self.end_lsn.get().unwrap();
     739              : 
     740          968 :         let key_count = if let Some(key_range) = key_range {
     741            0 :             let key_range = key_range.start.to_compact()..key_range.end.to_compact();
     742            0 : 
     743            0 :             inner
     744            0 :                 .index
     745            0 :                 .iter()
     746            0 :                 .filter(|(k, _)| key_range.contains(k))
     747            0 :                 .count()
     748              :         } else {
     749          968 :             inner.index.len()
     750              :         };
     751          968 :         if key_count == 0 {
     752            0 :             return Ok(None);
     753          968 :         }
     754              : 
     755          968 :         let mut delta_layer_writer = DeltaLayerWriter::new(
     756          968 :             self.conf,
     757          968 :             self.timeline_id,
     758          968 :             self.tenant_shard_id,
     759          968 :             Key::MIN,
     760          968 :             self.start_lsn..end_lsn,
     761          968 :             ctx,
     762          968 :         )
     763          497 :         .await?;
     764              : 
     765          968 :         match l0_flush_global_state {
     766              :             l0_flush::Inner::Direct { .. } => {
     767          968 :                 let file_contents = inner.file.load_to_io_buf(ctx).await?;
     768          968 :                 let file_contents = file_contents.freeze();
     769              : 
     770      4254686 :                 for (key, vec_map) in inner.index.iter() {
     771              :                     // Write all page versions
     772      4385516 :                     for (lsn, entry) in vec_map
     773      4254686 :                         .as_slice()
     774      4254686 :                         .iter()
     775      4385516 :                         .map(|(lsn, entry)| (lsn, entry.unpack()))
     776              :                     {
     777              :                         let IndexEntryUnpacked {
     778      4385516 :                             pos,
     779      4385516 :                             len,
     780      4385516 :                             will_init,
     781      4385516 :                         } = entry;
     782      4385516 :                         let buf = file_contents.slice(pos as usize..(pos + len) as usize);
     783      4385516 :                         let (_buf, res) = delta_layer_writer
     784      4385516 :                             .put_value_bytes(
     785      4385516 :                                 Key::from_compact(*key),
     786      4385516 :                                 *lsn,
     787      4385516 :                                 buf.slice_len(),
     788      4385516 :                                 will_init,
     789      4385516 :                                 ctx,
     790      4385516 :                             )
     791         2731 :                             .await;
     792      4385516 :                         res?;
     793              :                     }
     794              :                 }
     795              :             }
     796              :         }
     797              : 
     798              :         // MAX is used here because we identify L0 layers by full key range
     799         6639 :         let (desc, path) = delta_layer_writer.finish(Key::MAX, ctx).await?;
     800              : 
     801              :         // Hold the permit until all the IO is done, including the fsync in `delta_layer_writer.finish()``.
     802              :         //
     803              :         // If we didn't and our caller drops this future, tokio-epoll-uring would extend the lifetime of
     804              :         // the `file_contents: Vec<u8>` until the IO is done, but not the permit's lifetime.
     805              :         // Thus, we'd have more concurrenct `Vec<u8>` in existence than the semaphore allows.
     806              :         //
     807              :         // We hold across the fsync so that on ext4 mounted with data=ordered, all the kernel page cache pages
     808              :         // we dirtied when writing to the filesystem have been flushed and marked !dirty.
     809          968 :         drop(_concurrency_permit);
     810          968 : 
     811          968 :         Ok(Some((desc, path)))
     812          968 :     }
     813              : }
     814              : 
     815              : #[cfg(test)]
     816              : mod tests {
     817              :     use super::*;
     818              : 
     819              :     #[test]
     820            2 :     fn test_index_entry() {
     821              :         const MAX_SUPPORTED_POS: usize = IndexEntry::MAX_SUPPORTED_POS;
     822              :         use IndexEntryNewArgs as Args;
     823              :         use IndexEntryUnpacked as Unpacked;
     824              : 
     825           40 :         let roundtrip = |args, expect: Unpacked| {
     826           40 :             let res = IndexEntry::new(args).expect("this tests expects no errors");
     827           40 :             let IndexEntryUnpacked {
     828           40 :                 will_init,
     829           40 :                 len,
     830           40 :                 pos,
     831           40 :             } = res.unpack();
     832           40 :             assert_eq!(will_init, expect.will_init);
     833           40 :             assert_eq!(len, expect.len);
     834           40 :             assert_eq!(pos, expect.pos);
     835           40 :         };
     836              : 
     837              :         // basic roundtrip
     838            6 :         for pos in [0, MAX_SUPPORTED_POS] {
     839           12 :             for len in [0, MAX_SUPPORTED_BLOB_LEN] {
     840           24 :                 for will_init in [true, false] {
     841           16 :                     let expect = Unpacked {
     842           16 :                         will_init,
     843           16 :                         len: len.into_u64(),
     844           16 :                         pos: pos.into_u64(),
     845           16 :                     };
     846           16 :                     roundtrip(
     847           16 :                         Args {
     848           16 :                             will_init,
     849           16 :                             base_offset: pos.into_u64(),
     850           16 :                             batch_offset: 0,
     851           16 :                             len,
     852           16 :                         },
     853           16 :                         expect,
     854           16 :                     );
     855           16 :                     roundtrip(
     856           16 :                         Args {
     857           16 :                             will_init,
     858           16 :                             base_offset: 0,
     859           16 :                             batch_offset: pos.into_u64(),
     860           16 :                             len,
     861           16 :                         },
     862           16 :                         expect,
     863           16 :                     );
     864           16 :                 }
     865              :             }
     866              :         }
     867              : 
     868              :         // too-large len
     869            2 :         let too_large = Args {
     870            2 :             will_init: false,
     871            2 :             len: MAX_SUPPORTED_BLOB_LEN + 1,
     872            2 :             base_offset: 0,
     873            2 :             batch_offset: 0,
     874            2 :         };
     875            2 :         assert!(IndexEntry::new(too_large).is_err());
     876              : 
     877              :         // too-large pos
     878              :         {
     879            2 :             let too_large = Args {
     880            2 :                 will_init: false,
     881            2 :                 len: 0,
     882            2 :                 base_offset: MAX_SUPPORTED_POS.into_u64() + 1,
     883            2 :                 batch_offset: 0,
     884            2 :             };
     885            2 :             assert!(IndexEntry::new(too_large).is_err());
     886            2 :             let too_large = Args {
     887            2 :                 will_init: false,
     888            2 :                 len: 0,
     889            2 :                 base_offset: 0,
     890            2 :                 batch_offset: MAX_SUPPORTED_POS.into_u64() + 1,
     891            2 :             };
     892            2 :             assert!(IndexEntry::new(too_large).is_err());
     893              :         }
     894              : 
     895              :         // too large (base_offset + batch_offset)
     896              :         {
     897            2 :             let too_large = Args {
     898            2 :                 will_init: false,
     899            2 :                 len: 0,
     900            2 :                 base_offset: MAX_SUPPORTED_POS.into_u64(),
     901            2 :                 batch_offset: 1,
     902            2 :             };
     903            2 :             assert!(IndexEntry::new(too_large).is_err());
     904            2 :             let too_large = Args {
     905            2 :                 will_init: false,
     906            2 :                 len: 0,
     907            2 :                 base_offset: MAX_SUPPORTED_POS.into_u64() - 1,
     908            2 :                 batch_offset: MAX_SUPPORTED_POS.into_u64() - 1,
     909            2 :             };
     910            2 :             assert!(IndexEntry::new(too_large).is_err());
     911              :         }
     912              : 
     913              :         // valid special cases
     914              :         // - area past the max supported pos that is accessible by len
     915            6 :         for len in [1, MAX_SUPPORTED_BLOB_LEN] {
     916            4 :             roundtrip(
     917            4 :                 Args {
     918            4 :                     will_init: false,
     919            4 :                     len,
     920            4 :                     base_offset: MAX_SUPPORTED_POS.into_u64(),
     921            4 :                     batch_offset: 0,
     922            4 :                 },
     923            4 :                 Unpacked {
     924            4 :                     will_init: false,
     925            4 :                     len: len as u64,
     926            4 :                     pos: MAX_SUPPORTED_POS.into_u64(),
     927            4 :                 },
     928            4 :             );
     929            4 :             roundtrip(
     930            4 :                 Args {
     931            4 :                     will_init: false,
     932            4 :                     len,
     933            4 :                     base_offset: 0,
     934            4 :                     batch_offset: MAX_SUPPORTED_POS.into_u64(),
     935            4 :                 },
     936            4 :                 Unpacked {
     937            4 :                     will_init: false,
     938            4 :                     len: len as u64,
     939            4 :                     pos: MAX_SUPPORTED_POS.into_u64(),
     940            4 :                 },
     941            4 :             );
     942            4 :         }
     943            2 :     }
     944              : }
        

Generated by: LCOV version 2.1-beta