       1              : //! An in-memory layer stores recently received key-value pairs.
       2              : //!
       3              : //! The "in-memory" part of the name is a bit misleading: the actual page versions are
       4              : //! held in an ephemeral file, not in memory. The metadata for each page version, i.e.
       5              : //! its position in the file, is kept in memory, though.
       6              : //!
       7              : use crate::config::PageServerConf;
       8              : use crate::context::{PageContentKind, RequestContext, RequestContextBuilder};
       9              : use crate::repository::{Key, Value};
      10              : use crate::tenant::block_io::BlockReader;
      11              : use crate::tenant::ephemeral_file::EphemeralFile;
      12              : use crate::tenant::storage_layer::{ValueReconstructResult, ValueReconstructState};
      13              : use crate::tenant::Timeline;
      14              : use crate::walrecord;
      15              : use anyhow::{ensure, Result};
      16              : use pageserver_api::models::InMemoryLayerInfo;
      17              : use pageserver_api::shard::TenantShardId;
      18              : use std::collections::HashMap;
      19              : use std::sync::{Arc, OnceLock};
      20              : use tracing::*;
      21              : use utils::{bin_ser::BeSer, id::TimelineId, lsn::Lsn, vec_map::VecMap};
      22              : // avoid binding to Write (conflicts with std::io::Write)
      23              : // while being able to use std::fmt::Write's methods
      24              : use std::fmt::Write as _;
      25              : use std::ops::Range;
      26              : use tokio::sync::{RwLock, RwLockWriteGuard};
      27              : 
      28              : use super::{DeltaLayerWriter, ResidentLayer};
      29              : 
/// A layer that accumulates recently received key-value pairs before they are
/// written out as a delta layer (see `write_to_disk`).
///
/// The identifying fields are immutable after construction; `end_lsn` is set
/// at most once (by `freeze`), and everything else that changes lives in `inner`.
pub struct InMemoryLayer {
    /// Page server configuration, passed through to the ephemeral file and
    /// to the delta layer writer.
    conf: &'static PageServerConf,
    /// Tenant shard that owns this layer.
    tenant_shard_id: TenantShardId,
    /// Timeline this layer belongs to.
    timeline_id: TimelineId,

    /// This layer contains all the changes from 'start_lsn'. The
    /// start is inclusive.
    start_lsn: Lsn,

    /// Frozen layers have an exclusive end LSN.
    /// Writes are only allowed while this is still unset.
    end_lsn: OnceLock<Lsn>,

    /// The above fields never change, except for `end_lsn`, which is only set once.
    /// All other changing parts are in `inner`, and protected by a read-write lock.
    inner: RwLock<InMemoryLayerInner>,
}
      47              : 
      48              : impl std::fmt::Debug for InMemoryLayer {
      49            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
      50            0 :         f.debug_struct("InMemoryLayer")
      51            0 :             .field("start_lsn", &self.start_lsn)
      52            0 :             .field("end_lsn", &self.end_lsn)
      53            0 :             .field("inner", &self.inner)
      54            0 :             .finish()
      55            0 :     }
      56              : }
      57              : 
/// The mutable part of an `InMemoryLayer`: the in-memory index plus the
/// ephemeral file holding the serialized values.
pub struct InMemoryLayerInner {
    /// All versions of all pages in the layer are kept here.  Indexed
    /// by key and LSN. The value is an offset into the
    /// ephemeral file where the page version is stored.
    index: HashMap<Key, VecMap<Lsn, u64>>,

