LCOV - code coverage report
Current view: top level - pageserver/src/tenant/timeline - layer_manager.rs (source / functions) Coverage Total Hit
Test: 36bb8dd7c7efcb53483d1a7d9f7cb33e8406dcf0.info Lines: 92.5 % 212 196
Test Date: 2024-04-08 10:22:05 Functions: 92.0 % 25 23

            Line data    Source code
       1              : use anyhow::{bail, ensure, Context, Result};
       2              : use pageserver_api::shard::TenantShardId;
       3              : use std::{collections::HashMap, sync::Arc};
       4              : use tracing::trace;
       5              : use utils::{
       6              :     id::TimelineId,
       7              :     lsn::{AtomicLsn, Lsn},
       8              : };
       9              : 
      10              : use crate::{
      11              :     config::PageServerConf,
      12              :     metrics::TimelineMetrics,
      13              :     tenant::{
      14              :         layer_map::{BatchedUpdates, LayerMap},
      15              :         storage_layer::{
      16              :             AsLayerDesc, InMemoryLayer, Layer, PersistentLayerDesc, PersistentLayerKey,
      17              :             ResidentLayer,
      18              :         },
      19              :     },
      20              : };
      21              : 
      22              : /// Provides semantic APIs to manipulate the layer map.
      23              : #[derive(Default)]
      24              : pub(crate) struct LayerManager {
      25              :     layer_map: LayerMap,
      26              :     layer_fmgr: LayerFileManager<Layer>,
      27              : }
      28              : 
      29              : impl LayerManager {
      30       126924 :     pub(crate) fn get_from_desc(&self, desc: &PersistentLayerDesc) -> Layer {
      31       126924 :         self.layer_fmgr.get_from_desc(desc)
      32       126924 :     }
      33              : 
      34              :     /// Get an immutable reference to the layer map.
      35              :     ///
      36              :     /// We expect users only to be able to get an immutable layer map. If users want to make modifications,
      37              :     /// they should use the semantic APIs below. This design brings us a step closer to immutable storage state.
      38       631061 :     pub(crate) fn layer_map(&self) -> &LayerMap {
      39       631061 :         &self.layer_map
      40       631061 :     }
      41              : 
      42              :     /// Called from `load_layer_map`. Initialize the layer manager with:
      43              :     /// 1. all on-disk layers
      44              :     /// 2. next open layer (with disk_consistent_lsn LSN)
      45            6 :     pub(crate) fn initialize_local_layers(
      46            6 :         &mut self,
      47            6 :         on_disk_layers: Vec<Layer>,
      48            6 :         next_open_layer_at: Lsn,
      49            6 :     ) {
      50            6 :         let mut updates = self.layer_map.batch_update();
      51           22 :         for layer in on_disk_layers {
      52           16 :             Self::insert_historic_layer(layer, &mut updates, &mut self.layer_fmgr);
      53           16 :         }
      54            6 :         updates.flush();
      55            6 :         self.layer_map.next_open_layer_at = Some(next_open_layer_at);
      56            6 :     }
      57              : 
      58              :     /// Initialize when creating a new timeline, called in `init_empty_layer_map`.
      59          308 :     pub(crate) fn initialize_empty(&mut self, next_open_layer_at: Lsn) {
      60          308 :         self.layer_map.next_open_layer_at = Some(next_open_layer_at);
      61          308 :     }
      62              : 
      63              :     /// Open a new writable layer to append data if there is no open layer, otherwise return the current open layer,
      64              :     /// called within `get_layer_for_write`.
      65      3648012 :     pub(crate) async fn get_layer_for_write(
      66      3648012 :         &mut self,
      67      3648012 :         lsn: Lsn,
      68      3648012 :         last_record_lsn: Lsn,
      69      3648012 :         conf: &'static PageServerConf,
      70      3648012 :         timeline_id: TimelineId,
      71      3648012 :         tenant_shard_id: TenantShardId,
      72      3648012 :     ) -> Result<Arc<InMemoryLayer>> {
      73      3648012 :         ensure!(lsn.is_aligned());
      74              : 
      75      3648012 :         ensure!(
      76      3648012 :             lsn > last_record_lsn,
      77            0 :             "cannot modify relation after advancing last_record_lsn (incoming_lsn={}, last_record_lsn={})",
      78              :             lsn,
      79              :             last_record_lsn,
      80              :         );
      81              : 
      82              :         // Do we have a layer open for writing already?
      83      3648012 :         let layer = if let Some(open_layer) = &self.layer_map.open_layer {
      84      3647208 :             if open_layer.get_lsn_range().start > lsn {
      85            0 :                 bail!(
      86            0 :                     "unexpected open layer in the future: open layers starts at {}, write lsn {}",
      87            0 :                     open_layer.get_lsn_range().start,
      88            0 :                     lsn
      89            0 :                 );
      90      3647208 :             }
      91      3647208 : 
      92      3647208 :             Arc::clone(open_layer)
      93              :         } else {
      94              :             // No writeable layer yet. Create one.
      95          804 :             let start_lsn = self
      96          804 :                 .layer_map
      97          804 :                 .next_open_layer_at
      98          804 :                 .context("No next open layer found")?;
      99              : 
     100          804 :             trace!(
     101            0 :                 "creating in-memory layer at {}/{} for record at {}",
     102            0 :                 timeline_id,
     103            0 :                 start_lsn,
     104            0 :                 lsn
     105            0 :             );
     106              : 
     107          804 :             let new_layer =
     108          804 :                 InMemoryLayer::create(conf, timeline_id, tenant_shard_id, start_lsn).await?;
     109          804 :             let layer = Arc::new(new_layer);
     110          804 : 
     111          804 :             self.layer_map.open_layer = Some(layer.clone());
     112          804 :             self.layer_map.next_open_layer_at = None;
     113          804 : 
     114          804 :             layer
     115              :         };
     116              : 
     117      3648012 :         Ok(layer)
     118      3648012 :     }
     119              : 
     120              :     /// Called from `freeze_inmem_layer`. Freezes the open layer, if any, and always advances `last_freeze_at`; returns nothing.
     121          690 :     pub(crate) async fn try_freeze_in_memory_layer(
     122          690 :         &mut self,
     123          690 :         lsn: Lsn,
     124          690 :         last_freeze_at: &AtomicLsn,
     125          690 :     ) {
     126          690 :         let Lsn(last_record_lsn) = lsn;
     127          690 :         let end_lsn = Lsn(last_record_lsn + 1);
     128              : 
     129          690 :         if let Some(open_layer) = &self.layer_map.open_layer {
     130          684 :             let open_layer_rc = Arc::clone(open_layer);
     131          684 :             // Does this layer need freezing?
     132          684 :             open_layer.freeze(end_lsn).await;
     133              : 
     134              :             // The layer is no longer open, update the layer map to reflect this.
     135              :             // We will replace it with on-disk historics below.
     136          684 :             self.layer_map.frozen_layers.push_back(open_layer_rc);
     137          684 :             self.layer_map.open_layer = None;
     138          684 :             self.layer_map.next_open_layer_at = Some(end_lsn);
     139            6 :         }
     140              : 
     141              :         // Even if there was no layer to freeze, advance last_freeze_at to last_record_lsn+1: this
     142              :         // accounts for regions in the LSN range where we might have ingested no data due to sharding.
     143          690 :         last_freeze_at.store(end_lsn);
     144          690 :     }
     145              : 
     146              :     /// Add image layers to the layer map, called from `create_image_layers`.
     147          600 :     pub(crate) fn track_new_image_layers(
     148          600 :         &mut self,
     149          600 :         image_layers: &[ResidentLayer],
     150          600 :         metrics: &TimelineMetrics,
     151          600 :     ) {
     152          600 :         let mut updates = self.layer_map.batch_update();
     153          704 :         for layer in image_layers {
     154          104 :             Self::insert_historic_layer(layer.as_ref().clone(), &mut updates, &mut self.layer_fmgr);
     155          104 : 
     156          104 :             // record these here instead of Layer::finish_creating because otherwise partial
     157          104 :             // failure with create_image_layers would balloon up the physical size gauge. downside
     158          104 :             // is that all layers need to be created before metrics are updated.
     159          104 :             metrics.record_new_file_metrics(layer.layer_desc().file_size);
     160          104 :         }
     161          600 :         updates.flush();
     162          600 :     }
     163              : 
     164              :     /// Flush a frozen layer and add the written delta layer to the layer map.
     165          684 :     pub(crate) fn finish_flush_l0_layer(
     166          684 :         &mut self,
     167          684 :         delta_layer: Option<&ResidentLayer>,
     168          684 :         frozen_layer_for_check: &Arc<InMemoryLayer>,
     169          684 :         metrics: &TimelineMetrics,
     170          684 :     ) {
     171          684 :         let inmem = self
     172          684 :             .layer_map
     173          684 :             .frozen_layers
     174          684 :             .pop_front()
     175          684 :             .expect("there must be a inmem layer to flush");
     176          684 : 
     177          684 :         // Only one task may call this function at a time (for this
     178          684 :         // timeline). If two tasks tried to flush the same frozen
     179          684 :         // layer to disk at the same time, that would not work.
     180          684 :         assert_eq!(Arc::as_ptr(&inmem), Arc::as_ptr(frozen_layer_for_check));
     181              : 
     182          684 :         if let Some(l) = delta_layer {
     183          594 :             let mut updates = self.layer_map.batch_update();
     184          594 :             Self::insert_historic_layer(l.as_ref().clone(), &mut updates, &mut self.layer_fmgr);
     185          594 :             metrics.record_new_file_metrics(l.layer_desc().file_size);
     186          594 :             updates.flush();
     187          594 :         }
     188          684 :     }
     189              : 
     190              :     /// Called when compaction is completed.
     191           42 :     pub(crate) fn finish_compact_l0(
     192           42 :         &mut self,
     193           42 :         compact_from: &[Layer],
     194           42 :         compact_to: &[ResidentLayer],
     195           42 :         metrics: &TimelineMetrics,
     196           42 :     ) {
     197           42 :         let mut updates = self.layer_map.batch_update();
     198          284 :         for l in compact_to {
     199          242 :             Self::insert_historic_layer(l.as_ref().clone(), &mut updates, &mut self.layer_fmgr);
     200          242 :             metrics.record_new_file_metrics(l.layer_desc().file_size);
     201          242 :         }
     202          484 :         for l in compact_from {
     203          442 :             Self::delete_historic_layer(l, &mut updates, &mut self.layer_fmgr);
     204          442 :         }
     205           42 :         updates.flush();
     206           42 :     }
     207              : 
     208              :     /// Called when garbage collect has selected the layers to be removed.
     209            4 :     pub(crate) fn finish_gc_timeline(&mut self, gc_layers: &[Layer]) {
     210            4 :         let mut updates = self.layer_map.batch_update();
     211            8 :         for doomed_layer in gc_layers {
     212            4 :             Self::delete_historic_layer(doomed_layer, &mut updates, &mut self.layer_fmgr);
     213            4 :         }
     214            4 :         updates.flush()
     215            4 :     }
     216              : 
     217              :     /// Helper function to insert a layer into the layer map and file manager.
     218          956 :     fn insert_historic_layer(
     219          956 :         layer: Layer,
     220          956 :         updates: &mut BatchedUpdates<'_>,
     221          956 :         mapping: &mut LayerFileManager<Layer>,
     222          956 :     ) {
     223          956 :         updates.insert_historic(layer.layer_desc().clone());
     224          956 :         mapping.insert(layer);
     225          956 :     }
     226              : 
     227              :     /// Removes the layer from local FS (if present) and from memory.
     228              :     /// Remote storage is not affected by this operation.
     229          446 :     fn delete_historic_layer(
     230          446 :         // we cannot remove layers otherwise, since gc and compaction will race
     231          446 :         layer: &Layer,
     232          446 :         updates: &mut BatchedUpdates<'_>,
     233          446 :         mapping: &mut LayerFileManager<Layer>,
     234          446 :     ) {
     235          446 :         let desc = layer.layer_desc();
     236          446 : 
     237          446 :         // TODO Removing from the bottom of the layer map is expensive.
     238          446 :         //      Maybe instead discard all layer map historic versions that
     239          446 :         //      won't be needed for page reconstruction for this timeline,
     240          446 :         //      and mark what we can't delete yet as deleted from the layer
     241          446 :         //      map index without actually rebuilding the index.
     242          446 :         updates.remove_historic(desc);
     243          446 :         mapping.remove(layer);
     244          446 :         layer.delete_on_drop();
     245          446 :     }
     246              : 
     247           14 :     pub(crate) fn likely_resident_layers(&self) -> impl Iterator<Item = Layer> + '_ {
     248           14 :         // for small layer maps, most layers are likely resident; for larger ones, more are likely
     249           14 :         // to be evicted, assuming that having lots of layers correlates with a longer lifespan.
     250           14 : 
     251           14 :         self.layer_map().iter_historic_layers().filter_map(|desc| {
     252           14 :             self.layer_fmgr
     253           14 :                 .0
     254           14 :                 .get(&desc.key())
     255           14 :                 .filter(|l| l.is_likely_resident())
     256           14 :                 .cloned()
     257           14 :         })
     258           14 :     }
     259              : 
     260          242 :     pub(crate) fn contains(&self, layer: &Layer) -> bool {
     261          242 :         self.layer_fmgr.contains(layer)
     262          242 :     }
     263              : }
     264              : 
     265              : pub(crate) struct LayerFileManager<T>(HashMap<PersistentLayerKey, T>);
     266              : 
     267              : impl<T> Default for LayerFileManager<T> {
     268          314 :     fn default() -> Self {
     269          314 :         Self(HashMap::default())
     270          314 :     }
     271              : }
     272              : 
     273              : impl<T: AsLayerDesc + Clone> LayerFileManager<T> {
     274       126924 :     fn get_from_desc(&self, desc: &PersistentLayerDesc) -> T {
     275       126924 :         // The assumption for the `expect()` is that all code maintains the following invariant:
     276       126924 :         // A layer's descriptor is present in the LayerMap => the LayerFileManager contains a layer for the descriptor.
     277       126924 :         self.0
     278       126924 :             .get(&desc.key())
     279       126924 :             .with_context(|| format!("get layer from desc: {}", desc.filename()))
     280       126924 :             .expect("not found")
     281       126924 :             .clone()
     282       126924 :     }
     283              : 
     284          956 :     pub(crate) fn insert(&mut self, layer: T) {
     285          956 :         let present = self.0.insert(layer.layer_desc().key(), layer.clone());
     286          956 :         if present.is_some() && cfg!(debug_assertions) {
     287            0 :             panic!("overwriting a layer: {:?}", layer.layer_desc())
     288          956 :         }
     289          956 :     }
     290              : 
     291          242 :     pub(crate) fn contains(&self, layer: &T) -> bool {
     292          242 :         self.0.contains_key(&layer.layer_desc().key())
     293          242 :     }
     294              : 
     295          446 :     pub(crate) fn remove(&mut self, layer: &T) {
     296          446 :         let present = self.0.remove(&layer.layer_desc().key());
     297          446 :         if present.is_none() && cfg!(debug_assertions) {
     298            0 :             panic!(
     299            0 :                 "removing layer that is not present in layer mapping: {:?}",
     300            0 :                 layer.layer_desc()
     301            0 :             )
     302          446 :         }
     303          446 :     }
     304              : }
        

Generated by: LCOV version 2.1-beta