LCOV - code coverage report
Current view: top level - pageserver/src/tenant/timeline - layer_manager.rs (source / functions) Coverage Total Hit
Test: f2bfe5dc5ab550768e936d6bc7b94d9b2e2d4cc9.info Lines: 80.5 % 328 264
Test Date: 2025-01-27 20:39:28 Functions: 83.3 % 36 30

            Line data    Source code
       1              : use anyhow::{bail, ensure, Context};
       2              : use itertools::Itertools;
       3              : use pageserver_api::shard::TenantShardId;
       4              : use std::{collections::HashMap, sync::Arc};
       5              : use tracing::trace;
       6              : use utils::{
       7              :     id::TimelineId,
       8              :     lsn::{AtomicLsn, Lsn},
       9              : };
      10              : 
      11              : use crate::{
      12              :     config::PageServerConf,
      13              :     context::RequestContext,
      14              :     metrics::TimelineMetrics,
      15              :     tenant::{
      16              :         layer_map::{BatchedUpdates, LayerMap},
      17              :         storage_layer::{
      18              :             AsLayerDesc, InMemoryLayer, Layer, PersistentLayerDesc, PersistentLayerKey,
      19              :             ResidentLayer,
      20              :         },
      21              :     },
      22              : };
      23              : 
      24              : use super::TimelineWriterState;
      25              : 
/// Provides semantic APIs to manipulate the layer map.
///
/// A manager starts out as [`LayerManager::Open`] and is switched to
/// [`LayerManager::Closed`] by [`LayerManager::shutdown`]; nothing in this file
/// switches it back.
pub(crate) enum LayerManager {
    /// Open as in not shutdown layer manager; we still have in-memory layers and we can manipulate
    /// the layers.
    Open(OpenLayerManager),
    /// Shutdown layer manager where there are no more in-memory layers and persistent layers are
    /// read-only.
    Closed {
        /// The key-to-layer mapping taken over from the open manager at shutdown.
        layers: HashMap<PersistentLayerKey, Layer>,
    },
}
      37              : 
      38              : impl Default for LayerManager {
      39          892 :     fn default() -> Self {
      40          892 :         LayerManager::Open(OpenLayerManager::default())
      41          892 :     }
      42              : }
      43              : 
      44              : impl LayerManager {
      45       482070 :     pub(crate) fn get_from_key(&self, key: &PersistentLayerKey) -> Layer {
      46       482070 :         // The assumption for the `expect()` is that all code maintains the following invariant:
      47       482070 :         // A layer's descriptor is present in the LayerMap => the LayerFileManager contains a layer for the descriptor.
      48       482070 :         self.try_get_from_key(key)
      49       482070 :             .with_context(|| format!("get layer from key: {key}"))
      50       482070 :             .expect("not found")
      51       482070 :             .clone()
      52       482070 :     }
      53              : 
      54       482070 :     pub(crate) fn try_get_from_key(&self, key: &PersistentLayerKey) -> Option<&Layer> {
      55       482070 :         self.layers().get(key)
      56       482070 :     }
      57              : 
      58       482026 :     pub(crate) fn get_from_desc(&self, desc: &PersistentLayerDesc) -> Layer {
      59       482026 :         self.get_from_key(&desc.key())
      60       482026 :     }
      61              : 
      62              :     /// Get an immutable reference to the layer map.
      63              :     ///
      64              :     /// We expect users only to be able to get an immutable layer map. If users want to make modifications,
      65              :     /// they should use the below semantic APIs. This design makes us step closer to immutable storage state.
      66      2154157 :     pub(crate) fn layer_map(&self) -> Result<&LayerMap, Shutdown> {
      67              :         use LayerManager::*;
      68      2154157 :         match self {
      69      2154157 :             Open(OpenLayerManager { layer_map, .. }) => Ok(layer_map),
      70            0 :             Closed { .. } => Err(Shutdown),
      71              :         }
      72      2154157 :     }
      73              : 
      74         9828 :     pub(crate) fn open_mut(&mut self) -> Result<&mut OpenLayerManager, Shutdown> {
      75              :         use LayerManager::*;
      76              : 
      77         9828 :         match self {
      78         9828 :             Open(open) => Ok(open),
      79            0 :             Closed { .. } => Err(Shutdown),
      80              :         }
      81         9828 :     }
      82              : 
      83              :     /// LayerManager shutdown. The in-memory layers do cleanup on drop, so we must drop them in
      84              :     /// order to allow shutdown to complete.
      85              :     ///
      86              :     /// If there was a want to flush in-memory layers, it must have happened earlier.
      87           20 :     pub(crate) fn shutdown(&mut self, writer_state: &mut Option<TimelineWriterState>) {
      88              :         use LayerManager::*;
      89           20 :         match self {
      90              :             Open(OpenLayerManager {
      91           20 :                 layer_map,
      92           20 :                 layer_fmgr: LayerFileManager(hashmap),
      93           20 :             }) => {
      94           20 :                 let open = layer_map.open_layer.take();
      95           20 :                 let frozen = layer_map.frozen_layers.len();
      96           20 :                 let taken_writer_state = writer_state.take();
      97           20 :                 tracing::info!(open = open.is_some(), frozen, "dropped inmemory layers");
      98           20 :                 let layers = std::mem::take(hashmap);
      99           20 :                 *self = Closed { layers };
     100           20 :                 assert_eq!(open.is_some(), taken_writer_state.is_some());
     101              :             }
     102              :             Closed { .. } => {
     103            0 :                 tracing::debug!("ignoring multiple shutdowns on layer manager")
     104              :             }
     105              :         }
     106           20 :     }
     107              : 
     108              :     /// Sum up the historic layer sizes
     109            0 :     pub(crate) fn layer_size_sum(&self) -> u64 {
     110            0 :         self.layers()
     111            0 :             .values()
     112            0 :             .map(|l| l.layer_desc().file_size)
     113            0 :             .sum()
     114            0 :     }
     115              : 
     116           44 :     pub(crate) fn likely_resident_layers(&self) -> impl Iterator<Item = &'_ Layer> + '_ {
     117           71 :         self.layers().values().filter(|l| l.is_likely_resident())
     118           44 :     }
     119              : 
     120          616 :     pub(crate) fn contains(&self, layer: &Layer) -> bool {
     121          616 :         self.contains_key(&layer.layer_desc().key())
     122          616 :     }
     123              : 
     124          820 :     pub(crate) fn contains_key(&self, key: &PersistentLayerKey) -> bool {
     125          820 :         self.layers().contains_key(key)
     126          820 :     }
     127              : 
     128            0 :     pub(crate) fn all_persistent_layers(&self) -> Vec<PersistentLayerKey> {
     129            0 :         self.layers().keys().cloned().collect_vec()
     130            0 :     }
     131              : 
     132       482934 :     fn layers(&self) -> &HashMap<PersistentLayerKey, Layer> {
     133              :         use LayerManager::*;
     134       482934 :         match self {
     135       482934 :             Open(OpenLayerManager { layer_fmgr, .. }) => &layer_fmgr.0,
     136            0 :             Closed { layers } => layers,
     137              :         }
     138       482934 :     }
     139              : }
     140              : 
