LCOV - code coverage report
Current view: top level - pageserver/src/tenant/timeline - layer_manager.rs (source / functions) Coverage Total Hit
Test: 1e20c4f2b28aa592527961bb32170ebbd2c9172f.info Lines: 82.8 % 418 346
Test Date: 2025-07-16 12:29:03 Functions: 85.2 % 54 46

            Line data    Source code
       1              : use std::collections::HashMap;
       2              : use std::mem::ManuallyDrop;
       3              : use std::ops::{Deref, DerefMut};
       4              : use std::sync::Arc;
       5              : use std::time::Duration;
       6              : 
       7              : use anyhow::{Context, bail, ensure};
       8              : use itertools::Itertools;
       9              : use pageserver_api::keyspace::KeySpace;
      10              : use pageserver_api::shard::TenantShardId;
      11              : use tokio_util::sync::CancellationToken;
      12              : use tracing::trace;
      13              : use utils::id::TimelineId;
      14              : use utils::lsn::{AtomicLsn, Lsn};
      15              : 
      16              : use super::{LayerFringe, ReadableLayer, TimelineWriterState};
      17              : use crate::config::PageServerConf;
      18              : use crate::context::RequestContext;
      19              : use crate::metrics::TimelineMetrics;
      20              : use crate::tenant::layer_map::{BatchedUpdates, LayerMap, SearchResult};
      21              : use crate::tenant::storage_layer::{
      22              :     AsLayerDesc, InMemoryLayer, Layer, LayerVisibilityHint, PersistentLayerDesc,
      23              :     PersistentLayerKey, ReadableLayerWeak, ResidentLayer,
      24              : };
      25              : 
      26              : /// Warn if the lock was held for longer than this threshold.
      27              : /// It's very generous and we should bring this value down over time.
      28              : const LAYER_MANAGER_LOCK_WARN_THRESHOLD: Duration = Duration::from_secs(5);
      29              : const LAYER_MANAGER_LOCK_READ_WARN_THRESHOLD: Duration = Duration::from_secs(30);
      30              : 
      31              : /// Describes the operation that is holding the layer manager lock
      32              : #[derive(Debug, Clone, Copy, strum_macros::Display)]
      33              : #[strum(serialize_all = "kebab_case")]
      34              : pub(crate) enum LayerManagerLockHolder {
      35              :     GetLayerMapInfo,
      36              :     GenerateHeatmap,
      37              :     GetPage,
      38              :     Init,
      39              :     LoadLayerMap,
      40              :     GetLayerForWrite,
      41              :     TryFreezeLayer,
      42              :     FlushFrozenLayer,
      43              :     FlushLoop,
      44              :     Compaction,
      45              :     GarbageCollection,
      46              :     Shutdown,
      47              :     ImportPgData,
      48              :     DetachAncestor,
      49              :     Eviction,
      50              :     ComputeImageConsistentLsn,
      51              :     #[cfg(test)]
      52              :     Testing,
      53              : }
      54              : 
      55              : /// Wrapper for the layer manager that tracks the amount of time during which
      56              : /// it was held under read or write lock
      57              : #[derive(Default)]
      58              : pub(crate) struct LockedLayerManager {
      59              :     locked: tokio::sync::RwLock<LayerManager>,
      60              : }
      61              : 
      62              : pub(crate) struct LayerManagerReadGuard<'a> {
      63              :     guard: ManuallyDrop<tokio::sync::RwLockReadGuard<'a, LayerManager>>,
      64              :     acquired_at: std::time::Instant,
      65              :     holder: LayerManagerLockHolder,
      66              : }
      67              : 
      68              : pub(crate) struct LayerManagerWriteGuard<'a> {
      69              :     guard: ManuallyDrop<tokio::sync::RwLockWriteGuard<'a, LayerManager>>,
      70              :     acquired_at: std::time::Instant,
      71              :     holder: LayerManagerLockHolder,
      72              : }
      73              : 
      74              : impl Drop for LayerManagerReadGuard<'_> {
      75       556861 :     fn drop(&mut self) {
      76              :         // Drop the lock first, before potentially warning if it was held for too long.
      77              :         // SAFETY: we are inside `Drop::drop`, so the guard is dropped exactly once and is never used afterwards
      78       556861 :         unsafe { ManuallyDrop::drop(&mut self.guard) };
      79              : 
      80       556861 :         let held_for = self.acquired_at.elapsed();
      81       556861 :         if held_for >= LAYER_MANAGER_LOCK_READ_WARN_THRESHOLD {
      82            0 :             tracing::warn!(
      83              :                 holder=%self.holder,
      84            0 :                 "Layer manager read lock held for {}s",
      85            0 :                 held_for.as_secs_f64(),
      86              :             );
      87       556861 :         }
      88       556861 :     }
      89              : }
      90              : 
      91              : impl Drop for LayerManagerWriteGuard<'_> {
      92         2441 :     fn drop(&mut self) {
      93              :         // Drop the lock first, before potentially warning if it was held for too long.
      94              :         // SAFETY: we are inside `Drop::drop`, so the guard is dropped exactly once and is never used afterwards
      95         2441 :         unsafe { ManuallyDrop::drop(&mut self.guard) };
      96              : 
      97         2441 :         let held_for = self.acquired_at.elapsed();
      98         2441 :         if held_for >= LAYER_MANAGER_LOCK_WARN_THRESHOLD {
      99            0 :             tracing::warn!(
     100              :                 holder=%self.holder,
     101            0 :                 "Layer manager write lock held for {}s",
     102            0 :                 held_for.as_secs_f64(),
     103              :             );
     104         2441 :         }
     105         2441 :     }
     106              : }
     107              : 
     108              : impl Deref for LayerManagerReadGuard<'_> {
     109              :     type Target = LayerManager;
     110              : 
     111       568289 :     fn deref(&self) -> &Self::Target {
     112       568289 :         self.guard.deref()
     113       568289 :     }
     114              : }
     115              : 
     116              : impl Deref for LayerManagerWriteGuard<'_> {
     117              :     type Target = LayerManager;
     118              : 
     119          168 :     fn deref(&self) -> &Self::Target {
     120          168 :         self.guard.deref()
     121          168 :     }
     122              : }
     123              : 
     124              : impl DerefMut for LayerManagerWriteGuard<'_> {
     125         2441 :     fn deref_mut(&mut self) -> &mut Self::Target {
     126         2441 :         self.guard.deref_mut()
     127         2441 :     }
     128              : }
     129              : 
     130              : impl LockedLayerManager {
     131       556861 :     pub(crate) async fn read(&self, holder: LayerManagerLockHolder) -> LayerManagerReadGuard {
     132       556861 :         let guard = ManuallyDrop::new(self.locked.read().await);
     133       556861 :         LayerManagerReadGuard {
     134       556861 :             guard,
     135       556861 :             acquired_at: std::time::Instant::now(),
     136       556861 :             holder,
     137       556861 :         }
     138       556861 :     }
     139              : 
     140            0 :     pub(crate) fn try_read(
     141            0 :         &self,
     142            0 :         holder: LayerManagerLockHolder,
     143            0 :     ) -> Result<LayerManagerReadGuard, tokio::sync::TryLockError> {
     144            0 :         let guard = ManuallyDrop::new(self.locked.try_read()?);
     145              : 
     146            0 :         Ok(LayerManagerReadGuard {
     147            0 :             guard,
     148            0 :             acquired_at: std::time::Instant::now(),
     149            0 :             holder,
     150            0 :         })
     151            0 :     }
     152              : 
     153         2209 :     pub(crate) async fn write(&self, holder: LayerManagerLockHolder) -> LayerManagerWriteGuard {
     154         2209 :         let guard = ManuallyDrop::new(self.locked.write().await);
     155         2209 :         LayerManagerWriteGuard {
     156         2209 :             guard,
     157         2209 :             acquired_at: std::time::Instant::now(),
     158         2209 :             holder,
     159         2209 :         }
     160         2209 :     }
     161              : 
     162          232 :     pub(crate) fn try_write(
     163          232 :         &self,
     164          232 :         holder: LayerManagerLockHolder,
     165          232 :     ) -> Result<LayerManagerWriteGuard, tokio::sync::TryLockError> {
     166          232 :         let guard = ManuallyDrop::new(self.locked.try_write()?);
     167              : 
     168          232 :         Ok(LayerManagerWriteGuard {
     169          232 :             guard,
     170          232 :             acquired_at: std::time::Instant::now(),
     171          232 :             holder,
     172          232 :         })
     173          232 :     }
     174              : }
     175              : 
     176              : /// Provides semantic APIs to manipulate the layer map.
     177              : pub(crate) enum LayerManager {
     178              :     /// Open, i.e. not yet shut down: we still have in-memory layers and we can manipulate
     179              :     /// the layers.
     180              :     Open(OpenLayerManager),
     181              :     /// Shutdown layer manager where there are no more in-memory layers and persistent layers are
     182              :     /// read-only.
     183              :     Closed {
     184              :         layers: HashMap<PersistentLayerKey, Layer>,
     185              :     },
     186              : }
     187              : 
     188              : impl Default for LayerManager {
     189          235 :     fn default() -> Self {
     190          235 :         LayerManager::Open(OpenLayerManager::default())
     191          235 :     }
     192              : }
     193              : 
     194              : impl LayerManager {
     195       460553 :     fn upgrade(&self, weak: ReadableLayerWeak) -> ReadableLayer {
     196       460553 :         match weak {
     197       149392 :             ReadableLayerWeak::PersistentLayer(desc) => {
     198       149392 :                 ReadableLayer::PersistentLayer(self.get_from_desc(&desc))
     199              :             }
     200       311161 :             ReadableLayerWeak::InMemoryLayer(desc) => {
     201       311161 :                 let inmem = self
     202       311161 :                     .layer_map()
     203       311161 :                     .expect("no concurrent shutdown")
     204       311161 :                     .in_memory_layer(&desc);
     205       311161 :                 ReadableLayer::InMemoryLayer(inmem)
     206              :             }
     207              :         }
     208       460553 :     }
     209              : 
     210       149880 :     pub(crate) fn get_from_key(&self, key: &PersistentLayerKey) -> Layer {
     211              :         // The assumption for the `expect()` is that all code maintains the following invariant:
     212              :         // A layer's descriptor is present in the LayerMap => the LayerFileManager contains a layer for the descriptor.
     213       149880 :         self.try_get_from_key(key)
     214       149880 :             .with_context(|| format!("get layer from key: {key}"))
     215       149880 :             .expect("not found")
     216       149880 :             .clone()
     217       149880 :     }
     218              : 
     219       149907 :     pub(crate) fn try_get_from_key(&self, key: &PersistentLayerKey) -> Option<&Layer> {
     220       149907 :         self.layers().get(key)
     221       149907 :     }
     222              : 
     223       149869 :     pub(crate) fn get_from_desc(&self, desc: &PersistentLayerDesc) -> Layer {
     224       149869 :         self.get_from_key(&desc.key())
     225       149869 :     }
     226              : 
     227              :     /// Get an immutable reference to the layer map.
     228              :     ///
     229              :     /// Users are only expected to get an immutable layer map. If they want to make modifications,
     230              :     /// they should use the semantic APIs below. This design brings us a step closer to an immutable storage state.
     231       878677 :     pub(crate) fn layer_map(&self) -> Result<&LayerMap, Shutdown> {
     232              :         use LayerManager::*;
     233       878677 :         match self {
     234       878677 :             Open(OpenLayerManager { layer_map, .. }) => Ok(layer_map),
     235            0 :             Closed { .. } => Err(Shutdown),
     236              :         }
     237       878677 :     }
     238              : 
     239         2436 :     pub(crate) fn open_mut(&mut self) -> Result<&mut OpenLayerManager, Shutdown> {
     240              :         use LayerManager::*;
     241              : 
     242         2436 :         match self {
     243         2436 :             Open(open) => Ok(open),
     244            0 :             Closed { .. } => Err(Shutdown),
     245              :         }
     246         2436 :     }
     247              : 
     248              :     /// LayerManager shutdown. The in-memory layers do cleanup on drop, so we must drop them in
     249              :     /// order to allow shutdown to complete.
     250              :     ///
     251              :     /// If the in-memory layers needed to be flushed, that must have happened earlier.
     252            5 :     pub(crate) fn shutdown(&mut self, writer_state: &mut Option<TimelineWriterState>) {
     253              :         use LayerManager::*;
     254            5 :         match self {
     255              :             Open(OpenLayerManager {
     256            5 :                 layer_map,
     257            5 :                 layer_fmgr: LayerFileManager(hashmap),
     258              :             }) => {
     259              :                 // NB: no need to decrement layer metrics; metrics are removed on timeline shutdown.
     260            5 :                 let open = layer_map.open_layer.take();
     261            5 :                 let frozen = layer_map.frozen_layers.len();
     262            5 :                 let taken_writer_state = writer_state.take();
     263            5 :                 tracing::info!(open = open.is_some(), frozen, "dropped inmemory layers");
     264            5 :                 let layers = std::mem::take(hashmap);
     265            5 :                 *self = Closed { layers };
     266            5 :                 assert_eq!(open.is_some(), taken_writer_state.is_some());
     267              :             }
     268              :             Closed { .. } => {
     269            0 :                 tracing::debug!("ignoring multiple shutdowns on layer manager")
     270              :             }
     271              :         }
     272            5 :     }
     273              : 
     274              :     /// Sum up the historic layer sizes
     275            0 :     pub(crate) fn layer_size_sum(&self) -> u64 {
     276            0 :         self.layers()
     277            0 :             .values()
     278            0 :             .map(|l| l.layer_desc().file_size)
     279            0 :             .sum()
     280            0 :     }
     281              : 
     282           20 :     pub(crate) fn likely_resident_layers(&self) -> impl Iterator<Item = &'_ Layer> + '_ {
     283           56 :         self.layers().values().filter(|l| l.is_likely_resident())
     284           20 :     }
     285              : 
     286            0 :     pub(crate) fn visible_layers(&self) -> impl Iterator<Item = &'_ Layer> + '_ {
     287            0 :         self.layers()
     288            0 :             .values()
     289            0 :             .filter(|l| l.visibility() == LayerVisibilityHint::Visible)
     290            0 :     }
     291              : 
     292          163 :     pub(crate) fn contains(&self, layer: &Layer) -> bool {
     293          163 :         self.contains_key(&layer.layer_desc().key())
     294          163 :     }
     295              : 
     296          214 :     pub(crate) fn contains_key(&self, key: &PersistentLayerKey) -> bool {
     297          214 :         self.layers().contains_key(key)
     298          214 :     }
     299              : 
     300          192 :     pub(crate) fn all_persistent_layers(&self) -> Vec<PersistentLayerKey> {
     301          192 :         self.layers().keys().cloned().collect_vec()
     302          192 :     }
     303              : 
     304              :     /// Update the [`LayerFringe`] of a read request
     305              :     ///
     306              :     /// Take a key space at a given LSN and query the layer map below each range
     307              :     /// of the key space to find the next layers to visit.
     308       565789 :     pub(crate) fn update_search_fringe(
     309       565789 :         &self,
     310       565789 :         keyspace: &KeySpace,
     311       565789 :         cont_lsn: Lsn,
     312       565789 :         fringe: &mut LayerFringe,
     313       565789 :     ) -> Result<(), Shutdown> {
     314       565789 :         let map = self.layer_map()?;
     315              : 
     316       575634 :         for range in keyspace.ranges.iter() {
     317       575634 :             let results = map.range_search(range.clone(), cont_lsn);
     318       575634 :             results
     319       575634 :                 .found
     320       575634 :                 .into_iter()
     321       575634 :                 .map(|(SearchResult { layer, lsn_floor }, keyspace_accum)| {
     322       460553 :                     (
     323       460553 :                         self.upgrade(layer),
     324       460553 :                         keyspace_accum.to_keyspace(),
     325       460553 :                         lsn_floor..cont_lsn,
     326       460553 :                     )
     327       460553 :                 })
     328       575634 :                 .for_each(|(layer, keyspace, lsn_range)| fringe.update(layer, keyspace, lsn_range));
     329              :         }
     330              : 
     331       565789 :         Ok(())
     332       565789 :     }
     333              : 
     334       150333 :     fn layers(&self) -> &HashMap<PersistentLayerKey, Layer> {
     335              :         use LayerManager::*;
     336       150333 :         match self {
     337       150333 :             Open(OpenLayerManager { layer_fmgr, .. }) => &layer_fmgr.0,
     338            0 :             Closed { layers } => layers,
     339              :         }
     340       150333 :     }
     341              : }
     342              : 
     343              : #[derive(Default)]
     344              : pub(crate) struct OpenLayerManager {
     345              :     layer_map: LayerMap,
     346              :     layer_fmgr: LayerFileManager<Layer>,
     347              : }
     348              : 
     349              : impl std::fmt::Debug for OpenLayerManager {
     350            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     351            0 :         f.debug_struct("OpenLayerManager")
     352            0 :             .field("layer_count", &self.layer_fmgr.0.len())
     353            0 :             .finish()
     354            0 :     }
     355              : }
     356              : 
     357              : #[derive(Debug, thiserror::Error)]
     358              : #[error("layer manager has been shutdown")]
     359              : pub(crate) struct Shutdown;
     360              : 
     361              : impl OpenLayerManager {
     362              :     /// Called from `load_layer_map`. Initialize the layer manager with:
     363              :     /// 1. all on-disk layers
     364              :     /// 2. next open layer (with disk_consistent_lsn LSN)
     365            3 :     pub(crate) fn initialize_local_layers(&mut self, layers: Vec<Layer>, next_open_layer_at: Lsn) {
     366            3 :         let mut updates = self.layer_map.batch_update();
     367           11 :         for layer in layers {
     368            8 :             Self::insert_historic_layer(layer, &mut updates, &mut self.layer_fmgr);
     369            8 :         }
     370            3 :         updates.flush();
     371            3 :         self.layer_map.next_open_layer_at = Some(next_open_layer_at);
     372            3 :     }
     373              : 
     374              :     /// Initialize when creating a new timeline, called in `init_empty_layer_map`.
     375          232 :     pub(crate) fn initialize_empty(&mut self, next_open_layer_at: Lsn) {
     376          232 :         self.layer_map.next_open_layer_at = Some(next_open_layer_at);
     377          232 :     }
     378              : 
     379              :     /// Open a new writable layer to append data if there is no open layer, otherwise return the
     380              :     /// current open layer, called within `get_layer_for_write`.
     381              :     #[allow(clippy::too_many_arguments)]
     382          660 :     pub(crate) async fn get_layer_for_write(
     383          660 :         &mut self,
     384          660 :         lsn: Lsn,
     385          660 :         conf: &'static PageServerConf,
     386          660 :         timeline_id: TimelineId,
     387          660 :         tenant_shard_id: TenantShardId,
     388          660 :         gate: &utils::sync::gate::Gate,
     389          660 :         cancel: &CancellationToken,
     390          660 :         ctx: &RequestContext,
     391          660 :     ) -> anyhow::Result<Arc<InMemoryLayer>> {
     392          660 :         ensure!(lsn.is_aligned());
     393              : 
     394              :         // Do we have a layer open for writing already?
     395          660 :         let layer = if let Some(open_layer) = &self.layer_map.open_layer {
     396            0 :             if open_layer.get_lsn_range().start > lsn {
     397            0 :                 bail!(
     398            0 :                     "unexpected open layer in the future: open layers starts at {}, write lsn {}",
     399            0 :                     open_layer.get_lsn_range().start,
     400              :                     lsn
     401              :                 );
     402            0 :             }
     403              : 
     404            0 :             Arc::clone(open_layer)
     405              :         } else {
     406              :             // No writeable layer yet. Create one.
     407          660 :             let start_lsn = self
     408          660 :                 .layer_map
     409          660 :                 .next_open_layer_at
     410          660 :                 .context("No next open layer found")?;
     411              : 
     412          660 :             trace!(
     413            0 :                 "creating in-memory layer at {}/{} for record at {}",
     414              :                 timeline_id, start_lsn, lsn
     415              :             );
     416              : 
     417          660 :             let new_layer = InMemoryLayer::create(
     418          660 :                 conf,
     419          660 :                 timeline_id,
     420          660 :                 tenant_shard_id,
     421          660 :                 start_lsn,
     422          660 :                 gate,
     423          660 :                 cancel,
     424          660 :                 ctx,
     425          660 :             )
     426          660 :             .await?;
     427          660 :             let layer = Arc::new(new_layer);
     428              : 
     429          660 :             self.layer_map.open_layer = Some(layer.clone());
     430          660 :             self.layer_map.next_open_layer_at = None;
     431              : 
     432          660 :             layer
     433              :         };
     434              : 
     435          660 :         Ok(layer)
     436          660 :     }
     437              : 
     438              :     /// Tries to freeze an open layer and also manages clearing the TimelineWriterState.
     439              :     ///
     440              :     /// Returns true if anything was frozen.
     441          611 :     pub(super) async fn try_freeze_in_memory_layer(
     442          611 :         &mut self,
     443          611 :         lsn: Lsn,
     444          611 :         last_freeze_at: &AtomicLsn,
     445          611 :         write_lock: &mut tokio::sync::MutexGuard<'_, Option<TimelineWriterState>>,
     446          611 :         metrics: &TimelineMetrics,
     447          611 :     ) -> bool {
     448          611 :         let Lsn(last_record_lsn) = lsn;
     449          611 :         let end_lsn = Lsn(last_record_lsn + 1);
     450              : 
     451          611 :         let froze = if let Some(open_layer) = &self.layer_map.open_layer {
     452          597 :             let open_layer_rc = Arc::clone(open_layer);
     453          597 :             open_layer.freeze(end_lsn).await;
     454              : 
     455              :             // Increment the frozen layer metrics. This is decremented in `finish_flush_l0_layer()`.
     456              :             // TODO: It would be nicer to do this via `InMemoryLayer::drop()`, but it requires a
     457              :             // reference to the timeline metrics. Other methods use a metrics borrow as well.
     458          597 :             metrics.inc_frozen_layer(open_layer);
     459              : 
     460              :             // The layer is no longer open, update the layer map to reflect this.
     461              :             // We will replace it with on-disk historics below.
     462          597 :             self.layer_map.frozen_layers.push_back(open_layer_rc);
     463          597 :             self.layer_map.open_layer = None;
     464          597 :             self.layer_map.next_open_layer_at = Some(end_lsn);
     465              : 
     466          597 :             true
     467              :         } else {
     468           14 :             false
     469              :         };
     470              : 
     471              :         // Even if there was no layer to freeze, advance last_freeze_at to last_record_lsn+1: this
     472              :         // accounts for regions in the LSN range where we might have ingested no data due to sharding.
     473          611 :         last_freeze_at.store(end_lsn);
     474              : 
     475              :         // the writer state must no longer have a reference to the frozen layer
     476          611 :         let taken = write_lock.take();
     477          611 :         assert_eq!(
     478              :             froze,
     479          611 :             taken.is_some(),
     480            0 :             "should only had frozen a layer when TimelineWriterState existed"
     481              :         );
     482              : 
     483          611 :         froze
     484          611 :     }
     485              : 
     486              :     /// Add image layers to the layer map, called from [`super::Timeline::create_image_layers`].
     487          191 :     pub(crate) fn track_new_image_layers(
     488          191 :         &mut self,
     489          191 :         image_layers: &[ResidentLayer],
     490          191 :         metrics: &TimelineMetrics,
     491          191 :     ) {
     492          191 :         let mut updates = self.layer_map.batch_update();
     493          321 :         for layer in image_layers {
     494          130 :             Self::insert_historic_layer(layer.as_ref().clone(), &mut updates, &mut self.layer_fmgr);
     495          130 : 
     496          130 :             // record these here instead of Layer::finish_creating because otherwise partial
     497          130 :             // failure with create_image_layers would balloon up the physical size gauge. downside
     498          130 :             // is that all layers need to be created before metrics are updated.
     499          130 :             metrics.record_new_file_metrics(layer.layer_desc().file_size);
     500          130 :         }
     501          191 :         updates.flush();
     502          191 :     }
     503              : 
     504              :     /// Flush a frozen layer and add the written delta layer to the layer map.
     505          596 :     pub(crate) fn finish_flush_l0_layer(
     506          596 :         &mut self,
     507          596 :         delta_layer: Option<&ResidentLayer>,
     508          596 :         frozen_layer_for_check: &Arc<InMemoryLayer>,
     509          596 :         metrics: &TimelineMetrics,
     510          596 :     ) {
     511          596 :         let inmem = self
     512          596 :             .layer_map
     513          596 :             .frozen_layers
     514          596 :             .pop_front()
     515          596 :             .expect("there must be a inmem layer to flush");
     516          596 :         metrics.dec_frozen_layer(&inmem);
     517              : 
     518              :         // Only one task may call this function at a time (for this
     519              :         // timeline). If two tasks tried to flush the same frozen
     520              :         // layer to disk at the same time, that would not work.
     521          596 :         assert_eq!(Arc::as_ptr(&inmem), Arc::as_ptr(frozen_layer_for_check));
     522              : 
     523          596 :         if let Some(l) = delta_layer {
     524          485 :             let mut updates = self.layer_map.batch_update();
     525          485 :             Self::insert_historic_layer(l.as_ref().clone(), &mut updates, &mut self.layer_fmgr);
     526          485 :             metrics.record_new_file_metrics(l.layer_desc().file_size);
     527          485 :             updates.flush();
     528          485 :         }
     529          596 :     }
     530              : 
     531              :     /// Called when compaction is completed: inserts every `compact_to` layer and deletes every `compact_from` layer within one batched layer map update.
     532           23 :     pub(crate) fn finish_compact_l0(
     533           23 :         &mut self,
     534           23 :         compact_from: &[Layer],
     535           23 :         compact_to: &[ResidentLayer],
     536           23 :         metrics: &TimelineMetrics,
     537           23 :     ) {
     538           23 :         let mut updates = self.layer_map.batch_update();
     539          186 :         for l in compact_to {
     540          163 :             Self::insert_historic_layer(l.as_ref().clone(), &mut updates, &mut self.layer_fmgr); // register the compaction output
     541          163 :             metrics.record_new_file_metrics(l.layer_desc().file_size); // report the new layer's file size
     542          163 :         }
     543          224 :         for l in compact_from {
     544          201 :             Self::delete_historic_layer(l, &mut updates, &mut self.layer_fmgr); // retire the compaction input
     545          201 :         }
     546           23 :         updates.flush();
     547           23 :     }
     548              : 
     549              :     /// Called when a GC-compaction is completed: classifies the layers by key into rewrites, drops and additions, then applies them via `rewrite_layers_inner`.
     550           24 :     pub(crate) fn finish_gc_compaction(
     551           24 :         &mut self,
     552           24 :         compact_from: &[Layer],
     553           24 :         compact_to: &[ResidentLayer],
     554           24 :         metrics: &TimelineMetrics,
     555           24 :     ) {
     556              :         // gc-compaction could contain layer rewrites. We need to delete the old layers and insert the new ones.
     557              : 
     558              :         // Match the old layers with the new layers by layer key: a new layer whose key equals an old layer's key is a rewrite of it; unmatched old layers are dropped and unmatched new layers are added.
     559           24 :         let mut add_layers = HashMap::new();
     560           24 :         let mut rewrite_layers = HashMap::new();
     561           24 :         let mut drop_layers = HashMap::new();
     562           76 :         for layer in compact_from {
     563           52 :             drop_layers.insert(layer.layer_desc().key(), layer.clone()); // every old layer starts out as a drop candidate
     564           52 :         }
     565           51 :         for layer in compact_to {
     566           27 :             if let Some(old_layer) = drop_layers.remove(&layer.layer_desc().key()) {
     567            0 :                 rewrite_layers.insert(layer.layer_desc().key(), (old_layer.clone(), layer.clone())); // same key on both sides: treated as a rewrite, so it was taken out of `drop_layers` above
     568           27 :             } else {
     569           27 :                 add_layers.insert(layer.layer_desc().key(), layer.clone()); // no old layer with this key: a plain addition
     570           27 :             }
     571              :         }
     572           24 :         let add_layers = add_layers.values().cloned().collect::<Vec<_>>();
     573           24 :         let drop_layers = drop_layers.values().cloned().collect::<Vec<_>>();
     574           24 :         let rewrite_layers = rewrite_layers.values().cloned().collect::<Vec<_>>();
     575              : 
     576           24 :         self.rewrite_layers_inner(&rewrite_layers, &drop_layers, &add_layers, metrics);
     577           24 :     }
     578              : 
     579              :     /// Called post-compaction when some previous generation image layers were trimmed. Delegates to `rewrite_layers_inner` with an empty list of layers to add.
     580            0 :     pub fn rewrite_layers(
     581            0 :         &mut self,
     582            0 :         rewrite_layers: &[(Layer, ResidentLayer)],
     583            0 :         drop_layers: &[Layer],
     584            0 :         metrics: &TimelineMetrics,
     585            0 :     ) {
     586            0 :         self.rewrite_layers_inner(rewrite_layers, drop_layers, &[], metrics);
     587            0 :     }
     588              : 
     589           24 :     fn rewrite_layers_inner(
     590           24 :         &mut self,
     591           24 :         rewrite_layers: &[(Layer, ResidentLayer)],
     592           24 :         drop_layers: &[Layer],
     593           24 :         add_layers: &[ResidentLayer],
     594           24 :         metrics: &TimelineMetrics,
     595           24 :     ) {
     596           24 :         let mut updates = self.layer_map.batch_update(); // rewrites, drops and additions below all go through this one batch, flushed once at the end
     597           24 :         for (old_layer, new_layer) in rewrite_layers {
     598            0 :             debug_assert_eq!( // debug builds only: a rewrite must keep the same key range
     599            0 :                 old_layer.layer_desc().key_range,
     600            0 :                 new_layer.layer_desc().key_range
     601              :             );
     602            0 :             debug_assert_eq!( // debug builds only: a rewrite must keep the same LSN range
     603            0 :                 old_layer.layer_desc().lsn_range,
     604            0 :                 new_layer.layer_desc().lsn_range
     605              :             );
     606              : 
     607              :             // Transfer visibility hint from old to new layer, since the new layer covers the same key space.  This is not guaranteed to
     608              :             // be accurate (as the new layer may cover a different subset of the key range), but is a sensible default, and prevents
     609              :             // always marking rewritten layers as visible.
     610            0 :             new_layer.as_ref().set_visibility(old_layer.visibility());
     611              : 
     612              :             // Safety: we may never rewrite the same file in-place.  Callers are responsible
     613              :             // for ensuring that they only rewrite layers after something changes the path,
     614              :             // such as an increment in the generation number.
     615            0 :             assert_ne!(old_layer.local_path(), new_layer.local_path()); // checked in release builds too
     616              : 
     617            0 :             Self::delete_historic_layer(old_layer, &mut updates, &mut self.layer_fmgr); // remove the old layer first, then insert its replacement
     618              : 
     619            0 :             Self::insert_historic_layer(
     620            0 :                 new_layer.as_ref().clone(),
     621            0 :                 &mut updates,
     622            0 :                 &mut self.layer_fmgr,
     623              :             );
     624              : 
     625            0 :             metrics.record_new_file_metrics(new_layer.layer_desc().file_size);
     626              :         }
     627           76 :         for l in drop_layers {
     628           52 :             Self::delete_historic_layer(l, &mut updates, &mut self.layer_fmgr); // drops are applied before additions
     629           52 :         }
     630           51 :         for l in add_layers {
     631           27 :             Self::insert_historic_layer(l.as_ref().clone(), &mut updates, &mut self.layer_fmgr);
     632           27 :             metrics.record_new_file_metrics(l.layer_desc().file_size); // report the added layer's file size
     633           27 :         }
     634           24 :         updates.flush();
     635           24 :     }
     636              : 
     637              :     /// Called when garbage collection has selected the layers to be removed: deletes each of `gc_layers` within one batched layer map update.
     638            6 :     pub(crate) fn finish_gc_timeline(&mut self, gc_layers: &[Layer]) {
     639            6 :         let mut updates = self.layer_map.batch_update();
     640           14 :         for doomed_layer in gc_layers {
     641            8 :             Self::delete_historic_layer(doomed_layer, &mut updates, &mut self.layer_fmgr);
     642            8 :         }
     643            6 :         updates.flush()
     644            6 :     }
     645              : 
     646              :     #[cfg(test)]
     647           86 :     pub(crate) fn force_insert_layer(&mut self, layer: ResidentLayer) {
     648           86 :         let mut updates = self.layer_map.batch_update();
     649           86 :         Self::insert_historic_layer(layer.as_ref().clone(), &mut updates, &mut self.layer_fmgr); // test-only helper: registers `layer` in both the layer map and the file manager
     650           86 :         updates.flush()
     651           86 :     }
     652              : 
     653              :     /// Helper function to insert a layer into the layer map and file manager: the layer's descriptor goes to `updates`, the layer itself goes to `mapping`.
     654          899 :     fn insert_historic_layer(
     655          899 :         layer: Layer,
     656          899 :         updates: &mut BatchedUpdates<'_>,
     657          899 :         mapping: &mut LayerFileManager<Layer>,
     658          899 :     ) {
     659          899 :         updates.insert_historic(layer.layer_desc().clone());
     660          899 :         mapping.insert(layer);
     661          899 :     }
     662              : 
     663              :     /// Removes the layer from local FS (if present) and from memory.
     664              :     /// Remote storage is not affected by this operation.
     665          261 :     fn delete_historic_layer(
     666          261 :         // we cannot remove layers otherwise, since gc and compaction will race
     667          261 :         layer: &Layer,
     668          261 :         updates: &mut BatchedUpdates<'_>,
     669          261 :         mapping: &mut LayerFileManager<Layer>,
     670          261 :     ) {
     671          261 :         let desc = layer.layer_desc();
     672              : 
     673              :         // TODO Removing from the bottom of the layer map is expensive.
     674              :         //      Maybe instead discard all layer map historic versions that
     675              :         //      won't be needed for page reconstruction for this timeline,
     676              :         //      and mark what we can't delete yet as deleted from the layer
     677              :         //      map index without actually rebuilding the index.
     678          261 :         updates.remove_historic(desc); // take the descriptor out of the layer map via the batch
     679          261 :         mapping.remove(layer); // take the layer out of the key -> layer mapping
     680          261 :         layer.delete_on_drop(); // NOTE(review): presumably defers the local file removal until the layer is dropped — confirm against `Layer::delete_on_drop`
     681          261 :     }
     682              : 
     683              :     #[cfg(test)]
     684            4 :     pub(crate) fn force_insert_in_memory_layer(&mut self, layer: Arc<InMemoryLayer>) {
     685              :         use pageserver_api::models::InMemoryLayerInfo;
     686              : 
     687            4 :         match layer.info() { // test-only helper: place `layer` as the open layer or append it to the frozen layers, depending on its state
     688              :             InMemoryLayerInfo::Open { .. } => {
     689            1 :                 assert!(self.layer_map.open_layer.is_none()); // refuse to replace an existing open layer
     690            1 :                 self.layer_map.open_layer = Some(layer);
     691              :             }
     692            3 :             InMemoryLayerInfo::Frozen { lsn_start, .. } => {
     693            3 :                 if let Some(last) = self.layer_map.frozen_layers.back() {
     694            1 :                     assert!(last.get_lsn_range().end <= lsn_start); // the new frozen layer must start at or after the end of the last frozen one
     695            2 :                 }
     696              : 
     697            3 :                 self.layer_map.frozen_layers.push_back(layer);
     698              :             }
     699              :         }
     700            4 :     }
     701              : }
     702              : 
     703              : pub(crate) struct LayerFileManager<T>(HashMap<PersistentLayerKey, T>); // newtype over a map from `PersistentLayerKey` to the stored layer value `T`
     704              : 
     705              : impl<T> Default for LayerFileManager<T> { // NOTE(review): presumably hand-written rather than derived so that no `T: Default` bound is required — confirm
     706          235 :     fn default() -> Self {
     707          235 :         Self(HashMap::default()) // starts out with an empty map
     708          235 :     }
     709              : }
     710              : 
     711              : impl<T: AsLayerDesc + Clone> LayerFileManager<T> {
     712          899 :     pub(crate) fn insert(&mut self, layer: T) { // stores `layer` under the key of its layer descriptor
     713          899 :         let present = self.0.insert(layer.layer_desc().key(), layer.clone()); // `HashMap::insert` returns the previous value for this key, if any
     714          899 :         if present.is_some() && cfg!(debug_assertions) { // the old entry is already replaced at this point; only builds with debug assertions panic on it
     715            0 :             panic!("overwriting a layer: {:?}", layer.layer_desc())
     716          899 :         }
     717          899 :     }
     718              : 
     719          261 :     pub(crate) fn remove(&mut self, layer: &T) { // removes the entry stored under the key of `layer`'s descriptor
     720          261 :         let present = self.0.remove(&layer.layer_desc().key()); // `HashMap::remove` returns the removed value, or `None` if the key was absent
     721          261 :         if present.is_none() && cfg!(debug_assertions) { // a missing entry is ignored unless debug assertions are enabled, in which case it panics
     722            0 :             panic!(
     723            0 :                 "removing layer that is not present in layer mapping: {:?}",
     724            0 :                 layer.layer_desc()
     725              :             )
     726          261 :         }
     727          261 :     }
     728              : }
        

Generated by: LCOV version 2.1-beta