LCOV - code coverage report
Current view: top level - pageserver/src/tenant/storage_layer - inmemory_layer.rs (source / functions) Coverage Total Hit
Test: b837401fb09d2d9818b70e630fdb67e9799b7b0d.info Lines: 60.3 % 363 219
Test Date: 2024-04-18 15:32:49 Functions: 65.1 % 43 28

            Line data    Source code
       1              : //! An in-memory layer stores recently received key-value pairs.
       2              : //!
       3              : //! The "in-memory" part of the name is a bit misleading: the actual page versions are
       4              : //! held in an ephemeral file, not in memory. The metadata for each page version, i.e.
       5              : //! its position in the file, is kept in memory, though.
       6              : //!
       7              : use crate::config::PageServerConf;
       8              : use crate::context::{PageContentKind, RequestContext, RequestContextBuilder};
       9              : use crate::repository::{Key, Value};
      10              : use crate::tenant::block_io::BlockReader;
      11              : use crate::tenant::ephemeral_file::EphemeralFile;
      12              : use crate::tenant::storage_layer::ValueReconstructResult;
      13              : use crate::tenant::timeline::GetVectoredError;
      14              : use crate::tenant::{PageReconstructError, Timeline};
      15              : use crate::{page_cache, walrecord};
      16              : use anyhow::{anyhow, ensure, Result};
      17              : use pageserver_api::keyspace::KeySpace;
      18              : use pageserver_api::models::InMemoryLayerInfo;
      19              : use pageserver_api::shard::TenantShardId;
      20              : use std::collections::{BinaryHeap, HashMap, HashSet};
      21              : use std::sync::{Arc, OnceLock};
      22              : use std::time::Instant;
      23              : use tracing::*;
      24              : use utils::{bin_ser::BeSer, id::TimelineId, lsn::Lsn, vec_map::VecMap};
      25              : // avoid binding to Write (conflicts with std::io::Write)
      26              : // while being able to use std::fmt::Write's methods
      27              : use crate::metrics::TIMELINE_EPHEMERAL_BYTES;
      28              : use std::cmp::Ordering;
      29              : use std::fmt::Write as _;
      30              : use std::ops::Range;
      31              : use std::sync::atomic::Ordering as AtomicOrdering;
      32              : use std::sync::atomic::{AtomicU64, AtomicUsize};
      33              : use tokio::sync::{RwLock, RwLockWriteGuard};
      34              : 
      35              : use super::{
      36              :     DeltaLayerWriter, ResidentLayer, ValueReconstructSituation, ValueReconstructState,
      37              :     ValuesReconstructState,
      38              : };
      39              : 
      40              : #[derive(Debug, PartialEq, Eq, Clone, Copy, Hash)]
      41              : pub(crate) struct InMemoryLayerFileId(page_cache::FileId);
      42              : 
      43              : pub struct InMemoryLayer {
      44              :     conf: &'static PageServerConf,
      45              :     tenant_shard_id: TenantShardId,
      46              :     timeline_id: TimelineId,
      47              :     file_id: InMemoryLayerFileId,
      48              : 
      49              :     /// This layer contains all the changes from 'start_lsn'. The
      50              :     /// start is inclusive.
      51              :     start_lsn: Lsn,
      52              : 
      53              :     /// Frozen layers have an exclusive end LSN.
      54              :     /// Writes are only allowed when this is `None`.
      55              :     end_lsn: OnceLock<Lsn>,
      56              : 
      57              :     opened_at: Instant,
      58              : 
      59              :     /// The above fields never change, except for `end_lsn`, which is only set once.
      60              :     /// All other changing parts are in `inner`, and protected by an `RwLock`.
      61              :     inner: RwLock<InMemoryLayerInner>,
      62              : }
      63              : 
      64              : impl std::fmt::Debug for InMemoryLayer {
      65            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
      66            0 :         f.debug_struct("InMemoryLayer")
      67            0 :             .field("start_lsn", &self.start_lsn)
      68            0 :             .field("end_lsn", &self.end_lsn)
      69            0 :             .field("inner", &self.inner)
      70            0 :             .finish()
      71            0 :     }
      72              : }
      73              : 
      74              : pub struct InMemoryLayerInner {
      75              :     /// All versions of all pages in the layer are kept here.  Indexed
      76              :     /// by key and LSN. The value is an offset into the
      77              :     /// ephemeral file where the page version is stored.
      78              :     index: HashMap<Key, VecMap<Lsn, u64>>,
      79              : 
      80              :     /// The values are stored in a serialized format in this file.
      81              :     /// Each serialized Value is preceded by a 'u32' length field.
      82              :     /// The `index` map stores offsets into this file.
      83              :     file: EphemeralFile,
      84              : 
      85              :     resource_units: GlobalResourceUnits,
      86              : }
      87              : 
      88              : impl std::fmt::Debug for InMemoryLayerInner {
      89            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
      90            0 :         f.debug_struct("InMemoryLayerInner").finish()
      91            0 :     }
      92              : }
      93              : 
      94              : /// State shared by all in-memory (ephemeral) layers.  Updated infrequently during background ticks in Timeline,
      95              : /// to minimize contention.
      96              : ///
      97              : /// This global state is used to implement behaviors that require a global view of the system, e.g.
      98              : /// rolling layers proactively to limit the total amount of dirty data.
      99              : pub(crate) struct GlobalResources {
     100              :     // Limit on how high dirty_bytes may grow before we start freezing layers to reduce it.
     101              :     // Zero means unlimited.
     102              :     pub(crate) max_dirty_bytes: AtomicU64,
     103              :     // How many bytes are in all EphemeralFile objects
     104              :     dirty_bytes: AtomicU64,
     105              :     // How many layers are contributing to dirty_bytes
     106              :     dirty_layers: AtomicUsize,
     107              : }
     108              : 
     109              : // Per-layer RAII struct for its contribution to [`GlobalResources`]
     110              : struct GlobalResourceUnits {
     111              :     // How many dirty bytes have I added to the global dirty_bytes: this guard object is responsible
     112              :     // for decrementing the global counter by this many bytes when dropped.
     113              :     dirty_bytes: u64,
     114              : }
     115              : 
     116              : impl GlobalResourceUnits {
     117              :     // Hint for the layer append path to update us when the layer size differs from the last
     118              :     // call to publish_size by this much.  If we don't reach this threshold, we'll still get
     119              :     // updated when the Timeline "ticks" in the background.
     120              :     const MAX_SIZE_DRIFT: u64 = 10 * 1024 * 1024;
     121              : 
     122          812 :     fn new() -> Self {
     123          812 :         GLOBAL_RESOURCES
     124          812 :             .dirty_layers
     125          812 :             .fetch_add(1, AtomicOrdering::Relaxed);
     126          812 :         Self { dirty_bytes: 0 }
     127          812 :     }
     128              : 
     129              :     /// Do not call this frequently: all timelines will write to these same global atomics,
     130              :     /// so this is a relatively expensive operation.  Wait at least a few seconds between calls.
     131              :     ///
     132              :     /// Returns the effective layer size limit that should be applied, if any, to keep
     133              :     /// the total number of dirty bytes below the configured maximum.
     134          704 :     fn publish_size(&mut self, size: u64) -> Option<u64> {
     135          704 :         let new_global_dirty_bytes = match size.cmp(&self.dirty_bytes) {
     136          692 :             Ordering::Equal => GLOBAL_RESOURCES.dirty_bytes.load(AtomicOrdering::Relaxed),
     137              :             Ordering::Greater => {
     138           10 :                 let delta = size - self.dirty_bytes;
     139           10 :                 let old = GLOBAL_RESOURCES
     140           10 :                     .dirty_bytes
     141           10 :                     .fetch_add(delta, AtomicOrdering::Relaxed);
     142           10 :                 old + delta
     143              :             }
     144              :             Ordering::Less => {
     145            2 :                 let delta = self.dirty_bytes - size;
     146            2 :                 let old = GLOBAL_RESOURCES
     147            2 :                     .dirty_bytes
     148            2 :                     .fetch_sub(delta, AtomicOrdering::Relaxed);
     149            2 :                 old - delta
     150              :             }
     151              :         };
     152              : 
     153              :         // This is a sloppy update: concurrent updates to the counter will race, and the exact
     154              :         // value of the metric might not be the exact latest value of GLOBAL_RESOURCES::dirty_bytes.
     155              :         // That's okay: as long as the metric contains some recent value, it doesn't have to always
     156              :         // be literally the last update.
     157          704 :         TIMELINE_EPHEMERAL_BYTES.set(new_global_dirty_bytes);
     158          704 : 
     159          704 :         self.dirty_bytes = size;
     160          704 : 
     161          704 :         let max_dirty_bytes = GLOBAL_RESOURCES
     162          704 :             .max_dirty_bytes
     163          704 :             .load(AtomicOrdering::Relaxed);
     164          704 :         if max_dirty_bytes > 0 && new_global_dirty_bytes > max_dirty_bytes {
     165              :             // Set the layer file limit to the average layer size: this implies that all above-average
     166              :             // sized layers will be eligible for freezing.  They will be frozen in the order they
     167              :             // next enter publish_size.
     168            0 :             Some(
     169            0 :                 new_global_dirty_bytes
     170            0 :                     / GLOBAL_RESOURCES.dirty_layers.load(AtomicOrdering::Relaxed) as u64,
     171            0 :             )
     172              :         } else {
     173          704 :             None
     174              :         }
     175          704 :     }
     176              : 
     177              :     // Call publish_size if the input size differs from last published size by more than
     178              :     // the drift limit
     179      3933972 :     fn maybe_publish_size(&mut self, size: u64) {
     180      3933972 :         let publish = match size.cmp(&self.dirty_bytes) {
     181            0 :             Ordering::Equal => false,
     182      3933972 :             Ordering::Greater => size - self.dirty_bytes > Self::MAX_SIZE_DRIFT,
     183            0 :             Ordering::Less => self.dirty_bytes - size > Self::MAX_SIZE_DRIFT,
     184              :         };
     185              : 
     186      3933972 :         if publish {
     187           10 :             self.publish_size(size);
     188      3933962 :         }
     189      3933972 :     }
     190              : }
     191              : 
     192              : impl Drop for GlobalResourceUnits {
     193          694 :     fn drop(&mut self) {
     194          694 :         GLOBAL_RESOURCES
     195          694 :             .dirty_layers
     196          694 :             .fetch_sub(1, AtomicOrdering::Relaxed);
     197          694 : 
     198          694 :         // Subtract our contribution to the global total dirty bytes
     199          694 :         self.publish_size(0);
     200          694 :     }
     201              : }
     202              : 
     203              : pub(crate) static GLOBAL_RESOURCES: GlobalResources = GlobalResources {
     204              :     max_dirty_bytes: AtomicU64::new(0),
     205              :     dirty_bytes: AtomicU64::new(0),
     206              :     dirty_layers: AtomicUsize::new(0),
     207              : };
     208              : 
     209              : impl InMemoryLayer {
     210            0 :     pub(crate) fn file_id(&self) -> InMemoryLayerFileId {
     211            0 :         self.file_id
     212            0 :     }
     213              : 
     214            4 :     pub(crate) fn get_timeline_id(&self) -> TimelineId {
     215            4 :         self.timeline_id
     216            4 :     }
     217              : 
     218            0 :     pub(crate) fn info(&self) -> InMemoryLayerInfo {
     219            0 :         let lsn_start = self.start_lsn;
     220              : 
     221            0 :         if let Some(&lsn_end) = self.end_lsn.get() {
     222            0 :             InMemoryLayerInfo::Frozen { lsn_start, lsn_end }
     223              :         } else {
     224            0 :             InMemoryLayerInfo::Open { lsn_start }
     225              :         }
     226            0 :     }
     227              : 
     228            0 :     pub(crate) fn try_len(&self) -> Option<u64> {
     229            0 :         self.inner.try_read().map(|i| i.file.len()).ok()
     230            0 :     }
     231              : 
     232      3933972 :     pub(crate) fn assert_writable(&self) {
     233      3933972 :         assert!(self.end_lsn.get().is_none());
     234      3933972 :     }
     235              : 
     236      4205915 :     pub(crate) fn end_lsn_or_max(&self) -> Lsn {
     237      4205915 :         self.end_lsn.get().copied().unwrap_or(Lsn::MAX)
     238      4205915 :     }
     239              : 
     240      4205219 :     pub(crate) fn get_lsn_range(&self) -> Range<Lsn> {
     241      4205219 :         self.start_lsn..self.end_lsn_or_max()
     242      4205219 :     }
     243              : 
     244              :     /// debugging function to print out the contents of the layer
     245              :     ///
     246              :     /// this is likely completely unused
     247            0 :     pub async fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
     248            0 :         let inner = self.inner.read().await;
     249              : 
     250            0 :         let end_str = self.end_lsn_or_max();
     251            0 : 
     252            0 :         println!(
     253            0 :             "----- in-memory layer for tli {} LSNs {}-{} ----",
     254            0 :             self.timeline_id, self.start_lsn, end_str,
     255            0 :         );
     256            0 : 
     257            0 :         if !verbose {
     258            0 :             return Ok(());
     259            0 :         }
     260            0 : 
     261            0 :         let cursor = inner.file.block_cursor();
     262            0 :         let mut buf = Vec::new();
     263            0 :         for (key, vec_map) in inner.index.iter() {
     264            0 :             for (lsn, pos) in vec_map.as_slice() {
     265            0 :                 let mut desc = String::new();
     266            0 :                 cursor.read_blob_into_buf(*pos, &mut buf, ctx).await?;
     267            0 :                 let val = Value::des(&buf);
     268            0 :                 match val {
     269            0 :                     Ok(Value::Image(img)) => {
     270            0 :                         write!(&mut desc, " img {} bytes", img.len())?;
     271              :                     }
     272            0 :                     Ok(Value::WalRecord(rec)) => {
     273            0 :                         let wal_desc = walrecord::describe_wal_record(&rec).unwrap();
     274            0 :                         write!(
     275            0 :                             &mut desc,
     276            0 :                             " rec {} bytes will_init: {} {}",
     277            0 :                             buf.len(),
     278            0 :                             rec.will_init(),
     279            0 :                             wal_desc
     280            0 :                         )?;
     281              :                     }
     282            0 :                     Err(err) => {
     283            0 :                         write!(&mut desc, " DESERIALIZATION ERROR: {}", err)?;
     284              :                     }
     285              :                 }
     286            0 :                 println!("  key {} at {}: {}", key, lsn, desc);
     287              :             }
     288              :         }
     289              : 
     290            0 :         Ok(())
     291            0 :     }
     292              : 
     293              :     /// Look up given value in the layer.
     294       503455 :     pub(crate) async fn get_value_reconstruct_data(
     295       503455 :         &self,
     296       503455 :         key: Key,
     297       503455 :         lsn_range: Range<Lsn>,
     298       503455 :         reconstruct_state: &mut ValueReconstructState,
     299       503455 :         ctx: &RequestContext,
     300       503455 :     ) -> anyhow::Result<ValueReconstructResult> {
     301       503455 :         ensure!(lsn_range.start >= self.start_lsn);
     302       503455 :         let mut need_image = true;
     303       503455 : 
     304       503455 :         let ctx = RequestContextBuilder::extend(ctx)
     305       503455 :             .page_content_kind(PageContentKind::InMemoryLayer)
     306       503455 :             .build();
     307              : 
     308       503455 :         let inner = self.inner.read().await;
     309              : 
     310       503455 :         let reader = inner.file.block_cursor();
     311              : 
     312              :         // Scan the page versions backwards, starting from the end of `lsn_range`.
     313       503455 :         if let Some(vec_map) = inner.index.get(&key) {
     314       430228 :             let slice = vec_map.slice_range(lsn_range);
     315       430234 :             for (entry_lsn, pos) in slice.iter().rev() {
     316       430234 :                 let buf = reader.read_blob(*pos, &ctx).await?;
     317       430234 :                 let value = Value::des(&buf)?;
     318       430234 :                 match value {
     319       430224 :                     Value::Image(img) => {
     320       430224 :                         reconstruct_state.img = Some((*entry_lsn, img));
     321       430224 :                         return Ok(ValueReconstructResult::Complete);
     322              :                     }
     323           10 :                     Value::WalRecord(rec) => {
     324           10 :                         let will_init = rec.will_init();
     325           10 :                         reconstruct_state.records.push((*entry_lsn, rec));
     326           10 :                         if will_init {
     327              :                             // This WAL record initializes the page, so no need to go further back
     328            0 :                             need_image = false;
     329            0 :                             break;
     330           10 :                         }
     331              :                     }
     332              :                 }
     333              :             }
     334        73227 :         }
     335              : 
     336              :         // release lock on 'inner'
     337              : 
     338              :         // If an older page image is needed to reconstruct the page, let the
     339              :         // caller know.
     340        73231 :         if need_image {
     341        73231 :             Ok(ValueReconstructResult::Continue)
     342              :         } else {
     343            0 :             Ok(ValueReconstructResult::Complete)
     344              :         }
     345       503455 :     }
     346              : 
     347              :     // Look up the keys in the provided keyspace and update
     348              :     // the reconstruct state with whatever is found.
     349              :     //
     350              :     // If the key is cached, go no further than the cached Lsn.
     351            0 :     pub(crate) async fn get_values_reconstruct_data(
     352            0 :         &self,
     353            0 :         keyspace: KeySpace,
     354            0 :         end_lsn: Lsn,
     355            0 :         reconstruct_state: &mut ValuesReconstructState,
     356            0 :         ctx: &RequestContext,
     357            0 :     ) -> Result<(), GetVectoredError> {
     358            0 :         let ctx = RequestContextBuilder::extend(ctx)
     359            0 :             .page_content_kind(PageContentKind::InMemoryLayer)
     360            0 :             .build();
     361              : 
     362            0 :         let inner = self.inner.read().await;
     363            0 :         let reader = inner.file.block_cursor();
     364            0 : 
     365            0 :         #[derive(Eq, PartialEq, Ord, PartialOrd)]
     366            0 :         struct BlockRead {
     367            0 :             key: Key,
     368            0 :             lsn: Lsn,
     369            0 :             block_offset: u64,
     370            0 :         }
     371            0 : 
     372            0 :         let mut planned_block_reads = BinaryHeap::new();
     373              : 
     374            0 :         for range in keyspace.ranges.iter() {
     375            0 :             let mut key = range.start;
     376            0 :             while key < range.end {
     377            0 :                 if let Some(vec_map) = inner.index.get(&key) {
     378            0 :                     let lsn_range = match reconstruct_state.get_cached_lsn(&key) {
     379            0 :                         Some(cached_lsn) => (cached_lsn + 1)..end_lsn,
     380            0 :                         None => self.start_lsn..end_lsn,
     381              :                     };
     382              : 
     383            0 :                     let slice = vec_map.slice_range(lsn_range);
     384            0 :                     for (entry_lsn, pos) in slice.iter().rev() {
     385            0 :                         planned_block_reads.push(BlockRead {
     386            0 :                             key,
     387            0 :                             lsn: *entry_lsn,
     388            0 :                             block_offset: *pos,
     389            0 :                         });
     390            0 :                     }
     391            0 :                 }
     392              : 
     393            0 :                 key = key.next();
     394              :             }
     395              :         }
     396              : 
     397            0 :         let keyspace_size = keyspace.total_size();
     398            0 : 
     399            0 :         let mut completed_keys = HashSet::new();
     400            0 :         while completed_keys.len() < keyspace_size && !planned_block_reads.is_empty() {
     401            0 :             let block_read = planned_block_reads.pop().unwrap();
     402            0 :             if completed_keys.contains(&block_read.key) {
     403            0 :                 continue;
     404            0 :             }
     405              : 
     406            0 :             let buf = reader.read_blob(block_read.block_offset, &ctx).await;
     407            0 :             if let Err(e) = buf {
     408            0 :                 reconstruct_state
     409            0 :                     .on_key_error(block_read.key, PageReconstructError::from(anyhow!(e)));
     410            0 :                 completed_keys.insert(block_read.key);
     411            0 :                 continue;
     412            0 :             }
     413            0 : 
     414            0 :             let value = Value::des(&buf.unwrap());
     415            0 :             if let Err(e) = value {
     416            0 :                 reconstruct_state
     417            0 :                     .on_key_error(block_read.key, PageReconstructError::from(anyhow!(e)));
     418            0 :                 completed_keys.insert(block_read.key);
     419            0 :                 continue;
     420            0 :             }
     421            0 : 
     422            0 :             let key_situation =
     423            0 :                 reconstruct_state.update_key(&block_read.key, block_read.lsn, value.unwrap());
     424            0 :             if key_situation == ValueReconstructSituation::Complete {
     425            0 :                 completed_keys.insert(block_read.key);
     426            0 :             }
     427              :         }
     428              : 
     429            0 :         Ok(())
     430            0 :     }
     431              : }
     432              : 
     433              : impl std::fmt::Display for InMemoryLayer {
     434          696 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     435          696 :         let end_lsn = self.end_lsn_or_max();
     436          696 :         write!(f, "inmem-{:016X}-{:016X}", self.start_lsn.0, end_lsn.0)
     437          696 :     }
     438              : }
     439              : 
     440              : impl InMemoryLayer {
     441              :     /// Get layer size.
     442      3648020 :     pub async fn size(&self) -> Result<u64> {
     443      3648020 :         let inner = self.inner.read().await;
     444      3648020 :         Ok(inner.file.len())
     445      3648020 :     }
     446              : 
     447              :     /// Create a new, empty, in-memory layer
     448          812 :     pub async fn create(
     449          812 :         conf: &'static PageServerConf,
     450          812 :         timeline_id: TimelineId,
     451          812 :         tenant_shard_id: TenantShardId,
     452          812 :         start_lsn: Lsn,
     453          812 :     ) -> Result<InMemoryLayer> {
     454          812 :         trace!("initializing new empty InMemoryLayer for writing on timeline {timeline_id} at {start_lsn}");
     455              : 
     456          812 :         let file = EphemeralFile::create(conf, tenant_shard_id, timeline_id).await?;
     457          812 :         let key = InMemoryLayerFileId(file.id());
     458          812 : 
     459          812 :         Ok(InMemoryLayer {
     460          812 :             file_id: key,
     461          812 :             conf,
     462          812 :             timeline_id,
     463          812 :             tenant_shard_id,
     464          812 :             start_lsn,
     465          812 :             end_lsn: OnceLock::new(),
     466          812 :             opened_at: Instant::now(),
     467          812 :             inner: RwLock::new(InMemoryLayerInner {
     468          812 :                 index: HashMap::new(),
     469          812 :                 file,
     470          812 :                 resource_units: GlobalResourceUnits::new(),
     471          812 :             }),
     472          812 :         })
     473          812 :     }
     474              : 
     475              :     // Write operations
     476              : 
     477              :     /// Common subroutine of the public put_wal_record() and put_page_image() functions.
     478              :     /// Adds the page version to the in-memory tree
     479              : 
     480      3933972 :     pub(crate) async fn put_value(
     481      3933972 :         &self,
     482      3933972 :         key: Key,
     483      3933972 :         lsn: Lsn,
     484      3933972 :         buf: &[u8],
     485      3933972 :         ctx: &RequestContext,
     486      3933972 :     ) -> Result<()> {
     487      3933972 :         let mut inner = self.inner.write().await;
     488      3933972 :         self.assert_writable();
     489      3933972 :         self.put_value_locked(&mut inner, key, lsn, buf, ctx).await
     490      3933972 :     }
     491              : 
     492      3933972 :     async fn put_value_locked(
     493      3933972 :         &self,
     494      3933972 :         locked_inner: &mut RwLockWriteGuard<'_, InMemoryLayerInner>,
     495      3933972 :         key: Key,
     496      3933972 :         lsn: Lsn,
     497      3933972 :         buf: &[u8],
     498      3933972 :         ctx: &RequestContext,
     499      3933972 :     ) -> Result<()> {
     500      3933972 :         trace!("put_value key {} at {}/{}", key, self.timeline_id, lsn);
     501              : 
     502      3933972 :         let off = {
     503      3933972 :             locked_inner
     504      3933972 :                 .file
     505      3933972 :                 .write_blob(
     506      3933972 :                     buf,
     507      3933972 :                     &RequestContextBuilder::extend(ctx)
     508      3933972 :                         .page_content_kind(PageContentKind::InMemoryLayer)
     509      3933972 :                         .build(),
     510      3933972 :                 )
     511        22833 :                 .await?
     512              :         };
     513              : 
     514      3933972 :         let vec_map = locked_inner.index.entry(key).or_default();
     515      3933972 :         let old = vec_map.append_or_update_last(lsn, off).unwrap().0;
     516      3933972 :         if old.is_some() {
     517              :             // We already had an entry for this LSN. That's odd..
     518            0 :             warn!("Key {} at {} already exists", key, lsn);
     519      3933972 :         }
     520              : 
     521      3933972 :         let size = locked_inner.file.len();
     522      3933972 :         locked_inner.resource_units.maybe_publish_size(size);
     523      3933972 : 
     524      3933972 :         Ok(())
     525      3933972 :     }
     526              : 
     527           56 :     pub(crate) fn get_opened_at(&self) -> Instant {
     528           56 :         self.opened_at
     529           56 :     }
     530              : 
     531            0 :     pub(crate) async fn tick(&self) -> Option<u64> {
     532            0 :         let mut inner = self.inner.write().await;
     533            0 :         let size = inner.file.len();
     534            0 :         inner.resource_units.publish_size(size)
     535            0 :     }
     536              : 
     537            2 :     pub(crate) async fn put_tombstones(&self, _key_ranges: &[(Range<Key>, Lsn)]) -> Result<()> {
                                // Currently a no-op: `_key_ranges` is ignored and `Ok(())` is returned
                                // unconditionally.
     538            2 :         // TODO: Currently, we just leak the storage for any deleted keys
     539            2 :         Ok(())
     540            2 :     }
     541              : 
     542              :     /// Records the end_lsn for non-dropped layers.
     543              :     /// `end_lsn` is exclusive.
                            ///
                            /// # Panics
                            ///
                            /// Panics if `start_lsn >= end_lsn`, if setting `self.end_lsn` fails (per the
                            /// `expect` message, because it was already set), or if any entry in the index
                            /// has an LSN that is not strictly below `end_lsn`.
     544          692 :     pub async fn freeze(&self, end_lsn: Lsn) {
                                // The write guard is held until the function returns, i.e. while
                                // `end_lsn` is recorded and the index is checked.
     545          692 :         let inner = self.inner.write().await;
     546              : 
     547          692 :         assert!(
     548          692 :             self.start_lsn < end_lsn,
     549            0 :             "{} >= {}",
     550              :             self.start_lsn,
     551              :             end_lsn
     552              :         );
     553          692 :         self.end_lsn.set(end_lsn).expect("end_lsn set only once");
     554              : 
                                // Sanity check: every LSN recorded in the index must lie below the
                                // (exclusive) end of the layer.
     555      3155479 :         for vec_map in inner.index.values() {
     556      3230232 :             for (lsn, _pos) in vec_map.as_slice() {
     557      3230232 :                 assert!(*lsn < end_lsn);
     558              :             }
     559              :         }
     560          692 :     }
     561              : 
     562              :     /// Write this frozen in-memory layer to disk.
     563              :     ///
     564              :     /// Returns a new delta layer with all the same data as this in-memory layer.
                            ///
                            /// # Errors
                            ///
                            /// Propagates any error from creating the delta layer writer, reading a blob
                            /// back from the layer's file, deserializing a `Value`, writing a value into
                            /// the delta layer, or finishing the delta layer.
                            ///
                            /// # Panics
                            ///
                            /// Panics if `end_lsn` has not been set yet (it is set by `freeze`).
     565          598 :     pub(crate) async fn write_to_disk(
     566          598 :         &self,
     567          598 :         timeline: &Arc<Timeline>,
     568          598 :         ctx: &RequestContext,
     569          598 :     ) -> Result<ResidentLayer> {
     570              :         // Grab the lock in read-mode. We hold it over the I/O, but because this
     571              :         // layer is not writeable anymore, no one should be trying to acquire the
     572              :         // write lock on it, so we shouldn't block anyone. There's one exception
     573              :         // though: another thread might have grabbed a reference to this layer
     574              :         // in `get_layer_for_write` just before the checkpointer called
     575              :         // `freeze`, and then `write_to_disk` on it. When the thread gets the
     576              :         // lock, it will see that it's not writeable anymore and retry, but it
     577              :         // would have to wait until we release it. That race condition is very
     578              :         // rare though, so we just accept the potential latency hit for now.
     579          598 :         let inner = self.inner.read().await;
     580              : 
                                // `end_lsn` is recorded by `freeze`; the `unwrap` panics if it is unset.
     581          598 :         let end_lsn = *self.end_lsn.get().unwrap();
     582              : 
                                // The writer is created with `Key::MIN` as its key argument and the
                                // LSN range `start_lsn..end_lsn` of this layer.
     583          598 :         let mut delta_layer_writer = DeltaLayerWriter::new(
     584          598 :             self.conf,
     585          598 :             self.timeline_id,
     586          598 :             self.tenant_shard_id,
     587          598 :             Key::MIN,
     588          598 :             self.start_lsn..end_lsn,
     589          598 :         )
     590          315 :         .await?;
     591              : 
                                // Scratch buffer for blob contents; it is moved into the writer on every
                                // put and handed back together with the result (see the loop below).
     592          598 :         let mut buf = Vec::new();
     593          598 : 
     594          598 :         let cursor = inner.file.block_cursor();
     595          598 : 
     596          598 :         // Sort the keys because delta layer writer expects them sorted.
     597          598 :         //
     598          598 :         // NOTE: this sort can take up significant time if the layer has millions of
     599          598 :         //       keys. To speed up all the comparisons we convert the key to i128 and
     600          598 :         //       keep the value as a reference.
     601      3154727 :         let mut keys: Vec<_> = inner.index.iter().map(|(k, m)| (k.to_i128(), m)).collect();
     602     91220448 :         keys.sort_unstable_by_key(|k| k.0);
     603          598 : 
                                // Shadow `ctx` with a derived context tagged as in-memory-layer content;
                                // it is used for every blob read below.
     604          598 :         let ctx = RequestContextBuilder::extend(ctx)
     605          598 :             .page_content_kind(PageContentKind::InMemoryLayer)
     606          598 :             .build();
     607      3154727 :         for (key, vec_map) in keys.iter() {
                                    // Convert the i128 sort key back into a `Key` for the writer.
     608      3154727 :             let key = Key::from_i128(*key);
     609              :             // Write all page versions
     610      3229480 :             for (lsn, pos) in vec_map.as_slice() {
     611      3229480 :                 cursor.read_blob_into_buf(*pos, &mut buf, &ctx).await?;
                                        // The value is deserialized only to read its `will_init` flag; the
                                        // bytes in `buf` are what gets passed on to the writer.
     612      3229480 :                 let will_init = Value::des(&buf)?.will_init();
     613              :                 let res;
     614      3229480 :                 (buf, res) = delta_layer_writer
     615      3229480 :                     .put_value_bytes(key, *lsn, buf, will_init)
     616        17889 :                     .await;
     617      3229480 :                 res?;
     618              :             }
     619              :         }
     620              : 
     621              :         // MAX is used here because we identify L0 layers by full key range
     622         4589 :         let delta_layer = delta_layer_writer.finish(Key::MAX, timeline).await?;
     623          598 :         Ok(delta_layer)
     624          598 :     }
     625              : }
        

Generated by: LCOV version 2.1-beta