LCOV - code coverage report
Current view: top level - pageserver/src/tenant/timeline - layer_manager.rs (source / functions)
Test: 496e96cdfff2df79370229591d6427cda12fde29.info
Test Date: 2024-05-21 18:28:29
Coverage: Lines: 87.9 % (197 of 224 hit)   Functions: 92.0 % (23 of 25 hit)

            Line data    Source code
       1              : use anyhow::{bail, ensure, Context, Result};
       2              : use pageserver_api::shard::TenantShardId;
       3              : use std::{collections::HashMap, sync::Arc};
       4              : use tracing::trace;
       5              : use utils::{
       6              :     id::TimelineId,
       7              :     lsn::{AtomicLsn, Lsn},
       8              : };
       9              : 
      10              : use crate::{
      11              :     config::PageServerConf,
      12              :     context::RequestContext,
      13              :     metrics::TimelineMetrics,
      14              :     tenant::{
      15              :         layer_map::{BatchedUpdates, LayerMap},
      16              :         storage_layer::{
      17              :             AsLayerDesc, InMemoryLayer, Layer, PersistentLayerDesc, PersistentLayerKey,
      18              :             ResidentLayer,
      19              :         },
      20              :     },
      21              : };
      22              : 
      23              : /// Provides semantic APIs to manipulate the layer map.
      24              : #[derive(Default)]
      25              : pub(crate) struct LayerManager {
      26              :     layer_map: LayerMap,
      27              :     layer_fmgr: LayerFileManager<Layer>,
      28              : }
      29              : 
      30              : impl LayerManager {
      31       276061 :     pub(crate) fn get_from_desc(&self, desc: &PersistentLayerDesc) -> Layer {
      32       276061 :         self.layer_fmgr.get_from_desc(desc)
      33       276061 :     }
      34              : 
      35              :     /// Get an immutable reference to the layer map.
      36              :     ///
       37              :     /// We expect users to only be able to obtain an immutable reference to the layer map. If users want to make
       38              :     /// modifications, they should use the semantic APIs below. This design moves us a step closer to an immutable storage state.
      39       820285 :     pub(crate) fn layer_map(&self) -> &LayerMap {
      40       820285 :         &self.layer_map
      41       820285 :     }
      42              : 
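To make the access pattern described in the comment above concrete, here is a minimal sketch using hypothetical, simplified types (not the pageserver's real ones): callers can only borrow the map read-only, and every mutation goes through a named, semantic method on the owning manager.

    use std::collections::BTreeMap;

    struct Manager {
        map: BTreeMap<u64, String>,
    }

    impl Manager {
        /// Read-only view of the map; callers cannot mutate it directly.
        fn map(&self) -> &BTreeMap<u64, String> {
            &self.map
        }

        /// Semantic API: the only way to add an entry to the map.
        fn track_new_entry(&mut self, key: u64, value: String) {
            self.map.insert(key, value);
        }
    }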
      43              :     /// Called from `load_layer_map`. Initialize the layer manager with:
      44              :     /// 1. all on-disk layers
       45              :     /// 2. the next open layer (at the disk_consistent_lsn LSN)
      46            6 :     pub(crate) fn initialize_local_layers(
      47            6 :         &mut self,
      48            6 :         on_disk_layers: Vec<Layer>,
      49            6 :         next_open_layer_at: Lsn,
      50            6 :     ) {
      51            6 :         let mut updates = self.layer_map.batch_update();
      52           22 :         for layer in on_disk_layers {
      53           16 :             Self::insert_historic_layer(layer, &mut updates, &mut self.layer_fmgr);
      54           16 :         }
      55            6 :         updates.flush();
      56            6 :         self.layer_map.next_open_layer_at = Some(next_open_layer_at);
      57            6 :     }
      58              : 
      59              :     /// Initialize when creating a new timeline, called in `init_empty_layer_map`.
      60          346 :     pub(crate) fn initialize_empty(&mut self, next_open_layer_at: Lsn) {
      61          346 :         self.layer_map.next_open_layer_at = Some(next_open_layer_at);
      62          346 :     }
      63              : 
       64              :     /// Open a new writable layer to append data if there is no open layer, otherwise return the current open layer.
       65              :     /// Called within `get_layer_for_write`.
      66      4804100 :     pub(crate) async fn get_layer_for_write(
      67      4804100 :         &mut self,
      68      4804100 :         lsn: Lsn,
      69      4804100 :         last_record_lsn: Lsn,
      70      4804100 :         conf: &'static PageServerConf,
      71      4804100 :         timeline_id: TimelineId,
      72      4804100 :         tenant_shard_id: TenantShardId,
      73      4804100 :         ctx: &RequestContext,
      74      4804100 :     ) -> Result<Arc<InMemoryLayer>> {
      75      4804100 :         ensure!(lsn.is_aligned());
      76              : 
      77      4804100 :         ensure!(
      78      4804100 :             lsn > last_record_lsn,
      79            0 :             "cannot modify relation after advancing last_record_lsn (incoming_lsn={}, last_record_lsn={})",
      80              :             lsn,
      81              :             last_record_lsn,
      82              :         );
      83              : 
      84              :         // Do we have a layer open for writing already?
      85      4804100 :         let layer = if let Some(open_layer) = &self.layer_map.open_layer {
      86      4802964 :             if open_layer.get_lsn_range().start > lsn {
      87            0 :                 bail!(
      88            0 :                     "unexpected open layer in the future: open layers starts at {}, write lsn {}",
       89            0 :                     "unexpected open layer in the future: open layer starts at {}, write lsn {}",
      90            0 :                     lsn
      91            0 :                 );
      92      4802964 :             }
      93      4802964 : 
      94      4802964 :             Arc::clone(open_layer)
      95              :         } else {
      96              :             // No writeable layer yet. Create one.
      97         1136 :             let start_lsn = self
      98         1136 :                 .layer_map
      99         1136 :                 .next_open_layer_at
     100         1136 :                 .context("No next open layer found")?;
     101              : 
     102         1136 :             trace!(
     103            0 :                 "creating in-memory layer at {}/{} for record at {}",
     104              :                 timeline_id,
     105              :                 start_lsn,
     106              :                 lsn
     107              :             );
     108              : 
     109         1136 :             let new_layer =
     110         1136 :                 InMemoryLayer::create(conf, timeline_id, tenant_shard_id, start_lsn, ctx).await?;
     111         1136 :             let layer = Arc::new(new_layer);
     112         1136 : 
     113         1136 :             self.layer_map.open_layer = Some(layer.clone());
     114         1136 :             self.layer_map.next_open_layer_at = None;
     115         1136 : 
     116         1136 :             layer
     117              :         };
     118              : 
     119      4804100 :         Ok(layer)
     120      4804100 :     }
     121              : 
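A simplified sketch of the get-or-create logic above, using hypothetical types rather than the pageserver's own (it assumes the anyhow crate, which this file already uses): reuse the open layer when one exists and its start is not beyond the write LSN, otherwise create a new layer at the recorded next_open_layer_at position and install it as the open layer.

    use std::sync::Arc;
    use anyhow::{ensure, Context, Result};

    struct OpenLayer { start_lsn: u64 }

    struct Layers {
        open_layer: Option<Arc<OpenLayer>>,
        next_open_layer_at: Option<u64>,
    }

    impl Layers {
        fn layer_for_write(&mut self, lsn: u64) -> Result<Arc<OpenLayer>> {
            if let Some(open) = &self.open_layer {
                // An open layer must not start in the future relative to this write.
                ensure!(
                    open.start_lsn <= lsn,
                    "open layer starts at {} but write lsn is {}",
                    open.start_lsn,
                    lsn
                );
                return Ok(Arc::clone(open));
            }
            // No writeable layer yet: create one at the recorded start position.
            let start_lsn = self.next_open_layer_at.context("no next open layer position")?;
            let layer = Arc::new(OpenLayer { start_lsn });
            self.open_layer = Some(Arc::clone(&layer));
            self.next_open_layer_at = None;
            Ok(layer)
        }
    }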
      122              :     /// Called from `freeze_inmem_layer`. Freezes the current open in-memory layer, if any, and advances `last_freeze_at`.
     123         1038 :     pub(crate) async fn try_freeze_in_memory_layer(
     124         1038 :         &mut self,
     125         1038 :         lsn: Lsn,
     126         1038 :         last_freeze_at: &AtomicLsn,
     127         1038 :     ) {
     128         1038 :         let Lsn(last_record_lsn) = lsn;
     129         1038 :         let end_lsn = Lsn(last_record_lsn + 1);
     130              : 
     131         1038 :         if let Some(open_layer) = &self.layer_map.open_layer {
     132         1010 :             let open_layer_rc = Arc::clone(open_layer);
     133         1010 :             // Does this layer need freezing?
     134         1010 :             open_layer.freeze(end_lsn).await;
     135              : 
     136              :             // The layer is no longer open, update the layer map to reflect this.
     137              :             // We will replace it with on-disk historics below.
     138         1010 :             self.layer_map.frozen_layers.push_back(open_layer_rc);
     139         1010 :             self.layer_map.open_layer = None;
     140         1010 :             self.layer_map.next_open_layer_at = Some(end_lsn);
     141           28 :         }
     142              : 
     143              :         // Even if there was no layer to freeze, advance last_freeze_at to last_record_lsn+1: this
     144              :         // accounts for regions in the LSN range where we might have ingested no data due to sharding.
     145         1038 :         last_freeze_at.store(end_lsn);
     146         1038 :     }
     147              : 
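A condensed sketch of the freeze step above, again with hypothetical types: the open layer, if any, moves to the back of the frozen queue and the next open layer is set to start at end_lsn; last_freeze_at advances regardless, which is what the comment about sharded ingest refers to.

    use std::collections::VecDeque;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicU64, Ordering};

    struct InMem { /* buffered writes */ }

    struct Layers {
        open_layer: Option<Arc<InMem>>,
        frozen_layers: VecDeque<Arc<InMem>>,
        next_open_layer_at: Option<u64>,
    }

    fn try_freeze(layers: &mut Layers, last_record_lsn: u64, last_freeze_at: &AtomicU64) {
        let end_lsn = last_record_lsn + 1;
        if let Some(open) = layers.open_layer.take() {
            // The layer is no longer open; it now waits in the frozen queue to be flushed.
            layers.frozen_layers.push_back(open);
            layers.next_open_layer_at = Some(end_lsn);
        }
        // Advance even if there was nothing to freeze.
        last_freeze_at.store(end_lsn, Ordering::SeqCst);
    }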
     148              :     /// Add image layers to the layer map, called from `create_image_layers`.
     149          480 :     pub(crate) fn track_new_image_layers(
     150          480 :         &mut self,
     151          480 :         image_layers: &[ResidentLayer],
     152          480 :         metrics: &TimelineMetrics,
     153          480 :     ) {
     154          480 :         let mut updates = self.layer_map.batch_update();
     155          638 :         for layer in image_layers {
     156          158 :             Self::insert_historic_layer(layer.as_ref().clone(), &mut updates, &mut self.layer_fmgr);
     157          158 : 
      158          158 :             // Record these here instead of in Layer::finish_creating because otherwise a partial
      159          158 :             // failure in create_image_layers would balloon up the physical size gauge. The downside
      160          158 :             // is that all layers need to be created before metrics are updated.
     161          158 :             metrics.record_new_file_metrics(layer.layer_desc().file_size);
     162          158 :         }
     163          480 :         updates.flush();
     164          480 :     }
     165              : 
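The batch_update()/flush() pairing above (and in the other mutating methods of this file) follows a common pattern: collect a group of changes and pay any expensive index maintenance once at the end. A self-contained illustration with hypothetical types:

    struct Index {
        entries: Vec<u64>,
    }

    struct BatchedUpdates<'a> {
        index: &'a mut Index,
        pending: Vec<u64>,
    }

    impl Index {
        fn batch_update(&mut self) -> BatchedUpdates<'_> {
            BatchedUpdates { index: self, pending: Vec::new() }
        }
    }

    impl<'a> BatchedUpdates<'a> {
        fn insert(&mut self, key: u64) {
            self.pending.push(key);
        }

        fn flush(self) {
            // Apply all pending changes, then rebuild the index once.
            self.index.entries.extend(self.pending);
            self.index.entries.sort_unstable();
        }
    }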
     166              :     /// Flush a frozen layer and add the written delta layer to the layer map.
     167         1010 :     pub(crate) fn finish_flush_l0_layer(
     168         1010 :         &mut self,
     169         1010 :         delta_layer: Option<&ResidentLayer>,
     170         1010 :         frozen_layer_for_check: &Arc<InMemoryLayer>,
     171         1010 :         metrics: &TimelineMetrics,
     172         1010 :     ) {
     173         1010 :         let inmem = self
     174         1010 :             .layer_map
     175         1010 :             .frozen_layers
     176         1010 :             .pop_front()
      177         1010 :             .expect("there must be an inmem layer to flush");
     178         1010 : 
     179         1010 :         // Only one task may call this function at a time (for this
     180         1010 :         // timeline). If two tasks tried to flush the same frozen
     181         1010 :         // layer to disk at the same time, that would not work.
     182         1010 :         assert_eq!(Arc::as_ptr(&inmem), Arc::as_ptr(frozen_layer_for_check));
     183              : 
     184         1010 :         if let Some(l) = delta_layer {
     185          896 :             let mut updates = self.layer_map.batch_update();
     186          896 :             Self::insert_historic_layer(l.as_ref().clone(), &mut updates, &mut self.layer_fmgr);
     187          896 :             metrics.record_new_file_metrics(l.layer_desc().file_size);
     188          896 :             updates.flush();
     189          896 :         }
     190         1010 :     }
     191              : 
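The assert_eq!(Arc::as_ptr(..), Arc::as_ptr(..)) check above compares allocation identity, not contents. The standalone sketch below shows the same check; Arc::ptr_eq is an equivalent shorthand.

    use std::sync::Arc;

    fn is_same_layer<T>(a: &Arc<T>, b: &Arc<T>) -> bool {
        // True only if both Arcs point to the same allocation, i.e. the very same layer object.
        Arc::ptr_eq(a, b)
    }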
     192              :     /// Called when compaction is completed.
     193           28 :     pub(crate) fn finish_compact_l0(
     194           28 :         &mut self,
     195           28 :         compact_from: &[Layer],
     196           28 :         compact_to: &[ResidentLayer],
     197           28 :         metrics: &TimelineMetrics,
     198           28 :     ) {
     199           28 :         let mut updates = self.layer_map.batch_update();
     200          256 :         for l in compact_to {
     201          228 :             Self::insert_historic_layer(l.as_ref().clone(), &mut updates, &mut self.layer_fmgr);
     202          228 :             metrics.record_new_file_metrics(l.layer_desc().file_size);
     203          228 :         }
     204          350 :         for l in compact_from {
     205          322 :             Self::delete_historic_layer(l, &mut updates, &mut self.layer_fmgr);
     206          322 :         }
     207           28 :         updates.flush();
     208           28 :     }
     209              : 
      210              :     /// Called when compaction wants to rewrite or drop layers. Rewrites are not implemented yet (see the TODO below); only drops are applied.
     211            0 :     pub(crate) fn rewrite_layers(
     212            0 :         &mut self,
     213            0 :         rewrite_layers: &[(Layer, ResidentLayer)],
     214            0 :         drop_layers: &[Layer],
     215            0 :         _metrics: &TimelineMetrics,
     216            0 :     ) {
     217            0 :         let mut updates = self.layer_map.batch_update();
     218            0 : 
     219            0 :         // TODO: implement rewrites (currently this code path only used for drops)
     220            0 :         assert!(rewrite_layers.is_empty());
     221              : 
     222            0 :         for l in drop_layers {
     223            0 :             Self::delete_historic_layer(l, &mut updates, &mut self.layer_fmgr);
     224            0 :         }
     225            0 :         updates.flush();
     226            0 :     }
     227              : 
     228              :     /// Called when garbage collect has selected the layers to be removed.
     229           10 :     pub(crate) fn finish_gc_timeline(&mut self, gc_layers: &[Layer]) {
     230           10 :         let mut updates = self.layer_map.batch_update();
     231           24 :         for doomed_layer in gc_layers {
     232           14 :             Self::delete_historic_layer(doomed_layer, &mut updates, &mut self.layer_fmgr);
     233           14 :         }
     234           10 :         updates.flush()
     235           10 :     }
     236              : 
     237              :     /// Helper function to insert a layer into the layer map and file manager.
     238         1298 :     fn insert_historic_layer(
     239         1298 :         layer: Layer,
     240         1298 :         updates: &mut BatchedUpdates<'_>,
     241         1298 :         mapping: &mut LayerFileManager<Layer>,
     242         1298 :     ) {
     243         1298 :         updates.insert_historic(layer.layer_desc().clone());
     244         1298 :         mapping.insert(layer);
     245         1298 :     }
     246              : 
     247              :     /// Removes the layer from local FS (if present) and from memory.
     248              :     /// Remote storage is not affected by this operation.
     249          336 :     fn delete_historic_layer(
     250          336 :         // we cannot remove layers otherwise, since gc and compaction will race
     251          336 :         layer: &Layer,
     252          336 :         updates: &mut BatchedUpdates<'_>,
     253          336 :         mapping: &mut LayerFileManager<Layer>,
     254          336 :     ) {
     255          336 :         let desc = layer.layer_desc();
     256          336 : 
     257          336 :         // TODO Removing from the bottom of the layer map is expensive.
     258          336 :         //      Maybe instead discard all layer map historic versions that
     259          336 :         //      won't be needed for page reconstruction for this timeline,
     260          336 :         //      and mark what we can't delete yet as deleted from the layer
     261          336 :         //      map index without actually rebuilding the index.
     262          336 :         updates.remove_historic(desc);
     263          336 :         mapping.remove(layer);
     264          336 :         layer.delete_on_drop();
     265          336 :     }
     266              : 
     267           20 :     pub(crate) fn likely_resident_layers(&self) -> impl Iterator<Item = Layer> + '_ {
      268           20 :         // For small layer maps, most layers are likely resident; for larger maps, more layers are likely
      269           20 :         // to have been evicted, assuming a large layer count correlates with a longer lifespan.
     270           20 : 
     271           24 :         self.layer_map().iter_historic_layers().filter_map(|desc| {
     272           24 :             self.layer_fmgr
     273           24 :                 .0
     274           24 :                 .get(&desc.key())
     275           24 :                 .filter(|l| l.is_likely_resident())
     276           24 :                 .cloned()
     277           24 :         })
     278           20 :     }
     279              : 
     280          228 :     pub(crate) fn contains(&self, layer: &Layer) -> bool {
     281          228 :         self.layer_fmgr.contains(layer)
     282          228 :     }
     283              : }
     284              : 
     285              : pub(crate) struct LayerFileManager<T>(HashMap<PersistentLayerKey, T>);
     286              : 
     287              : impl<T> Default for LayerFileManager<T> {
     288          352 :     fn default() -> Self {
     289          352 :         Self(HashMap::default())
     290          352 :     }
     291              : }
     292              : 
     293              : impl<T: AsLayerDesc + Clone> LayerFileManager<T> {
     294       276061 :     fn get_from_desc(&self, desc: &PersistentLayerDesc) -> T {
     295       276061 :         // The assumption for the `expect()` is that all code maintains the following invariant:
     296       276061 :         // A layer's descriptor is present in the LayerMap => the LayerFileManager contains a layer for the descriptor.
     297       276061 :         self.0
     298       276061 :             .get(&desc.key())
     299       276061 :             .with_context(|| format!("get layer from desc: {}", desc.layer_name()))
     300       276061 :             .expect("not found")
     301       276061 :             .clone()
     302       276061 :     }
     303              : 
     304         1298 :     pub(crate) fn insert(&mut self, layer: T) {
     305         1298 :         let present = self.0.insert(layer.layer_desc().key(), layer.clone());
     306         1298 :         if present.is_some() && cfg!(debug_assertions) {
     307            0 :             panic!("overwriting a layer: {:?}", layer.layer_desc())
     308         1298 :         }
     309         1298 :     }
     310              : 
     311          228 :     pub(crate) fn contains(&self, layer: &T) -> bool {
     312          228 :         self.0.contains_key(&layer.layer_desc().key())
     313          228 :     }
     314              : 
     315          336 :     pub(crate) fn remove(&mut self, layer: &T) {
     316          336 :         let present = self.0.remove(&layer.layer_desc().key());
     317          336 :         if present.is_none() && cfg!(debug_assertions) {
     318            0 :             panic!(
     319            0 :                 "removing layer that is not present in layer mapping: {:?}",
     320            0 :                 layer.layer_desc()
     321            0 :             )
     322          336 :         }
     323          336 :     }
     324              : }
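The insert and remove methods above use a pattern worth noting: invariant violations panic only in debug builds, gated on cfg!(debug_assertions), and are tolerated silently in release builds. A minimal standalone version of the same idea:

    use std::collections::HashMap;

    fn insert_unique(map: &mut HashMap<u64, String>, key: u64, value: String) {
        let previous = map.insert(key, value);
        if previous.is_some() && cfg!(debug_assertions) {
            // Overwriting an existing entry indicates a bookkeeping bug; fail loudly only in debug builds.
            panic!("overwriting an existing entry for key {key}");
        }
    }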
        

Generated by: LCOV version 2.1-beta