LCOV - code coverage report
Current view: top level - pageserver/src/tenant/timeline - layer_manager.rs (source / functions) Coverage Total Hit
Test: 322b88762cba8ea666f63cda880cccab6936bf37.info Lines: 89.7 % 223 200
Test Date: 2024-02-29 11:57:12 Functions: 89.3 % 28 25

            Line data    Source code
       1              : use anyhow::{bail, ensure, Context, Result};
       2              : use futures::StreamExt;
       3              : use pageserver_api::shard::TenantShardId;
       4              : use std::{collections::HashMap, sync::Arc};
       5              : use tracing::trace;
       6              : use utils::{
       7              :     id::TimelineId,
       8              :     lsn::{AtomicLsn, Lsn},
       9              : };
      10              : 
      11              : use crate::{
      12              :     config::PageServerConf,
      13              :     metrics::TimelineMetrics,
      14              :     tenant::{
      15              :         layer_map::{BatchedUpdates, LayerMap},
      16              :         storage_layer::{
      17              :             AsLayerDesc, InMemoryLayer, Layer, PersistentLayerDesc, PersistentLayerKey,
      18              :             ResidentLayer,
      19              :         },
      20              :     },
      21              : };
      22              : 
      23              : /// Provides semantic APIs to manipulate the layer map; historic-layer inserts and deletes update both fields below.
      24          296 : #[derive(Default)]
      25              : pub(crate) struct LayerManager {
      26              :     layer_map: LayerMap, // layer descriptors plus the open/frozen in-memory layers and `next_open_layer_at`
      27              :     layer_fmgr: LayerFileManager<Layer>, // `PersistentLayerKey` -> `Layer` lookup for the descriptors in `layer_map`
      28              : }
      29              : 
      30              : impl LayerManager {
      31       125818 :     pub(crate) fn get_from_desc(&self, desc: &PersistentLayerDesc) -> Layer { // returns a clone of the `Layer` registered for `desc`
      32       125818 :         self.layer_fmgr.get_from_desc(desc) // panics if the file manager has no entry for `desc`
      33       125818 :     }
      34              : 
      35              :     /// Get an immutable reference to the layer map.
      36              :     ///
      37              :     /// Callers are only ever handed an immutable layer map. To make modifications,
      38              :     /// they should use the semantic APIs below. This design moves us a step closer to an immutable storage state.
      39       629679 :     pub(crate) fn layer_map(&self) -> &LayerMap {
      40       629679 :         &self.layer_map
      41       629679 :     }
      42              : 
      43              :     /// Called from `load_layer_map`. Initialize the layer manager with:
      44              :     /// 1. all on-disk layers, inserted as historic layers in a single batch update
      45              :     /// 2. the LSN at which the next open layer starts (presumably the disk_consistent_lsn; confirm at the caller)
      46            6 :     pub(crate) fn initialize_local_layers(
      47            6 :         &mut self,
      48            6 :         on_disk_layers: Vec<Layer>,
      49            6 :         next_open_layer_at: Lsn,
      50            6 :     ) {
      51            6 :         let mut updates = self.layer_map.batch_update();
      52           22 :         for layer in on_disk_layers {
      53           16 :             Self::insert_historic_layer(layer, &mut updates, &mut self.layer_fmgr);
      54           16 :         }
      55            6 :         updates.flush();
      56            6 :         self.layer_map.next_open_layer_at = Some(next_open_layer_at);
      57            6 :     }
      58              : 
      59              :     /// Initialize when creating a new timeline, called in `init_empty_layer_map`. Only records where the next open layer starts.
      60          290 :     pub(crate) fn initialize_empty(&mut self, next_open_layer_at: Lsn) {
      61          290 :         self.layer_map.next_open_layer_at = Some(next_open_layer_at);
      62          290 :     }
      63              : 
      64              :     /// Return the current open (writable) in-memory layer, or create one starting at `next_open_layer_at` if none is open.
      65              :     /// Called within `get_layer_for_write`. Errors if `lsn` fails `is_aligned()`, is not greater than `last_record_lsn`, precedes the open layer's start, if no next-open LSN is set, or if layer creation fails.
      66      2628044 :     pub(crate) async fn get_layer_for_write(
      67      2628044 :         &mut self,
      68      2628044 :         lsn: Lsn,
      69      2628044 :         last_record_lsn: Lsn,
      70      2628044 :         conf: &'static PageServerConf,
      71      2628044 :         timeline_id: TimelineId,
      72      2628044 :         tenant_shard_id: TenantShardId,
      73      2628044 :     ) -> Result<Arc<InMemoryLayer>> {
      74      2628044 :         ensure!(lsn.is_aligned());
      75              : 
      76      2628044 :         ensure!(
      77      2628044 :             lsn > last_record_lsn,
      78            0 :             "cannot modify relation after advancing last_record_lsn (incoming_lsn={}, last_record_lsn={})",
      79              :             lsn,
      80              :             last_record_lsn,
      81              :         );
      82              : 
      83              :         // Do we have a layer open for writing already?
      84      2628044 :         let layer = if let Some(open_layer) = &self.layer_map.open_layer {
      85      2627400 :             if open_layer.get_lsn_range().start > lsn {
      86            0 :                 bail!(
      87            0 :                     "unexpected open layer in the future: open layers starts at {}, write lsn {}",
      88            0 :                     open_layer.get_lsn_range().start,
      89            0 :                     lsn
      90            0 :                 );
      91      2627400 :             }
      92      2627400 : 
      93      2627400 :             Arc::clone(open_layer)
      94              :         } else {
      95              :             // No writeable layer yet. Create one at the recorded next-open LSN.
      96          644 :             let start_lsn = self
      97          644 :                 .layer_map
      98          644 :                 .next_open_layer_at
      99          644 :                 .context("No next open layer found")?;
     100              : 
     101            0 :             trace!(
     102            0 :                 "creating in-memory layer at {}/{} for record at {}",
     103            0 :                 timeline_id,
     104            0 :                 start_lsn,
     105            0 :                 lsn
     106            0 :             );
     107              : 
     108          644 :             let new_layer =
     109          644 :                 InMemoryLayer::create(conf, timeline_id, tenant_shard_id, start_lsn).await?;
     110          644 :             let layer = Arc::new(new_layer);
     111          644 : 
     112          644 :             self.layer_map.open_layer = Some(layer.clone());
     113          644 :             self.layer_map.next_open_layer_at = None; // consumed: the new open layer starts at that LSN
     114          644 : 
     115          644 :             layer
     116              :         };
     117              : 
     118      2628044 :         Ok(layer)
     119      2628044 :     }
     120              : 
     121              :     /// Called from `freeze_inmem_layer`. If a layer is open, freezes it at `last_record_lsn + 1` and queues it as frozen; returns nothing.
     122          530 :     pub(crate) async fn try_freeze_in_memory_layer(
     123          530 :         &mut self,
     124          530 :         Lsn(last_record_lsn): Lsn,
     125          530 :         last_freeze_at: &AtomicLsn,
     126          530 :     ) {
     127          530 :         let end_lsn = Lsn(last_record_lsn + 1);
     128              : 
     129          530 :         if let Some(open_layer) = &self.layer_map.open_layer {
     130          524 :             let open_layer_rc = Arc::clone(open_layer);
     131          524 :             // Freeze the open layer at end_lsn; this happens whenever a layer is open.
     132          524 :             open_layer.freeze(end_lsn).await;
     133              : 
     134              :             // The layer is no longer open, update the layer map to reflect this.
     135              :             // `finish_flush_l0_layer` later pops it and inserts its on-disk delta layer, if any.
     136          524 :             self.layer_map.frozen_layers.push_back(open_layer_rc);
     137          524 :             self.layer_map.open_layer = None;
     138          524 :             self.layer_map.next_open_layer_at = Some(end_lsn);
     139          524 :             last_freeze_at.store(end_lsn);
     140            6 :         }
     141          530 :     }
     142              : 
     143              :     /// Add image layers to the layer map in one batch update, called from `create_image_layers`. Also records each layer's file size in `metrics`.
     144          482 :     pub(crate) fn track_new_image_layers(
     145          482 :         &mut self,
     146          482 :         image_layers: &[ResidentLayer],
     147          482 :         metrics: &TimelineMetrics,
     148          482 :     ) {
     149          482 :         let mut updates = self.layer_map.batch_update();
     150          556 :         for layer in image_layers {
     151           74 :             Self::insert_historic_layer(layer.as_ref().clone(), &mut updates, &mut self.layer_fmgr);
     152           74 : 
     153           74 :             // Record these here instead of in Layer::finish_creating, because otherwise a partial
     154           74 :             // failure of create_image_layers would balloon up the physical size gauge. The downside
     155           74 :             // is that all layers need to be created before metrics are updated.
     156           74 :             metrics.record_new_file_metrics(layer.layer_desc().file_size);
     157           74 :         }
     158          482 :         updates.flush();
     159          482 :     }
     160              : 
     161              :     /// Finish flushing a frozen layer: pop it from the front of the frozen queue and, if a delta layer was written, add that to the layer map. Panics if the queue is empty or its front is not `frozen_layer_for_check`.
     162          524 :     pub(crate) fn finish_flush_l0_layer(
     163          524 :         &mut self,
     164          524 :         delta_layer: Option<&ResidentLayer>,
     165          524 :         frozen_layer_for_check: &Arc<InMemoryLayer>,
     166          524 :         metrics: &TimelineMetrics,
     167          524 :     ) {
     168          524 :         let inmem = self
     169          524 :             .layer_map
     170          524 :             .frozen_layers
     171          524 :             .pop_front()
     172          524 :             .expect("there must be a inmem layer to flush");
     173          524 : 
     174          524 :         // Only one task may call this function at a time (for this
     175          524 :         // timeline). If two tasks tried to flush the same frozen
     176          524 :         // layer to disk at the same time, that would not work.
     177          524 :         assert_eq!(Arc::as_ptr(&inmem), Arc::as_ptr(frozen_layer_for_check));
     178              : 
     179          524 :         if let Some(l) = delta_layer {
     180          450 :             let mut updates = self.layer_map.batch_update();
     181          450 :             Self::insert_historic_layer(l.as_ref().clone(), &mut updates, &mut self.layer_fmgr);
     182          450 :             metrics.record_new_file_metrics(l.layer_desc().file_size);
     183          450 :             updates.flush();
     184          450 :         }
     185          524 :     }
     186              : 
     187              :     /// Called when compaction is completed: in one batch update, inserts the `compact_to` layers (recording their file sizes in `metrics`) and deletes the `compact_from` layers.
     188           30 :     pub(crate) fn finish_compact_l0(
     189           30 :         &mut self,
     190           30 :         compact_from: &[Layer],
     191           30 :         compact_to: &[ResidentLayer],
     192           30 :         metrics: &TimelineMetrics,
     193           30 :     ) {
     194           30 :         let mut updates = self.layer_map.batch_update();
     195           60 :         for l in compact_to {
     196           30 :             Self::insert_historic_layer(l.as_ref().clone(), &mut updates, &mut self.layer_fmgr);
     197           30 :             metrics.record_new_file_metrics(l.layer_desc().file_size);
     198           30 :         }
     199          330 :         for l in compact_from {
     200          300 :             Self::delete_historic_layer(l, &mut updates, &mut self.layer_fmgr);
     201          300 :         }
     202           30 :         updates.flush();
     203           30 :     }
     204              : 
     205              :     /// Called when garbage collection has selected the layers to be removed; deletes all of `gc_layers` in one batch update.
     206            2 :     pub(crate) fn finish_gc_timeline(&mut self, gc_layers: &[Layer]) {
     207            2 :         let mut updates = self.layer_map.batch_update();
     208            4 :         for doomed_layer in gc_layers {
     209            2 :             Self::delete_historic_layer(doomed_layer, &mut updates, &mut self.layer_fmgr);
     210            2 :         }
     211            2 :         updates.flush()
     212            2 :     }
     213              : 
     214              :     /// Helper function to insert a layer into the layer map (a clone of its descriptor) and the file manager (the layer itself).
     215          570 :     fn insert_historic_layer(
     216          570 :         layer: Layer,
     217          570 :         updates: &mut BatchedUpdates<'_>,
     218          570 :         mapping: &mut LayerFileManager<Layer>,
     219          570 :     ) {
     220          570 :         updates.insert_historic(layer.layer_desc().clone());
     221          570 :         mapping.insert(layer);
     222          570 :     }
     223              : 
     224              :     /// Removes the layer from the layer map and the file manager, and marks it `delete_on_drop` (presumably the local file, if present, goes away once the layer is dropped).
     225              :     /// Remote storage is not affected by this operation.
     226          302 :     fn delete_historic_layer(
     227          302 :         // we cannot remove layers otherwise, since gc and compaction will race
     228          302 :         layer: &Layer,
     229          302 :         updates: &mut BatchedUpdates<'_>,
     230          302 :         mapping: &mut LayerFileManager<Layer>,
     231          302 :     ) {
     232          302 :         let desc = layer.layer_desc();
     233          302 : 
     234          302 :         // TODO Removing from the bottom of the layer map is expensive.
     235          302 :         //      Maybe instead discard all layer map historic versions that
     236          302 :         //      won't be needed for page reconstruction for this timeline,
     237          302 :         //      and mark what we can't delete yet as deleted from the layer
     238          302 :         //      map index without actually rebuilding the index.
     239          302 :         updates.remove_historic(desc);
     240          302 :         mapping.remove(layer);
     241          302 :         layer.delete_on_drop();
     242          302 :     }
     243              : 
     244            4 :     pub(crate) fn resident_layers(&self) -> impl futures::stream::Stream<Item = Layer> + '_ { // streams each historic layer for which `keep_resident()` yields a guard
     245            4 :         // For small layer maps we most likely have everything resident, but in larger ones more layers
     246            4 :         // are likely to be evicted, assuming lots of layers correlates with a longer lifespan.
     247            4 : 
     248            4 :         let layers = self
     249            4 :             .layer_map()
     250            4 :             .iter_historic_layers()
     251            4 :             .map(|desc| self.get_from_desc(&desc));
     252            4 : 
     253            4 :         let layers = futures::stream::iter(layers);
     254            4 : 
     255            4 :         layers.filter_map(|layer| async move {
     256            4 :             // TODO(#6028): this query does not really need to see the ResidentLayer
     257            4 :             match layer.keep_resident().await {
     258            4 :                 Ok(Some(layer)) => Some(layer.drop_eviction_guard()),
     259            0 :                 Ok(None) => None,
     260            0 :                 Err(e) => {
     261            0 :                     // These should not happen, but we cannot make them statically impossible right
     262            0 :                     // now, so log a warning and skip the layer.
     263            0 :                     tracing::warn!(%layer, "failed to keep the layer resident: {e:#}");
     264            0 :                     None
     265              :                 }
     266              :             }
     267            8 :         })
     268            4 :     }
     269              : 
     270           30 :     pub(crate) fn contains(&self, layer: &Layer) -> bool { // true if the file manager has an entry under this layer's key
     271           30 :         self.layer_fmgr.contains(layer)
     272           30 :     }
     273              : }
     274              : 
     275              : pub(crate) struct LayerFileManager<T>(HashMap<PersistentLayerKey, T>); // maps a layer's key to a `T` (`Layer` when used by `LayerManager`)
     276              : 
     277              : impl<T> Default for LayerFileManager<T> { // manual impl: `#[derive(Default)]` would add an unneeded `T: Default` bound
     278          296 :     fn default() -> Self {
     279          296 :         Self(HashMap::default())
     280          296 :     }
     281              : }
     282              : 
     283              : impl<T: AsLayerDesc + Clone> LayerFileManager<T> {
     284       125818 :     fn get_from_desc(&self, desc: &PersistentLayerDesc) -> T { // returns a clone of the entry stored under `desc.key()`; panics if absent
     285       125818 :         // The assumption for the `expect()` is that all code maintains the following invariant:
     286       125818 :         // A layer's descriptor is present in the LayerMap => the LayerFileManager contains a layer for the descriptor.
     287       125818 :         self.0
     288       125818 :             .get(&desc.key())
     289       125818 :             .with_context(|| format!("get layer from desc: {}", desc.filename())) // only builds the error that `expect` reports on a miss
     290       125818 :             .expect("not found")
     291       125818 :             .clone()
     292       125818 :     }
     293              : 
     294          570 :     pub(crate) fn insert(&mut self, layer: T) { // stores `layer` under its descriptor key
     295          570 :         let present = self.0.insert(layer.layer_desc().key(), layer.clone()); // clone: `layer` is still needed for the panic message below
     296          570 :         if present.is_some() && cfg!(debug_assertions) { // overwriting panics in debug builds only; release builds replace the old entry silently
     297            0 :             panic!("overwriting a layer: {:?}", layer.layer_desc())
     298          570 :         }
     299          570 :     }
     300              : 
     301           30 :     pub(crate) fn contains(&self, layer: &T) -> bool { // membership is decided by the descriptor key alone
     302           30 :         self.0.contains_key(&layer.layer_desc().key())
     303           30 :     }
     304              : 
     305          302 :     pub(crate) fn remove(&mut self, layer: &T) { // drops the entry stored under this layer's descriptor key
     306          302 :         let present = self.0.remove(&layer.layer_desc().key());
     307          302 :         if present.is_none() && cfg!(debug_assertions) { // a missing entry panics in debug builds only; release builds ignore it
     308            0 :             panic!(
     309            0 :                 "removing layer that is not present in layer mapping: {:?}",
     310            0 :                 layer.layer_desc()
     311            0 :             )
     312          302 :         }
     313          302 :     }
     314              : }
        

Generated by: LCOV version 2.1-beta