LCOV - code coverage report
Current view: top level - pageserver/src/tenant/timeline - layer_manager.rs (source / functions) Coverage Total Hit
Test: 8ac049b474321fdc72ddcb56d7165153a1a900e8.info Lines: 92.6 % 242 224
Test Date: 2023-09-06 10:18:01 Functions: 90.0 % 30 27

            Line data    Source code
       1              : use anyhow::{bail, ensure, Context, Result};
       2              : use std::{collections::HashMap, sync::Arc};
       3              : use tracing::trace;
       4              : use utils::{
       5              :     id::{TenantId, TimelineId},
       6              :     lsn::{AtomicLsn, Lsn},
       7              : };
       8              : 
       9              : use crate::{
      10              :     config::PageServerConf,
      11              :     metrics::TimelineMetrics,
      12              :     tenant::{
      13              :         layer_map::{BatchedUpdates, LayerMap},
      14              :         storage_layer::{
      15              :             AsLayerDesc, DeltaLayer, ImageLayer, InMemoryLayer, PersistentLayer,
      16              :             PersistentLayerDesc, PersistentLayerKey,
      17              :         },
      18              :         timeline::compare_arced_layers,
      19              :     },
      20              : };
      21              : 
      22              : /// Provides semantic APIs to manipulate the layer map.
      23              : pub(crate) struct LayerManager {
      24              :     layer_map: LayerMap,
      25              :     layer_fmgr: LayerFileManager,
      26              : }
      27              : 
      28              : /// After GC, the layer map changes will not be applied immediately. Users should manually apply the changes after
      29              : /// scheduling deletes in remote client.
      30              : pub(crate) struct ApplyGcResultGuard<'a>(BatchedUpdates<'a>);
      31              : 
      32              : impl ApplyGcResultGuard<'_> {
      33            7 :     pub(crate) fn flush(self) {
      34            7 :         self.0.flush();
      35            7 :     }
      36              : }
      37              : 
      38              : impl LayerManager {
      39         1394 :     pub(crate) fn create() -> Self {
      40         1394 :         Self {
      41         1394 :             layer_map: LayerMap::default(),
      42         1394 :             layer_fmgr: LayerFileManager::new(),
      43         1394 :         }
      44         1394 :     }
      45              : 
      46     45716425 :     pub(crate) fn get_from_desc(&self, desc: &PersistentLayerDesc) -> Arc<dyn PersistentLayer> {
      47     45716425 :         self.layer_fmgr.get_from_desc(desc)
      48     45716425 :     }
      49              : 
      50              :     /// Get an immutable reference to the layer map.
      51              :     ///
      52              :     /// We expect users only to be able to get an immutable layer map. If users want to make modifications,
      53              :     /// they should use the below semantic APIs. This design makes us step closer to immutable storage state.
      54     52824776 :     pub(crate) fn layer_map(&self) -> &LayerMap {
      55     52824776 :         &self.layer_map
      56     52824776 :     }
      57              : 
      58              :     /// Replace layers in the layer file manager, used in evictions and layer downloads.
      59         1217 :     pub(crate) fn replace_and_verify(
      60         1217 :         &mut self,
      61         1217 :         expected: Arc<dyn PersistentLayer>,
      62         1217 :         new: Arc<dyn PersistentLayer>,
      63         1217 :     ) -> Result<()> {
      64         1217 :         self.layer_fmgr.replace_and_verify(expected, new)
      65         1217 :     }
      66              : 
      67              :     /// Called from `load_layer_map`. Initialize the layer manager with:
      68              :     /// 1. all on-disk layers
      69              :     /// 2. next open layer (with the disk_consistent_lsn LSN)
      70          326 :     pub(crate) fn initialize_local_layers(
      71          326 :         &mut self,
      72          326 :         on_disk_layers: Vec<Arc<dyn PersistentLayer>>,
      73          326 :         next_open_layer_at: Lsn,
      74          326 :     ) {
      75          326 :         let mut updates = self.layer_map.batch_update();
      76         6666 :         for layer in on_disk_layers {
      77         6340 :             Self::insert_historic_layer(layer, &mut updates, &mut self.layer_fmgr);
      78         6340 :         }
      79          326 :         updates.flush();
      80          326 :         self.layer_map.next_open_layer_at = Some(next_open_layer_at);
      81          326 :     }
      82              : 
      83              :     /// Initialize when creating a new timeline, called in `init_empty_layer_map`.
      84         1042 :     pub(crate) fn initialize_empty(&mut self, next_open_layer_at: Lsn) {
      85         1042 :         self.layer_map.next_open_layer_at = Some(next_open_layer_at);
      86         1042 :     }
      87              : 
      88              :     /// Open a new writable layer to append data if there is no open layer, otherwise return the current open layer,
      89              :     /// called within `get_layer_for_write`.
      90     82563776 :     pub(crate) fn get_layer_for_write(
      91     82563776 :         &mut self,
      92     82563776 :         lsn: Lsn,
      93     82563776 :         last_record_lsn: Lsn,
      94     82563776 :         conf: &'static PageServerConf,
      95     82563776 :         timeline_id: TimelineId,
      96     82563776 :         tenant_id: TenantId,
      97     82563776 :     ) -> Result<Arc<InMemoryLayer>> {
      98     82563776 :         ensure!(lsn.is_aligned());
      99              : 
     100     82563776 :         ensure!(
     101     82563776 :             lsn > last_record_lsn,
     102            0 :             "cannot modify relation after advancing last_record_lsn (incoming_lsn={}, last_record_lsn={})",
     103              :             lsn,
     104              :             last_record_lsn,
     105              :         );
     106              : 
     107              :         // Do we have a layer open for writing already?
     108     82563776 :         let layer = if let Some(open_layer) = &self.layer_map.open_layer {
     109     82556908 :             if open_layer.get_lsn_range().start > lsn {
     110            0 :                 bail!(
     111            0 :                     "unexpected open layer in the future: open layers starts at {}, write lsn {}",
     112            0 :                     open_layer.get_lsn_range().start,
     113            0 :                     lsn
     114            0 :                 );
     115     82556908 :             }
     116     82556908 : 
     117     82556908 :             Arc::clone(open_layer)
     118              :         } else {
     119              :             // No writeable layer yet. Create one.
     120         6868 :             let start_lsn = self
     121         6868 :                 .layer_map
     122         6868 :                 .next_open_layer_at
     123         6868 :                 .context("No next open layer found")?;
     124              : 
     125         6868 :             trace!(
     126            0 :                 "creating in-memory layer at {}/{} for record at {}",
     127            0 :                 timeline_id,
     128            0 :                 start_lsn,
     129            0 :                 lsn
     130            0 :             );
     131              : 
     132         6868 :             let new_layer = InMemoryLayer::create(conf, timeline_id, tenant_id, start_lsn)?;
     133         6868 :             let layer = Arc::new(new_layer);
     134         6868 : 
     135         6868 :             self.layer_map.open_layer = Some(layer.clone());
     136         6868 :             self.layer_map.next_open_layer_at = None;
     137         6868 : 
     138         6868 :             layer
     139              :         };
     140              : 
     141     82563776 :         Ok(layer)
     142     82563776 :     }
     143              : 
     144              :     /// Called from `freeze_inmem_layer`. Freezes the open in-memory layer, if there is one.
     145         6753 :     pub(crate) async fn try_freeze_in_memory_layer(
     146         6753 :         &mut self,
     147         6753 :         Lsn(last_record_lsn): Lsn,
     148         6753 :         last_freeze_at: &AtomicLsn,
     149         6753 :     ) {
     150         6753 :         let end_lsn = Lsn(last_record_lsn + 1);
     151              : 
     152         6753 :         if let Some(open_layer) = &self.layer_map.open_layer {
     153         6562 :             let open_layer_rc = Arc::clone(open_layer);
     154         6562 :             // Does this layer need freezing?
     155         6562 :             open_layer.freeze(end_lsn).await;
     156              : 
     157              :             // The layer is no longer open, update the layer map to reflect this.
     158              :             // We will replace it with on-disk historics below.
     159         6562 :             self.layer_map.frozen_layers.push_back(open_layer_rc);
     160         6562 :             self.layer_map.open_layer = None;
     161         6562 :             self.layer_map.next_open_layer_at = Some(end_lsn);
     162         6562 :             last_freeze_at.store(end_lsn);
     163          191 :         }
     164         6753 :     }
     165              : 
     166              :     /// Add image layers to the layer map, called from `create_image_layers`.
     167         1435 :     pub(crate) fn track_new_image_layers(&mut self, image_layers: Vec<ImageLayer>) {
     168         1435 :         let mut updates = self.layer_map.batch_update();
     169         2547 :         for layer in image_layers {
     170         1112 :             Self::insert_historic_layer(Arc::new(layer), &mut updates, &mut self.layer_fmgr);
     171         1112 :         }
     172         1435 :         updates.flush();
     173         1435 :     }
     174              : 
     175              :     /// Flush a frozen layer and add the written delta layer to the layer map.
     176         6513 :     pub(crate) fn finish_flush_l0_layer(
     177         6513 :         &mut self,
     178         6513 :         delta_layer: Option<DeltaLayer>,
     179         6513 :         frozen_layer_for_check: &Arc<InMemoryLayer>,
     180         6513 :     ) {
     181         6513 :         let l = self.layer_map.frozen_layers.pop_front();
     182         6513 :         let mut updates = self.layer_map.batch_update();
     183         6513 : 
     184         6513 :         // Only one thread may call this function at a time (for this
     185         6513 :         // timeline). If two threads tried to flush the same frozen
     186         6513 :         // layer to disk at the same time, that would not work.
     187         6513 :         assert!(compare_arced_layers(&l.unwrap(), frozen_layer_for_check));
     188              : 
     189         6513 :         if let Some(delta_layer) = delta_layer {
     190         6476 :             Self::insert_historic_layer(Arc::new(delta_layer), &mut updates, &mut self.layer_fmgr);
     191         6476 :         }
     192         6513 :         updates.flush();
     193         6513 :     }
     194              : 
     195              :     /// Called when compaction is completed.
     196          321 :     pub(crate) fn finish_compact_l0(
     197          321 :         &mut self,
     198          321 :         layer_removal_cs: Arc<tokio::sync::OwnedMutexGuard<()>>,
     199          321 :         compact_from: Vec<Arc<dyn PersistentLayer>>,
     200          321 :         compact_to: Vec<Arc<dyn PersistentLayer>>,
     201          321 :         metrics: &TimelineMetrics,
     202          321 :     ) -> Result<()> {
     203          321 :         let mut updates = self.layer_map.batch_update();
     204         8523 :         for l in compact_to {
     205         8202 :             Self::insert_historic_layer(l, &mut updates, &mut self.layer_fmgr);
     206         8202 :         }
     207         4717 :         for l in compact_from {
     208              :             // NB: the layer file identified by descriptor `l` is guaranteed to be present
     209              :             // in the LayerFileManager because compaction kept holding `layer_removal_cs` the entire
     210              :             // time, even though we dropped `Timeline::layers` in between.
     211         4396 :             Self::delete_historic_layer(
     212         4396 :                 layer_removal_cs.clone(),
     213         4396 :                 l,
     214         4396 :                 &mut updates,
     215         4396 :                 metrics,
     216         4396 :                 &mut self.layer_fmgr,
     217         4396 :             )?;
     218              :         }
     219          321 :         updates.flush();
     220          321 :         Ok(())
     221          321 :     }
     222              : 
     223              :     /// Called when garbage collecting the timeline. Returns a guard that will apply the updates to the layer map.
     224           12 :     pub(crate) fn finish_gc_timeline(
     225           12 :         &mut self,
     226           12 :         layer_removal_cs: Arc<tokio::sync::OwnedMutexGuard<()>>,
     227           12 :         gc_layers: Vec<Arc<dyn PersistentLayer>>,
     228           12 :         metrics: &TimelineMetrics,
     229           12 :     ) -> Result<ApplyGcResultGuard> {
     230           12 :         let mut updates = self.layer_map.batch_update();
     231          745 :         for doomed_layer in gc_layers {
     232          733 :             Self::delete_historic_layer(
     233          733 :                 layer_removal_cs.clone(),
     234          733 :                 doomed_layer,
     235          733 :                 &mut updates,
     236          733 :                 metrics,
     237          733 :                 &mut self.layer_fmgr,
     238          733 :             )?; // FIXME: schedule succeeded deletions in timeline.rs `gc_timeline` instead of in batch?
     239              :         }
     240           12 :         Ok(ApplyGcResultGuard(updates))
     241           12 :     }
     242              : 
     243              :     /// Helper function to insert a layer into the layer map and file manager.
     244        22130 :     fn insert_historic_layer(
     245        22130 :         layer: Arc<dyn PersistentLayer>,
     246        22130 :         updates: &mut BatchedUpdates<'_>,
     247        22130 :         mapping: &mut LayerFileManager,
     248        22130 :     ) {
     249        22130 :         updates.insert_historic(layer.layer_desc().clone());
     250        22130 :         mapping.insert(layer);
     251        22130 :     }
     252              : 
     253              :     /// Removes the layer from local FS (if present) and from memory.
     254              :     /// Remote storage is not affected by this operation.
     255         5129 :     fn delete_historic_layer(
     256         5129 :         // we cannot remove layers otherwise, since gc and compaction will race
     257         5129 :         _layer_removal_cs: Arc<tokio::sync::OwnedMutexGuard<()>>,
     258         5129 :         layer: Arc<dyn PersistentLayer>,
     259         5129 :         updates: &mut BatchedUpdates<'_>,
     260         5129 :         metrics: &TimelineMetrics,
     261         5129 :         mapping: &mut LayerFileManager,
     262         5129 :     ) -> anyhow::Result<()> {
     263         5129 :         let desc = layer.layer_desc();
     264         5129 :         if !layer.is_remote_layer() {
     265         5127 :             layer.delete_resident_layer_file()?;
     266         5127 :             metrics.resident_physical_size_gauge.sub(desc.file_size);
     267            2 :         }
     268              : 
     269              :         // TODO Removing from the bottom of the layer map is expensive.
     270              :         //      Maybe instead discard all layer map historic versions that
     271              :         //      won't be needed for page reconstruction for this timeline,
     272              :         //      and mark what we can't delete yet as deleted from the layer
     273              :         //      map index without actually rebuilding the index.
     274         5129 :         updates.remove_historic(desc);
     275         5129 :         mapping.remove(layer);
     276         5129 : 
     277         5129 :         Ok(())
     278         5129 :     }
     279              : 
     280        11041 :     pub(crate) fn contains(&self, layer: &Arc<dyn PersistentLayer>) -> bool {
     281        11041 :         self.layer_fmgr.contains(layer)
     282        11041 :     }
     283              : }
     284              : 
     285              : pub(crate) struct LayerFileManager<T: AsLayerDesc + ?Sized = dyn PersistentLayer>(
     286              :     HashMap<PersistentLayerKey, Arc<T>>,
     287              : );
     288              : 
     289              : impl<T: AsLayerDesc + ?Sized> LayerFileManager<T> {
     290     45716453 :     fn get_from_desc(&self, desc: &PersistentLayerDesc) -> Arc<T> {
     291     45716453 :         // The assumption for the `expect()` is that all code maintains the following invariant:
     292     45716453 :         // A layer's descriptor is present in the LayerMap => the LayerFileManager contains a layer for the descriptor.
     293     45716453 :         self.0
     294     45716453 :             .get(&desc.key())
     295     45716453 :             .with_context(|| format!("get layer from desc: {}", desc.filename()))
     296     45716453 :             .expect("not found")
     297     45716453 :             .clone()
     298     45716453 :     }
     299              : 
     300        22133 :     pub(crate) fn insert(&mut self, layer: Arc<T>) {
     301        22133 :         let present = self.0.insert(layer.layer_desc().key(), layer.clone());
     302        22133 :         if present.is_some() && cfg!(debug_assertions) {
     303            0 :             panic!("overwriting a layer: {:?}", layer.layer_desc())
     304        22133 :         }
     305        22133 :     }
     306              : 
     307        11041 :     pub(crate) fn contains(&self, layer: &Arc<T>) -> bool {
     308        11041 :         self.0.contains_key(&layer.layer_desc().key())
     309        11041 :     }
     310              : 
     311         1398 :     pub(crate) fn new() -> Self {
     312         1398 :         Self(HashMap::new())
     313         1398 :     }
     314              : 
     315         5129 :     pub(crate) fn remove(&mut self, layer: Arc<T>) {
     316         5129 :         let present = self.0.remove(&layer.layer_desc().key());
     317         5129 :         if present.is_none() && cfg!(debug_assertions) {
     318            0 :             panic!(
     319            0 :                 "removing layer that is not present in layer mapping: {:?}",
     320            0 :                 layer.layer_desc()
     321            0 :             )
     322         5129 :         }
     323         5129 :     }
     324              : 
     325         1221 :     pub(crate) fn replace_and_verify(&mut self, expected: Arc<T>, new: Arc<T>) -> Result<()> {
     326         1221 :         let key = expected.layer_desc().key();
     327         1221 :         let other = new.layer_desc().key();
     328         1221 : 
     329         1221 :         let expected_l0 = LayerMap::is_l0(expected.layer_desc());
     330         1221 :         let new_l0 = LayerMap::is_l0(new.layer_desc());
     331         1221 : 
     332         1221 :         fail::fail_point!("layermap-replace-notfound", |_| anyhow::bail!(
     333            1 :             "layermap-replace-notfound"
     334         1221 :         ));
     335              : 
     336         1220 :         anyhow::ensure!(
     337         1220 :             key == other,
     338            0 :             "expected and new layer have different keys: {key:?} != {other:?}"
     339              :         );
     340              : 
     341         1220 :         anyhow::ensure!(
     342         1220 :             expected_l0 == new_l0,
     343            0 :             "one layer is l0 while the other is not: {expected_l0} != {new_l0}"
     344              :         );
     345              : 
     346         1220 :         if let Some(layer) = self.0.get_mut(&key) {
     347         1219 :             anyhow::ensure!(
     348         1219 :                 compare_arced_layers(&expected, layer),
     349            1 :                 "another layer was found instead of expected, expected={expected:?}, new={new:?}",
     350            1 :                 expected = Arc::as_ptr(&expected),
     351            1 :                 new = Arc::as_ptr(layer),
     352              :             );
     353         1218 :             *layer = new;
     354         1218 :             Ok(())
     355              :         } else {
     356            1 :             anyhow::bail!("layer was not found");
     357              :         }
     358         1221 :     }
     359              : }
        

Generated by: LCOV version 2.1-beta