LCOV - code coverage report
Current view: top level - pageserver/src/tenant/storage_layer - inmemory_layer.rs (source / functions)
Test: ccf45ed1c149555259baec52d6229a81013dcd6a.info
Test Date: 2024-08-21 17:32:46
Coverage:   Lines: 84.7 % (272 of 321 hit)   Functions: 77.5 % (31 of 40 hit)

            Line data    Source code
       1              : //! An in-memory layer stores recently received key-value pairs.
       2              : //!
       3              : //! The "in-memory" part of the name is a bit misleading: the actual page versions are
       4              : //! held in an ephemeral file, not in memory. The metadata for each page version, i.e.
       5              : //! its position in the file, is kept in memory, though.
       6              : //!
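A rough, hedged sketch of that split (standalone and simplified; the real implementation below uses `CompactKey`, `VecMap<Lsn, u64>`, and an `EphemeralFile` rather than these toy types): values are appended to a backing buffer with a u32 length prefix, and the in-memory index only remembers each version's starting offset.

    use std::collections::BTreeMap;

    /// Toy stand-in for the index/file split described in the module comment.
    struct SketchLayer {
        index: BTreeMap<(u64, u64), u64>, // (key, lsn) -> offset into `file`
        file: Vec<u8>,                    // u32 length prefix + serialized value bytes
    }

    impl SketchLayer {
        fn put(&mut self, key: u64, lsn: u64, value: &[u8]) {
            let off = self.file.len() as u64;
            self.file.extend_from_slice(&(value.len() as u32).to_be_bytes());
            self.file.extend_from_slice(value);
            self.index.insert((key, lsn), off);
        }

        fn get(&self, key: u64, lsn: u64) -> Option<&[u8]> {
            let off = *self.index.get(&(key, lsn))? as usize;
            let len = u32::from_be_bytes(self.file[off..off + 4].try_into().unwrap()) as usize;
            Some(&self.file[off + 4..off + 4 + len])
        }
    }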
       7              : use crate::config::PageServerConf;
       8              : use crate::context::{PageContentKind, RequestContext, RequestContextBuilder};
       9              : use crate::page_cache::PAGE_SZ;
      10              : use crate::repository::{Key, Value};
      11              : use crate::tenant::block_io::{BlockCursor, BlockReader, BlockReaderRef};
      12              : use crate::tenant::ephemeral_file::EphemeralFile;
      13              : use crate::tenant::timeline::GetVectoredError;
      14              : use crate::tenant::PageReconstructError;
      15              : use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt;
      16              : use crate::{l0_flush, page_cache};
      17              : use anyhow::{anyhow, Result};
      18              : use camino::Utf8PathBuf;
      19              : use pageserver_api::key::CompactKey;
      20              : use pageserver_api::keyspace::KeySpace;
      21              : use pageserver_api::models::InMemoryLayerInfo;
      22              : use pageserver_api::shard::TenantShardId;
      23              : use std::collections::BTreeMap;
      24              : use std::sync::{Arc, OnceLock};
      25              : use std::time::Instant;
      26              : use tracing::*;
      27              : use utils::{bin_ser::BeSer, id::TimelineId, lsn::Lsn, vec_map::VecMap};
      28              : // avoid binding to Write (conflicts with std::io::Write)
      29              : // while being able to use std::fmt::Write's methods
      30              : use crate::metrics::TIMELINE_EPHEMERAL_BYTES;
      31              : use std::cmp::Ordering;
      32              : use std::fmt::Write;
      33              : use std::ops::Range;
      34              : use std::sync::atomic::Ordering as AtomicOrdering;
      35              : use std::sync::atomic::{AtomicU64, AtomicUsize};
      36              : use tokio::sync::{RwLock, RwLockWriteGuard};
      37              : 
      38              : use super::{
      39              :     DeltaLayerWriter, PersistentLayerDesc, ValueReconstructSituation, ValuesReconstructState,
      40              : };
      41              : 
      42              : #[derive(Debug, PartialEq, Eq, Clone, Copy, Hash)]
      43              : pub(crate) struct InMemoryLayerFileId(page_cache::FileId);
      44              : 
      45              : pub struct InMemoryLayer {
      46              :     conf: &'static PageServerConf,
      47              :     tenant_shard_id: TenantShardId,
      48              :     timeline_id: TimelineId,
      49              :     file_id: InMemoryLayerFileId,
      50              : 
      51              :     /// This layer contains all the changes from 'start_lsn'. The
      52              :     /// start is inclusive.
      53              :     start_lsn: Lsn,
      54              : 
      55              :     /// Frozen layers have an exclusive end LSN.
      56              :     /// Writes are only allowed when this is `None`.
      57              :     pub(crate) end_lsn: OnceLock<Lsn>,
      58              : 
       59              :     /// Used for the traversal path: a cached string representation of the in-memory layer, set once the layer is frozen.
      60              :     frozen_local_path_str: OnceLock<Arc<str>>,
      61              : 
      62              :     opened_at: Instant,
      63              : 
       64              :     /// The above fields never change, except for `end_lsn` and `frozen_local_path_str`, which are only set once.
       65              :     /// All other changing parts are in `inner`, and protected by an RwLock.
      66              :     inner: RwLock<InMemoryLayerInner>,
      67              : }
      68              : 
      69              : impl std::fmt::Debug for InMemoryLayer {
      70            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
      71            0 :         f.debug_struct("InMemoryLayer")
      72            0 :             .field("start_lsn", &self.start_lsn)
      73            0 :             .field("end_lsn", &self.end_lsn)
      74            0 :             .field("inner", &self.inner)
      75            0 :             .finish()
      76            0 :     }
      77              : }
      78              : 
      79              : pub struct InMemoryLayerInner {
      80              :     /// All versions of all pages in the layer are kept here. Indexed
       81              :     /// by key and LSN. The value is an offset into the
      82              :     /// ephemeral file where the page version is stored.
      83              :     index: BTreeMap<CompactKey, VecMap<Lsn, u64>>,
      84              : 
      85              :     /// The values are stored in a serialized format in this file.
      86              :     /// Each serialized Value is preceded by a 'u32' length field.
       87              :     /// The `index` map above stores offsets into this file.
      88              :     file: EphemeralFile,
      89              : 
      90              :     resource_units: GlobalResourceUnits,
      91              : }
      92              : 
      93              : impl std::fmt::Debug for InMemoryLayerInner {
      94            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
      95            0 :         f.debug_struct("InMemoryLayerInner").finish()
      96            0 :     }
      97              : }
      98              : 
      99              : /// State shared by all in-memory (ephemeral) layers.  Updated infrequently during background ticks in Timeline,
     100              : /// to minimize contention.
     101              : ///
     102              : /// This global state is used to implement behaviors that require a global view of the system, e.g.
     103              : /// rolling layers proactively to limit the total amount of dirty data.
     104              : pub(crate) struct GlobalResources {
     105              :     // Limit on how high dirty_bytes may grow before we start freezing layers to reduce it.
     106              :     // Zero means unlimited.
     107              :     pub(crate) max_dirty_bytes: AtomicU64,
     108              :     // How many bytes are in all EphemeralFile objects
     109              :     dirty_bytes: AtomicU64,
     110              :     // How many layers are contributing to dirty_bytes
     111              :     dirty_layers: AtomicUsize,
     112              : }
     113              : 
     114              : // Per-timeline RAII struct for its contribution to [`GlobalResources`]
     115              : struct GlobalResourceUnits {
     116              :     // How many dirty bytes have I added to the global dirty_bytes: this guard object is responsible
     117              :     // for decrementing the global counter by this many bytes when dropped.
     118              :     dirty_bytes: u64,
     119              : }
     120              : 
     121              : impl GlobalResourceUnits {
     122              :     // Hint for the layer append path to update us when the layer size differs from the last
      123              :     // call to publish_size by this much.  If we don't reach this threshold, we'll still get
     124              :     // updated when the Timeline "ticks" in the background.
     125              :     const MAX_SIZE_DRIFT: u64 = 10 * 1024 * 1024;
     126              : 
     127         1264 :     fn new() -> Self {
     128         1264 :         GLOBAL_RESOURCES
     129         1264 :             .dirty_layers
     130         1264 :             .fetch_add(1, AtomicOrdering::Relaxed);
     131         1264 :         Self { dirty_bytes: 0 }
     132         1264 :     }
     133              : 
     134              :     /// Do not call this frequently: all timelines will write to these same global atomics,
     135              :     /// so this is a relatively expensive operation.  Wait at least a few seconds between calls.
     136              :     ///
     137              :     /// Returns the effective layer size limit that should be applied, if any, to keep
     138              :     /// the total number of dirty bytes below the configured maximum.
     139         1146 :     fn publish_size(&mut self, size: u64) -> Option<u64> {
     140         1146 :         let new_global_dirty_bytes = match size.cmp(&self.dirty_bytes) {
     141         1134 :             Ordering::Equal => GLOBAL_RESOURCES.dirty_bytes.load(AtomicOrdering::Relaxed),
     142              :             Ordering::Greater => {
     143           10 :                 let delta = size - self.dirty_bytes;
     144           10 :                 let old = GLOBAL_RESOURCES
     145           10 :                     .dirty_bytes
     146           10 :                     .fetch_add(delta, AtomicOrdering::Relaxed);
     147           10 :                 old + delta
     148              :             }
     149              :             Ordering::Less => {
     150            2 :                 let delta = self.dirty_bytes - size;
     151            2 :                 let old = GLOBAL_RESOURCES
     152            2 :                     .dirty_bytes
     153            2 :                     .fetch_sub(delta, AtomicOrdering::Relaxed);
     154            2 :                 old - delta
     155              :             }
     156              :         };
     157              : 
     158              :         // This is a sloppy update: concurrent updates to the counter will race, and the exact
     159              :         // value of the metric might not be the exact latest value of GLOBAL_RESOURCES::dirty_bytes.
     160              :         // That's okay: as long as the metric contains some recent value, it doesn't have to always
     161              :         // be literally the last update.
     162         1146 :         TIMELINE_EPHEMERAL_BYTES.set(new_global_dirty_bytes);
     163         1146 : 
     164         1146 :         self.dirty_bytes = size;
     165         1146 : 
     166         1146 :         let max_dirty_bytes = GLOBAL_RESOURCES
     167         1146 :             .max_dirty_bytes
     168         1146 :             .load(AtomicOrdering::Relaxed);
     169         1146 :         if max_dirty_bytes > 0 && new_global_dirty_bytes > max_dirty_bytes {
     170              :             // Set the layer file limit to the average layer size: this implies that all above-average
      171              :             // sized layers will be eligible for freezing.  They will be frozen in the order they
     172              :             // next enter publish_size.
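            // Illustrative arithmetic (hypothetical numbers, not from the source):
            // with max_dirty_bytes = 200 MiB, new_global_dirty_bytes = 300 MiB and
            // 3 dirty layers, the limit returned below is 300 MiB / 3 = 100 MiB,
            // so any layer above the current average size becomes a freeze candidate.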
     173            0 :             Some(
     174            0 :                 new_global_dirty_bytes
     175            0 :                     / GLOBAL_RESOURCES.dirty_layers.load(AtomicOrdering::Relaxed) as u64,
     176            0 :             )
     177              :         } else {
     178         1146 :             None
     179              :         }
     180         1146 :     }
     181              : 
     182              :     // Call publish_size if the input size differs from last published size by more than
     183              :     // the drift limit
     184      5090610 :     fn maybe_publish_size(&mut self, size: u64) {
     185      5090610 :         let publish = match size.cmp(&self.dirty_bytes) {
     186            0 :             Ordering::Equal => false,
     187      5090610 :             Ordering::Greater => size - self.dirty_bytes > Self::MAX_SIZE_DRIFT,
     188            0 :             Ordering::Less => self.dirty_bytes - size > Self::MAX_SIZE_DRIFT,
     189              :         };
     190              : 
     191      5090610 :         if publish {
     192           10 :             self.publish_size(size);
     193      5090600 :         }
     194      5090610 :     }
     195              : }
     196              : 
     197              : impl Drop for GlobalResourceUnits {
     198         1136 :     fn drop(&mut self) {
     199         1136 :         GLOBAL_RESOURCES
     200         1136 :             .dirty_layers
     201         1136 :             .fetch_sub(1, AtomicOrdering::Relaxed);
     202         1136 : 
     203         1136 :         // Subtract our contribution to the global total dirty bytes
     204         1136 :         self.publish_size(0);
     205         1136 :     }
     206              : }
     207              : 
     208              : pub(crate) static GLOBAL_RESOURCES: GlobalResources = GlobalResources {
     209              :     max_dirty_bytes: AtomicU64::new(0),
     210              :     dirty_bytes: AtomicU64::new(0),
     211              :     dirty_layers: AtomicUsize::new(0),
     212              : };
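A hedged, self-contained sketch (stand-in names, not the real `GLOBAL_RESOURCES` wiring) of the accounting pattern above: a per-layer guard registers itself on creation, publishes size changes only when they have drifted far enough, and removes its contribution when dropped.

    use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};

    static DIRTY_BYTES: AtomicU64 = AtomicU64::new(0);
    static DIRTY_LAYERS: AtomicUsize = AtomicUsize::new(0);

    struct Units {
        published: u64, // last size this guard pushed into DIRTY_BYTES
    }

    impl Units {
        fn new() -> Self {
            DIRTY_LAYERS.fetch_add(1, Ordering::Relaxed);
            Units { published: 0 }
        }

        /// Debounced publish: only touch the shared atomics when the local size
        /// has drifted more than `drift` bytes from the last published value.
        fn maybe_publish(&mut self, size: u64, drift: u64) {
            if size.abs_diff(self.published) > drift {
                self.publish(size);
            }
        }

        fn publish(&mut self, size: u64) {
            if size >= self.published {
                DIRTY_BYTES.fetch_add(size - self.published, Ordering::Relaxed);
            } else {
                DIRTY_BYTES.fetch_sub(self.published - size, Ordering::Relaxed);
            }
            self.published = size;
        }
    }

    impl Drop for Units {
        fn drop(&mut self) {
            DIRTY_LAYERS.fetch_sub(1, Ordering::Relaxed);
            self.publish(0); // give this layer's contribution back
        }
    }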
     213              : 
     214              : impl InMemoryLayer {
     215       606143 :     pub(crate) fn file_id(&self) -> InMemoryLayerFileId {
     216       606143 :         self.file_id
     217       606143 :     }
     218              : 
     219         1134 :     pub(crate) fn get_timeline_id(&self) -> TimelineId {
     220         1134 :         self.timeline_id
     221         1134 :     }
     222              : 
     223            0 :     pub(crate) fn info(&self) -> InMemoryLayerInfo {
     224            0 :         let lsn_start = self.start_lsn;
     225              : 
     226            0 :         if let Some(&lsn_end) = self.end_lsn.get() {
     227            0 :             InMemoryLayerInfo::Frozen { lsn_start, lsn_end }
     228              :         } else {
     229            0 :             InMemoryLayerInfo::Open { lsn_start }
     230              :         }
     231            0 :     }
     232              : 
     233            0 :     pub(crate) fn try_len(&self) -> Option<u64> {
     234            0 :         self.inner.try_read().map(|i| i.file.len()).ok()
     235            0 :     }
     236              : 
     237      5090610 :     pub(crate) fn assert_writable(&self) {
     238      5090610 :         assert!(self.end_lsn.get().is_none());
     239      5090610 :     }
     240              : 
     241      1519603 :     pub(crate) fn end_lsn_or_max(&self) -> Lsn {
     242      1519603 :         self.end_lsn.get().copied().unwrap_or(Lsn::MAX)
     243      1519603 :     }
     244              : 
     245      1518469 :     pub(crate) fn get_lsn_range(&self) -> Range<Lsn> {
     246      1518469 :         self.start_lsn..self.end_lsn_or_max()
     247      1518469 :     }
     248              : 
     249              :     /// debugging function to print out the contents of the layer
     250              :     ///
      251              :     /// this is likely completely unused
     252            0 :     pub async fn dump(&self, _verbose: bool, _ctx: &RequestContext) -> Result<()> {
     253            0 :         let end_str = self.end_lsn_or_max();
     254            0 : 
     255            0 :         println!(
     256            0 :             "----- in-memory layer for tli {} LSNs {}-{} ----",
     257            0 :             self.timeline_id, self.start_lsn, end_str,
     258            0 :         );
     259            0 : 
     260            0 :         Ok(())
     261            0 :     }
     262              : 
     263              :     // Look up the keys in the provided keyspace and update
     264              :     // the reconstruct state with whatever is found.
     265              :     //
     266              :     // If the key is cached, go no further than the cached Lsn.
     267       606143 :     pub(crate) async fn get_values_reconstruct_data(
     268       606143 :         &self,
     269       606143 :         keyspace: KeySpace,
     270       606143 :         end_lsn: Lsn,
     271       606143 :         reconstruct_state: &mut ValuesReconstructState,
     272       606143 :         ctx: &RequestContext,
     273       606143 :     ) -> Result<(), GetVectoredError> {
     274       606143 :         let ctx = RequestContextBuilder::extend(ctx)
     275       606143 :             .page_content_kind(PageContentKind::InMemoryLayer)
     276       606143 :             .build();
     277              : 
     278       606143 :         let inner = self.inner.read().await;
     279       606143 :         let reader = inner.file.block_cursor();
     280              : 
     281       606143 :         for range in keyspace.ranges.iter() {
     282       606143 :             for (key, vec_map) in inner
     283       606143 :                 .index
     284       606143 :                 .range(range.start.to_compact()..range.end.to_compact())
     285              :             {
     286       499008 :                 let key = Key::from_compact(*key);
     287       499008 :                 let lsn_range = match reconstruct_state.get_cached_lsn(&key) {
     288            0 :                     Some(cached_lsn) => (cached_lsn + 1)..end_lsn,
     289       499008 :                     None => self.start_lsn..end_lsn,
     290              :                 };
     291              : 
     292       499008 :                 let slice = vec_map.slice_range(lsn_range);
     293              : 
     294       499018 :                 for (entry_lsn, pos) in slice.iter().rev() {
     295              :                     // TODO: this uses the page cache => https://github.com/neondatabase/neon/issues/8183
     296       499018 :                     let buf = reader.read_blob(*pos, &ctx).await;
     297       499018 :                     if let Err(e) = buf {
     298            0 :                         reconstruct_state.on_key_error(key, PageReconstructError::from(anyhow!(e)));
     299            0 :                         break;
     300       499018 :                     }
     301       499018 : 
     302       499018 :                     let value = Value::des(&buf.unwrap());
     303       499018 :                     if let Err(e) = value {
     304            0 :                         reconstruct_state.on_key_error(key, PageReconstructError::from(anyhow!(e)));
     305            0 :                         break;
     306       499018 :                     }
     307       499018 : 
     308       499018 :                     let key_situation =
     309       499018 :                         reconstruct_state.update_key(&key, *entry_lsn, value.unwrap());
     310       499018 :                     if key_situation == ValueReconstructSituation::Complete {
     311       498998 :                         break;
     312           20 :                     }
     313              :                 }
     314              :             }
     315              :         }
     316              : 
     317       606143 :         reconstruct_state.on_lsn_advanced(&keyspace, self.start_lsn);
     318       606143 : 
     319       606143 :         Ok(())
     320       606143 :     }
     321              : }
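A hedged, standalone sketch of the lookup loop above (simplified types; the hypothetical `is_complete` callback stands in for `ValueReconstructSituation::Complete`): for each key in the requested range, versions are visited newest-first within the LSN window, and the walk stops as soon as reconstruction needs nothing older.

    use std::collections::BTreeMap;
    use std::ops::Range;

    /// `index`: per key, (lsn, file offset) pairs sorted by ascending LSN.
    fn visit_needed_versions(
        index: &BTreeMap<u64, Vec<(u64, u64)>>,
        key_range: Range<u64>,
        lsn_range: Range<u64>,
        mut is_complete: impl FnMut(u64, u64, u64) -> bool,
    ) {
        for (&key, versions) in index.range(key_range) {
            for &(lsn, off) in versions
                .iter()
                .filter(|(lsn, _)| lsn_range.contains(lsn))
                .rev()
            {
                // Newest first: stop once a full image (or enough deltas) is seen.
                if is_complete(key, lsn, off) {
                    break;
                }
            }
        }
    }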
     322              : 
     323         2268 : fn inmem_layer_display(mut f: impl Write, start_lsn: Lsn, end_lsn: Lsn) -> std::fmt::Result {
     324         2268 :     write!(f, "inmem-{:016X}-{:016X}", start_lsn.0, end_lsn.0)
     325         2268 : }
     326              : 
     327         1134 : fn inmem_layer_log_display(
     328         1134 :     mut f: impl Write,
     329         1134 :     timeline: TimelineId,
     330         1134 :     start_lsn: Lsn,
     331         1134 :     end_lsn: Lsn,
     332         1134 : ) -> std::fmt::Result {
     333         1134 :     write!(f, "timeline {} in-memory ", timeline)?;
     334         1134 :     inmem_layer_display(f, start_lsn, end_lsn)
     335         1134 : }
     336              : 
     337              : impl std::fmt::Display for InMemoryLayer {
     338         1134 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     339         1134 :         let end_lsn = self.end_lsn_or_max();
     340         1134 :         inmem_layer_display(f, self.start_lsn, end_lsn)
     341         1134 :     }
     342              : }
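For illustration, with hypothetical LSNs, the formatting above produces zero-padded, 16-digit hexadecimal names:

    fn main() {
        let (start, end): (u64, u64) = (0x16B9188, 0x16C9288); // hypothetical LSNs
        // Mirrors inmem_layer_display's format string.
        println!("inmem-{:016X}-{:016X}", start, end); // inmem-00000000016B9188-00000000016C9288
        // inmem_layer_log_display additionally prefixes "timeline <id> in-memory ".
    }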
     343              : 
     344              : impl InMemoryLayer {
     345              :     /// Get layer size.
     346         1264 :     pub async fn size(&self) -> Result<u64> {
     347         1264 :         let inner = self.inner.read().await;
     348         1264 :         Ok(inner.file.len())
     349         1264 :     }
     350              : 
     351              :     /// Create a new, empty, in-memory layer
     352         1264 :     pub async fn create(
     353         1264 :         conf: &'static PageServerConf,
     354         1264 :         timeline_id: TimelineId,
     355         1264 :         tenant_shard_id: TenantShardId,
     356         1264 :         start_lsn: Lsn,
     357         1264 :         gate_guard: utils::sync::gate::GateGuard,
     358         1264 :         ctx: &RequestContext,
     359         1264 :     ) -> Result<InMemoryLayer> {
     360         1264 :         trace!("initializing new empty InMemoryLayer for writing on timeline {timeline_id} at {start_lsn}");
     361              : 
     362         1264 :         let file =
     363         1264 :             EphemeralFile::create(conf, tenant_shard_id, timeline_id, gate_guard, ctx).await?;
     364         1264 :         let key = InMemoryLayerFileId(file.page_cache_file_id());
     365         1264 : 
     366         1264 :         Ok(InMemoryLayer {
     367         1264 :             file_id: key,
     368         1264 :             frozen_local_path_str: OnceLock::new(),
     369         1264 :             conf,
     370         1264 :             timeline_id,
     371         1264 :             tenant_shard_id,
     372         1264 :             start_lsn,
     373         1264 :             end_lsn: OnceLock::new(),
     374         1264 :             opened_at: Instant::now(),
     375         1264 :             inner: RwLock::new(InMemoryLayerInner {
     376         1264 :                 index: BTreeMap::new(),
     377         1264 :                 file,
     378         1264 :                 resource_units: GlobalResourceUnits::new(),
     379         1264 :             }),
     380         1264 :         })
     381         1264 :     }
     382              : 
     383              :     // Write operations
     384              : 
     385              :     /// Common subroutine of the public put_wal_record() and put_page_image() functions.
     386              :     /// Adds the page version to the in-memory tree
     387      5090610 :     pub async fn put_value(
     388      5090610 :         &self,
     389      5090610 :         key: CompactKey,
     390      5090610 :         lsn: Lsn,
     391      5090610 :         buf: &[u8],
     392      5090610 :         ctx: &RequestContext,
     393      5090610 :     ) -> Result<()> {
     394      5090610 :         let mut inner = self.inner.write().await;
     395      5090610 :         self.assert_writable();
     396      5090610 :         self.put_value_locked(&mut inner, key, lsn, buf, ctx).await
     397      5090610 :     }
     398              : 
     399      5090610 :     async fn put_value_locked(
     400      5090610 :         &self,
     401      5090610 :         locked_inner: &mut RwLockWriteGuard<'_, InMemoryLayerInner>,
     402      5090610 :         key: CompactKey,
     403      5090610 :         lsn: Lsn,
     404      5090610 :         buf: &[u8],
     405      5090610 :         ctx: &RequestContext,
     406      5090610 :     ) -> Result<()> {
     407      5090610 :         trace!("put_value key {} at {}/{}", key, self.timeline_id, lsn);
     408              : 
     409      5090610 :         let off = {
     410      5090610 :             locked_inner
     411      5090610 :                 .file
     412      5090610 :                 .write_blob(
     413      5090610 :                     buf,
     414      5090610 :                     &RequestContextBuilder::extend(ctx)
     415      5090610 :                         .page_content_kind(PageContentKind::InMemoryLayer)
     416      5090610 :                         .build(),
     417      5090610 :                 )
     418         3318 :                 .await?
     419              :         };
     420              : 
     421      5090610 :         let vec_map = locked_inner.index.entry(key).or_default();
     422      5090610 :         let old = vec_map.append_or_update_last(lsn, off).unwrap().0;
     423      5090610 :         if old.is_some() {
     424              :             // We already had an entry for this LSN. That's odd..
     425            0 :             warn!("Key {} at {} already exists", key, lsn);
     426      5090610 :         }
     427              : 
     428      5090610 :         let size = locked_inner.file.len();
     429      5090610 :         locked_inner.resource_units.maybe_publish_size(size);
     430      5090610 : 
     431      5090610 :         Ok(())
     432      5090610 :     }
     433              : 
     434      4803026 :     pub(crate) fn get_opened_at(&self) -> Instant {
     435      4803026 :         self.opened_at
     436      4803026 :     }
     437              : 
     438            0 :     pub(crate) async fn tick(&self) -> Option<u64> {
     439            0 :         let mut inner = self.inner.write().await;
     440            0 :         let size = inner.file.len();
     441            0 :         inner.resource_units.publish_size(size)
     442            0 :     }
     443              : 
     444            2 :     pub(crate) async fn put_tombstones(&self, _key_ranges: &[(Range<Key>, Lsn)]) -> Result<()> {
     445            2 :         // TODO: Currently, we just leak the storage for any deleted keys
     446            2 :         Ok(())
     447            2 :     }
     448              : 
     449              :     /// Records the end_lsn for non-dropped layers.
     450              :     /// `end_lsn` is exclusive
     451         1134 :     pub async fn freeze(&self, end_lsn: Lsn) {
     452         1134 :         assert!(
     453         1134 :             self.start_lsn < end_lsn,
     454            0 :             "{} >= {}",
     455              :             self.start_lsn,
     456              :             end_lsn
     457              :         );
     458         1134 :         self.end_lsn.set(end_lsn).expect("end_lsn set only once");
     459         1134 : 
     460         1134 :         self.frozen_local_path_str
     461         1134 :             .set({
     462         1134 :                 let mut buf = String::new();
     463         1134 :                 inmem_layer_log_display(&mut buf, self.get_timeline_id(), self.start_lsn, end_lsn)
     464         1134 :                     .unwrap();
     465         1134 :                 buf.into()
     466         1134 :             })
     467         1134 :             .expect("frozen_local_path_str set only once");
     468              : 
     469              :         #[cfg(debug_assertions)]
     470              :         {
     471         1134 :             let inner = self.inner.write().await;
     472      4255859 :             for vec_map in inner.index.values() {
     473      4386844 :                 for (lsn, _pos) in vec_map.as_slice() {
     474      4386844 :                     assert!(*lsn < end_lsn);
     475              :                 }
     476              :             }
     477              :         }
     478         1134 :     }
     479              : 
     480              :     /// Write this frozen in-memory layer to disk. If `key_range` is set, the delta
     481              :     /// layer will only contain the key range the user specifies, and may return `None`
     482              :     /// if there are no matching keys.
     483              :     ///
     484              :     /// Returns a new delta layer with all the same data as this in-memory layer
     485         1134 :     pub async fn write_to_disk(
     486         1134 :         &self,
     487         1134 :         ctx: &RequestContext,
     488         1134 :         key_range: Option<Range<Key>>,
     489         1134 :         l0_flush_global_state: &l0_flush::Inner,
     490         1134 :     ) -> Result<Option<(PersistentLayerDesc, Utf8PathBuf)>> {
     491              :         // Grab the lock in read-mode. We hold it over the I/O, but because this
     492              :         // layer is not writeable anymore, no one should be trying to acquire the
     493              :         // write lock on it, so we shouldn't block anyone. There's one exception
     494              :         // though: another thread might have grabbed a reference to this layer
      495              :         // in `get_layer_for_write` just before the checkpointer called
     496              :         // `freeze`, and then `write_to_disk` on it. When the thread gets the
     497              :         // lock, it will see that it's not writeable anymore and retry, but it
     498              :         // would have to wait until we release it. That race condition is very
     499              :         // rare though, so we just accept the potential latency hit for now.
     500         1134 :         let inner = self.inner.read().await;
     501              : 
     502              :         use l0_flush::Inner;
     503         1134 :         let _concurrency_permit = match l0_flush_global_state {
     504         1134 :             Inner::Direct { semaphore, .. } => Some(semaphore.acquire().await),
     505              :         };
     506              : 
     507         1134 :         let end_lsn = *self.end_lsn.get().unwrap();
     508              : 
     509         1134 :         let key_count = if let Some(key_range) = key_range {
     510          166 :             let key_range = key_range.start.to_compact()..key_range.end.to_compact();
     511          166 : 
     512          166 :             inner
     513          166 :                 .index
     514          166 :                 .iter()
     515         1326 :                 .filter(|(k, _)| key_range.contains(k))
     516          166 :                 .count()
     517              :         } else {
     518          968 :             inner.index.len()
     519              :         };
     520         1134 :         if key_count == 0 {
     521          166 :             return Ok(None);
     522          968 :         }
     523              : 
     524          968 :         let mut delta_layer_writer = DeltaLayerWriter::new(
     525          968 :             self.conf,
     526          968 :             self.timeline_id,
     527          968 :             self.tenant_shard_id,
     528          968 :             Key::MIN,
     529          968 :             self.start_lsn..end_lsn,
     530          968 :             ctx,
     531          968 :         )
     532          504 :         .await?;
     533              : 
     534          968 :         match l0_flush_global_state {
     535              :             l0_flush::Inner::Direct { .. } => {
     536          968 :                 let file_contents: Vec<u8> = inner.file.load_to_vec(ctx).await?;
     537          968 :                 assert_eq!(
     538          968 :                     file_contents.len() % PAGE_SZ,
     539              :                     0,
     540            0 :                     "needed by BlockReaderRef::Slice"
     541              :                 );
     542          968 :                 assert_eq!(file_contents.len(), {
     543          968 :                     let written = usize::try_from(inner.file.len()).unwrap();
     544          968 :                     if written % PAGE_SZ == 0 {
     545            0 :                         written
     546              :                     } else {
     547          968 :                         written.checked_add(PAGE_SZ - (written % PAGE_SZ)).unwrap()
     548              :                     }
     549              :                 });
     550              : 
     551          968 :                 let cursor = BlockCursor::new(BlockReaderRef::Slice(&file_contents));
     552          968 : 
     553          968 :                 let mut buf = Vec::new();
     554              : 
     555      4254533 :                 for (key, vec_map) in inner.index.iter() {
     556              :                     // Write all page versions
     557      4385518 :                     for (lsn, pos) in vec_map.as_slice() {
     558              :                         // TODO: once we have blob lengths in the in-memory index, we can
     559              :                         // 1. get rid of the blob_io / BlockReaderRef::Slice business and
     560              :                         // 2. load the file contents into a Bytes and
      561              :                         // 3. then use `Bytes::slice` to get the `buf` that is our blob
     562              :                         // 4. pass that `buf` into `put_value_bytes`
     563              :                         // => https://github.com/neondatabase/neon/issues/8183
     564      4385518 :                         cursor.read_blob_into_buf(*pos, &mut buf, ctx).await?;
     565      4385518 :                         let will_init = Value::des(&buf)?.will_init();
     566      4385518 :                         let (tmp, res) = delta_layer_writer
     567      4385518 :                             .put_value_bytes(
     568      4385518 :                                 Key::from_compact(*key),
     569      4385518 :                                 *lsn,
     570      4385518 :                                 buf.slice_len(),
     571      4385518 :                                 will_init,
     572      4385518 :                                 ctx,
     573      4385518 :                             )
     574         2732 :                             .await;
     575      4385518 :                         res?;
     576      4385518 :                         buf = tmp.into_raw_slice().into_inner();
     577              :                     }
     578              :                 }
     579              :             }
     580              :         }
     581              : 
     582              :         // MAX is used here because we identify L0 layers by full key range
     583         6639 :         let (desc, path) = delta_layer_writer.finish(Key::MAX, ctx).await?;
     584              : 
      585              :         // Hold the permit until all the IO is done, including the fsync in `delta_layer_writer.finish()`.
     586              :         //
     587              :         // If we didn't and our caller drops this future, tokio-epoll-uring would extend the lifetime of
     588              :         // the `file_contents: Vec<u8>` until the IO is done, but not the permit's lifetime.
      589              :         // Thus, we'd have more concurrent `Vec<u8>` in existence than the semaphore allows.
     590              :         //
     591              :         // We hold across the fsync so that on ext4 mounted with data=ordered, all the kernel page cache pages
     592              :         // we dirtied when writing to the filesystem have been flushed and marked !dirty.
     593          968 :         drop(_concurrency_permit);
     594          968 : 
     595          968 :         Ok(Some((desc, path)))
     596         1134 :     }
     597              : }
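A hedged sketch of the freeze-then-flush sequence these two methods are built for (the real call sites live elsewhere in the pageserver; `ctx` and `l0_flush_state` are assumed to be provided by the caller, and error handling is elided):

    async fn flush_frozen_layer_sketch(
        layer: &InMemoryLayer,
        end_lsn: Lsn,
        ctx: &RequestContext,
        l0_flush_state: &l0_flush::Inner,
    ) -> anyhow::Result<()> {
        // 1. Freeze: record the exclusive end LSN; put_value() is no longer allowed.
        layer.freeze(end_lsn).await;

        // 2. Flush: write the frozen contents out as an L0 delta layer.
        //    With key_range = None, a non-empty layer yields Some(..);
        //    None is only returned when a key_range filter matches nothing.
        if let Some((_desc, path)) = layer.write_to_disk(ctx, None, l0_flush_state).await? {
            tracing::info!("flushed in-memory layer to {}", path);
        }
        Ok(())
    }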
        

Generated by: LCOV version 2.1-beta