LCOV - code coverage report
Current view: top level - pageserver/src/tenant/storage_layer - inmemory_layer.rs (source / functions) Coverage Total Hit
Test: 190869232aac3a234374e5bb62582e91cf5f5818.info Lines: 55.7 % 291 162
Test Date: 2024-02-23 13:21:27 Functions: 63.9 % 36 23

            Line data    Source code
       1              : //! An in-memory layer stores recently received key-value pairs.
       2              : //!
       3              : //! The "in-memory" part of the name is a bit misleading: the actual page versions are
       4              : //! held in an ephemeral file, not in memory. The metadata for each page version, i.e.
       5              : //! its position in the file, is kept in memory, though.
       6              : //!
       7              : use crate::config::PageServerConf;
       8              : use crate::context::{PageContentKind, RequestContext, RequestContextBuilder};
       9              : use crate::repository::{Key, Value};
      10              : use crate::tenant::block_io::BlockReader;
      11              : use crate::tenant::ephemeral_file::EphemeralFile;
      12              : use crate::tenant::storage_layer::ValueReconstructResult;
      13              : use crate::tenant::timeline::GetVectoredError;
      14              : use crate::tenant::{PageReconstructError, Timeline};
      15              : use crate::walrecord;
      16              : use anyhow::{anyhow, ensure, Result};
      17              : use pageserver_api::keyspace::KeySpace;
      18              : use pageserver_api::models::InMemoryLayerInfo;
      19              : use pageserver_api::shard::TenantShardId;
      20              : use std::collections::{BinaryHeap, HashMap, HashSet};
      21              : use std::sync::{Arc, OnceLock};
      22              : use tracing::*;
      23              : use utils::{bin_ser::BeSer, id::TimelineId, lsn::Lsn, vec_map::VecMap};
      24              : // avoid binding to Write (conflicts with std::io::Write)
      25              : // while being able to use std::fmt::Write's methods
      26              : use std::fmt::Write as _;
      27              : use std::ops::Range;
      28              : use tokio::sync::{RwLock, RwLockWriteGuard};
      29              : 
      30              : use super::{
      31              :     DeltaLayerWriter, ResidentLayer, ValueReconstructSituation, ValueReconstructState,
      32              :     ValuesReconstructState,
      33              : };
      34              : 
      35              : pub struct InMemoryLayer {
      36              :     conf: &'static PageServerConf,
      37              :     tenant_shard_id: TenantShardId,
      38              :     timeline_id: TimelineId,
      39              : 
      40              :     /// This layer contains all the changes from `start_lsn` onward. The
      41              :     /// start is inclusive.
      42              :     start_lsn: Lsn,
      43              : 
      44              :     /// Frozen layers have an exclusive end LSN.
      45              :     /// Writes are only allowed while this is still unset.
      46              :     end_lsn: OnceLock<Lsn>,
      47              : 
      48              :     /// The above fields never change, except for `end_lsn`, which is only set once.
      49              :     /// All other changing parts are in `inner`, and protected by an async `RwLock`.
      50              :     inner: RwLock<InMemoryLayerInner>,
      51              : }
      52              : 
      53              : impl std::fmt::Debug for InMemoryLayer {
      54            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
      55            0 :         f.debug_struct("InMemoryLayer")
      56            0 :             .field("start_lsn", &self.start_lsn)
      57            0 :             .field("end_lsn", &self.end_lsn)
      58            0 :             .field("inner", &self.inner)
      59            0 :             .finish()
      60            0 :     }
      61              : }
      62              : 
      63              : pub struct InMemoryLayerInner {
      64              :     /// All versions of all pages in the layer are kept here.  Indexed
      65              :     /// by key and LSN. The value is an offset into the
      66              :     /// ephemeral file where the page version is stored.
      67              :     index: HashMap<Key, VecMap<Lsn, u64>>,
      68              : 
      69              :     /// The values are stored in a serialized format in this file.
      70              :     /// Each serialized Value is preceded by a 'u32' length field.
      71              :     /// The `index` map above stores offsets into this file.
      72              :     file: EphemeralFile,
      73              : }
      74              : 
      75              : impl std::fmt::Debug for InMemoryLayerInner {
      76            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
      77            0 :         f.debug_struct("InMemoryLayerInner").finish()
      78            0 :     }
      79              : }
      80              : 
      81              : impl InMemoryLayer {
      82            4 :     pub(crate) fn get_timeline_id(&self) -> TimelineId {
      83            4 :         self.timeline_id
      84            4 :     }
      85              : 
      86            0 :     pub(crate) fn info(&self) -> InMemoryLayerInfo {
      87            0 :         let lsn_start = self.start_lsn;
      88              : 
      89            0 :         if let Some(&lsn_end) = self.end_lsn.get() {
      90            0 :             InMemoryLayerInfo::Frozen { lsn_start, lsn_end }
      91              :         } else {
      92            0 :             InMemoryLayerInfo::Open { lsn_start }
      93              :         }
      94            0 :     }
      95              : 
      96      2913766 :     pub(crate) fn assert_writable(&self) {
      97      2913766 :         assert!(self.end_lsn.get().is_none());
      98      2913766 :     }
      99              : 
     100      3185404 :     pub(crate) fn end_lsn_or_max(&self) -> Lsn {
     101      3185404 :         self.end_lsn.get().copied().unwrap_or(Lsn::MAX)
     102      3185404 :     }
     103              : 
     104      3184880 :     pub(crate) fn get_lsn_range(&self) -> Range<Lsn> {
     105      3184880 :         self.start_lsn..self.end_lsn_or_max()
     106      3184880 :     }
     107              : 
     108              :     /// Debugging function that prints the contents of the layer to stdout.
     109              :     ///
     110              :     /// This is likely completely unused.
     111            0 :     pub async fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
     112            0 :         let inner = self.inner.read().await;
     113              : 
     114            0 :         let end_str = self.end_lsn_or_max();
     115            0 : 
     116            0 :         println!(
     117            0 :             "----- in-memory layer for tli {} LSNs {}-{} ----",
     118            0 :             self.timeline_id, self.start_lsn, end_str,
     119            0 :         );
     120            0 : 
     121            0 :         if !verbose {
     122            0 :             return Ok(());
     123            0 :         }
     124            0 : 
     125            0 :         let cursor = inner.file.block_cursor();
     126            0 :         let mut buf = Vec::new();
     127            0 :         for (key, vec_map) in inner.index.iter() {
     128            0 :             for (lsn, pos) in vec_map.as_slice() {
     129            0 :                 let mut desc = String::new();
     130            0 :                 cursor.read_blob_into_buf(*pos, &mut buf, ctx).await?;
     131            0 :                 let val = Value::des(&buf);
     132            0 :                 match val {
     133            0 :                     Ok(Value::Image(img)) => {
     134            0 :                         write!(&mut desc, " img {} bytes", img.len())?;
     135              :                     }
     136            0 :                     Ok(Value::WalRecord(rec)) => {
     137            0 :                         let wal_desc = walrecord::describe_wal_record(&rec).unwrap();
     138            0 :                         write!(
     139            0 :                             &mut desc,
     140            0 :                             " rec {} bytes will_init: {} {}",
     141            0 :                             buf.len(),
     142            0 :                             rec.will_init(),
     143            0 :                             wal_desc
     144            0 :                         )?;
     145              :                     }
     146            0 :                     Err(err) => {
     147            0 :                         write!(&mut desc, " DESERIALIZATION ERROR: {}", err)?;
     148              :                     }
     149              :                 }
     150            0 :                 println!("  key {} at {}: {}", key, lsn, desc);
     151              :             }
     152              :         }
     153              : 
     154            0 :         Ok(())
     155            0 :     }
     156              : 
     157              :     /// Look up the versions of `key` within `lsn_range` and add them to `reconstruct_state`.
     158       503212 :     pub(crate) async fn get_value_reconstruct_data(
     159       503212 :         &self,
     160       503212 :         key: Key,
     161       503212 :         lsn_range: Range<Lsn>,
     162       503212 :         reconstruct_state: &mut ValueReconstructState,
     163       503212 :         ctx: &RequestContext,
     164       503212 :     ) -> anyhow::Result<ValueReconstructResult> {
     165       503212 :         ensure!(lsn_range.start >= self.start_lsn);
     166       503212 :         let mut need_image = true;
     167       503212 : 
     168       503212 :         let ctx = RequestContextBuilder::extend(ctx)
     169       503212 :             .page_content_kind(PageContentKind::InMemoryLayer)
     170       503212 :             .build();
     171              : 
     172       503212 :         let inner = self.inner.read().await;
     173              : 
     174       503212 :         let reader = inner.file.block_cursor();
     175              : 
     176              :         // Scan the page versions backwards, starting from the end of `lsn_range`.
     177       503212 :         if let Some(vec_map) = inner.index.get(&key) {
     178       429823 :             let slice = vec_map.slice_range(lsn_range);
     179       429831 :             for (entry_lsn, pos) in slice.iter().rev() {
     180       429831 :                 let buf = reader.read_blob(*pos, &ctx).await?;
     181       429831 :                 let value = Value::des(&buf)?;
     182       429831 :                 match value {
     183       429819 :                     Value::Image(img) => {
     184       429819 :                         reconstruct_state.img = Some((*entry_lsn, img));
     185       429819 :                         return Ok(ValueReconstructResult::Complete);
     186              :                     }
     187           12 :                     Value::WalRecord(rec) => {
     188           12 :                         let will_init = rec.will_init();
     189           12 :                         reconstruct_state.records.push((*entry_lsn, rec));
     190           12 :                         if will_init {
     191              :                             // This WAL record initializes the page, so no need to go further back
     192            0 :                             need_image = false;
     193            0 :                             break;
     194           12 :                         }
     195              :                     }
     196              :                 }
     197              :             }
     198        73389 :         }
     199              : 
     200              :         // NOTE: the read guard on `inner` is not dropped here; it is held until return.
     201              : 
     202              :         // If an older page image is needed to reconstruct the page, let the
     203              :         // caller know by returning `Continue`.
     204        73393 :         if need_image {
     205        73393 :             Ok(ValueReconstructResult::Continue)
     206              :         } else {
     207            0 :             Ok(ValueReconstructResult::Complete)
     208              :         }
     209       503212 :     }
     210              : 
     211              :     /// Look up the keys in the provided keyspace and update
     212              :     /// the reconstruct state with whatever is found.
     213              :     ///
     214              :     /// If a key has a cached LSN, only versions newer than that LSN are read.
     215            0 :     pub(crate) async fn get_values_reconstruct_data(
     216            0 :         &self,
     217            0 :         keyspace: KeySpace,
     218            0 :         end_lsn: Lsn,
     219            0 :         reconstruct_state: &mut ValuesReconstructState,
     220            0 :         ctx: &RequestContext,
     221            0 :     ) -> Result<(), GetVectoredError> {
     222            0 :         let ctx = RequestContextBuilder::extend(ctx)
     223            0 :             .page_content_kind(PageContentKind::InMemoryLayer)
     224            0 :             .build();
     225              : 
     226            0 :         let inner = self.inner.read().await;
     227            0 :         let reader = inner.file.block_cursor();
     228            0 : 
     229            0 :         #[derive(Eq, PartialEq, Ord, PartialOrd)]
     230            0 :         struct BlockRead {
     231            0 :             key: Key,
     232            0 :             lsn: Lsn,
     233            0 :             block_offset: u64,
     234            0 :         }
     235            0 : 
     236            0 :         let mut planned_block_reads = BinaryHeap::new();
     237              : 
     238            0 :         for range in keyspace.ranges.iter() {
     239            0 :             let mut key = range.start;
     240            0 :             while key < range.end {
     241            0 :                 if let Some(vec_map) = inner.index.get(&key) {
     242            0 :                     let lsn_range = match reconstruct_state.get_cached_lsn(&key) {
     243            0 :                         Some(cached_lsn) => (cached_lsn + 1)..end_lsn,
     244            0 :                         None => self.start_lsn..end_lsn,
     245              :                     };
     246              : 
     247            0 :                     let slice = vec_map.slice_range(lsn_range);
     248            0 :                     for (entry_lsn, pos) in slice.iter().rev() {
     249            0 :                         planned_block_reads.push(BlockRead {
     250            0 :                             key,
     251            0 :                             lsn: *entry_lsn,
     252            0 :                             block_offset: *pos,
     253            0 :                         });
     254            0 :                     }
     255            0 :                 }
     256              : 
     257            0 :                 key = key.next();
     258              :             }
     259              :         }
     260              : 
     261            0 :         let keyspace_size = keyspace.total_size();
     262            0 : 
     263            0 :         let mut completed_keys = HashSet::new();
     264            0 :         while completed_keys.len() < keyspace_size && !planned_block_reads.is_empty() {
     265            0 :             let block_read = planned_block_reads.pop().unwrap();
     266            0 :             if completed_keys.contains(&block_read.key) {
     267            0 :                 continue;
     268            0 :             }
     269              : 
     270            0 :             let buf = reader.read_blob(block_read.block_offset, &ctx).await;
     271            0 :             if let Err(e) = buf {
     272            0 :                 reconstruct_state
     273            0 :                     .on_key_error(block_read.key, PageReconstructError::from(anyhow!(e)));
     274            0 :                 completed_keys.insert(block_read.key);
     275            0 :                 continue;
     276            0 :             }
     277            0 : 
     278            0 :             let value = Value::des(&buf.unwrap());
     279            0 :             if let Err(e) = value {
     280            0 :                 reconstruct_state
     281            0 :                     .on_key_error(block_read.key, PageReconstructError::from(anyhow!(e)));
     282            0 :                 completed_keys.insert(block_read.key);
     283            0 :                 continue;
     284            0 :             }
     285            0 : 
     286            0 :             let key_situation =
     287            0 :                 reconstruct_state.update_key(&block_read.key, block_read.lsn, value.unwrap());
     288            0 :             if key_situation == ValueReconstructSituation::Complete {
     289            0 :                 completed_keys.insert(block_read.key);
     290            0 :             }
     291              :         }
     292              : 
     293            0 :         Ok(())
     294            0 :     }
     295              : }
     296              : 
     297              : impl std::fmt::Display for InMemoryLayer {
     298          524 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     299          524 :         let end_lsn = self.end_lsn_or_max();
     300          524 :         write!(f, "inmem-{:016X}-{:016X}", self.start_lsn.0, end_lsn.0)
     301          524 :     }
     302              : }
     303              : 
     304              : impl InMemoryLayer {
     305              :     /// Get the layer size, i.e. the current length of the backing ephemeral file.
     306      2627992 :     pub async fn size(&self) -> Result<u64> {
     307      2627992 :         let inner = self.inner.read().await;
     308      2627992 :         Ok(inner.file.len())
     309      2627992 :     }
     310              : 
     311              :     /// Create a new, empty, still-writable in-memory layer starting at `start_lsn`, backed by a newly created ephemeral file.
     312          640 :     pub async fn create(
     313          640 :         conf: &'static PageServerConf,
     314          640 :         timeline_id: TimelineId,
     315          640 :         tenant_shard_id: TenantShardId,
     316          640 :         start_lsn: Lsn,
     317          640 :     ) -> Result<InMemoryLayer> {
     318            0 :         trace!("initializing new empty InMemoryLayer for writing on timeline {timeline_id} at {start_lsn}");
     319              : 
     320          640 :         let file = EphemeralFile::create(conf, tenant_shard_id, timeline_id).await?;
     321              : 
     322          640 :         Ok(InMemoryLayer {
     323          640 :             conf,
     324          640 :             timeline_id,
     325          640 :             tenant_shard_id,
     326          640 :             start_lsn,
     327          640 :             end_lsn: OnceLock::new(),
     328          640 :             inner: RwLock::new(InMemoryLayerInner {
     329          640 :                 index: HashMap::new(),
     330          640 :                 file,
     331          640 :             }),
     332          640 :         })
     333          640 :     }
     334              : 
     335              :     // Write operations
     336              : 
     337              :     /// Takes the write lock on `inner`, asserts that the layer is still writable, and
     338              :     /// stores `buf` as the version of `key` at `lsn` via `put_value_locked`.
     339              : 
     340      2913766 :     pub(crate) async fn put_value(
     341      2913766 :         &self,
     342      2913766 :         key: Key,
     343      2913766 :         lsn: Lsn,
     344      2913766 :         buf: &[u8],
     345      2913766 :         ctx: &RequestContext,
     346      2913766 :     ) -> Result<()> {
     347      2913766 :         let mut inner = self.inner.write().await;
     348      2913766 :         self.assert_writable();
     349      2913766 :         self.put_value_locked(&mut inner, key, lsn, buf, ctx).await
     350      2913766 :     }
     351              : 
     352      2913766 :     async fn put_value_locked(
     353      2913766 :         &self,
     354      2913766 :         locked_inner: &mut RwLockWriteGuard<'_, InMemoryLayerInner>,
     355      2913766 :         key: Key,
     356      2913766 :         lsn: Lsn,
     357      2913766 :         buf: &[u8],
     358      2913766 :         ctx: &RequestContext,
     359      2913766 :     ) -> Result<()> {
     360            0 :         trace!("put_value key {} at {}/{}", key, self.timeline_id, lsn);
     361              : 
     362      2913766 :         let off = {
     363      2913766 :             locked_inner
     364      2913766 :                 .file
     365      2913766 :                 .write_blob(
     366      2913766 :                     buf,
     367      2913766 :                     &RequestContextBuilder::extend(ctx)
     368      2913766 :                         .page_content_kind(PageContentKind::InMemoryLayer)
     369      2913766 :                         .build(),
     370      2913766 :                 )
     371          729 :                 .await?
     372              :         };
     373              : 
     374      2913766 :         let vec_map = locked_inner.index.entry(key).or_default();
     375      2913766 :         let old = vec_map.append_or_update_last(lsn, off).unwrap().0;
     376      2913766 :         if old.is_some() {
     377              :             // We already had an entry for this key at this LSN. That's odd, so warn about it.
     378            0 :             warn!("Key {} at {} already exists", key, lsn);
     379      2913766 :         }
     380              : 
     381      2913766 :         Ok(())
     382      2913766 :     }
     383              : 
     384            2 :     pub(crate) async fn put_tombstones(&self, _key_ranges: &[(Range<Key>, Lsn)]) -> Result<()> {
     385            2 :         // TODO: tombstones are currently ignored, so the storage for any deleted keys is just leaked
     386            2 :         Ok(())
     387            2 :     }
     388              : 
     389              :     /// Freezes the layer by recording its `end_lsn`; panics if `end_lsn` was already set.
     390              :     /// `end_lsn` is exclusive: it must be greater than `start_lsn` and every LSN stored in the layer.
     391          520 :     pub async fn freeze(&self, end_lsn: Lsn) {
     392          520 :         let inner = self.inner.write().await;
     393              : 
     394          520 :         assert!(
     395          520 :             self.start_lsn < end_lsn,
     396            0 :             "{} >= {}",
     397              :             self.start_lsn,
     398              :             end_lsn
     399              :         );
     400          520 :         self.end_lsn.set(end_lsn).expect("end_lsn set only once");
     401              : 
     402      2135119 :         for vec_map in inner.index.values() {
     403      2210026 :             for (lsn, _pos) in vec_map.as_slice() {
     404      2210026 :                 assert!(*lsn < end_lsn);
     405              :             }
     406              :         }
     407          520 :     }
     408              : 
     409              :     /// Write this frozen in-memory layer to disk. Panics if `end_lsn` has not been set by `freeze`.
     410              :     ///
     411              :     /// Returns a new delta layer with all the same data as this in-memory layer.
     412          450 :     pub(crate) async fn write_to_disk(
     413          450 :         &self,
     414          450 :         timeline: &Arc<Timeline>,
     415          450 :         ctx: &RequestContext,
     416          450 :     ) -> Result<ResidentLayer> {
     417              :         // Grab the lock in read-mode. We hold it over the I/O, but because this
     418              :         // layer is not writeable anymore, no one should be trying to acquire the
     419              :         // write lock on it, so we shouldn't block anyone. There's one exception
     420              :         // though: another thread might have grabbed a reference to this layer
     421              :         // in `get_layer_for_write` just before the checkpointer called
     422              :         // `freeze`, and then `write_to_disk` on it. When the thread gets the
     423              :         // lock, it will see that it's not writeable anymore and retry, but it
     424              :         // would have to wait until we release it. That race condition is very
     425              :         // rare though, so we just accept the potential latency hit for now.
     426          450 :         let inner = self.inner.read().await;
     427              : 
     428          450 :         let end_lsn = *self.end_lsn.get().unwrap();
     429              : 
     430          450 :         let mut delta_layer_writer = DeltaLayerWriter::new(
     431          450 :             self.conf,
     432          450 :             self.timeline_id,
     433          450 :             self.tenant_shard_id,
     434          450 :             Key::MIN,
     435          450 :             self.start_lsn..end_lsn,
     436          450 :         )
     437          267 :         .await?;
     438              : 
     439          450 :         let mut buf = Vec::new();
     440          450 : 
     441          450 :         let cursor = inner.file.block_cursor();
     442          450 : 
     443          450 :         // Sort the keys because delta layer writer expects them sorted.
     444          450 :         //
     445          450 :         // NOTE: this sort can take up significant time if the layer has millions of
     446          450 :         //       keys. To speed up all the comparisons we convert the key to i128 and
     447          450 :         //       keep the value as a reference.
     448      2134559 :         let mut keys: Vec<_> = inner.index.iter().map(|(k, m)| (k.to_i128(), m)).collect();
     449     61367088 :         keys.sort_unstable_by_key(|k| k.0);
     450          450 : 
     451          450 :         let ctx = RequestContextBuilder::extend(ctx)
     452          450 :             .page_content_kind(PageContentKind::InMemoryLayer)
     453          450 :             .build();
     454      2134559 :         for (key, vec_map) in keys.iter() {
     455      2134559 :             let key = Key::from_i128(*key);
     456              :             // Write all page versions of this key, in the order they are stored in the index
     457      2209466 :             for (lsn, pos) in vec_map.as_slice() {
     458      2209466 :                 cursor.read_blob_into_buf(*pos, &mut buf, &ctx).await?;
     459      2209466 :                 let will_init = Value::des(&buf)?.will_init();
     460              :                 let res;
     461      2209466 :                 (buf, res) = delta_layer_writer
     462      2209466 :                     .put_value_bytes(key, *lsn, buf, will_init)
     463          162 :                     .await;
     464      2209466 :                 res?;
     465              :             }
     466              :         }
     467              : 
     468              :         // MAX is used here because we identify L0 layers by full key range
     469          450 :         let delta_layer = delta_layer_writer.finish(Key::MAX, timeline).await?;
     470          450 :         Ok(delta_layer)
     471          450 :     }
     472              : }
        

Generated by: LCOV version 2.1-beta