LCOV - differential code coverage report
Current view: top level - pageserver/src/tenant/timeline - layer_manager.rs (source / functions) Coverage Total Hit UBC CBC
Current: cd44433dd675caa99df17a61b18949c8387e2242.info Lines: 91.7 % 204 187 17 187
Current Date: 2024-01-09 02:06:09 Functions: 91.3 % 23 21 2 21
Baseline: 66c52a629a0f4a503e193045e0df4c77139e344b.info
Baseline Date: 2024-01-08 15:34:46

           TLA  Line data    Source code
       1                 : use anyhow::{bail, ensure, Context, Result};
       2                 : use pageserver_api::shard::TenantShardId;
       3                 : use std::{collections::HashMap, sync::Arc};
       4                 : use tracing::trace;
       5                 : use utils::{
       6                 :     id::TimelineId,
       7                 :     lsn::{AtomicLsn, Lsn},
       8                 : };
       9                 : 
      10                 : use crate::{
      11                 :     config::PageServerConf,
      12                 :     metrics::TimelineMetrics,
      13                 :     tenant::{
      14                 :         layer_map::{BatchedUpdates, LayerMap},
      15                 :         storage_layer::{
      16                 :             AsLayerDesc, InMemoryLayer, Layer, PersistentLayerDesc, PersistentLayerKey,
      17                 :             ResidentLayer,
      18                 :         },
      19                 :     },
      20                 : };
      21                 : 
      22                 : /// Provides semantic APIs to manipulate the layer map.
      23                 : pub(crate) struct LayerManager {
      24                 :     layer_map: LayerMap,
      25                 :     layer_fmgr: LayerFileManager<Layer>,
      26                 : }
      27                 : 
      28                 : impl LayerManager {
      29 CBC        1290 :     pub(crate) fn create() -> Self {
      30            1290 :         Self {
      31            1290 :             layer_map: LayerMap::default(),
      32            1290 :             layer_fmgr: LayerFileManager::new(),
      33            1290 :         }
      34            1290 :     }
      35                 : 
      36        15472440 :     pub(crate) fn get_from_desc(&self, desc: &PersistentLayerDesc) -> Layer {
      37        15472440 :         self.layer_fmgr.get_from_desc(desc)
      38        15472440 :     }
      39                 : 
      40                 :     /// Get an immutable reference to the layer map.
      41                 :     ///
      42                 :     /// We expect users only to be able to get an immutable layer map. If users want to make modifications,
      43                 :     /// they should use the below semantic APIs. This design makes us step closer to immutable storage state.
      44        20885588 :     pub(crate) fn layer_map(&self) -> &LayerMap {
      45        20885588 :         &self.layer_map
      46        20885588 :     }
      47                 : 
      48                 :     /// Called from `load_layer_map`. Initialize the layer manager with:
      49                 :     /// 1. all on-disk layers
      50                 :     /// 2. next open layer (with disk disk_consistent_lsn LSN)
      51             357 :     pub(crate) fn initialize_local_layers(
      52             357 :         &mut self,
      53             357 :         on_disk_layers: Vec<Layer>,
      54             357 :         next_open_layer_at: Lsn,
      55             357 :     ) {
      56             357 :         let mut updates = self.layer_map.batch_update();
      57           58158 :         for layer in on_disk_layers {
      58           57801 :             Self::insert_historic_layer(layer, &mut updates, &mut self.layer_fmgr);
      59           57801 :         }
      60             357 :         updates.flush();
      61             357 :         self.layer_map.next_open_layer_at = Some(next_open_layer_at);
      62             357 :     }
      63                 : 
      64                 :     /// Initialize when creating a new timeline, called in `init_empty_layer_map`.
      65             921 :     pub(crate) fn initialize_empty(&mut self, next_open_layer_at: Lsn) {
      66             921 :         self.layer_map.next_open_layer_at = Some(next_open_layer_at);
      67             921 :     }
      68                 : 
      69                 :     /// Open a new writable layer to append data if there is no open layer, otherwise return the current open layer,
      70                 :     /// called within `get_layer_for_write`.
      71         1781989 :     pub(crate) async fn get_layer_for_write(
      72         1781989 :         &mut self,
      73         1781989 :         lsn: Lsn,
      74         1781989 :         last_record_lsn: Lsn,
      75         1781989 :         conf: &'static PageServerConf,
      76         1781989 :         timeline_id: TimelineId,
      77         1781989 :         tenant_shard_id: TenantShardId,
      78         1781989 :     ) -> Result<Arc<InMemoryLayer>> {
      79         1781989 :         ensure!(lsn.is_aligned());
      80                 : 
      81                 :         ensure!(
      82         1781989 :             lsn > last_record_lsn,
      83 UBC           0 :             "cannot modify relation after advancing last_record_lsn (incoming_lsn={}, last_record_lsn={})",
      84                 :             lsn,
      85                 :             last_record_lsn,
      86                 :         );
      87                 : 
      88                 :         // Do we have a layer open for writing already?
      89 CBC     1781989 :         let layer = if let Some(open_layer) = &self.layer_map.open_layer {
      90         1777166 :             if open_layer.get_lsn_range().start > lsn {
      91 UBC           0 :                 bail!(
      92               0 :                     "unexpected open layer in the future: open layers starts at {}, write lsn {}",
      93               0 :                     open_layer.get_lsn_range().start,
      94               0 :                     lsn
      95               0 :                 );
      96 CBC     1777166 :             }
      97         1777166 : 
      98         1777166 :             Arc::clone(open_layer)
      99                 :         } else {
     100                 :             // No writeable layer yet. Create one.
     101            4823 :             let start_lsn = self
     102            4823 :                 .layer_map
     103            4823 :                 .next_open_layer_at
     104            4823 :                 .context("No next open layer found")?;
     105                 : 
     106 UBC           0 :             trace!(
     107               0 :                 "creating in-memory layer at {}/{} for record at {}",
     108               0 :                 timeline_id,
     109               0 :                 start_lsn,
     110               0 :                 lsn
     111               0 :             );
     112                 : 
     113 CBC        4823 :             let new_layer =
     114            4823 :                 InMemoryLayer::create(conf, timeline_id, tenant_shard_id, start_lsn).await?;
     115            4823 :             let layer = Arc::new(new_layer);
     116            4823 : 
     117            4823 :             self.layer_map.open_layer = Some(layer.clone());
     118            4823 :             self.layer_map.next_open_layer_at = None;
     119            4823 : 
     120            4823 :             layer
     121                 :         };
     122                 : 
     123         1781989 :         Ok(layer)
     124         1781989 :     }
     125                 : 
     126                 :     /// Called from `freeze_inmem_layer`, returns true if successfully frozen.
     127            4663 :     pub(crate) async fn try_freeze_in_memory_layer(
     128            4663 :         &mut self,
     129            4663 :         Lsn(last_record_lsn): Lsn,
     130            4663 :         last_freeze_at: &AtomicLsn,
     131            4663 :     ) {
     132            4663 :         let end_lsn = Lsn(last_record_lsn + 1);
     133                 : 
     134            4663 :         if let Some(open_layer) = &self.layer_map.open_layer {
     135            4467 :             let open_layer_rc = Arc::clone(open_layer);
     136            4467 :             // Does this layer need freezing?
     137            4467 :             open_layer.freeze(end_lsn).await;
     138                 : 
     139                 :             // The layer is no longer open, update the layer map to reflect this.
     140                 :             // We will replace it with on-disk historics below.
     141            4467 :             self.layer_map.frozen_layers.push_back(open_layer_rc);
     142            4467 :             self.layer_map.open_layer = None;
     143            4467 :             self.layer_map.next_open_layer_at = Some(end_lsn);
     144            4467 :             last_freeze_at.store(end_lsn);
     145             196 :         }
     146            4663 :     }
     147                 : 
     148                 :     /// Add image layers to the layer map, called from `create_image_layers`.
     149            1408 :     pub(crate) fn track_new_image_layers(
     150            1408 :         &mut self,
     151            1408 :         image_layers: &[ResidentLayer],
     152            1408 :         metrics: &TimelineMetrics,
     153            1408 :     ) {
     154            1408 :         let mut updates = self.layer_map.batch_update();
     155            7046 :         for layer in image_layers {
     156            5638 :             Self::insert_historic_layer(layer.as_ref().clone(), &mut updates, &mut self.layer_fmgr);
     157            5638 : 
     158            5638 :             // record these here instead of Layer::finish_creating because otherwise partial
     159            5638 :             // failure with create_image_layers would balloon up the physical size gauge. downside
     160            5638 :             // is that all layers need to be created before metrics are updated.
     161            5638 :             metrics.record_new_file_metrics(layer.layer_desc().file_size);
     162            5638 :         }
     163            1408 :         updates.flush();
     164            1408 :     }
     165                 : 
     166                 :     /// Flush a frozen layer and add the written delta layer to the layer map.
     167            4440 :     pub(crate) fn finish_flush_l0_layer(
     168            4440 :         &mut self,
     169            4440 :         delta_layer: Option<&ResidentLayer>,
     170            4440 :         frozen_layer_for_check: &Arc<InMemoryLayer>,
     171            4440 :         metrics: &TimelineMetrics,
     172            4440 :     ) {
     173            4440 :         let inmem = self
     174            4440 :             .layer_map
     175            4440 :             .frozen_layers
     176            4440 :             .pop_front()
     177            4440 :             .expect("there must be a inmem layer to flush");
     178            4440 : 
     179            4440 :         // Only one task may call this function at a time (for this
     180            4440 :         // timeline). If two tasks tried to flush the same frozen
     181            4440 :         // layer to disk at the same time, that would not work.
     182            4440 :         assert_eq!(Arc::as_ptr(&inmem), Arc::as_ptr(frozen_layer_for_check));
     183                 : 
     184            4440 :         if let Some(l) = delta_layer {
     185            4396 :             let mut updates = self.layer_map.batch_update();
     186            4396 :             Self::insert_historic_layer(l.as_ref().clone(), &mut updates, &mut self.layer_fmgr);
     187            4396 :             metrics.record_new_file_metrics(l.layer_desc().file_size);
     188            4396 :             updates.flush();
     189            4396 :         }
     190            4440 :     }
     191                 : 
     192                 :     /// Called when compaction is completed.
     193             282 :     pub(crate) fn finish_compact_l0(
     194             282 :         &mut self,
     195             282 :         compact_from: &[Layer],
     196             282 :         compact_to: &[ResidentLayer],
     197             282 :         metrics: &TimelineMetrics,
     198             282 :     ) {
     199             282 :         let mut updates = self.layer_map.batch_update();
     200           10667 :         for l in compact_to {
     201           10385 :             Self::insert_historic_layer(l.as_ref().clone(), &mut updates, &mut self.layer_fmgr);
     202           10385 :             metrics.record_new_file_metrics(l.layer_desc().file_size);
     203           10385 :         }
     204            3885 :         for l in compact_from {
     205            3603 :             Self::delete_historic_layer(l, &mut updates, &mut self.layer_fmgr);
     206            3603 :         }
     207             282 :         updates.flush();
     208             282 :     }
     209                 : 
     210                 :     /// Called when garbage collect has selected the layers to be removed.
     211              24 :     pub(crate) fn finish_gc_timeline(&mut self, gc_layers: &[Layer]) {
     212              24 :         let mut updates = self.layer_map.batch_update();
     213            1426 :         for doomed_layer in gc_layers {
     214            1402 :             Self::delete_historic_layer(doomed_layer, &mut updates, &mut self.layer_fmgr);
     215            1402 :         }
     216              24 :         updates.flush()
     217              24 :     }
     218                 : 
     219                 :     /// Helper function to insert a layer into the layer map and file manager.
     220           78220 :     fn insert_historic_layer(
     221           78220 :         layer: Layer,
     222           78220 :         updates: &mut BatchedUpdates<'_>,
     223           78220 :         mapping: &mut LayerFileManager<Layer>,
     224           78220 :     ) {
     225           78220 :         updates.insert_historic(layer.layer_desc().clone());
     226           78220 :         mapping.insert(layer);
     227           78220 :     }
     228                 : 
     229                 :     /// Removes the layer from local FS (if present) and from memory.
     230                 :     /// Remote storage is not affected by this operation.
     231            5005 :     fn delete_historic_layer(
     232            5005 :         // we cannot remove layers otherwise, since gc and compaction will race
     233            5005 :         layer: &Layer,
     234            5005 :         updates: &mut BatchedUpdates<'_>,
     235            5005 :         mapping: &mut LayerFileManager<Layer>,
     236            5005 :     ) {
     237            5005 :         let desc = layer.layer_desc();
     238            5005 : 
     239            5005 :         // TODO Removing from the bottom of the layer map is expensive.
     240            5005 :         //      Maybe instead discard all layer map historic versions that
     241            5005 :         //      won't be needed for page reconstruction for this timeline,
     242            5005 :         //      and mark what we can't delete yet as deleted from the layer
     243            5005 :         //      map index without actually rebuilding the index.
     244            5005 :         updates.remove_historic(desc);
     245            5005 :         mapping.remove(layer);
     246            5005 :         layer.delete_on_drop();
     247            5005 :     }
     248                 : 
     249           10425 :     pub(crate) fn contains(&self, layer: &Layer) -> bool {
     250           10425 :         self.layer_fmgr.contains(layer)
     251           10425 :     }
     252                 : }
     253                 : 
     254                 : pub(crate) struct LayerFileManager<T>(HashMap<PersistentLayerKey, T>);
     255                 : 
     256                 : impl<T: AsLayerDesc + Clone> LayerFileManager<T> {
     257        15472440 :     fn get_from_desc(&self, desc: &PersistentLayerDesc) -> T {
     258        15472440 :         // The assumption for the `expect()` is that all code maintains the following invariant:
     259        15472440 :         // A layer's descriptor is present in the LayerMap => the LayerFileManager contains a layer for the descriptor.
     260        15472440 :         self.0
     261        15472440 :             .get(&desc.key())
     262        15472440 :             .with_context(|| format!("get layer from desc: {}", desc.filename()))
     263        15472440 :             .expect("not found")
     264        15472440 :             .clone()
     265        15472440 :     }
     266                 : 
     267           78220 :     pub(crate) fn insert(&mut self, layer: T) {
     268           78220 :         let present = self.0.insert(layer.layer_desc().key(), layer.clone());
     269           78220 :         if present.is_some() && cfg!(debug_assertions) {
     270 UBC           0 :             panic!("overwriting a layer: {:?}", layer.layer_desc())
     271 CBC       78220 :         }
     272           78220 :     }
     273                 : 
     274           10425 :     pub(crate) fn contains(&self, layer: &T) -> bool {
     275           10425 :         self.0.contains_key(&layer.layer_desc().key())
     276           10425 :     }
     277                 : 
     278            1290 :     pub(crate) fn new() -> Self {
     279            1290 :         Self(HashMap::new())
     280            1290 :     }
     281                 : 
     282            5005 :     pub(crate) fn remove(&mut self, layer: &T) {
     283            5005 :         let present = self.0.remove(&layer.layer_desc().key());
     284            5005 :         if present.is_none() && cfg!(debug_assertions) {
     285 UBC           0 :             panic!(
     286               0 :                 "removing layer that is not present in layer mapping: {:?}",
     287               0 :                 layer.layer_desc()
     288               0 :             )
     289 CBC        5005 :         }
     290            5005 :     }
     291                 : }
        

Generated by: LCOV version 2.1-beta