/// State of a layer manager that has not been shut down: the layer map together with the
/// mapping from persistent layer keys to their [`Layer`] handles.
#[derive(Default)]
pub(crate) struct OpenLayerManager {
    // Holds the historic layer descriptors plus the open/frozen in-memory layers.
    layer_map: LayerMap,
    // Key-to-layer mapping; every historic insert/remove below updates it alongside `layer_map`.
    layer_fmgr: LayerFileManager<Layer>,
}
     146              : 
     147              : impl std::fmt::Debug for OpenLayerManager {
     148            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     149            0 :         f.debug_struct("OpenLayerManager")
     150            0 :             .field("layer_count", &self.layer_fmgr.0.len())
     151            0 :             .finish()
     152            0 :     }
     153              : }
     154              : 
/// Error returned by [`LayerManager::layer_map`] and [`LayerManager::open_mut`] when the layer
/// manager is already closed.
#[derive(Debug, thiserror::Error)]
#[error("layer manager has been shutdown")]
pub(crate) struct Shutdown;
     158              : 
     159              : impl OpenLayerManager {
     160              :     /// Called from `load_layer_map`. Initialize the layer manager with:
     161              :     /// 1. all on-disk layers
     162              :     /// 2. next open layer (with disk disk_consistent_lsn LSN)
     163           12 :     pub(crate) fn initialize_local_layers(&mut self, layers: Vec<Layer>, next_open_layer_at: Lsn) {
     164           12 :         let mut updates = self.layer_map.batch_update();
     165           44 :         for layer in layers {
     166           32 :             Self::insert_historic_layer(layer, &mut updates, &mut self.layer_fmgr);
     167           32 :         }
     168           12 :         updates.flush();
     169           12 :         self.layer_map.next_open_layer_at = Some(next_open_layer_at);
     170           12 :     }
     171              : 
     172              :     /// Initialize when creating a new timeline, called in `init_empty_layer_map`.
     173          880 :     pub(crate) fn initialize_empty(&mut self, next_open_layer_at: Lsn) {
     174          880 :         self.layer_map.next_open_layer_at = Some(next_open_layer_at);
     175          880 :     }
     176              : 
     177              :     /// Open a new writable layer to append data if there is no open layer, otherwise return the
     178              :     /// current open layer, called within `get_layer_for_write`.
     179         2592 :     pub(crate) async fn get_layer_for_write(
     180         2592 :         &mut self,
     181         2592 :         lsn: Lsn,
     182         2592 :         conf: &'static PageServerConf,
     183         2592 :         timeline_id: TimelineId,
     184         2592 :         tenant_shard_id: TenantShardId,
     185         2592 :         gate: &utils::sync::gate::Gate,
     186         2592 :         ctx: &RequestContext,
     187         2592 :     ) -> anyhow::Result<Arc<InMemoryLayer>> {
     188         2592 :         ensure!(lsn.is_aligned());
     189              : 
     190              :         // Do we have a layer open for writing already?
     191         2592 :         let layer = if let Some(open_layer) = &self.layer_map.open_layer {
     192            0 :             if open_layer.get_lsn_range().start > lsn {
     193            0 :                 bail!(
     194            0 :                     "unexpected open layer in the future: open layers starts at {}, write lsn {}",
     195            0 :                     open_layer.get_lsn_range().start,
     196            0 :                     lsn
     197            0 :                 );
     198            0 :             }
     199            0 : 
     200            0 :             Arc::clone(open_layer)
     201              :         } else {
     202              :             // No writeable layer yet. Create one.
     203         2592 :             let start_lsn = self
     204         2592 :                 .layer_map
     205         2592 :                 .next_open_layer_at
     206         2592 :                 .context("No next open layer found")?;
     207              : 
     208         2592 :             trace!(
     209            0 :                 "creating in-memory layer at {}/{} for record at {}",
     210              :                 timeline_id,
     211              :                 start_lsn,
     212              :                 lsn
     213              :             );
     214              : 
     215         2592 :             let new_layer =
     216         2592 :                 InMemoryLayer::create(conf, timeline_id, tenant_shard_id, start_lsn, gate, ctx)
     217         2592 :                     .await?;
     218         2592 :             let layer = Arc::new(new_layer);
     219         2592 : 
     220         2592 :             self.layer_map.open_layer = Some(layer.clone());
     221         2592 :             self.layer_map.next_open_layer_at = None;
     222         2592 : 
     223         2592 :             layer
     224              :         };
     225              : 
     226         2592 :         Ok(layer)
     227         2592 :     }
     228              : 
     229              :     /// Tries to freeze an open layer and also manages clearing the TimelineWriterState.
     230              :     ///
     231              :     /// Returns true if anything was frozen.
     232         2400 :     pub(super) async fn try_freeze_in_memory_layer(
     233         2400 :         &mut self,
     234         2400 :         lsn: Lsn,
     235         2400 :         last_freeze_at: &AtomicLsn,
     236         2400 :         write_lock: &mut tokio::sync::MutexGuard<'_, Option<TimelineWriterState>>,
     237         2400 :     ) -> bool {
     238         2400 :         let Lsn(last_record_lsn) = lsn;
     239         2400 :         let end_lsn = Lsn(last_record_lsn + 1);
     240              : 
     241         2400 :         let froze = if let Some(open_layer) = &self.layer_map.open_layer {
     242         2344 :             let open_layer_rc = Arc::clone(open_layer);
     243         2344 :             open_layer.freeze(end_lsn).await;
     244              : 
     245              :             // The layer is no longer open, update the layer map to reflect this.
     246              :             // We will replace it with on-disk historics below.
     247         2344 :             self.layer_map.frozen_layers.push_back(open_layer_rc);
     248         2344 :             self.layer_map.open_layer = None;
     249         2344 :             self.layer_map.next_open_layer_at = Some(end_lsn);
     250         2344 : 
     251         2344 :             true
     252              :         } else {
     253           56 :             false
     254              :         };
     255              : 
     256              :         // Even if there was no layer to freeze, advance last_freeze_at to last_record_lsn+1: this
     257              :         // accounts for regions in the LSN range where we might have ingested no data due to sharding.
     258         2400 :         last_freeze_at.store(end_lsn);
     259         2400 : 
     260         2400 :         // the writer state must no longer have a reference to the frozen layer
     261         2400 :         let taken = write_lock.take();
     262         2400 :         assert_eq!(
     263         2400 :             froze,
     264         2400 :             taken.is_some(),
     265            0 :             "should only had frozen a layer when TimelineWriterState existed"
     266              :         );
     267              : 
     268         2400 :         froze
     269         2400 :     }
     270              : 
     271              :     /// Add image layers to the layer map, called from [`super::Timeline::create_image_layers`].
     272         1136 :     pub(crate) fn track_new_image_layers(
     273         1136 :         &mut self,
     274         1136 :         image_layers: &[ResidentLayer],
     275         1136 :         metrics: &TimelineMetrics,
     276         1136 :     ) {
     277         1136 :         let mut updates = self.layer_map.batch_update();
     278         1620 :         for layer in image_layers {
     279          484 :             Self::insert_historic_layer(layer.as_ref().clone(), &mut updates, &mut self.layer_fmgr);
     280          484 : 
     281          484 :             // record these here instead of Layer::finish_creating because otherwise partial
     282          484 :             // failure with create_image_layers would balloon up the physical size gauge. downside
     283          484 :             // is that all layers need to be created before metrics are updated.
     284          484 :             metrics.record_new_file_metrics(layer.layer_desc().file_size);
     285          484 :         }
     286         1136 :         updates.flush();
     287         1136 :     }
     288              : 
     289              :     /// Flush a frozen layer and add the written delta layer to the layer map.
     290         2344 :     pub(crate) fn finish_flush_l0_layer(
     291         2344 :         &mut self,
     292         2344 :         delta_layer: Option<&ResidentLayer>,
     293         2344 :         frozen_layer_for_check: &Arc<InMemoryLayer>,
     294         2344 :         metrics: &TimelineMetrics,
     295         2344 :     ) {
     296         2344 :         let inmem = self
     297         2344 :             .layer_map
     298         2344 :             .frozen_layers
     299         2344 :             .pop_front()
     300         2344 :             .expect("there must be a inmem layer to flush");
     301         2344 : 
     302         2344 :         // Only one task may call this function at a time (for this
     303         2344 :         // timeline). If two tasks tried to flush the same frozen
     304         2344 :         // layer to disk at the same time, that would not work.
     305         2344 :         assert_eq!(Arc::as_ptr(&inmem), Arc::as_ptr(frozen_layer_for_check));
     306              : 
     307         2344 :         if let Some(l) = delta_layer {
     308         1936 :             let mut updates = self.layer_map.batch_update();
     309         1936 :             Self::insert_historic_layer(l.as_ref().clone(), &mut updates, &mut self.layer_fmgr);
     310         1936 :             metrics.record_new_file_metrics(l.layer_desc().file_size);
     311         1936 :             updates.flush();
     312         1936 :         }
     313         2344 :     }
     314              : 
     315              :     /// Called when compaction is completed.
     316           56 :     pub(crate) fn finish_compact_l0(
     317           56 :         &mut self,
     318           56 :         compact_from: &[Layer],
     319           56 :         compact_to: &[ResidentLayer],
     320           56 :         metrics: &TimelineMetrics,
     321           56 :     ) {
     322           56 :         let mut updates = self.layer_map.batch_update();
     323          672 :         for l in compact_to {
     324          616 :             Self::insert_historic_layer(l.as_ref().clone(), &mut updates, &mut self.layer_fmgr);
     325          616 :             metrics.record_new_file_metrics(l.layer_desc().file_size);
     326          616 :         }
     327          860 :         for l in compact_from {
     328          804 :             Self::delete_historic_layer(l, &mut updates, &mut self.layer_fmgr);
     329          804 :         }
     330           56 :         updates.flush();
     331           56 :     }
     332              : 
     333              :     /// Called when a GC-compaction is completed.
     334           96 :     pub(crate) fn finish_gc_compaction(
     335           96 :         &mut self,
     336           96 :         compact_from: &[Layer],
     337           96 :         compact_to: &[ResidentLayer],
     338           96 :         metrics: &TimelineMetrics,
     339           96 :     ) {
     340           96 :         // gc-compaction could contain layer rewrites. We need to delete the old layers and insert the new ones.
     341           96 : 
     342           96 :         // Match the old layers with the new layers
     343           96 :         let mut add_layers = HashMap::new();
     344           96 :         let mut rewrite_layers = HashMap::new();
     345           96 :         let mut drop_layers = HashMap::new();
     346          304 :         for layer in compact_from {
     347          208 :             drop_layers.insert(layer.layer_desc().key(), layer.clone());
     348          208 :         }
     349          204 :         for layer in compact_to {
     350          108 :             if let Some(old_layer) = drop_layers.remove(&layer.layer_desc().key()) {
     351            0 :                 rewrite_layers.insert(layer.layer_desc().key(), (old_layer.clone(), layer.clone()));
     352          108 :             } else {
     353          108 :                 add_layers.insert(layer.layer_desc().key(), layer.clone());
     354          108 :             }
     355              :         }
     356           96 :         let add_layers = add_layers.values().cloned().collect::<Vec<_>>();
     357           96 :         let drop_layers = drop_layers.values().cloned().collect::<Vec<_>>();
     358           96 :         let rewrite_layers = rewrite_layers.values().cloned().collect::<Vec<_>>();
     359           96 : 
     360           96 :         self.rewrite_layers_inner(&rewrite_layers, &drop_layers, &add_layers, metrics);
     361           96 :     }
     362              : 
     363              :     /// Called post-compaction when some previous generation image layers were trimmed.
     364            0 :     pub fn rewrite_layers(
     365            0 :         &mut self,
     366            0 :         rewrite_layers: &[(Layer, ResidentLayer)],
     367            0 :         drop_layers: &[Layer],
     368            0 :         metrics: &TimelineMetrics,
     369            0 :     ) {
     370            0 :         self.rewrite_layers_inner(rewrite_layers, drop_layers, &[], metrics);
     371            0 :     }
     372              : 
     373           96 :     fn rewrite_layers_inner(
     374           96 :         &mut self,
     375           96 :         rewrite_layers: &[(Layer, ResidentLayer)],
     376           96 :         drop_layers: &[Layer],
     377           96 :         add_layers: &[ResidentLayer],
     378           96 :         metrics: &TimelineMetrics,
     379           96 :     ) {
     380           96 :         let mut updates = self.layer_map.batch_update();
     381           96 :         for (old_layer, new_layer) in rewrite_layers {
     382            0 :             debug_assert_eq!(
     383            0 :                 old_layer.layer_desc().key_range,
     384            0 :                 new_layer.layer_desc().key_range
     385              :             );
     386            0 :             debug_assert_eq!(
     387            0 :                 old_layer.layer_desc().lsn_range,
     388            0 :                 new_layer.layer_desc().lsn_range
     389              :             );
     390              : 
     391              :             // Transfer visibility hint from old to new layer, since the new layer covers the same key space.  This is not guaranteed to
     392              :             // be accurate (as the new layer may cover a different subset of the key range), but is a sensible default, and prevents
     393              :             // always marking rewritten layers as visible.
     394            0 :             new_layer.as_ref().set_visibility(old_layer.visibility());
     395            0 : 
     396            0 :             // Safety: we may never rewrite the same file in-place.  Callers are responsible
     397            0 :             // for ensuring that they only rewrite layers after something changes the path,
     398            0 :             // such as an increment in the generation number.
     399            0 :             assert_ne!(old_layer.local_path(), new_layer.local_path());
     400              : 
     401            0 :             Self::delete_historic_layer(old_layer, &mut updates, &mut self.layer_fmgr);
     402            0 : 
     403            0 :             Self::insert_historic_layer(
     404            0 :                 new_layer.as_ref().clone(),
     405            0 :                 &mut updates,
     406            0 :                 &mut self.layer_fmgr,
     407            0 :             );
     408            0 : 
     409            0 :             metrics.record_new_file_metrics(new_layer.layer_desc().file_size);
     410              :         }
     411          304 :         for l in drop_layers {
     412          208 :             Self::delete_historic_layer(l, &mut updates, &mut self.layer_fmgr);
     413          208 :         }
     414          204 :         for l in add_layers {
     415          108 :             Self::insert_historic_layer(l.as_ref().clone(), &mut updates, &mut self.layer_fmgr);
     416          108 :             metrics.record_new_file_metrics(l.layer_desc().file_size);
     417          108 :         }
     418           96 :         updates.flush();
     419           96 :     }
     420              : 
     421              :     /// Called when garbage collect has selected the layers to be removed.
     422           16 :     pub(crate) fn finish_gc_timeline(&mut self, gc_layers: &[Layer]) {
     423           16 :         let mut updates = self.layer_map.batch_update();
     424           36 :         for doomed_layer in gc_layers {
     425           20 :             Self::delete_historic_layer(doomed_layer, &mut updates, &mut self.layer_fmgr);
     426           20 :         }
     427           16 :         updates.flush()
     428           16 :     }
     429              : 
     430              :     #[cfg(test)]
     431          296 :     pub(crate) fn force_insert_layer(&mut self, layer: ResidentLayer) {
     432          296 :         let mut updates = self.layer_map.batch_update();
     433          296 :         Self::insert_historic_layer(layer.as_ref().clone(), &mut updates, &mut self.layer_fmgr);
     434          296 :         updates.flush()
     435          296 :     }
     436              : 
     437              :     /// Helper function to insert a layer into the layer map and file manager.
     438         3472 :     fn insert_historic_layer(
     439         3472 :         layer: Layer,
     440         3472 :         updates: &mut BatchedUpdates<'_>,
     441         3472 :         mapping: &mut LayerFileManager<Layer>,
     442         3472 :     ) {
     443         3472 :         updates.insert_historic(layer.layer_desc().clone());
     444         3472 :         mapping.insert(layer);
     445         3472 :     }
     446              : 
     447              :     /// Removes the layer from local FS (if present) and from memory.
     448              :     /// Remote storage is not affected by this operation.
     449         1032 :     fn delete_historic_layer(
     450         1032 :         // we cannot remove layers otherwise, since gc and compaction will race
     451         1032 :         layer: &Layer,
     452         1032 :         updates: &mut BatchedUpdates<'_>,
     453         1032 :         mapping: &mut LayerFileManager<Layer>,
     454         1032 :     ) {
     455         1032 :         let desc = layer.layer_desc();
     456         1032 : 
     457         1032 :         // TODO Removing from the bottom of the layer map is expensive.
     458         1032 :         //      Maybe instead discard all layer map historic versions that
     459         1032 :         //      won't be needed for page reconstruction for this timeline,
     460         1032 :         //      and mark what we can't delete yet as deleted from the layer
     461         1032 :         //      map index without actually rebuilding the index.
     462         1032 :         updates.remove_historic(desc);
     463         1032 :         mapping.remove(layer);
     464         1032 :         layer.delete_on_drop();
     465         1032 :     }
     466              : }
     467              : 
