LCOV - code coverage report
Current view: top level - pageserver/src/tenant/storage_layer - inmemory_layer.rs (source / functions) Coverage Total Hit
Test: 322b88762cba8ea666f63cda880cccab6936bf37.info Lines: 56.7 % 305 173
Test Date: 2024-02-29 11:57:12 Functions: 60.5 % 38 23

            Line data    Source code
       1              : //! An in-memory layer stores recently received key-value pairs.
       2              : //!
       3              : //! The "in-memory" part of the name is a bit misleading: the actual page versions are
       4              : //! held in an ephemeral file, not in memory. The metadata for each page version, i.e.
       5              : //! its position in the file, is kept in memory, though.
       6              : //!
       7              : use crate::config::PageServerConf;
       8              : use crate::context::{PageContentKind, RequestContext, RequestContextBuilder};
       9              : use crate::repository::{Key, Value};
      10              : use crate::tenant::block_io::BlockReader;
      11              : use crate::tenant::ephemeral_file::EphemeralFile;
      12              : use crate::tenant::storage_layer::ValueReconstructResult;
      13              : use crate::tenant::timeline::GetVectoredError;
      14              : use crate::tenant::{PageReconstructError, Timeline};
      15              : use crate::walrecord;
      16              : use anyhow::{anyhow, ensure, Result};
      17              : use pageserver_api::keyspace::KeySpace;
      18              : use pageserver_api::models::InMemoryLayerInfo;
      19              : use pageserver_api::shard::TenantShardId;
      20              : use std::collections::{BinaryHeap, HashMap, HashSet};
      21              : use std::sync::{Arc, OnceLock};
      22              : use tracing::*;
      23              : use utils::{bin_ser::BeSer, id::TimelineId, lsn::Lsn, vec_map::VecMap};
      24              : // avoid binding to Write (conflicts with std::io::Write)
      25              : // while being able to use std::fmt::Write's methods
      26              : use std::fmt::Write as _;
      27              : use std::ops::Range;
      28              : use tokio::sync::{RwLock, RwLockWriteGuard};
      29              : 
      30              : use super::{
      31              :     DeltaLayerWriter, ResidentLayer, ValueReconstructSituation, ValueReconstructState,
      32              :     ValuesReconstructState,
      33              : };
      34              : 
      35              : pub struct InMemoryLayer {
      36              :     conf: &'static PageServerConf,
      37              :     tenant_shard_id: TenantShardId,
      38              :     timeline_id: TimelineId,
      39              : 
      40              :     /// This layer contains all the changes from 'start_lsn'. The
      41              :     /// start is inclusive.
      42              :     start_lsn: Lsn,
      43              : 
      44              :     /// Frozen layers have an exclusive end LSN.
      45              :     /// Writes are only allowed when this is `None`.
      46              :     end_lsn: OnceLock<Lsn>,
      47              : 
      48              :     /// The above fields never change, except for `end_lsn`, which is only set once.
      49              :     /// All other changing parts are in `inner`, and protected by a read-write lock.
      50              :     inner: RwLock<InMemoryLayerInner>,
      51              : }
      52              : 
      53              : impl std::fmt::Debug for InMemoryLayer {
      54            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
      55            0 :         f.debug_struct("InMemoryLayer")
      56            0 :             .field("start_lsn", &self.start_lsn)
      57            0 :             .field("end_lsn", &self.end_lsn)
      58            0 :             .field("inner", &self.inner)
      59            0 :             .finish()
      60            0 :     }
      61              : }
      62              : 
      63              : pub struct InMemoryLayerInner {
      64              :     /// All versions of all pages in the layer are kept here.  Indexed
      65              :     /// by key and LSN. The value is an offset into the
      66              :     /// ephemeral file where the page version is stored.
      67              :     index: HashMap<Key, VecMap<Lsn, u64>>,
      68              : 
      69              :     /// The values are stored in a serialized format in this file.
      70              :     /// Each serialized Value is preceded by a 'u32' length field.
      71              :     /// The `index` map stores offsets into this file.
      72              :     file: EphemeralFile,
      73              : }
      74              : 
      75              : impl std::fmt::Debug for InMemoryLayerInner {
      76            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
      77            0 :         f.debug_struct("InMemoryLayerInner").finish()
      78            0 :     }
      79              : }
      80              : 
      81              : impl InMemoryLayer {
      82            4 :     pub(crate) fn get_timeline_id(&self) -> TimelineId {
      83            4 :         self.timeline_id
      84            4 :     }
      85              : 
      86            0 :     pub(crate) fn info(&self) -> InMemoryLayerInfo {
      87            0 :         let lsn_start = self.start_lsn;
      88              : 
      89            0 :         if let Some(&lsn_end) = self.end_lsn.get() {
      90            0 :             InMemoryLayerInfo::Frozen { lsn_start, lsn_end }
      91              :         } else {
      92            0 :             InMemoryLayerInfo::Open { lsn_start }
      93              :         }
      94            0 :     }
      95              : 
      96      2628042 :     pub(crate) fn assert_writable(&self) {
      97      2628042 :         assert!(self.end_lsn.get().is_none());
      98      2628042 :     }
      99              : 
     100      3185800 :     pub(crate) fn end_lsn_or_max(&self) -> Lsn {
     101      3185800 :         self.end_lsn.get().copied().unwrap_or(Lsn::MAX)
     102      3185800 :     }
     103              : 
     104      3185272 :     pub(crate) fn get_lsn_range(&self) -> Range<Lsn> {
     105      3185272 :         self.start_lsn..self.end_lsn_or_max()
     106      3185272 :     }
     107              : 
     108              :     /// debugging function to print out the contents of the layer
     109              :     ///
     110              :     /// this is likely completely unused
     111            0 :     pub async fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
     112            0 :         let inner = self.inner.read().await;
     113              : 
     114            0 :         let end_str = self.end_lsn_or_max();
     115            0 : 
     116            0 :         println!(
     117            0 :             "----- in-memory layer for tli {} LSNs {}-{} ----",
     118            0 :             self.timeline_id, self.start_lsn, end_str,
     119            0 :         );
     120            0 : 
     121            0 :         if !verbose {
     122            0 :             return Ok(());
     123            0 :         }
     124            0 : 
     125            0 :         let cursor = inner.file.block_cursor();
     126            0 :         let mut buf = Vec::new();
     127            0 :         for (key, vec_map) in inner.index.iter() {
     128            0 :             for (lsn, pos) in vec_map.as_slice() {
     129            0 :                 let mut desc = String::new();
     130            0 :                 cursor.read_blob_into_buf(*pos, &mut buf, ctx).await?;
     131            0 :                 let val = Value::des(&buf);
     132            0 :                 match val {
     133            0 :                     Ok(Value::Image(img)) => {
     134            0 :                         write!(&mut desc, " img {} bytes", img.len())?;
     135              :                     }
     136            0 :                     Ok(Value::WalRecord(rec)) => {
     137            0 :                         let wal_desc = walrecord::describe_wal_record(&rec).unwrap();
     138            0 :                         write!(
     139            0 :                             &mut desc,
     140            0 :                             " rec {} bytes will_init: {} {}",
     141            0 :                             buf.len(),
     142            0 :                             rec.will_init(),
     143            0 :                             wal_desc
     144            0 :                         )?;
     145              :                     }
     146            0 :                     Err(err) => {
     147            0 :                         write!(&mut desc, " DESERIALIZATION ERROR: {}", err)?;
     148              :                     }
     149              :                 }
     150            0 :                 println!("  key {} at {}: {}", key, lsn, desc);
     151              :             }
     152              :         }
     153              : 
     154            0 :         Ok(())
     155            0 :     }
     156              : 
     157              :     /// Look up given value in the layer.
     158       503245 :     pub(crate) async fn get_value_reconstruct_data(
     159       503245 :         &self,
     160       503245 :         key: Key,
     161       503245 :         lsn_range: Range<Lsn>,
     162       503245 :         reconstruct_state: &mut ValueReconstructState,
     163       503245 :         ctx: &RequestContext,
     164       503245 :     ) -> anyhow::Result<ValueReconstructResult> {
     165       503245 :         ensure!(lsn_range.start >= self.start_lsn);
     166       503245 :         let mut need_image = true;
     167       503245 : 
     168       503245 :         let ctx = RequestContextBuilder::extend(ctx)
     169       503245 :             .page_content_kind(PageContentKind::InMemoryLayer)
     170       503245 :             .build();
     171              : 
     172       503245 :         let inner = self.inner.read().await;
     173              : 
     174       503245 :         let reader = inner.file.block_cursor();
     175              : 
     176              :         // Scan the page versions within `lsn_range` backwards, newest first.
     177       503245 :         if let Some(vec_map) = inner.index.get(&key) {
     178       429841 :             let slice = vec_map.slice_range(lsn_range);
     179       429847 :             for (entry_lsn, pos) in slice.iter().rev() {
     180       429847 :                 let buf = reader.read_blob(*pos, &ctx).await?;
     181       429847 :                 let value = Value::des(&buf)?;
     182       429847 :                 match value {
     183       429837 :                     Value::Image(img) => {
     184       429837 :                         reconstruct_state.img = Some((*entry_lsn, img));
     185       429837 :                         return Ok(ValueReconstructResult::Complete);
     186              :                     }
     187           10 :                     Value::WalRecord(rec) => {
     188           10 :                         let will_init = rec.will_init();
     189           10 :                         reconstruct_state.records.push((*entry_lsn, rec));
     190           10 :                         if will_init {
     191              :                             // This WAL record initializes the page, so no need to go further back
     192            0 :                             need_image = false;
     193            0 :                             break;
     194           10 :                         }
     195              :                     }
     196              :                 }
     197              :             }
     198        73404 :         }
     199              : 
     200              :         // The read lock on `inner` stays held until the guard is dropped on return.
     201              : 
     202              :         // If an older page image is needed to reconstruct the page, let the
     203              :         // caller know.
     204        73408 :         if need_image {
     205        73408 :             Ok(ValueReconstructResult::Continue)
     206              :         } else {
     207            0 :             Ok(ValueReconstructResult::Complete)
     208              :         }
     209       503245 :     }
     210              : 
     211              :     // Look up the keys in the provided keyspace and update
     212              :     // the reconstruct state with whatever is found.
     213              :     //
     214              :     // If the key is cached, go no further than the cached Lsn.
     215            0 :     pub(crate) async fn get_values_reconstruct_data(
     216            0 :         &self,
     217            0 :         keyspace: KeySpace,
     218            0 :         end_lsn: Lsn,
     219            0 :         reconstruct_state: &mut ValuesReconstructState,
     220            0 :         ctx: &RequestContext,
     221            0 :     ) -> Result<(), GetVectoredError> {
     222            0 :         let ctx = RequestContextBuilder::extend(ctx)
     223            0 :             .page_content_kind(PageContentKind::InMemoryLayer)
     224            0 :             .build();
     225              : 
     226            0 :         let inner = self.inner.read().await;
     227            0 :         let reader = inner.file.block_cursor();
     228            0 : 
     229            0 :         #[derive(Eq, PartialEq, Ord, PartialOrd)]
     230            0 :         struct BlockRead {
     231            0 :             key: Key,
     232            0 :             lsn: Lsn,
     233            0 :             block_offset: u64,
     234            0 :         }
     235            0 : 
     236            0 :         let mut planned_block_reads = BinaryHeap::new();
     237              : 
     238            0 :         for range in keyspace.ranges.iter() {
     239            0 :             let mut key = range.start;
     240            0 :             while key < range.end {
     241            0 :                 if let Some(vec_map) = inner.index.get(&key) {
     242            0 :                     let lsn_range = match reconstruct_state.get_cached_lsn(&key) {
     243            0 :                         Some(cached_lsn) => (cached_lsn + 1)..end_lsn,
     244            0 :                         None => self.start_lsn..end_lsn,
     245              :                     };
     246              : 
     247            0 :                     let slice = vec_map.slice_range(lsn_range);
     248            0 :                     for (entry_lsn, pos) in slice.iter().rev() {
     249            0 :                         planned_block_reads.push(BlockRead {
     250            0 :                             key,
     251            0 :                             lsn: *entry_lsn,
     252            0 :                             block_offset: *pos,
     253            0 :                         });
     254            0 :                     }
     255            0 :                 }
     256              : 
     257            0 :                 key = key.next();
     258              :             }
     259              :         }
     260              : 
     261            0 :         let keyspace_size = keyspace.total_size();
     262            0 : 
     263            0 :         let mut completed_keys = HashSet::new();
     264            0 :         while completed_keys.len() < keyspace_size && !planned_block_reads.is_empty() {
     265            0 :             let block_read = planned_block_reads.pop().unwrap();
     266            0 :             if completed_keys.contains(&block_read.key) {
     267            0 :                 continue;
     268            0 :             }
     269              : 
     270            0 :             let buf = reader.read_blob(block_read.block_offset, &ctx).await;
     271            0 :             if let Err(e) = buf {
     272            0 :                 reconstruct_state
     273            0 :                     .on_key_error(block_read.key, PageReconstructError::from(anyhow!(e)));
     274            0 :                 completed_keys.insert(block_read.key);
     275            0 :                 continue;
     276            0 :             }
     277            0 : 
     278            0 :             let value = Value::des(&buf.unwrap());
     279            0 :             if let Err(e) = value {
     280            0 :                 reconstruct_state
     281            0 :                     .on_key_error(block_read.key, PageReconstructError::from(anyhow!(e)));
     282            0 :                 completed_keys.insert(block_read.key);
     283            0 :                 continue;
     284            0 :             }
     285            0 : 
     286            0 :             let key_situation =
     287            0 :                 reconstruct_state.update_key(&block_read.key, block_read.lsn, value.unwrap());
     288            0 :             if key_situation == ValueReconstructSituation::Complete {
     289            0 :                 completed_keys.insert(block_read.key);
     290            0 :             }
     291              :         }
     292              : 
     293            0 :         Ok(())
     294            0 :     }
     295              : }
     296              : 
     297              : impl std::fmt::Display for InMemoryLayer {
     298          528 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     299          528 :         let end_lsn = self.end_lsn_or_max();
     300          528 :         write!(f, "inmem-{:016X}-{:016X}", self.start_lsn.0, end_lsn.0)
     301          528 :     }
     302              : }
     303              : 
     304              : impl InMemoryLayer {
     305              :     /// Get layer size.
     306            0 :     pub async fn size(&self) -> Result<u64> {
     307            0 :         let inner = self.inner.read().await;
     308            0 :         Ok(inner.file.len())
     309            0 :     }
     310              : 
     311              :     /// Create a new, empty, in-memory layer
     312          644 :     pub async fn create(
     313          644 :         conf: &'static PageServerConf,
     314          644 :         timeline_id: TimelineId,
     315          644 :         tenant_shard_id: TenantShardId,
     316          644 :         start_lsn: Lsn,
     317          644 :     ) -> Result<InMemoryLayer> {
     318            0 :         trace!("initializing new empty InMemoryLayer for writing on timeline {timeline_id} at {start_lsn}");
     319              : 
     320          644 :         let file = EphemeralFile::create(conf, tenant_shard_id, timeline_id).await?;
     321              : 
     322          644 :         Ok(InMemoryLayer {
     323          644 :             conf,
     324          644 :             timeline_id,
     325          644 :             tenant_shard_id,
     326          644 :             start_lsn,
     327          644 :             end_lsn: OnceLock::new(),
     328          644 :             inner: RwLock::new(InMemoryLayerInner {
     329          644 :                 index: HashMap::new(),
     330          644 :                 file,
     331          644 :             }),
     332          644 :         })
     333          644 :     }
     334              : 
     335              :     // Write operations
     336              : 
     337              :     /// Common subroutine of the public put_wal_record() and put_page_image() functions.
     338              :     /// Adds the page version to the in-memory tree
     339      2214102 :     pub(crate) async fn put_value(
     340      2214102 :         &self,
     341      2214102 :         key: Key,
     342      2214102 :         lsn: Lsn,
     343      2214102 :         val: &Value,
     344      2214102 :         ctx: &RequestContext,
     345      2214102 :     ) -> Result<()> {
     346      2214102 :         let mut inner = self.inner.write().await;
     347      2214102 :         self.assert_writable();
     348      2214102 :         self.put_value_locked(&mut inner, key, lsn, val, ctx).await
     349      2214102 :     }
     350              : 
     351       413940 :     pub(crate) async fn put_values(
     352       413940 :         &self,
     353       413940 :         values: &HashMap<Key, Vec<(Lsn, Value)>>,
     354       413940 :         ctx: &RequestContext,
     355       413940 :     ) -> Result<()> {
     356       413940 :         let mut inner = self.inner.write().await;
     357       413940 :         self.assert_writable();
     358      1113632 :         for (key, vals) in values {
     359      1399388 :             for (lsn, val) in vals {
     360       699696 :                 self.put_value_locked(&mut inner, *key, *lsn, val, ctx)
     361          328 :                     .await?;
     362              :             }
     363              :         }
     364       413940 :         Ok(())
     365       413940 :     }
     366              : 
     367      2913798 :     async fn put_value_locked(
     368      2913798 :         &self,
     369      2913798 :         locked_inner: &mut RwLockWriteGuard<'_, InMemoryLayerInner>,
     370      2913798 :         key: Key,
     371      2913798 :         lsn: Lsn,
     372      2913798 :         val: &Value,
     373      2913798 :         ctx: &RequestContext,
     374      2913798 :     ) -> Result<()> {
     375            0 :         trace!("put_value key {} at {}/{}", key, self.timeline_id, lsn);
     376              : 
     377      2913798 :         let off = {
     378              :             // Avoid doing allocations for "small" values.
     379              :             // In the regression test suite, the limit of 256 avoided allocations in 95% of cases:
     380              :             // https://github.com/neondatabase/neon/pull/5056#discussion_r1301975061
     381      2913798 :             let mut buf = smallvec::SmallVec::<[u8; 256]>::new();
     382      2913798 :             buf.clear();
     383      2913798 :             val.ser_into(&mut buf)?;
     384      2913798 :             locked_inner
     385      2913798 :                 .file
     386      2913798 :                 .write_blob(
     387      2913798 :                     &buf,
     388      2913798 :                     &RequestContextBuilder::extend(ctx)
     389      2913798 :                         .page_content_kind(PageContentKind::InMemoryLayer)
     390      2913798 :                         .build(),
     391      2913798 :                 )
     392          936 :                 .await?
     393              :         };
     394              : 
     395      2913798 :         let vec_map = locked_inner.index.entry(key).or_default();
     396      2913798 :         let old = vec_map.append_or_update_last(lsn, off).unwrap().0;
     397      2913798 :         if old.is_some() {
     398              :             // We already had an entry for this LSN. That's odd..
     399            0 :             warn!("Key {} at {} already exists", key, lsn);
     400      2913798 :         }
     401              : 
     402      2913798 :         Ok(())
     403      2913798 :     }
     404              : 
     405            2 :     pub(crate) async fn put_tombstones(&self, _key_ranges: &[(Range<Key>, Lsn)]) -> Result<()> {
     406            2 :         // TODO: Currently, we just leak the storage for any deleted keys
     407            2 :         Ok(())
     408            2 :     }
     409              : 
     410              :     /// Records the end_lsn for non-dropped layers.
     411              :     /// `end_lsn` is exclusive
     412          524 :     pub async fn freeze(&self, end_lsn: Lsn) {
     413          524 :         let inner = self.inner.write().await;
     414              : 
     415          524 :         assert!(self.start_lsn < end_lsn);
     416          524 :         self.end_lsn.set(end_lsn).expect("end_lsn set only once");
     417              : 
     418      2135136 :         for vec_map in inner.index.values() {
     419      2210058 :             for (lsn, _pos) in vec_map.as_slice() {
     420      2210058 :                 assert!(*lsn < end_lsn);
     421              :             }
     422              :         }
     423          524 :     }
     424              : 
     425              :     /// Write this frozen in-memory layer to disk.
     426              :     ///
     427              :     /// Returns a new delta layer with all the same data as this in-memory layer
     428          450 :     pub(crate) async fn write_to_disk(
     429          450 :         &self,
     430          450 :         timeline: &Arc<Timeline>,
     431          450 :         ctx: &RequestContext,
     432          450 :     ) -> Result<ResidentLayer> {
     433              :         // Grab the lock in read-mode. We hold it over the I/O, but because this
     434              :         // layer is not writeable anymore, no one should be trying to acquire the
     435              :         // write lock on it, so we shouldn't block anyone. There's one exception
     436              :         // though: another thread might have grabbed a reference to this layer
     437              :         // in `get_layer_for_write` just before the checkpointer called
     438              :         // `freeze`, and then `write_to_disk` on it. When the thread gets the
     439              :         // lock, it will see that it's not writeable anymore and retry, but it
     440              :         // would have to wait until we release it. That race condition is very
     441              :         // rare though, so we just accept the potential latency hit for now.
     442          450 :         let inner = self.inner.read().await;
     443              : 
     444          450 :         let end_lsn = *self.end_lsn.get().unwrap();
     445              : 
     446          450 :         let mut delta_layer_writer = DeltaLayerWriter::new(
     447          450 :             self.conf,
     448          450 :             self.timeline_id,
     449          450 :             self.tenant_shard_id,
     450          450 :             Key::MIN,
     451          450 :             self.start_lsn..end_lsn,
     452          450 :         )
     453          267 :         .await?;
     454              : 
     455          450 :         let mut buf = Vec::new();
     456          450 : 
     457          450 :         let cursor = inner.file.block_cursor();
     458          450 : 
     459          450 :         // Sort the keys because delta layer writer expects them sorted.
     460          450 :         //
     461          450 :         // NOTE: this sort can take up significant time if the layer has millions of
     462          450 :         //       keys. To speed up all the comparisons we convert the key to i128 and
     463          450 :         //       keep the value as a reference.
     464      2134544 :         let mut keys: Vec<_> = inner.index.iter().map(|(k, m)| (k.to_i128(), m)).collect();
     465     61509772 :         keys.sort_unstable_by_key(|k| k.0);
     466          450 : 
     467          450 :         let ctx = RequestContextBuilder::extend(ctx)
     468          450 :             .page_content_kind(PageContentKind::InMemoryLayer)
     469          450 :             .build();
     470      2134544 :         for (key, vec_map) in keys.iter() {
     471      2134544 :             let key = Key::from_i128(*key);
     472              :             // Write all page versions
     473      2209466 :             for (lsn, pos) in vec_map.as_slice() {
     474      2209466 :                 cursor.read_blob_into_buf(*pos, &mut buf, &ctx).await?;
     475      2209466 :                 let will_init = Value::des(&buf)?.will_init();
     476              :                 let res;
     477      2209466 :                 (buf, res) = delta_layer_writer
     478      2209466 :                     .put_value_bytes(key, *lsn, buf, will_init)
     479          174 :                     .await;
     480      2209466 :                 res?;
     481              :             }
     482              :         }
     483              : 
     484              :         // MAX is used here because we identify L0 layers by full key range
     485          450 :         let delta_layer = delta_layer_writer.finish(Key::MAX, timeline).await?;
     486          450 :         Ok(delta_layer)
     487          450 :     }
     488              : }
        

Generated by: LCOV version 2.1-beta