    /// The values are stored in a serialized format in this file.
    /// Each serialized Value is preceded by a 'u32' length field
    /// (NOTE(review): the exact length encoding is defined by the blob
    /// writer, not visible here -- confirm).
    /// The `index` map stores offsets into this file.
    file: EphemeralFile,
}
      69              : 
      70              : impl std::fmt::Debug for InMemoryLayerInner {
      71            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
      72            0 :         f.debug_struct("InMemoryLayerInner").finish()
      73            0 :     }
      74              : }
      75              : 
      76              : impl InMemoryLayer {
      77          337 :     pub(crate) fn get_timeline_id(&self) -> TimelineId {
      78          337 :         self.timeline_id
      79          337 :     }
      80              : 
      81            4 :     pub(crate) fn info(&self) -> InMemoryLayerInfo {
      82            4 :         let lsn_start = self.start_lsn;
      83              : 
      84            4 :         if let Some(&lsn_end) = self.end_lsn.get() {
      85            0 :             InMemoryLayerInfo::Frozen { lsn_start, lsn_end }
      86              :         } else {
      87            4 :             InMemoryLayerInfo::Open { lsn_start }
      88              :         }
      89            4 :     }
      90              : 
    /// Asserts that the layer has not been frozen yet.
    ///
    /// # Panics
    ///
    /// Panics if `end_lsn` has already been set.
    pub(crate) fn assert_writable(&self) {
        assert!(self.end_lsn.get().is_none());
    }
      94              : 
      95     29921801 :     pub(crate) fn end_lsn_or_max(&self) -> Lsn {
      96     29921801 :         self.end_lsn.get().copied().unwrap_or(Lsn::MAX)
      97     29921801 :     }
      98              : 
      99     29916198 :     pub(crate) fn get_lsn_range(&self) -> Range<Lsn> {
     100     29916198 :         self.start_lsn..self.end_lsn_or_max()
     101     29916198 :     }
     102              : 
     103              :     /// debugging function to print out the contents of the layer
     104              :     ///
     105              :     /// this is likely completly unused
     106            0 :     pub async fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
     107            0 :         let inner =;
     108              : 
     109            0 :         let end_str = self.end_lsn_or_max();
     110            0 : 
     111            0 :         println!(
     112            0 :             "----- in-memory layer for tli {} LSNs {}-{} ----",
     113            0 :             self.timeline_id, self.start_lsn, end_str,
     114            0 :         );
     115            0 : 
     116            0 :         if !verbose {
     117            0 :             return Ok(());
     118            0 :         }
     119            0 : 
     120            0 :         let cursor = inner.file.block_cursor();
     121            0 :         let mut buf = Vec::new();
     122            0 :         for (key, vec_map) in inner.index.iter() {
     123            0 :             for (lsn, pos) in vec_map.as_slice() {
     124            0 :                 let mut desc = String::new();
     125            0 :                 cursor.read_blob_into_buf(*pos, &mut buf, ctx).await?;
     126            0 :                 let val = Value::des(&buf);
     127            0 :                 match val {
     128            0 :                     Ok(Value::Image(img)) => {
     129            0 :                         write!(&mut desc, " img {} bytes", img.len())?;
     130              :                     }
     131            0 :                     Ok(Value::WalRecord(rec)) => {
     132            0 :                         let wal_desc = walrecord::describe_wal_record(&rec).unwrap();
     133            0 :                         write!(
     134            0 :                             &mut desc,
     135            0 :                             " rec {} bytes will_init: {} {}",
     136            0 :                             buf.len(),
     137            0 :                             rec.will_init(),
     138            0 :                             wal_desc
     139            0 :                         )?;
     140              :                     }
     141            0 :                     Err(err) => {
     142            0 :                         write!(&mut desc, " DESERIALIZATION ERROR: {}", err)?;
     143              :                     }
     144              :                 }
     145            0 :                 println!("  key {} at {}: {}", key, lsn, desc);
     146              :             }
     147              :         }
     148              : 
     149            0 :         Ok(())
     150            0 :     }
     151              : 
     152              :     /// Look up given value in the layer.
     153      6962343 :     pub(crate) async fn get_value_reconstruct_data(
     154      6962343 :         &self,
     155      6962343 :         key: Key,
     156      6962343 :         lsn_range: Range<Lsn>,
     157      6962343 :         reconstruct_state: &mut ValueReconstructState,
     158      6962343 :         ctx: &RequestContext,
     159      6962343 :     ) -> anyhow::Result<ValueReconstructResult> {
     160      6962339 :         ensure!(lsn_range.start >= self.start_lsn);
     161      6962339 :         let mut need_image = true;
     162      6962339 : 
     163      6962339 :         let ctx = RequestContextBuilder::extend(ctx)
     164      6962339 :             .page_content_kind(PageContentKind::InMemoryLayer)
     165      6962339 :             .build();
     166              : 
     167      6962339 :         let inner =;
     168              : 
     169      6962339 :         let reader = inner.file.block_cursor();
     170              : 
     171              :         // Scan the page versions backwards, starting from `lsn`.
     172      6962339 :         if let Some(vec_map) = inner.index.get(&key) {
     173      4203246 :             let slice = vec_map.slice_range(lsn_range);
     174     34868495 :             for (entry_lsn, pos) in slice.iter().rev() {
     175     34868495 :                 let buf = reader.read_blob(*pos, &ctx).await?;
     176     34868495 :                 let value = Value::des(&buf)?;
     177     34868495 :                 match value {
     178      1577349 :                     Value::Image(img) => {
     179      1577349 :                         reconstruct_state.img = Some((*entry_lsn, img));
     180      1577349 :                         return Ok(ValueReconstructResult::Complete);
     181              :                     }
     182     33291146 :                     Value::WalRecord(rec) => {
     183     33291146 :                         let will_init = rec.will_init();
     184     33291146 :                         reconstruct_state.records.push((*entry_lsn, rec));
     185     33291146 :                         if will_init {
     186              :                             // This WAL record initializes the page, so no need to go further back
     187       537941 :                             need_image = false;
     188       537941 :                             break;
     189     32753205 :                         }
     190              :                     }
     191              :                 }
     192              :             }
     193      2759093 :         }
     194              : 
     195              :         // release lock on 'inner'
     196              : 
     197              :         // If an older page image is needed to reconstruct the page, let the
     198              :         // caller know.
     199      5384990 :         if need_image {
     200      4847049 :             Ok(ValueReconstructResult::Continue)
     201              :         } else {
     202       537941 :             Ok(ValueReconstructResult::Complete)
     203              :         }
     204      6962339 :     }
     205              : }
     206              : 
     207              : impl std::fmt::Display for InMemoryLayer {
     208         5603 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     209         5603 :         let end_lsn = self.end_lsn_or_max();
     210         5603 :         write!(f, "inmem-{:016X}-{:016X}", self.start_lsn.0, end_lsn.0)
     211         5603 :     }
     212              : }
     213              : 
     214              : impl InMemoryLayer {
     215              :     /// Get layer size.
     216      1400622 :     pub async fn size(&self) -> Result<u64> {
     217      1400622 :         let inner =;
     218      1400622 :         Ok(inner.file.len())
     219      1400622 :     }
     220              : 
     221              :     /// Create a new, empty, in-memory layer
     222         5713 :     pub async fn create(
     223         5713 :         conf: &'static PageServerConf,
     224         5713 :         timeline_id: TimelineId,
     225         5713 :         tenant_shard_id: TenantShardId,
     226         5713 :         start_lsn: Lsn,
     227         5713 :     ) -> Result<InMemoryLayer> {
     228            0 :         trace!("initializing new empty InMemoryLayer for writing on timeline {timeline_id} at {start_lsn}");
     229              : 
     230         5713 :         let file = EphemeralFile::create(conf, tenant_shard_id, timeline_id).await?;
     231              : 
     232         5713 :         Ok(InMemoryLayer {
     233         5713 :             conf,
     234         5713 :             timeline_id,
     235         5713 :             tenant_shard_id,
     236         5713 :             start_lsn,
     237         5713 :             end_lsn: OnceLock::new(),
     238         5713 :             inner: RwLock::new(InMemoryLayerInner {
     239         5713 :                 index: HashMap::new(),
     240         5713 :                 file,
     241         5713 :             }),
     242         5713 :         })
     243         5713 :     }
     244              : 
     245              :     // Write operations
     246              : 
     247              :     /// Common subroutine of the public put_wal_record() and put_page_image() functions.
     248              :     /// Adds the page version to the in-memory tree
     249      1214102 :     pub(crate) async fn put_value(
     250      1214102 :         &self,
     251      1214102 :         key: Key,
     252      1214102 :         lsn: Lsn,
     253      1214102 :         val: &Value,
     254      1214102 :         ctx: &RequestContext,
     255      1214102 :     ) -> Result<()> {
     256      1214102 :         let mut inner = self.inner.write().await;
     257      1214102 :         self.assert_writable();
     258      1214102 :         self.put_value_locked(&mut inner, key, lsn, val, ctx).await
     259      1214102 :     }
     260              : 
     261      1718651 :     pub(crate) async fn put_values(
     262      1718651 :         &self,
     263      1718651 :         values: &HashMap<Key, Vec<(Lsn, Value)>>,
     264      1718651 :         ctx: &RequestContext,
     265      1718651 :     ) -> Result<()> {
     266      1718651 :         let mut inner = self.inner.write().await;
     267      1718651 :         self.assert_writable();
     268     12342190 :         for (key, vals) in values {
     269     74717538 :             for (lsn, val) in vals {
     270     64093999 :                 self.put_value_locked(&mut inner, *key, *lsn, val, ctx)
     271        71121 :                     .await?;
     272              :             }
     273              :         }
     274      1718650 :         Ok(())
     275      1718650 :     }
     276              : 
     277     65308101 :     async fn put_value_locked(
     278     65308101 :         &self,
     279     65308101 :         locked_inner: &mut RwLockWriteGuard<'_, InMemoryLayerInner>,
     280     65308101 :         key: Key,
     281     65308101 :         lsn: Lsn,
     282     65308101 :         val: &Value,
     283     65308101 :         ctx: &RequestContext,
     284     65308101 :     ) -> Result<()> {
     285            0 :         trace!("put_value key {} at {}/{}", key, self.timeline_id, lsn);
     286              : 
     287     65308100 :         let off = {
     288              :             // Avoid doing allocations for "small" values.
     289              :             // In the regression test suite, the limit of 256 avoided allocations in 95% of cases:
     290              :             //
     291     65308101 :             let mut buf = smallvec::SmallVec::<[u8; 256]>::new();
     292     65308101 :             buf.clear();
     293     65308101 :             val.ser_into(&mut buf)?;
     294     65308101 :             locked_inner
     295     65308101 :                 .file
     296     65308101 :                 .write_blob(
     297     65308101 :                     &buf,
     298     65308101 :                     &RequestContextBuilder::extend(ctx)
     299     65308101 :                         .page_content_kind(PageContentKind::InMemoryLayer)
     300     65308101 :                         .build(),
     301     65308101 :                 )
     302        71480 :                 .await?
     303              :         };
     304              : 
     305     65308100 :         let vec_map = locked_inner.index.entry(key).or_default();
     306     65308100 :         let old = vec_map.append_or_update_last(lsn, off).unwrap().0;
     307     65308100 :         if old.is_some() {
     308              :             // We already had an entry for this LSN. That's odd..
     309            0 :             warn!("Key {} at {} already exists", key, lsn);
     310     65308100 :         }
     311              : 
     312     65308100 :         Ok(())
     313     65308100 :     }
     314              : 
    /// Records deletions of the given key ranges.
    ///
    /// Currently a no-op that always returns `Ok(())`: the argument is ignored
    /// and nothing in the layer is touched.
    pub(crate) async fn put_tombstones(&self, _key_ranges: &[(Range<Key>, Lsn)]) -> Result<()> {
        // TODO: Currently, we just leak the storage for any deleted keys
        Ok(())
    }
     319              : 
     320              :     /// Records the end_lsn for non-dropped layers.
     321              :     /// `end_lsn` is exclusive
     322         5288 :     pub async fn freeze(&self, end_lsn: Lsn) {
     323         5288 :         let inner = self.inner.write().await;
     324              : 
     325         5288 :         assert!(self.start_lsn < end_lsn);
     326         5288 :         self.end_lsn.set(end_lsn).expect("end_lsn set only once");
     327              : 
     328      5263643 :         for vec_map in inner.index.values() {
     329     35681071 :             for (lsn, _pos) in vec_map.as_slice() {
     330     35681071 :                 assert!(*lsn < end_lsn);
     331              :             }
     332              :         }
     333         5288 :     }
     334              : 
     335              :     /// Write this frozen in-memory layer to disk.
     336              :     ///
     337              :     /// Returns a new delta layer with all the same data as this in-memory layer
     338         5186 :     pub(crate) async fn write_to_disk(
     339         5186 :         &self,
     340         5186 :         timeline: &Arc<Timeline>,
     341         5186 :         ctx: &RequestContext,
     342         5186 :     ) -> Result<ResidentLayer> {
     343              :         // Grab the lock in read-mode. We hold it over the I/O, but because this
     344              :         // layer is not writeable anymore, no one should be trying to acquire the
     345              :         // write lock on it, so we shouldn't block anyone. There's one exception
     346              :         // though: another thread might have grabbed a reference to this layer
     347              :         // in `get_layer_for_write' just before the checkpointer called
     348              :         // `freeze`, and then `write_to_disk` on it. When the thread gets the
     349              :         // lock, it will see that it's not writeable anymore and retry, but it
     350              :         // would have to wait until we release it. That race condition is very
     351              :         // rare though, so we just accept the potential latency hit for now.
     352         5186 :         let inner =;
     353              : 
     354         5186 :         let end_lsn = *self.end_lsn.get().unwrap();
     355              : 
     356         5186 :         let mut delta_layer_writer = DeltaLayerWriter::new(
     357         5186 :             self.conf,
     358         5186 :             self.timeline_id,
     359         5186 :             self.tenant_shard_id,
     360         5186 :             Key::MIN,
     361         5186 :             self.start_lsn..end_lsn,
     362         5186 :         )
     363          205 :         .await?;
     364              : 
     365         5186 :         let mut buf = Vec::new();
     366         5186 : 
     367         5186 :         let cursor = inner.file.block_cursor();
     368         5186 : 
     369         5186 :         // Sort the keys because delta layer writer expects them sorted.
     370         5186 :         //
     371         5186 :         // NOTE: this sort can take up significant time if the layer has millions of
     372         5186 :         //       keys. To speed up all the comparisons we convert the key to i128 and
     373         5186 :         //       keep the value as a reference.
     374      5213016 :         let mut keys: Vec<_> = inner.index.iter().map(|(k, m)| (k.to_i128(), m)).collect();
     375    136844622 :         keys.sort_unstable_by_key(|k| k.0);
     376         5186 : 
     377         5186 :         let ctx = RequestContextBuilder::extend(ctx)
     378         5186 :             .page_content_kind(PageContentKind::InMemoryLayer)
     379         5186 :             .build();
     380      5193826 :         for (key, vec_map) in keys.iter() {
     381      5193826 :             let key = Key::from_i128(*key);
     382              :             // Write all page versions
     383     35035862 :             for (lsn, pos) in vec_map.as_slice() {
     384     35035862 :                 cursor.read_blob_into_buf(*pos, &mut buf, &ctx).await?;
     385     35035861 :                 let will_init = Value::des(&buf)?.will_init();
     386     35035861 :                 delta_layer_writer
     387     35035861 :                     .put_value_bytes(key, *lsn, &buf, will_init)
     388        48979 :                     .await?;
     389              :             }
     390              :         }
     391              : 
     392              :         // MAX is used here because we identify L0 layers by full key range
     393         5184 :         let delta_layer = delta_layer_writer.finish(Key::MAX, timeline).await?;
     394         5184 :         Ok(delta_layer)
     395         5184 :     }
     396              : }

