LCOV - code coverage report
Current view: top level - pageserver/src/tenant/timeline - layer_manager.rs (source / functions) Coverage Total Hit
Test: aca8877be6ceba750c1be359ed71bc1799d52b30.info Lines: 90.1 % 223 201
Test Date: 2024-02-14 18:05:35 Functions: 89.3 % 28 25

            Line data    Source code
       1              : use anyhow::{bail, ensure, Context, Result};
       2              : use futures::StreamExt;
       3              : use pageserver_api::shard::TenantShardId;
       4              : use std::{collections::HashMap, sync::Arc};
       5              : use tracing::trace;
       6              : use utils::{
       7              :     id::TimelineId,
       8              :     lsn::{AtomicLsn, Lsn},
       9              : };
      10              : 
      11              : use crate::{
      12              :     config::PageServerConf,
      13              :     metrics::TimelineMetrics,
      14              :     tenant::{
      15              :         layer_map::{BatchedUpdates, LayerMap},
      16              :         storage_layer::{
      17              :             AsLayerDesc, InMemoryLayer, Layer, PersistentLayerDesc, PersistentLayerKey,
      18              :             ResidentLayer,
      19              :         },
      20              :     },
      21              : };
      22              : 
      23              : /// Provides semantic APIs to manipulate the layer map.
      24         1592 : #[derive(Default)]
      25              : pub(crate) struct LayerManager {
      26              :     layer_map: LayerMap,
      27              :     layer_fmgr: LayerFileManager<Layer>,
      28              : }
      29              : 
      30              : impl LayerManager {
      31     16862196 :     pub(crate) fn get_from_desc(&self, desc: &PersistentLayerDesc) -> Layer {
      32     16862196 :         self.layer_fmgr.get_from_desc(desc)
      33     16862196 :     }
      34              : 
      35              :     /// Get an immutable reference to the layer map.
      36              :     ///
      37              :     /// We expect users only to be able to get an immutable layer map. If users want to make modifications,
      38              :     /// they should use the semantic APIs below. This design brings us a step closer to immutable storage state.
      39     24426310 :     pub(crate) fn layer_map(&self) -> &LayerMap {
      40     24426310 :         &self.layer_map
      41     24426310 :     }
      42              : 
      43              :     /// Called from `load_layer_map`. Initialize the layer manager with:
      44              :     /// 1. all on-disk layers
      45              :     /// 2. next open layer (with disk_consistent_lsn LSN)
      46          431 :     pub(crate) fn initialize_local_layers(
      47          431 :         &mut self,
      48          431 :         on_disk_layers: Vec<Layer>,
      49          431 :         next_open_layer_at: Lsn,
      50          431 :     ) {
      51          431 :         let mut updates = self.layer_map.batch_update();
      52        53475 :         for layer in on_disk_layers {
      53        53044 :             Self::insert_historic_layer(layer, &mut updates, &mut self.layer_fmgr);
      54        53044 :         }
      55          431 :         updates.flush();
      56          431 :         self.layer_map.next_open_layer_at = Some(next_open_layer_at);
      57          431 :     }
      58              : 
      59              :     /// Initialize when creating a new timeline, called in `init_empty_layer_map`.
      60         1149 :     pub(crate) fn initialize_empty(&mut self, next_open_layer_at: Lsn) {
      61         1149 :         self.layer_map.next_open_layer_at = Some(next_open_layer_at);
      62         1149 :     }
      63              : 
      64              :     /// Open a new writable layer to append data if there is no open layer, otherwise return the current open layer,
      65              :     /// called within `get_layer_for_write`.
      66      2907667 :     pub(crate) async fn get_layer_for_write(
      67      2907667 :         &mut self,
      68      2907667 :         lsn: Lsn,
      69      2907667 :         last_record_lsn: Lsn,
      70      2907667 :         conf: &'static PageServerConf,
      71      2907667 :         timeline_id: TimelineId,
      72      2907667 :         tenant_shard_id: TenantShardId,
      73      2907667 :     ) -> Result<Arc<InMemoryLayer>> {
      74      2907667 :         ensure!(lsn.is_aligned());
      75              : 
      76      2907667 :         ensure!(
      77      2907667 :             lsn > last_record_lsn,
      78            0 :             "cannot modify relation after advancing last_record_lsn (incoming_lsn={}, last_record_lsn={})",
      79              :             lsn,
      80              :             last_record_lsn,
      81              :         );
      82              : 
      83              :         // Do we have a layer open for writing already?
      84      2907667 :         let layer = if let Some(open_layer) = &self.layer_map.open_layer {
      85      2902027 :             if open_layer.get_lsn_range().start > lsn {
      86            0 :                 bail!(
      87            0 :                     "unexpected open layer in the future: open layers starts at {}, write lsn {}",
      88            0 :                     open_layer.get_lsn_range().start,
      89            0 :                     lsn
      90            0 :                 );
      91      2902027 :             }
      92      2902027 : 
      93      2902027 :             Arc::clone(open_layer)
      94              :         } else {
      95              :             // No writeable layer yet. Create one.
      96         5640 :             let start_lsn = self
      97         5640 :                 .layer_map
      98         5640 :                 .next_open_layer_at
      99         5640 :                 .context("No next open layer found")?;
     100              : 
     101            0 :             trace!(
     102            0 :                 "creating in-memory layer at {}/{} for record at {}",
     103            0 :                 timeline_id,
     104            0 :                 start_lsn,
     105            0 :                 lsn
     106            0 :             );
     107              : 
     108         5640 :             let new_layer =
     109         5640 :                 InMemoryLayer::create(conf, timeline_id, tenant_shard_id, start_lsn).await?;
     110         5640 :             let layer = Arc::new(new_layer);
     111         5640 : 
     112         5640 :             self.layer_map.open_layer = Some(layer.clone());
     113         5640 :             self.layer_map.next_open_layer_at = None;
     114         5640 : 
     115         5640 :             layer
     116              :         };
     117              : 
     118      2907667 :         Ok(layer)
     119      2907667 :     }
     120              : 
     121              :     /// Called from `freeze_inmem_layer`. Freezes the open layer, if any, and moves it to the frozen layers; returns nothing.
     122         5410 :     pub(crate) async fn try_freeze_in_memory_layer(
     123         5410 :         &mut self,
     124         5410 :         Lsn(last_record_lsn): Lsn,
     125         5410 :         last_freeze_at: &AtomicLsn,
     126         5410 :     ) {
     127         5410 :         let end_lsn = Lsn(last_record_lsn + 1);
     128              : 
     129         5410 :         if let Some(open_layer) = &self.layer_map.open_layer {
     130         5202 :             let open_layer_rc = Arc::clone(open_layer);
     131         5202 :             // Freeze the open layer at `end_lsn`.
     132         5202 :             open_layer.freeze(end_lsn).await;
     133              : 
     134              :             // The layer is no longer open, update the layer map to reflect this.
     135              :             // It is popped from `frozen_layers` (and any written delta layer inserted) in `finish_flush_l0_layer`.
     136         5202 :             self.layer_map.frozen_layers.push_back(open_layer_rc);
     137         5202 :             self.layer_map.open_layer = None;
     138         5202 :             self.layer_map.next_open_layer_at = Some(end_lsn);
     139         5202 :             last_freeze_at.store(end_lsn);
     140          208 :         }
     141         5410 :     }
     142              : 
     143              :     /// Add image layers to the layer map, called from `create_image_layers`.
     144         1699 :     pub(crate) fn track_new_image_layers(
     145         1699 :         &mut self,
     146         1699 :         image_layers: &[ResidentLayer],
     147         1699 :         metrics: &TimelineMetrics,
     148         1699 :     ) {
     149         1699 :         let mut updates = self.layer_map.batch_update();
     150         8174 :         for layer in image_layers {
     151         6475 :             Self::insert_historic_layer(layer.as_ref().clone(), &mut updates, &mut self.layer_fmgr);
     152         6475 : 
     153         6475 :             // record these here instead of Layer::finish_creating because otherwise partial
     154         6475 :             // failure with create_image_layers would balloon up the physical size gauge. downside
     155         6475 :             // is that all layers need to be created before metrics are updated.
     156         6475 :             metrics.record_new_file_metrics(layer.layer_desc().file_size);
     157         6475 :         }
     158         1699 :         updates.flush();
     159         1699 :     }
     160              : 
     161              :     /// Finish flushing a frozen layer: pop it from `frozen_layers` and add the written delta layer, if any, to the layer map.
     162         5177 :     pub(crate) fn finish_flush_l0_layer(
     163         5177 :         &mut self,
     164         5177 :         delta_layer: Option<&ResidentLayer>,
     165         5177 :         frozen_layer_for_check: &Arc<InMemoryLayer>,
     166         5177 :         metrics: &TimelineMetrics,
     167         5177 :     ) {
     168         5177 :         let inmem = self
     169         5177 :             .layer_map
     170         5177 :             .frozen_layers
     171         5177 :             .pop_front()
     172         5177 :             .expect("there must be a inmem layer to flush");
     173         5177 : 
     174         5177 :         // Only one task may call this function at a time (for this
     175         5177 :         // timeline). If two tasks tried to flush the same frozen
     176         5177 :         // layer to disk at the same time, that would not work.
     177         5177 :         assert_eq!(Arc::as_ptr(&inmem), Arc::as_ptr(frozen_layer_for_check));
     178              : 
     179         5177 :         if let Some(l) = delta_layer {
     180         5097 :             let mut updates = self.layer_map.batch_update();
     181         5097 :             Self::insert_historic_layer(l.as_ref().clone(), &mut updates, &mut self.layer_fmgr);
     182         5097 :             metrics.record_new_file_metrics(l.layer_desc().file_size);
     183         5097 :             updates.flush();
     184         5097 :         }
     185         5177 :     }
     186              : 
     187              :     /// Called when compaction is completed.
     188          295 :     pub(crate) fn finish_compact_l0(
     189          295 :         &mut self,
     190          295 :         compact_from: &[Layer],
     191          295 :         compact_to: &[ResidentLayer],
     192          295 :         metrics: &TimelineMetrics,
     193          295 :     ) {
     194          295 :         let mut updates = self.layer_map.batch_update();
     195        11021 :         for l in compact_to {
     196        10726 :             Self::insert_historic_layer(l.as_ref().clone(), &mut updates, &mut self.layer_fmgr);
     197        10726 :             metrics.record_new_file_metrics(l.layer_desc().file_size);
     198        10726 :         }
     199         4372 :         for l in compact_from {
     200         4077 :             Self::delete_historic_layer(l, &mut updates, &mut self.layer_fmgr);
     201         4077 :         }
     202          295 :         updates.flush();
     203          295 :     }
     204              : 
     205              :     /// Called when garbage collection has selected the layers to be removed.
     206           20 :     pub(crate) fn finish_gc_timeline(&mut self, gc_layers: &[Layer]) {
     207           20 :         let mut updates = self.layer_map.batch_update();
     208         1100 :         for doomed_layer in gc_layers {
     209         1080 :             Self::delete_historic_layer(doomed_layer, &mut updates, &mut self.layer_fmgr);
     210         1080 :         }
     211           20 :         updates.flush()
     212           20 :     }
     213              : 
     214              :     /// Helper function to insert a layer into the layer map and file manager.
     215        75342 :     fn insert_historic_layer(
     216        75342 :         layer: Layer,
     217        75342 :         updates: &mut BatchedUpdates<'_>,
     218        75342 :         mapping: &mut LayerFileManager<Layer>,
     219        75342 :     ) {
     220        75342 :         updates.insert_historic(layer.layer_desc().clone());
     221        75342 :         mapping.insert(layer);
     222        75342 :     }
     223              : 
     224              :     /// Removes the layer from local FS (if present) and from memory.
     225              :     /// Remote storage is not affected by this operation.
     226         5157 :     fn delete_historic_layer(
     227         5157 :         // we cannot remove layers otherwise, since gc and compaction will race
     228         5157 :         layer: &Layer,
     229         5157 :         updates: &mut BatchedUpdates<'_>,
     230         5157 :         mapping: &mut LayerFileManager<Layer>,
     231         5157 :     ) {
     232         5157 :         let desc = layer.layer_desc();
     233         5157 : 
     234         5157 :         // TODO Removing from the bottom of the layer map is expensive.
     235         5157 :         //      Maybe instead discard all layer map historic versions that
     236         5157 :         //      won't be needed for page reconstruction for this timeline,
     237         5157 :         //      and mark what we can't delete yet as deleted from the layer
     238         5157 :         //      map index without actually rebuilding the index.
     239         5157 :         updates.remove_historic(desc);
     240         5157 :         mapping.remove(layer);
     241         5157 :         layer.delete_on_drop();
     242         5157 :     }
     243              : 
     244           47 :     pub(crate) fn resident_layers(&self) -> impl futures::stream::Stream<Item = Layer> + '_ {
     245           47 :         // for small layer maps, most likely all layers are resident; for larger maps, more are likely
     246           47 :         // to have been evicted, assuming that a large number of layers correlates with a longer lifespan.
     247           47 : 
     248           47 :         let layers = self
     249           47 :             .layer_map()
     250           47 :             .iter_historic_layers()
     251         3040 :             .map(|desc| self.get_from_desc(&desc));
     252           47 : 
     253           47 :         let layers = futures::stream::iter(layers);
     254           47 : 
     255         3040 :         layers.filter_map(|layer| async move {
     256         3040 :             // TODO(#6028): this query does not really need to see the ResidentLayer
     257         3040 :             match layer.keep_resident().await {
     258         3039 :                 Ok(Some(layer)) => Some(layer.drop_eviction_guard()),
     259            1 :                 Ok(None) => None,
     260            0 :                 Err(e) => {
     261            0 :                     // these should not happen, but we cannot make them statically impossible right
     262            0 :                     // now.
     263            0 :                     tracing::warn!(%layer, "failed to keep the layer resident: {e:#}");
     264            0 :                     None
     265              :                 }
     266              :             }
     267         6080 :         })
     268           47 :     }
     269              : 
     270        10754 :     pub(crate) fn contains(&self, layer: &Layer) -> bool {
     271        10754 :         self.layer_fmgr.contains(layer)
     272        10754 :     }
     273              : }
     274              : 
     275              : pub(crate) struct LayerFileManager<T>(HashMap<PersistentLayerKey, T>);
     276              : 
     277              : impl<T> Default for LayerFileManager<T> {
     278         1592 :     fn default() -> Self {
     279         1592 :         Self(HashMap::default())
     280         1592 :     }
     281              : }
     282              : 
     283              : impl<T: AsLayerDesc + Clone> LayerFileManager<T> {
     284     16862219 :     fn get_from_desc(&self, desc: &PersistentLayerDesc) -> T {
     285     16862219 :         // The assumption for the `expect()` is that all code maintains the following invariant:
     286     16862219 :         // A layer's descriptor is present in the LayerMap => the LayerFileManager contains a layer for the descriptor.
     287     16862219 :         self.0
     288     16862219 :             .get(&desc.key())
     289     16862219 :             .with_context(|| format!("get layer from desc: {}", desc.filename()))
     290     16862219 :             .expect("not found")
     291     16862219 :             .clone()
     292     16862219 :     }
     293              : 
     294        75342 :     pub(crate) fn insert(&mut self, layer: T) {
     295        75342 :         let present = self.0.insert(layer.layer_desc().key(), layer.clone());
     296        75342 :         if present.is_some() && cfg!(debug_assertions) {
     297            0 :             panic!("overwriting a layer: {:?}", layer.layer_desc())
     298        75342 :         }
     299        75342 :     }
     300              : 
     301        10754 :     pub(crate) fn contains(&self, layer: &T) -> bool {
     302        10754 :         self.0.contains_key(&layer.layer_desc().key())
     303        10754 :     }
     304              : 
     305         5157 :     pub(crate) fn remove(&mut self, layer: &T) {
     306         5157 :         let present = self.0.remove(&layer.layer_desc().key());
     307         5157 :         if present.is_none() && cfg!(debug_assertions) {
     308            0 :             panic!(
     309            0 :                 "removing layer that is not present in layer mapping: {:?}",
     310            0 :                 layer.layer_desc()
     311            0 :             )
     312         5157 :         }
     313         5157 :     }
     314              : }
        

Generated by: LCOV version 2.1-beta