LCOV - code coverage report
Current view: top level - pageserver/src/tenant/timeline - compaction.rs (source / functions)
Test: b837401fb09d2d9818b70e630fdb67e9799b7b0d.info
Test Date: 2024-04-18 15:32:49

             Coverage    Total    Hit
Lines:       54.5 %      826      450
Functions:   20.0 %      100      20

            Line data    Source code
       1              : //! New compaction implementation. The algorithm itself is implemented in the
       2              : //! compaction crate. This file implements the callbacks and structs that allow
       3              : //! the algorithm to drive the process.
       4              : //!
       5              : //! The old legacy algorithm is implemented directly in `timeline.rs`.
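                      : //!
                      : //! For the tiered algorithm, `TimelineAdaptor` (below) implements the
                      : //! `CompactionJobExecutor` interface from the compaction crate. A rough,
                      : //! illustrative sketch of how the driver is invoked (see `compact_tiered`
                      : //! further down):
                      : //!
                      : //! ```ignore
                      : //! let mut adaptor = TimelineAdaptor::new(self, (end_lsn, keyspace));
                      : //! pageserver_compaction::compact_tiered::compact_tiered(
                      : //!     &mut adaptor, end_lsn, target_file_size, fanout, ctx,
                      : //! ).await?;
                      : //! adaptor.flush_updates().await?;
                      : //! ```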
       6              : 
       7              : use std::collections::BinaryHeap;
       8              : use std::ops::{Deref, Range};
       9              : use std::sync::Arc;
      10              : 
      11              : use super::layer_manager::LayerManager;
      12              : use super::{CompactFlags, DurationRecorder, RecordedDuration, Timeline};
      13              : 
      14              : use anyhow::{anyhow, Context};
      15              : use enumset::EnumSet;
      16              : use fail::fail_point;
      17              : use itertools::Itertools;
      18              : use pageserver_api::shard::TenantShardId;
      19              : use tokio_util::sync::CancellationToken;
      20              : use tracing::{debug, info, info_span, trace, warn, Instrument};
      21              : use utils::id::TimelineId;
      22              : 
      23              : use crate::context::{AccessStatsBehavior, RequestContext, RequestContextBuilder};
      24              : use crate::tenant::storage_layer::{AsLayerDesc, PersistentLayerDesc};
      25              : use crate::tenant::timeline::{drop_rlock, is_rel_fsm_block_key, is_rel_vm_block_key, Hole};
      26              : use crate::tenant::timeline::{DeltaLayerWriter, ImageLayerWriter};
      27              : use crate::tenant::timeline::{Layer, ResidentLayer};
      28              : use crate::tenant::DeltaLayer;
      29              : use crate::tenant::PageReconstructError;
      30              : use crate::virtual_file::{MaybeFatalIo, VirtualFile};
      31              : use crate::{page_cache, ZERO_PAGE};
      32              : 
      33              : use crate::keyspace::KeySpace;
      34              : use crate::repository::Key;
      35              : 
      36              : use utils::lsn::Lsn;
      37              : 
      38              : use pageserver_compaction::helpers::overlaps_with;
      39              : use pageserver_compaction::interface::*;
      40              : 
      41              : use super::CompactionError;
      42              : 
      43              : impl Timeline {
      44              :     /// TODO: cancellation
      45          510 :     pub(crate) async fn compact_legacy(
      46          510 :         self: &Arc<Self>,
      47          510 :         _cancel: &CancellationToken,
      48          510 :         flags: EnumSet<CompactFlags>,
      49          510 :         ctx: &RequestContext,
      50          510 :     ) -> Result<(), CompactionError> {
      51          510 :         // High level strategy for compaction / image creation:
      52          510 :         //
      53          510 :         // 1. First, calculate the desired "partitioning" of the
      54          510 :         // currently in-use key space. The goal is to partition the
      55          510 :         // key space into roughly fixed-size chunks, but also take into
      56          510 :         // account any existing image layers, and try to align the
      57          510 :         // chunk boundaries with the existing image layers to avoid
      58          510 :         // too much churn. Also try to align chunk boundaries with
      59          510 :         // relation boundaries.  In principle, we don't know about
      60          510 :         // relation boundaries here, we just deal with key-value
      61          510 :         // pairs, and the code in pgdatadir_mapping.rs knows how to
      62          510 :         // map relations into key-value pairs. But in practice we know
      63          510 :         // that 'field6' is the block number, and the fields 1-5
      64          510 :         // identify a relation. This is just an optimization,
      65          510 :         // though.
      66          510 :         //
      67          510 :         // 2. Once we know the partitioning, for each partition,
      68          510 :         // decide if it's time to create a new image layer. The
       69          510 :         // criterion is: has there been too much "churn" since the last
       70          510 :         // image layer? "Churn" is a fuzzy concept: a combination of too
       71          510 :         // many delta files, or too much WAL in total across the delta
       72          510 :         // files. Or perhaps: creating an image file would allow
       73          510 :         // deleting some older files.
      74          510 :         //
      75          510 :         // 3. After that, we compact all level0 delta files if there
      76          510 :         // are too many of them.  While compacting, we also garbage
      77          510 :         // collect any page versions that are no longer needed because
      78          510 :         // of the new image layers we created in step 2.
      79          510 :         //
      80          510 :         // TODO: This high level strategy hasn't been implemented yet.
      81          510 :         // Below are functions compact_level0() and create_image_layers()
      82          510 :         // but they are a bit ad hoc and don't quite work like it's explained
      83          510 :         // above. Rewrite it.
      84          510 : 
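                      :         // A sketch of how the numbered steps above map onto the calls below
                      :         // (illustrative, with arguments elided; note that in the code the
                      :         // level 0 compaction of step 3 currently runs before step 2):
                      :         //
                      :         //   let (partitioning, lsn) = self.repartition(..).await?;   // step 1
                      :         //   self.compact_level0(target_file_size, ctx).await?;       // step 3
                      :         //   let layers = self
                      :         //       .create_image_layers(&partitioning, lsn, ..)
                      :         //       .await?;                                              // step 2
                      :         //   self.upload_new_image_layers(layers)?;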
      85          510 :         // Is the timeline being deleted?
      86          510 :         if self.is_stopping() {
      87            0 :             trace!("Dropping out of compaction on timeline shutdown");
      88            0 :             return Err(CompactionError::ShuttingDown);
      89          510 :         }
      90          510 : 
      91          510 :         let target_file_size = self.get_checkpoint_distance();
      92          510 : 
      93          510 :         // Define partitioning schema if needed
      94          510 : 
      95          510 :         // FIXME: the match should only cover repartitioning, not the next steps
      96          510 :         match self
      97          510 :             .repartition(
      98          510 :                 self.get_last_record_lsn(),
      99          510 :                 self.get_compaction_target_size(),
     100          510 :                 flags,
     101          510 :                 ctx,
     102          510 :             )
     103        13300 :             .await
     104              :         {
     105          510 :             Ok((partitioning, lsn)) => {
     106          510 :                 // Disables access_stats updates, so that the files we read remain candidates for eviction after we're done with them
     107          510 :                 let image_ctx = RequestContextBuilder::extend(ctx)
     108          510 :                     .access_stats_behavior(AccessStatsBehavior::Skip)
     109          510 :                     .build();
     110          510 : 
     111          510 :                 // 2. Compact
     112          510 :                 let timer = self.metrics.compact_time_histo.start_timer();
     113        76358 :                 self.compact_level0(target_file_size, ctx).await?;
     114          510 :                 timer.stop_and_record();
     115              : 
     116              :                 // 3. Create new image layers for partitions that have been modified
     117              :                 // "enough".
     118          510 :                 let layers = self
     119          510 :                     .create_image_layers(
     120          510 :                         &partitioning,
     121          510 :                         lsn,
     122          510 :                         flags.contains(CompactFlags::ForceImageLayerCreation),
     123          510 :                         &image_ctx,
     124          510 :                     )
     125            1 :                     .await
     126          510 :                     .map_err(anyhow::Error::from)?;
     127              : 
     128          510 :                 self.upload_new_image_layers(layers)?;
     129              :             }
     130            0 :             Err(err) => {
      131            0 :                 // no partitioning? This is normal if the timeline was just created
     132            0 :                 // as an empty timeline. Also in unit tests, when we use the timeline
     133            0 :                 // as a simple key-value store, ignoring the datadir layout. Log the
     134            0 :                 // error but continue.
     135            0 :                 //
     136            0 :                 // Suppress error when it's due to cancellation
     137            0 :                 if !self.cancel.is_cancelled() {
     138            0 :                     tracing::error!("could not compact, repartitioning keyspace failed: {err:?}");
     139            0 :                 }
     140              :             }
     141              :         };
     142              : 
     143          510 :         Ok(())
     144          510 :     }
     145              : 
      146              :     /// Collect a bunch of Level 0 layer files, and compact and reshuffle them
      147              :     /// as Level 1 files.
     148          510 :     async fn compact_level0(
     149          510 :         self: &Arc<Self>,
     150          510 :         target_file_size: u64,
     151          510 :         ctx: &RequestContext,
     152          510 :     ) -> Result<(), CompactionError> {
     153              :         let CompactLevel0Phase1Result {
     154          510 :             new_layers,
     155          510 :             deltas_to_compact,
     156              :         } = {
     157          510 :             let phase1_span = info_span!("compact_level0_phase1");
     158          510 :             let ctx = ctx.attached_child();
     159          510 :             let mut stats = CompactLevel0Phase1StatsBuilder {
     160          510 :                 version: Some(2),
     161          510 :                 tenant_id: Some(self.tenant_shard_id),
     162          510 :                 timeline_id: Some(self.timeline_id),
     163          510 :                 ..Default::default()
     164          510 :             };
     165          510 : 
     166          510 :             let begin = tokio::time::Instant::now();
     167          510 :             let phase1_layers_locked = Arc::clone(&self.layers).read_owned().await;
     168          510 :             let now = tokio::time::Instant::now();
     169          510 :             stats.read_lock_acquisition_micros =
     170          510 :                 DurationRecorder::Recorded(RecordedDuration(now - begin), now);
     171          510 :             self.compact_level0_phase1(phase1_layers_locked, stats, target_file_size, &ctx)
     172          510 :                 .instrument(phase1_span)
     173        76358 :                 .await?
     174              :         };
     175              : 
     176          510 :         if new_layers.is_empty() && deltas_to_compact.is_empty() {
     177              :             // nothing to do
     178          468 :             return Ok(());
     179           42 :         }
     180           42 : 
     181           42 :         self.finish_compact_batch(&new_layers, &Vec::new(), &deltas_to_compact)
     182            0 :             .await?;
     183           42 :         Ok(())
     184          510 :     }
     185              : 
      186              :     /// First phase of level 0 compaction, explained in the [`Self::compact_legacy`] comment.
     187          510 :     async fn compact_level0_phase1(
     188          510 :         self: &Arc<Self>,
     189          510 :         guard: tokio::sync::OwnedRwLockReadGuard<LayerManager>,
     190          510 :         mut stats: CompactLevel0Phase1StatsBuilder,
     191          510 :         target_file_size: u64,
     192          510 :         ctx: &RequestContext,
     193          510 :     ) -> Result<CompactLevel0Phase1Result, CompactionError> {
     194          510 :         stats.read_lock_held_spawn_blocking_startup_micros =
     195          510 :             stats.read_lock_acquisition_micros.till_now(); // set by caller
     196          510 :         let layers = guard.layer_map();
     197          510 :         let level0_deltas = layers.get_level0_deltas()?;
     198          510 :         let mut level0_deltas = level0_deltas
     199          510 :             .into_iter()
     200         2362 :             .map(|x| guard.get_from_desc(&x))
     201          510 :             .collect_vec();
     202          510 :         stats.level0_deltas_count = Some(level0_deltas.len());
     203          510 :         // Only compact if enough layers have accumulated.
     204          510 :         let threshold = self.get_compaction_threshold();
     205          510 :         if level0_deltas.is_empty() || level0_deltas.len() < threshold {
     206          468 :             debug!(
     207            0 :                 level0_deltas = level0_deltas.len(),
     208            0 :                 threshold, "too few deltas to compact"
     209            0 :             );
     210          468 :             return Ok(CompactLevel0Phase1Result::default());
     211           42 :         }
     212           42 : 
      213           42 :         // This failpoint is used together with the `test_duplicate_layers` integration test.
      214           42 :         // It makes the compaction result contain exactly the same layers as the input.
     215           42 :         // We want to ensure that this will not cause any problem when updating the layer map
     216           42 :         // after the compaction is finished.
     217           42 :         //
     218           42 :         // Currently, there are two rare edge cases that will cause duplicated layers being
     219           42 :         // inserted.
      220           42 :         // 1. The compaction job was interrupted / did not finish successfully. Assume we have files 1, 2, 3, 4, which
      221           42 :         //    are compacted to 5, but the pageserver is shut down; the next time we start the pageserver we will get a layer
      222           42 :         //    map containing 1, 2, 3, 4, and 5, where 5 has the same content as 4. If we trigger L0 compaction at this
      223           42 :         //    point again, it is likely that we will get a file 6 which has the same content and key range as 5,
      224           42 :         //    and this causes an overwrite. This is acceptable because the content is the same, and we should do a
      225           42 :         //    layer replacement instead of the normal remove / upload process.
      226           42 :         // 2. The input workload pattern creates exactly n files that are sorted, non-overlapping, and exactly of target
      227           42 :         //    file size. Compaction will likely create the same set of n files afterwards.
     228           42 :         //
     229           42 :         // This failpoint is a superset of both of the cases.
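                      :         //
                      :         // As a sketch of how a test might enable this failpoint through the
                      :         // `fail` crate's standard API (actual tests may configure it via a
                      :         // different mechanism):
                      :         //
                      :         //   fail::cfg("compact-level0-phase1-return-same", "return").unwrap();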
     230           42 :         if cfg!(feature = "testing") {
     231           42 :             let active = (|| {
     232           42 :                 ::fail::fail_point!("compact-level0-phase1-return-same", |_| true);
     233           42 :                 false
     234           42 :             })();
     235           42 : 
     236           42 :             if active {
     237            0 :                 let mut new_layers = Vec::with_capacity(level0_deltas.len());
     238            0 :                 for delta in &level0_deltas {
     239              :                     // we are just faking these layers as being produced again for this failpoint
     240            0 :                     new_layers.push(
     241            0 :                         delta
     242            0 :                             .download_and_keep_resident()
     243            0 :                             .await
     244            0 :                             .context("download layer for failpoint")?,
     245              :                     );
     246              :                 }
     247            0 :                 tracing::info!("compact-level0-phase1-return-same"); // so that we can check if we hit the failpoint
     248            0 :                 return Ok(CompactLevel0Phase1Result {
     249            0 :                     new_layers,
     250            0 :                     deltas_to_compact: level0_deltas,
     251            0 :                 });
     252           42 :             }
     253            0 :         }
     254              : 
     255              :         // Gather the files to compact in this iteration.
     256              :         //
     257              :         // Start with the oldest Level 0 delta file, and collect any other
     258              :         // level 0 files that form a contiguous sequence, such that the end
     259              :         // LSN of previous file matches the start LSN of the next file.
     260              :         //
     261              :         // Note that if the files don't form such a sequence, we might
     262              :         // "compact" just a single file. That's a bit pointless, but it allows
     263              :         // us to get rid of the level 0 file, and compact the other files on
      264              :         // the next iteration. This could probably be made smarter, but such
     265              :         // "gaps" in the sequence of level 0 files should only happen in case
     266              :         // of a crash, partial download from cloud storage, or something like
     267              :         // that, so it's not a big deal in practice.
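                      :         //
                      :         // For example (illustrative LSNs): with L0 deltas covering
                      :         // 0/10..0/20, 0/20..0/30 and 0/40..0/50, the first two form a
                      :         // contiguous run and are compacted together, while 0/40..0/50 is
                      :         // left for a later iteration.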
     268          800 :         level0_deltas.sort_by_key(|l| l.layer_desc().lsn_range.start);
     269           42 :         let mut level0_deltas_iter = level0_deltas.iter();
     270           42 : 
     271           42 :         let first_level0_delta = level0_deltas_iter.next().unwrap();
     272           42 :         let mut prev_lsn_end = first_level0_delta.layer_desc().lsn_range.end;
     273           42 :         let mut deltas_to_compact = Vec::with_capacity(level0_deltas.len());
     274           42 : 
     275           42 :         deltas_to_compact.push(first_level0_delta.download_and_keep_resident().await?);
     276          442 :         for l in level0_deltas_iter {
     277          400 :             let lsn_range = &l.layer_desc().lsn_range;
     278          400 : 
     279          400 :             if lsn_range.start != prev_lsn_end {
     280            0 :                 break;
     281          400 :             }
     282          400 :             deltas_to_compact.push(l.download_and_keep_resident().await?);
     283          400 :             prev_lsn_end = lsn_range.end;
     284              :         }
     285           42 :         let lsn_range = Range {
     286           42 :             start: deltas_to_compact
     287           42 :                 .first()
     288           42 :                 .unwrap()
     289           42 :                 .layer_desc()
     290           42 :                 .lsn_range
     291           42 :                 .start,
     292           42 :             end: deltas_to_compact.last().unwrap().layer_desc().lsn_range.end,
     293           42 :         };
     294           42 : 
     295           42 :         info!(
     296           42 :             "Starting Level0 compaction in LSN range {}-{} for {} layers ({} deltas in total)",
     297           42 :             lsn_range.start,
     298           42 :             lsn_range.end,
     299           42 :             deltas_to_compact.len(),
     300           42 :             level0_deltas.len()
     301           42 :         );
     302              : 
     303          442 :         for l in deltas_to_compact.iter() {
     304          442 :             info!("compact includes {l}");
     305              :         }
     306              : 
     307              :         // We don't need the original list of layers anymore. Drop it so that
     308              :         // we don't accidentally use it later in the function.
     309           42 :         drop(level0_deltas);
     310           42 : 
     311           42 :         stats.read_lock_held_prerequisites_micros = stats
     312           42 :             .read_lock_held_spawn_blocking_startup_micros
     313           42 :             .till_now();
     314           42 : 
      315           42 :         // Determine the N largest holes, where N is the number of compacted layers.
     316           42 :         let max_holes = deltas_to_compact.len();
     317           42 :         let last_record_lsn = self.get_last_record_lsn();
     318           42 :         let min_hole_range = (target_file_size / page_cache::PAGE_SZ as u64) as i128;
     319           42 :         let min_hole_coverage_size = 3; // TODO: something more flexible?
     320           42 : 
     321           42 :         // min-heap (reserve space for one more element added before eviction)
     322           42 :         let mut heap: BinaryHeap<Hole> = BinaryHeap::with_capacity(max_holes + 1);
     323           42 :         let mut prev: Option<Key> = None;
     324           42 : 
     325           42 :         let mut all_keys = Vec::new();
     326              : 
     327          442 :         for l in deltas_to_compact.iter() {
     328         3549 :             all_keys.extend(l.load_keys(ctx).await?);
     329              :         }
     330              : 
     331              :         // FIXME: should spawn_blocking the rest of this function
     332              : 
      333              :         // The current stdlib sorting implementation is designed so that it is
      334              :         // particularly fast when the slice is made up of sorted sub-ranges.
     335      6971354 :         all_keys.sort_by_key(|DeltaEntry { key, lsn, .. }| (*key, *lsn));
     336           42 : 
     337           42 :         stats.read_lock_held_key_sort_micros = stats.read_lock_held_prerequisites_micros.till_now();
     338              : 
     339      3121998 :         for &DeltaEntry { key: next_key, .. } in all_keys.iter() {
     340      3121998 :             if let Some(prev_key) = prev {
     341              :                 // just first fast filter
     342      3121956 :                 if next_key.to_i128() - prev_key.to_i128() >= min_hole_range {
     343            0 :                     let key_range = prev_key..next_key;
     344            0 :                     // Measuring a hole by just subtracting the i128 representations of the key range boundaries
     345            0 :                     // doesn't make much sense, because the largest holes will correspond to field1/field2 changes.
     346            0 :                     // But we are mostly interested in eliminating holes which cause generation of excessive image layers.
     347            0 :                     // That is why it is better to measure the size of a hole as the number of image layers covering it.
     348            0 :                     let coverage_size = layers.image_coverage(&key_range, last_record_lsn).len();
     349            0 :                     if coverage_size >= min_hole_coverage_size {
     350            0 :                         heap.push(Hole {
     351            0 :                             key_range,
     352            0 :                             coverage_size,
     353            0 :                         });
     354            0 :                         if heap.len() > max_holes {
     355            0 :                             heap.pop(); // remove smallest hole
     356            0 :                         }
     357            0 :                     }
     358      3121956 :                 }
     359           42 :             }
     360      3121998 :             prev = Some(next_key.next());
     361              :         }
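                      :         // A minimal sketch of the keep-N-largest pattern used above, with
                      :         // illustrative names (std's BinaryHeap is a max-heap, so the min-heap
                      :         // behavior here relies on how `Hole`'s `Ord` is defined; the same
                      :         // effect can be had by wrapping entries in `std::cmp::Reverse`):
                      :         //
                      :         //   let mut heap = std::collections::BinaryHeap::new();
                      :         //   for sz in sizes {
                      :         //       heap.push(std::cmp::Reverse(sz)); // Reverse => min-heap
                      :         //       if heap.len() > n {
                      :         //           heap.pop(); // evict the current smallest
                      :         //       }
                      :         //   }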
     362           42 :         stats.read_lock_held_compute_holes_micros = stats.read_lock_held_key_sort_micros.till_now();
     363           42 :         drop_rlock(guard);
     364           42 :         stats.read_lock_drop_micros = stats.read_lock_held_compute_holes_micros.till_now();
     365           42 :         let mut holes = heap.into_vec();
     366           42 :         holes.sort_unstable_by_key(|hole| hole.key_range.start);
     367           42 :         let mut next_hole = 0; // index of next hole in holes vector
     368           42 : 
     369           42 :         // This iterator walks through all key-value pairs from all the layers
     370           42 :         // we're compacting, in key, LSN order.
     371           42 :         let all_values_iter = all_keys.iter();
     372           42 : 
     373           42 :         // This iterator walks through all keys and is needed to calculate size used by each key
     374           42 :         let mut all_keys_iter = all_keys
     375           42 :             .iter()
     376      3121998 :             .map(|DeltaEntry { key, lsn, size, .. }| (*key, *lsn, *size))
     377      3121956 :             .coalesce(|mut prev, cur| {
      378      3121956 :                 // Coalesce entries that belong to the same key.
     379      3121956 :                 // This ensures that compaction doesn't put them
     380      3121956 :                 // into different layer files.
     381      3121956 :                 // Still limit this by the target file size,
     382      3121956 :                 // so that we keep the size of the files in
     383      3121956 :                 // check.
     384      3121956 :                 if prev.0 == cur.0 && prev.2 < target_file_size {
     385        92000 :                     prev.2 += cur.2;
     386        92000 :                     Ok(prev)
     387              :                 } else {
     388      3029956 :                     Err((prev, cur))
     389              :                 }
     390      3121956 :             });
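                      :         // `Itertools::coalesce` merges adjacent items whenever the closure
                      :         // returns Ok(merged). A self-contained illustration of the semantics:
                      :         //
                      :         //   let v: Vec<_> = vec![1, 1, 2]
                      :         //       .into_iter()
                      :         //       .coalesce(|a, b| if a == b { Ok(a) } else { Err((a, b)) })
                      :         //       .collect();
                      :         //   assert_eq!(v, vec![1, 2]);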
     391           42 : 
     392           42 :         // Merge the contents of all the input delta layers into a new set
     393           42 :         // of delta layers, based on the current partitioning.
     394           42 :         //
      395           42 :         // We split the new delta layers on the key dimension. We iterate through the key space and, for each key, check whether adding it to the current output layer would make the layer too large; if so, we close the current output layer and start a new one.
     396           42 :         // It's possible that there is a single key with so many page versions that storing all of them in a single layer file
     397           42 :         // would be too large. In that case, we also split on the LSN dimension.
     398           42 :         //
     399           42 :         // LSN
     400           42 :         //  ^
     401           42 :         //  |
     402           42 :         //  | +-----------+            +--+--+--+--+
     403           42 :         //  | |           |            |  |  |  |  |
     404           42 :         //  | +-----------+            |  |  |  |  |
     405           42 :         //  | |           |            |  |  |  |  |
     406           42 :         //  | +-----------+     ==>    |  |  |  |  |
     407           42 :         //  | |           |            |  |  |  |  |
     408           42 :         //  | +-----------+            |  |  |  |  |
     409           42 :         //  | |           |            |  |  |  |  |
     410           42 :         //  | +-----------+            +--+--+--+--+
     411           42 :         //  |
     412           42 :         //  +--------------> key
     413           42 :         //
     414           42 :         //
     415           42 :         // If one key (X) has a lot of page versions:
     416           42 :         //
     417           42 :         // LSN
     418           42 :         //  ^
     419           42 :         //  |                                 (X)
     420           42 :         //  | +-----------+            +--+--+--+--+
     421           42 :         //  | |           |            |  |  |  |  |
     422           42 :         //  | +-----------+            |  |  +--+  |
     423           42 :         //  | |           |            |  |  |  |  |
     424           42 :         //  | +-----------+     ==>    |  |  |  |  |
     425           42 :         //  | |           |            |  |  +--+  |
     426           42 :         //  | +-----------+            |  |  |  |  |
     427           42 :         //  | |           |            |  |  |  |  |
     428           42 :         //  | +-----------+            +--+--+--+--+
     429           42 :         //  |
     430           42 :         //  +--------------> key
     431           42 :         // TODO: this actually divides the layers into fixed-size chunks, not
     432           42 :         // based on the partitioning.
     433           42 :         //
     434           42 :         // TODO: we should also opportunistically materialize and
     435           42 :         // garbage collect what we can.
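                      :         //
                      :         // A worked example of the LSN-dimension split (illustrative values):
                      :         // if key X has versions at many LSNs and the accumulated size crosses
                      :         // target_file_size at LSN 0/20, the current layer for X is cut at
                      :         // dup_end_lsn = 0/20 and the next layer for X starts at
                      :         // dup_start_lsn = 0/20, so each such layer covers a disjoint LSN
                      :         // slice of the same key.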
     436           42 :         let mut new_layers = Vec::new();
     437           42 :         let mut prev_key: Option<Key> = None;
     438           42 :         let mut writer: Option<DeltaLayerWriter> = None;
     439           42 :         let mut key_values_total_size = 0u64;
     440           42 :         let mut dup_start_lsn: Lsn = Lsn::INVALID; // start LSN of layer containing values of the single key
     441           42 :         let mut dup_end_lsn: Lsn = Lsn::INVALID; // end LSN of layer containing values of the single key
     442              : 
     443              :         for &DeltaEntry {
     444      3121998 :             key, lsn, ref val, ..
     445      3122040 :         } in all_values_iter
     446              :         {
     447      3121998 :             let value = val.load(ctx).await?;
     448      3121998 :             let same_key = prev_key.map_or(false, |prev_key| prev_key == key);
      449      3121998 :             // We need to check key boundaries once we reach the next key or the end of a layer with the same key
     450      3121998 :             if !same_key || lsn == dup_end_lsn {
     451      3029998 :                 let mut next_key_size = 0u64;
     452      3029998 :                 let is_dup_layer = dup_end_lsn.is_valid();
     453      3029998 :                 dup_start_lsn = Lsn::INVALID;
     454      3029998 :                 if !same_key {
     455      3029998 :                     dup_end_lsn = Lsn::INVALID;
     456      3029998 :                 }
      457              :                 // Determine the size occupied by this key. We stop at the next key or when the size becomes larger than target_file_size
     458      3029998 :                 for (next_key, next_lsn, next_size) in all_keys_iter.by_ref() {
     459      3029998 :                     next_key_size = next_size;
     460      3029998 :                     if key != next_key {
     461      3029956 :                         if dup_end_lsn.is_valid() {
     462            0 :                             // We are writing a segment with duplicates:
     463            0 :                             // place all remaining values of this key in a separate segment
     464            0 :                             dup_start_lsn = dup_end_lsn; // the new segment starts where the old one stops
     465            0 :                             dup_end_lsn = lsn_range.end; // there are no more values of this key till end of LSN range
     466      3029956 :                         }
     467      3029956 :                         break;
     468           42 :                     }
     469           42 :                     key_values_total_size += next_size;
      470           42 :                     // Check if it is time to split the segment: i.e. if the total size of the key's values is larger than the target file size.
      471           42 :                     // We need to avoid generating empty segments if next_size > target_file_size.
     472           42 :                     if key_values_total_size > target_file_size && lsn != next_lsn {
      473              :                         // Split the key between multiple layers: such a layer can contain only a single key
     474            0 :                         dup_start_lsn = if dup_end_lsn.is_valid() {
     475            0 :                             dup_end_lsn // new segment with duplicates starts where old one stops
     476              :                         } else {
     477            0 :                             lsn // start with the first LSN for this key
     478              :                         };
     479            0 :                         dup_end_lsn = next_lsn; // upper LSN boundary is exclusive
     480            0 :                         break;
     481           42 :                     }
     482              :                 }
      483              :                 // handle the case when the loop reaches the last key: dup_end_lsn is valid but dup_start_lsn is not set yet.
     484      3029998 :                 if dup_end_lsn.is_valid() && !dup_start_lsn.is_valid() {
     485            0 :                     dup_start_lsn = dup_end_lsn;
     486            0 :                     dup_end_lsn = lsn_range.end;
     487      3029998 :                 }
     488      3029998 :                 if writer.is_some() {
     489      3029956 :                     let written_size = writer.as_mut().unwrap().size();
     490      3029956 :                     let contains_hole =
     491      3029956 :                         next_hole < holes.len() && key >= holes[next_hole].key_range.end;
      492              :                     // check if the key causes layer overflow or the layer contains a hole...
     493      3029956 :                     if is_dup_layer
     494      3029956 :                         || dup_end_lsn.is_valid()
     495      3029956 :                         || written_size + key_values_total_size > target_file_size
     496      3029756 :                         || contains_hole
     497              :                     {
      498              :                         // ... if so, flush the previous layer and prepare to write a new one
     499          200 :                         new_layers.push(
     500          200 :                             writer
     501          200 :                                 .take()
     502          200 :                                 .unwrap()
     503          200 :                                 .finish(prev_key.unwrap().next(), self)
     504          508 :                                 .await?,
     505              :                         );
     506          200 :                         writer = None;
     507          200 : 
     508          200 :                         if contains_hole {
     509            0 :                             // skip hole
     510            0 :                             next_hole += 1;
     511          200 :                         }
     512      3029756 :                     }
     513           42 :                 }
      514              :                 // Remember the size of the key's values, because on the next iteration we will access the next item
     515      3029998 :                 key_values_total_size = next_key_size;
     516        92000 :             }
     517      3121998 :             fail_point!("delta-layer-writer-fail-before-finish", |_| {
     518            0 :                 Err(CompactionError::Other(anyhow::anyhow!(
     519            0 :                     "failpoint delta-layer-writer-fail-before-finish"
     520            0 :                 )))
     521      3121998 :             });
     522              : 
     523      3121998 :             if !self.shard_identity.is_key_disposable(&key) {
     524      3121998 :                 if writer.is_none() {
      525          242 :                     // Create the writer if not initialized yet
     526          242 :                     writer = Some(
     527              :                         DeltaLayerWriter::new(
     528          242 :                             self.conf,
     529          242 :                             self.timeline_id,
     530          242 :                             self.tenant_shard_id,
     531          242 :                             key,
     532          242 :                             if dup_end_lsn.is_valid() {
     533              :                                 // this is a layer containing slice of values of the same key
     534            0 :                                 debug!("Create new dup layer {}..{}", dup_start_lsn, dup_end_lsn);
     535            0 :                                 dup_start_lsn..dup_end_lsn
     536              :                             } else {
     537          242 :                                 debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end);
     538          242 :                                 lsn_range.clone()
     539              :                             },
     540              :                         )
     541          121 :                         .await?,
     542              :                     );
     543      3121756 :                 }
     544              : 
     545      3121998 :                 writer.as_mut().unwrap().put_value(key, lsn, value).await?;
     546              :             } else {
     547            0 :                 debug!(
     548            0 :                     "Dropping key {} during compaction (it belongs on shard {:?})",
     549            0 :                     key,
     550            0 :                     self.shard_identity.get_shard_number(&key)
     551            0 :                 );
     552              :             }
     553              : 
     554      3121998 :             if !new_layers.is_empty() {
     555        19786 :                 fail_point!("after-timeline-compacted-first-L1");
     556      3102212 :             }
     557              : 
     558      3121998 :             prev_key = Some(key);
     559              :         }
     560           42 :         if let Some(writer) = writer {
     561         3009 :             new_layers.push(writer.finish(prev_key.unwrap().next(), self).await?);
     562            0 :         }
     563              : 
     564              :         // Sync layers
     565           42 :         if !new_layers.is_empty() {
     566              :             // Print a warning if the created layer is larger than double the target size
     567              :             // Add two pages for potential overhead. This should in theory be already
     568              :             // accounted for in the target calculation, but for very small targets,
     569              :             // we still might easily hit the limit otherwise.
     570           42 :             let warn_limit = target_file_size * 2 + page_cache::PAGE_SZ as u64 * 2;
     571          242 :             for layer in new_layers.iter() {
     572          242 :                 if layer.layer_desc().file_size > warn_limit {
     573            0 :                     warn!(
     574            0 :                         %layer,
     575            0 :                         "created delta file of size {} larger than double of target of {target_file_size}", layer.layer_desc().file_size
     576            0 :                     );
     577          242 :                 }
     578              :             }
     579              : 
     580              :             // The writer.finish() above already did the fsync of the inodes.
     581              :             // We just need to fsync the directory in which these inodes are linked,
     582              :             // which we know to be the timeline directory.
     583              :             //
      584              :             // We use fatal_err() below because after writer.finish() returns with success,
     585              :             // the in-memory state of the filesystem already has the layer file in its final place,
     586              :             // and subsequent pageserver code could think it's durable while it really isn't.
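                      :             //
                      :             // The same pattern with plain std, as an illustrative sketch
                      :             // (`timeline_dir_path` is a hypothetical name):
                      :             //
                      :             //   let dir = std::fs::File::open(&timeline_dir_path)?;
                      :             //   dir.sync_all()?; // fsync(2) on the directory fd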
     587           42 :             let timeline_dir = VirtualFile::open(
     588           42 :                 &self
     589           42 :                     .conf
     590           42 :                     .timeline_path(&self.tenant_shard_id, &self.timeline_id),
     591           42 :             )
     592           21 :             .await
     593           42 :             .fatal_err("VirtualFile::open for timeline dir fsync");
     594           42 :             timeline_dir
     595           42 :                 .sync_all()
     596           21 :                 .await
     597           42 :                 .fatal_err("VirtualFile::sync_all timeline dir");
     598            0 :         }
     599              : 
     600           42 :         stats.write_layer_files_micros = stats.read_lock_drop_micros.till_now();
     601           42 :         stats.new_deltas_count = Some(new_layers.len());
     602          242 :         stats.new_deltas_size = Some(new_layers.iter().map(|l| l.layer_desc().file_size).sum());
     603           42 : 
     604           42 :         match TryInto::<CompactLevel0Phase1Stats>::try_into(stats)
     605           42 :             .and_then(|stats| serde_json::to_string(&stats).context("serde_json::to_string"))
     606              :         {
     607           42 :             Ok(stats_json) => {
     608           42 :                 info!(
     609           42 :                     stats_json = stats_json.as_str(),
     610           42 :                     "compact_level0_phase1 stats available"
     611           42 :                 )
     612              :             }
     613            0 :             Err(e) => {
     614            0 :                 warn!("compact_level0_phase1 stats failed to serialize: {:#}", e);
     615              :             }
     616              :         }
     617              : 
     618           42 :         Ok(CompactLevel0Phase1Result {
     619           42 :             new_layers,
     620           42 :             deltas_to_compact: deltas_to_compact
     621           42 :                 .into_iter()
     622          442 :                 .map(|x| x.drop_eviction_guard())
     623           42 :                 .collect::<Vec<_>>(),
     624           42 :         })
     625          510 :     }
     626              : }
     627              : 
     628              : #[derive(Default)]
     629              : struct CompactLevel0Phase1Result {
     630              :     new_layers: Vec<ResidentLayer>,
     631              :     deltas_to_compact: Vec<Layer>,
     632              : }
     633              : 
     634              : #[derive(Default)]
     635              : struct CompactLevel0Phase1StatsBuilder {
     636              :     version: Option<u64>,
     637              :     tenant_id: Option<TenantShardId>,
     638              :     timeline_id: Option<TimelineId>,
     639              :     read_lock_acquisition_micros: DurationRecorder,
     640              :     read_lock_held_spawn_blocking_startup_micros: DurationRecorder,
     641              :     read_lock_held_key_sort_micros: DurationRecorder,
     642              :     read_lock_held_prerequisites_micros: DurationRecorder,
     643              :     read_lock_held_compute_holes_micros: DurationRecorder,
     644              :     read_lock_drop_micros: DurationRecorder,
     645              :     write_layer_files_micros: DurationRecorder,
     646              :     level0_deltas_count: Option<usize>,
     647              :     new_deltas_count: Option<usize>,
     648              :     new_deltas_size: Option<u64>,
     649              : }
     650              : 
     651              : #[derive(serde::Serialize)]
     652              : struct CompactLevel0Phase1Stats {
     653              :     version: u64,
     654              :     tenant_id: TenantShardId,
     655              :     timeline_id: TimelineId,
     656              :     read_lock_acquisition_micros: RecordedDuration,
     657              :     read_lock_held_spawn_blocking_startup_micros: RecordedDuration,
     658              :     read_lock_held_key_sort_micros: RecordedDuration,
     659              :     read_lock_held_prerequisites_micros: RecordedDuration,
     660              :     read_lock_held_compute_holes_micros: RecordedDuration,
     661              :     read_lock_drop_micros: RecordedDuration,
     662              :     write_layer_files_micros: RecordedDuration,
     663              :     level0_deltas_count: usize,
     664              :     new_deltas_count: usize,
     665              :     new_deltas_size: u64,
     666              : }
     667              : 
     668              : impl TryFrom<CompactLevel0Phase1StatsBuilder> for CompactLevel0Phase1Stats {
     669              :     type Error = anyhow::Error;
     670              : 
     671           42 :     fn try_from(value: CompactLevel0Phase1StatsBuilder) -> Result<Self, Self::Error> {
     672           42 :         Ok(Self {
     673           42 :             version: value.version.ok_or_else(|| anyhow!("version not set"))?,
     674           42 :             tenant_id: value
     675           42 :                 .tenant_id
     676           42 :                 .ok_or_else(|| anyhow!("tenant_id not set"))?,
     677           42 :             timeline_id: value
     678           42 :                 .timeline_id
     679           42 :                 .ok_or_else(|| anyhow!("timeline_id not set"))?,
     680           42 :             read_lock_acquisition_micros: value
     681           42 :                 .read_lock_acquisition_micros
     682           42 :                 .into_recorded()
     683           42 :                 .ok_or_else(|| anyhow!("read_lock_acquisition_micros not set"))?,
     684           42 :             read_lock_held_spawn_blocking_startup_micros: value
     685           42 :                 .read_lock_held_spawn_blocking_startup_micros
     686           42 :                 .into_recorded()
     687           42 :                 .ok_or_else(|| anyhow!("read_lock_held_spawn_blocking_startup_micros not set"))?,
     688           42 :             read_lock_held_key_sort_micros: value
     689           42 :                 .read_lock_held_key_sort_micros
     690           42 :                 .into_recorded()
     691           42 :                 .ok_or_else(|| anyhow!("read_lock_held_key_sort_micros not set"))?,
     692           42 :             read_lock_held_prerequisites_micros: value
     693           42 :                 .read_lock_held_prerequisites_micros
     694           42 :                 .into_recorded()
     695           42 :                 .ok_or_else(|| anyhow!("read_lock_held_prerequisites_micros not set"))?,
     696           42 :             read_lock_held_compute_holes_micros: value
     697           42 :                 .read_lock_held_compute_holes_micros
     698           42 :                 .into_recorded()
     699           42 :                 .ok_or_else(|| anyhow!("read_lock_held_compute_holes_micros not set"))?,
     700           42 :             read_lock_drop_micros: value
     701           42 :                 .read_lock_drop_micros
     702           42 :                 .into_recorded()
     703           42 :                 .ok_or_else(|| anyhow!("read_lock_drop_micros not set"))?,
     704           42 :             write_layer_files_micros: value
     705           42 :                 .write_layer_files_micros
     706           42 :                 .into_recorded()
     707           42 :                 .ok_or_else(|| anyhow!("write_layer_files_micros not set"))?,
     708           42 :             level0_deltas_count: value
     709           42 :                 .level0_deltas_count
     710           42 :                 .ok_or_else(|| anyhow!("level0_deltas_count not set"))?,
     711           42 :             new_deltas_count: value
     712           42 :                 .new_deltas_count
     713           42 :                 .ok_or_else(|| anyhow!("new_deltas_count not set"))?,
     714           42 :             new_deltas_size: value
     715           42 :                 .new_deltas_size
     716           42 :                 .ok_or_else(|| anyhow!("new_deltas_size not set"))?,
     717              :         })
     718           42 :     }
     719              : }
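                      : 
                      : // Usage sketch for the builder-to-stats conversion above (mirrors what
                      : // compact_level0_phase1 does; illustrative only):
                      : //
                      : //   let stats: CompactLevel0Phase1Stats = stats_builder.try_into()?;
                      : //   let stats_json = serde_json::to_string(&stats)?;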
     720              : 
     721              : impl Timeline {
     722              :     /// Entry point for new tiered compaction algorithm.
     723              :     ///
     724              :     /// All the real work is in the implementation in the pageserver_compaction
     725              :     /// crate. The code here would apply to any algorithm implemented by the
     726              :     /// same interface, but tiered is the only one at the moment.
     727              :     ///
     728              :     /// TODO: cancellation
     729            0 :     pub(crate) async fn compact_tiered(
     730            0 :         self: &Arc<Self>,
     731            0 :         _cancel: &CancellationToken,
     732            0 :         ctx: &RequestContext,
     733            0 :     ) -> Result<(), CompactionError> {
     734            0 :         let fanout = self.get_compaction_threshold() as u64;
     735            0 :         let target_file_size = self.get_checkpoint_distance();
     736              : 
     737              :         // Find the top of the historical layers
     738            0 :         let end_lsn = {
     739            0 :             let guard = self.layers.read().await;
     740            0 :             let layers = guard.layer_map();
     741              : 
     742            0 :             let l0_deltas = layers.get_level0_deltas()?;
     743            0 :             drop(guard);
     744            0 : 
     745            0 :             // As an optimization, if we find that there are too few L0 layers,
     746            0 :             // bail out early. We know that the compaction algorithm would do
     747            0 :             // nothing in that case.
     748            0 :             if l0_deltas.len() < fanout as usize {
     749              :                 // doesn't need compacting
     750            0 :                 return Ok(());
     751            0 :             }
     752            0 :             l0_deltas.iter().map(|l| l.lsn_range.end).max().unwrap()
     753            0 :         };
     754            0 : 
     755            0 :         // Is the timeline being deleted?
     756            0 :         if self.is_stopping() {
     757            0 :             trace!("Dropping out of compaction on timeline shutdown");
     758            0 :             return Err(CompactionError::ShuttingDown);
     759            0 :         }
     760              : 
     761            0 :         let keyspace = self.collect_keyspace(end_lsn, ctx).await?;
     762            0 :         let mut adaptor = TimelineAdaptor::new(self, (end_lsn, keyspace));
     763            0 : 
     764            0 :         pageserver_compaction::compact_tiered::compact_tiered(
     765            0 :             &mut adaptor,
     766            0 :             end_lsn,
     767            0 :             target_file_size,
     768            0 :             fanout,
     769            0 :             ctx,
     770            0 :         )
     771            0 :         .await?;
     772              : 
     773            0 :         adaptor.flush_updates().await?;
     774            0 :         Ok(())
     775            0 :     }
     776              : }
     777              : 
     778              : struct TimelineAdaptor {
     779              :     timeline: Arc<Timeline>,
     780              : 
     781              :     keyspace: (Lsn, KeySpace),
     782              : 
     783              :     new_deltas: Vec<ResidentLayer>,
     784              :     new_images: Vec<ResidentLayer>,
     785              :     layers_to_delete: Vec<Arc<PersistentLayerDesc>>,
     786              : }
     787              : 
     788              : impl TimelineAdaptor {
     789            0 :     pub fn new(timeline: &Arc<Timeline>, keyspace: (Lsn, KeySpace)) -> Self {
     790            0 :         Self {
     791            0 :             timeline: timeline.clone(),
     792            0 :             keyspace,
     793            0 :             new_images: Vec::new(),
     794            0 :             new_deltas: Vec::new(),
     795            0 :             layers_to_delete: Vec::new(),
     796            0 :         }
     797            0 :     }
     798              : 
     799            0 :     pub async fn flush_updates(&mut self) -> anyhow::Result<()> {
     800            0 :         let layers_to_delete = {
     801            0 :             let guard = self.timeline.layers.read().await;
     802            0 :             self.layers_to_delete
     803            0 :                 .iter()
     804            0 :                 .map(|x| guard.get_from_desc(x))
     805            0 :                 .collect::<Vec<Layer>>()
     806            0 :         };
     807            0 :         self.timeline
     808            0 :             .finish_compact_batch(&self.new_deltas, &self.new_images, &layers_to_delete)
     809            0 :             .await?;
     810              : 
     811            0 :         self.timeline
     812            0 :             .upload_new_image_layers(std::mem::take(&mut self.new_images))?;
     813              : 
     814            0 :         self.new_deltas.clear();
     815            0 :         self.layers_to_delete.clear();
     816            0 :         Ok(())
     817            0 :     }
     818              : }
     819              : 
     820              : #[derive(Clone)]
     821              : struct ResidentDeltaLayer(ResidentLayer);
     822              : #[derive(Clone)]
     823              : struct ResidentImageLayer(ResidentLayer);
     824              : 
     825              : impl CompactionJobExecutor for TimelineAdaptor {
     826              :     type Key = crate::repository::Key;
     827              : 
     828              :     type Layer = OwnArc<PersistentLayerDesc>;
     829              :     type DeltaLayer = ResidentDeltaLayer;
     830              :     type ImageLayer = ResidentImageLayer;
     831              : 
     832              :     type RequestContext = crate::context::RequestContext;
     833              : 
     834            0 :     async fn get_layers(
     835            0 :         &mut self,
     836            0 :         key_range: &Range<Key>,
     837            0 :         lsn_range: &Range<Lsn>,
     838            0 :         _ctx: &RequestContext,
     839            0 :     ) -> anyhow::Result<Vec<OwnArc<PersistentLayerDesc>>> {
     840            0 :         self.flush_updates().await?;
     841              : 
     842            0 :         let guard = self.timeline.layers.read().await;
     843            0 :         let layer_map = guard.layer_map();
     844            0 : 
     845            0 :         let result = layer_map
     846            0 :             .iter_historic_layers()
     847            0 :             .filter(|l| {
     848            0 :                 overlaps_with(&l.lsn_range, lsn_range) && overlaps_with(&l.key_range, key_range)
     849            0 :             })
     850            0 :             .map(OwnArc)
     851            0 :             .collect();
     852            0 :         Ok(result)
     853            0 :     }
     854              : 
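                       :     /// Returns the parts of `key_range` that are present at `lsn`. Only the
                       :     /// compaction end LSN captured at construction time is supported.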
     855            0 :     async fn get_keyspace(
     856            0 :         &mut self,
     857            0 :         key_range: &Range<Key>,
     858            0 :         lsn: Lsn,
     859            0 :         _ctx: &RequestContext,
     860            0 :     ) -> anyhow::Result<Vec<Range<Key>>> {
     861            0 :         if lsn == self.keyspace.0 {
     862            0 :             Ok(pageserver_compaction::helpers::intersect_keyspace(
     863            0 :                 &self.keyspace.1.ranges,
     864            0 :                 key_range,
     865            0 :             ))
     866              :         } else {
      867              :             // The current compaction implementation only ever requests the key space

     868              :             // at the compaction end LSN.
     869            0 :             anyhow::bail!("keyspace not available for requested lsn");
     870              :         }
     871            0 :     }
     872              : 
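                       :     /// If `layer` describes a delta layer, downloads it (if needed) and keeps
                       :     /// it resident so its contents can be read during compaction.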
     873            0 :     async fn downcast_delta_layer(
     874            0 :         &self,
     875            0 :         layer: &OwnArc<PersistentLayerDesc>,
     876            0 :     ) -> anyhow::Result<Option<ResidentDeltaLayer>> {
     877            0 :         // this is a lot more complex than a simple downcast...
     878            0 :         if layer.is_delta() {
     879            0 :             let l = {
     880            0 :                 let guard = self.timeline.layers.read().await;
     881            0 :                 guard.get_from_desc(layer)
     882              :             };
     883            0 :             let result = l.download_and_keep_resident().await?;
     884              : 
     885            0 :             Ok(Some(ResidentDeltaLayer(result)))
     886              :         } else {
     887            0 :             Ok(None)
     888              :         }
     889            0 :     }
     890              : 
     891            0 :     async fn create_image(
     892            0 :         &mut self,
     893            0 :         lsn: Lsn,
     894            0 :         key_range: &Range<Key>,
     895            0 :         ctx: &RequestContext,
     896            0 :     ) -> anyhow::Result<()> {
     897            0 :         Ok(self.create_image_impl(lsn, key_range, ctx).await?)
     898            0 :     }
     899              : 
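                       :     /// Merges the given input delta layers into a single new delta layer
                       :     /// covering `key_range` and `lsn_range`. Entries are written in
                       :     /// (key, LSN) order; exact duplicates, which can appear if an earlier
                       :     /// compaction attempt was interrupted, are skipped.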
     900            0 :     async fn create_delta(
     901            0 :         &mut self,
     902            0 :         lsn_range: &Range<Lsn>,
     903            0 :         key_range: &Range<Key>,
     904            0 :         input_layers: &[ResidentDeltaLayer],
     905            0 :         ctx: &RequestContext,
     906            0 :     ) -> anyhow::Result<()> {
     907            0 :         debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end);
     908              : 
     909            0 :         let mut all_entries = Vec::new();
     910            0 :         for dl in input_layers.iter() {
     911            0 :             all_entries.extend(dl.load_keys(ctx).await?);
     912              :         }
     913              : 
      914              :         // The current stdlib sorting implementation is designed so that it is
      915              :         // particularly fast when the slice is made up of sorted sub-ranges.
     916            0 :         all_entries.sort_by_key(|DeltaEntry { key, lsn, .. }| (*key, *lsn));
     917              : 
     918            0 :         let mut writer = DeltaLayerWriter::new(
     919            0 :             self.timeline.conf,
     920            0 :             self.timeline.timeline_id,
     921            0 :             self.timeline.tenant_shard_id,
     922            0 :             key_range.start,
     923            0 :             lsn_range.clone(),
     924            0 :         )
     925            0 :         .await?;
     926              : 
     927            0 :         let mut dup_values = 0;
     928            0 : 
     929            0 :         // This iterator walks through all key-value pairs from all the layers
     930            0 :         // we're compacting, in key, LSN order.
     931            0 :         let mut prev: Option<(Key, Lsn)> = None;
     932              :         for &DeltaEntry {
     933            0 :             key, lsn, ref val, ..
     934            0 :         } in all_entries.iter()
     935              :         {
     936            0 :             if prev == Some((key, lsn)) {
     937              :                 // This is a duplicate. Skip it.
     938              :                 //
     939              :                 // It can happen if compaction is interrupted after writing some
     940              :                 // layers but not all, and we are compacting the range again.
     941              :                 // The calculations in the algorithm assume that there are no
     942              :                 // duplicates, so the math on targeted file size is likely off,
     943              :                 // and we will create smaller files than expected.
     944            0 :                 dup_values += 1;
     945            0 :                 continue;
     946            0 :             }
     947              : 
     948            0 :             let value = val.load(ctx).await?;
     949              : 
     950            0 :             writer.put_value(key, lsn, value).await?;
     951              : 
     952            0 :             prev = Some((key, lsn));
     953              :         }
     954              : 
     955            0 :         if dup_values > 0 {
     956            0 :             warn!("delta layer created with {} duplicate values", dup_values);
     957            0 :         }
     958              : 
     959            0 :         fail_point!("delta-layer-writer-fail-before-finish", |_| {
     960            0 :             Err(anyhow::anyhow!(
     961            0 :                 "failpoint delta-layer-writer-fail-before-finish"
     962            0 :             ))
     963            0 :         });
     964              : 
     965            0 :         let new_delta_layer = writer
     966            0 :             .finish(prev.unwrap().0.next(), &self.timeline)
     967            0 :             .await?;
     968              : 
     969            0 :         self.new_deltas.push(new_delta_layer);
     970            0 :         Ok(())
     971            0 :     }
     972              : 
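                       :     /// Queues `layer` for deletion; the deletion takes effect on the next
                       :     /// `flush_updates` call.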
     973            0 :     async fn delete_layer(
     974            0 :         &mut self,
     975            0 :         layer: &OwnArc<PersistentLayerDesc>,
     976            0 :         _ctx: &RequestContext,
     977            0 :     ) -> anyhow::Result<()> {
     978            0 :         self.layers_to_delete.push(layer.clone().0);
     979            0 :         Ok(())
     980            0 :     }
     981              : }
     982              : 
     983              : impl TimelineAdaptor {
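                       :     /// Materializes a new image layer at `lsn` covering `key_range`, by
                       :     /// reading each page of the keyspace at that LSN and writing the images
                       :     /// out.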
     984            0 :     async fn create_image_impl(
     985            0 :         &mut self,
     986            0 :         lsn: Lsn,
     987            0 :         key_range: &Range<Key>,
     988            0 :         ctx: &RequestContext,
     989            0 :     ) -> Result<(), PageReconstructError> {
     990            0 :         let timer = self.timeline.metrics.create_images_time_histo.start_timer();
     991              : 
     992            0 :         let mut image_layer_writer = ImageLayerWriter::new(
     993            0 :             self.timeline.conf,
     994            0 :             self.timeline.timeline_id,
     995            0 :             self.timeline.tenant_shard_id,
     996            0 :             key_range,
     997            0 :             lsn,
     998            0 :         )
     999            0 :         .await?;
    1000              : 
    1001            0 :         fail_point!("image-layer-writer-fail-before-finish", |_| {
    1002            0 :             Err(PageReconstructError::Other(anyhow::anyhow!(
    1003            0 :                 "failpoint image-layer-writer-fail-before-finish"
    1004            0 :             )))
    1005            0 :         });
    1006            0 :         let keyspace_ranges = self.get_keyspace(key_range, lsn, ctx).await?;
    1007            0 :         for range in &keyspace_ranges {
    1008            0 :             let mut key = range.start;
    1009            0 :             while key < range.end {
    1010            0 :                 let img = match self.timeline.get(key, lsn, ctx).await {
    1011            0 :                     Ok(img) => img,
    1012            0 :                     Err(err) => {
    1013            0 :                         // If we fail to reconstruct a VM or FSM page, we can zero the
    1014            0 :                         // page without losing any actual user data. That seems better
    1015            0 :                         // than failing repeatedly and getting stuck.
    1016            0 :                         //
    1017            0 :                         // We had a bug at one point, where we truncated the FSM and VM
     1018            0 :                         // in the pageserver, but Postgres didn't know about that
    1019            0 :                         // and continued to generate incremental WAL records for pages
    1020            0 :                         // that didn't exist in the pageserver. Trying to replay those
    1021            0 :                         // WAL records failed to find the previous image of the page.
    1022            0 :                         // This special case allows us to recover from that situation.
    1023            0 :                         // See https://github.com/neondatabase/neon/issues/2601.
    1024            0 :                         //
    1025            0 :                         // Unfortunately we cannot do this for the main fork, or for
     1026            0 :                         // any metadata keys, as that would lead to actual data
    1027            0 :                         // loss.
    1028            0 :                         if is_rel_fsm_block_key(key) || is_rel_vm_block_key(key) {
    1029            0 :                             warn!("could not reconstruct FSM or VM key {key}, filling with zeros: {err:?}");
    1030            0 :                             ZERO_PAGE.clone()
    1031              :                         } else {
    1032            0 :                             return Err(err);
    1033              :                         }
    1034              :                     }
    1035              :                 };
    1036            0 :                 image_layer_writer.put_image(key, img).await?;
    1037            0 :                 key = key.next();
    1038              :             }
    1039              :         }
    1040            0 :         let image_layer = image_layer_writer.finish(&self.timeline).await?;
    1041              : 
    1042            0 :         self.new_images.push(image_layer);
    1043            0 : 
    1044            0 :         timer.stop_and_record();
    1045            0 : 
    1046            0 :         Ok(())
    1047            0 :     }
    1048              : }
    1049              : 
    1050              : impl CompactionRequestContext for crate::context::RequestContext {}
    1051              : 
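                       : /// Owned wrapper around an [`Arc`], so that the foreign `CompactionLayer`
                       : /// trait can be implemented for the shared [`PersistentLayerDesc`]s in the
                       : /// layer map while handing out cheap clones. Derefs to `T`.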
    1052              : #[derive(Debug, Clone)]
    1053              : pub struct OwnArc<T>(pub Arc<T>);
    1054              : 
    1055              : impl<T> Deref for OwnArc<T> {
    1056              :     type Target = <Arc<T> as Deref>::Target;
    1057            0 :     fn deref(&self) -> &Self::Target {
    1058            0 :         &self.0
    1059            0 :     }
    1060              : }
    1061              : 
    1062              : impl<T> AsRef<T> for OwnArc<T> {
    1063            0 :     fn as_ref(&self) -> &T {
    1064            0 :         self.0.as_ref()
    1065            0 :     }
    1066              : }
    1067              : 
    1068              : impl CompactionLayer<Key> for OwnArc<PersistentLayerDesc> {
    1069            0 :     fn key_range(&self) -> &Range<Key> {
    1070            0 :         &self.key_range
    1071            0 :     }
    1072            0 :     fn lsn_range(&self) -> &Range<Lsn> {
    1073            0 :         &self.lsn_range
    1074            0 :     }
    1075            0 :     fn file_size(&self) -> u64 {
    1076            0 :         self.file_size
    1077            0 :     }
    1078            0 :     fn short_id(&self) -> std::string::String {
    1079            0 :         self.as_ref().short_id().to_string()
    1080            0 :     }
    1081            0 :     fn is_delta(&self) -> bool {
    1082            0 :         self.as_ref().is_delta()
    1083            0 :     }
    1084              : }
    1085              : 
    1086              : impl CompactionLayer<Key> for OwnArc<DeltaLayer> {
    1087            0 :     fn key_range(&self) -> &Range<Key> {
    1088            0 :         &self.layer_desc().key_range
    1089            0 :     }
    1090            0 :     fn lsn_range(&self) -> &Range<Lsn> {
    1091            0 :         &self.layer_desc().lsn_range
    1092            0 :     }
    1093            0 :     fn file_size(&self) -> u64 {
    1094            0 :         self.layer_desc().file_size
    1095            0 :     }
    1096            0 :     fn short_id(&self) -> std::string::String {
    1097            0 :         self.layer_desc().short_id().to_string()
    1098            0 :     }
    1099            0 :     fn is_delta(&self) -> bool {
    1100            0 :         true
    1101            0 :     }
    1102              : }
    1103              : 
    1104              : use crate::tenant::timeline::DeltaEntry;
    1105              : 
    1106              : impl CompactionLayer<Key> for ResidentDeltaLayer {
    1107            0 :     fn key_range(&self) -> &Range<Key> {
    1108            0 :         &self.0.layer_desc().key_range
    1109            0 :     }
    1110            0 :     fn lsn_range(&self) -> &Range<Lsn> {
    1111            0 :         &self.0.layer_desc().lsn_range
    1112            0 :     }
    1113            0 :     fn file_size(&self) -> u64 {
    1114            0 :         self.0.layer_desc().file_size
    1115            0 :     }
    1116            0 :     fn short_id(&self) -> std::string::String {
    1117            0 :         self.0.layer_desc().short_id().to_string()
    1118            0 :     }
    1119            0 :     fn is_delta(&self) -> bool {
    1120            0 :         true
    1121            0 :     }
    1122              : }
    1123              : 
    1124              : impl CompactionDeltaLayer<TimelineAdaptor> for ResidentDeltaLayer {
    1125              :     type DeltaEntry<'a> = DeltaEntry<'a>;
    1126              : 
    1127            0 :     async fn load_keys<'a>(&self, ctx: &RequestContext) -> anyhow::Result<Vec<DeltaEntry<'_>>> {
    1128            0 :         self.0.load_keys(ctx).await
    1129            0 :     }
    1130              : }
    1131              : 
    1132              : impl CompactionLayer<Key> for ResidentImageLayer {
    1133            0 :     fn key_range(&self) -> &Range<Key> {
    1134            0 :         &self.0.layer_desc().key_range
    1135            0 :     }
    1136            0 :     fn lsn_range(&self) -> &Range<Lsn> {
    1137            0 :         &self.0.layer_desc().lsn_range
    1138            0 :     }
    1139            0 :     fn file_size(&self) -> u64 {
    1140            0 :         self.0.layer_desc().file_size
    1141            0 :     }
    1142            0 :     fn short_id(&self) -> std::string::String {
    1143            0 :         self.0.layer_desc().short_id().to_string()
    1144            0 :     }
    1145            0 :     fn is_delta(&self) -> bool {
    1146            0 :         false
    1147            0 :     }
    1148              : }
    1149              : impl CompactionImageLayer<TimelineAdaptor> for ResidentImageLayer {}
        

Generated by: LCOV version 2.1-beta