LCOV - differential code coverage report
Current view: top level - pageserver/src/tenant/timeline - layer_manager.rs (source / functions) Coverage Total Hit UBC CBC
Current: f6946e90941b557c917ac98cd5a7e9506d180f3e.info Lines: 92.1 % 242 223 19 223
Current Date: 2023-10-19 02:04:12 Functions: 90.3 % 31 28 3 28
Baseline: c8637f37369098875162f194f92736355783b050.info
Baseline Date: 2023-10-18 20:25:20

           TLA  Line data    Source code
       1                 : use anyhow::{bail, ensure, Context, Result};
       2                 : use std::{collections::HashMap, sync::Arc};
       3                 : use tracing::trace;
       4                 : use utils::{
       5                 :     id::{TenantId, TimelineId},
       6                 :     lsn::{AtomicLsn, Lsn},
       7                 : };
       8                 : 
       9                 : use crate::{
      10                 :     config::PageServerConf,
      11                 :     metrics::TimelineMetrics,
      12                 :     tenant::{
      13                 :         layer_map::{BatchedUpdates, LayerMap},
      14                 :         storage_layer::{
      15                 :             AsLayerDesc, DeltaLayer, ImageLayer, InMemoryLayer, PersistentLayer,
      16                 :             PersistentLayerDesc, PersistentLayerKey,
      17                 :         },
      18                 :         timeline::compare_arced_layers,
      19                 :     },
      20                 : };
      21                 : 
      22                 : /// Provides semantic APIs to manipulate the layer map.
      23                 : pub(crate) struct LayerManager {
      24                 :     layer_map: LayerMap,
      25                 :     layer_fmgr: LayerFileManager,
      26                 : }
      27                 : 
      28                 : /// After GC, the layer map changes will not be applied immediately. Users should manually apply the changes after
      29                 : /// scheduling deletes in remote client.
      30                 : pub(crate) struct ApplyGcResultGuard<'a>(BatchedUpdates<'a>);
      31                 : 
      32                 : impl ApplyGcResultGuard<'_> {
      33 CBC          20 :     pub(crate) fn flush(self) {
      34              20 :         self.0.flush();
      35              20 :     }
      36                 : }
      37                 : 
      38                 : impl LayerManager {
      39            1302 :     pub(crate) fn create() -> Self {
      40            1302 :         Self {
      41            1302 :             layer_map: LayerMap::default(),
      42            1302 :             layer_fmgr: LayerFileManager::new(),
      43            1302 :         }
      44            1302 :     }
      45                 : 
      46        23869889 :     pub(crate) fn get_from_desc(&self, desc: &PersistentLayerDesc) -> Arc<dyn PersistentLayer> {
      47        23869889 :         self.layer_fmgr.get_from_desc(desc)
      48        23869889 :     }
      49                 : 
      50                 :     /// Get an immutable reference to the layer map.
      51                 :     ///
      52                 :     /// We expect users only to be able to get an immutable layer map. If users want to make modifications,
      53                 :     /// they should use the semantic APIs below. This design brings us a step closer to immutable storage state.
      54        30228675 :     pub(crate) fn layer_map(&self) -> &LayerMap {
      55        30228675 :         &self.layer_map
      56        30228675 :     }
      57                 : 
      58                 :     /// Replace layers in the layer file manager, used in evictions and layer downloads.
      59            1961 :     pub(crate) fn replace_and_verify(
      60            1961 :         &mut self,
      61            1961 :         expected: Arc<dyn PersistentLayer>,
      62            1961 :         new: Arc<dyn PersistentLayer>,
      63            1961 :     ) -> Result<()> {
      64            1961 :         self.layer_fmgr.replace_and_verify(expected, new)
      65            1961 :     }
      66                 : 
      67                 :     /// Called from `load_layer_map`. Initialize the layer manager with:
      68                 :     /// 1. all on-disk layers
      69                 :     /// 2. next open layer (with disk_consistent_lsn LSN)
      70             314 :     pub(crate) fn initialize_local_layers(
      71             314 :         &mut self,
      72             314 :         on_disk_layers: Vec<Arc<dyn PersistentLayer>>,
      73             314 :         next_open_layer_at: Lsn,
      74             314 :     ) {
      75             314 :         let mut updates = self.layer_map.batch_update();
      76            8392 :         for layer in on_disk_layers {
      77            8078 :             Self::insert_historic_layer(layer, &mut updates, &mut self.layer_fmgr);
      78            8078 :         }
      79             314 :         updates.flush();
      80             314 :         self.layer_map.next_open_layer_at = Some(next_open_layer_at);
      81             314 :     }
      82                 : 
      83                 :     /// Initialize when creating a new timeline, called in `init_empty_layer_map`.
      84             967 :     pub(crate) fn initialize_empty(&mut self, next_open_layer_at: Lsn) {
      85             967 :         self.layer_map.next_open_layer_at = Some(next_open_layer_at);
      86             967 :     }
      87                 : 
      88                 :     /// Open a new writable layer to append data if there is no open layer, otherwise return the current open layer,
      89                 :     /// called within `get_layer_for_write`.
      90        76639831 :     pub(crate) async fn get_layer_for_write(
      91        76639831 :         &mut self,
      92        76639831 :         lsn: Lsn,
      93        76639831 :         last_record_lsn: Lsn,
      94        76639831 :         conf: &'static PageServerConf,
      95        76639831 :         timeline_id: TimelineId,
      96        76639831 :         tenant_id: TenantId,
      97        76639945 :     ) -> Result<Arc<InMemoryLayer>> {
      98        76639945 :         ensure!(lsn.is_aligned());
      99                 : 
     100        76639945 :         ensure!(
     101        76639945 :             lsn > last_record_lsn,
     102 UBC           0 :             "cannot modify relation after advancing last_record_lsn (incoming_lsn={}, last_record_lsn={})",
     103                 :             lsn,
     104                 :             last_record_lsn,
     105                 :         );
     106                 : 
     107                 :         // Do we have a layer open for writing already?
     108 CBC    76639945 :         let layer = if let Some(open_layer) = &self.layer_map.open_layer {
     109        76634266 :             if open_layer.get_lsn_range().start > lsn {
     110 UBC           0 :                 bail!(
     111               0 :                     "unexpected open layer in the future: open layers starts at {}, write lsn {}",
     112               0 :                     open_layer.get_lsn_range().start,
     113               0 :                     lsn
     114               0 :                 );
     115 CBC    76634266 :             }
     116        76634266 : 
     117        76634266 :             Arc::clone(open_layer)
     118                 :         } else {
     119                 :             // No writeable layer yet. Create one.
     120            5679 :             let start_lsn = self
     121            5679 :                 .layer_map
     122            5679 :                 .next_open_layer_at
     123            5679 :                 .context("No next open layer found")?;
     124                 : 
     125 UBC           0 :             trace!(
     126               0 :                 "creating in-memory layer at {}/{} for record at {}",
     127               0 :                 timeline_id,
     128               0 :                 start_lsn,
     129               0 :                 lsn
     130               0 :             );
     131                 : 
     132 CBC        5679 :             let new_layer = InMemoryLayer::create(conf, timeline_id, tenant_id, start_lsn).await?;
     133            5679 :             let layer = Arc::new(new_layer);
     134            5679 : 
     135            5679 :             self.layer_map.open_layer = Some(layer.clone());
     136            5679 :             self.layer_map.next_open_layer_at = None;
     137            5679 : 
     138            5679 :             layer
     139                 :         };
     140                 : 
     141        76639945 :         Ok(layer)
     142        76639945 :     }
     143                 : 
     144                 :     /// Called from `freeze_inmem_layer`. Freezes the open layer, if there is one; returns nothing.
     145            5575 :     pub(crate) async fn try_freeze_in_memory_layer(
     146            5575 :         &mut self,
     147            5575 :         Lsn(last_record_lsn): Lsn,
     148            5575 :         last_freeze_at: &AtomicLsn,
     149            5575 :     ) {
     150            5575 :         let end_lsn = Lsn(last_record_lsn + 1);
     151                 : 
     152            5575 :         if let Some(open_layer) = &self.layer_map.open_layer {
     153            5385 :             let open_layer_rc = Arc::clone(open_layer);
     154            5385 :             // Does this layer need freezing?
     155            5385 :             open_layer.freeze(end_lsn).await;
     156                 : 
     157                 :             // The layer is no longer open, update the layer map to reflect this.
     158                 :             // We will replace it with on-disk historics below.
     159            5385 :             self.layer_map.frozen_layers.push_back(open_layer_rc);
     160            5385 :             self.layer_map.open_layer = None;
     161            5385 :             self.layer_map.next_open_layer_at = Some(end_lsn);
     162            5385 :             last_freeze_at.store(end_lsn);
     163             190 :         }
     164            5575 :     }
     165                 : 
     166                 :     /// Add image layers to the layer map, called from `create_image_layers`.
     167            1296 :     pub(crate) fn track_new_image_layers(&mut self, image_layers: Vec<ImageLayer>) {
     168            1296 :         let mut updates = self.layer_map.batch_update();
     169            4698 :         for layer in image_layers {
     170            3402 :             Self::insert_historic_layer(Arc::new(layer), &mut updates, &mut self.layer_fmgr);
     171            3402 :         }
     172            1296 :         updates.flush();
     173            1296 :     }
     174                 : 
     175                 :     /// Flush a frozen layer and add the written delta layer to the layer map.
     176            5347 :     pub(crate) fn finish_flush_l0_layer(
     177            5347 :         &mut self,
     178            5347 :         delta_layer: Option<DeltaLayer>,
     179            5347 :         frozen_layer_for_check: &Arc<InMemoryLayer>,
     180            5347 :     ) {
     181            5347 :         let l = self.layer_map.frozen_layers.pop_front();
     182            5347 :         let mut updates = self.layer_map.batch_update();
     183            5347 : 
     184            5347 :         // Only one thread may call this function at a time (for this
     185            5347 :         // timeline). If two threads tried to flush the same frozen
     186            5347 :         // layer to disk at the same time, that would not work.
     187            5347 :         assert!(compare_arced_layers(&l.unwrap(), frozen_layer_for_check));
     188                 : 
     189            5347 :         if let Some(delta_layer) = delta_layer {
     190            5308 :             Self::insert_historic_layer(Arc::new(delta_layer), &mut updates, &mut self.layer_fmgr);
     191            5308 :         }
     192            5347 :         updates.flush();
     193            5347 :     }
     194                 : 
     195                 :     /// Called when compaction is completed.
     196             304 :     pub(crate) fn finish_compact_l0(
     197             304 :         &mut self,
     198             304 :         layer_removal_cs: Arc<tokio::sync::OwnedMutexGuard<()>>,
     199             304 :         compact_from: Vec<Arc<dyn PersistentLayer>>,
     200             304 :         compact_to: Vec<Arc<dyn PersistentLayer>>,
     201             304 :         metrics: &TimelineMetrics,
     202             304 :     ) -> Result<()> {
     203             304 :         let mut updates = self.layer_map.batch_update();
     204           10166 :         for l in compact_to {
     205            9862 :             Self::insert_historic_layer(l, &mut updates, &mut self.layer_fmgr);
     206            9862 :         }
     207            4584 :         for l in compact_from {
     208                 :             // NB: the layer file identified by descriptor `l` is guaranteed to be present
     209                 :             // in the LayerFileManager because compaction kept holding `layer_removal_cs` the entire
     210                 :             // time, even though we dropped `Timeline::layers` in between.
     211            4280 :             Self::delete_historic_layer(
     212            4280 :                 layer_removal_cs.clone(),
     213            4280 :                 l,
     214            4280 :                 &mut updates,
     215            4280 :                 metrics,
     216            4280 :                 &mut self.layer_fmgr,
     217            4280 :             )?;
     218                 :         }
     219             304 :         updates.flush();
     220             304 :         Ok(())
     221             304 :     }
     222                 : 
     223                 :     /// Called when garbage collecting the timeline. Returns a guard that will apply the updates to the layer map.
     224              25 :     pub(crate) fn finish_gc_timeline(
     225              25 :         &mut self,
     226              25 :         layer_removal_cs: Arc<tokio::sync::OwnedMutexGuard<()>>,
     227              25 :         gc_layers: Vec<Arc<dyn PersistentLayer>>,
     228              25 :         metrics: &TimelineMetrics,
     229              25 :     ) -> Result<ApplyGcResultGuard> {
     230              25 :         let mut updates = self.layer_map.batch_update();
     231            3294 :         for doomed_layer in gc_layers {
     232            3269 :             Self::delete_historic_layer(
     233            3269 :                 layer_removal_cs.clone(),
     234            3269 :                 doomed_layer,
     235            3269 :                 &mut updates,
     236            3269 :                 metrics,
     237            3269 :                 &mut self.layer_fmgr,
     238            3269 :             )?; // FIXME: schedule succeeded deletions in timeline.rs `gc_timeline` instead of in batch?
     239                 :         }
     240              25 :         Ok(ApplyGcResultGuard(updates))
     241              25 :     }
     242                 : 
     243                 :     /// Helper function to insert a layer into the layer map and file manager.
     244           26650 :     fn insert_historic_layer(
     245           26650 :         layer: Arc<dyn PersistentLayer>,
     246           26650 :         updates: &mut BatchedUpdates<'_>,
     247           26650 :         mapping: &mut LayerFileManager,
     248           26650 :     ) {
     249           26650 :         updates.insert_historic(layer.layer_desc().clone());
     250           26650 :         mapping.insert(layer);
     251           26650 :     }
     252                 : 
     253                 :     /// Removes the layer from local FS (if present) and from memory.
     254                 :     /// Remote storage is not affected by this operation.
     255            7549 :     fn delete_historic_layer(
     256            7549 :         // we cannot remove layers otherwise, since gc and compaction will race
     257            7549 :         _layer_removal_cs: Arc<tokio::sync::OwnedMutexGuard<()>>,
     258            7549 :         layer: Arc<dyn PersistentLayer>,
     259            7549 :         updates: &mut BatchedUpdates<'_>,
     260            7549 :         metrics: &TimelineMetrics,
     261            7549 :         mapping: &mut LayerFileManager,
     262            7549 :     ) -> anyhow::Result<()> {
     263            7549 :         let desc = layer.layer_desc();
     264            7549 :         if !layer.is_remote_layer() {
     265            6851 :             layer.delete_resident_layer_file()?;
     266            6851 :             metrics.resident_physical_size_sub(desc.file_size);
     267             698 :         }
     268                 : 
     269                 :         // TODO Removing from the bottom of the layer map is expensive.
     270                 :         //      Maybe instead discard all layer map historic versions that
     271                 :         //      won't be needed for page reconstruction for this timeline,
     272                 :         //      and mark what we can't delete yet as deleted from the layer
     273                 :         //      map index without actually rebuilding the index.
     274            7549 :         updates.remove_historic(desc);
     275            7549 :         mapping.remove(layer);
     276            7549 : 
     277            7549 :         Ok(())
     278            7549 :     }
     279                 : 
     280            9890 :     pub(crate) fn contains(&self, layer: &Arc<dyn PersistentLayer>) -> bool {
     281            9890 :         self.layer_fmgr.contains(layer)
     282            9890 :     }
     283                 : }
     284                 : 
     285                 : pub(crate) struct LayerFileManager<T: AsLayerDesc + ?Sized = dyn PersistentLayer>(
     286                 :     HashMap<PersistentLayerKey, Arc<T>>,
     287                 : );
     288                 : 
     289                 : impl<T: AsLayerDesc + ?Sized> LayerFileManager<T> {
     290        23869909 :     fn get_from_desc(&self, desc: &PersistentLayerDesc) -> Arc<T> {
     291        23869909 :         // The assumption for the `expect()` is that all code maintains the following invariant:
     292        23869909 :         // A layer's descriptor is present in the LayerMap => the LayerFileManager contains a layer for the descriptor.
     293        23869909 :         self.0
     294        23869909 :             .get(&desc.key())
     295        23869909 :             .with_context(|| format!("get layer from desc: {}", desc.filename()))
     296        23869909 :             .expect("not found")
     297        23869909 :             .clone()
     298        23869909 :     }
     299                 : 
     300           26653 :     pub(crate) fn insert(&mut self, layer: Arc<T>) {
     301           26653 :         let present = self.0.insert(layer.layer_desc().key(), layer.clone());
     302           26653 :         if present.is_some() && cfg!(debug_assertions) {
     303 UBC           0 :             panic!("overwriting a layer: {:?}", layer.layer_desc())
     304 CBC       26653 :         }
     305           26653 :     }
     306                 : 
     307            9890 :     pub(crate) fn contains(&self, layer: &Arc<T>) -> bool {
     308            9890 :         self.0.contains_key(&layer.layer_desc().key())
     309            9890 :     }
     310                 : 
     311            1306 :     pub(crate) fn new() -> Self {
     312            1306 :         Self(HashMap::new())
     313            1306 :     }
     314                 : 
     315            7549 :     pub(crate) fn remove(&mut self, layer: Arc<T>) {
     316            7549 :         let present = self.0.remove(&layer.layer_desc().key());
     317            7549 :         if present.is_none() && cfg!(debug_assertions) {
     318 UBC           0 :             panic!(
     319               0 :                 "removing layer that is not present in layer mapping: {:?}",
     320               0 :                 layer.layer_desc()
     321               0 :             )
     322 CBC        7549 :         }
     323            7549 :     }
     324                 : 
     325            1965 :     pub(crate) fn replace_and_verify(&mut self, expected: Arc<T>, new: Arc<T>) -> Result<()> {
     326            1965 :         let key = expected.layer_desc().key();
     327            1965 :         let other = new.layer_desc().key();
     328            1965 : 
     329            1965 :         let expected_l0 = LayerMap::is_l0(expected.layer_desc());
     330            1965 :         let new_l0 = LayerMap::is_l0(new.layer_desc());
     331            1965 : 
     332            1965 :         fail::fail_point!("layermap-replace-notfound", |_| anyhow::bail!(
     333               1 :             "layermap-replace-notfound"
     334            1965 :         ));
     335                 : 
     336            1964 :         anyhow::ensure!(
     337            1964 :             key == other,
     338 UBC           0 :             "expected and new layer have different keys: {key:?} != {other:?}"
     339                 :         );
     340                 : 
     341 CBC        1964 :         anyhow::ensure!(
     342            1964 :             expected_l0 == new_l0,
     343 UBC           0 :             "one layer is l0 while the other is not: {expected_l0} != {new_l0}"
     344                 :         );
     345                 : 
     346 CBC        1964 :         if let Some(layer) = self.0.get_mut(&key) {
     347            1963 :             anyhow::ensure!(
     348            1963 :                 compare_arced_layers(&expected, layer),
     349               1 :                 "another layer was found instead of expected, expected={expected:?}, new={new:?}",
     350               1 :                 expected = Arc::as_ptr(&expected),
     351               1 :                 new = Arc::as_ptr(layer),
     352                 :             );
     353            1962 :             *layer = new;
     354            1962 :             Ok(())
     355                 :         } else {
     356               1 :             anyhow::bail!("layer was not found");
     357                 :         }
     358            1965 :     }
     359                 : }
        

Generated by: LCOV version 2.1-beta