LCOV - code coverage report
Current view: top level - pageserver/src/tenant/storage_layer - inmemory_layer.rs (source / functions)
Test: 12c2fc96834f59604b8ade5b9add28f1dce41ec6.info
Test Date: 2024-07-03 15:33:13
Coverage:    Lines: 75.7 % (320 of 423)    Functions: 79.1 % (34 of 43)

            Line data    Source code
       1              : //! An in-memory layer stores recently received key-value pairs.
       2              : //!
       3              : //! The "in-memory" part of the name is a bit misleading: the actual page versions are
       4              : //! held in an ephemeral file, not in memory. The metadata for each page version, i.e.
       5              : //! its position in the file, is kept in memory, though.
       6              : //!
       7              : use crate::config::PageServerConf;
       8              : use crate::context::{PageContentKind, RequestContext, RequestContextBuilder};
       9              : use crate::page_cache::PAGE_SZ;
      10              : use crate::repository::{Key, Value};
      11              : use crate::tenant::block_io::{BlockCursor, BlockReader, BlockReaderRef};
      12              : use crate::tenant::ephemeral_file::EphemeralFile;
      13              : use crate::tenant::storage_layer::ValueReconstructResult;
      14              : use crate::tenant::timeline::GetVectoredError;
      15              : use crate::tenant::{PageReconstructError, Timeline};
      16              : use crate::{l0_flush, page_cache, walrecord};
      17              : use anyhow::{anyhow, ensure, Result};
      18              : use pageserver_api::keyspace::KeySpace;
      19              : use pageserver_api::models::InMemoryLayerInfo;
      20              : use pageserver_api::shard::TenantShardId;
      21              : use std::collections::{BTreeMap, BinaryHeap, HashSet};
      22              : use std::sync::{Arc, OnceLock};
      23              : use std::time::Instant;
      24              : use tracing::*;
      25              : use utils::{bin_ser::BeSer, id::TimelineId, lsn::Lsn, vec_map::VecMap};
      26              : // avoid binding to Write (conflicts with std::io::Write)
      27              : // while being able to use std::fmt::Write's methods
      28              : use crate::metrics::TIMELINE_EPHEMERAL_BYTES;
      29              : use std::cmp::Ordering;
      30              : use std::fmt::Write;
      31              : use std::ops::Range;
      32              : use std::sync::atomic::Ordering as AtomicOrdering;
      33              : use std::sync::atomic::{AtomicU64, AtomicUsize};
      34              : use tokio::sync::{RwLock, RwLockWriteGuard};
      35              : 
      36              : use super::{
      37              :     DeltaLayerWriter, ResidentLayer, ValueReconstructSituation, ValueReconstructState,
      38              :     ValuesReconstructState,
      39              : };
      40              : 
      41              : #[derive(Debug, PartialEq, Eq, Clone, Copy, Hash)]
      42              : pub(crate) struct InMemoryLayerFileId(page_cache::FileId);
      43              : 
      44              : pub struct InMemoryLayer {
      45              :     conf: &'static PageServerConf,
      46              :     tenant_shard_id: TenantShardId,
      47              :     timeline_id: TimelineId,
      48              :     file_id: InMemoryLayerFileId,
      49              : 
      50              :     /// This layer contains all the changes from 'start_lsn'. The
      51              :     /// start is inclusive.
      52              :     start_lsn: Lsn,
      53              : 
      54              :     /// Frozen layers have an exclusive end LSN.
      55              :     /// Writes are only allowed when this is `None`.
      56              :     pub(crate) end_lsn: OnceLock<Lsn>,
      57              : 
       58              :     /// Used for traversal path. Cached representation of the in-memory layer before it is frozen.
      59              :     local_path_str: Arc<str>,
      60              : 
       61              :     /// Used for traversal path. Cached representation of the in-memory layer after it is frozen.
      62              :     frozen_local_path_str: OnceLock<Arc<str>>,
      63              : 
      64              :     opened_at: Instant,
      65              : 
      66              :     /// The above fields never change, except for `end_lsn`, which is only set once.
      67              :     /// All other changing parts are in `inner`, and protected by a mutex.
      68              :     inner: RwLock<InMemoryLayerInner>,
      69              : }
      70              : 
      71              : impl std::fmt::Debug for InMemoryLayer {
      72            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
      73            0 :         f.debug_struct("InMemoryLayer")
      74            0 :             .field("start_lsn", &self.start_lsn)
      75            0 :             .field("end_lsn", &self.end_lsn)
      76            0 :             .field("inner", &self.inner)
      77            0 :             .finish()
      78            0 :     }
      79              : }
      80              : 
      81              : pub struct InMemoryLayerInner {
      82              :     /// All versions of all pages in the layer are kept here. Indexed
      83              :     /// by block number and LSN. The value is an offset into the
      84              :     /// ephemeral file where the page version is stored.
      85              :     index: BTreeMap<Key, VecMap<Lsn, u64>>,
      86              : 
      87              :     /// The values are stored in a serialized format in this file.
      88              :     /// Each serialized Value is preceded by a 'u32' length field.
       89              :     /// The `index` map above stores the offsets into this file.
      90              :     file: EphemeralFile,
      91              : 
      92              :     resource_units: GlobalResourceUnits,
      93              : }
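// [Editor's illustrative sketch - not part of the original file.] The comment on
// `file` above says every serialized Value is preceded by a u32 length field, and
// the `index` map stores the resulting offsets. Below is a minimal, self-contained
// model of that framing over a plain byte buffer; the helper names and the
// little-endian prefix are invented for the sketch, the real layout lives in
// `EphemeralFile` and its blob I/O helpers.
#[cfg(test)]
mod blob_framing_sketch {
    /// Append `payload` behind a u32 length prefix and return the blob's offset.
    fn write_blob(file: &mut Vec<u8>, payload: &[u8]) -> u64 {
        let off = file.len() as u64;
        file.extend_from_slice(&(payload.len() as u32).to_le_bytes());
        file.extend_from_slice(payload);
        off
    }

    /// Read back the blob that starts at `off`.
    fn read_blob(file: &[u8], off: u64) -> &[u8] {
        let off = off as usize;
        let len = u32::from_le_bytes(file[off..off + 4].try_into().unwrap()) as usize;
        &file[off + 4..off + 4 + len]
    }

    #[test]
    fn offsets_recorded_at_write_time_find_the_value_again() {
        let mut file = Vec::new();
        // In the real layer, these offsets would be stored in `index` under
        // (Key, Lsn) so that reads can seek straight to the right blob.
        let off_a = write_blob(&mut file, b"serialized value at lsn 10");
        let off_b = write_blob(&mut file, b"serialized value at lsn 20");
        assert_eq!(read_blob(&file, off_a), b"serialized value at lsn 10");
        assert_eq!(read_blob(&file, off_b), b"serialized value at lsn 20");
    }
}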
      94              : 
      95              : impl std::fmt::Debug for InMemoryLayerInner {
      96            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
      97            0 :         f.debug_struct("InMemoryLayerInner").finish()
      98            0 :     }
      99              : }
     100              : 
     101              : /// State shared by all in-memory (ephemeral) layers.  Updated infrequently during background ticks in Timeline,
     102              : /// to minimize contention.
     103              : ///
     104              : /// This global state is used to implement behaviors that require a global view of the system, e.g.
     105              : /// rolling layers proactively to limit the total amount of dirty data.
     106              : pub(crate) struct GlobalResources {
     107              :     // Limit on how high dirty_bytes may grow before we start freezing layers to reduce it.
     108              :     // Zero means unlimited.
     109              :     pub(crate) max_dirty_bytes: AtomicU64,
     110              :     // How many bytes are in all EphemeralFile objects
     111              :     dirty_bytes: AtomicU64,
     112              :     // How many layers are contributing to dirty_bytes
     113              :     dirty_layers: AtomicUsize,
     114              : }
     115              : 
     116              : // Per-timeline RAII struct for its contribution to [`GlobalResources`]
     117              : struct GlobalResourceUnits {
     118              :     // How many dirty bytes have I added to the global dirty_bytes: this guard object is responsible
     119              :     // for decrementing the global counter by this many bytes when dropped.
     120              :     dirty_bytes: u64,
     121              : }
     122              : 
     123              : impl GlobalResourceUnits {
     124              :     // Hint for the layer append path to update us when the layer size differs from the last
      125              :     // published size by this much.  If we don't reach this threshold, we'll still get
     126              :     // updated when the Timeline "ticks" in the background.
     127              :     const MAX_SIZE_DRIFT: u64 = 10 * 1024 * 1024;
     128              : 
     129         1244 :     fn new() -> Self {
     130         1244 :         GLOBAL_RESOURCES
     131         1244 :             .dirty_layers
     132         1244 :             .fetch_add(1, AtomicOrdering::Relaxed);
     133         1244 :         Self { dirty_bytes: 0 }
     134         1244 :     }
     135              : 
     136              :     /// Do not call this frequently: all timelines will write to these same global atomics,
     137              :     /// so this is a relatively expensive operation.  Wait at least a few seconds between calls.
     138              :     ///
     139              :     /// Returns the effective layer size limit that should be applied, if any, to keep
     140              :     /// the total number of dirty bytes below the configured maximum.
     141         1126 :     fn publish_size(&mut self, size: u64) -> Option<u64> {
     142         1126 :         let new_global_dirty_bytes = match size.cmp(&self.dirty_bytes) {
     143         1114 :             Ordering::Equal => GLOBAL_RESOURCES.dirty_bytes.load(AtomicOrdering::Relaxed),
     144              :             Ordering::Greater => {
     145           10 :                 let delta = size - self.dirty_bytes;
     146           10 :                 let old = GLOBAL_RESOURCES
     147           10 :                     .dirty_bytes
     148           10 :                     .fetch_add(delta, AtomicOrdering::Relaxed);
     149           10 :                 old + delta
     150              :             }
     151              :             Ordering::Less => {
     152            2 :                 let delta = self.dirty_bytes - size;
     153            2 :                 let old = GLOBAL_RESOURCES
     154            2 :                     .dirty_bytes
     155            2 :                     .fetch_sub(delta, AtomicOrdering::Relaxed);
     156            2 :                 old - delta
     157              :             }
     158              :         };
     159              : 
     160              :         // This is a sloppy update: concurrent updates to the counter will race, and the exact
      161              :         // value of the metric might not be the very latest value of GLOBAL_RESOURCES.dirty_bytes.
     162              :         // That's okay: as long as the metric contains some recent value, it doesn't have to always
     163              :         // be literally the last update.
     164         1126 :         TIMELINE_EPHEMERAL_BYTES.set(new_global_dirty_bytes);
     165         1126 : 
     166         1126 :         self.dirty_bytes = size;
     167         1126 : 
     168         1126 :         let max_dirty_bytes = GLOBAL_RESOURCES
     169         1126 :             .max_dirty_bytes
     170         1126 :             .load(AtomicOrdering::Relaxed);
     171         1126 :         if max_dirty_bytes > 0 && new_global_dirty_bytes > max_dirty_bytes {
     172              :             // Set the layer file limit to the average layer size: this implies that all above-average
      173              :             // sized layers will be eligible for freezing.  They will be frozen in the order they
     174              :             // next enter publish_size.
     175            0 :             Some(
     176            0 :                 new_global_dirty_bytes
     177            0 :                     / GLOBAL_RESOURCES.dirty_layers.load(AtomicOrdering::Relaxed) as u64,
     178            0 :             )
     179              :         } else {
     180         1126 :             None
     181              :         }
     182         1126 :     }
     183              : 
      184              :     // Call publish_size if the input size differs from the last published size by more than
      185              :     // the drift limit.
     186      5090450 :     fn maybe_publish_size(&mut self, size: u64) {
     187      5090450 :         let publish = match size.cmp(&self.dirty_bytes) {
     188            0 :             Ordering::Equal => false,
     189      5090450 :             Ordering::Greater => size - self.dirty_bytes > Self::MAX_SIZE_DRIFT,
     190            0 :             Ordering::Less => self.dirty_bytes - size > Self::MAX_SIZE_DRIFT,
     191              :         };
     192              : 
     193      5090450 :         if publish {
     194           10 :             self.publish_size(size);
     195      5090440 :         }
     196      5090450 :     }
     197              : }
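// [Editor's illustrative sketch - not part of the original file.] A worked
// example of the limit computed in `publish_size` above once the global total
// exceeds `max_dirty_bytes`: the returned limit is the average layer size, so
// every above-average layer becomes eligible for freezing. All byte counts
// here are invented for the example.
#[cfg(test)]
mod freeze_limit_sketch {
    #[test]
    fn limit_is_the_average_dirty_layer_size() {
        let max_dirty_bytes: u64 = 1024 * 1024 * 1024; // 1 GiB configured cap
        let global_dirty_bytes: u64 = 1200 * 1024 * 1024; // 1.2 GiB across all layers
        let dirty_layers: u64 = 6;

        let limit = if max_dirty_bytes > 0 && global_dirty_bytes > max_dirty_bytes {
            Some(global_dirty_bytes / dirty_layers)
        } else {
            None
        };

        // 200 MiB: layers larger than this become eligible for freezing the
        // next time they publish their size.
        assert_eq!(limit, Some(200 * 1024 * 1024));
    }
}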
     198              : 
     199              : impl Drop for GlobalResourceUnits {
     200         1116 :     fn drop(&mut self) {
     201         1116 :         GLOBAL_RESOURCES
     202         1116 :             .dirty_layers
     203         1116 :             .fetch_sub(1, AtomicOrdering::Relaxed);
     204         1116 : 
     205         1116 :         // Subtract our contribution to the global total dirty bytes
     206         1116 :         self.publish_size(0);
     207         1116 :     }
     208              : }
     209              : 
     210              : pub(crate) static GLOBAL_RESOURCES: GlobalResources = GlobalResources {
     211              :     max_dirty_bytes: AtomicU64::new(0),
     212              :     dirty_bytes: AtomicU64::new(0),
     213              :     dirty_layers: AtomicUsize::new(0),
     214              : };
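// [Editor's illustrative sketch - not part of the original file.] The
// hysteresis applied by `maybe_publish_size` above: only a change larger than
// MAX_SIZE_DRIFT (10 MiB) touches the shared atomics; smaller changes are
// absorbed locally until the background tick publishes them.
#[cfg(test)]
mod size_drift_sketch {
    const MAX_SIZE_DRIFT: u64 = 10 * 1024 * 1024;

    fn should_publish(last_published: u64, current: u64) -> bool {
        current.abs_diff(last_published) > MAX_SIZE_DRIFT
    }

    #[test]
    fn only_large_changes_hit_the_global_counters() {
        assert!(!should_publish(0, 8 * 1024 * 1024)); // 8 MiB drift: batched up
        assert!(should_publish(0, 12 * 1024 * 1024)); // 12 MiB drift: published
        assert!(should_publish(64 * 1024 * 1024, 32 * 1024 * 1024)); // shrinking counts too
    }
}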
     215              : 
     216              : impl InMemoryLayer {
     217           14 :     pub(crate) fn file_id(&self) -> InMemoryLayerFileId {
     218           14 :         self.file_id
     219           14 :     }
     220              : 
     221         1114 :     pub(crate) fn get_timeline_id(&self) -> TimelineId {
     222         1114 :         self.timeline_id
     223         1114 :     }
     224              : 
     225            0 :     pub(crate) fn info(&self) -> InMemoryLayerInfo {
     226            0 :         let lsn_start = self.start_lsn;
     227              : 
     228            0 :         if let Some(&lsn_end) = self.end_lsn.get() {
     229            0 :             InMemoryLayerInfo::Frozen { lsn_start, lsn_end }
     230              :         } else {
     231            0 :             InMemoryLayerInfo::Open { lsn_start }
     232              :         }
     233            0 :     }
     234              : 
     235            0 :     pub(crate) fn try_len(&self) -> Option<u64> {
     236            0 :         self.inner.try_read().map(|i| i.file.len()).ok()
     237            0 :     }
     238              : 
     239      5090450 :     pub(crate) fn assert_writable(&self) {
     240      5090450 :         assert!(self.end_lsn.get().is_none());
     241      5090450 :     }
     242              : 
     243       721720 :     pub(crate) fn end_lsn_or_max(&self) -> Lsn {
     244       721720 :         self.end_lsn.get().copied().unwrap_or(Lsn::MAX)
     245       721720 :     }
     246              : 
     247       720606 :     pub(crate) fn get_lsn_range(&self) -> Range<Lsn> {
     248       720606 :         self.start_lsn..self.end_lsn_or_max()
     249       720606 :     }
     250              : 
     251       606260 :     pub(crate) fn local_path_str(&self) -> &Arc<str> {
     252       606260 :         self.frozen_local_path_str
     253       606260 :             .get()
     254       606260 :             .unwrap_or(&self.local_path_str)
     255       606260 :     }
     256              : 
      257              :     /// Debugging function to print out the contents of the layer.
      258              :     ///
      259              :     /// This is likely completely unused.
     260            0 :     pub async fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
     261            0 :         let inner = self.inner.read().await;
     262              : 
     263            0 :         let end_str = self.end_lsn_or_max();
     264            0 : 
     265            0 :         println!(
     266            0 :             "----- in-memory layer for tli {} LSNs {}-{} ----",
     267            0 :             self.timeline_id, self.start_lsn, end_str,
     268            0 :         );
     269            0 : 
     270            0 :         if !verbose {
     271            0 :             return Ok(());
     272            0 :         }
     273            0 : 
     274            0 :         let cursor = inner.file.block_cursor();
     275            0 :         let mut buf = Vec::new();
     276            0 :         for (key, vec_map) in inner.index.iter() {
     277            0 :             for (lsn, pos) in vec_map.as_slice() {
     278            0 :                 let mut desc = String::new();
     279            0 :                 cursor.read_blob_into_buf(*pos, &mut buf, ctx).await?;
     280            0 :                 let val = Value::des(&buf);
     281            0 :                 match val {
     282            0 :                     Ok(Value::Image(img)) => {
     283            0 :                         write!(&mut desc, " img {} bytes", img.len())?;
     284              :                     }
     285            0 :                     Ok(Value::WalRecord(rec)) => {
     286            0 :                         let wal_desc = walrecord::describe_wal_record(&rec).unwrap();
     287            0 :                         write!(
     288            0 :                             &mut desc,
     289            0 :                             " rec {} bytes will_init: {} {}",
     290            0 :                             buf.len(),
     291            0 :                             rec.will_init(),
     292            0 :                             wal_desc
     293            0 :                         )?;
     294              :                     }
     295            0 :                     Err(err) => {
     296            0 :                         write!(&mut desc, " DESERIALIZATION ERROR: {}", err)?;
     297              :                     }
     298              :                 }
     299            0 :                 println!("  key {} at {}: {}", key, lsn, desc);
     300              :             }
     301              :         }
     302              : 
     303            0 :         Ok(())
     304            0 :     }
     305              : 
     306              :     /// Look up given value in the layer.
     307       606260 :     pub(crate) async fn get_value_reconstruct_data(
     308       606260 :         &self,
     309       606260 :         key: Key,
     310       606260 :         lsn_range: Range<Lsn>,
     311       606260 :         reconstruct_state: &mut ValueReconstructState,
     312       606260 :         ctx: &RequestContext,
     313       606260 :     ) -> anyhow::Result<ValueReconstructResult> {
     314       606260 :         ensure!(lsn_range.start >= self.start_lsn);
     315       606260 :         let mut need_image = true;
     316       606260 : 
     317       606260 :         let ctx = RequestContextBuilder::extend(ctx)
     318       606260 :             .page_content_kind(PageContentKind::InMemoryLayer)
     319       606260 :             .build();
     320              : 
     321       606260 :         let inner = self.inner.read().await;
     322              : 
     323       606260 :         let reader = inner.file.block_cursor();
     324              : 
     325              :         // Scan the page versions backwards, starting from `lsn`.
     326       606260 :         if let Some(vec_map) = inner.index.get(&key) {
     327       496893 :             let slice = vec_map.slice_range(lsn_range);
     328       496903 :             for (entry_lsn, pos) in slice.iter().rev() {
     329       496903 :                 let buf = reader.read_blob(*pos, &ctx).await?;
     330       496903 :                 let value = Value::des(&buf)?;
     331       496903 :                 match value {
     332       496883 :                     Value::Image(img) => {
     333       496883 :                         reconstruct_state.img = Some((*entry_lsn, img));
     334       496883 :                         return Ok(ValueReconstructResult::Complete);
     335              :                     }
     336           20 :                     Value::WalRecord(rec) => {
     337           20 :                         let will_init = rec.will_init();
     338           20 :                         reconstruct_state.records.push((*entry_lsn, rec));
     339           20 :                         if will_init {
     340              :                             // This WAL record initializes the page, so no need to go further back
     341            0 :                             need_image = false;
     342            0 :                             break;
     343           20 :                         }
     344              :                     }
     345              :                 }
     346              :             }
     347       109367 :         }
     348              : 
     349              :         // release lock on 'inner'
     350              : 
     351              :         // If an older page image is needed to reconstruct the page, let the
     352              :         // caller know.
     353       109377 :         if need_image {
     354       109377 :             Ok(ValueReconstructResult::Continue)
     355              :         } else {
     356            0 :             Ok(ValueReconstructResult::Complete)
     357              :         }
     358       606260 :     }
     359              : 
     360              :     // Look up the keys in the provided keyspace and update
     361              :     // the reconstruct state with whatever is found.
     362              :     //
     363              :     // If the key is cached, go no further than the cached Lsn.
     364           14 :     pub(crate) async fn get_values_reconstruct_data(
     365           14 :         &self,
     366           14 :         keyspace: KeySpace,
     367           14 :         end_lsn: Lsn,
     368           14 :         reconstruct_state: &mut ValuesReconstructState,
     369           14 :         ctx: &RequestContext,
     370           14 :     ) -> Result<(), GetVectoredError> {
     371           14 :         let ctx = RequestContextBuilder::extend(ctx)
     372           14 :             .page_content_kind(PageContentKind::InMemoryLayer)
     373           14 :             .build();
     374              : 
     375           14 :         let inner = self.inner.read().await;
     376           14 :         let reader = inner.file.block_cursor();
     377           14 : 
     378           14 :         #[derive(Eq, PartialEq, Ord, PartialOrd)]
     379           14 :         struct BlockRead {
     380           14 :             key: Key,
     381           14 :             lsn: Lsn,
     382           14 :             block_offset: u64,
     383           14 :         }
     384           14 : 
     385           14 :         let mut planned_block_reads = BinaryHeap::new();
     386              : 
     387           14 :         for range in keyspace.ranges.iter() {
     388         2018 :             for (key, vec_map) in inner.index.range(range.start..range.end) {
     389         2018 :                 let lsn_range = match reconstruct_state.get_cached_lsn(key) {
     390            0 :                     Some(cached_lsn) => (cached_lsn + 1)..end_lsn,
     391         2018 :                     None => self.start_lsn..end_lsn,
     392              :                 };
     393              : 
     394         2018 :                 let slice = vec_map.slice_range(lsn_range);
     395         2022 :                 for (entry_lsn, pos) in slice.iter().rev() {
     396         2022 :                     planned_block_reads.push(BlockRead {
     397         2022 :                         key: *key,
     398         2022 :                         lsn: *entry_lsn,
     399         2022 :                         block_offset: *pos,
     400         2022 :                     });
     401         2022 :                 }
     402              :             }
     403              :         }
     404              : 
     405           14 :         let keyspace_size = keyspace.total_raw_size();
     406           14 : 
     407           14 :         let mut completed_keys = HashSet::new();
     408         2036 :         while completed_keys.len() < keyspace_size && !planned_block_reads.is_empty() {
     409         2022 :             let block_read = planned_block_reads.pop().unwrap();
     410         2022 :             if completed_keys.contains(&block_read.key) {
     411            4 :                 continue;
     412         2018 :             }
     413              : 
     414              :             // TODO: this uses the page cache => https://github.com/neondatabase/neon/issues/8183
     415         2018 :             let buf = reader.read_blob(block_read.block_offset, &ctx).await;
     416         2018 :             if let Err(e) = buf {
     417            0 :                 reconstruct_state
     418            0 :                     .on_key_error(block_read.key, PageReconstructError::from(anyhow!(e)));
     419            0 :                 completed_keys.insert(block_read.key);
     420            0 :                 continue;
     421         2018 :             }
     422         2018 : 
     423         2018 :             let value = Value::des(&buf.unwrap());
     424         2018 :             if let Err(e) = value {
     425            0 :                 reconstruct_state
     426            0 :                     .on_key_error(block_read.key, PageReconstructError::from(anyhow!(e)));
     427            0 :                 completed_keys.insert(block_read.key);
     428            0 :                 continue;
     429         2018 :             }
     430         2018 : 
     431         2018 :             let key_situation =
     432         2018 :                 reconstruct_state.update_key(&block_read.key, block_read.lsn, value.unwrap());
     433         2018 :             if key_situation == ValueReconstructSituation::Complete {
     434         2018 :                 completed_keys.insert(block_read.key);
     435         2018 :             }
     436              :         }
     437              : 
     438           14 :         reconstruct_state.on_lsn_advanced(&keyspace, self.start_lsn);
     439           14 : 
     440           14 :         Ok(())
     441           14 :     }
     442              : }
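// [Editor's illustrative sketch - not part of the original file.] Two behaviors
// of the read path above, modeled with simplified stand-in types (every name
// below is invented for the sketch): `get_value_reconstruct_data` walks one
// key's versions newest-first and stops at an image or a will_init record,
// while `get_values_reconstruct_data` drains a max-heap of planned reads so the
// newest version of each key is served first and older entries for already
// completed keys are skipped.
#[cfg(test)]
mod read_path_sketch {
    use std::collections::{BinaryHeap, HashSet};

    // -- Backward scan, as in get_value_reconstruct_data --------------------

    enum Ver {
        Image,
        Delta { will_init: bool },
    }

    /// Walk one key's versions newest-first; `true` means reconstruction is
    /// complete, `false` means an older layer must still be consulted.
    fn scan_complete(versions: &[(u64, Ver)]) -> bool {
        for (_lsn, ver) in versions.iter().rev() {
            match ver {
                Ver::Image => return true,
                Ver::Delta { will_init: true } => return true,
                Ver::Delta { will_init: false } => continue,
            }
        }
        false
    }

    #[test]
    fn image_or_will_init_completes_the_key() {
        let with_image = [(10, Ver::Image), (20, Ver::Delta { will_init: false })];
        let deltas_only = [(20, Ver::Delta { will_init: false })];
        assert!(scan_complete(&with_image));
        assert!(!scan_complete(&deltas_only));
    }

    // -- Planned reads, as in get_values_reconstruct_data -------------------

    #[derive(Eq, PartialEq, Ord, PartialOrd)]
    struct BlockRead {
        key: u32,
        lsn: u64,
        block_offset: u64,
    }

    #[test]
    fn newest_version_of_each_key_is_served_first() {
        // BinaryHeap is a max-heap: for a given key the highest LSN pops
        // first; once a key is complete, its older entries are skipped.
        let mut planned = BinaryHeap::new();
        planned.push(BlockRead { key: 1, lsn: 10, block_offset: 0 });
        planned.push(BlockRead { key: 1, lsn: 30, block_offset: 16 });
        planned.push(BlockRead { key: 2, lsn: 20, block_offset: 8 });

        let mut completed: HashSet<u32> = HashSet::new();
        let mut served = Vec::new();
        while let Some(read) = planned.pop() {
            if completed.contains(&read.key) {
                continue; // already reconstructed from a newer version
            }
            served.push((read.key, read.lsn));
            completed.insert(read.key); // pretend every value completed the key
        }
        assert_eq!(served, vec![(2, 20), (1, 30)]);
    }
}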
     443              : 
     444         3472 : fn inmem_layer_display(mut f: impl Write, start_lsn: Lsn, end_lsn: Lsn) -> std::fmt::Result {
     445         3472 :     write!(f, "inmem-{:016X}-{:016X}", start_lsn.0, end_lsn.0)
     446         3472 : }
     447              : 
     448         2358 : fn inmem_layer_log_display(
     449         2358 :     mut f: impl Write,
     450         2358 :     timeline: TimelineId,
     451         2358 :     start_lsn: Lsn,
     452         2358 :     end_lsn: Lsn,
     453         2358 : ) -> std::fmt::Result {
     454         2358 :     write!(f, "timeline {} in-memory ", timeline)?;
     455         2358 :     inmem_layer_display(f, start_lsn, end_lsn)
     456         2358 : }
     457              : 
     458              : impl std::fmt::Display for InMemoryLayer {
     459         1114 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     460         1114 :         let end_lsn = self.end_lsn_or_max();
     461         1114 :         inmem_layer_display(f, self.start_lsn, end_lsn)
     462         1114 :     }
     463              : }
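// [Editor's illustrative sketch - not part of the original file.] What the
// display helpers above produce: an in-memory layer is named by its LSN range,
// zero-padded to 16 hex digits. The start LSN below is made up for illustration.
#[cfg(test)]
mod layer_name_sketch {
    #[test]
    fn open_layer_uses_lsn_max_as_its_end() {
        let start = 0x0000_0000_0169_4C88_u64;
        let end = u64::MAX; // end_lsn_or_max() for a still-open layer
        let name = format!("inmem-{:016X}-{:016X}", start, end);
        assert_eq!(name, "inmem-0000000001694C88-FFFFFFFFFFFFFFFF");
    }
}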
     464              : 
     465              : impl InMemoryLayer {
     466              :     /// Get layer size.
     467         1244 :     pub async fn size(&self) -> Result<u64> {
     468         1244 :         let inner = self.inner.read().await;
     469         1244 :         Ok(inner.file.len())
     470         1244 :     }
     471              : 
     472              :     /// Create a new, empty, in-memory layer
     473         1244 :     pub async fn create(
     474         1244 :         conf: &'static PageServerConf,
     475         1244 :         timeline_id: TimelineId,
     476         1244 :         tenant_shard_id: TenantShardId,
     477         1244 :         start_lsn: Lsn,
     478         1244 :         ctx: &RequestContext,
     479         1244 :     ) -> Result<InMemoryLayer> {
     480         1244 :         trace!("initializing new empty InMemoryLayer for writing on timeline {timeline_id} at {start_lsn}");
     481              : 
     482         1244 :         let file = EphemeralFile::create(conf, tenant_shard_id, timeline_id, ctx).await?;
     483         1244 :         let key = InMemoryLayerFileId(file.page_cache_file_id());
     484         1244 : 
     485         1244 :         Ok(InMemoryLayer {
     486         1244 :             file_id: key,
     487         1244 :             local_path_str: {
     488         1244 :                 let mut buf = String::new();
     489         1244 :                 inmem_layer_log_display(&mut buf, timeline_id, start_lsn, Lsn::MAX).unwrap();
     490         1244 :                 buf.into()
     491         1244 :             },
     492         1244 :             frozen_local_path_str: OnceLock::new(),
     493         1244 :             conf,
     494         1244 :             timeline_id,
     495         1244 :             tenant_shard_id,
     496         1244 :             start_lsn,
     497         1244 :             end_lsn: OnceLock::new(),
     498         1244 :             opened_at: Instant::now(),
     499         1244 :             inner: RwLock::new(InMemoryLayerInner {
     500         1244 :                 index: BTreeMap::new(),
     501         1244 :                 file,
     502         1244 :                 resource_units: GlobalResourceUnits::new(),
     503         1244 :             }),
     504         1244 :         })
     505         1244 :     }
     506              : 
     507              :     // Write operations
     508              : 
     509              :     /// Common subroutine of the public put_wal_record() and put_page_image() functions.
     510              :     /// Adds the page version to the in-memory tree
     511              : 
     512      5090450 :     pub(crate) async fn put_value(
     513      5090450 :         &self,
     514      5090450 :         key: Key,
     515      5090450 :         lsn: Lsn,
     516      5090450 :         buf: &[u8],
     517      5090450 :         ctx: &RequestContext,
     518      5090450 :     ) -> Result<()> {
     519      5090450 :         let mut inner = self.inner.write().await;
     520      5090450 :         self.assert_writable();
     521      5090450 :         self.put_value_locked(&mut inner, key, lsn, buf, ctx).await
     522      5090450 :     }
     523              : 
     524      5090450 :     async fn put_value_locked(
     525      5090450 :         &self,
     526      5090450 :         locked_inner: &mut RwLockWriteGuard<'_, InMemoryLayerInner>,
     527      5090450 :         key: Key,
     528      5090450 :         lsn: Lsn,
     529      5090450 :         buf: &[u8],
     530      5090450 :         ctx: &RequestContext,
     531      5090450 :     ) -> Result<()> {
     532      5090450 :         trace!("put_value key {} at {}/{}", key, self.timeline_id, lsn);
     533              : 
     534      5090450 :         let off = {
     535      5090450 :             locked_inner
     536      5090450 :                 .file
     537      5090450 :                 .write_blob(
     538      5090450 :                     buf,
     539      5090450 :                     &RequestContextBuilder::extend(ctx)
     540      5090450 :                         .page_content_kind(PageContentKind::InMemoryLayer)
     541      5090450 :                         .build(),
     542      5090450 :                 )
     543         3577 :                 .await?
     544              :         };
     545              : 
     546      5090450 :         let vec_map = locked_inner.index.entry(key).or_default();
     547      5090450 :         let old = vec_map.append_or_update_last(lsn, off).unwrap().0;
     548      5090450 :         if old.is_some() {
     549              :             // We already had an entry for this LSN. That's odd..
     550            0 :             warn!("Key {} at {} already exists", key, lsn);
     551      5090450 :         }
     552              : 
     553      5090450 :         let size = locked_inner.file.len();
     554      5090450 :         locked_inner.resource_units.maybe_publish_size(size);
     555      5090450 : 
     556      5090450 :         Ok(())
     557      5090450 :     }
     558              : 
     559      4803026 :     pub(crate) fn get_opened_at(&self) -> Instant {
     560      4803026 :         self.opened_at
     561      4803026 :     }
     562              : 
     563            0 :     pub(crate) async fn tick(&self) -> Option<u64> {
     564            0 :         let mut inner = self.inner.write().await;
     565            0 :         let size = inner.file.len();
     566            0 :         inner.resource_units.publish_size(size)
     567            0 :     }
     568              : 
     569            2 :     pub(crate) async fn put_tombstones(&self, _key_ranges: &[(Range<Key>, Lsn)]) -> Result<()> {
     570            2 :         // TODO: Currently, we just leak the storage for any deleted keys
     571            2 :         Ok(())
     572            2 :     }
     573              : 
     574              :     /// Records the end_lsn for non-dropped layers.
     575              :     /// `end_lsn` is exclusive
     576         1114 :     pub async fn freeze(&self, end_lsn: Lsn) {
     577         1114 :         let inner = self.inner.write().await;
     578              : 
     579         1114 :         assert!(
     580         1114 :             self.start_lsn < end_lsn,
     581            0 :             "{} >= {}",
     582              :             self.start_lsn,
     583              :             end_lsn
     584              :         );
     585         1114 :         self.end_lsn.set(end_lsn).expect("end_lsn set only once");
     586         1114 : 
     587         1114 :         self.frozen_local_path_str
     588         1114 :             .set({
     589         1114 :                 let mut buf = String::new();
     590         1114 :                 inmem_layer_log_display(&mut buf, self.get_timeline_id(), self.start_lsn, end_lsn)
     591         1114 :                     .unwrap();
     592         1114 :                 buf.into()
     593         1114 :             })
     594         1114 :             .expect("frozen_local_path_str set only once");
     595              : 
     596      4255741 :         for vec_map in inner.index.values() {
     597      4386684 :             for (lsn, _pos) in vec_map.as_slice() {
     598      4386684 :                 assert!(*lsn < end_lsn);
     599              :             }
     600              :         }
     601         1114 :     }
     602              : 
     603              :     /// Write this frozen in-memory layer to disk. If `key_range` is set, the delta
      604              :     /// layer will only contain the specified key range, and this function may return `None`
     605              :     /// if there are no matching keys.
     606              :     ///
     607              :     /// Returns a new delta layer with all the same data as this in-memory layer
     608         1114 :     pub(crate) async fn write_to_disk(
     609         1114 :         &self,
     610         1114 :         timeline: &Arc<Timeline>,
     611         1114 :         ctx: &RequestContext,
     612         1114 :         key_range: Option<Range<Key>>,
     613         1114 :     ) -> Result<Option<ResidentLayer>> {
     614              :         // Grab the lock in read-mode. We hold it over the I/O, but because this
     615              :         // layer is not writeable anymore, no one should be trying to acquire the
     616              :         // write lock on it, so we shouldn't block anyone. There's one exception
     617              :         // though: another thread might have grabbed a reference to this layer
      618              :         // in `get_layer_for_write` just before the checkpointer called
     619              :         // `freeze`, and then `write_to_disk` on it. When the thread gets the
     620              :         // lock, it will see that it's not writeable anymore and retry, but it
     621              :         // would have to wait until we release it. That race condition is very
     622              :         // rare though, so we just accept the potential latency hit for now.
     623         1114 :         let inner = self.inner.read().await;
     624              : 
     625         1114 :         let l0_flush_global_state = timeline.l0_flush_global_state.inner().clone();
     626              :         use l0_flush::Inner;
     627         1114 :         let _concurrency_permit = match &*l0_flush_global_state {
     628         1114 :             Inner::PageCached => None,
     629            0 :             Inner::Direct { semaphore, .. } => Some(semaphore.acquire().await),
     630              :         };
     631              : 
     632         1114 :         let end_lsn = *self.end_lsn.get().unwrap();
     633              : 
     634         1114 :         let key_count = if let Some(key_range) = key_range {
     635          146 :             inner
     636          146 :                 .index
     637          146 :                 .iter()
     638         1166 :                 .filter(|(k, _)| key_range.contains(k))
     639          146 :                 .count()
     640              :         } else {
     641          968 :             inner.index.len()
     642              :         };
     643         1114 :         if key_count == 0 {
     644          146 :             return Ok(None);
     645          968 :         }
     646              : 
     647          968 :         let mut delta_layer_writer = DeltaLayerWriter::new(
     648          968 :             self.conf,
     649          968 :             self.timeline_id,
     650          968 :             self.tenant_shard_id,
     651          968 :             Key::MIN,
     652          968 :             self.start_lsn..end_lsn,
     653          968 :             ctx,
     654          968 :         )
     655          496 :         .await?;
     656              : 
     657          968 :         match &*l0_flush_global_state {
     658              :             l0_flush::Inner::PageCached => {
     659          968 :                 let ctx = RequestContextBuilder::extend(ctx)
     660          968 :                     .page_content_kind(PageContentKind::InMemoryLayer)
     661          968 :                     .build();
     662          968 : 
     663          968 :                 let mut buf = Vec::new();
     664          968 : 
     665          968 :                 let cursor = inner.file.block_cursor();
     666              : 
     667      4254575 :                 for (key, vec_map) in inner.index.iter() {
     668              :                     // Write all page versions
     669      4385518 :                     for (lsn, pos) in vec_map.as_slice() {
     670      4385518 :                         cursor.read_blob_into_buf(*pos, &mut buf, &ctx).await?;
     671      4385518 :                         let will_init = Value::des(&buf)?.will_init();
     672              :                         let res;
     673      4385518 :                         (buf, res) = delta_layer_writer
     674      4385518 :                             .put_value_bytes(*key, *lsn, buf, will_init, &ctx)
     675         2729 :                             .await;
     676      4385518 :                         res?;
     677              :                     }
     678              :                 }
     679              :             }
     680              :             l0_flush::Inner::Direct { .. } => {
     681            0 :                 let file_contents: Vec<u8> = inner.file.load_to_vec(ctx).await?;
     682            0 :                 assert_eq!(
     683            0 :                     file_contents.len() % PAGE_SZ,
     684              :                     0,
     685            0 :                     "needed by BlockReaderRef::Slice"
     686              :                 );
     687            0 :                 assert_eq!(file_contents.len(), {
     688            0 :                     let written = usize::try_from(inner.file.len()).unwrap();
     689            0 :                     if written % PAGE_SZ == 0 {
     690            0 :                         written
     691              :                     } else {
     692            0 :                         written.checked_add(PAGE_SZ - (written % PAGE_SZ)).unwrap()
     693              :                     }
     694              :                 });
     695              : 
     696            0 :                 let cursor = BlockCursor::new(BlockReaderRef::Slice(&file_contents));
     697            0 : 
     698            0 :                 let mut buf = Vec::new();
     699              : 
     700            0 :                 for (key, vec_map) in inner.index.iter() {
     701              :                     // Write all page versions
     702            0 :                     for (lsn, pos) in vec_map.as_slice() {
     703              :                         // TODO: once we have blob lengths in the in-memory index, we can
     704              :                         // 1. get rid of the blob_io / BlockReaderRef::Slice business and
     705              :                         // 2. load the file contents into a Bytes and
     706              :                         // 3. the use `Bytes::slice` to get the `buf` that is our blob
     707              :                         // 4. pass that `buf` into `put_value_bytes`
     708              :                         // => https://github.com/neondatabase/neon/issues/8183
     709            0 :                         cursor.read_blob_into_buf(*pos, &mut buf, ctx).await?;
     710            0 :                         let will_init = Value::des(&buf)?.will_init();
     711              :                         let res;
     712            0 :                         (buf, res) = delta_layer_writer
     713            0 :                             .put_value_bytes(*key, *lsn, buf, will_init, ctx)
     714            0 :                             .await;
     715            0 :                         res?;
     716              :                     }
     717              :                 }
     718              : 
     719              :                 // Hold the permit until the IO is done; if we didn't, one could drop this future,
     720              :                 // thereby releasing the permit, but the Vec<u8> remains allocated until the IO completes.
     721              :                 // => we'd have more concurrenct Vec<u8> than allowed as per the semaphore.
     722            0 :                 drop(_concurrency_permit);
     723              :             }
     724              :         }
     725              : 
     726              :         // MAX is used here because we identify L0 layers by full key range
     727         6668 :         let delta_layer = delta_layer_writer.finish(Key::MAX, timeline, ctx).await?;
     728          968 :         Ok(Some(delta_layer))
     729         1114 :     }
     730              : }
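// [Editor's illustrative sketch - not part of the original file.] The Direct
// flush path in write_to_disk asserts that the loaded file contents are a whole
// number of pages and computes the expected length by rounding the written size
// up to the next page boundary. A stand-alone model of that rounding; the
// 8192-byte page size mirrors page_cache::PAGE_SZ but is hard-coded here only
// for the example.
#[cfg(test)]
mod page_rounding_sketch {
    /// Round `written` up to the next multiple of `page_sz`.
    fn round_up(written: usize, page_sz: usize) -> usize {
        if written % page_sz == 0 {
            written
        } else {
            written + (page_sz - written % page_sz)
        }
    }

    #[test]
    fn rounds_to_page_boundary() {
        let page_sz = 8192;
        assert_eq!(round_up(0, page_sz), 0);
        assert_eq!(round_up(8192, page_sz), 8192);
        assert_eq!(round_up(10_000, page_sz), 16_384);
    }
}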
        

Generated by: LCOV version 2.1-beta