LCOV - code coverage report
Current view: top level - pageserver/src/tenant/timeline - layer_manager.rs (source / functions) Coverage Total Hit
Test: 42f947419473a288706e86ecdf7c2863d760d5d7.info Lines: 80.0 % 285 228
Test Date: 2024-08-02 21:34:27 Functions: 90.6 % 32 29

            Line data    Source code
       1              : use anyhow::{bail, ensure, Context, Result};
       2              : use itertools::Itertools;
       3              : use pageserver_api::shard::TenantShardId;
       4              : use std::{collections::HashMap, sync::Arc};
       5              : use tracing::trace;
       6              : use utils::{
       7              :     id::TimelineId,
       8              :     lsn::{AtomicLsn, Lsn},
       9              : };
      10              : 
      11              : use crate::{
      12              :     config::PageServerConf,
      13              :     context::RequestContext,
      14              :     metrics::TimelineMetrics,
      15              :     tenant::{
      16              :         layer_map::{BatchedUpdates, LayerMap},
      17              :         storage_layer::{
      18              :             AsLayerDesc, InMemoryLayer, Layer, PersistentLayerDesc, PersistentLayerKey,
      19              :             ResidentLayer,
      20              :         },
      21              :     },
      22              : };
      23              : 
      24              : use super::TimelineWriterState;
      25              : 
      26              : /// Provides semantic APIs to manipulate the layer map.
      27              : #[derive(Default)]
      28              : pub(crate) struct LayerManager {
      29              :     layer_map: LayerMap,
      30              :     layer_fmgr: LayerFileManager<Layer>,
      31              : }
      32              : 
      33              : impl LayerManager {
      34       280227 :     pub(crate) fn get_from_desc(&self, desc: &PersistentLayerDesc) -> Layer {
      35       280227 :         self.layer_fmgr.get_from_desc(desc)
      36       280227 :     }
      37              : 
      38           32 :     pub(crate) fn get_from_key(&self, desc: &PersistentLayerKey) -> Layer {
      39           32 :         self.layer_fmgr.get_from_key(desc)
      40           32 :     }
      41              : 
      42              :     /// Get an immutable reference to the layer map.
      43              :     ///
      44              :     /// We expect users only to be able to get an immutable layer map. If users want to make modifications,
      45              :     /// they should use the below semantic APIs. This design makes us step closer to immutable storage state.
      46      1047755 :     pub(crate) fn layer_map(&self) -> &LayerMap {
      47      1047755 :         &self.layer_map
      48      1047755 :     }
      49              : 
      50              :     /// Called from `load_layer_map`. Initialize the layer manager with:
      51              :     /// 1. all on-disk layers
      52              :     /// 2. next open layer (with disk disk_consistent_lsn LSN)
      53            6 :     pub(crate) fn initialize_local_layers(
      54            6 :         &mut self,
      55            6 :         on_disk_layers: Vec<Layer>,
      56            6 :         next_open_layer_at: Lsn,
      57            6 :     ) {
      58            6 :         let mut updates = self.layer_map.batch_update();
      59           22 :         for layer in on_disk_layers {
      60           16 :             Self::insert_historic_layer(layer, &mut updates, &mut self.layer_fmgr);
      61           16 :         }
      62            6 :         updates.flush();
      63            6 :         self.layer_map.next_open_layer_at = Some(next_open_layer_at);
      64            6 :     }
      65              : 
      66              :     /// Initialize when creating a new timeline, called in `init_empty_layer_map`.
      67          392 :     pub(crate) fn initialize_empty(&mut self, next_open_layer_at: Lsn) {
      68          392 :         self.layer_map.next_open_layer_at = Some(next_open_layer_at);
      69          392 :     }
      70              : 
      71              :     /// Open a new writable layer to append data if there is no open layer, otherwise return the current open layer,
      72              :     /// called within `get_layer_for_write`.
      73         1256 :     pub(crate) async fn get_layer_for_write(
      74         1256 :         &mut self,
      75         1256 :         lsn: Lsn,
      76         1256 :         last_record_lsn: Lsn,
      77         1256 :         conf: &'static PageServerConf,
      78         1256 :         timeline_id: TimelineId,
      79         1256 :         tenant_shard_id: TenantShardId,
      80         1256 :         ctx: &RequestContext,
      81         1256 :     ) -> Result<Arc<InMemoryLayer>> {
      82         1256 :         ensure!(lsn.is_aligned());
      83              : 
      84         1256 :         ensure!(
      85         1256 :             lsn > last_record_lsn,
      86            0 :             "cannot modify relation after advancing last_record_lsn (incoming_lsn={}, last_record_lsn={})",
      87              :             lsn,
      88              :             last_record_lsn,
      89              :         );
      90              : 
      91              :         // Do we have a layer open for writing already?
      92         1256 :         let layer = if let Some(open_layer) = &self.layer_map.open_layer {
      93            0 :             if open_layer.get_lsn_range().start > lsn {
      94            0 :                 bail!(
      95            0 :                     "unexpected open layer in the future: open layers starts at {}, write lsn {}",
      96            0 :                     open_layer.get_lsn_range().start,
      97            0 :                     lsn
      98            0 :                 );
      99            0 :             }
     100            0 : 
     101            0 :             Arc::clone(open_layer)
     102              :         } else {
     103              :             // No writeable layer yet. Create one.
     104         1256 :             let start_lsn = self
     105         1256 :                 .layer_map
     106         1256 :                 .next_open_layer_at
     107         1256 :                 .context("No next open layer found")?;
     108              : 
     109         1256 :             trace!(
     110            0 :                 "creating in-memory layer at {}/{} for record at {}",
     111              :                 timeline_id,
     112              :                 start_lsn,
     113              :                 lsn
     114              :             );
     115              : 
     116         1256 :             let new_layer =
     117         1256 :                 InMemoryLayer::create(conf, timeline_id, tenant_shard_id, start_lsn, ctx).await?;
     118         1256 :             let layer = Arc::new(new_layer);
     119         1256 : 
     120         1256 :             self.layer_map.open_layer = Some(layer.clone());
     121         1256 :             self.layer_map.next_open_layer_at = None;
     122         1256 : 
     123         1256 :             layer
     124              :         };
     125              : 
     126         1256 :         Ok(layer)
     127         1256 :     }
     128              : 
     129              :     /// Tries to freeze an open layer and also manages clearing the TimelineWriterState.
     130              :     ///
     131              :     /// Returns true if anything was frozen.
     132         1154 :     pub(super) async fn try_freeze_in_memory_layer(
     133         1154 :         &mut self,
     134         1154 :         lsn: Lsn,
     135         1154 :         last_freeze_at: &AtomicLsn,
     136         1154 :         write_lock: &mut tokio::sync::MutexGuard<'_, Option<TimelineWriterState>>,
     137         1154 :     ) -> bool {
     138         1154 :         let Lsn(last_record_lsn) = lsn;
     139         1154 :         let end_lsn = Lsn(last_record_lsn + 1);
     140              : 
     141         1154 :         let froze = if let Some(open_layer) = &self.layer_map.open_layer {
     142         1126 :             let open_layer_rc = Arc::clone(open_layer);
     143         1126 :             open_layer.freeze(end_lsn).await;
     144              : 
     145              :             // The layer is no longer open, update the layer map to reflect this.
     146              :             // We will replace it with on-disk historics below.
     147         1126 :             self.layer_map.frozen_layers.push_back(open_layer_rc);
     148         1126 :             self.layer_map.open_layer = None;
     149         1126 :             self.layer_map.next_open_layer_at = Some(end_lsn);
     150         1126 : 
     151         1126 :             true
     152              :         } else {
     153           28 :             false
     154              :         };
     155              : 
     156              :         // Even if there was no layer to freeze, advance last_freeze_at to last_record_lsn+1: this
     157              :         // accounts for regions in the LSN range where we might have ingested no data due to sharding.
     158         1154 :         last_freeze_at.store(end_lsn);
     159         1154 : 
     160         1154 :         // the writer state must no longer have a reference to the frozen layer
     161         1154 :         let taken = write_lock.take();
     162         1154 :         assert_eq!(
     163         1154 :             froze,
     164         1154 :             taken.is_some(),
     165            0 :             "should only had frozen a layer when TimelineWriterState existed"
     166              :         );
     167              : 
     168         1154 :         froze
     169         1154 :     }
     170              : 
     171              :     /// Add image layers to the layer map, called from `create_image_layers`.
     172          522 :     pub(crate) fn track_new_image_layers(
     173          522 :         &mut self,
     174          522 :         image_layers: &[ResidentLayer],
     175          522 :         metrics: &TimelineMetrics,
     176          522 :     ) {
     177          522 :         let mut updates = self.layer_map.batch_update();
     178          720 :         for layer in image_layers {
     179          198 :             Self::insert_historic_layer(layer.as_ref().clone(), &mut updates, &mut self.layer_fmgr);
     180          198 : 
     181          198 :             // record these here instead of Layer::finish_creating because otherwise partial
     182          198 :             // failure with create_image_layers would balloon up the physical size gauge. downside
     183          198 :             // is that all layers need to be created before metrics are updated.
     184          198 :             metrics.record_new_file_metrics(layer.layer_desc().file_size);
     185          198 :         }
     186          522 :         updates.flush();
     187          522 :     }
     188              : 
     189              :     /// Flush a frozen layer and add the written delta layer to the layer map.
     190         1126 :     pub(crate) fn finish_flush_l0_layer(
     191         1126 :         &mut self,
     192         1126 :         delta_layer: Option<&ResidentLayer>,
     193         1126 :         frozen_layer_for_check: &Arc<InMemoryLayer>,
     194         1126 :         metrics: &TimelineMetrics,
     195         1126 :     ) {
     196         1126 :         let inmem = self
     197         1126 :             .layer_map
     198         1126 :             .frozen_layers
     199         1126 :             .pop_front()
     200         1126 :             .expect("there must be a inmem layer to flush");
     201         1126 : 
     202         1126 :         // Only one task may call this function at a time (for this
     203         1126 :         // timeline). If two tasks tried to flush the same frozen
     204         1126 :         // layer to disk at the same time, that would not work.
     205         1126 :         assert_eq!(Arc::as_ptr(&inmem), Arc::as_ptr(frozen_layer_for_check));
     206              : 
     207         1126 :         if let Some(l) = delta_layer {
     208          968 :             let mut updates = self.layer_map.batch_update();
     209          968 :             Self::insert_historic_layer(l.as_ref().clone(), &mut updates, &mut self.layer_fmgr);
     210          968 :             metrics.record_new_file_metrics(l.layer_desc().file_size);
     211          968 :             updates.flush();
     212          968 :         }
     213         1126 :     }
     214              : 
     215              :     /// Called when compaction is completed.
     216           46 :     pub(crate) fn finish_compact_l0(
     217           46 :         &mut self,
     218           46 :         compact_from: &[Layer],
     219           46 :         compact_to: &[ResidentLayer],
     220           46 :         metrics: &TimelineMetrics,
     221           46 :     ) {
     222           46 :         let mut updates = self.layer_map.batch_update();
     223          388 :         for l in compact_to {
     224          342 :             Self::insert_historic_layer(l.as_ref().clone(), &mut updates, &mut self.layer_fmgr);
     225          342 :             metrics.record_new_file_metrics(l.layer_desc().file_size);
     226          342 :         }
     227          488 :         for l in compact_from {
     228          442 :             Self::delete_historic_layer(l, &mut updates, &mut self.layer_fmgr);
     229          442 :         }
     230           46 :         updates.flush();
     231           46 :     }
     232              : 
     233              :     /// Called when a GC-compaction is completed.
     234           18 :     pub(crate) fn finish_gc_compaction(
     235           18 :         &mut self,
     236           18 :         compact_from: &[Layer],
     237           18 :         compact_to: &[ResidentLayer],
     238           18 :         metrics: &TimelineMetrics,
     239           18 :     ) {
     240           18 :         // We can simply reuse compact l0 logic. Use a different function name to indicate a different type of layer map modification.
     241           18 :         self.finish_compact_l0(compact_from, compact_to, metrics)
     242           18 :     }
     243              : 
     244              :     /// Called when compaction is completed.
     245            0 :     pub(crate) fn rewrite_layers(
     246            0 :         &mut self,
     247            0 :         rewrite_layers: &[(Layer, ResidentLayer)],
     248            0 :         drop_layers: &[Layer],
     249            0 :         metrics: &TimelineMetrics,
     250            0 :     ) {
     251            0 :         let mut updates = self.layer_map.batch_update();
     252            0 :         for (old_layer, new_layer) in rewrite_layers {
     253            0 :             debug_assert_eq!(
     254            0 :                 old_layer.layer_desc().key_range,
     255            0 :                 new_layer.layer_desc().key_range
     256              :             );
     257            0 :             debug_assert_eq!(
     258            0 :                 old_layer.layer_desc().lsn_range,
     259            0 :                 new_layer.layer_desc().lsn_range
     260              :             );
     261              : 
     262              :             // Transfer visibilty hint from old to new layer, since the new layer covers the same key space.  This is not guaranteed to
     263              :             // be accurate (as the new layer may cover a different subset of the key range), but is a sensible default, and prevents
     264              :             // always marking rewritten layers as visible.
     265            0 :             new_layer
     266            0 :                 .as_ref()
     267            0 :                 .access_stats()
     268            0 :                 .set_visibility(old_layer.access_stats().visibility());
     269            0 : 
     270            0 :             // Safety: we may never rewrite the same file in-place.  Callers are responsible
     271            0 :             // for ensuring that they only rewrite layers after something changes the path,
     272            0 :             // such as an increment in the generation number.
     273            0 :             assert_ne!(old_layer.local_path(), new_layer.local_path());
     274              : 
     275            0 :             Self::delete_historic_layer(old_layer, &mut updates, &mut self.layer_fmgr);
     276            0 : 
     277            0 :             Self::insert_historic_layer(
     278            0 :                 new_layer.as_ref().clone(),
     279            0 :                 &mut updates,
     280            0 :                 &mut self.layer_fmgr,
     281            0 :             );
     282            0 : 
     283            0 :             metrics.record_new_file_metrics(new_layer.layer_desc().file_size);
     284              :         }
     285            0 :         for l in drop_layers {
     286            0 :             Self::delete_historic_layer(l, &mut updates, &mut self.layer_fmgr);
     287            0 :         }
     288            0 :         updates.flush();
     289            0 :     }
     290              : 
     291              :     /// Called when garbage collect has selected the layers to be removed.
     292           12 :     pub(crate) fn finish_gc_timeline(&mut self, gc_layers: &[Layer]) {
     293           12 :         let mut updates = self.layer_map.batch_update();
     294           28 :         for doomed_layer in gc_layers {
     295           16 :             Self::delete_historic_layer(doomed_layer, &mut updates, &mut self.layer_fmgr);
     296           16 :         }
     297           12 :         updates.flush()
     298           12 :     }
     299              : 
     300              :     #[cfg(test)]
     301           88 :     pub(crate) fn force_insert_layer(&mut self, layer: ResidentLayer) {
     302           88 :         let mut updates = self.layer_map.batch_update();
     303           88 :         Self::insert_historic_layer(layer.as_ref().clone(), &mut updates, &mut self.layer_fmgr);
     304           88 :         updates.flush()
     305           88 :     }
     306              : 
     307              :     /// Helper function to insert a layer into the layer map and file manager.
     308         1612 :     fn insert_historic_layer(
     309         1612 :         layer: Layer,
     310         1612 :         updates: &mut BatchedUpdates<'_>,
     311         1612 :         mapping: &mut LayerFileManager<Layer>,
     312         1612 :     ) {
     313         1612 :         updates.insert_historic(layer.layer_desc().clone());
     314         1612 :         mapping.insert(layer);
     315         1612 :     }
     316              : 
     317              :     /// Removes the layer from local FS (if present) and from memory.
     318              :     /// Remote storage is not affected by this operation.
     319          458 :     fn delete_historic_layer(
     320          458 :         // we cannot remove layers otherwise, since gc and compaction will race
     321          458 :         layer: &Layer,
     322          458 :         updates: &mut BatchedUpdates<'_>,
     323          458 :         mapping: &mut LayerFileManager<Layer>,
     324          458 :     ) {
     325          458 :         let desc = layer.layer_desc();
     326          458 : 
     327          458 :         // TODO Removing from the bottom of the layer map is expensive.
     328          458 :         //      Maybe instead discard all layer map historic versions that
     329          458 :         //      won't be needed for page reconstruction for this timeline,
     330          458 :         //      and mark what we can't delete yet as deleted from the layer
     331          458 :         //      map index without actually rebuilding the index.
     332          458 :         updates.remove_historic(desc);
     333          458 :         mapping.remove(layer);
     334          458 :         layer.delete_on_drop();
     335          458 :     }
     336              : 
     337           20 :     pub(crate) fn likely_resident_layers(&self) -> impl Iterator<Item = Layer> + '_ {
     338           20 :         // for small layer maps, we most likely have all resident, but for larger more are likely
     339           20 :         // to be evicted assuming lots of layers correlated with longer lifespan.
     340           20 : 
     341           24 :         self.layer_map().iter_historic_layers().filter_map(|desc| {
     342           24 :             self.layer_fmgr
     343           24 :                 .0
     344           24 :                 .get(&desc.key())
     345           24 :                 .filter(|l| l.is_likely_resident())
     346           24 :                 .cloned()
     347           24 :         })
     348           20 :     }
     349              : 
     350          308 :     pub(crate) fn contains(&self, layer: &Layer) -> bool {
     351          308 :         self.layer_fmgr.contains(layer)
     352          308 :     }
     353              : 
     354           82 :     pub(crate) fn contains_key(&self, key: &PersistentLayerKey) -> bool {
     355           82 :         self.layer_fmgr.contains_key(key)
     356           82 :     }
     357              : 
     358            0 :     pub(crate) fn all_persistent_layers(&self) -> Vec<PersistentLayerKey> {
     359            0 :         self.layer_fmgr.0.keys().cloned().collect_vec()
     360            0 :     }
     361              : }
     362              : 
     363              : pub(crate) struct LayerFileManager<T>(HashMap<PersistentLayerKey, T>);
     364              : 
     365              : impl<T> Default for LayerFileManager<T> {
     366          398 :     fn default() -> Self {
     367          398 :         Self(HashMap::default())
     368          398 :     }
     369              : }
     370              : 
     371              : impl<T: AsLayerDesc + Clone> LayerFileManager<T> {
     372       280259 :     fn get_from_key(&self, key: &PersistentLayerKey) -> T {
     373       280259 :         // The assumption for the `expect()` is that all code maintains the following invariant:
     374       280259 :         // A layer's descriptor is present in the LayerMap => the LayerFileManager contains a layer for the descriptor.
     375       280259 :         self.0
     376       280259 :             .get(key)
     377       280259 :             .with_context(|| format!("get layer from key: {}", key))
     378       280259 :             .expect("not found")
     379       280259 :             .clone()
     380       280259 :     }
     381              : 
     382       280227 :     fn get_from_desc(&self, desc: &PersistentLayerDesc) -> T {
     383       280227 :         self.get_from_key(&desc.key())
     384       280227 :     }
     385              : 
     386           82 :     fn contains_key(&self, key: &PersistentLayerKey) -> bool {
     387           82 :         self.0.contains_key(key)
     388           82 :     }
     389              : 
     390         1612 :     pub(crate) fn insert(&mut self, layer: T) {
     391         1612 :         let present = self.0.insert(layer.layer_desc().key(), layer.clone());
     392         1612 :         if present.is_some() && cfg!(debug_assertions) {
     393            0 :             panic!("overwriting a layer: {:?}", layer.layer_desc())
     394         1612 :         }
     395         1612 :     }
     396              : 
     397          308 :     pub(crate) fn contains(&self, layer: &T) -> bool {
     398          308 :         self.0.contains_key(&layer.layer_desc().key())
     399          308 :     }
     400              : 
     401          458 :     pub(crate) fn remove(&mut self, layer: &T) {
     402          458 :         let present = self.0.remove(&layer.layer_desc().key());
     403          458 :         if present.is_none() && cfg!(debug_assertions) {
     404            0 :             panic!(
     405            0 :                 "removing layer that is not present in layer mapping: {:?}",
     406            0 :                 layer.layer_desc()
     407            0 :             )
     408          458 :         }
     409          458 :     }
     410              : }
        

Generated by: LCOV version 2.1-beta