LCOV - code coverage report
Current view: top level - pageserver/src/tenant/storage_layer - inmemory_layer.rs
Test: e402c46de0a007db6b48dddbde450ddbb92e6ceb.info
Test Date: 2024-06-25 10:31:23
Coverage:  Lines: 79.6 % (317 of 398)    Functions: 77.8 % (35 of 45)

            Line data    Source code
       1              : //! An in-memory layer stores recently received key-value pairs.
       2              : //!
       3              : //! The "in-memory" part of the name is a bit misleading: the actual page versions are
       4              : //! held in an ephemeral file, not in memory. The metadata for each page version, i.e.
       5              : //! its position in the file, is kept in memory, though.
       6              : //!
       7              : use crate::config::PageServerConf;
       8              : use crate::context::{PageContentKind, RequestContext, RequestContextBuilder};
       9              : use crate::repository::{Key, Value};
      10              : use crate::tenant::block_io::BlockReader;
      11              : use crate::tenant::ephemeral_file::EphemeralFile;
      12              : use crate::tenant::storage_layer::ValueReconstructResult;
      13              : use crate::tenant::timeline::GetVectoredError;
      14              : use crate::tenant::{PageReconstructError, Timeline};
      15              : use crate::{page_cache, walrecord};
      16              : use anyhow::{anyhow, ensure, Result};
      17              : use pageserver_api::keyspace::KeySpace;
      18              : use pageserver_api::models::InMemoryLayerInfo;
      19              : use pageserver_api::shard::TenantShardId;
      20              : use std::collections::{BTreeMap, BinaryHeap, HashSet};
      21              : use std::sync::{Arc, OnceLock};
      22              : use std::time::Instant;
      23              : use tracing::*;
      24              : use utils::{bin_ser::BeSer, id::TimelineId, lsn::Lsn, vec_map::VecMap};
      25              : // avoid binding to Write (conflicts with std::io::Write)
      26              : // while being able to use std::fmt::Write's methods
      27              : use crate::metrics::TIMELINE_EPHEMERAL_BYTES;
      28              : use std::cmp::Ordering;
      29              : use std::fmt::Write;
      30              : use std::ops::Range;
      31              : use std::sync::atomic::Ordering as AtomicOrdering;
      32              : use std::sync::atomic::{AtomicU64, AtomicUsize};
      33              : use tokio::sync::{RwLock, RwLockWriteGuard};
      34              : 
      35              : use super::{
      36              :     DeltaLayerWriter, ResidentLayer, ValueReconstructSituation, ValueReconstructState,
      37              :     ValuesReconstructState,
      38              : };
      39              : 
      40              : #[derive(Debug, PartialEq, Eq, Clone, Copy, Hash)]
      41              : pub(crate) struct InMemoryLayerFileId(page_cache::FileId);
      42              : 
      43              : pub struct InMemoryLayer {
      44              :     conf: &'static PageServerConf,
      45              :     tenant_shard_id: TenantShardId,
      46              :     timeline_id: TimelineId,
      47              :     file_id: InMemoryLayerFileId,
      48              : 
      49              :     /// This layer contains all the changes from 'start_lsn'. The
      50              :     /// start is inclusive.
      51              :     start_lsn: Lsn,
      52              : 
      53              :     /// Frozen layers have an exclusive end LSN.
      54              :     /// Writes are only allowed when this is `None`.
      55              :     pub(crate) end_lsn: OnceLock<Lsn>,
      56              : 
       57              :     /// Used for traversal path. Cached representation of the in-memory layer before it is frozen.
      58              :     local_path_str: Arc<str>,
      59              : 
       60              :     /// Used for traversal path. Cached representation of the in-memory layer after it is frozen.
      61              :     frozen_local_path_str: OnceLock<Arc<str>>,
      62              : 
      63              :     opened_at: Instant,
      64              : 
      65              :     /// The above fields never change, except for `end_lsn`, which is only set once.
      66              :     /// All other changing parts are in `inner`, and protected by a mutex.
      67              :     inner: RwLock<InMemoryLayerInner>,
      68              : }
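// Editorial sketch (not part of the covered source): the LSN-range semantics described
// in the struct doc above, reduced to plain integers. An open layer covers
// `start_lsn..MAX`; a frozen layer covers `start_lsn..end_lsn`, with the start inclusive
// and the end exclusive. Names and types here are illustrative stand-ins only.
fn lsn_is_covered(start_lsn: u64, frozen_end_lsn: Option<u64>, lsn: u64) -> bool {
    let end = frozen_end_lsn.unwrap_or(u64::MAX); // still open: no end LSN set yet
    start_lsn <= lsn && lsn < end                 // start inclusive, end exclusive
}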
      69              : 
      70              : impl std::fmt::Debug for InMemoryLayer {
      71            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
      72            0 :         f.debug_struct("InMemoryLayer")
      73            0 :             .field("start_lsn", &self.start_lsn)
      74            0 :             .field("end_lsn", &self.end_lsn)
      75            0 :             .field("inner", &self.inner)
      76            0 :             .finish()
      77            0 :     }
      78              : }
      79              : 
      80              : pub struct InMemoryLayerInner {
      81              :     /// All versions of all pages in the layer are kept here. Indexed
      82              :     /// by block number and LSN. The value is an offset into the
      83              :     /// ephemeral file where the page version is stored.
      84              :     index: BTreeMap<Key, VecMap<Lsn, u64>>,
      85              : 
      86              :     /// The values are stored in a serialized format in this file.
      87              :     /// Each serialized Value is preceded by a 'u32' length field.
       88              :     /// The `index` map above stores offsets into this file.
      89              :     file: EphemeralFile,
      90              : 
      91              :     resource_units: GlobalResourceUnits,
      92              : }
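// Editorial sketch (not part of the covered source): the blob framing that the `file`
// field's doc comment describes, i.e. each serialized Value preceded by a u32 length
// field. Endianness and the flat byte-slice view are assumptions made for this example;
// the real code reads through EphemeralFile's block cursor instead.
fn read_length_prefixed_blob(file_bytes: &[u8], offset: usize) -> Option<&[u8]> {
    let len_bytes: [u8; 4] = file_bytes.get(offset..offset + 4)?.try_into().ok()?;
    let len = u32::from_le_bytes(len_bytes) as usize; // endianness assumed for illustration
    file_bytes.get(offset + 4..offset + 4 + len)
}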
      93              : 
      94              : impl std::fmt::Debug for InMemoryLayerInner {
      95            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
      96            0 :         f.debug_struct("InMemoryLayerInner").finish()
      97            0 :     }
      98              : }
      99              : 
     100              : /// State shared by all in-memory (ephemeral) layers.  Updated infrequently during background ticks in Timeline,
     101              : /// to minimize contention.
     102              : ///
     103              : /// This global state is used to implement behaviors that require a global view of the system, e.g.
     104              : /// rolling layers proactively to limit the total amount of dirty data.
     105              : pub(crate) struct GlobalResources {
     106              :     // Limit on how high dirty_bytes may grow before we start freezing layers to reduce it.
     107              :     // Zero means unlimited.
     108              :     pub(crate) max_dirty_bytes: AtomicU64,
     109              :     // How many bytes are in all EphemeralFile objects
     110              :     dirty_bytes: AtomicU64,
     111              :     // How many layers are contributing to dirty_bytes
     112              :     dirty_layers: AtomicUsize,
     113              : }
     114              : 
     115              : // Per-timeline RAII struct for its contribution to [`GlobalResources`]
     116              : struct GlobalResourceUnits {
     117              :     // How many dirty bytes have I added to the global dirty_bytes: this guard object is responsible
     118              :     // for decrementing the global counter by this many bytes when dropped.
     119              :     dirty_bytes: u64,
     120              : }
     121              : 
     122              : impl GlobalResourceUnits {
     123              :     // Hint for the layer append path to update us when the layer size differs from the last
     124              :     // call to update_size by this much.  If we don't reach this threshold, we'll still get
     125              :     // updated when the Timeline "ticks" in the background.
     126              :     const MAX_SIZE_DRIFT: u64 = 10 * 1024 * 1024;
     127              : 
     128         1240 :     fn new() -> Self {
     129         1240 :         GLOBAL_RESOURCES
     130         1240 :             .dirty_layers
     131         1240 :             .fetch_add(1, AtomicOrdering::Relaxed);
     132         1240 :         Self { dirty_bytes: 0 }
     133         1240 :     }
     134              : 
     135              :     /// Do not call this frequently: all timelines will write to these same global atomics,
     136              :     /// so this is a relatively expensive operation.  Wait at least a few seconds between calls.
     137              :     ///
     138              :     /// Returns the effective layer size limit that should be applied, if any, to keep
     139              :     /// the total number of dirty bytes below the configured maximum.
     140         1122 :     fn publish_size(&mut self, size: u64) -> Option<u64> {
     141         1122 :         let new_global_dirty_bytes = match size.cmp(&self.dirty_bytes) {
     142         1110 :             Ordering::Equal => GLOBAL_RESOURCES.dirty_bytes.load(AtomicOrdering::Relaxed),
     143              :             Ordering::Greater => {
     144           10 :                 let delta = size - self.dirty_bytes;
     145           10 :                 let old = GLOBAL_RESOURCES
     146           10 :                     .dirty_bytes
     147           10 :                     .fetch_add(delta, AtomicOrdering::Relaxed);
     148           10 :                 old + delta
     149              :             }
     150              :             Ordering::Less => {
     151            2 :                 let delta = self.dirty_bytes - size;
     152            2 :                 let old = GLOBAL_RESOURCES
     153            2 :                     .dirty_bytes
     154            2 :                     .fetch_sub(delta, AtomicOrdering::Relaxed);
     155            2 :                 old - delta
     156              :             }
     157              :         };
     158              : 
     159              :         // This is a sloppy update: concurrent updates to the counter will race, and the exact
     160              :         // value of the metric might not be the exact latest value of GLOBAL_RESOURCES::dirty_bytes.
     161              :         // That's okay: as long as the metric contains some recent value, it doesn't have to always
     162              :         // be literally the last update.
     163         1122 :         TIMELINE_EPHEMERAL_BYTES.set(new_global_dirty_bytes);
     164         1122 : 
     165         1122 :         self.dirty_bytes = size;
     166         1122 : 
     167         1122 :         let max_dirty_bytes = GLOBAL_RESOURCES
     168         1122 :             .max_dirty_bytes
     169         1122 :             .load(AtomicOrdering::Relaxed);
     170         1122 :         if max_dirty_bytes > 0 && new_global_dirty_bytes > max_dirty_bytes {
     171              :             // Set the layer file limit to the average layer size: this implies that all above-average
      172              :             // sized layers will be eligible for freezing.  They will be frozen in the order they
     173              :             // next enter publish_size.
     174            0 :             Some(
     175            0 :                 new_global_dirty_bytes
     176            0 :                     / GLOBAL_RESOURCES.dirty_layers.load(AtomicOrdering::Relaxed) as u64,
     177            0 :             )
     178              :         } else {
     179         1122 :             None
     180              :         }
     181         1122 :     }
     182              : 
     183              :     // Call publish_size if the input size differs from last published size by more than
     184              :     // the drift limit
     185      5090418 :     fn maybe_publish_size(&mut self, size: u64) {
     186      5090418 :         let publish = match size.cmp(&self.dirty_bytes) {
     187            0 :             Ordering::Equal => false,
     188      5090418 :             Ordering::Greater => size - self.dirty_bytes > Self::MAX_SIZE_DRIFT,
     189            0 :             Ordering::Less => self.dirty_bytes - size > Self::MAX_SIZE_DRIFT,
     190              :         };
     191              : 
     192      5090418 :         if publish {
     193           10 :             self.publish_size(size);
     194      5090408 :         }
     195      5090418 :     }
     196              : }
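// Editorial sketch (not part of the covered source): the two decisions made above,
// stripped of the atomics. `should_publish` mirrors the MAX_SIZE_DRIFT check in
// maybe_publish_size, and `layer_size_limit` mirrors the "average layer size" limit
// returned by publish_size when the global dirty total exceeds the configured maximum.
// Names and values are illustrative only.
fn should_publish(last_published: u64, current: u64) -> bool {
    const MAX_SIZE_DRIFT: u64 = 10 * 1024 * 1024;
    current.abs_diff(last_published) > MAX_SIZE_DRIFT
}

fn layer_size_limit(global_dirty: u64, dirty_layers: u64, max_dirty: u64) -> Option<u64> {
    // E.g. 300 MiB dirty across 3 layers with a 200 MiB cap yields a 100 MiB limit:
    // every above-average-sized layer becomes a candidate for freezing.
    (max_dirty > 0 && global_dirty > max_dirty).then(|| global_dirty / dirty_layers)
}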
     197              : 
     198              : impl Drop for GlobalResourceUnits {
     199         1112 :     fn drop(&mut self) {
     200         1112 :         GLOBAL_RESOURCES
     201         1112 :             .dirty_layers
     202         1112 :             .fetch_sub(1, AtomicOrdering::Relaxed);
     203         1112 : 
     204         1112 :         // Subtract our contribution to the global total dirty bytes
     205         1112 :         self.publish_size(0);
     206         1112 :     }
     207              : }
     208              : 
     209              : pub(crate) static GLOBAL_RESOURCES: GlobalResources = GlobalResources {
     210              :     max_dirty_bytes: AtomicU64::new(0),
     211              :     dirty_bytes: AtomicU64::new(0),
     212              :     dirty_layers: AtomicUsize::new(0),
     213              : };
     214              : 
     215              : impl InMemoryLayer {
     216           14 :     pub(crate) fn file_id(&self) -> InMemoryLayerFileId {
     217           14 :         self.file_id
     218           14 :     }
     219              : 
     220         1110 :     pub(crate) fn get_timeline_id(&self) -> TimelineId {
     221         1110 :         self.timeline_id
     222         1110 :     }
     223              : 
     224            0 :     pub(crate) fn info(&self) -> InMemoryLayerInfo {
     225            0 :         let lsn_start = self.start_lsn;
     226              : 
     227            0 :         if let Some(&lsn_end) = self.end_lsn.get() {
     228            0 :             InMemoryLayerInfo::Frozen { lsn_start, lsn_end }
     229              :         } else {
     230            0 :             InMemoryLayerInfo::Open { lsn_start }
     231              :         }
     232            0 :     }
     233              : 
     234            0 :     pub(crate) fn try_len(&self) -> Option<u64> {
     235            0 :         self.inner.try_read().map(|i| i.file.len()).ok()
     236            0 :     }
     237              : 
     238      5090418 :     pub(crate) fn assert_writable(&self) {
     239      5090418 :         assert!(self.end_lsn.get().is_none());
     240      5090418 :     }
     241              : 
     242       721385 :     pub(crate) fn end_lsn_or_max(&self) -> Lsn {
     243       721385 :         self.end_lsn.get().copied().unwrap_or(Lsn::MAX)
     244       721385 :     }
     245              : 
     246       720275 :     pub(crate) fn get_lsn_range(&self) -> Range<Lsn> {
     247       720275 :         self.start_lsn..self.end_lsn_or_max()
     248       720275 :     }
     249              : 
     250       606120 :     pub(crate) fn local_path_str(&self) -> &Arc<str> {
     251       606120 :         self.frozen_local_path_str
     252       606120 :             .get()
     253       606120 :             .unwrap_or(&self.local_path_str)
     254       606120 :     }
     255              : 
      256              :     /// Debugging function to print out the contents of the layer.
      257              :     ///
      258              :     /// This is likely completely unused.
     259            0 :     pub async fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
     260            0 :         let inner = self.inner.read().await;
     261              : 
     262            0 :         let end_str = self.end_lsn_or_max();
     263            0 : 
     264            0 :         println!(
     265            0 :             "----- in-memory layer for tli {} LSNs {}-{} ----",
     266            0 :             self.timeline_id, self.start_lsn, end_str,
     267            0 :         );
     268            0 : 
     269            0 :         if !verbose {
     270            0 :             return Ok(());
     271            0 :         }
     272            0 : 
     273            0 :         let cursor = inner.file.block_cursor();
     274            0 :         let mut buf = Vec::new();
     275            0 :         for (key, vec_map) in inner.index.iter() {
     276            0 :             for (lsn, pos) in vec_map.as_slice() {
     277            0 :                 let mut desc = String::new();
     278            0 :                 cursor.read_blob_into_buf(*pos, &mut buf, ctx).await?;
     279            0 :                 let val = Value::des(&buf);
     280            0 :                 match val {
     281            0 :                     Ok(Value::Image(img)) => {
     282            0 :                         write!(&mut desc, " img {} bytes", img.len())?;
     283              :                     }
     284            0 :                     Ok(Value::WalRecord(rec)) => {
     285            0 :                         let wal_desc = walrecord::describe_wal_record(&rec).unwrap();
     286            0 :                         write!(
     287            0 :                             &mut desc,
     288            0 :                             " rec {} bytes will_init: {} {}",
     289            0 :                             buf.len(),
     290            0 :                             rec.will_init(),
     291            0 :                             wal_desc
     292            0 :                         )?;
     293              :                     }
     294            0 :                     Err(err) => {
     295            0 :                         write!(&mut desc, " DESERIALIZATION ERROR: {}", err)?;
     296              :                     }
     297              :                 }
     298            0 :                 println!("  key {} at {}: {}", key, lsn, desc);
     299              :             }
     300              :         }
     301              : 
     302            0 :         Ok(())
     303            0 :     }
     304              : 
     305              :     /// Look up given value in the layer.
     306       606120 :     pub(crate) async fn get_value_reconstruct_data(
     307       606120 :         &self,
     308       606120 :         key: Key,
     309       606120 :         lsn_range: Range<Lsn>,
     310       606120 :         reconstruct_state: &mut ValueReconstructState,
     311       606120 :         ctx: &RequestContext,
     312       606120 :     ) -> anyhow::Result<ValueReconstructResult> {
     313       606120 :         ensure!(lsn_range.start >= self.start_lsn);
     314       606120 :         let mut need_image = true;
     315       606120 : 
     316       606120 :         let ctx = RequestContextBuilder::extend(ctx)
     317       606120 :             .page_content_kind(PageContentKind::InMemoryLayer)
     318       606120 :             .build();
     319              : 
     320       606120 :         let inner = self.inner.read().await;
     321              : 
     322       606120 :         let reader = inner.file.block_cursor();
     323              : 
     324              :         // Scan the page versions backwards, starting from `lsn`.
     325       606120 :         if let Some(vec_map) = inner.index.get(&key) {
     326       496700 :             let slice = vec_map.slice_range(lsn_range);
     327       496710 :             for (entry_lsn, pos) in slice.iter().rev() {
     328       496710 :                 let buf = reader.read_blob(*pos, &ctx).await?;
     329       496710 :                 let value = Value::des(&buf)?;
     330       496710 :                 match value {
     331       496690 :                     Value::Image(img) => {
     332       496690 :                         reconstruct_state.img = Some((*entry_lsn, img));
     333       496690 :                         return Ok(ValueReconstructResult::Complete);
     334              :                     }
     335           20 :                     Value::WalRecord(rec) => {
     336           20 :                         let will_init = rec.will_init();
     337           20 :                         reconstruct_state.records.push((*entry_lsn, rec));
     338           20 :                         if will_init {
     339              :                             // This WAL record initializes the page, so no need to go further back
     340            0 :                             need_image = false;
     341            0 :                             break;
     342           20 :                         }
     343              :                     }
     344              :                 }
     345              :             }
     346       109420 :         }
     347              : 
     348              :         // release lock on 'inner'
     349              : 
     350              :         // If an older page image is needed to reconstruct the page, let the
     351              :         // caller know.
     352       109430 :         if need_image {
     353       109430 :             Ok(ValueReconstructResult::Continue)
     354              :         } else {
     355            0 :             Ok(ValueReconstructResult::Complete)
     356              :         }
     357       606120 :     }
     358              : 
     359              :     // Look up the keys in the provided keyspace and update
     360              :     // the reconstruct state with whatever is found.
     361              :     //
     362              :     // If the key is cached, go no further than the cached Lsn.
     363           14 :     pub(crate) async fn get_values_reconstruct_data(
     364           14 :         &self,
     365           14 :         keyspace: KeySpace,
     366           14 :         end_lsn: Lsn,
     367           14 :         reconstruct_state: &mut ValuesReconstructState,
     368           14 :         ctx: &RequestContext,
     369           14 :     ) -> Result<(), GetVectoredError> {
     370           14 :         let ctx = RequestContextBuilder::extend(ctx)
     371           14 :             .page_content_kind(PageContentKind::InMemoryLayer)
     372           14 :             .build();
     373              : 
     374           14 :         let inner = self.inner.read().await;
     375           14 :         let reader = inner.file.block_cursor();
     376           14 : 
     377           14 :         #[derive(Eq, PartialEq, Ord, PartialOrd)]
     378           14 :         struct BlockRead {
     379           14 :             key: Key,
     380           14 :             lsn: Lsn,
     381           14 :             block_offset: u64,
     382           14 :         }
     383           14 : 
     384           14 :         let mut planned_block_reads = BinaryHeap::new();
     385              : 
     386           14 :         for range in keyspace.ranges.iter() {
     387         2018 :             for (key, vec_map) in inner.index.range(range.start..range.end) {
     388         2018 :                 let lsn_range = match reconstruct_state.get_cached_lsn(key) {
     389            0 :                     Some(cached_lsn) => (cached_lsn + 1)..end_lsn,
     390         2018 :                     None => self.start_lsn..end_lsn,
     391              :                 };
     392              : 
     393         2018 :                 let slice = vec_map.slice_range(lsn_range);
     394         2022 :                 for (entry_lsn, pos) in slice.iter().rev() {
     395         2022 :                     planned_block_reads.push(BlockRead {
     396         2022 :                         key: *key,
     397         2022 :                         lsn: *entry_lsn,
     398         2022 :                         block_offset: *pos,
     399         2022 :                     });
     400         2022 :                 }
     401              :             }
     402              :         }
     403              : 
     404           14 :         let keyspace_size = keyspace.total_raw_size();
     405           14 : 
     406           14 :         let mut completed_keys = HashSet::new();
     407         2036 :         while completed_keys.len() < keyspace_size && !planned_block_reads.is_empty() {
     408         2022 :             let block_read = planned_block_reads.pop().unwrap();
     409         2022 :             if completed_keys.contains(&block_read.key) {
     410            4 :                 continue;
     411         2018 :             }
     412              : 
     413         2018 :             let buf = reader.read_blob(block_read.block_offset, &ctx).await;
     414         2018 :             if let Err(e) = buf {
     415            0 :                 reconstruct_state
     416            0 :                     .on_key_error(block_read.key, PageReconstructError::from(anyhow!(e)));
     417            0 :                 completed_keys.insert(block_read.key);
     418            0 :                 continue;
     419         2018 :             }
     420         2018 : 
     421         2018 :             let value = Value::des(&buf.unwrap());
     422         2018 :             if let Err(e) = value {
     423            0 :                 reconstruct_state
     424            0 :                     .on_key_error(block_read.key, PageReconstructError::from(anyhow!(e)));
     425            0 :                 completed_keys.insert(block_read.key);
     426            0 :                 continue;
     427         2018 :             }
     428         2018 : 
     429         2018 :             let key_situation =
     430         2018 :                 reconstruct_state.update_key(&block_read.key, block_read.lsn, value.unwrap());
     431         2018 :             if key_situation == ValueReconstructSituation::Complete {
     432         2018 :                 completed_keys.insert(block_read.key);
     433         2018 :             }
     434              :         }
     435              : 
     436           14 :         reconstruct_state.on_lsn_advanced(&keyspace, self.start_lsn);
     437           14 : 
     438           14 :         Ok(())
     439           14 :     }
     440              : }
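// Editorial sketch (not part of the covered source): the backwards scan performed by
// get_value_reconstruct_data above, with the ephemeral-file indirection removed. The
// versions of a single key are walked from newest to oldest; the scan is complete at the
// first image or at a WAL record that initializes the page, otherwise the caller must
// continue into an older layer. (The vectored variant above plans the same per-key reads
// across a whole keyspace via a BinaryHeap of block reads.) Types are stand-ins.
enum SketchValue {
    Image(Vec<u8>),
    WalRecord { will_init: bool },
}

/// Returns true if reconstruction is complete within this layer.
fn scan_newest_to_oldest(versions: &[SketchValue]) -> bool {
    for value in versions.iter().rev() {
        match value {
            SketchValue::Image(_) => return true, // full page image found
            SketchValue::WalRecord { will_init } if *will_init => return true,
            SketchValue::WalRecord { .. } => continue, // need older data to apply this record onto
        }
    }
    false // no image in this layer: ValueReconstructResult::Continue
}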
     441              : 
     442         3460 : fn inmem_layer_display(mut f: impl Write, start_lsn: Lsn, end_lsn: Lsn) -> std::fmt::Result {
     443         3460 :     write!(f, "inmem-{:016X}-{:016X}", start_lsn.0, end_lsn.0)
     444         3460 : }
     445              : 
     446         2350 : fn inmem_layer_log_display(
     447         2350 :     mut f: impl Write,
     448         2350 :     timeline: TimelineId,
     449         2350 :     start_lsn: Lsn,
     450         2350 :     end_lsn: Lsn,
     451         2350 : ) -> std::fmt::Result {
     452         2350 :     write!(f, "timeline {} in-memory ", timeline)?;
     453         2350 :     inmem_layer_display(f, start_lsn, end_lsn)
     454         2350 : }
     455              : 
     456              : impl std::fmt::Display for InMemoryLayer {
     457         1110 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     458         1110 :         let end_lsn = self.end_lsn_or_max();
     459         1110 :         inmem_layer_display(f, self.start_lsn, end_lsn)
     460         1110 :     }
     461              : }
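// Editorial note (not part of the covered source): the helpers above render a layer name
// such as
//
//     inmem-0000000001696070-00000000016960E8
//
// i.e. "inmem-" followed by the start and end LSNs as zero-padded 16-digit uppercase hex,
// and inmem_layer_log_display prefixes that with "timeline <id> in-memory ". The concrete
// LSN values shown here are illustrative.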
     462              : 
     463              : impl InMemoryLayer {
     464              :     /// Get layer size.
     465         1240 :     pub async fn size(&self) -> Result<u64> {
     466         1240 :         let inner = self.inner.read().await;
     467         1240 :         Ok(inner.file.len())
     468         1240 :     }
     469              : 
     470              :     /// Create a new, empty, in-memory layer
     471         1240 :     pub async fn create(
     472         1240 :         conf: &'static PageServerConf,
     473         1240 :         timeline_id: TimelineId,
     474         1240 :         tenant_shard_id: TenantShardId,
     475         1240 :         start_lsn: Lsn,
     476         1240 :         ctx: &RequestContext,
     477         1240 :     ) -> Result<InMemoryLayer> {
     478         1240 :         trace!("initializing new empty InMemoryLayer for writing on timeline {timeline_id} at {start_lsn}");
     479              : 
     480         1240 :         let file = EphemeralFile::create(conf, tenant_shard_id, timeline_id, ctx).await?;
     481         1240 :         let key = InMemoryLayerFileId(file.page_cache_file_id());
     482         1240 : 
     483         1240 :         Ok(InMemoryLayer {
     484         1240 :             file_id: key,
     485         1240 :             local_path_str: {
     486         1240 :                 let mut buf = String::new();
     487         1240 :                 inmem_layer_log_display(&mut buf, timeline_id, start_lsn, Lsn::MAX).unwrap();
     488         1240 :                 buf.into()
     489         1240 :             },
     490         1240 :             frozen_local_path_str: OnceLock::new(),
     491         1240 :             conf,
     492         1240 :             timeline_id,
     493         1240 :             tenant_shard_id,
     494         1240 :             start_lsn,
     495         1240 :             end_lsn: OnceLock::new(),
     496         1240 :             opened_at: Instant::now(),
     497         1240 :             inner: RwLock::new(InMemoryLayerInner {
     498         1240 :                 index: BTreeMap::new(),
     499         1240 :                 file,
     500         1240 :                 resource_units: GlobalResourceUnits::new(),
     501         1240 :             }),
     502         1240 :         })
     503         1240 :     }
     504              : 
     505              :     // Write operations
     506              : 
     507              :     /// Common subroutine of the public put_wal_record() and put_page_image() functions.
      508              :     /// Adds the page version to the in-memory tree.
     509              : 
     510      5090418 :     pub(crate) async fn put_value(
     511      5090418 :         &self,
     512      5090418 :         key: Key,
     513      5090418 :         lsn: Lsn,
     514      5090418 :         buf: &[u8],
     515      5090418 :         ctx: &RequestContext,
     516      5090418 :     ) -> Result<()> {
     517      5090418 :         let mut inner = self.inner.write().await;
     518      5090418 :         self.assert_writable();
     519      5090418 :         self.put_value_locked(&mut inner, key, lsn, buf, ctx).await
     520      5090418 :     }
     521              : 
     522      5090418 :     async fn put_value_locked(
     523      5090418 :         &self,
     524      5090418 :         locked_inner: &mut RwLockWriteGuard<'_, InMemoryLayerInner>,
     525      5090418 :         key: Key,
     526      5090418 :         lsn: Lsn,
     527      5090418 :         buf: &[u8],
     528      5090418 :         ctx: &RequestContext,
     529      5090418 :     ) -> Result<()> {
     530      5090418 :         trace!("put_value key {} at {}/{}", key, self.timeline_id, lsn);
     531              : 
     532      5090418 :         let off = {
     533      5090418 :             locked_inner
     534      5090418 :                 .file
     535      5090418 :                 .write_blob(
     536      5090418 :                     buf,
     537      5090418 :                     &RequestContextBuilder::extend(ctx)
     538      5090418 :                         .page_content_kind(PageContentKind::InMemoryLayer)
     539      5090418 :                         .build(),
     540      5090418 :                 )
     541         3587 :                 .await?
     542              :         };
     543              : 
     544      5090418 :         let vec_map = locked_inner.index.entry(key).or_default();
     545      5090418 :         let old = vec_map.append_or_update_last(lsn, off).unwrap().0;
     546      5090418 :         if old.is_some() {
     547              :             // We already had an entry for this LSN. That's odd..
     548            0 :             warn!("Key {} at {} already exists", key, lsn);
     549      5090418 :         }
     550              : 
     551      5090418 :         let size = locked_inner.file.len();
     552      5090418 :         locked_inner.resource_units.maybe_publish_size(size);
     553      5090418 : 
     554      5090418 :         Ok(())
     555      5090418 :     }
     556              : 
     557      4803026 :     pub(crate) fn get_opened_at(&self) -> Instant {
     558      4803026 :         self.opened_at
     559      4803026 :     }
     560              : 
     561            0 :     pub(crate) async fn tick(&self) -> Option<u64> {
     562            0 :         let mut inner = self.inner.write().await;
     563            0 :         let size = inner.file.len();
     564            0 :         inner.resource_units.publish_size(size)
     565            0 :     }
     566              : 
     567            2 :     pub(crate) async fn put_tombstones(&self, _key_ranges: &[(Range<Key>, Lsn)]) -> Result<()> {
     568            2 :         // TODO: Currently, we just leak the storage for any deleted keys
     569            2 :         Ok(())
     570            2 :     }
     571              : 
     572              :     /// Records the end_lsn for non-dropped layers.
     573              :     /// `end_lsn` is exclusive
     574         1110 :     pub async fn freeze(&self, end_lsn: Lsn) {
     575         1110 :         let inner = self.inner.write().await;
     576              : 
     577         1110 :         assert!(
     578         1110 :             self.start_lsn < end_lsn,
     579            0 :             "{} >= {}",
     580              :             self.start_lsn,
     581              :             end_lsn
     582              :         );
     583         1110 :         self.end_lsn.set(end_lsn).expect("end_lsn set only once");
     584         1110 : 
     585         1110 :         self.frozen_local_path_str
     586         1110 :             .set({
     587         1110 :                 let mut buf = String::new();
     588         1110 :                 inmem_layer_log_display(&mut buf, self.get_timeline_id(), self.start_lsn, end_lsn)
     589         1110 :                     .unwrap();
     590         1110 :                 buf.into()
     591         1110 :             })
     592         1110 :             .expect("frozen_local_path_str set only once");
     593              : 
     594      4255747 :         for vec_map in inner.index.values() {
     595      4386652 :             for (lsn, _pos) in vec_map.as_slice() {
     596      4386652 :                 assert!(*lsn < end_lsn);
     597              :             }
     598              :         }
     599         1110 :     }
     600              : 
     601              :     /// Write this frozen in-memory layer to disk. If `key_range` is set, the delta
     602              :     /// layer will only contain the key range the user specifies, and may return `None`
     603              :     /// if there are no matching keys.
     604              :     ///
     605              :     /// Returns a new delta layer with all the same data as this in-memory layer
     606         1110 :     pub(crate) async fn write_to_disk(
     607         1110 :         &self,
     608         1110 :         timeline: &Arc<Timeline>,
     609         1110 :         ctx: &RequestContext,
     610         1110 :         key_range: Option<Range<Key>>,
     611         1110 :     ) -> Result<Option<ResidentLayer>> {
     612              :         // Grab the lock in read-mode. We hold it over the I/O, but because this
     613              :         // layer is not writeable anymore, no one should be trying to acquire the
     614              :         // write lock on it, so we shouldn't block anyone. There's one exception
     615              :         // though: another thread might have grabbed a reference to this layer
      616              :         // in `get_layer_for_write` just before the checkpointer called
     617              :         // `freeze`, and then `write_to_disk` on it. When the thread gets the
     618              :         // lock, it will see that it's not writeable anymore and retry, but it
     619              :         // would have to wait until we release it. That race condition is very
     620              :         // rare though, so we just accept the potential latency hit for now.
     621         1110 :         let inner = self.inner.read().await;
     622              : 
     623         1110 :         let end_lsn = *self.end_lsn.get().unwrap();
     624              : 
     625         1110 :         let keys: Vec<_> = if let Some(key_range) = key_range {
     626          142 :             inner
     627          142 :                 .index
     628          142 :                 .iter()
     629         1134 :                 .filter(|(k, _)| key_range.contains(k))
     630          142 :                 .map(|(k, m)| (k.to_i128(), m))
     631          142 :                 .collect()
     632              :         } else {
     633      4254613 :             inner.index.iter().map(|(k, m)| (k.to_i128(), m)).collect()
     634              :         };
     635              : 
     636         1110 :         if keys.is_empty() {
     637          142 :             return Ok(None);
     638          968 :         }
     639              : 
     640          968 :         let mut delta_layer_writer = DeltaLayerWriter::new(
     641          968 :             self.conf,
     642          968 :             self.timeline_id,
     643          968 :             self.tenant_shard_id,
     644          968 :             Key::MIN,
     645          968 :             self.start_lsn..end_lsn,
     646          968 :             ctx,
     647          968 :         )
     648          503 :         .await?;
     649              : 
     650          968 :         let mut buf = Vec::new();
     651          968 : 
     652          968 :         let cursor = inner.file.block_cursor();
     653          968 : 
     654          968 :         let ctx = RequestContextBuilder::extend(ctx)
     655          968 :             .page_content_kind(PageContentKind::InMemoryLayer)
     656          968 :             .build();
     657      4254613 :         for (key, vec_map) in inner.index.iter() {
     658              :             // Write all page versions
     659      4385518 :             for (lsn, pos) in vec_map.as_slice() {
     660      4385518 :                 cursor.read_blob_into_buf(*pos, &mut buf, &ctx).await?;
     661      4385518 :                 let will_init = Value::des(&buf)?.will_init();
     662              :                 let res;
     663      4385518 :                 (buf, res) = delta_layer_writer
     664      4385518 :                     .put_value_bytes(*key, *lsn, buf, will_init, &ctx)
     665         2734 :                     .await;
     666      4385518 :                 res?;
     667              :             }
     668              :         }
     669              : 
     670              :         // MAX is used here because we identify L0 layers by full key range
     671         6687 :         let delta_layer = delta_layer_writer.finish(Key::MAX, timeline, &ctx).await?;
     672          968 :         Ok(Some(delta_layer))
     673         1110 :     }
     674              : }
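// Editorial sketch (not part of the covered source): how the write path above fits
// together. The real call sites live in Timeline; everything except the methods defined
// in this file is assumed here.
//
//     let layer = InMemoryLayer::create(conf, timeline_id, tenant_shard_id, start_lsn, ctx).await?;
//     layer.put_value(key, lsn, &serialized_value, ctx).await?;   // while the layer is open
//     layer.freeze(end_lsn).await;                                // end_lsn is exclusive; no writes after this
//     let maybe_resident = layer.write_to_disk(&timeline, ctx, None).await?;
//     // `maybe_resident` is None when there were no (matching) keys to write.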
        

Generated by: LCOV version 2.1-beta