LCOV - code coverage report
Current view: top level - pageserver/src/tenant/storage_layer - inmemory_layer.rs (source / functions) Coverage Total Hit
Test: 8ac049b474321fdc72ddcb56d7165153a1a900e8.info Lines: 67.7 % 192 130
Test Date: 2023-09-06 10:18:01 Functions: 65.5 % 29 19

            Line data    Source code
       1              : //! An in-memory layer stores recently received key-value pairs.
       2              : //!
       3              : //! The "in-memory" part of the name is a bit misleading: the actual page versions are
       4              : //! held in an ephemeral file, not in memory. The metadata for each page version, i.e.
       5              : //! its position in the file, is kept in memory, though.
       6              : //!
       7              : use crate::config::PageServerConf;
       8              : use crate::context::RequestContext;
       9              : use crate::repository::{Key, Value};
      10              : use crate::tenant::block_io::BlockReader;
      11              : use crate::tenant::ephemeral_file::EphemeralFile;
      12              : use crate::tenant::storage_layer::{ValueReconstructResult, ValueReconstructState};
      13              : use crate::walrecord;
      14              : use anyhow::{ensure, Result};
      15              : use pageserver_api::models::InMemoryLayerInfo;
      16              : use std::collections::HashMap;
      17              : use std::sync::OnceLock;
      18              : use tracing::*;
      19              : use utils::{
      20              :     bin_ser::BeSer,
      21              :     id::{TenantId, TimelineId},
      22              :     lsn::Lsn,
      23              :     vec_map::VecMap,
      24              : };
      25              : // avoid binding to Write (conflicts with std::io::Write)
      26              : // while being able to use std::fmt::Write's methods
      27              : use std::fmt::Write as _;
      28              : use std::ops::Range;
      29              : use tokio::sync::RwLock;
      30              : 
      31              : use super::{DeltaLayer, DeltaLayerWriter, Layer};
      32              : 
      33              : pub struct InMemoryLayer {
      34              :     conf: &'static PageServerConf,
      35              :     tenant_id: TenantId,
      36              :     timeline_id: TimelineId,
      37              : 
      38              :     /// This layer contains all the changes from 'start_lsn'. The
      39              :     /// start is inclusive.
      40              :     start_lsn: Lsn,
      41              : 
      42              :     /// Frozen layers have an exclusive end LSN.
      43              :     /// Writes are only allowed when this is `None`.
      44              :     end_lsn: OnceLock<Lsn>,
      45              : 
      46              :     /// The above fields never change, except for `end_lsn`, which is only set once.
      47              :     /// All other changing parts are in `inner`, and protected by an `RwLock`.
      48              :     inner: RwLock<InMemoryLayerInner>,
      49              : }
      50              : 
      51              : impl std::fmt::Debug for InMemoryLayer {
      52            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
      53            0 :         f.debug_struct("InMemoryLayer")
      54            0 :             .field("start_lsn", &self.start_lsn)
      55            0 :             .field("end_lsn", &self.end_lsn)
      56            0 :             .field("inner", &self.inner)
      57            0 :             .finish()
      58            0 :     }
      59              : }
      60              : 
      61              : pub struct InMemoryLayerInner {
      62              :     /// All versions of all pages in the layer are kept here.  Indexed
      63              :     /// by key and LSN. The value is an offset into the
      64              :     /// ephemeral file where the page version is stored.
      65              :     index: HashMap<Key, VecMap<Lsn, u64>>,
      66              : 
      67              :     /// The values are stored in a serialized format in this file.
      68              :     /// Each serialized Value is preceded by a 'u32' length field.
      69              :     /// The `index` map stores offsets into this file.
      70              :     file: EphemeralFile,
      71              : }
      72              : 
      73              : impl std::fmt::Debug for InMemoryLayerInner {
      74            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
      75            0 :         f.debug_struct("InMemoryLayerInner").finish()
      76            0 :     }
      77              : }
      78              : 
      79              : impl InMemoryLayer {
      80            2 :     pub(crate) fn get_timeline_id(&self) -> TimelineId {
      81            2 :         self.timeline_id
      82            2 :     }
      83              : 
      84           23 :     pub(crate) fn info(&self) -> InMemoryLayerInfo {
      85           23 :         let lsn_start = self.start_lsn;
      86              : 
      87           23 :         if let Some(&lsn_end) = self.end_lsn.get() {
      88            0 :             InMemoryLayerInfo::Frozen { lsn_start, lsn_end }
      89              :         } else {
      90           23 :             InMemoryLayerInfo::Open { lsn_start }
      91              :         }
      92           23 :     }
      93              : 
      94     82545974 :     pub(crate) fn assert_writable(&self) {
      95     82545974 :         assert!(self.end_lsn.get().is_none());
      96     82545974 :     }
      97              : 
      98    131619481 :     pub(crate) fn end_lsn_or_max(&self) -> Lsn {
      99    131619481 :         self.end_lsn.get().copied().unwrap_or(Lsn::MAX)
     100    131619481 :     }
     101              : 
     102    131612965 :     pub(crate) fn get_lsn_range(&self) -> Range<Lsn> {
     103    131612965 :         self.start_lsn..self.end_lsn_or_max()
     104    131612965 :     }
     105              : 
     106              :     /// debugging function to print out the contents of the layer
     107              :     ///
     108              :     /// this is likely completely unused
     109            0 :     pub async fn dump(&self, verbose: bool, _ctx: &RequestContext) -> Result<()> {
     110            0 :         let inner = self.inner.read().await;
     111              : 
     112            0 :         let end_str = self.end_lsn_or_max();
     113            0 : 
     114            0 :         println!(
     115            0 :             "----- in-memory layer for tli {} LSNs {}-{} ----",
     116            0 :             self.timeline_id, self.start_lsn, end_str,
     117            0 :         );
     118            0 : 
     119            0 :         if !verbose {
     120            0 :             return Ok(());
     121            0 :         }
     122            0 : 
     123            0 :         let cursor = inner.file.block_cursor();
     124            0 :         let mut buf = Vec::new();
     125            0 :         for (key, vec_map) in inner.index.iter() {
     126            0 :             for (lsn, pos) in vec_map.as_slice() {
     127            0 :                 let mut desc = String::new();
     128            0 :                 cursor.read_blob_into_buf(*pos, &mut buf).await?;
     129            0 :                 let val = Value::des(&buf);
     130            0 :                 match val {
     131            0 :                     Ok(Value::Image(img)) => {
     132            0 :                         write!(&mut desc, " img {} bytes", img.len())?;
     133              :                     }
     134            0 :                     Ok(Value::WalRecord(rec)) => {
     135            0 :                         let wal_desc = walrecord::describe_wal_record(&rec).unwrap();
     136            0 :                         write!(
     137            0 :                             &mut desc,
     138            0 :                             " rec {} bytes will_init: {} {}",
     139            0 :                             buf.len(),
     140            0 :                             rec.will_init(),
     141            0 :                             wal_desc
     142            0 :                         )?;
     143              :                     }
     144            0 :                     Err(err) => {
     145            0 :                         write!(&mut desc, " DESERIALIZATION ERROR: {}", err)?;
     146              :                     }
     147              :                 }
     148            0 :                 println!("  key {} at {}: {}", key, lsn, desc);
     149              :             }
     150              :         }
     151              : 
     152            0 :         Ok(())
     153            0 :     }
     154              : 
     155              :     /// Look up given value in the layer.
     156      6248109 :     pub(crate) async fn get_value_reconstruct_data(
     157      6248109 :         &self,
     158      6248109 :         key: Key,
     159      6248109 :         lsn_range: Range<Lsn>,
     160      6248109 :         reconstruct_state: &mut ValueReconstructState,
     161      6248109 :         _ctx: &RequestContext,
     162      6248112 :     ) -> anyhow::Result<ValueReconstructResult> {
     163      6248112 :         ensure!(lsn_range.start >= self.start_lsn);
     164      6248112 :         let mut need_image = true;
     165              : 
     166      6248112 :         let inner = self.inner.read().await;
     167              : 
     168      6248112 :         let reader = inner.file.block_cursor();
     169              : 
     170              :         // Scan the page versions backwards, starting from the end of `lsn_range`.
     171      6248112 :         if let Some(vec_map) = inner.index.get(&key) {
     172      3434973 :             let slice = vec_map.slice_range(lsn_range);
     173     33295413 :             for (entry_lsn, pos) in slice.iter().rev() {
     174     33295413 :                 let buf = reader.read_blob(*pos).await?;
     175     33295412 :                 let value = Value::des(&buf)?;
     176     33295412 :                 match value {
     177      1641578 :                     Value::Image(img) => {
     178      1641578 :                         reconstruct_state.img = Some((*entry_lsn, img));
     179      1641578 :                         return Ok(ValueReconstructResult::Complete);
     180              :                     }
     181     31653834 :                     Value::WalRecord(rec) => {
     182     31653834 :                         let will_init = rec.will_init();
     183     31653834 :                         reconstruct_state.records.push((*entry_lsn, rec));
     184     31653834 :                         if will_init {
     185              :                             // This WAL record initializes the page, so no need to go further back
     186       444686 :                             need_image = false;
     187       444686 :                             break;
     188     31209148 :                         }
     189              :                     }
     190              :                 }
     191              :             }
     192      2813139 :         }
     193              : 
     194              :         // release lock on 'inner'
     195              : 
     196              :         // If an older page image is needed to reconstruct the page, let the
     197              :         // caller know.
     198      4606533 :         if need_image {
     199      4161847 :             Ok(ValueReconstructResult::Continue)
     200              :         } else {
     201       444686 :             Ok(ValueReconstructResult::Complete)
     202              :         }
     203      6248111 :     }
     204              : }
     205              : 
     206              : #[async_trait::async_trait]
     207              : impl Layer for InMemoryLayer {
     208            0 :     async fn get_value_reconstruct_data(
     209            0 :         &self,
     210            0 :         key: Key,
     211            0 :         lsn_range: Range<Lsn>,
     212            0 :         reconstruct_data: &mut ValueReconstructState,
     213            0 :         ctx: &RequestContext,
     214            0 :     ) -> Result<ValueReconstructResult> {
     215            0 :         self.get_value_reconstruct_data(key, lsn_range, reconstruct_data, ctx)
     216            0 :             .await
     217            0 :     }
     218              : }
     219              : 
     220              : impl std::fmt::Display for InMemoryLayer {
     221         6516 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     222         6516 :         let end_lsn = self.end_lsn_or_max();
     223         6516 :         write!(f, "inmem-{:016X}-{:016X}", self.start_lsn.0, end_lsn.0)
     224         6516 :     }
     225              : }
     226              : 
     227              : impl InMemoryLayer {
     228              :     ///
     229              :     /// Get layer size.
     230              :     ///
     231       733761 :     pub async fn size(&self) -> Result<u64> {
     232       733761 :         let inner = self.inner.read().await;
     233       733761 :         Ok(inner.file.len())
     234       733761 :     }
     235              : 
     236              :     ///
     237              :     /// Create a new, empty, in-memory layer
     238              :     ///
     239         6868 :     pub fn create(
     240         6868 :         conf: &'static PageServerConf,
     241         6868 :         timeline_id: TimelineId,
     242         6868 :         tenant_id: TenantId,
     243         6868 :         start_lsn: Lsn,
     244         6868 :     ) -> Result<InMemoryLayer> {
     245         6868 :         trace!("initializing new empty InMemoryLayer for writing on timeline {timeline_id} at {start_lsn}");
     246              : 
     247         6868 :         let file = EphemeralFile::create(conf, tenant_id, timeline_id)?;
     248              : 
     249         6868 :         Ok(InMemoryLayer {
     250         6868 :             conf,
     251         6868 :             timeline_id,
     252         6868 :             tenant_id,
     253         6868 :             start_lsn,
     254         6868 :             end_lsn: OnceLock::new(),
     255         6868 :             inner: RwLock::new(InMemoryLayerInner {
     256         6868 :                 index: HashMap::new(),
     257         6868 :                 file,
     258         6868 :             }),
     259         6868 :         })
     260         6868 :     }
     261              : 
     262              :     // Write operations
     263              : 
     264              :     /// Common subroutine of the public put_wal_record() and put_page_image() functions.
     265              :     /// Adds the page version to the in-memory tree
     266     82545974 :     pub async fn put_value(&self, key: Key, lsn: Lsn, val: &Value) -> Result<()> {
     267            0 :         trace!("put_value key {} at {}/{}", key, self.timeline_id, lsn);
     268     82546097 :         let inner: &mut _ = &mut *self.inner.write().await;
     269     82546097 :         self.assert_writable();
     270              : 
     271     82546095 :         let off = {
     272              :             // Avoid doing allocations for "small" values.
     273              :             // In the regression test suite, the limit of 256 avoided allocations in 95% of cases:
     274              :             // https://github.com/neondatabase/neon/pull/5056#discussion_r1301975061
     275     82546097 :             let mut buf = smallvec::SmallVec::<[u8; 256]>::new();
     276     82546097 :             buf.clear();
     277     82546097 :             val.ser_into(&mut buf)?;
     278     82546097 :             inner.file.write_blob(&buf).await?
     279              :         };
     280              : 
     281     82546095 :         let vec_map = inner.index.entry(key).or_default();
     282     82546095 :         let old = vec_map.append_or_update_last(lsn, off).unwrap().0;
     283     82546095 :         if old.is_some() {
     284              :             // We already had an entry for this LSN. That's odd..
     285            0 :             warn!("Key {} at {} already exists", key, lsn);
     286     82546095 :         }
     287              : 
     288     82546095 :         Ok(())
     289     82546095 :     }
     290              : 
     291        17794 :     pub async fn put_tombstone(&self, _key_range: Range<Key>, _lsn: Lsn) -> Result<()> {
     292        17794 :         // TODO: Currently, we just leak the storage for any deleted keys
     293        17794 : 
     294        17794 :         Ok(())
     295        17794 :     }
     296              : 
     297              :     /// Make the layer non-writeable. Only call once.
     298              :     /// Records the end_lsn for non-dropped layers.
     299              :     /// `end_lsn` is exclusive
     300         6562 :     pub async fn freeze(&self, end_lsn: Lsn) {
     301         6562 :         let inner = self.inner.write().await;
     302              : 
     303         6562 :         assert!(self.start_lsn < end_lsn);
     304         6562 :         self.end_lsn.set(end_lsn).expect("end_lsn set only once");
     305              : 
     306      5263399 :         for vec_map in inner.index.values() {
     307     59430639 :             for (lsn, _pos) in vec_map.as_slice() {
     308     59430639 :                 assert!(*lsn < end_lsn);
     309              :             }
     310              :         }
     311         6562 :     }
     312              : 
     313              :     /// Write this frozen in-memory layer to disk.
     314              :     ///
     315              :     /// Returns a new delta layer with all the same data as this in-memory layer
     316         6477 :     pub(crate) async fn write_to_disk(&self) -> Result<DeltaLayer> {
     317              :         // Grab the lock in read-mode. We hold it over the I/O, but because this
     318              :         // layer is not writeable anymore, no one should be trying to acquire the
     319              :         // write lock on it, so we shouldn't block anyone. There's one exception
     320              :         // though: another thread might have grabbed a reference to this layer
     321              :         // in `get_layer_for_write` just before the checkpointer called
     322              :         // `freeze`, and then `write_to_disk` on it. When the thread gets the
     323              :         // lock, it will see that it's not writeable anymore and retry, but it
     324              :         // would have to wait until we release it. That race condition is very
     325              :         // rare though, so we just accept the potential latency hit for now.
     326         6477 :         let inner = self.inner.read().await;
     327              : 
     328         6477 :         let end_lsn = *self.end_lsn.get().unwrap();
     329              : 
     330         6477 :         let mut delta_layer_writer = DeltaLayerWriter::new(
     331         6477 :             self.conf,
     332         6477 :             self.timeline_id,
     333         6477 :             self.tenant_id,
     334         6477 :             Key::MIN,
     335         6477 :             self.start_lsn..end_lsn,
     336         6477 :         )
     337            0 :         .await?;
     338              : 
     339         6477 :         let mut buf = Vec::new();
     340         6477 : 
     341         6477 :         let cursor = inner.file.block_cursor();
     342         6477 : 
     343         6477 :         let mut keys: Vec<(&Key, &VecMap<Lsn, u64>)> = inner.index.iter().collect();
     344    118417390 :         keys.sort_by_key(|k| k.0);
     345              : 
     346      5242685 :         for (key, vec_map) in keys.iter() {
     347      5242685 :             let key = **key;
     348              :             // Write all page versions
     349     59022791 :             for (lsn, pos) in vec_map.as_slice() {
     350     59022791 :                 cursor.read_blob_into_buf(*pos, &mut buf).await?;
     351     59022790 :                 let will_init = Value::des(&buf)?.will_init();
     352     59022790 :                 delta_layer_writer
     353     59022790 :                     .put_value_bytes(key, *lsn, &buf, will_init)
     354            0 :                     .await?;
     355              :             }
     356              :         }
     357              : 
     358         6476 :         let delta_layer = delta_layer_writer.finish(Key::MAX).await?;
     359         6476 :         Ok(delta_layer)
     360         6476 :     }
     361              : }
        

Generated by: LCOV version 2.1-beta