LCOV - code coverage report
Current view: top level - pageserver/src/tenant/timeline - layer_manager.rs (source / functions) Coverage Total Hit
Test: 90b23405d17e36048d3bb64e314067f397803f1b.info Lines: 77.2 % 302 233
Test Date: 2024-09-20 13:14:58 Functions: 80.0 % 35 28

            Line data    Source code
       1              : use anyhow::{bail, ensure, Context};
       2              : use itertools::Itertools;
       3              : use pageserver_api::shard::TenantShardId;
       4              : use std::{collections::HashMap, sync::Arc};
       5              : use tracing::trace;
       6              : use utils::{
       7              :     id::TimelineId,
       8              :     lsn::{AtomicLsn, Lsn},
       9              : };
      10              : 
      11              : use crate::{
      12              :     config::PageServerConf,
      13              :     context::RequestContext,
      14              :     metrics::TimelineMetrics,
      15              :     tenant::{
      16              :         layer_map::{BatchedUpdates, LayerMap},
      17              :         storage_layer::{
      18              :             AsLayerDesc, InMemoryLayer, Layer, PersistentLayerDesc, PersistentLayerKey,
      19              :             ResidentLayer,
      20              :         },
      21              :     },
      22              : };
      23              : 
      24              : use super::TimelineWriterState;
      25              : 
      26              : /// Provides semantic APIs to manipulate the layer map.
      27              : pub(crate) enum LayerManager {
      28              :     /// Open as in not shutdown layer manager; we still have in-memory layers and we can manipulate
      29              :     /// the layers.
      30              :     Open(OpenLayerManager),
      31              :     /// Shutdown layer manager where there are no more in-memory layers and persistent layers are
      32              :     /// read-only.
      33              :     Closed {
      34              :         layers: HashMap<PersistentLayerKey, Layer>,
      35              :     },
      36              : }
      37              : 
      38              : impl Default for LayerManager {
      39         1242 :     fn default() -> Self {
      40         1242 :         LayerManager::Open(OpenLayerManager::default())
      41         1242 :     }
      42              : }
      43              : 
      44              : impl LayerManager {
      45       837625 :     pub(crate) fn get_from_key(&self, key: &PersistentLayerKey) -> Layer {
      46       837625 :         // The assumption for the `expect()` is that all code maintains the following invariant:
      47       837625 :         // A layer's descriptor is present in the LayerMap => the LayerFileManager contains a layer for the descriptor.
      48       837625 :         self.layers()
      49       837625 :             .get(key)
      50       837625 :             .with_context(|| format!("get layer from key: {key}"))
      51       837625 :             .expect("not found")
      52       837625 :             .clone()
      53       837625 :     }
      54              : 
      55       837577 :     pub(crate) fn get_from_desc(&self, desc: &PersistentLayerDesc) -> Layer {
      56       837577 :         self.get_from_key(&desc.key())
      57       837577 :     }
      58              : 
      59              :     /// Get an immutable reference to the layer map.
      60              :     ///
      61              :     /// We expect users only to be able to get an immutable layer map. If users want to make modifications,
      62              :     /// they should use the below semantic APIs. This design makes us step closer to immutable storage state.
      63      3146794 :     pub(crate) fn layer_map(&self) -> Result<&LayerMap, Shutdown> {
      64              :         use LayerManager::*;
      65      3146794 :         match self {
      66      3146794 :             Open(OpenLayerManager { layer_map, .. }) => Ok(layer_map),
      67            0 :             Closed { .. } => Err(Shutdown),
      68              :         }
      69      3146794 :     }
      70              : 
      71        14646 :     pub(crate) fn open_mut(&mut self) -> Result<&mut OpenLayerManager, Shutdown> {
      72              :         use LayerManager::*;
      73              : 
      74        14646 :         match self {
      75        14646 :             Open(open) => Ok(open),
      76            0 :             Closed { .. } => Err(Shutdown),
      77              :         }
      78        14646 :     }
      79              : 
      80              :     /// LayerManager shutdown. The in-memory layers do cleanup on drop, so we must drop them in
      81              :     /// order to allow shutdown to complete.
      82              :     ///
      83              :     /// If there was a want to flush in-memory layers, it must have happened earlier.
      84           24 :     pub(crate) fn shutdown(&mut self, writer_state: &mut Option<TimelineWriterState>) {
      85              :         use LayerManager::*;
      86           24 :         match self {
      87              :             Open(OpenLayerManager {
      88           24 :                 layer_map,
      89           24 :                 layer_fmgr: LayerFileManager(hashmap),
      90           24 :             }) => {
      91           24 :                 let open = layer_map.open_layer.take();
      92           24 :                 let frozen = layer_map.frozen_layers.len();
      93           24 :                 let taken_writer_state = writer_state.take();
      94           24 :                 tracing::info!(open = open.is_some(), frozen, "dropped inmemory layers");
      95           24 :                 let layers = std::mem::take(hashmap);
      96           24 :                 *self = Closed { layers };
      97           24 :                 assert_eq!(open.is_some(), taken_writer_state.is_some());
      98              :             }
      99              :             Closed { .. } => {
     100            0 :                 tracing::debug!("ignoring multiple shutdowns on layer manager")
     101              :             }
     102              :         }
     103           24 :     }
     104              : 
     105              :     /// Sum up the historic layer sizes
     106            0 :     pub(crate) fn layer_size_sum(&self) -> u64 {
     107            0 :         self.layers()
     108            0 :             .values()
     109            0 :             .map(|l| l.layer_desc().file_size)
     110            0 :             .sum()
     111            0 :     }
     112              : 
     113           66 :     pub(crate) fn likely_resident_layers(&self) -> impl Iterator<Item = &'_ Layer> + '_ {
     114           99 :         self.layers().values().filter(|l| l.is_likely_resident())
     115           66 :     }
     116              : 
     117          924 :     pub(crate) fn contains(&self, layer: &Layer) -> bool {
     118          924 :         self.contains_key(&layer.layer_desc().key())
     119          924 :     }
     120              : 
     121         1122 :     pub(crate) fn contains_key(&self, key: &PersistentLayerKey) -> bool {
     122         1122 :         self.layers().contains_key(key)
     123         1122 :     }
     124              : 
     125            0 :     pub(crate) fn all_persistent_layers(&self) -> Vec<PersistentLayerKey> {
     126            0 :         self.layers().keys().cloned().collect_vec()
     127            0 :     }
     128              : 
     129       838813 :     fn layers(&self) -> &HashMap<PersistentLayerKey, Layer> {
     130              :         use LayerManager::*;
     131       838813 :         match self {
     132       838813 :             Open(OpenLayerManager { layer_fmgr, .. }) => &layer_fmgr.0,
     133            0 :             Closed { layers } => layers,
     134              :         }
     135       838813 :     }
     136              : }
     137              : 
     138              : #[derive(Default)]
     139              : pub(crate) struct OpenLayerManager {
     140              :     layer_map: LayerMap,
     141              :     layer_fmgr: LayerFileManager<Layer>,
     142              : }
     143              : 
     144              : impl std::fmt::Debug for OpenLayerManager {
     145            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     146            0 :         f.debug_struct("OpenLayerManager")
     147            0 :             .field("layer_count", &self.layer_fmgr.0.len())
     148            0 :             .finish()
     149            0 :     }
     150              : }
     151              : 
     152            0 : #[derive(Debug, thiserror::Error)]
     153              : #[error("layer manager has been shutdown")]
     154              : pub(crate) struct Shutdown;
     155              : 
     156              : impl OpenLayerManager {
     157              :     /// Called from `load_layer_map`. Initialize the layer manager with:
     158              :     /// 1. all on-disk layers
     159              :     /// 2. next open layer (with disk disk_consistent_lsn LSN)
     160           18 :     pub(crate) fn initialize_local_layers(&mut self, layers: Vec<Layer>, next_open_layer_at: Lsn) {
     161           18 :         let mut updates = self.layer_map.batch_update();
     162           66 :         for layer in layers {
     163           48 :             Self::insert_historic_layer(layer, &mut updates, &mut self.layer_fmgr);
     164           48 :         }
     165           18 :         updates.flush();
     166           18 :         self.layer_map.next_open_layer_at = Some(next_open_layer_at);
     167           18 :     }
     168              : 
     169              :     /// Initialize when creating a new timeline, called in `init_empty_layer_map`.
     170         1224 :     pub(crate) fn initialize_empty(&mut self, next_open_layer_at: Lsn) {
     171         1224 :         self.layer_map.next_open_layer_at = Some(next_open_layer_at);
     172         1224 :     }
     173              : 
     174              :     /// Open a new writable layer to append data if there is no open layer, otherwise return the
     175              :     /// current open layer, called within `get_layer_for_write`.
     176         3816 :     pub(crate) async fn get_layer_for_write(
     177         3816 :         &mut self,
     178         3816 :         lsn: Lsn,
     179         3816 :         conf: &'static PageServerConf,
     180         3816 :         timeline_id: TimelineId,
     181         3816 :         tenant_shard_id: TenantShardId,
     182         3816 :         gate_guard: utils::sync::gate::GateGuard,
     183         3816 :         ctx: &RequestContext,
     184         3816 :     ) -> anyhow::Result<Arc<InMemoryLayer>> {
     185         3816 :         ensure!(lsn.is_aligned());
     186              : 
     187              :         // Do we have a layer open for writing already?
     188         3816 :         let layer = if let Some(open_layer) = &self.layer_map.open_layer {
     189            0 :             if open_layer.get_lsn_range().start > lsn {
     190            0 :                 bail!(
     191            0 :                     "unexpected open layer in the future: open layers starts at {}, write lsn {}",
     192            0 :                     open_layer.get_lsn_range().start,
     193            0 :                     lsn
     194            0 :                 );
     195            0 :             }
     196            0 : 
     197            0 :             Arc::clone(open_layer)
     198              :         } else {
     199              :             // No writeable layer yet. Create one.
     200         3816 :             let start_lsn = self
     201         3816 :                 .layer_map
     202         3816 :                 .next_open_layer_at
     203         3816 :                 .context("No next open layer found")?;
     204              : 
     205         3816 :             trace!(
     206            0 :                 "creating in-memory layer at {}/{} for record at {}",
     207              :                 timeline_id,
     208              :                 start_lsn,
     209              :                 lsn
     210              :             );
     211              : 
     212         3816 :             let new_layer = InMemoryLayer::create(
     213         3816 :                 conf,
     214         3816 :                 timeline_id,
     215         3816 :                 tenant_shard_id,
     216         3816 :                 start_lsn,
     217         3816 :                 gate_guard,
     218         3816 :                 ctx,
     219         3816 :             )
     220         2160 :             .await?;
     221         3816 :             let layer = Arc::new(new_layer);
     222         3816 : 
     223         3816 :             self.layer_map.open_layer = Some(layer.clone());
     224         3816 :             self.layer_map.next_open_layer_at = None;
     225         3816 : 
     226         3816 :             layer
     227              :         };
     228              : 
     229         3816 :         Ok(layer)
     230         3816 :     }
     231              : 
     232              :     /// Tries to freeze an open layer and also manages clearing the TimelineWriterState.
     233              :     ///
     234              :     /// Returns true if anything was frozen.
     235         3510 :     pub(super) async fn try_freeze_in_memory_layer(
     236         3510 :         &mut self,
     237         3510 :         lsn: Lsn,
     238         3510 :         last_freeze_at: &AtomicLsn,
     239         3510 :         write_lock: &mut tokio::sync::MutexGuard<'_, Option<TimelineWriterState>>,
     240         3510 :     ) -> bool {
     241         3510 :         let Lsn(last_record_lsn) = lsn;
     242         3510 :         let end_lsn = Lsn(last_record_lsn + 1);
     243              : 
     244         3510 :         let froze = if let Some(open_layer) = &self.layer_map.open_layer {
     245         3426 :             let open_layer_rc = Arc::clone(open_layer);
     246         3426 :             open_layer.freeze(end_lsn).await;
     247              : 
     248              :             // The layer is no longer open, update the layer map to reflect this.
     249              :             // We will replace it with on-disk historics below.
     250         3426 :             self.layer_map.frozen_layers.push_back(open_layer_rc);
     251         3426 :             self.layer_map.open_layer = None;
     252         3426 :             self.layer_map.next_open_layer_at = Some(end_lsn);
     253         3426 : 
     254         3426 :             true
     255              :         } else {
     256           84 :             false
     257              :         };
     258              : 
     259              :         // Even if there was no layer to freeze, advance last_freeze_at to last_record_lsn+1: this
     260              :         // accounts for regions in the LSN range where we might have ingested no data due to sharding.
     261         3510 :         last_freeze_at.store(end_lsn);
     262         3510 : 
     263         3510 :         // the writer state must no longer have a reference to the frozen layer
     264         3510 :         let taken = write_lock.take();
     265         3510 :         assert_eq!(
     266         3510 :             froze,
     267         3510 :             taken.is_some(),
     268            0 :             "should only had frozen a layer when TimelineWriterState existed"
     269              :         );
     270              : 
     271         3510 :         froze
     272         3510 :     }
     273              : 
     274              :     /// Add image layers to the layer map, called from [`super::Timeline::create_image_layers`].
     275         2136 :     pub(crate) fn track_new_image_layers(
     276         2136 :         &mut self,
     277         2136 :         image_layers: &[ResidentLayer],
     278         2136 :         metrics: &TimelineMetrics,
     279         2136 :     ) {
     280         2136 :         let mut updates = self.layer_map.batch_update();
     281         2772 :         for layer in image_layers {
     282          636 :             Self::insert_historic_layer(layer.as_ref().clone(), &mut updates, &mut self.layer_fmgr);
     283          636 : 
     284          636 :             // record these here instead of Layer::finish_creating because otherwise partial
     285          636 :             // failure with create_image_layers would balloon up the physical size gauge. downside
     286          636 :             // is that all layers need to be created before metrics are updated.
     287          636 :             metrics.record_new_file_metrics(layer.layer_desc().file_size);
     288          636 :         }
     289         2136 :         updates.flush();
     290         2136 :     }
     291              : 
     292              :     /// Flush a frozen layer and add the written delta layer to the layer map.
     293         3426 :     pub(crate) fn finish_flush_l0_layer(
     294         3426 :         &mut self,
     295         3426 :         delta_layer: Option<&ResidentLayer>,
     296         3426 :         frozen_layer_for_check: &Arc<InMemoryLayer>,
     297         3426 :         metrics: &TimelineMetrics,
     298         3426 :     ) {
     299         3426 :         let inmem = self
     300         3426 :             .layer_map
     301         3426 :             .frozen_layers
     302         3426 :             .pop_front()
     303         3426 :             .expect("there must be a inmem layer to flush");
     304         3426 : 
     305         3426 :         // Only one task may call this function at a time (for this
     306         3426 :         // timeline). If two tasks tried to flush the same frozen
     307         3426 :         // layer to disk at the same time, that would not work.
     308         3426 :         assert_eq!(Arc::as_ptr(&inmem), Arc::as_ptr(frozen_layer_for_check));
     309              : 
     310         3426 :         if let Some(l) = delta_layer {
     311         2904 :             let mut updates = self.layer_map.batch_update();
     312         2904 :             Self::insert_historic_layer(l.as_ref().clone(), &mut updates, &mut self.layer_fmgr);
     313         2904 :             metrics.record_new_file_metrics(l.layer_desc().file_size);
     314         2904 :             updates.flush();
     315         2904 :         }
     316         3426 :     }
     317              : 
     318              :     /// Called when compaction is completed.
     319          150 :     pub(crate) fn finish_compact_l0(
     320          150 :         &mut self,
     321          150 :         compact_from: &[Layer],
     322          150 :         compact_to: &[ResidentLayer],
     323          150 :         metrics: &TimelineMetrics,
     324          150 :     ) {
     325          150 :         let mut updates = self.layer_map.batch_update();
     326         1140 :         for l in compact_to {
     327          990 :             Self::insert_historic_layer(l.as_ref().clone(), &mut updates, &mut self.layer_fmgr);
     328          990 :             metrics.record_new_file_metrics(l.layer_desc().file_size);
     329          990 :         }
     330         1500 :         for l in compact_from {
     331         1350 :             Self::delete_historic_layer(l, &mut updates, &mut self.layer_fmgr);
     332         1350 :         }
     333          150 :         updates.flush();
     334          150 :     }
     335              : 
     336              :     /// Called when a GC-compaction is completed.
     337           66 :     pub(crate) fn finish_gc_compaction(
     338           66 :         &mut self,
     339           66 :         compact_from: &[Layer],
     340           66 :         compact_to: &[ResidentLayer],
     341           66 :         metrics: &TimelineMetrics,
     342           66 :     ) {
     343           66 :         // We can simply reuse compact l0 logic. Use a different function name to indicate a different type of layer map modification.
     344           66 :         self.finish_compact_l0(compact_from, compact_to, metrics)
     345           66 :     }
     346              : 
     347              :     /// Called post-compaction when some previous generation image layers were trimmed.
     348            0 :     pub(crate) fn rewrite_layers(
     349            0 :         &mut self,
     350            0 :         rewrite_layers: &[(Layer, ResidentLayer)],
     351            0 :         drop_layers: &[Layer],
     352            0 :         metrics: &TimelineMetrics,
     353            0 :     ) {
     354            0 :         let mut updates = self.layer_map.batch_update();
     355            0 :         for (old_layer, new_layer) in rewrite_layers {
     356            0 :             debug_assert_eq!(
     357            0 :                 old_layer.layer_desc().key_range,
     358            0 :                 new_layer.layer_desc().key_range
     359              :             );
     360            0 :             debug_assert_eq!(
     361            0 :                 old_layer.layer_desc().lsn_range,
     362            0 :                 new_layer.layer_desc().lsn_range
     363              :             );
     364              : 
     365              :             // Transfer visibility hint from old to new layer, since the new layer covers the same key space.  This is not guaranteed to
     366              :             // be accurate (as the new layer may cover a different subset of the key range), but is a sensible default, and prevents
     367              :             // always marking rewritten layers as visible.
     368            0 :             new_layer.as_ref().set_visibility(old_layer.visibility());
     369            0 : 
     370            0 :             // Safety: we may never rewrite the same file in-place.  Callers are responsible
     371            0 :             // for ensuring that they only rewrite layers after something changes the path,
     372            0 :             // such as an increment in the generation number.
     373            0 :             assert_ne!(old_layer.local_path(), new_layer.local_path());
     374              : 
     375            0 :             Self::delete_historic_layer(old_layer, &mut updates, &mut self.layer_fmgr);
     376            0 : 
     377            0 :             Self::insert_historic_layer(
     378            0 :                 new_layer.as_ref().clone(),
     379            0 :                 &mut updates,
     380            0 :                 &mut self.layer_fmgr,
     381            0 :             );
     382            0 : 
     383            0 :             metrics.record_new_file_metrics(new_layer.layer_desc().file_size);
     384              :         }
     385            0 :         for l in drop_layers {
     386            0 :             Self::delete_historic_layer(l, &mut updates, &mut self.layer_fmgr);
     387            0 :         }
     388            0 :         updates.flush();
     389            0 :     }
     390              : 
     391              :     /// Called when garbage collect has selected the layers to be removed.
     392           36 :     pub(crate) fn finish_gc_timeline(&mut self, gc_layers: &[Layer]) {
     393           36 :         let mut updates = self.layer_map.batch_update();
     394           84 :         for doomed_layer in gc_layers {
     395           48 :             Self::delete_historic_layer(doomed_layer, &mut updates, &mut self.layer_fmgr);
     396           48 :         }
     397           36 :         updates.flush()
     398           36 :     }
     399              : 
     400              :     #[cfg(test)]
     401          330 :     pub(crate) fn force_insert_layer(&mut self, layer: ResidentLayer) {
     402          330 :         let mut updates = self.layer_map.batch_update();
     403          330 :         Self::insert_historic_layer(layer.as_ref().clone(), &mut updates, &mut self.layer_fmgr);
     404          330 :         updates.flush()
     405          330 :     }
     406              : 
     407              :     /// Helper function to insert a layer into the layer map and file manager.
     408         4908 :     fn insert_historic_layer(
     409         4908 :         layer: Layer,
     410         4908 :         updates: &mut BatchedUpdates<'_>,
     411         4908 :         mapping: &mut LayerFileManager<Layer>,
     412         4908 :     ) {
     413         4908 :         updates.insert_historic(layer.layer_desc().clone());
     414         4908 :         mapping.insert(layer);
     415         4908 :     }
     416              : 
     417              :     /// Removes the layer from local FS (if present) and from memory.
     418              :     /// Remote storage is not affected by this operation.
     419         1398 :     fn delete_historic_layer(
     420         1398 :         // we cannot remove layers otherwise, since gc and compaction will race
     421         1398 :         layer: &Layer,
     422         1398 :         updates: &mut BatchedUpdates<'_>,
     423         1398 :         mapping: &mut LayerFileManager<Layer>,
     424         1398 :     ) {
     425         1398 :         let desc = layer.layer_desc();
     426         1398 : 
     427         1398 :         // TODO Removing from the bottom of the layer map is expensive.
     428         1398 :         //      Maybe instead discard all layer map historic versions that
     429         1398 :         //      won't be needed for page reconstruction for this timeline,
     430         1398 :         //      and mark what we can't delete yet as deleted from the layer
     431         1398 :         //      map index without actually rebuilding the index.
     432         1398 :         updates.remove_historic(desc);
     433         1398 :         mapping.remove(layer);
     434         1398 :         layer.delete_on_drop();
     435         1398 :     }
     436              : }
     437              : 
     438              : pub(crate) struct LayerFileManager<T>(HashMap<PersistentLayerKey, T>);
     439              : 
     440              : impl<T> Default for LayerFileManager<T> {
     441         1242 :     fn default() -> Self {
     442         1242 :         Self(HashMap::default())
     443         1242 :     }
     444              : }
     445              : 
     446              : impl<T: AsLayerDesc + Clone> LayerFileManager<T> {
     447         4908 :     pub(crate) fn insert(&mut self, layer: T) {
     448         4908 :         let present = self.0.insert(layer.layer_desc().key(), layer.clone());
     449         4908 :         if present.is_some() && cfg!(debug_assertions) {
     450            0 :             panic!("overwriting a layer: {:?}", layer.layer_desc())
     451         4908 :         }
     452         4908 :     }
     453              : 
     454         1398 :     pub(crate) fn remove(&mut self, layer: &T) {
     455         1398 :         let present = self.0.remove(&layer.layer_desc().key());
     456         1398 :         if present.is_none() && cfg!(debug_assertions) {
     457            0 :             panic!(
     458            0 :                 "removing layer that is not present in layer mapping: {:?}",
     459            0 :                 layer.layer_desc()
     460            0 :             )
     461         1398 :         }
     462         1398 :     }
     463              : }
        

Generated by: LCOV version 2.1-beta