LCOV - code coverage report
Current view: top level - pageserver/src/tenant/storage_layer - inmemory_layer.rs (source / functions) Coverage Total Hit
Test: 36bb8dd7c7efcb53483d1a7d9f7cb33e8406dcf0.info Lines: 59.9 % 359 215
Test Date: 2024-04-08 10:22:05 Functions: 64.3 % 42 27

            Line data    Source code
       1              : //! An in-memory layer stores recently received key-value pairs.
       2              : //!
       3              : //! The "in-memory" part of the name is a bit misleading: the actual page versions are
       4              : //! held in an ephemeral file, not in memory. The metadata for each page version, i.e.
       5              : //! its position in the file, is kept in memory, though.
       6              : //!
       7              : use crate::config::PageServerConf;
       8              : use crate::context::{PageContentKind, RequestContext, RequestContextBuilder};
       9              : use crate::repository::{Key, Value};
      10              : use crate::tenant::block_io::BlockReader;
      11              : use crate::tenant::ephemeral_file::EphemeralFile;
      12              : use crate::tenant::storage_layer::ValueReconstructResult;
      13              : use crate::tenant::timeline::GetVectoredError;
      14              : use crate::tenant::{PageReconstructError, Timeline};
      15              : use crate::{page_cache, walrecord};
      16              : use anyhow::{anyhow, ensure, Result};
      17              : use pageserver_api::keyspace::KeySpace;
      18              : use pageserver_api::models::InMemoryLayerInfo;
      19              : use pageserver_api::shard::TenantShardId;
      20              : use std::collections::{BinaryHeap, HashMap, HashSet};
      21              : use std::sync::{Arc, OnceLock};
      22              : use tracing::*;
      23              : use utils::{bin_ser::BeSer, id::TimelineId, lsn::Lsn, vec_map::VecMap};
      24              : // avoid binding to Write (conflicts with std::io::Write)
      25              : // while being able to use std::fmt::Write's methods
      26              : use crate::metrics::TIMELINE_EPHEMERAL_BYTES;
      27              : use std::cmp::Ordering;
      28              : use std::fmt::Write as _;
      29              : use std::ops::Range;
      30              : use std::sync::atomic::Ordering as AtomicOrdering;
      31              : use std::sync::atomic::{AtomicU64, AtomicUsize};
      32              : use tokio::sync::{RwLock, RwLockWriteGuard};
      33              : 
      34              : use super::{
      35              :     DeltaLayerWriter, ResidentLayer, ValueReconstructSituation, ValueReconstructState,
      36              :     ValuesReconstructState,
      37              : };
      38              : 
      39              : #[derive(Debug, PartialEq, Eq, Clone, Copy, Hash)]
      40              : pub(crate) struct InMemoryLayerFileId(page_cache::FileId); // Newtype over page_cache::FileId; `InMemoryLayer::create` builds it from the id of the ephemeral file.
      41              : 
      42              : pub struct InMemoryLayer {
      43              :     conf: &'static PageServerConf,
      44              :     tenant_shard_id: TenantShardId,
      45              :     timeline_id: TimelineId,
      46              :     file_id: InMemoryLayerFileId,
      47              : 
      48              :     /// This layer contains all the changes from 'start_lsn'. The
      49              :     /// start is inclusive.
      50              :     start_lsn: Lsn,
      51              : 
      52              :     /// Frozen layers have an exclusive end LSN.
      53              :     /// Writes are only allowed while this is still unset (see `assert_writable`).
      54              :     end_lsn: OnceLock<Lsn>,
      55              : 
      56              :     /// The above fields never change, except for `end_lsn`, which is only set once.
      57              :     /// All other changing parts are in `inner`, and protected by an async `RwLock`.
      58              :     inner: RwLock<InMemoryLayerInner>,
      59              : }
      60              : 
      61              : impl std::fmt::Debug for InMemoryLayer { // Shows start_lsn, end_lsn and inner; conf, the tenant/timeline ids and file_id are left out.
      62            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
      63            0 :         f.debug_struct("InMemoryLayer")
      64            0 :             .field("start_lsn", &self.start_lsn)
      65            0 :             .field("end_lsn", &self.end_lsn)
      66            0 :             .field("inner", &self.inner)
      67            0 :             .finish()
      68            0 :     }
      69              : }
      70              : 
      71              : pub struct InMemoryLayerInner {
      72              :     /// All versions of all pages in the layer are kept here.  Indexed
      73              :     /// by key and LSN. The value is an offset into the
      74              :     /// ephemeral file where the page version is stored.
      75              :     index: HashMap<Key, VecMap<Lsn, u64>>,
      76              : 
      77              :     /// The values are stored in a serialized format in this file.
      78              :     /// Each serialized Value is preceded by a 'u32' length field.
      79              :     /// The `index` map above stores offsets into this file.
      80              :     file: EphemeralFile,
      81              : 
      82              :     resource_units: GlobalResourceUnits, // Tracks how many bytes this layer has published to GLOBAL_RESOURCES.
      83              : }
      84              : 
      85              : impl std::fmt::Debug for InMemoryLayerInner { // Prints only the struct name; no fields are included.
      86            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
      87            0 :         f.debug_struct("InMemoryLayerInner").finish()
      88            0 :     }
      89              : }
      90              : 
      91              : /// State shared by all in-memory (ephemeral) layers.  Updated infrequently (on background ticks in Timeline, when a
      92              : /// layer is dropped, and when the size of a layer drifts past `MAX_SIZE_DRIFT`) to minimize contention.
      93              : ///
      94              : /// This global state is used to implement behaviors that require a global view of the system, e.g.
      95              : /// rolling layers proactively to limit the total amount of dirty data.
      96              : pub(crate) struct GlobalResources {
      97              :     // Limit on how high dirty_bytes may grow before we start freezing layers to reduce it.
      98              :     // Zero means unlimited.
      99              :     pub(crate) max_dirty_bytes: AtomicU64,
     100              :     // How many bytes are in all EphemeralFile objects, as last published by each layer via publish_size
     101              :     dirty_bytes: AtomicU64,
     102              :     // How many layers are contributing to dirty_bytes (one per live GlobalResourceUnits)
     103              :     dirty_layers: AtomicUsize,
     104              : }
     105              : 
     106              : // Per-layer (one per `InMemoryLayerInner`) RAII struct for its contribution to [`GlobalResources`]
     107              : struct GlobalResourceUnits {
     108              :     // How many dirty bytes have I added to the global dirty_bytes: this guard object is responsible
     109              :     // for decrementing the global counter by this many bytes when dropped.
     110              :     dirty_bytes: u64,
     111              : }
     112              : 
     113              : impl GlobalResourceUnits {
     114              :     // Hint for the layer append path to update us when the layer size differs from the last
     115              :     // call to publish_size by this much.  If we don't reach this threshold, we'll still get
     116              :     // updated when the Timeline "ticks" in the background.
     117              :     const MAX_SIZE_DRIFT: u64 = 10 * 1024 * 1024;
     118              : 
     119          804 :     fn new() -> Self { // Registers one more layer in the global dirty_layers count; starts with zero published bytes.
     120          804 :         GLOBAL_RESOURCES
     121          804 :             .dirty_layers
     122          804 :             .fetch_add(1, AtomicOrdering::Relaxed);
     123          804 :         Self { dirty_bytes: 0 }
     124          804 :     }
     125              : 
     126              :     /// Do not call this frequently: all timelines will write to these same global atomics,
     127              :     /// so this is a relatively expensive operation.  Wait at least a few seconds between calls.
     128              :     ///
     129              :     /// Returns the effective layer size limit that should be applied, if any, to keep
     130              :     /// the total number of dirty bytes below the configured maximum.
     131          696 :     fn publish_size(&mut self, size: u64) -> Option<u64> {
     132          696 :         let new_global_dirty_bytes = match size.cmp(&self.dirty_bytes) {
     133          684 :             Ordering::Equal => GLOBAL_RESOURCES.dirty_bytes.load(AtomicOrdering::Relaxed),
     134              :             Ordering::Greater => {
     135           10 :                 let delta = size - self.dirty_bytes;
     136           10 :                 let old = GLOBAL_RESOURCES
     137           10 :                     .dirty_bytes
     138           10 :                     .fetch_add(delta, AtomicOrdering::Relaxed);
     139           10 :                 old + delta
     140              :             }
     141              :             Ordering::Less => {
     142            2 :                 let delta = self.dirty_bytes - size;
     143            2 :                 let old = GLOBAL_RESOURCES
     144            2 :                     .dirty_bytes
     145            2 :                     .fetch_sub(delta, AtomicOrdering::Relaxed);
     146            2 :                 old - delta
     147              :             }
     148              :         };
     149              : 
     150              :         // This is a sloppy update: concurrent updates to the counter will race, and the exact
     151              :         // value of the metric might not be the exact latest value of GLOBAL_RESOURCES::dirty_bytes.
     152              :         // That's okay: as long as the metric contains some recent value, it doesn't have to always
     153              :         // be literally the last update.
     154          696 :         TIMELINE_EPHEMERAL_BYTES.set(new_global_dirty_bytes);
     155          696 : 
     156          696 :         self.dirty_bytes = size;
     157          696 : 
     158          696 :         let max_dirty_bytes = GLOBAL_RESOURCES
     159          696 :             .max_dirty_bytes
     160          696 :             .load(AtomicOrdering::Relaxed);
     161          696 :         if max_dirty_bytes > 0 && new_global_dirty_bytes > max_dirty_bytes {
     162              :             // Set the layer file limit to the average layer size: this implies that all above-average
     163              :             // sized layers will be eligible for freezing.  They will be frozen in the order they
     164              :             // next enter publish_size.  NOTE(review): the division panics if dirty_layers reads 0 here (Drop decrements it before calling us) - confirm unreachable.
     165            0 :             Some(
     166            0 :                 new_global_dirty_bytes
     167            0 :                     / GLOBAL_RESOURCES.dirty_layers.load(AtomicOrdering::Relaxed) as u64,
     168            0 :             )
     169              :         } else {
     170          696 :             None
     171              :         }
     172          696 :     }
     173              : 
     174              :     // Call publish_size if the input size differs from last published size by more than
     175              :     // the drift limit
     176      3933926 :     fn maybe_publish_size(&mut self, size: u64) {
     177      3933926 :         let publish = match size.cmp(&self.dirty_bytes) {
     178            0 :             Ordering::Equal => false,
     179      3933926 :             Ordering::Greater => size - self.dirty_bytes > Self::MAX_SIZE_DRIFT,
     180            0 :             Ordering::Less => self.dirty_bytes - size > Self::MAX_SIZE_DRIFT,
     181              :         };
     182              : 
     183      3933926 :         if publish {
     184           10 :             self.publish_size(size); // the suggested size limit returned by publish_size is discarded on this path
     185      3933916 :         }
     186      3933926 :     }
     187              : }
     188              : 
     189              : impl Drop for GlobalResourceUnits { // Undoes `new`: removes this layer from the global layer count and byte total.
     190          686 :     fn drop(&mut self) {
     191          686 :         GLOBAL_RESOURCES
     192          686 :             .dirty_layers
     193          686 :             .fetch_sub(1, AtomicOrdering::Relaxed);
     194          686 : 
     195          686 :         // Subtract our contribution to the global total dirty bytes; publishing a size of 0 also refreshes the metric.
     196          686 :         self.publish_size(0);
     197          686 :     }
     198              : }
     199              : 
     200              : pub(crate) static GLOBAL_RESOURCES: GlobalResources = GlobalResources { // The shared instance used by every GlobalResourceUnits; all counters start at zero.
     201              :     max_dirty_bytes: AtomicU64::new(0), // zero means unlimited
     202              :     dirty_bytes: AtomicU64::new(0),
     203              :     dirty_layers: AtomicUsize::new(0),
     204              : };
     205              : 
     206              : impl InMemoryLayer {
     207            0 :     pub(crate) fn file_id(&self) -> InMemoryLayerFileId { // The id taken from the ephemeral file when the layer was created.
     208            0 :         self.file_id
     209            0 :     }
     210              : 
     211            4 :     pub(crate) fn get_timeline_id(&self) -> TimelineId { // The timeline id passed to `create`.
     212            4 :         self.timeline_id
     213            4 :     }
     214              : 
     215            0 :     pub(crate) fn info(&self) -> InMemoryLayerInfo { // Reports `Frozen` with both LSN bounds once end_lsn is set, otherwise `Open` with only the start.
     216            0 :         let lsn_start = self.start_lsn;
     217              : 
     218            0 :         if let Some(&lsn_end) = self.end_lsn.get() {
     219            0 :             InMemoryLayerInfo::Frozen { lsn_start, lsn_end }
     220              :         } else {
     221            0 :             InMemoryLayerInfo::Open { lsn_start }
     222              :         }
     223            0 :     }
     224              : 
     225            0 :     pub(crate) fn try_len(&self) -> Option<u64> { // Non-blocking file length: None when the read lock on `inner` cannot be taken right now.
     226            0 :         self.inner.try_read().map(|i| i.file.len()).ok()
     227            0 :     }
     228              : 
     229      3933926 :     pub(crate) fn assert_writable(&self) { // Panics if end_lsn has already been set, i.e. the layer is frozen.
     230      3933926 :         assert!(self.end_lsn.get().is_none());
     231      3933926 :     }
     232              : 
     233      4206503 :     pub(crate) fn end_lsn_or_max(&self) -> Lsn { // The recorded end LSN, or Lsn::MAX while none has been set.
     234      4206503 :         self.end_lsn.get().copied().unwrap_or(Lsn::MAX)
     235      4206503 :     }
     236              : 
     237      4205815 :     pub(crate) fn get_lsn_range(&self) -> Range<Lsn> { // start_lsn up to end_lsn_or_max(), so the upper bound is Lsn::MAX until an end LSN is set.
     238      4205815 :         self.start_lsn..self.end_lsn_or_max()
     239      4205815 :     }
     240              : 
     241              :     /// Debugging function that prints the layer to stdout: a header line, plus one line per
     242              :     /// (key, LSN) entry when `verbose` is set. Panics if a WAL record cannot be described.
     243              :     /// This is likely completely unused.
     244            0 :     pub async fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
     245            0 :         let inner = self.inner.read().await;
     246              : 
     247            0 :         let end_str = self.end_lsn_or_max();
     248            0 : 
     249            0 :         println!(
     250            0 :             "----- in-memory layer for tli {} LSNs {}-{} ----",
     251            0 :             self.timeline_id, self.start_lsn, end_str,
     252            0 :         );
     253            0 : 
     254            0 :         if !verbose {
     255            0 :             return Ok(());
     256            0 :         }
     257            0 : 
     258            0 :         let cursor = inner.file.block_cursor();
     259            0 :         let mut buf = Vec::new();
     260            0 :         for (key, vec_map) in inner.index.iter() {
     261            0 :             for (lsn, pos) in vec_map.as_slice() {
     262            0 :                 let mut desc = String::new();
     263            0 :                 cursor.read_blob_into_buf(*pos, &mut buf, ctx).await?;
     264            0 :                 let val = Value::des(&buf);
     265            0 :                 match val {
     266            0 :                     Ok(Value::Image(img)) => {
     267            0 :                         write!(&mut desc, " img {} bytes", img.len())?;
     268              :                     }
     269            0 :                     Ok(Value::WalRecord(rec)) => {
     270            0 :                         let wal_desc = walrecord::describe_wal_record(&rec).unwrap();
     271            0 :                         write!(
     272            0 :                             &mut desc,
     273            0 :                             " rec {} bytes will_init: {} {}",
     274            0 :                             buf.len(),
     275            0 :                             rec.will_init(),
     276            0 :                             wal_desc
     277            0 :                         )?;
     278              :                     }
     279            0 :                     Err(err) => {
     280            0 :                         write!(&mut desc, " DESERIALIZATION ERROR: {}", err)?;
     281              :                     }
     282              :                 }
     283            0 :                 println!("  key {} at {}: {}", key, lsn, desc);
     284              :             }
     285              :         }
     286              : 
     287            0 :         Ok(())
     288            0 :     }
     289              : 
     290              :     /// Look up `key` within `lsn_range`, newest version first, collecting an image or WAL records into `reconstruct_state`.
     291       503457 :     pub(crate) async fn get_value_reconstruct_data(
     292       503457 :         &self,
     293       503457 :         key: Key,
     294       503457 :         lsn_range: Range<Lsn>,
     295       503457 :         reconstruct_state: &mut ValueReconstructState,
     296       503457 :         ctx: &RequestContext,
     297       503457 :     ) -> anyhow::Result<ValueReconstructResult> {
     298       503457 :         ensure!(lsn_range.start >= self.start_lsn);
     299       503457 :         let mut need_image = true;
     300       503457 : 
     301       503457 :         let ctx = RequestContextBuilder::extend(ctx)
     302       503457 :             .page_content_kind(PageContentKind::InMemoryLayer)
     303       503457 :             .build();
     304              : 
     305       503457 :         let inner = self.inner.read().await;
     306              : 
     307       503457 :         let reader = inner.file.block_cursor();
     308              : 
     309              :         // Scan the page versions inside `lsn_range` backwards, from the newest entry to the oldest.
     310       503457 :         if let Some(vec_map) = inner.index.get(&key) {
     311       430035 :             let slice = vec_map.slice_range(lsn_range);
     312       430041 :             for (entry_lsn, pos) in slice.iter().rev() {
     313       430041 :                 let buf = reader.read_blob(*pos, &ctx).await?;
     314       430041 :                 let value = Value::des(&buf)?;
     315       430041 :                 match value {
     316       430031 :                     Value::Image(img) => {
     317       430031 :                         reconstruct_state.img = Some((*entry_lsn, img));
     318       430031 :                         return Ok(ValueReconstructResult::Complete);
     319              :                     }
     320           10 :                     Value::WalRecord(rec) => {
     321           10 :                         let will_init = rec.will_init();
     322           10 :                         reconstruct_state.records.push((*entry_lsn, rec));
     323           10 :                         if will_init {
     324              :                             // This WAL record initializes the page, so no need to go further back
     325            0 :                             need_image = false;
     326            0 :                             break;
     327           10 :                         }
     328              :                     }
     329              :                 }
     330              :             }
     331        73422 :         }
     332              : 
     333              :         // NOTE: there is no explicit unlock here; the read guard `inner` is only dropped when the function returns.
     334              : 
     335              :         // If an older page image is needed to reconstruct the page, let the
     336              :         // caller know.
     337        73426 :         if need_image {
     338        73426 :             Ok(ValueReconstructResult::Continue)
     339              :         } else {
     340            0 :             Ok(ValueReconstructResult::Complete)
     341              :         }
     342       503457 :     }
     343              : 
     344              :     // Look up the keys in the provided keyspace and update
     345              :     // the reconstruct state with whatever is found.
     346              :     // Read or deserialization failures are recorded per key via `on_key_error`; the function itself always returns Ok.
     347              :     // If the reconstruct state has a cached LSN for a key, only versions above that LSN are read.
     348            0 :     pub(crate) async fn get_values_reconstruct_data(
     349            0 :         &self,
     350            0 :         keyspace: KeySpace,
     351            0 :         end_lsn: Lsn,
     352            0 :         reconstruct_state: &mut ValuesReconstructState,
     353            0 :         ctx: &RequestContext,
     354            0 :     ) -> Result<(), GetVectoredError> {
     355            0 :         let ctx = RequestContextBuilder::extend(ctx)
     356            0 :             .page_content_kind(PageContentKind::InMemoryLayer)
     357            0 :             .build();
     358              : 
     359            0 :         let inner = self.inner.read().await;
     360            0 :         let reader = inner.file.block_cursor();
     361            0 : 
     362            0 :         #[derive(Eq, PartialEq, Ord, PartialOrd)]
     363            0 :         struct BlockRead {
     364            0 :             key: Key,
     365            0 :             lsn: Lsn,
     366            0 :             block_offset: u64,
     367            0 :         }
     368            0 : 
     369            0 :         let mut planned_block_reads = BinaryHeap::new(); // max-heap on (key, lsn, block_offset): for each key the newest LSN is popped first
     370              : 
     371            0 :         for range in keyspace.ranges.iter() {
     372            0 :             let mut key = range.start;
     373            0 :             while key < range.end {
     374            0 :                 if let Some(vec_map) = inner.index.get(&key) {
     375            0 :                     let lsn_range = match reconstruct_state.get_cached_lsn(&key) {
     376            0 :                         Some(cached_lsn) => (cached_lsn + 1)..end_lsn,
     377            0 :                         None => self.start_lsn..end_lsn,
     378              :                     };
     379              : 
     380            0 :                     let slice = vec_map.slice_range(lsn_range);
     381            0 :                     for (entry_lsn, pos) in slice.iter().rev() {
     382            0 :                         planned_block_reads.push(BlockRead {
     383            0 :                             key,
     384            0 :                             lsn: *entry_lsn,
     385            0 :                             block_offset: *pos,
     386            0 :                         });
     387            0 :                     }
     388            0 :                 }
     389              : 
     390            0 :                 key = key.next();
     391              :             }
     392              :         }
     393              : 
     394            0 :         let keyspace_size = keyspace.total_size();
     395            0 : 
     396            0 :         let mut completed_keys = HashSet::new();
     397            0 :         while completed_keys.len() < keyspace_size && !planned_block_reads.is_empty() {
     398            0 :             let block_read = planned_block_reads.pop().unwrap();
     399            0 :             if completed_keys.contains(&block_read.key) {
     400            0 :                 continue;
     401            0 :             }
     402              : 
     403            0 :             let buf = reader.read_blob(block_read.block_offset, &ctx).await;
     404            0 :             if let Err(e) = buf {
     405            0 :                 reconstruct_state
     406            0 :                     .on_key_error(block_read.key, PageReconstructError::from(anyhow!(e)));
     407            0 :                 completed_keys.insert(block_read.key);
     408            0 :                 continue;
     409            0 :             }
     410            0 : 
     411            0 :             let value = Value::des(&buf.unwrap());
     412            0 :             if let Err(e) = value {
     413            0 :                 reconstruct_state
     414            0 :                     .on_key_error(block_read.key, PageReconstructError::from(anyhow!(e)));
     415            0 :                 completed_keys.insert(block_read.key);
     416            0 :                 continue;
     417            0 :             }
     418            0 : 
     419            0 :             let key_situation =
     420            0 :                 reconstruct_state.update_key(&block_read.key, block_read.lsn, value.unwrap());
     421            0 :             if key_situation == ValueReconstructSituation::Complete {
     422            0 :                 completed_keys.insert(block_read.key);
     423            0 :             }
     424              :         }
     425              : 
     426            0 :         Ok(())
     427            0 :     }
     428              : }
     429              : 
     430              : impl std::fmt::Display for InMemoryLayer { // Renders as inmem-<start>-<end>, both LSNs as 16-digit uppercase hex; the end is Lsn::MAX until an end LSN is set.
     431          688 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     432          688 :         let end_lsn = self.end_lsn_or_max();
     433          688 :         write!(f, "inmem-{:016X}-{:016X}", self.start_lsn.0, end_lsn.0)
     434          688 :     }
     435              : }
     436              : 
     437              : impl InMemoryLayer {
     438              :     /// Get layer size: the current length of the ephemeral file, read under the `inner` read lock. As written this never returns `Err`.
     439      3648012 :     pub async fn size(&self) -> Result<u64> {
     440      3648012 :         let inner = self.inner.read().await;
     441      3648012 :         Ok(inner.file.len())
     442      3648012 :     }
     443              : 
     444              :     /// Create a new, empty, in-memory layer starting at `start_lsn`, backed by a freshly created `EphemeralFile`; fails if creating that file fails.
     445          804 :     pub async fn create(
     446          804 :         conf: &'static PageServerConf,
     447          804 :         timeline_id: TimelineId,
     448          804 :         tenant_shard_id: TenantShardId,
     449          804 :         start_lsn: Lsn,
     450          804 :     ) -> Result<InMemoryLayer> {
     451          804 :         trace!("initializing new empty InMemoryLayer for writing on timeline {timeline_id} at {start_lsn}");
     452              : 
     453          804 :         let file = EphemeralFile::create(conf, tenant_shard_id, timeline_id).await?;
     454          804 :         let key = InMemoryLayerFileId(file.id());
     455          804 : 
     456          804 :         Ok(InMemoryLayer {
     457          804 :             file_id: key,
     458          804 :             conf,
     459          804 :             timeline_id,
     460          804 :             tenant_shard_id,
     461          804 :             start_lsn,
     462          804 :             end_lsn: OnceLock::new(),
     463          804 :             inner: RwLock::new(InMemoryLayerInner {
     464          804 :                 index: HashMap::new(),
     465          804 :                 file,
     466          804 :                 resource_units: GlobalResourceUnits::new(), // also counts this layer in the global dirty_layers total
     467          804 :             }),
     468          804 :         })
     469          804 :     }
     470              : 
     471              :     // Write operations
     472              : 
     473              :     /// Appends one serialized page version (`buf`) for `key` at `lsn` to this layer.
     474              :     /// Takes the `inner` write lock, asserts that the layer is not frozen, then delegates to `put_value_locked`.
     475              : 
     476      3933926 :     pub(crate) async fn put_value(
     477      3933926 :         &self,
     478      3933926 :         key: Key,
     479      3933926 :         lsn: Lsn,
     480      3933926 :         buf: &[u8],
     481      3933926 :         ctx: &RequestContext,
     482      3933926 :     ) -> Result<()> {
     483      3933926 :         let mut inner = self.inner.write().await;
     484      3933926 :         self.assert_writable();
     485      3933926 :         self.put_value_locked(&mut inner, key, lsn, buf, ctx).await
     486      3933926 :     }
     487              : 
     488      3933926 :     async fn put_value_locked(
     489      3933926 :         &self,
     490      3933926 :         locked_inner: &mut RwLockWriteGuard<'_, InMemoryLayerInner>,
     491      3933926 :         key: Key,
     492      3933926 :         lsn: Lsn,
     493      3933926 :         buf: &[u8],
     494      3933926 :         ctx: &RequestContext,
     495      3933926 :     ) -> Result<()> {
     496      3933926 :         trace!("put_value key {} at {}/{}", key, self.timeline_id, lsn);
     497              : 
     498      3933926 :         let off = {
     499      3933926 :             locked_inner
     500      3933926 :                 .file
     501      3933926 :                 .write_blob(
     502      3933926 :                     buf,
     503      3933926 :                     &RequestContextBuilder::extend(ctx)
     504      3933926 :                         .page_content_kind(PageContentKind::InMemoryLayer)
     505      3933926 :                         .build(),
     506      3933926 :                 )
     507        22827 :                 .await?
     508              :         };
     509              : 
     510      3933926 :         let vec_map = locked_inner.index.entry(key).or_default();
     511      3933926 :         let old = vec_map.append_or_update_last(lsn, off).unwrap().0; // NOTE(review): presumably errs (and so panics here) when `lsn` is older than the last entry for this key - confirm against VecMap
     512      3933926 :         if old.is_some() {
     513              :             // We already had an entry for this LSN. That's odd..
     514            0 :             warn!("Key {} at {} already exists", key, lsn);
     515      3933926 :         }
     516              : 
     517      3933926 :         let size = locked_inner.file.len();
     518      3933926 :         locked_inner.resource_units.maybe_publish_size(size); // only touches the global counters once the size drifted past MAX_SIZE_DRIFT
     519      3933926 : 
     520      3933926 :         Ok(())
     521      3933926 :     }
     522              : 
     523            0 :     pub(crate) async fn tick(&self) -> Option<u64> { // Publishes the current file size to the global counters and returns the layer size limit suggested by publish_size, if any.
     524            0 :         let mut inner = self.inner.write().await;
     525            0 :         let size = inner.file.len();
     526            0 :         inner.resource_units.publish_size(size)
     527            0 :     }
     528              : 
     529            2 :     pub(crate) async fn put_tombstones(&self, _key_ranges: &[(Range<Key>, Lsn)]) -> Result<()> {
     530            2 :         // TODO: Currently a no-op: `_key_ranges` is ignored, so we just leak the storage for any deleted keys
     531            2 :         Ok(())
     532            2 :     }
     533              : 
     534              :     /// Freezes the layer by recording its exclusive `end_lsn`, while holding the `inner` write lock.
     535              :     /// Panics if `end_lsn <= start_lsn`, if an end LSN was already set, or if any stored entry has an LSN >= `end_lsn`.
     536          684 :     pub async fn freeze(&self, end_lsn: Lsn) {
     537          684 :         let inner = self.inner.write().await;
     538              : 
     539          684 :         assert!(
     540          684 :             self.start_lsn < end_lsn,
     541            0 :             "{} >= {}",
     542              :             self.start_lsn,
     543              :             end_lsn
     544              :         );
     545          684 :         self.end_lsn.set(end_lsn).expect("end_lsn set only once");
     546              : 
     547      3155246 :         for vec_map in inner.index.values() {
     548      3230186 :             for (lsn, _pos) in vec_map.as_slice() {
     549      3230186 :                 assert!(*lsn < end_lsn);
     550              :             }
     551              :         }
     552          684 :     }
     553              : 
     554              :     /// Write this frozen in-memory layer to disk.
     555              :     ///
     556              :     /// Returns a new delta layer with all the same data as this in-memory layer
                            ///
                            /// Values are written key by key in ascending `Key::to_i128` order and, within a
                            /// key, in the order they appear in that key's `VecMap` slice.
                            ///
                            /// # Errors
                            ///
                            /// Propagates failures from creating the `DeltaLayerWriter`, reading a blob back
                            /// from the file, deserializing it as a `Value`, writing it into the delta layer,
                            /// and finishing the writer.
                            ///
                            /// # Panics
                            ///
                            /// Panics if `self.end_lsn` has not been set yet (it is set by `freeze`).
     557          594 :     pub(crate) async fn write_to_disk(
     558          594 :         &self,
     559          594 :         timeline: &Arc<Timeline>,
     560          594 :         ctx: &RequestContext,
     561          594 :     ) -> Result<ResidentLayer> {
     562              :         // Grab the lock in read-mode. We hold it over the I/O, but because this
     563              :         // layer is not writeable anymore, no one should be trying to acquire the
     564              :         // write lock on it, so we shouldn't block anyone. There's one exception
     565              :         // though: another thread might have grabbed a reference to this layer
     566              :         // in `get_layer_for_write` just before the checkpointer called
     567              :         // `freeze`, and then `write_to_disk` on it. When the thread gets the
     568              :         // lock, it will see that it's not writeable anymore and retry, but it
     569              :         // would have to wait until we release it. That race condition is very
     570              :         // rare though, so we just accept the potential latency hit for now.
     571          594 :         let inner = self.inner.read().await;
     572              : 
                                // `end_lsn` is stored by `freeze`; the unwrap panics if it was never set.
     573          594 :         let end_lsn = *self.end_lsn.get().unwrap();
     574              : 
     575          594 :         let mut delta_layer_writer = DeltaLayerWriter::new(
     576          594 :             self.conf,
     577          594 :             self.timeline_id,
     578          594 :             self.tenant_shard_id,
     579          594 :             Key::MIN,
     580          594 :             self.start_lsn..end_lsn,
     581          594 :         )
     582          306 :         .await?;
     583              : 
                                // Scratch buffer for blob contents. It is moved into `put_value_bytes` and
                                // handed back by it, so the same binding is reused on every iteration.
     584          594 :         let mut buf = Vec::new();
     585          594 : 
     586          594 :         let cursor = inner.file.block_cursor();
     587          594 : 
     588          594 :         // Sort the keys because delta layer writer expects them sorted.
     589          594 :         //
     590          594 :         // NOTE: this sort can take up significant time if the layer has millions of
     591          594 :         //       keys. To speed up all the comparisons we convert the key to i128 and
     592          594 :         //       keep the value as a reference.
     593      3154526 :         let mut keys: Vec<_> = inner.index.iter().map(|(k, m)| (k.to_i128(), m)).collect();
     594     91221864 :         keys.sort_unstable_by_key(|k| k.0);
     595          594 : 
                                // Derive a context tagged with `PageContentKind::InMemoryLayer` for the reads
                                // below; it shadows the caller's `ctx` for the rest of the function.
     596          594 :         let ctx = RequestContextBuilder::extend(ctx)
     597          594 :             .page_content_kind(PageContentKind::InMemoryLayer)
     598          594 :             .build();
     599      3154526 :         for (key, vec_map) in keys.iter() {
     600      3154526 :             let key = Key::from_i128(*key);
     601              :             // Write all page versions
     602      3229466 :             for (lsn, pos) in vec_map.as_slice() {
     603      3229466 :                 cursor.read_blob_into_buf(*pos, &mut buf, &ctx).await?;
                                        // Deserialize only to learn `will_init`; the raw bytes in `buf` are
                                        // what gets passed on to the delta layer writer.
     604      3229466 :                 let will_init = Value::des(&buf)?.will_init();
     605              :                 let res;
     606      3229466 :                 (buf, res) = delta_layer_writer
     607      3229466 :                     .put_value_bytes(key, *lsn, buf, will_init)
     608        17886 :                     .await;
                                        // The write result is checked only after `buf` has been reclaimed above.
     609      3229466 :                 res?;
     610              :             }
     611              :         }
     612              : 
     613              :         // MAX is used here because we identify L0 layers by full key range
     614         4585 :         let delta_layer = delta_layer_writer.finish(Key::MAX, timeline).await?;
     615          594 :         Ok(delta_layer)
     616          594 :     }
     617              : }
        

Generated by: LCOV version 2.1-beta