LCOV - differential code coverage report
Current view: top level - pageserver/src/tenant/storage_layer - inmemory_layer.rs (source / functions) Coverage Total Hit UBC CBC
Current: cd44433dd675caa99df17a61b18949c8387e2242.info Lines: 77.4 % 235 182 53 182
Current Date: 2024-01-09 02:06:09 Functions: 78.8 % 33 26 7 26
Baseline: 66c52a629a0f4a503e193045e0df4c77139e344b.info
Baseline Date: 2024-01-08 15:34:46

           TLA  Line data    Source code
       1                 : //! An in-memory layer stores recently received key-value pairs.
       2                 : //!
       3                 : //! The "in-memory" part of the name is a bit misleading: the actual page versions are
       4                 : //! held in an ephemeral file, not in memory. The metadata for each page version, i.e.
       5                 : //! its position in the file, is kept in memory, though.
       6                 : //!
       7                 : use crate::config::PageServerConf;
       8                 : use crate::context::{PageContentKind, RequestContext, RequestContextBuilder};
       9                 : use crate::repository::{Key, Value};
      10                 : use crate::tenant::block_io::BlockReader;
      11                 : use crate::tenant::ephemeral_file::EphemeralFile;
      12                 : use crate::tenant::storage_layer::{ValueReconstructResult, ValueReconstructState};
      13                 : use crate::tenant::Timeline;
      14                 : use crate::walrecord;
      15                 : use anyhow::{ensure, Result};
      16                 : use pageserver_api::models::InMemoryLayerInfo;
      17                 : use pageserver_api::shard::TenantShardId;
      18                 : use std::collections::HashMap;
      19                 : use std::sync::{Arc, OnceLock};
      20                 : use tracing::*;
      21                 : use utils::{bin_ser::BeSer, id::TimelineId, lsn::Lsn, vec_map::VecMap};
      22                 : // avoid binding to Write (conflicts with std::io::Write)
      23                 : // while being able to use std::fmt::Write's methods
      24                 : use std::fmt::Write as _;
      25                 : use std::ops::Range;
      26                 : use tokio::sync::{RwLock, RwLockWriteGuard};
      27                 : 
      28                 : use super::{DeltaLayerWriter, ResidentLayer};
      29                 : 
      30                 : pub struct InMemoryLayer {
      31                 :     conf: &'static PageServerConf,
      32                 :     tenant_shard_id: TenantShardId,
      33                 :     timeline_id: TimelineId,
      34                 : 
      35                 :     /// This layer contains all the changes from 'start_lsn'. The
      36                 :     /// start is inclusive.
      37                 :     start_lsn: Lsn,
      38                 : 
      39                 :     /// Frozen layers have an exclusive end LSN.
      40                 :     /// Writes are only allowed when this is `None`.
      41                 :     end_lsn: OnceLock<Lsn>,
      42                 : 
      43                 :     /// The above fields never change, except for `end_lsn`, which is only set once.
      44                 :     /// All other changing parts are in `inner`, and protected by an `RwLock`.
      45                 :     inner: RwLock<InMemoryLayerInner>,
      46                 : }
      47                 : 
      48                 : impl std::fmt::Debug for InMemoryLayer {
      49 UBC           0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
      50               0 :         f.debug_struct("InMemoryLayer")
      51               0 :             .field("start_lsn", &self.start_lsn)
      52               0 :             .field("end_lsn", &self.end_lsn)
      53               0 :             .field("inner", &self.inner)
      54               0 :             .finish()
      55               0 :     }
      56                 : }
      57                 : 
      58                 : pub struct InMemoryLayerInner {
      59                 :     /// All versions of all pages in the layer are kept here.  Indexed
      60                 :     /// by key and LSN. The value is an offset into the
      61                 :     /// ephemeral file where the page version is stored.
      62                 :     index: HashMap<Key, VecMap<Lsn, u64>>,
      63                 : 
      64                 :     /// The values are stored in a serialized format in this file.
      65                 :     /// Each serialized Value is preceded by a 'u32' length field.
      66                 :     /// The `index` map stores offsets into this file.
      67                 :     file: EphemeralFile,
      68                 : }
      69                 : 
      70                 : impl std::fmt::Debug for InMemoryLayerInner {
      71               0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
      72               0 :         f.debug_struct("InMemoryLayerInner").finish()
      73               0 :     }
      74                 : }
      75                 : 
      76                 : impl InMemoryLayer {
      77 CBC         322 :     pub(crate) fn get_timeline_id(&self) -> TimelineId {
      78             322 :         self.timeline_id
      79             322 :     }
      80                 : 
      81               8 :     pub(crate) fn info(&self) -> InMemoryLayerInfo {
      82               8 :         let lsn_start = self.start_lsn;
      83                 : 
      84               8 :         if let Some(&lsn_end) = self.end_lsn.get() {
      85 UBC           0 :             InMemoryLayerInfo::Frozen { lsn_start, lsn_end }
      86                 :         } else {
      87 CBC           8 :             InMemoryLayerInfo::Open { lsn_start }
      88                 :         }
      89               8 :     }
      90                 : 
      91         1775420 :     pub(crate) fn assert_writable(&self) {
      92         1775420 :         assert!(self.end_lsn.get().is_none());
      93         1775420 :     }
      94                 : 
      95        18675050 :     pub(crate) fn end_lsn_or_max(&self) -> Lsn {
      96        18675050 :         self.end_lsn.get().copied().unwrap_or(Lsn::MAX)
      97        18675050 :     }
      98                 : 
      99        18670285 :     pub(crate) fn get_lsn_range(&self) -> Range<Lsn> {
     100        18670285 :         self.start_lsn..self.end_lsn_or_max()
     101        18670285 :     }
     102                 : 
     103                 :     /// debugging function to print out the contents of the layer
     104                 :     ///
     105                 :     /// this is likely completely unused
     106 UBC           0 :     pub async fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
     107               0 :         let inner = self.inner.read().await;
     108                 : 
     109               0 :         let end_str = self.end_lsn_or_max();
     110               0 : 
     111               0 :         println!(
     112               0 :             "----- in-memory layer for tli {} LSNs {}-{} ----",
     113               0 :             self.timeline_id, self.start_lsn, end_str,
     114               0 :         );
     115               0 : 
     116               0 :         if !verbose {
     117               0 :             return Ok(());
     118               0 :         }
     119               0 : 
     120               0 :         let cursor = inner.file.block_cursor();
     121               0 :         let mut buf = Vec::new();
     122               0 :         for (key, vec_map) in inner.index.iter() {
     123               0 :             for (lsn, pos) in vec_map.as_slice() {
     124               0 :                 let mut desc = String::new();
     125               0 :                 cursor.read_blob_into_buf(*pos, &mut buf, ctx).await?;
     126               0 :                 let val = Value::des(&buf);
     127               0 :                 match val {
     128               0 :                     Ok(Value::Image(img)) => {
     129               0 :                         write!(&mut desc, " img {} bytes", img.len())?;
     130                 :                     }
     131               0 :                     Ok(Value::WalRecord(rec)) => {
     132               0 :                         let wal_desc = walrecord::describe_wal_record(&rec).unwrap();
     133               0 :                         write!(
     134               0 :                             &mut desc,
     135               0 :                             " rec {} bytes will_init: {} {}",
     136               0 :                             buf.len(),
     137               0 :                             rec.will_init(),
     138               0 :                             wal_desc
     139               0 :                         )?;
     140                 :                     }
     141               0 :                     Err(err) => {
     142               0 :                         write!(&mut desc, " DESERIALIZATION ERROR: {}", err)?;
     143                 :                     }
     144                 :                 }
     145               0 :                 println!("  key {} at {}: {}", key, lsn, desc);
     146                 :             }
     147                 :         }
     148                 : 
     149               0 :         Ok(())
     150               0 :     }
     151                 : 
     152                 :     /// Look up given value in the layer.
     153 CBC     4668457 :     pub(crate) async fn get_value_reconstruct_data(
     154         4668457 :         &self,
     155         4668457 :         key: Key,
     156         4668457 :         lsn_range: Range<Lsn>,
     157         4668457 :         reconstruct_state: &mut ValueReconstructState,
     158         4668457 :         ctx: &RequestContext,
     159         4668458 :     ) -> anyhow::Result<ValueReconstructResult> {
     160         4668458 :         ensure!(lsn_range.start >= self.start_lsn);
     161         4668458 :         let mut need_image = true;
     162         4668458 : 
     163         4668458 :         let ctx = RequestContextBuilder::extend(ctx)
     164         4668458 :             .page_content_kind(PageContentKind::InMemoryLayer)
     165         4668458 :             .build();
     166                 : 
     167         4668458 :         let inner = self.inner.read().await;
     168                 : 
     169         4668458 :         let reader = inner.file.block_cursor();
     170                 : 
     171                 :         // Scan the page versions backwards, starting from the end of `lsn_range`.
     172         4668458 :         if let Some(vec_map) = inner.index.get(&key) {
     173         2955261 :             let slice = vec_map.slice_range(lsn_range);
     174        29093568 :             for (entry_lsn, pos) in slice.iter().rev() {
     175        29093568 :                 let buf = reader.read_blob(*pos, &ctx).await?;
     176        29093568 :                 let value = Value::des(&buf)?;
     177        29093568 :                 match value {
     178         1052719 :                     Value::Image(img) => {
     179         1052719 :                         reconstruct_state.img = Some((*entry_lsn, img));
     180         1052719 :                         return Ok(ValueReconstructResult::Complete);
     181                 :                     }
     182        28040849 :                     Value::WalRecord(rec) => {
     183        28040849 :                         let will_init = rec.will_init();
     184        28040849 :                         reconstruct_state.records.push((*entry_lsn, rec));
     185        28040849 :                         if will_init {
     186                 :                             // This WAL record initializes the page, so no need to go further back
     187          429776 :                             need_image = false;
     188          429776 :                             break;
     189        27611073 :                         }
     190                 :                     }
     191                 :                 }
     192                 :             }
     193         1713197 :         }
     194                 : 
     195                 :         // NOTE: the read lock on 'inner' is not released here; the guard lives until the function returns
     196                 : 
     197                 :         // If an older page image is needed to reconstruct the page, let the
     198                 :         // caller know.
     199         3615739 :         if need_image {
     200         3185963 :             Ok(ValueReconstructResult::Continue)
     201                 :         } else {
     202          429776 :             Ok(ValueReconstructResult::Complete)
     203                 :         }
     204         4668458 :     }
     205                 : }
     206                 : 
     207                 : impl std::fmt::Display for InMemoryLayer {
     208            4765 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     209            4765 :         let end_lsn = self.end_lsn_or_max();
     210            4765 :         write!(f, "inmem-{:016X}-{:016X}", self.start_lsn.0, end_lsn.0)
     211            4765 :     }
     212                 : }
     213                 : 
     214                 : impl InMemoryLayer {
     215                 :     /// Get layer size.
     216          565114 :     pub async fn size(&self) -> Result<u64> {
     217          565114 :         let inner = self.inner.read().await;
     218          565114 :         Ok(inner.file.len())
     219          565114 :     }
     220                 : 
     221                 :     /// Create a new, empty, in-memory layer
     222            4823 :     pub async fn create(
     223            4823 :         conf: &'static PageServerConf,
     224            4823 :         timeline_id: TimelineId,
     225            4823 :         tenant_shard_id: TenantShardId,
     226            4823 :         start_lsn: Lsn,
     227            4823 :     ) -> Result<InMemoryLayer> {
     228 UBC           0 :         trace!("initializing new empty InMemoryLayer for writing on timeline {timeline_id} at {start_lsn}");
     229                 : 
     230 CBC        4823 :         let file = EphemeralFile::create(conf, tenant_shard_id, timeline_id).await?;
     231                 : 
     232            4823 :         Ok(InMemoryLayer {
     233            4823 :             conf,
     234            4823 :             timeline_id,
     235            4823 :             tenant_shard_id,
     236            4823 :             start_lsn,
     237            4823 :             end_lsn: OnceLock::new(),
     238            4823 :             inner: RwLock::new(InMemoryLayerInner {
     239            4823 :                 index: HashMap::new(),
     240            4823 :                 file,
     241            4823 :             }),
     242            4823 :         })
     243            4823 :     }
     244                 : 
     245                 :     // Write operations
     246                 : 
     247                 :     /// Common subroutine of the public put_wal_record() and put_page_image() functions.
     248                 :     /// Adds the page version to the in-memory tree
     249          607051 :     pub(crate) async fn put_value(
     250          607051 :         &self,
     251          607051 :         key: Key,
     252          607051 :         lsn: Lsn,
     253          607051 :         val: &Value,
     254          607051 :         ctx: &RequestContext,
     255          607051 :     ) -> Result<()> {
     256          607051 :         let mut inner = self.inner.write().await;
     257          607051 :         self.assert_writable();
     258          607051 :         self.put_value_locked(&mut inner, key, lsn, val, ctx).await
     259          607051 :     }
     260                 : 
     261         1168369 :     pub(crate) async fn put_values(
     262         1168369 :         &self,
     263         1168369 :         values: &HashMap<Key, Vec<(Lsn, Value)>>,
     264         1168369 :         ctx: &RequestContext,
     265         1168369 :     ) -> Result<()> {
     266         1168369 :         let mut inner = self.inner.write().await;
     267         1168369 :         self.assert_writable();
     268         9368701 :         for (key, vals) in values {
     269        60722789 :             for (lsn, val) in vals {
     270        52522457 :                 self.put_value_locked(&mut inner, *key, *lsn, val, ctx)
     271           17557 :                     .await?;
     272                 :             }
     273                 :         }
     274         1168369 :         Ok(())
     275         1168369 :     }
     276                 : 
     277        53129508 :     async fn put_value_locked(
     278        53129508 :         &self,
     279        53129508 :         locked_inner: &mut RwLockWriteGuard<'_, InMemoryLayerInner>,
     280        53129508 :         key: Key,
     281        53129508 :         lsn: Lsn,
     282        53129508 :         val: &Value,
     283        53129508 :         ctx: &RequestContext,
     284        53129508 :     ) -> Result<()> {
     285 UBC           0 :         trace!("put_value key {} at {}/{}", key, self.timeline_id, lsn);
     286                 : 
     287 CBC    53129508 :         let off = {
     288                 :             // Avoid doing allocations for "small" values.
     289                 :             // In the regression test suite, the limit of 256 avoided allocations in 95% of cases:
     290                 :             // https://github.com/neondatabase/neon/pull/5056#discussion_r1301975061
     291        53129508 :             let mut buf = smallvec::SmallVec::<[u8; 256]>::new();
     292        53129508 :             buf.clear();
     293        53129508 :             val.ser_into(&mut buf)?;
     294        53129508 :             locked_inner
     295        53129508 :                 .file
     296        53129508 :                 .write_blob(
     297        53129508 :                     &buf,
     298        53129508 :                     &RequestContextBuilder::extend(ctx)
     299        53129508 :                         .page_content_kind(PageContentKind::InMemoryLayer)
     300        53129508 :                         .build(),
     301        53129508 :                 )
     302           17560 :                 .await?
     303                 :         };
     304                 : 
     305        53129508 :         let vec_map = locked_inner.index.entry(key).or_default();
     306        53129508 :         let old = vec_map.append_or_update_last(lsn, off).unwrap().0;
     307        53129508 :         if old.is_some() {
     308                 :             // We already had an entry for this LSN. That's odd..
     309 UBC           0 :             warn!("Key {} at {} already exists", key, lsn);
     310 CBC    53129508 :         }
     311                 : 
     312        53129508 :         Ok(())
     313        53129508 :     }
     314                 : 
     315            6569 :     pub(crate) async fn put_tombstones(&self, _key_ranges: &[(Range<Key>, Lsn)]) -> Result<()> {
     316            6569 :         // TODO: Currently, we just leak the storage for any deleted keys
     317            6569 :         Ok(())
     318            6569 :     }
     319                 : 
     320                 :     /// Records the end_lsn for non-dropped layers.
     321                 :     /// `end_lsn` is exclusive
     322            4467 :     pub async fn freeze(&self, end_lsn: Lsn) {
     323            4467 :         let inner = self.inner.write().await;
     324                 : 
     325            4467 :         assert!(self.start_lsn < end_lsn);
     326            4467 :         self.end_lsn.set(end_lsn).expect("end_lsn set only once");
     327                 : 
     328         4179146 :         for vec_map in inner.index.values() {
     329        31960054 :             for (lsn, _pos) in vec_map.as_slice() {
     330        31960054 :                 assert!(*lsn < end_lsn);
     331                 :             }
     332                 :         }
     333            4467 :     }
     334                 : 
     335                 :     /// Write this frozen in-memory layer to disk.
     336                 :     ///
     337                 :     /// Returns a new delta layer with all the same data as this in-memory layer
     338            4399 :     pub(crate) async fn write_to_disk(
     339            4399 :         &self,
     340            4399 :         timeline: &Arc<Timeline>,
     341            4399 :         ctx: &RequestContext,
     342            4399 :     ) -> Result<ResidentLayer> {
     343                 :         // Grab the lock in read-mode. We hold it over the I/O, but because this
     344                 :         // layer is not writeable anymore, no one should be trying to acquire the
     345                 :         // write lock on it, so we shouldn't block anyone. There's one exception
     346                 :         // though: another thread might have grabbed a reference to this layer
     347                 :         // in `get_layer_for_write' just before the checkpointer called
     348                 :         // `freeze`, and then `write_to_disk` on it. When the thread gets the
     349                 :         // lock, it will see that it's not writeable anymore and retry, but it
     350                 :         // would have to wait until we release it. That race condition is very
     351                 :         // rare though, so we just accept the potential latency hit for now.
     352            4399 :         let inner = self.inner.read().await;
     353                 : 
     354            4399 :         let end_lsn = *self.end_lsn.get().unwrap();
     355                 : 
     356            4399 :         let mut delta_layer_writer = DeltaLayerWriter::new(
     357            4399 :             self.conf,
     358            4399 :             self.timeline_id,
     359            4399 :             self.tenant_shard_id,
     360            4399 :             Key::MIN,
     361            4399 :             self.start_lsn..end_lsn,
     362            4399 :         )
     363 UBC           0 :         .await?;
     364                 : 
     365 CBC        4399 :         let mut buf = Vec::new();
     366            4399 : 
     367            4399 :         let cursor = inner.file.block_cursor();
     368            4399 : 
     369            4399 :         // Sort the keys because delta layer writer expects them sorted.
     370            4399 :         //
     371            4399 :         // NOTE: this sort can take up significant time if the layer has millions of
     372            4399 :         //       keys. To speed up all the comparisons we convert the key to i128 and
     373            4399 :         //       keep the value as a reference.
     374         4135508 :         let mut keys: Vec<_> = inner.index.iter().map(|(k, m)| (k.to_i128(), m)).collect();
     375       107529572 :         keys.sort_unstable_by_key(|k| k.0);
     376            4399 : 
     377            4399 :         let ctx = RequestContextBuilder::extend(ctx)
     378            4399 :             .page_content_kind(PageContentKind::InMemoryLayer)
     379            4399 :             .build();
     380         4116779 :         for (key, vec_map) in keys.iter() {
     381         4116779 :             let key = Key::from_i128(*key);
     382                 :             // Write all page versions
     383        30107628 :             for (lsn, pos) in vec_map.as_slice() {
     384        30107628 :                 cursor.read_blob_into_buf(*pos, &mut buf, &ctx).await?;
     385        30107627 :                 let will_init = Value::des(&buf)?.will_init();
     386        30107627 :                 delta_layer_writer
     387        30107627 :                     .put_value_bytes(key, *lsn, &buf, will_init)
     388 UBC           0 :                     .await?;
     389                 :             }
     390                 :         }
     391                 : 
     392                 :         // MAX is used here because we identify L0 layers by full key range
     393 CBC        4397 :         let delta_layer = delta_layer_writer.finish(Key::MAX, timeline).await?;
     394            4397 :         Ok(delta_layer)
     395            4397 :     }
     396                 : }
        

Generated by: LCOV version 2.1-beta