/// Newtype around a map from [`PersistentLayerKey`] to a layer handle of type `T`.
pub(crate) struct LayerFileManager<T>(HashMap<PersistentLayerKey, T>);
     469              : 
     470              : impl<T> Default for LayerFileManager<T> {
     471          892 :     fn default() -> Self {
     472          892 :         Self(HashMap::default())
     473          892 :     }
     474              : }
     475              : 
     476              : impl<T: AsLayerDesc + Clone> LayerFileManager<T> {
     477         3472 :     pub(crate) fn insert(&mut self, layer: T) {
     478         3472 :         let present = self.0.insert(layer.layer_desc().key(), layer.clone());
     479         3472 :         if present.is_some() && cfg!(debug_assertions) {
     480            0 :             panic!("overwriting a layer: {:?}", layer.layer_desc())
     481         3472 :         }
     482         3472 :     }
     483              : 
     484         1032 :     pub(crate) fn remove(&mut self, layer: &T) {
     485         1032 :         let present = self.0.remove(&layer.layer_desc().key());
     486         1032 :         if present.is_none() && cfg!(debug_assertions) {
     487            0 :             panic!(
     488            0 :                 "removing layer that is not present in layer mapping: {:?}",
     489            0 :                 layer.layer_desc()
     490            0 :             )
     491         1032 :         }
     492         1032 :     }
     493              : }
        

Generated by: LCOV version 2.1-beta