LCOV - code coverage report
Current view: top level - pageserver/src/tenant/timeline - layer_manager.rs (source / functions) Coverage Total Hit
Test: 32f4a56327bc9da697706839ed4836b2a00a408f.info Lines: 91.7 % 204 187
Test Date: 2024-02-07 07:37:29 Functions: 91.3 % 23 21

            Line data    Source code
       1              : use anyhow::{bail, ensure, Context, Result};
       2              : use pageserver_api::shard::TenantShardId;
       3              : use std::{collections::HashMap, sync::Arc};
       4              : use tracing::trace;
       5              : use utils::{
       6              :     id::TimelineId,
       7              :     lsn::{AtomicLsn, Lsn},
       8              : };
       9              : 
      10              : use crate::{
      11              :     config::PageServerConf,
      12              :     metrics::TimelineMetrics,
      13              :     tenant::{
      14              :         layer_map::{BatchedUpdates, LayerMap},
      15              :         storage_layer::{
      16              :             AsLayerDesc, InMemoryLayer, Layer, PersistentLayerDesc, PersistentLayerKey,
      17              :             ResidentLayer,
      18              :         },
      19              :     },
      20              : };
      21              : 
      22              : /// Provides semantic APIs to manipulate the layer map.
      23              : pub(crate) struct LayerManager {
      24              :     layer_map: LayerMap,
      25              :     layer_fmgr: LayerFileManager<Layer>,
      26              : }
      27              : 
      28              : impl LayerManager {
      29         1568 :     pub(crate) fn create() -> Self {
      30         1568 :         Self {
      31         1568 :             layer_map: LayerMap::default(),
      32         1568 :             layer_fmgr: LayerFileManager::new(),
      33         1568 :         }
      34         1568 :     }
      35              : 
      36     23952731 :     pub(crate) fn get_from_desc(&self, desc: &PersistentLayerDesc) -> Layer {
      37     23952731 :         self.layer_fmgr.get_from_desc(desc)
      38     23952731 :     }
      39              : 
      40              :     /// Get an immutable reference to the layer map.
      41              :     ///
      42              :     /// We expect users only to be able to get an immutable layer map. If users want to make modifications,
      43              :     /// they should use the below semantic APIs. This design makes us step closer to immutable storage state.
      44     32493414 :     pub(crate) fn layer_map(&self) -> &LayerMap {
      45     32493414 :         &self.layer_map
      46     32493414 :     }
      47              : 
      48              :     /// Called from `load_layer_map`. Initialize the layer manager with:
      49              :     /// 1. all on-disk layers
      50              :     /// 2. next open layer (with disk disk_consistent_lsn LSN)
      51          412 :     pub(crate) fn initialize_local_layers(
      52          412 :         &mut self,
      53          412 :         on_disk_layers: Vec<Layer>,
      54          412 :         next_open_layer_at: Lsn,
      55          412 :     ) {
      56          412 :         let mut updates = self.layer_map.batch_update();
      57        57240 :         for layer in on_disk_layers {
      58        56828 :             Self::insert_historic_layer(layer, &mut updates, &mut self.layer_fmgr);
      59        56828 :         }
      60          412 :         updates.flush();
      61          412 :         self.layer_map.next_open_layer_at = Some(next_open_layer_at);
      62          412 :     }
      63              : 
      64              :     /// Initialize when creating a new timeline, called in `init_empty_layer_map`.
      65         1144 :     pub(crate) fn initialize_empty(&mut self, next_open_layer_at: Lsn) {
      66         1144 :         self.layer_map.next_open_layer_at = Some(next_open_layer_at);
      67         1144 :     }
      68              : 
      69              :     /// Open a new writable layer to append data if there is no open layer, otherwise return the current open layer,
      70              :     /// called within `get_layer_for_write`.
      71      2951969 :     pub(crate) async fn get_layer_for_write(
      72      2951969 :         &mut self,
      73      2951969 :         lsn: Lsn,
      74      2951969 :         last_record_lsn: Lsn,
      75      2951969 :         conf: &'static PageServerConf,
      76      2951969 :         timeline_id: TimelineId,
      77      2951969 :         tenant_shard_id: TenantShardId,
      78      2951969 :     ) -> Result<Arc<InMemoryLayer>> {
      79      2951969 :         ensure!(lsn.is_aligned());
      80              : 
      81              :         ensure!(
      82      2951969 :             lsn > last_record_lsn,
      83            0 :             "cannot modify relation after advancing last_record_lsn (incoming_lsn={}, last_record_lsn={})",
      84              :             lsn,
      85              :             last_record_lsn,
      86              :         );
      87              : 
      88              :         // Do we have a layer open for writing already?
      89      2951969 :         let layer = if let Some(open_layer) = &self.layer_map.open_layer {
      90      2946256 :             if open_layer.get_lsn_range().start > lsn {
      91            0 :                 bail!(
      92            0 :                     "unexpected open layer in the future: open layers starts at {}, write lsn {}",
      93            0 :                     open_layer.get_lsn_range().start,
      94            0 :                     lsn
      95            0 :                 );
      96      2946256 :             }
      97      2946256 : 
      98      2946256 :             Arc::clone(open_layer)
      99              :         } else {
     100              :             // No writeable layer yet. Create one.
     101         5713 :             let start_lsn = self
     102         5713 :                 .layer_map
     103         5713 :                 .next_open_layer_at
     104         5713 :                 .context("No next open layer found")?;
     105              : 
     106            0 :             trace!(
     107            0 :                 "creating in-memory layer at {}/{} for record at {}",
     108            0 :                 timeline_id,
     109            0 :                 start_lsn,
     110            0 :                 lsn
     111            0 :             );
     112              : 
     113         5713 :             let new_layer =
     114         5713 :                 InMemoryLayer::create(conf, timeline_id, tenant_shard_id, start_lsn).await?;
     115         5713 :             let layer = Arc::new(new_layer);
     116         5713 : 
     117         5713 :             self.layer_map.open_layer = Some(layer.clone());
     118         5713 :             self.layer_map.next_open_layer_at = None;
     119         5713 : 
     120         5713 :             layer
     121              :         };
     122              : 
     123      2951969 :         Ok(layer)
     124      2951969 :     }
     125              : 
     126              :     /// Called from `freeze_inmem_layer`, returns true if successfully frozen.
     127         5489 :     pub(crate) async fn try_freeze_in_memory_layer(
     128         5489 :         &mut self,
     129         5489 :         Lsn(last_record_lsn): Lsn,
     130         5489 :         last_freeze_at: &AtomicLsn,
     131         5489 :     ) {
     132         5489 :         let end_lsn = Lsn(last_record_lsn + 1);
     133              : 
     134         5489 :         if let Some(open_layer) = &self.layer_map.open_layer {
     135         5288 :             let open_layer_rc = Arc::clone(open_layer);
     136         5288 :             // Does this layer need freezing?
     137         5288 :             open_layer.freeze(end_lsn).await;
     138              : 
     139              :             // The layer is no longer open, update the layer map to reflect this.
     140              :             // We will replace it with on-disk historics below.
     141         5288 :             self.layer_map.frozen_layers.push_back(open_layer_rc);
     142         5288 :             self.layer_map.open_layer = None;
     143         5288 :             self.layer_map.next_open_layer_at = Some(end_lsn);
     144         5288 :             last_freeze_at.store(end_lsn);
     145          201 :         }
     146         5489 :     }
     147              : 
     148              :     /// Add image layers to the layer map, called from `create_image_layers`.
     149         1597 :     pub(crate) fn track_new_image_layers(
     150         1597 :         &mut self,
     151         1597 :         image_layers: &[ResidentLayer],
     152         1597 :         metrics: &TimelineMetrics,
     153         1597 :     ) {
     154         1597 :         let mut updates = self.layer_map.batch_update();
     155         7626 :         for layer in image_layers {
     156         6029 :             Self::insert_historic_layer(layer.as_ref().clone(), &mut updates, &mut self.layer_fmgr);
     157         6029 : 
     158         6029 :             // record these here instead of Layer::finish_creating because otherwise partial
     159         6029 :             // failure with create_image_layers would balloon up the physical size gauge. downside
     160         6029 :             // is that all layers need to be created before metrics are updated.
     161         6029 :             metrics.record_new_file_metrics(layer.layer_desc().file_size);
     162         6029 :         }
     163         1597 :         updates.flush();
     164         1597 :     }
     165              : 
     166              :     /// Flush a frozen layer and add the written delta layer to the layer map.
     167         5263 :     pub(crate) fn finish_flush_l0_layer(
     168         5263 :         &mut self,
     169         5263 :         delta_layer: Option<&ResidentLayer>,
     170         5263 :         frozen_layer_for_check: &Arc<InMemoryLayer>,
     171         5263 :         metrics: &TimelineMetrics,
     172         5263 :     ) {
     173         5263 :         let inmem = self
     174         5263 :             .layer_map
     175         5263 :             .frozen_layers
     176         5263 :             .pop_front()
     177         5263 :             .expect("there must be a inmem layer to flush");
     178         5263 : 
     179         5263 :         // Only one task may call this function at a time (for this
     180         5263 :         // timeline). If two tasks tried to flush the same frozen
     181         5263 :         // layer to disk at the same time, that would not work.
     182         5263 :         assert_eq!(Arc::as_ptr(&inmem), Arc::as_ptr(frozen_layer_for_check));
     183              : 
     184         5263 :         if let Some(l) = delta_layer {
     185         5183 :             let mut updates = self.layer_map.batch_update();
     186         5183 :             Self::insert_historic_layer(l.as_ref().clone(), &mut updates, &mut self.layer_fmgr);
     187         5183 :             metrics.record_new_file_metrics(l.layer_desc().file_size);
     188         5183 :             updates.flush();
     189         5183 :         }
     190         5263 :     }
     191              : 
     192              :     /// Called when compaction is completed.
     193          295 :     pub(crate) fn finish_compact_l0(
     194          295 :         &mut self,
     195          295 :         compact_from: &[Layer],
     196          295 :         compact_to: &[ResidentLayer],
     197          295 :         metrics: &TimelineMetrics,
     198          295 :     ) {
     199          295 :         let mut updates = self.layer_map.batch_update();
     200        11025 :         for l in compact_to {
     201        10730 :             Self::insert_historic_layer(l.as_ref().clone(), &mut updates, &mut self.layer_fmgr);
     202        10730 :             metrics.record_new_file_metrics(l.layer_desc().file_size);
     203        10730 :         }
     204         4508 :         for l in compact_from {
     205         4213 :             Self::delete_historic_layer(l, &mut updates, &mut self.layer_fmgr);
     206         4213 :         }
     207          295 :         updates.flush();
     208          295 :     }
     209              : 
     210              :     /// Called when garbage collect has selected the layers to be removed.
     211           20 :     pub(crate) fn finish_gc_timeline(&mut self, gc_layers: &[Layer]) {
     212           20 :         let mut updates = self.layer_map.batch_update();
     213         1217 :         for doomed_layer in gc_layers {
     214         1197 :             Self::delete_historic_layer(doomed_layer, &mut updates, &mut self.layer_fmgr);
     215         1197 :         }
     216           20 :         updates.flush()
     217           20 :     }
     218              : 
     219              :     /// Helper function to insert a layer into the layer map and file manager.
     220        78770 :     fn insert_historic_layer(
     221        78770 :         layer: Layer,
     222        78770 :         updates: &mut BatchedUpdates<'_>,
     223        78770 :         mapping: &mut LayerFileManager<Layer>,
     224        78770 :     ) {
     225        78770 :         updates.insert_historic(layer.layer_desc().clone());
     226        78770 :         mapping.insert(layer);
     227        78770 :     }
     228              : 
     229              :     /// Removes the layer from local FS (if present) and from memory.
     230              :     /// Remote storage is not affected by this operation.
     231         5410 :     fn delete_historic_layer(
     232         5410 :         // we cannot remove layers otherwise, since gc and compaction will race
     233         5410 :         layer: &Layer,
     234         5410 :         updates: &mut BatchedUpdates<'_>,
     235         5410 :         mapping: &mut LayerFileManager<Layer>,
     236         5410 :     ) {
     237         5410 :         let desc = layer.layer_desc();
     238         5410 : 
     239         5410 :         // TODO Removing from the bottom of the layer map is expensive.
     240         5410 :         //      Maybe instead discard all layer map historic versions that
     241         5410 :         //      won't be needed for page reconstruction for this timeline,
     242         5410 :         //      and mark what we can't delete yet as deleted from the layer
     243         5410 :         //      map index without actually rebuilding the index.
     244         5410 :         updates.remove_historic(desc);
     245         5410 :         mapping.remove(layer);
     246         5410 :         layer.delete_on_drop();
     247         5410 :     }
     248              : 
     249        10770 :     pub(crate) fn contains(&self, layer: &Layer) -> bool {
     250        10770 :         self.layer_fmgr.contains(layer)
     251        10770 :     }
     252              : }
     253              : 
     254              : pub(crate) struct LayerFileManager<T>(HashMap<PersistentLayerKey, T>);
     255              : 
     256              : impl<T: AsLayerDesc + Clone> LayerFileManager<T> {
     257     23952738 :     fn get_from_desc(&self, desc: &PersistentLayerDesc) -> T {
     258     23952738 :         // The assumption for the `expect()` is that all code maintains the following invariant:
     259     23952738 :         // A layer's descriptor is present in the LayerMap => the LayerFileManager contains a layer for the descriptor.
     260     23952738 :         self.0
     261     23952738 :             .get(&desc.key())
     262     23952738 :             .with_context(|| format!("get layer from desc: {}", desc.filename()))
     263     23952738 :             .expect("not found")
     264     23952738 :             .clone()
     265     23952738 :     }
     266              : 
     267        78770 :     pub(crate) fn insert(&mut self, layer: T) {
     268        78770 :         let present = self.0.insert(layer.layer_desc().key(), layer.clone());
     269        78770 :         if present.is_some() && cfg!(debug_assertions) {
     270            0 :             panic!("overwriting a layer: {:?}", layer.layer_desc())
     271        78770 :         }
     272        78770 :     }
     273              : 
     274        10770 :     pub(crate) fn contains(&self, layer: &T) -> bool {
     275        10770 :         self.0.contains_key(&layer.layer_desc().key())
     276        10770 :     }
     277              : 
     278         1568 :     pub(crate) fn new() -> Self {
     279         1568 :         Self(HashMap::new())
     280         1568 :     }
     281              : 
     282         5410 :     pub(crate) fn remove(&mut self, layer: &T) {
     283         5410 :         let present = self.0.remove(&layer.layer_desc().key());
     284         5410 :         if present.is_none() && cfg!(debug_assertions) {
     285            0 :             panic!(
     286            0 :                 "removing layer that is not present in layer mapping: {:?}",
     287            0 :                 layer.layer_desc()
     288            0 :             )
     289         5410 :         }
     290         5410 :     }
     291              : }
        

Generated by: LCOV version 2.1-beta