LCOV - code coverage report
Current view: top level - pageserver/src/tenant/storage_layer - inmemory_layer.rs (source / functions) Coverage Total Hit
Test: 42f947419473a288706e86ecdf7c2863d760d5d7.info Lines: 66.2 % 399 264
Test Date: 2024-08-02 21:34:27 Functions: 72.1 % 43 31

            Line data    Source code
       1              : //! An in-memory layer stores recently received key-value pairs.
       2              : //!
       3              : //! The "in-memory" part of the name is a bit misleading: the actual page versions are
       4              : //! held in an ephemeral file, not in memory. The metadata for each page version, i.e.
       5              : //! its position in the file, is kept in memory, though.
       6              : //!
       7              : use crate::config::PageServerConf;
       8              : use crate::context::{PageContentKind, RequestContext, RequestContextBuilder};
       9              : use crate::page_cache::PAGE_SZ;
      10              : use crate::repository::{Key, Value};
      11              : use crate::tenant::block_io::{BlockCursor, BlockReader, BlockReaderRef};
      12              : use crate::tenant::ephemeral_file::EphemeralFile;
      13              : use crate::tenant::storage_layer::ValueReconstructResult;
      14              : use crate::tenant::timeline::GetVectoredError;
      15              : use crate::tenant::{PageReconstructError, Timeline};
      16              : use crate::{l0_flush, page_cache, walrecord};
      17              : use anyhow::{anyhow, ensure, Result};
      18              : use pageserver_api::keyspace::KeySpace;
      19              : use pageserver_api::models::InMemoryLayerInfo;
      20              : use pageserver_api::shard::TenantShardId;
      21              : use std::collections::BTreeMap;
      22              : use std::sync::{Arc, OnceLock};
      23              : use std::time::Instant;
      24              : use tracing::*;
      25              : use utils::{bin_ser::BeSer, id::TimelineId, lsn::Lsn, vec_map::VecMap};
      26              : // avoid binding to Write (conflicts with std::io::Write)
      27              : // while being able to use std::fmt::Write's methods
      28              : use crate::metrics::TIMELINE_EPHEMERAL_BYTES;
      29              : use std::cmp::Ordering;
      30              : use std::fmt::Write;
      31              : use std::ops::Range;
      32              : use std::sync::atomic::Ordering as AtomicOrdering;
      33              : use std::sync::atomic::{AtomicU64, AtomicUsize};
      34              : use tokio::sync::{RwLock, RwLockWriteGuard};
      35              : 
      36              : use super::{
      37              :     DeltaLayerWriter, ResidentLayer, ValueReconstructSituation, ValueReconstructState,
      38              :     ValuesReconstructState,
      39              : };
      40              : 
      41              : #[derive(Debug, PartialEq, Eq, Clone, Copy, Hash)]
      42              : pub(crate) struct InMemoryLayerFileId(page_cache::FileId);
      43              : 
      44              : pub struct InMemoryLayer {
      45              :     conf: &'static PageServerConf,
      46              :     tenant_shard_id: TenantShardId,
      47              :     timeline_id: TimelineId,
      48              :     file_id: InMemoryLayerFileId,
      49              : 
      50              :     /// This layer contains all the changes from 'start_lsn'. The
      51              :     /// start is inclusive.
      52              :     start_lsn: Lsn,
      53              : 
      54              :     /// Frozen layers have an exclusive end LSN.
      55              :     /// Writes are only allowed when this is `None`.
      56              :     pub(crate) end_lsn: OnceLock<Lsn>,
      57              : 
      58              :     /// Used for traversal path. Cached representation of the in-memory layer before frozen.
      59              :     local_path_str: Arc<str>,
      60              : 
      61              :     /// Used for traversal path. Cached representation of the in-memory layer after frozen.
      62              :     frozen_local_path_str: OnceLock<Arc<str>>,
      63              : 
      64              :     opened_at: Instant,
      65              : 
      66              :     /// The above fields never change, except for `end_lsn`, which is only set once.
       67              :     /// All other changing parts are in `inner`, and protected by a read-write lock (`RwLock`).
      68              :     inner: RwLock<InMemoryLayerInner>,
      69              : }
      70              : 
      71              : impl std::fmt::Debug for InMemoryLayer {
      72            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
      73            0 :         f.debug_struct("InMemoryLayer")
      74            0 :             .field("start_lsn", &self.start_lsn)
      75            0 :             .field("end_lsn", &self.end_lsn)
      76            0 :             .field("inner", &self.inner)
      77            0 :             .finish()
      78            0 :     }
      79              : }
      80              : 
      81              : pub struct InMemoryLayerInner {
      82              :     /// All versions of all pages in the layer are kept here. Indexed
       83              :     /// by key and LSN. The value is an offset into the
      84              :     /// ephemeral file where the page version is stored.
      85              :     index: BTreeMap<Key, VecMap<Lsn, u64>>,
      86              : 
      87              :     /// The values are stored in a serialized format in this file.
      88              :     /// Each serialized Value is preceded by a 'u32' length field.
       89              :     /// The `index` map stores offsets into this file.
      90              :     file: EphemeralFile,
      91              : 
      92              :     resource_units: GlobalResourceUnits,
      93              : }
      94              : 
      95              : impl std::fmt::Debug for InMemoryLayerInner {
      96            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
      97            0 :         f.debug_struct("InMemoryLayerInner").finish()
      98            0 :     }
      99              : }
     100              : 
     101              : /// State shared by all in-memory (ephemeral) layers.  Updated infrequently during background ticks in Timeline,
     102              : /// to minimize contention.
     103              : ///
     104              : /// This global state is used to implement behaviors that require a global view of the system, e.g.
     105              : /// rolling layers proactively to limit the total amount of dirty data.
     106              : pub(crate) struct GlobalResources {
     107              :     // Limit on how high dirty_bytes may grow before we start freezing layers to reduce it.
     108              :     // Zero means unlimited.
     109              :     pub(crate) max_dirty_bytes: AtomicU64,
     110              :     // How many bytes are in all EphemeralFile objects
     111              :     dirty_bytes: AtomicU64,
     112              :     // How many layers are contributing to dirty_bytes
     113              :     dirty_layers: AtomicUsize,
     114              : }
     115              : 
      116              : // Per-layer RAII struct for its contribution to [`GlobalResources`]
     117              : struct GlobalResourceUnits {
     118              :     // How many dirty bytes have I added to the global dirty_bytes: this guard object is responsible
     119              :     // for decrementing the global counter by this many bytes when dropped.
     120              :     dirty_bytes: u64,
     121              : }
     122              : 
     123              : impl GlobalResourceUnits {
     124              :     // Hint for the layer append path to update us when the layer size differs from the last
      125              :     // call to publish_size by this much.  If we don't reach this threshold, we'll still get
     126              :     // updated when the Timeline "ticks" in the background.
     127              :     const MAX_SIZE_DRIFT: u64 = 10 * 1024 * 1024;
     128              : 
     129         1256 :     fn new() -> Self {
     130         1256 :         GLOBAL_RESOURCES
     131         1256 :             .dirty_layers
     132         1256 :             .fetch_add(1, AtomicOrdering::Relaxed);
     133         1256 :         Self { dirty_bytes: 0 }
     134         1256 :     }
     135              : 
     136              :     /// Do not call this frequently: all timelines will write to these same global atomics,
     137              :     /// so this is a relatively expensive operation.  Wait at least a few seconds between calls.
     138              :     ///
     139              :     /// Returns the effective layer size limit that should be applied, if any, to keep
     140              :     /// the total number of dirty bytes below the configured maximum.
     141         1138 :     fn publish_size(&mut self, size: u64) -> Option<u64> {
     142         1138 :         let new_global_dirty_bytes = match size.cmp(&self.dirty_bytes) {
     143         1126 :             Ordering::Equal => GLOBAL_RESOURCES.dirty_bytes.load(AtomicOrdering::Relaxed),
     144              :             Ordering::Greater => {
     145           10 :                 let delta = size - self.dirty_bytes;
     146           10 :                 let old = GLOBAL_RESOURCES
     147           10 :                     .dirty_bytes
     148           10 :                     .fetch_add(delta, AtomicOrdering::Relaxed);
     149           10 :                 old + delta
     150              :             }
     151              :             Ordering::Less => {
     152            2 :                 let delta = self.dirty_bytes - size;
     153            2 :                 let old = GLOBAL_RESOURCES
     154            2 :                     .dirty_bytes
     155            2 :                     .fetch_sub(delta, AtomicOrdering::Relaxed);
     156            2 :                 old - delta
     157              :             }
     158              :         };
     159              : 
     160              :         // This is a sloppy update: concurrent updates to the counter will race, and the exact
     161              :         // value of the metric might not be the exact latest value of GLOBAL_RESOURCES::dirty_bytes.
     162              :         // That's okay: as long as the metric contains some recent value, it doesn't have to always
     163              :         // be literally the last update.
     164         1138 :         TIMELINE_EPHEMERAL_BYTES.set(new_global_dirty_bytes);
     165         1138 : 
     166         1138 :         self.dirty_bytes = size;
     167         1138 : 
     168         1138 :         let max_dirty_bytes = GLOBAL_RESOURCES
     169         1138 :             .max_dirty_bytes
     170         1138 :             .load(AtomicOrdering::Relaxed);
     171         1138 :         if max_dirty_bytes > 0 && new_global_dirty_bytes > max_dirty_bytes {
     172              :             // Set the layer file limit to the average layer size: this implies that all above-average
      173              :             // sized layers will be eligible for freezing.  They will be frozen in the order they
     174              :             // next enter publish_size.
     175            0 :             Some(
     176            0 :                 new_global_dirty_bytes
     177            0 :                     / GLOBAL_RESOURCES.dirty_layers.load(AtomicOrdering::Relaxed) as u64,
     178            0 :             )
     179              :         } else {
     180         1138 :             None
     181              :         }
     182         1138 :     }
     183              : 
     184              :     // Call publish_size if the input size differs from last published size by more than
     185              :     // the drift limit
     186      5090546 :     fn maybe_publish_size(&mut self, size: u64) {
     187      5090546 :         let publish = match size.cmp(&self.dirty_bytes) {
     188            0 :             Ordering::Equal => false,
     189      5090546 :             Ordering::Greater => size - self.dirty_bytes > Self::MAX_SIZE_DRIFT,
     190            0 :             Ordering::Less => self.dirty_bytes - size > Self::MAX_SIZE_DRIFT,
     191              :         };
     192              : 
     193      5090546 :         if publish {
     194           10 :             self.publish_size(size);
     195      5090536 :         }
     196      5090546 :     }
     197              : }
     198              : 
     199              : impl Drop for GlobalResourceUnits {
     200         1128 :     fn drop(&mut self) {
     201         1128 :         GLOBAL_RESOURCES
     202         1128 :             .dirty_layers
     203         1128 :             .fetch_sub(1, AtomicOrdering::Relaxed);
     204         1128 : 
     205         1128 :         // Subtract our contribution to the global total dirty bytes
     206         1128 :         self.publish_size(0);
     207         1128 :     }
     208              : }
     209              : 
     210              : pub(crate) static GLOBAL_RESOURCES: GlobalResources = GlobalResources {
     211              :     max_dirty_bytes: AtomicU64::new(0),
     212              :     dirty_bytes: AtomicU64::new(0),
     213              :     dirty_layers: AtomicUsize::new(0),
     214              : };
     215              : 
     216              : impl InMemoryLayer {
     217       606197 :     pub(crate) fn file_id(&self) -> InMemoryLayerFileId {
     218       606197 :         self.file_id
     219       606197 :     }
     220              : 
     221         1126 :     pub(crate) fn get_timeline_id(&self) -> TimelineId {
     222         1126 :         self.timeline_id
     223         1126 :     }
     224              : 
     225            0 :     pub(crate) fn info(&self) -> InMemoryLayerInfo {
     226            0 :         let lsn_start = self.start_lsn;
     227              : 
     228            0 :         if let Some(&lsn_end) = self.end_lsn.get() {
     229            0 :             InMemoryLayerInfo::Frozen { lsn_start, lsn_end }
     230              :         } else {
     231            0 :             InMemoryLayerInfo::Open { lsn_start }
     232              :         }
     233            0 :     }
     234              : 
     235            0 :     pub(crate) fn try_len(&self) -> Option<u64> {
     236            0 :         self.inner.try_read().map(|i| i.file.len()).ok()
     237            0 :     }
     238              : 
     239      5090546 :     pub(crate) fn assert_writable(&self) {
     240      5090546 :         assert!(self.end_lsn.get().is_none());
     241      5090546 :     }
     242              : 
     243      1519345 :     pub(crate) fn end_lsn_or_max(&self) -> Lsn {
     244      1519345 :         self.end_lsn.get().copied().unwrap_or(Lsn::MAX)
     245      1519345 :     }
     246              : 
     247      1518219 :     pub(crate) fn get_lsn_range(&self) -> Range<Lsn> {
     248      1518219 :         self.start_lsn..self.end_lsn_or_max()
     249      1518219 :     }
     250              : 
     251            0 :     pub(crate) fn local_path_str(&self) -> &Arc<str> {
     252            0 :         self.frozen_local_path_str
     253            0 :             .get()
     254            0 :             .unwrap_or(&self.local_path_str)
     255            0 :     }
     256              : 
     257              :     /// debugging function to print out the contents of the layer
     258              :     ///
      259              :     /// this is likely completely unused
     260            0 :     pub async fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
     261            0 :         let inner = self.inner.read().await;
     262              : 
     263            0 :         let end_str = self.end_lsn_or_max();
     264            0 : 
     265            0 :         println!(
     266            0 :             "----- in-memory layer for tli {} LSNs {}-{} ----",
     267            0 :             self.timeline_id, self.start_lsn, end_str,
     268            0 :         );
     269            0 : 
     270            0 :         if !verbose {
     271            0 :             return Ok(());
     272            0 :         }
     273            0 : 
     274            0 :         let cursor = inner.file.block_cursor();
     275            0 :         let mut buf = Vec::new();
     276            0 :         for (key, vec_map) in inner.index.iter() {
     277            0 :             for (lsn, pos) in vec_map.as_slice() {
     278            0 :                 let mut desc = String::new();
     279            0 :                 cursor.read_blob_into_buf(*pos, &mut buf, ctx).await?;
     280            0 :                 let val = Value::des(&buf);
     281            0 :                 match val {
     282            0 :                     Ok(Value::Image(img)) => {
     283            0 :                         write!(&mut desc, " img {} bytes", img.len())?;
     284              :                     }
     285            0 :                     Ok(Value::WalRecord(rec)) => {
     286            0 :                         let wal_desc = walrecord::describe_wal_record(&rec).unwrap();
     287            0 :                         write!(
     288            0 :                             &mut desc,
     289            0 :                             " rec {} bytes will_init: {} {}",
     290            0 :                             buf.len(),
     291            0 :                             rec.will_init(),
     292            0 :                             wal_desc
     293            0 :                         )?;
     294              :                     }
     295            0 :                     Err(err) => {
     296            0 :                         write!(&mut desc, " DESERIALIZATION ERROR: {}", err)?;
     297              :                     }
     298              :                 }
     299            0 :                 println!("  key {} at {}: {}", key, lsn, desc);
     300              :             }
     301              :         }
     302              : 
     303            0 :         Ok(())
     304            0 :     }
     305              : 
     306              :     /// Look up given value in the layer.
     307            0 :     pub(crate) async fn get_value_reconstruct_data(
     308            0 :         &self,
     309            0 :         key: Key,
     310            0 :         lsn_range: Range<Lsn>,
     311            0 :         reconstruct_state: &mut ValueReconstructState,
     312            0 :         ctx: &RequestContext,
     313            0 :     ) -> anyhow::Result<ValueReconstructResult> {
     314            0 :         ensure!(lsn_range.start >= self.start_lsn);
     315            0 :         let mut need_image = true;
     316            0 : 
     317            0 :         let ctx = RequestContextBuilder::extend(ctx)
     318            0 :             .page_content_kind(PageContentKind::InMemoryLayer)
     319            0 :             .build();
     320              : 
     321            0 :         let inner = self.inner.read().await;
     322              : 
     323            0 :         let reader = inner.file.block_cursor();
     324              : 
      325              :         // Scan the page versions within `lsn_range` backwards, newest first.
     326            0 :         if let Some(vec_map) = inner.index.get(&key) {
     327            0 :             let slice = vec_map.slice_range(lsn_range);
     328            0 :             for (entry_lsn, pos) in slice.iter().rev() {
     329            0 :                 let buf = reader.read_blob(*pos, &ctx).await?;
     330            0 :                 let value = Value::des(&buf)?;
     331            0 :                 match value {
     332            0 :                     Value::Image(img) => {
     333            0 :                         reconstruct_state.img = Some((*entry_lsn, img));
     334            0 :                         return Ok(ValueReconstructResult::Complete);
     335              :                     }
     336            0 :                     Value::WalRecord(rec) => {
     337            0 :                         let will_init = rec.will_init();
     338            0 :                         reconstruct_state.records.push((*entry_lsn, rec));
     339            0 :                         if will_init {
     340              :                             // This WAL record initializes the page, so no need to go further back
     341            0 :                             need_image = false;
     342            0 :                             break;
     343            0 :                         }
     344              :                     }
     345              :                 }
     346              :             }
     347            0 :         }
     348              : 
      349              :         // NOTE: the read lock on `inner` is not released here; the guard lives until the function returns.
     350              : 
     351              :         // If an older page image is needed to reconstruct the page, let the
     352              :         // caller know.
     353            0 :         if need_image {
     354            0 :             Ok(ValueReconstructResult::Continue)
     355              :         } else {
     356            0 :             Ok(ValueReconstructResult::Complete)
     357              :         }
     358            0 :     }
     359              : 
     360              :     // Look up the keys in the provided keyspace and update
     361              :     // the reconstruct state with whatever is found.
     362              :     //
     363              :     // If the key is cached, go no further than the cached Lsn.
     364       606197 :     pub(crate) async fn get_values_reconstruct_data(
     365       606197 :         &self,
     366       606197 :         keyspace: KeySpace,
     367       606197 :         end_lsn: Lsn,
     368       606197 :         reconstruct_state: &mut ValuesReconstructState,
     369       606197 :         ctx: &RequestContext,
     370       606197 :     ) -> Result<(), GetVectoredError> {
     371       606197 :         let ctx = RequestContextBuilder::extend(ctx)
     372       606197 :             .page_content_kind(PageContentKind::InMemoryLayer)
     373       606197 :             .build();
     374              : 
     375       606197 :         let inner = self.inner.read().await;
     376       606197 :         let reader = inner.file.block_cursor();
     377              : 
     378       606197 :         for range in keyspace.ranges.iter() {
     379       606197 :             for (key, vec_map) in inner.index.range(range.start..range.end) {
     380       499107 :                 let lsn_range = match reconstruct_state.get_cached_lsn(key) {
     381            0 :                     Some(cached_lsn) => (cached_lsn + 1)..end_lsn,
     382       499107 :                     None => self.start_lsn..end_lsn,
     383              :                 };
     384              : 
     385       499107 :                 let slice = vec_map.slice_range(lsn_range);
     386              : 
     387       499117 :                 for (entry_lsn, pos) in slice.iter().rev() {
     388              :                     // TODO: this uses the page cache => https://github.com/neondatabase/neon/issues/8183
     389       499117 :                     let buf = reader.read_blob(*pos, &ctx).await;
     390       499117 :                     if let Err(e) = buf {
     391            0 :                         reconstruct_state
     392            0 :                             .on_key_error(*key, PageReconstructError::from(anyhow!(e)));
     393            0 :                         break;
     394       499117 :                     }
     395       499117 : 
     396       499117 :                     let value = Value::des(&buf.unwrap());
     397       499117 :                     if let Err(e) = value {
     398            0 :                         reconstruct_state
     399            0 :                             .on_key_error(*key, PageReconstructError::from(anyhow!(e)));
     400            0 :                         break;
     401       499117 :                     }
     402       499117 : 
     403       499117 :                     let key_situation =
     404       499117 :                         reconstruct_state.update_key(key, *entry_lsn, value.unwrap());
     405       499117 :                     if key_situation == ValueReconstructSituation::Complete {
     406       499097 :                         break;
     407           20 :                     }
     408              :                 }
     409              :             }
     410              :         }
     411              : 
     412       606197 :         reconstruct_state.on_lsn_advanced(&keyspace, self.start_lsn);
     413       606197 : 
     414       606197 :         Ok(())
     415       606197 :     }
     416              : }
     417              : 
     418         3508 : fn inmem_layer_display(mut f: impl Write, start_lsn: Lsn, end_lsn: Lsn) -> std::fmt::Result {
     419         3508 :     write!(f, "inmem-{:016X}-{:016X}", start_lsn.0, end_lsn.0)
     420         3508 : }
     421              : 
     422         2382 : fn inmem_layer_log_display(
     423         2382 :     mut f: impl Write,
     424         2382 :     timeline: TimelineId,
     425         2382 :     start_lsn: Lsn,
     426         2382 :     end_lsn: Lsn,
     427         2382 : ) -> std::fmt::Result {
     428         2382 :     write!(f, "timeline {} in-memory ", timeline)?;
     429         2382 :     inmem_layer_display(f, start_lsn, end_lsn)
     430         2382 : }
     431              : 
     432              : impl std::fmt::Display for InMemoryLayer {
     433         1126 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     434         1126 :         let end_lsn = self.end_lsn_or_max();
     435         1126 :         inmem_layer_display(f, self.start_lsn, end_lsn)
     436         1126 :     }
     437              : }
     438              : 
     439              : impl InMemoryLayer {
     440              :     /// Get layer size.
     441         1256 :     pub async fn size(&self) -> Result<u64> {
     442         1256 :         let inner = self.inner.read().await;
     443         1256 :         Ok(inner.file.len())
     444         1256 :     }
     445              : 
     446              :     /// Create a new, empty, in-memory layer
     447         1256 :     pub async fn create(
     448         1256 :         conf: &'static PageServerConf,
     449         1256 :         timeline_id: TimelineId,
     450         1256 :         tenant_shard_id: TenantShardId,
     451         1256 :         start_lsn: Lsn,
     452         1256 :         ctx: &RequestContext,
     453         1256 :     ) -> Result<InMemoryLayer> {
     454         1256 :         trace!("initializing new empty InMemoryLayer for writing on timeline {timeline_id} at {start_lsn}");
     455              : 
     456         1256 :         let file = EphemeralFile::create(conf, tenant_shard_id, timeline_id, ctx).await?;
     457         1256 :         let key = InMemoryLayerFileId(file.page_cache_file_id());
     458         1256 : 
     459         1256 :         Ok(InMemoryLayer {
     460         1256 :             file_id: key,
     461         1256 :             local_path_str: {
     462         1256 :                 let mut buf = String::new();
     463         1256 :                 inmem_layer_log_display(&mut buf, timeline_id, start_lsn, Lsn::MAX).unwrap();
     464         1256 :                 buf.into()
     465         1256 :             },
     466         1256 :             frozen_local_path_str: OnceLock::new(),
     467         1256 :             conf,
     468         1256 :             timeline_id,
     469         1256 :             tenant_shard_id,
     470         1256 :             start_lsn,
     471         1256 :             end_lsn: OnceLock::new(),
     472         1256 :             opened_at: Instant::now(),
     473         1256 :             inner: RwLock::new(InMemoryLayerInner {
     474         1256 :                 index: BTreeMap::new(),
     475         1256 :                 file,
     476         1256 :                 resource_units: GlobalResourceUnits::new(),
     477         1256 :             }),
     478         1256 :         })
     479         1256 :     }
     480              : 
     481              :     // Write operations
     482              : 
     483              :     /// Common subroutine of the public put_wal_record() and put_page_image() functions.
     484              :     /// Adds the page version to the in-memory tree
     485              : 
     486      5090546 :     pub(crate) async fn put_value(
     487      5090546 :         &self,
     488      5090546 :         key: Key,
     489      5090546 :         lsn: Lsn,
     490      5090546 :         buf: &[u8],
     491      5090546 :         ctx: &RequestContext,
     492      5090546 :     ) -> Result<()> {
     493      5090546 :         let mut inner = self.inner.write().await;
     494      5090546 :         self.assert_writable();
     495      5090546 :         self.put_value_locked(&mut inner, key, lsn, buf, ctx).await
     496      5090546 :     }
     497              : 
     498      5090546 :     async fn put_value_locked(
     499      5090546 :         &self,
     500      5090546 :         locked_inner: &mut RwLockWriteGuard<'_, InMemoryLayerInner>,
     501      5090546 :         key: Key,
     502      5090546 :         lsn: Lsn,
     503      5090546 :         buf: &[u8],
     504      5090546 :         ctx: &RequestContext,
     505      5090546 :     ) -> Result<()> {
     506      5090546 :         trace!("put_value key {} at {}/{}", key, self.timeline_id, lsn);
     507              : 
     508      5090546 :         let off = {
     509      5090546 :             locked_inner
     510      5090546 :                 .file
     511      5090546 :                 .write_blob(
     512      5090546 :                     buf,
     513      5090546 :                     &RequestContextBuilder::extend(ctx)
     514      5090546 :                         .page_content_kind(PageContentKind::InMemoryLayer)
     515      5090546 :                         .build(),
     516      5090546 :                 )
     517         3320 :                 .await?
     518              :         };
     519              : 
     520      5090546 :         let vec_map = locked_inner.index.entry(key).or_default();
     521      5090546 :         let old = vec_map.append_or_update_last(lsn, off).unwrap().0;
     522      5090546 :         if old.is_some() {
     523              :             // We already had an entry for this LSN. That's odd..
     524            0 :             warn!("Key {} at {} already exists", key, lsn);
     525      5090546 :         }
     526              : 
     527      5090546 :         let size = locked_inner.file.len();
     528      5090546 :         locked_inner.resource_units.maybe_publish_size(size);
     529      5090546 : 
     530      5090546 :         Ok(())
     531      5090546 :     }
     532              : 
     533      4803026 :     pub(crate) fn get_opened_at(&self) -> Instant {
     534      4803026 :         self.opened_at
     535      4803026 :     }
     536              : 
     537            0 :     pub(crate) async fn tick(&self) -> Option<u64> {
     538            0 :         let mut inner = self.inner.write().await;
     539            0 :         let size = inner.file.len();
     540            0 :         inner.resource_units.publish_size(size)
     541            0 :     }
     542              : 
     543            2 :     pub(crate) async fn put_tombstones(&self, _key_ranges: &[(Range<Key>, Lsn)]) -> Result<()> {
     544            2 :         // TODO: Currently, we just leak the storage for any deleted keys
     545            2 :         Ok(())
     546            2 :     }
     547              : 
     548              :     /// Records the end_lsn for non-dropped layers.
     549              :     /// `end_lsn` is exclusive
     550         1126 :     pub async fn freeze(&self, end_lsn: Lsn) {
     551         1126 :         let inner = self.inner.write().await;
     552              : 
     553         1126 :         assert!(
     554         1126 :             self.start_lsn < end_lsn,
     555            0 :             "{} >= {}",
     556              :             self.start_lsn,
     557              :             end_lsn
     558              :         );
     559         1126 :         self.end_lsn.set(end_lsn).expect("end_lsn set only once");
     560         1126 : 
     561         1126 :         self.frozen_local_path_str
     562         1126 :             .set({
     563         1126 :                 let mut buf = String::new();
     564         1126 :                 inmem_layer_log_display(&mut buf, self.get_timeline_id(), self.start_lsn, end_lsn)
     565         1126 :                     .unwrap();
     566         1126 :                 buf.into()
     567         1126 :             })
     568         1126 :             .expect("frozen_local_path_str set only once");
     569              : 
     570      4255938 :         for vec_map in inner.index.values() {
     571      4386780 :             for (lsn, _pos) in vec_map.as_slice() {
     572      4386780 :                 assert!(*lsn < end_lsn);
     573              :             }
     574              :         }
     575         1126 :     }
     576              : 
     577              :     /// Write this frozen in-memory layer to disk. If `key_range` is set, the delta
     578              :     /// layer will only contain the key range the user specifies, and may return `None`
     579              :     /// if there are no matching keys.
     580              :     ///
     581              :     /// Returns a new delta layer with all the same data as this in-memory layer
     582         1126 :     pub(crate) async fn write_to_disk(
     583         1126 :         &self,
     584         1126 :         timeline: &Arc<Timeline>,
     585         1126 :         ctx: &RequestContext,
     586         1126 :         key_range: Option<Range<Key>>,
     587         1126 :     ) -> Result<Option<ResidentLayer>> {
     588              :         // Grab the lock in read-mode. We hold it over the I/O, but because this
     589              :         // layer is not writeable anymore, no one should be trying to acquire the
     590              :         // write lock on it, so we shouldn't block anyone. There's one exception
     591              :         // though: another thread might have grabbed a reference to this layer
     592              :         // in `get_layer_for_write` just before the checkpointer called
     593              :         // `freeze`, and then `write_to_disk` on it. When the thread gets the
     594              :         // lock, it will see that it's not writeable anymore and retry, but it
     595              :         // would have to wait until we release it. That race condition is very
     596              :         // rare though, so we just accept the potential latency hit for now.
     597         1126 :         let inner = self.inner.read().await;
     598              : 
     599         1126 :         let l0_flush_global_state = timeline.l0_flush_global_state.inner().clone();
     600              :         use l0_flush::Inner;
     601         1126 :         let _concurrency_permit = match &*l0_flush_global_state {
     602            0 :             Inner::PageCached => None,
     603         1126 :             Inner::Direct { semaphore, .. } => Some(semaphore.acquire().await),
     604              :         };
     605              : 
     606         1126 :         let end_lsn = *self.end_lsn.get().unwrap();
     607              : 
     608         1126 :         let key_count = if let Some(key_range) = key_range {
     609          158 :             inner
     610          158 :                 .index
     611          158 :                 .iter()
     612         1262 :                 .filter(|(k, _)| key_range.contains(k))
     613          158 :                 .count()
     614              :         } else {
     615          968 :             inner.index.len()
     616              :         };
     617         1126 :         if key_count == 0 {
     618          158 :             return Ok(None);
     619          968 :         }
     620              : 
     621          968 :         let mut delta_layer_writer = DeltaLayerWriter::new(
     622          968 :             self.conf,
     623          968 :             self.timeline_id,
     624          968 :             self.tenant_shard_id,
     625          968 :             Key::MIN,
     626          968 :             self.start_lsn..end_lsn,
     627          968 :             ctx,
     628          968 :         )
     629          491 :         .await?;
     630              : 
     631          968 :         match &*l0_flush_global_state {
     632              :             l0_flush::Inner::PageCached => {
     633            0 :                 let ctx = RequestContextBuilder::extend(ctx)
     634            0 :                     .page_content_kind(PageContentKind::InMemoryLayer)
     635            0 :                     .build();
     636            0 : 
     637            0 :                 let mut buf = Vec::new();
     638            0 : 
     639            0 :                 let cursor = inner.file.block_cursor();
     640              : 
     641            0 :                 for (key, vec_map) in inner.index.iter() {
     642              :                     // Write all page versions
     643            0 :                     for (lsn, pos) in vec_map.as_slice() {
     644            0 :                         cursor.read_blob_into_buf(*pos, &mut buf, &ctx).await?;
     645            0 :                         let will_init = Value::des(&buf)?.will_init();
     646              :                         let res;
     647            0 :                         (buf, res) = delta_layer_writer
     648            0 :                             .put_value_bytes(*key, *lsn, buf, will_init, &ctx)
     649            0 :                             .await;
     650            0 :                         res?;
     651              :                     }
     652              :                 }
     653              :             }
     654              :             l0_flush::Inner::Direct { .. } => {
     655          968 :                 let file_contents: Vec<u8> = inner.file.load_to_vec(ctx).await?;
     656          968 :                 assert_eq!(
     657          968 :                     file_contents.len() % PAGE_SZ,
     658              :                     0,
     659            0 :                     "needed by BlockReaderRef::Slice"
     660              :                 );
     661          968 :                 assert_eq!(file_contents.len(), {
     662          968 :                     let written = usize::try_from(inner.file.len()).unwrap();
     663          968 :                     if written % PAGE_SZ == 0 {
     664            0 :                         written
     665              :                     } else {
     666          968 :                         written.checked_add(PAGE_SZ - (written % PAGE_SZ)).unwrap()
     667              :                     }
     668              :                 });
     669              : 
     670          968 :                 let cursor = BlockCursor::new(BlockReaderRef::Slice(&file_contents));
     671          968 : 
     672          968 :                 let mut buf = Vec::new();
     673              : 
     674      4254676 :                 for (key, vec_map) in inner.index.iter() {
     675              :                     // Write all page versions
     676      4385518 :                     for (lsn, pos) in vec_map.as_slice() {
     677              :                         // TODO: once we have blob lengths in the in-memory index, we can
     678              :                         // 1. get rid of the blob_io / BlockReaderRef::Slice business and
     679              :                         // 2. load the file contents into a Bytes and
     680              :                         // 3. then use `Bytes::slice` to get the `buf` that is our blob
     681              :                         // 4. pass that `buf` into `put_value_bytes`
     682              :                         // => https://github.com/neondatabase/neon/issues/8183
     683      4385518 :                         cursor.read_blob_into_buf(*pos, &mut buf, ctx).await?;
     684      4385518 :                         let will_init = Value::des(&buf)?.will_init();
     685              :                         let res;
     686      4385518 :                         (buf, res) = delta_layer_writer
     687      4385518 :                             .put_value_bytes(*key, *lsn, buf, will_init, ctx)
     688         2728 :                             .await;
     689      4385518 :                         res?;
     690              :                     }
     691              :                 }
     692              :             }
     693              :         }
     694              : 
     695              :         // MAX is used here because we identify L0 layers by full key range
     696         6657 :         let delta_layer = delta_layer_writer.finish(Key::MAX, timeline, ctx).await?;
     697              : 
     698              :         // Hold the permit until all the IO is done, including the fsync in `delta_layer_writer.finish()`.
     699              :         //
     700              :         // If we didn't and our caller drops this future, tokio-epoll-uring would extend the lifetime of
     701              :         // the `file_contents: Vec<u8>` until the IO is done, but not the permit's lifetime.
     702              :         // Thus, we'd have more concurrent `Vec<u8>` in existence than the semaphore allows.
     703              :         //
     704              :         // We hold across the fsync so that on ext4 mounted with data=ordered, all the kernel page cache pages
     705              :         // we dirtied when writing to the filesystem have been flushed and marked !dirty.
     706          968 :         drop(_concurrency_permit);
     707          968 : 
     708          968 :         Ok(Some(delta_layer))
     709         1126 :     }
     710              : }
        

Generated by: LCOV version 2.1-beta