LCOV - code coverage report
Current view: top level - pageserver/src/tenant/timeline - compaction.rs (source / functions)
Test: a43a77853355b937a79c57b07a8f05607cf29e6c.info
Test Date: 2024-09-19 12:04:32
Coverage: Lines: 60.6 % (1023 of 1687 hit) | Functions: 39.2 % (51 of 130 hit)

            Line data    Source code
       1              : //! New compaction implementation. The algorithm itself is implemented in the
       2              : //! compaction crate. This file implements the callbacks and structs that allow
       3              : //! the algorithm to drive the process.
       4              : //!
       5              : //! The old legacy algorithm is implemented directly in `timeline.rs`.
       6              : 
       7              : use std::collections::{BinaryHeap, HashSet};
       8              : use std::ops::{Deref, Range};
       9              : use std::sync::Arc;
      10              : 
      11              : use super::layer_manager::LayerManager;
      12              : use super::{
      13              :     CompactFlags, CreateImageLayersError, DurationRecorder, ImageLayerCreationMode,
      14              :     RecordedDuration, Timeline,
      15              : };
      16              : 
      17              : use anyhow::{anyhow, bail, Context};
      18              : use bytes::Bytes;
      19              : use enumset::EnumSet;
      20              : use fail::fail_point;
      21              : use itertools::Itertools;
      22              : use pageserver_api::key::KEY_SIZE;
      23              : use pageserver_api::keyspace::ShardedRange;
      24              : use pageserver_api::shard::{ShardCount, ShardIdentity, TenantShardId};
      25              : use serde::Serialize;
      26              : use tokio_util::sync::CancellationToken;
      27              : use tracing::{debug, info, info_span, trace, warn, Instrument};
      28              : use utils::id::TimelineId;
      29              : 
      30              : use crate::context::{AccessStatsBehavior, RequestContext, RequestContextBuilder};
      31              : use crate::page_cache;
      32              : use crate::tenant::remote_timeline_client::WaitCompletionError;
      33              : use crate::tenant::storage_layer::merge_iterator::MergeIterator;
      34              : use crate::tenant::storage_layer::split_writer::{
      35              :     SplitDeltaLayerWriter, SplitImageLayerWriter, SplitWriterResult,
      36              : };
      37              : use crate::tenant::storage_layer::{
      38              :     AsLayerDesc, PersistentLayerDesc, PersistentLayerKey, ValueReconstructState,
      39              : };
      40              : use crate::tenant::timeline::ImageLayerCreationOutcome;
      41              : use crate::tenant::timeline::{drop_rlock, DeltaLayerWriter, ImageLayerWriter};
      42              : use crate::tenant::timeline::{Layer, ResidentLayer};
      43              : use crate::tenant::DeltaLayer;
      44              : use crate::virtual_file::{MaybeFatalIo, VirtualFile};
      45              : use pageserver_api::config::tenant_conf_defaults::{
      46              :     DEFAULT_CHECKPOINT_DISTANCE, DEFAULT_COMPACTION_THRESHOLD,
      47              : };
      48              : 
      49              : use crate::keyspace::KeySpace;
      50              : use crate::repository::{Key, Value};
      51              : use crate::walrecord::NeonWalRecord;
      52              : 
      53              : use utils::lsn::Lsn;
      54              : 
      55              : use pageserver_compaction::helpers::overlaps_with;
      56              : use pageserver_compaction::interface::*;
      57              : 
      58              : use super::CompactionError;
      59              : 
      60              : /// Maximum number of deltas before generating an image layer in bottom-most compaction.
      61              : const COMPACTION_DELTA_THRESHOLD: usize = 5;
      62              : 
      63              : /// The result of bottom-most compaction for a single key at each LSN.
      64              : #[derive(Debug)]
      65              : #[cfg_attr(test, derive(PartialEq))]
      66              : pub struct KeyLogAtLsn(pub Vec<(Lsn, Value)>);
      67              : 
      68              : /// The result of bottom-most compaction.
      69              : #[derive(Debug)]
      70              : #[cfg_attr(test, derive(PartialEq))]
      71              : pub(crate) struct KeyHistoryRetention {
      72              :     /// Stores logs to reconstruct the value at the given LSN, that is to say, logs <= LSN or image == LSN.
      73              :     pub(crate) below_horizon: Vec<(Lsn, KeyLogAtLsn)>,
      74              :     /// Stores logs to reconstruct the value at any LSN above the horizon, that is to say, log > LSN.
      75              :     pub(crate) above_horizon: KeyLogAtLsn,
      76              : }
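
// Editor's sketch (not part of the original source): a minimal illustration of the
// split that `KeyHistoryRetention` represents, using plain u64 LSNs and string values
// instead of the real `Lsn`/`Value` types, and a single horizon instead of the full
// list of retain_lsn cutoffs.
fn split_history(
    history: &[(u64, &'static str)],
    horizon: u64,
) -> (Vec<(u64, &'static str)>, Vec<(u64, &'static str)>) {
    // Entries at or below the horizon feed the per-cutoff `below_horizon` batches;
    // everything newer is kept verbatim so later LSNs can still be reconstructed.
    let below = history.iter().copied().filter(|(lsn, _)| *lsn <= horizon).collect();
    let above = history.iter().copied().filter(|(lsn, _)| *lsn > horizon).collect();
    (below, above)
}
// split_history(&[(0x10, "image"), (0x20, "delta"), (0x30, "delta")], 0x20)
// => below: [(0x10, "image"), (0x20, "delta")], above: [(0x30, "delta")]
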
      77              : 
      78              : impl KeyHistoryRetention {
      79              :     /// Hack: skip the delta layer if we need to produce a layer with the same key-lsn.
      80              :     ///
      81              :     /// This can happen if we have removed some deltas in "the middle" of some existing layer's key-lsn-range.
      82              :     /// For example, consider the case where a single delta with range [0x10,0x50) exists.
      83              :     /// And we have branches at LSN 0x10, 0x20, 0x30.
      84              :     /// Then we delete branch @ 0x20.
      85              :     /// Bottom-most compaction may now delete the delta [0x20,0x30).
      86              :     /// And that wouldn't change the shape of the layer.
      87              :     ///
      88              :     /// Note that bottom-most-gc-compaction never _adds_ new data in that case, only removes.
      89              :     ///
      90              :     /// `discard_key` will only be called when the writer reaches its target (instead of for every key), so it's fine to grab a lock inside.
      91          114 :     async fn discard_key(key: &PersistentLayerKey, tline: &Arc<Timeline>, dry_run: bool) -> bool {
      92          114 :         if dry_run {
      93            0 :             return true;
      94          114 :         }
      95          114 :         let guard = tline.layers.read().await;
      96          114 :         if !guard.contains_key(key) {
      97           66 :             return false;
      98           48 :         }
      99           48 :         let layer_generation = guard.get_from_key(key).metadata().generation;
     100           48 :         drop(guard);
     101           48 :         if layer_generation == tline.generation {
     102           48 :             info!(
     103              :                 key=%key,
     104              :                 ?layer_generation,
     105            0 :                 "discard layer due to duplicated layer key in the same generation",
     106              :             );
     107           48 :             true
     108              :         } else {
     109            0 :             false
     110              :         }
     111          114 :     }
     112              : 
     113              :     /// Pipe a history of a single key to the writers.
     114              :     ///
     115              :     /// If `image_writer` is none, the images will be placed into the delta layers.
     116              :     /// The delta writer will contain all images and deltas (below and above the horizon) except the bottom-most images.
     117              :     #[allow(clippy::too_many_arguments)]
     118         1266 :     async fn pipe_to(
     119         1266 :         self,
     120         1266 :         key: Key,
     121         1266 :         tline: &Arc<Timeline>,
     122         1266 :         delta_writer: &mut SplitDeltaLayerWriter,
     123         1266 :         mut image_writer: Option<&mut SplitImageLayerWriter>,
     124         1266 :         stat: &mut CompactionStatistics,
     125         1266 :         dry_run: bool,
     126         1266 :         ctx: &RequestContext,
     127         1266 :     ) -> anyhow::Result<()> {
     128         1266 :         let mut first_batch = true;
     129         1266 :         let discard = |key: &PersistentLayerKey| {
     130            0 :             let key = key.clone();
     131            0 :             async move { Self::discard_key(&key, tline, dry_run).await }
     132            0 :         };
     133         4206 :         for (cutoff_lsn, KeyLogAtLsn(logs)) in self.below_horizon {
     134         2940 :             if first_batch {
     135         1266 :                 if logs.len() == 1 && logs[0].1.is_image() {
     136         1224 :                     let Value::Image(img) = &logs[0].1 else {
     137            0 :                         unreachable!()
     138              :                     };
     139         1224 :                     stat.produce_image_key(img);
     140         1224 :                     if let Some(image_writer) = image_writer.as_mut() {
     141         1224 :                         image_writer
     142         1224 :                             .put_image_with_discard_fn(key, img.clone(), tline, ctx, discard)
     143         1242 :                             .await?;
     144              :                     } else {
     145            0 :                         delta_writer
     146            0 :                             .put_value_with_discard_fn(
     147            0 :                                 key,
     148            0 :                                 cutoff_lsn,
     149            0 :                                 Value::Image(img.clone()),
     150            0 :                                 tline,
     151            0 :                                 ctx,
     152            0 :                                 discard,
     153            0 :                             )
     154            0 :                             .await?;
     155              :                     }
     156              :                 } else {
     157           84 :                     for (lsn, val) in logs {
     158           42 :                         stat.produce_key(&val);
     159           42 :                         delta_writer
     160           42 :                             .put_value_with_discard_fn(key, lsn, val, tline, ctx, discard)
     161            3 :                             .await?;
     162              :                     }
     163              :                 }
     164         1266 :                 first_batch = false;
     165              :             } else {
     166         1920 :                 for (lsn, val) in logs {
     167          246 :                     stat.produce_key(&val);
     168          246 :                     delta_writer
     169          246 :                         .put_value_with_discard_fn(key, lsn, val, tline, ctx, discard)
     170           24 :                         .await?;
     171              :                 }
     172              :             }
     173              :         }
     174         1266 :         let KeyLogAtLsn(above_horizon_logs) = self.above_horizon;
     175         1362 :         for (lsn, val) in above_horizon_logs {
     176           96 :             stat.produce_key(&val);
     177           96 :             delta_writer
     178           96 :                 .put_value_with_discard_fn(key, lsn, val, tline, ctx, discard)
     179            6 :                 .await?;
     180              :         }
     181         1266 :         Ok(())
     182         1266 :     }
     183              : }
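
// Editor's sketch (not part of the original source): the routing rule that `pipe_to`
// applies to each key's history, reduced to one decision. `Val` and `Sink` are
// hypothetical stand-ins for the real `Value` type and writer API.
enum Val { Img(Vec<u8>), Delta(Vec<u8>) }
enum Sink { ImageLayer, DeltaLayer }

fn route(first_batch: bool, logs: &[Val], have_image_writer: bool) -> Sink {
    // Only the bottom-most batch consisting of exactly one image goes to the image
    // writer (when one exists); later batches and above-horizon logs become deltas.
    match (first_batch, logs, have_image_writer) {
        (true, [Val::Img(_)], true) => Sink::ImageLayer,
        _ => Sink::DeltaLayer,
    }
}
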
     184              : 
     185              : #[derive(Debug, Serialize, Default)]
     186              : struct CompactionStatisticsNumSize {
     187              :     num: u64,
     188              :     size: u64,
     189              : }
     190              : 
     191              : #[derive(Debug, Serialize, Default)]
     192              : pub struct CompactionStatistics {
     193              :     delta_layer_visited: CompactionStatisticsNumSize,
     194              :     image_layer_visited: CompactionStatisticsNumSize,
     195              :     delta_layer_produced: CompactionStatisticsNumSize,
     196              :     image_layer_produced: CompactionStatisticsNumSize,
     197              :     num_delta_layer_discarded: usize,
     198              :     num_image_layer_discarded: usize,
     199              :     num_unique_keys_visited: usize,
     200              :     wal_keys_visited: CompactionStatisticsNumSize,
     201              :     image_keys_visited: CompactionStatisticsNumSize,
     202              :     wal_produced: CompactionStatisticsNumSize,
     203              :     image_produced: CompactionStatisticsNumSize,
     204              : }
     205              : 
     206              : impl CompactionStatistics {
     207         2058 :     fn estimated_size_of_value(val: &Value) -> usize {
     208          798 :         match val {
     209         1260 :             Value::Image(img) => img.len(),
     210            0 :             Value::WalRecord(NeonWalRecord::Postgres { rec, .. }) => rec.len(),
     211          798 :             _ => std::mem::size_of::<NeonWalRecord>(),
     212              :         }
     213         2058 :     }
     214         3288 :     fn estimated_size_of_key() -> usize {
     215         3288 :         KEY_SIZE // TODO: distinguish image layer and delta layer (count LSN in delta layer)
     216         3288 :     }
     217          138 :     fn visit_delta_layer(&mut self, size: u64) {
     218          138 :         self.delta_layer_visited.num += 1;
     219          138 :         self.delta_layer_visited.size += size;
     220          138 :     }
     221          108 :     fn visit_image_layer(&mut self, size: u64) {
     222          108 :         self.image_layer_visited.num += 1;
     223          108 :         self.image_layer_visited.size += size;
     224          108 :     }
     225         1266 :     fn on_unique_key_visited(&mut self) {
     226         1266 :         self.num_unique_keys_visited += 1;
     227         1266 :     }
     228          420 :     fn visit_wal_key(&mut self, val: &Value) {
     229          420 :         self.wal_keys_visited.num += 1;
     230          420 :         self.wal_keys_visited.size +=
     231          420 :             Self::estimated_size_of_value(val) as u64 + Self::estimated_size_of_key() as u64;
     232          420 :     }
     233         1260 :     fn visit_image_key(&mut self, val: &Value) {
     234         1260 :         self.image_keys_visited.num += 1;
     235         1260 :         self.image_keys_visited.size +=
     236         1260 :             Self::estimated_size_of_value(val) as u64 + Self::estimated_size_of_key() as u64;
     237         1260 :     }
     238          384 :     fn produce_key(&mut self, val: &Value) {
     239          384 :         match val {
     240            6 :             Value::Image(img) => self.produce_image_key(img),
     241          378 :             Value::WalRecord(_) => self.produce_wal_key(val),
     242              :         }
     243          384 :     }
     244          378 :     fn produce_wal_key(&mut self, val: &Value) {
     245          378 :         self.wal_produced.num += 1;
     246          378 :         self.wal_produced.size +=
     247          378 :             Self::estimated_size_of_value(val) as u64 + Self::estimated_size_of_key() as u64;
     248          378 :     }
     249         1230 :     fn produce_image_key(&mut self, val: &Bytes) {
     250         1230 :         self.image_produced.num += 1;
     251         1230 :         self.image_produced.size += val.len() as u64 + Self::estimated_size_of_key() as u64;
     252         1230 :     }
     253           24 :     fn discard_delta_layer(&mut self) {
     254           24 :         self.num_delta_layer_discarded += 1;
     255           24 :     }
     256           24 :     fn discard_image_layer(&mut self) {
     257           24 :         self.num_image_layer_discarded += 1;
     258           24 :     }
     259           30 :     fn produce_delta_layer(&mut self, size: u64) {
     260           30 :         self.delta_layer_produced.num += 1;
     261           30 :         self.delta_layer_produced.size += size;
     262           30 :     }
     263           36 :     fn produce_image_layer(&mut self, size: u64) {
     264           36 :         self.image_layer_produced.num += 1;
     265           36 :         self.image_layer_produced.size += size;
     266           36 :     }
     267              : }
     268              : 
     269              : impl Timeline {
     270              :     /// TODO: cancellation
     271              :     ///
     272              :     /// Returns whether the compaction has pending tasks.
     273         1092 :     pub(crate) async fn compact_legacy(
     274         1092 :         self: &Arc<Self>,
     275         1092 :         cancel: &CancellationToken,
     276         1092 :         flags: EnumSet<CompactFlags>,
     277         1092 :         ctx: &RequestContext,
     278         1092 :     ) -> Result<bool, CompactionError> {
     279         1092 :         if flags.contains(CompactFlags::EnhancedGcBottomMostCompaction) {
     280            0 :             self.compact_with_gc(cancel, flags, ctx)
     281            0 :                 .await
     282            0 :                 .map_err(CompactionError::Other)?;
     283            0 :             return Ok(false);
     284         1092 :         }
     285         1092 : 
     286         1092 :         if flags.contains(CompactFlags::DryRun) {
     287            0 :             return Err(CompactionError::Other(anyhow!(
     288            0 :                 "dry-run mode is not supported for legacy compaction for now"
     289            0 :             )));
     290         1092 :         }
     291         1092 : 
     292         1092 :         // High level strategy for compaction / image creation:
     293         1092 :         //
     294         1092 :         // 1. First, calculate the desired "partitioning" of the
     295         1092 :         // currently in-use key space. The goal is to partition the
     296         1092 :         // key space into roughly fixed-size chunks, but also take into
     297         1092 :         // account any existing image layers, and try to align the
     298         1092 :         // chunk boundaries with the existing image layers to avoid
     299         1092 :         // too much churn. Also try to align chunk boundaries with
     300         1092 :         // relation boundaries.  In principle, we don't know about
     301         1092 :         // relation boundaries here, we just deal with key-value
     302         1092 :         // pairs, and the code in pgdatadir_mapping.rs knows how to
     303         1092 :         // map relations into key-value pairs. But in practice we know
     304         1092 :         // that 'field6' is the block number, and the fields 1-5
     305         1092 :         // identify a relation. This is just an optimization,
     306         1092 :         // though.
     307         1092 :         //
     308         1092 :         // 2. Once we know the partitioning, for each partition,
     309         1092 :         // decide if it's time to create a new image layer. The
     310         1092 :         // criterion is whether there has been too much "churn" since the last
     311         1092 :         // image layer. "Churn" is a fuzzy concept; it's a
     312         1092 :         // combination of too many delta files, or too much WAL in
     313         1092 :         // total in the delta files. Or perhaps: whether creating an image
     314         1092 :         // file would allow us to delete some older files.
     315         1092 :         //
     316         1092 :         // 3. After that, we compact all level0 delta files if there
     317         1092 :         // are too many of them.  While compacting, we also garbage
     318         1092 :         // collect any page versions that are no longer needed because
     319         1092 :         // of the new image layers we created in step 2.
     320         1092 :         //
     321         1092 :         // TODO: This high level strategy hasn't been implemented yet.
     322         1092 :         // Below are functions compact_level0() and create_image_layers()
     323         1092 :         // but they are a bit ad hoc and don't quite work like it's explained
     324         1092 :         // but they are a bit ad hoc and don't quite work as explained
     325         1092 : 
     326         1092 :         // Is the timeline being deleted?
     327         1092 :         if self.is_stopping() {
     328            0 :             trace!("Dropping out of compaction on timeline shutdown");
     329            0 :             return Err(CompactionError::ShuttingDown);
     330         1092 :         }
     331         1092 : 
     332         1092 :         let target_file_size = self.get_checkpoint_distance();
     333              : 
     334              :         // Define partitioning schema if needed
     335              : 
     336              :         // FIXME: the match should only cover repartitioning, not the next steps
     337         1092 :         let (partition_count, has_pending_tasks) = match self
     338         1092 :             .repartition(
     339         1092 :                 self.get_last_record_lsn(),
     340         1092 :                 self.get_compaction_target_size(),
     341         1092 :                 flags,
     342         1092 :                 ctx,
     343         1092 :             )
     344        47982 :             .await
     345              :         {
     346         1092 :             Ok(((dense_partitioning, sparse_partitioning), lsn)) => {
     347         1092 :                 // Disables access_stats updates, so that the files we read remain candidates for eviction after we're done with them
     348         1092 :                 let image_ctx = RequestContextBuilder::extend(ctx)
     349         1092 :                     .access_stats_behavior(AccessStatsBehavior::Skip)
     350         1092 :                     .build();
     351         1092 : 
     352         1092 :                 // 2. Compact
     353         1092 :                 let timer = self.metrics.compact_time_histo.start_timer();
     354        29656 :                 let fully_compacted = self.compact_level0(target_file_size, ctx).await?;
     355         1092 :                 timer.stop_and_record();
     356         1092 : 
     357         1092 :                 let mut partitioning = dense_partitioning;
     358         1092 :                 partitioning
     359         1092 :                     .parts
     360         1092 :                     .extend(sparse_partitioning.into_dense().parts);
     361         1092 : 
     362         1092 :                 // 3. Create new image layers for partitions that have been modified
     363         1092 :                 // "enough". Skip image layer creation if L0 compaction cannot keep up.
     364         1092 :                 if fully_compacted {
     365         1092 :                     let image_layers = self
     366         1092 :                         .create_image_layers(
     367         1092 :                             &partitioning,
     368         1092 :                             lsn,
     369         1092 :                             if flags.contains(CompactFlags::ForceImageLayerCreation) {
     370           42 :                                 ImageLayerCreationMode::Force
     371              :                             } else {
     372         1050 :                                 ImageLayerCreationMode::Try
     373              :                             },
     374         1092 :                             &image_ctx,
     375              :                         )
     376        40849 :                         .await?;
     377              : 
     378         1092 :                     self.upload_new_image_layers(image_layers)?;
     379              :                 } else {
     380            0 :                     info!("skipping image layer generation due to L0 compaction did not include all layers.");
     381              :                 }
     382         1092 :                 (partitioning.parts.len(), !fully_compacted)
     383              :             }
     384            0 :             Err(err) => {
     385            0 :                 // no partitioning? This is normal, if the timeline was just created
     386            0 :                 // as an empty timeline. Also in unit tests, when we use the timeline
     387            0 :                 // as a simple key-value store, ignoring the datadir layout. Log the
     388            0 :                 // error but continue.
     389            0 :                 //
     390            0 :                 // Suppress error when it's due to cancellation
     391            0 :                 if !self.cancel.is_cancelled() {
     392            0 :                     tracing::error!("could not compact, repartitioning keyspace failed: {err:?}");
     393            0 :                 }
     394            0 :                 (1, false)
     395              :             }
     396              :         };
     397              : 
     398         1092 :         if self.shard_identity.count >= ShardCount::new(2) {
     399              :             // Limit the number of layer rewrites to the number of partitions: this means its
     400              :             // runtime should be comparable to a full round of image layer creations, rather than
     401              :             // being potentially much longer.
     402            0 :             let rewrite_max = partition_count;
     403            0 : 
     404            0 :             self.compact_shard_ancestors(rewrite_max, ctx).await?;
     405         1092 :         }
     406              : 
     407         1092 :         Ok(has_pending_tasks)
     408         1092 :     }
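
// Editor's sketch (not part of the original source): the three-step strategy described
// in the comment at the top of `compact_legacy`, reduced to its control flow. The stub
// types and closures are hypothetical; the real code also handles metrics, cancellation
// and shard-ancestor compaction.
struct Partitioning; // step 1: key-space partitions aligned with existing image layers
struct ImageLayers;  // steps 2/3: image layers for partitions with enough churn

fn legacy_compaction_outline(
    repartition: impl Fn() -> Partitioning,
    compact_level0: impl Fn() -> bool, // returns whether all L0s were compacted
    create_image_layers: impl Fn(&Partitioning) -> ImageLayers,
) -> bool {
    // 1. Partition the currently used key space.
    let partitioning = repartition();
    // 2. Compact L0 deltas into L1s.
    let fully_compacted = compact_level0();
    // 3. Create image layers only if L0 compaction kept up; otherwise skip this round.
    if fully_compacted {
        let _image_layers = create_image_layers(&partitioning);
    }
    // There is pending work left if only a subset of the L0s was compacted.
    !fully_compacted
}
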
     409              : 
     410              :     /// Check for layers that are eligible to be rewritten:
     411              :     /// - Shard splitting: after a shard split, rewrite ancestor layers beyond pitr_interval, so that
     412              :     ///   we don't indefinitely retain keys in this shard that aren't needed.
     413              :     /// - For future use: layers beyond pitr_interval that are in formats we would
     414              :     ///   rather not maintain compatibility with indefinitely.
     415              :     ///
     416              :     /// Note: this phase may read and write many gigabytes of data: use rewrite_max to bound
     417              :     /// how much work it will try to do in each compaction pass.
     418            0 :     async fn compact_shard_ancestors(
     419            0 :         self: &Arc<Self>,
     420            0 :         rewrite_max: usize,
     421            0 :         ctx: &RequestContext,
     422            0 :     ) -> Result<(), CompactionError> {
     423            0 :         let mut drop_layers = Vec::new();
     424            0 :         let mut layers_to_rewrite: Vec<Layer> = Vec::new();
     425            0 : 
     426            0 :         // We will use the Lsn cutoff of the last GC as a threshold for rewriting layers: if a
     427            0 :         // layer is behind this Lsn, it indicates that the layer is being retained beyond the
     428            0 :         // pitr_interval, for example because a branchpoint references it.
     429            0 :         //
     430            0 :         // Holding this read guard also blocks [`Self::gc_timeline`] from entering while we
     431            0 :         // are rewriting layers.
     432            0 :         let latest_gc_cutoff = self.get_latest_gc_cutoff_lsn();
     433            0 : 
     434            0 :         tracing::info!(
     435            0 :             "latest_gc_cutoff: {}, pitr cutoff {}",
     436            0 :             *latest_gc_cutoff,
     437            0 :             self.gc_info.read().unwrap().cutoffs.time
     438              :         );
     439              : 
     440            0 :         let layers = self.layers.read().await;
     441            0 :         for layer_desc in layers.layer_map()?.iter_historic_layers() {
     442            0 :             let layer = layers.get_from_desc(&layer_desc);
     443            0 :             if layer.metadata().shard.shard_count == self.shard_identity.count {
     444              :                 // This layer does not belong to a historic ancestor, no need to re-image it.
     445            0 :                 continue;
     446            0 :             }
     447            0 : 
     448            0 :             // This layer was created on an ancestor shard: check if it contains any data for this shard.
     449            0 :             let sharded_range = ShardedRange::new(layer_desc.get_key_range(), &self.shard_identity);
     450            0 :             let layer_local_page_count = sharded_range.page_count();
     451            0 :             let layer_raw_page_count = ShardedRange::raw_size(&layer_desc.get_key_range());
     452            0 :             if layer_local_page_count == 0 {
     453              :                 // This ancestral layer only covers keys that belong to other shards.
     454              :                 // We include the full metadata in the log: if we had some critical bug that caused
     455              :                 // us to incorrectly drop layers, this would simplify manually debugging + reinstating those layers.
     456            0 :                 info!(%layer, old_metadata=?layer.metadata(),
     457            0 :                     "dropping layer after shard split, contains no keys for this shard.",
     458              :                 );
     459              : 
     460            0 :                 if cfg!(debug_assertions) {
     461              :                     // Expensive, exhaustive check of keys in this layer: this guards against ShardedRange's calculations being
     462              :                     // wrong.  If ShardedRange claims the local page count is zero, then no keys in this layer
     463              :                     // should be !is_key_disposable()
     464            0 :                     let range = layer_desc.get_key_range();
     465            0 :                     let mut key = range.start;
     466            0 :                     while key < range.end {
     467            0 :                         debug_assert!(self.shard_identity.is_key_disposable(&key));
     468            0 :                         key = key.next();
     469              :                     }
     470            0 :                 }
     471              : 
     472            0 :                 drop_layers.push(layer);
     473            0 :                 continue;
     474            0 :             } else if layer_local_page_count != u32::MAX
     475            0 :                 && layer_local_page_count == layer_raw_page_count
     476              :             {
     477            0 :                 debug!(%layer,
     478            0 :                     "layer is entirely shard local ({} keys), no need to filter it",
     479              :                     layer_local_page_count
     480              :                 );
     481            0 :                 continue;
     482            0 :             }
     483            0 : 
     484            0 :             // Don't bother re-writing a layer unless it will at least halve its size
     485            0 :             if layer_local_page_count != u32::MAX
     486            0 :                 && layer_local_page_count > layer_raw_page_count / 2
     487              :             {
     488            0 :                 debug!(%layer,
     489            0 :                     "layer is already mostly local ({}/{}), not rewriting",
     490              :                     layer_local_page_count,
     491              :                     layer_raw_page_count
     492              :                 );
     493            0 :             }
     494              : 
     495              :             // Don't bother re-writing a layer if it is within the PITR window: it will age-out eventually
     496              :             // without incurring the I/O cost of a rewrite.
     497            0 :             if layer_desc.get_lsn_range().end >= *latest_gc_cutoff {
     498            0 :                 debug!(%layer, "Skipping rewrite of layer still in GC window ({} >= {})",
     499            0 :                     layer_desc.get_lsn_range().end, *latest_gc_cutoff);
     500            0 :                 continue;
     501            0 :             }
     502            0 : 
     503            0 :             if layer_desc.is_delta() {
     504              :                 // We do not yet implement rewrite of delta layers
     505            0 :                 debug!(%layer, "Skipping rewrite of delta layer");
     506            0 :                 continue;
     507            0 :             }
     508            0 : 
     509            0 :             // Only rewrite layers if their generations differ.  This guarantees:
     510            0 :             //  - that local rewrite is safe, as local layer paths will differ between existing layer and rewritten one
     511            0 :             //  - that the layer is persistent in remote storage, as we only see old-generation'd layer via loading from remote storage
     512            0 :             if layer.metadata().generation == self.generation {
     513            0 :                 debug!(%layer, "Skipping rewrite, is not from old generation");
     514            0 :                 continue;
     515            0 :             }
     516            0 : 
     517            0 :             if layers_to_rewrite.len() >= rewrite_max {
     518            0 :                 tracing::info!(%layer, "Will rewrite layer on a future compaction, already rewrote {}",
     519            0 :                     layers_to_rewrite.len()
     520              :                 );
     521            0 :                 continue;
     522            0 :             }
     523            0 : 
     524            0 :             // Fall through: all our conditions for doing a rewrite passed.
     525            0 :             layers_to_rewrite.push(layer);
     526              :         }
     527              : 
     528              :         // Drop read lock on layer map before we start doing time-consuming I/O
     529            0 :         drop(layers);
     530            0 : 
     531            0 :         let mut replace_image_layers = Vec::new();
     532              : 
     533            0 :         for layer in layers_to_rewrite {
     534            0 :             tracing::info!(layer=%layer, "Rewriting layer after shard split...");
     535            0 :             let mut image_layer_writer = ImageLayerWriter::new(
     536            0 :                 self.conf,
     537            0 :                 self.timeline_id,
     538            0 :                 self.tenant_shard_id,
     539            0 :                 &layer.layer_desc().key_range,
     540            0 :                 layer.layer_desc().image_layer_lsn(),
     541            0 :                 ctx,
     542            0 :             )
     543            0 :             .await
     544            0 :             .map_err(CompactionError::Other)?;
     545              : 
     546              :             // Safety of layer rewrites:
     547              :             // - We are writing to a different local file path than we are reading from, so the old Layer
     548              :             //   cannot interfere with the new one.
     549              :             // - In the page cache, contents for a particular VirtualFile are stored with a file_id that
     550              :             //   is different for two layers with the same name (in `ImageLayerInner::new` we always
     551              :             //   acquire a fresh id from [`crate::page_cache::next_file_id`].  So readers do not risk
     552              :             //   reading the index from one layer file, and then data blocks from the rewritten layer file.
     553              :             // - Any readers that have a reference to the old layer will keep it alive until they are done
     554              :             //   with it. If they are trying to promote from remote storage, that will fail, but this is the same
     555              :             //   as for compaction generally: compaction is allowed to delete layers that readers might be trying to use.
     556              :             // - We do not run concurrently with other kinds of compaction, so the only layer map writes we race with are:
     557              :             //    - GC, which at worst witnesses us "undelete" a layer that they just deleted.
     558              :             //    - ingestion, which only inserts layers, therefore cannot collide with us.
     559            0 :             let resident = layer.download_and_keep_resident().await?;
     560              : 
     561            0 :             let keys_written = resident
     562            0 :                 .filter(&self.shard_identity, &mut image_layer_writer, ctx)
     563            0 :                 .await?;
     564              : 
     565            0 :             if keys_written > 0 {
     566            0 :                 let new_layer = image_layer_writer
     567            0 :                     .finish(self, ctx)
     568            0 :                     .await
     569            0 :                     .map_err(CompactionError::Other)?;
     570            0 :                 tracing::info!(layer=%new_layer, "Rewrote layer, {} -> {} bytes",
     571            0 :                     layer.metadata().file_size,
     572            0 :                     new_layer.metadata().file_size);
     573              : 
     574            0 :                 replace_image_layers.push((layer, new_layer));
     575            0 :             } else {
     576            0 :                 // Drop the old layer.  Usually for this case we would already have noticed that
     577            0 :                 // the layer has no data for us with the ShardedRange check above, but
     578            0 :                 drop_layers.push(layer);
     579            0 :             }
     580              :         }
     581              : 
     582              :         // At this point, we have replaced local layer files with their rewritten form, but not yet uploaded
     583              :         // metadata to reflect that. If we restart here, the replaced layer files will look invalid (size mismatch
     584              :         // to remote index) and be removed. This is inefficient but safe.
     585            0 :         fail::fail_point!("compact-shard-ancestors-localonly");
     586            0 : 
     587            0 :         // Update the LayerMap so that readers will use the new layers, and enqueue it for writing to remote storage
     588            0 :         self.rewrite_layers(replace_image_layers, drop_layers)
     589            0 :             .await?;
     590              : 
     591            0 :         fail::fail_point!("compact-shard-ancestors-enqueued");
     592            0 : 
     593            0 :         // We wait for all uploads to complete before finishing this compaction stage.  This is not
     594            0 :         // necessary for correctness, but it simplifies testing, and avoids proceeding with another
     595            0 :         // Timeline's compaction while this timeline's uploads may be generating lots of disk I/O
     596            0 :         // load.
     597            0 :         match self.remote_client.wait_completion().await {
     598            0 :             Ok(()) => (),
     599            0 :             Err(WaitCompletionError::NotInitialized(ni)) => return Err(CompactionError::from(ni)),
     600              :             Err(WaitCompletionError::UploadQueueShutDownOrStopped) => {
     601            0 :                 return Err(CompactionError::ShuttingDown)
     602              :             }
     603              :         }
     604              : 
     605            0 :         fail::fail_point!("compact-shard-ancestors-persistent");
     606            0 : 
     607            0 :         Ok(())
     608            0 :     }
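
// Editor's sketch (not part of the original source): the per-layer decision made in the
// loop above, condensed into one function with hypothetical names. It ignores the
// `u32::MAX` sentinel for unknown page counts and folds the "already mostly local" case
// into Keep, as the comment intends.
enum AncestorLayerAction { Keep, Drop, Rewrite, Defer }

fn ancestor_layer_action(
    local_pages: u32,      // pages in the layer that belong to this shard
    raw_pages: u32,        // total pages covered by the layer's key range
    is_delta: bool,
    within_pitr: bool,     // layer end LSN >= latest GC cutoff
    same_generation: bool, // layer was already written by this generation
    rewrites_planned: usize,
    rewrite_max: usize,
) -> AncestorLayerAction {
    if local_pages == 0 {
        AncestorLayerAction::Drop // contains no keys for this shard at all
    } else if local_pages == raw_pages || local_pages > raw_pages / 2 {
        AncestorLayerAction::Keep // entirely or mostly local: rewriting is not worth it
    } else if within_pitr || is_delta || same_generation {
        AncestorLayerAction::Keep // will age out, or rewriting is unsupported/unsafe
    } else if rewrites_planned >= rewrite_max {
        AncestorLayerAction::Defer // bound the amount of rewrite I/O per compaction pass
    } else {
        AncestorLayerAction::Rewrite
    }
}
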
     609              : 
     610              :     /// Update the LayerVisibilityHint of layers covered by image layers, based on whether there is
     611              :     /// an image layer between them and the most recent readable LSN (branch point or tip of timeline).  The
     612              :     /// purpose of the visibility hint is to record which layers need to be available to service reads.
     613              :     ///
     614              :     /// The result may be used as an input to eviction and secondary downloads to de-prioritize layers
     615              :     /// that we know won't be needed for reads.
     616         2148 :     pub(super) async fn update_layer_visibility(
     617         2148 :         &self,
     618         2148 :     ) -> Result<(), super::layer_manager::Shutdown> {
     619         2148 :         let head_lsn = self.get_last_record_lsn();
     620              : 
     621              :         // We will sweep through layers in reverse-LSN order.  We only do historic layers.  L0 deltas
     622              :         // are implicitly left visible, because LayerVisibilityHint's default is Visible, and we never modify it here.
     623              :         // Note that L0 deltas _can_ be covered by image layers, but we consider them 'visible' because we anticipate that
     624              :         // they will be subject to L0->L1 compaction in the near future.
     625         2148 :         let layer_manager = self.layers.read().await;
     626         2148 :         let layer_map = layer_manager.layer_map()?;
     627              : 
     628         2148 :         let readable_points = {
     629         2148 :             let children = self.gc_info.read().unwrap().retain_lsns.clone();
     630         2148 : 
     631         2148 :             let mut readable_points = Vec::with_capacity(children.len() + 1);
     632         2148 :             for (child_lsn, _child_timeline_id) in &children {
     633            0 :                 readable_points.push(*child_lsn);
     634            0 :             }
     635         2148 :             readable_points.push(head_lsn);
     636         2148 :             readable_points
     637         2148 :         };
     638         2148 : 
     639         2148 :         let (layer_visibility, covered) = layer_map.get_visibility(readable_points);
     640        10218 :         for (layer_desc, visibility) in layer_visibility {
     641         8070 :             // FIXME: a more efficiency bulk zip() through the layers rather than NlogN getting each one
     642         8070 :             let layer = layer_manager.get_from_desc(&layer_desc);
     643         8070 :             layer.set_visibility(visibility);
     644         8070 :         }
     645              : 
     646              :         // TODO: publish our covered KeySpace to our parent, so that when they update their visibility, they can
     647              :         // avoid assuming that everything at a branch point is visible.
     648         2148 :         drop(covered);
     649         2148 :         Ok(())
     650         2148 :     }
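
// Editor's sketch (not part of the original source): a single-key simplification of the
// visibility rule; the real computation works over key ranges via the layer map's
// visibility calculation, and the parameter names here are made up.
fn is_visible(layer_lsn: u64, image_lsns: &[u64], readable_points: &[u64]) -> bool {
    // A layer is needed for reads if some readable point at or above it is not
    // already served by a strictly newer image layer at or below that point.
    readable_points.iter().any(|&p| {
        p >= layer_lsn && !image_lsns.iter().any(|&img| img > layer_lsn && img <= p)
    })
}
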
     651              : 
     652              :     /// Collect a bunch of Level 0 layer files, and compact and reshuffle them as
     653              :     /// Level 1 files. Returns whether the L0 layers are fully compacted.
     654         1092 :     async fn compact_level0(
     655         1092 :         self: &Arc<Self>,
     656         1092 :         target_file_size: u64,
     657         1092 :         ctx: &RequestContext,
     658         1092 :     ) -> Result<bool, CompactionError> {
     659              :         let CompactLevel0Phase1Result {
     660         1092 :             new_layers,
     661         1092 :             deltas_to_compact,
     662         1092 :             fully_compacted,
     663              :         } = {
     664         1092 :             let phase1_span = info_span!("compact_level0_phase1");
     665         1092 :             let ctx = ctx.attached_child();
     666         1092 :             let mut stats = CompactLevel0Phase1StatsBuilder {
     667         1092 :                 version: Some(2),
     668         1092 :                 tenant_id: Some(self.tenant_shard_id),
     669         1092 :                 timeline_id: Some(self.timeline_id),
     670         1092 :                 ..Default::default()
     671         1092 :             };
     672         1092 : 
     673         1092 :             let begin = tokio::time::Instant::now();
     674         1092 :             let phase1_layers_locked = self.layers.read().await;
     675         1092 :             let now = tokio::time::Instant::now();
     676         1092 :             stats.read_lock_acquisition_micros =
     677         1092 :                 DurationRecorder::Recorded(RecordedDuration(now - begin), now);
     678         1092 :             self.compact_level0_phase1(phase1_layers_locked, stats, target_file_size, &ctx)
     679         1092 :                 .instrument(phase1_span)
     680        29651 :                 .await?
     681              :         };
     682              : 
     683         1092 :         if new_layers.is_empty() && deltas_to_compact.is_empty() {
     684              :             // nothing to do
     685         1008 :             return Ok(true);
     686           84 :         }
     687           84 : 
     688           84 :         self.finish_compact_batch(&new_layers, &Vec::new(), &deltas_to_compact)
     689            3 :             .await?;
     690           84 :         Ok(fully_compacted)
     691         1092 :     }
     692              : 
     693              :     /// Level0 files first phase of compaction, explained in the [`Self::compact_legacy`] comment.
     694         1092 :     async fn compact_level0_phase1<'a>(
     695         1092 :         self: &'a Arc<Self>,
     696         1092 :         guard: tokio::sync::RwLockReadGuard<'a, LayerManager>,
     697         1092 :         mut stats: CompactLevel0Phase1StatsBuilder,
     698         1092 :         target_file_size: u64,
     699         1092 :         ctx: &RequestContext,
     700         1092 :     ) -> Result<CompactLevel0Phase1Result, CompactionError> {
     701         1092 :         stats.read_lock_held_spawn_blocking_startup_micros =
     702         1092 :             stats.read_lock_acquisition_micros.till_now(); // set by caller
     703         1092 :         let layers = guard.layer_map()?;
     704         1092 :         let level0_deltas = layers.level0_deltas();
     705         1092 :         stats.level0_deltas_count = Some(level0_deltas.len());
     706         1092 : 
     707         1092 :         // Only compact if enough layers have accumulated.
     708         1092 :         let threshold = self.get_compaction_threshold();
     709         1092 :         if level0_deltas.is_empty() || level0_deltas.len() < threshold {
     710         1008 :             debug!(
     711            0 :                 level0_deltas = level0_deltas.len(),
     712            0 :                 threshold, "too few deltas to compact"
     713              :             );
     714         1008 :             return Ok(CompactLevel0Phase1Result::default());
     715           84 :         }
     716           84 : 
     717           84 :         let mut level0_deltas = level0_deltas
     718           84 :             .iter()
     719         1206 :             .map(|x| guard.get_from_desc(x))
     720           84 :             .collect::<Vec<_>>();
     721           84 : 
     722           84 :         // Gather the files to compact in this iteration.
     723           84 :         //
     724           84 :         // Start with the oldest Level 0 delta file, and collect any other
     725           84 :         // level 0 files that form a contiguous sequence, such that the end
     726           84 :         // LSN of previous file matches the start LSN of the next file.
     727           84 :         //
     728           84 :         // Note that if the files don't form such a sequence, we might
     729           84 :         // "compact" just a single file. That's a bit pointless, but it allows
     730           84 :         // us to get rid of the level 0 file, and compact the other files on
     731           84 :         // the next iteration. This could probably be made smarter, but such
     732           84 :         // "gaps" in the sequence of level 0 files should only happen in case
     733           84 :         // of a crash, partial download from cloud storage, or something like
     734           84 :         // that, so it's not a big deal in practice.
     735         2244 :         level0_deltas.sort_by_key(|l| l.layer_desc().lsn_range.start);
     736           84 :         let mut level0_deltas_iter = level0_deltas.iter();
     737           84 : 
     738           84 :         let first_level0_delta = level0_deltas_iter.next().unwrap();
     739           84 :         let mut prev_lsn_end = first_level0_delta.layer_desc().lsn_range.end;
     740           84 :         let mut deltas_to_compact = Vec::with_capacity(level0_deltas.len());
     741           84 : 
     742           84 :         // Accumulate the size of layers in `deltas_to_compact`
     743           84 :         let mut deltas_to_compact_bytes = 0;
     744           84 : 
     745           84 :         // Under normal circumstances, we will accumulate up to compaction_interval L0s of size
     746           84 :         // checkpoint_distance each.  To avoid edge cases using extra system resources, bound our
     747           84 :         // work in this function to only operate on this much delta data at once.
     748           84 :         //
     749           84 :         // Take the max of the configured value & the default, so that tests that configure tiny values
     750           84 :         // can still use a sensible amount of memory, but if a deployed system configures bigger values we
     751           84 :         // still let them compact a full stack of L0s in one go.
     752           84 :         let delta_size_limit = std::cmp::max(
     753           84 :             self.get_compaction_threshold(),
     754           84 :             DEFAULT_COMPACTION_THRESHOLD,
     755           84 :         ) as u64
     756           84 :             * std::cmp::max(self.get_checkpoint_distance(), DEFAULT_CHECKPOINT_DISTANCE);
     757           84 : 
     758           84 :         let mut fully_compacted = true;
     759           84 : 
     760           84 :         deltas_to_compact.push(first_level0_delta.download_and_keep_resident().await?);
     761         1206 :         for l in level0_deltas_iter {
     762         1122 :             let lsn_range = &l.layer_desc().lsn_range;
     763         1122 : 
     764         1122 :             if lsn_range.start != prev_lsn_end {
     765            0 :                 break;
     766         1122 :             }
     767         1122 :             deltas_to_compact.push(l.download_and_keep_resident().await?);
     768         1122 :             deltas_to_compact_bytes += l.metadata().file_size;
     769         1122 :             prev_lsn_end = lsn_range.end;
     770         1122 : 
     771         1122 :             if deltas_to_compact_bytes >= delta_size_limit {
     772            0 :                 info!(
     773            0 :                     l0_deltas_selected = deltas_to_compact.len(),
     774            0 :                     l0_deltas_total = level0_deltas.len(),
     775            0 :                     "L0 compaction picker hit max delta layer size limit: {}",
     776              :                     delta_size_limit
     777              :                 );
     778            0 :                 fully_compacted = false;
     779            0 : 
     780            0 :                 // Proceed with compaction, but only a subset of L0s
     781            0 :                 break;
     782         1122 :             }
     783              :         }
     784           84 :         let lsn_range = Range {
     785           84 :             start: deltas_to_compact
     786           84 :                 .first()
     787           84 :                 .unwrap()
     788           84 :                 .layer_desc()
     789           84 :                 .lsn_range
     790           84 :                 .start,
     791           84 :             end: deltas_to_compact.last().unwrap().layer_desc().lsn_range.end,
     792           84 :         };
     793           84 : 
     794           84 :         info!(
     795            0 :             "Starting Level0 compaction in LSN range {}-{} for {} layers ({} deltas in total)",
     796            0 :             lsn_range.start,
     797            0 :             lsn_range.end,
     798            0 :             deltas_to_compact.len(),
     799            0 :             level0_deltas.len()
     800              :         );
     801              : 
     802         1206 :         for l in deltas_to_compact.iter() {
     803         1206 :             info!("compact includes {l}");
     804              :         }
     805              : 
     806              :         // We don't need the original list of layers anymore. Drop it so that
     807              :         // we don't accidentally use it later in the function.
     808           84 :         drop(level0_deltas);
     809           84 : 
     810           84 :         stats.read_lock_held_prerequisites_micros = stats
     811           84 :             .read_lock_held_spawn_blocking_startup_micros
     812           84 :             .till_now();
     813              : 
     814              :         // TODO: replace with streaming k-merge
     815           84 :         let all_keys = {
     816           84 :             let mut all_keys = Vec::new();
     817         1206 :             for l in deltas_to_compact.iter() {
     818         1206 :                 if self.cancel.is_cancelled() {
     819            0 :                     return Err(CompactionError::ShuttingDown);
     820         1206 :                 }
     821         7083 :                 all_keys.extend(l.load_keys(ctx).await.map_err(CompactionError::Other)?);
     822              :             }
     823              :             // The current stdlib sorting implementation is designed in a way that makes it
     824              :             // particularly fast when the slice is made up of sorted sub-ranges.
     825     13271354 :             all_keys.sort_by_key(|DeltaEntry { key, lsn, .. }| (*key, *lsn));
     826           84 :             all_keys
     827           84 :         };
     828           84 : 
     829           84 :         stats.read_lock_held_key_sort_micros = stats.read_lock_held_prerequisites_micros.till_now();
     830              : 
     831              :         // Determine N largest holes where N is number of compacted layers. The vec is sorted by key range start.
     832              :         //
     833              :         // A hole is a key range for which this compaction doesn't have any WAL records.
     834              :         // Our goal in this compaction iteration is to avoid creating L1s that, in terms of their key range,
     835              :         // cover the hole, but actually don't contain any WAL records for that key range.
     836              :         // The reason is that the mere stack of L1s (`count_deltas`) triggers image layer creation (`create_image_layers`).
     837              :         // That image layer creation would be useless for a hole range covered by L1s that don't contain any WAL records.
     838              :         //
     839              :         // The algorithm chooses holes as follows.
     840              :         // - Slide a 2-element window over the keys in key order to get the hole range (= the distance between two adjacent keys).
     841              :         // - Filter: min threshold on range length
     842              :         // - Rank: by coverage size (=number of image layers required to reconstruct each key in the range for which we have any data)
     843              :         //
     844              :         // For more details, intuition, and some ASCII art see https://github.com/neondatabase/neon/pull/3597#discussion_r1112704451
     845              :         #[derive(PartialEq, Eq)]
     846              :         struct Hole {
     847              :             key_range: Range<Key>,
     848              :             coverage_size: usize,
     849              :         }
     850           84 :         let holes: Vec<Hole> = {
     851              :             use std::cmp::Ordering;
     852              :             impl Ord for Hole {
     853            0 :                 fn cmp(&self, other: &Self) -> Ordering {
     854            0 :                     self.coverage_size.cmp(&other.coverage_size).reverse()
     855            0 :                 }
     856              :             }
     857              :             impl PartialOrd for Hole {
     858            0 :                 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
     859            0 :                     Some(self.cmp(other))
     860            0 :                 }
     861              :             }
     862           84 :             let max_holes = deltas_to_compact.len();
     863           84 :             let last_record_lsn = self.get_last_record_lsn();
     864           84 :             let min_hole_range = (target_file_size / page_cache::PAGE_SZ as u64) as i128;
     865           84 :             let min_hole_coverage_size = 3; // TODO: something more flexible?
     866           84 :                                             // min-heap (reserve space for one more element added before eviction)
     867           84 :             let mut heap: BinaryHeap<Hole> = BinaryHeap::with_capacity(max_holes + 1);
     868           84 :             let mut prev: Option<Key> = None;
     869              : 
     870      6192114 :             for &DeltaEntry { key: next_key, .. } in all_keys.iter() {
     871      6192114 :                 if let Some(prev_key) = prev {
     872              :                     // Just a first fast filter: do not create hole entries for metadata keys. The last hole in the
     873              :                     // compaction is the gap between the data keys and the metadata keys.
     874      6192030 :                     if next_key.to_i128() - prev_key.to_i128() >= min_hole_range
     875            0 :                         && !Key::is_metadata_key(&prev_key)
     876              :                     {
     877            0 :                         let key_range = prev_key..next_key;
     878            0 :                         // Measuring a hole by simply subtracting the i128 representations of the key range boundaries
     879            0 :                         // does not make much sense, because the largest holes would correspond to field1/field2 changes.
     880            0 :                         // But we are mostly interested in eliminating holes which cause generation of excessive image layers.
     881            0 :                         // That is why it is better to measure the size of a hole as the number of covering image layers.
     882            0 :                         let coverage_size =
     883            0 :                             layers.image_coverage(&key_range, last_record_lsn).len();
     884            0 :                         if coverage_size >= min_hole_coverage_size {
     885            0 :                             heap.push(Hole {
     886            0 :                                 key_range,
     887            0 :                                 coverage_size,
     888            0 :                             });
     889            0 :                             if heap.len() > max_holes {
     890            0 :                                 heap.pop(); // remove smallest hole
     891            0 :                             }
     892            0 :                         }
     893      6192030 :                     }
     894           84 :                 }
     895      6192114 :                 prev = Some(next_key.next());
     896              :             }
     897           84 :             let mut holes = heap.into_vec();
     898           84 :             holes.sort_unstable_by_key(|hole| hole.key_range.start);
     899           84 :             holes
     900           84 :         };
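        // Added note (illustration, not in the original source): because `Ord` for `Hole`
        // is reversed on `coverage_size`, the `BinaryHeap` above behaves as a min-heap.
        // For example, with `max_holes = 2` and holes of coverage size 5, 3 and 8 pushed in
        // that order, pushing 8 grows the heap to three entries and the following
        // `heap.pop()` evicts the smallest hole (3), leaving the two largest holes (5 and 8).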
     901           84 :         stats.read_lock_held_compute_holes_micros = stats.read_lock_held_key_sort_micros.till_now();
     902           84 :         drop_rlock(guard);
     903           84 : 
     904           84 :         if self.cancel.is_cancelled() {
     905            0 :             return Err(CompactionError::ShuttingDown);
     906           84 :         }
     907           84 : 
     908           84 :         stats.read_lock_drop_micros = stats.read_lock_held_compute_holes_micros.till_now();
     909              : 
     910              :         // This iterator walks through all key-value pairs from all the layers
     911              :         // we're compacting, in key, LSN order.
     912              :         // If there's both a Value::Image and Value::WalRecord for the same (key,lsn),
     913              :         // then the Value::Image is ordered before Value::WalRecord.
     914           84 :         let mut all_values_iter = {
     915           84 :             let mut deltas = Vec::with_capacity(deltas_to_compact.len());
     916         1206 :             for l in deltas_to_compact.iter() {
     917         1206 :                 let l = l.get_as_delta(ctx).await.map_err(CompactionError::Other)?;
     918         1206 :                 deltas.push(l);
     919              :             }
     920           84 :             MergeIterator::create(&deltas, &[], ctx)
     921           84 :         };
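        // Added note (illustration, not in the original source): as stated above, if the
        // same (key, lsn) pair carries both a Value::Image and a Value::WalRecord, the merge
        // iterator yields the image first, e.g. for key K at LSN 0x20 the order is
        // (K, 0x20, Image(..)) followed by (K, 0x20, WalRecord(..)).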
     922           84 : 
     923           84 :         // This iterator walks through all keys and is needed to calculate the size used by each key
     924           84 :         let mut all_keys_iter = all_keys
     925           84 :             .iter()
     926      6192114 :             .map(|DeltaEntry { key, lsn, size, .. }| (*key, *lsn, *size))
     927      6192030 :             .coalesce(|mut prev, cur| {
     928      6192030 :                 // Coalesce entries that belong to the same key.
     929      6192030 :                 // This ensures that compaction doesn't put them
     930      6192030 :                 // into different layer files.
     931      6192030 :                 // Still limit this by the target file size,
     932      6192030 :                 // so that we keep the size of the files in
     933      6192030 :                 // check.
     934      6192030 :                 if prev.0 == cur.0 && prev.2 < target_file_size {
     935       120114 :                     prev.2 += cur.2;
     936       120114 :                     Ok(prev)
     937              :                 } else {
     938      6071916 :                     Err((prev, cur))
     939              :                 }
     940      6192030 :             });
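        // Added note (worked example, not in the original source): with target_file_size =
        // 100, the entries (K1, 0x10, 60), (K1, 0x20, 60), (K2, 0x30, 10) coalesce into
        // (K1, 0x10, 120) and (K2, 0x30, 10). The two K1 entries merge because the
        // accumulated size (60) was still below the target when the second one was seen;
        // the merged entry keeps the LSN of the first entry for that key.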
     941           84 : 
     942           84 :         // Merge the contents of all the input delta layers into a new set
     943           84 :         // of delta layers, based on the current partitioning.
     944           84 :         //
     945           84 :         // We split the new delta layers on the key dimension. We iterate through the key space and, for each key, check whether including the next key in the current output layer we're building would cause the layer to become too large. If so, we dump the current output layer and start a new one (a simplified sketch of this flush decision is included after this impl block).
     946           84 :         // It's possible that there is a single key with so many page versions that storing all of them in a single layer file
     947           84 :         // would be too large. In that case, we also split on the LSN dimension.
     948           84 :         //
     949           84 :         // LSN
     950           84 :         //  ^
     951           84 :         //  |
     952           84 :         //  | +-----------+            +--+--+--+--+
     953           84 :         //  | |           |            |  |  |  |  |
     954           84 :         //  | +-----------+            |  |  |  |  |
     955           84 :         //  | |           |            |  |  |  |  |
     956           84 :         //  | +-----------+     ==>    |  |  |  |  |
     957           84 :         //  | |           |            |  |  |  |  |
     958           84 :         //  | +-----------+            |  |  |  |  |
     959           84 :         //  | |           |            |  |  |  |  |
     960           84 :         //  | +-----------+            +--+--+--+--+
     961           84 :         //  |
     962           84 :         //  +--------------> key
     963           84 :         //
     964           84 :         //
     965           84 :         // If one key (X) has a lot of page versions:
     966           84 :         //
     967           84 :         // LSN
     968           84 :         //  ^
     969           84 :         //  |                                 (X)
     970           84 :         //  | +-----------+            +--+--+--+--+
     971           84 :         //  | |           |            |  |  |  |  |
     972           84 :         //  | +-----------+            |  |  +--+  |
     973           84 :         //  | |           |            |  |  |  |  |
     974           84 :         //  | +-----------+     ==>    |  |  |  |  |
     975           84 :         //  | |           |            |  |  +--+  |
     976           84 :         //  | +-----------+            |  |  |  |  |
     977           84 :         //  | |           |            |  |  |  |  |
     978           84 :         //  | +-----------+            +--+--+--+--+
     979           84 :         //  |
     980           84 :         //  +--------------> key
     981           84 :         // TODO: this actually divides the layers into fixed-size chunks, not
     982           84 :         // based on the partitioning.
     983           84 :         //
     984           84 :         // TODO: we should also opportunistically materialize and
     985           84 :         // garbage collect what we can.
     986           84 :         let mut new_layers = Vec::new();
     987           84 :         let mut prev_key: Option<Key> = None;
     988           84 :         let mut writer: Option<DeltaLayerWriter> = None;
     989           84 :         let mut key_values_total_size = 0u64;
     990           84 :         let mut dup_start_lsn: Lsn = Lsn::INVALID; // start LSN of layer containing values of the single key
     991           84 :         let mut dup_end_lsn: Lsn = Lsn::INVALID; // end LSN of layer containing values of the single key
     992           84 :         let mut next_hole = 0; // index of next hole in holes vector
     993           84 : 
     994           84 :         let mut keys = 0;
     995              : 
     996      6192198 :         while let Some((key, lsn, value)) = all_values_iter
     997      6192198 :             .next()
     998        10248 :             .await
     999      6192198 :             .map_err(CompactionError::Other)?
    1000              :         {
    1001      6192114 :             keys += 1;
    1002      6192114 : 
    1003      6192114 :             if keys % 32_768 == 0 && self.cancel.is_cancelled() {
    1004              :                 // Avoid hitting the cancellation token on every key. In benches, we end up
    1005              :                 // shuffling on the order of a million keys per layer, which means we'll check it
    1006              :                 // only around tens of times per layer.
    1007            0 :                 return Err(CompactionError::ShuttingDown);
    1008      6192114 :             }
    1009      6192114 : 
    1010      6192114 :             let same_key = prev_key.map_or(false, |prev_key| prev_key == key);
    1011      6192114 :             // We need to check key boundaries once we reach the next key or the end of a layer with the same key
    1012      6192114 :             if !same_key || lsn == dup_end_lsn {
    1013      6072000 :                 let mut next_key_size = 0u64;
    1014      6072000 :                 let is_dup_layer = dup_end_lsn.is_valid();
    1015      6072000 :                 dup_start_lsn = Lsn::INVALID;
    1016      6072000 :                 if !same_key {
    1017      6072000 :                     dup_end_lsn = Lsn::INVALID;
    1018      6072000 :                 }
    1019              :                 // Determine the size occupied by this key. We stop at the next key or when the size becomes larger than target_file_size
    1020      6072000 :                 for (next_key, next_lsn, next_size) in all_keys_iter.by_ref() {
    1021      6072000 :                     next_key_size = next_size;
    1022      6072000 :                     if key != next_key {
    1023      6071916 :                         if dup_end_lsn.is_valid() {
    1024            0 :                             // We are writing a segment with duplicates:
    1025            0 :                             // place all remaining values of this key in a separate segment
    1026            0 :                             dup_start_lsn = dup_end_lsn; // the new segment starts where the old one stops
    1027            0 :                             dup_end_lsn = lsn_range.end; // there are no more values of this key till the end of the LSN range
    1028      6071916 :                         }
    1029      6071916 :                         break;
    1030           84 :                     }
    1031           84 :                     key_values_total_size += next_size;
    1032           84 :                     // Check if it is time to split the segment: if the total size of the key's values is larger than the target file size.
    1033           84 :                     // We need to avoid generating empty segments if next_size > target_file_size.
    1034           84 :                     if key_values_total_size > target_file_size && lsn != next_lsn {
    1035              :                         // Split the key between multiple layers: such a layer can contain only a single key
    1036            0 :                         dup_start_lsn = if dup_end_lsn.is_valid() {
    1037            0 :                             dup_end_lsn // new segment with duplicates starts where old one stops
    1038              :                         } else {
    1039            0 :                             lsn // start with the first LSN for this key
    1040              :                         };
    1041            0 :                         dup_end_lsn = next_lsn; // upper LSN boundary is exclusive
    1042            0 :                         break;
    1043           84 :                     }
    1044              :                 }
    1045              :                 // Handle the case when the loop reaches the last key: in this case dup_end_lsn is valid but dup_start_lsn is not set.
    1046      6072000 :                 if dup_end_lsn.is_valid() && !dup_start_lsn.is_valid() {
    1047            0 :                     dup_start_lsn = dup_end_lsn;
    1048            0 :                     dup_end_lsn = lsn_range.end;
    1049      6072000 :                 }
    1050      6072000 :                 if writer.is_some() {
    1051      6071916 :                     let written_size = writer.as_mut().unwrap().size();
    1052      6071916 :                     let contains_hole =
    1053      6071916 :                         next_hole < holes.len() && key >= holes[next_hole].key_range.end;
    1054              :                     // Check if the key causes layer overflow or crosses a hole...
    1055      6071916 :                     if is_dup_layer
    1056      6071916 :                         || dup_end_lsn.is_valid()
    1057      6071916 :                         || written_size + key_values_total_size > target_file_size
    1058      6071076 :                         || contains_hole
    1059              :                     {
    1060              :                         // ... if so, flush the previous layer and prepare to write a new one
    1061          840 :                         let (desc, path) = writer
    1062          840 :                             .take()
    1063          840 :                             .unwrap()
    1064          840 :                             .finish(prev_key.unwrap().next(), ctx)
    1065         2130 :                             .await
    1066          840 :                             .map_err(CompactionError::Other)?;
    1067          840 :                         let new_delta = Layer::finish_creating(self.conf, self, desc, &path)
    1068          840 :                             .map_err(CompactionError::Other)?;
    1069              : 
    1070          840 :                         new_layers.push(new_delta);
    1071          840 :                         writer = None;
    1072          840 : 
    1073          840 :                         if contains_hole {
    1074            0 :                             // skip hole
    1075            0 :                             next_hole += 1;
    1076          840 :                         }
    1077      6071076 :                     }
    1078           84 :                 }
    1079              :                 // Remember the size of the key's values because at the next iteration we will access the next item
    1080      6072000 :                 key_values_total_size = next_key_size;
    1081       120114 :             }
    1082      6192114 :             fail_point!("delta-layer-writer-fail-before-finish", |_| {
    1083            0 :                 Err(CompactionError::Other(anyhow::anyhow!(
    1084            0 :                     "failpoint delta-layer-writer-fail-before-finish"
    1085            0 :                 )))
    1086      6192114 :             });
    1087              : 
    1088      6192114 :             if !self.shard_identity.is_key_disposable(&key) {
    1089      6192114 :                 if writer.is_none() {
    1090          924 :                     if self.cancel.is_cancelled() {
    1091              :                         // to be somewhat responsive to cancellation, check for each new layer
    1092            0 :                         return Err(CompactionError::ShuttingDown);
    1093          924 :                     }
    1094              :                     // Create a writer if not initialized yet
    1095          924 :                     writer = Some(
    1096              :                         DeltaLayerWriter::new(
    1097          924 :                             self.conf,
    1098          924 :                             self.timeline_id,
    1099          924 :                             self.tenant_shard_id,
    1100          924 :                             key,
    1101          924 :                             if dup_end_lsn.is_valid() {
    1102              :                                 // this is a layer containing slice of values of the same key
    1103            0 :                                 debug!("Create new dup layer {}..{}", dup_start_lsn, dup_end_lsn);
    1104            0 :                                 dup_start_lsn..dup_end_lsn
    1105              :                             } else {
    1106          924 :                                 debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end);
    1107          924 :                                 lsn_range.clone()
    1108              :                             },
    1109          924 :                             ctx,
    1110              :                         )
    1111          462 :                         .await
    1112          924 :                         .map_err(CompactionError::Other)?,
    1113              :                     );
    1114              : 
    1115          924 :                     keys = 0;
    1116      6191190 :                 }
    1117              : 
    1118      6192114 :                 writer
    1119      6192114 :                     .as_mut()
    1120      6192114 :                     .unwrap()
    1121      6192114 :                     .put_value(key, lsn, value, ctx)
    1122         3676 :                     .await
    1123      6192114 :                     .map_err(CompactionError::Other)?;
    1124              :             } else {
    1125            0 :                 debug!(
    1126            0 :                     "Dropping key {} during compaction (it belongs on shard {:?})",
    1127            0 :                     key,
    1128            0 :                     self.shard_identity.get_shard_number(&key)
    1129              :                 );
    1130              :             }
    1131              : 
    1132      6192114 :             if !new_layers.is_empty() {
    1133        59358 :                 fail_point!("after-timeline-compacted-first-L1");
    1134      6132756 :             }
    1135              : 
    1136      6192114 :             prev_key = Some(key);
    1137              :         }
    1138           84 :         if let Some(writer) = writer {
    1139           84 :             let (desc, path) = writer
    1140           84 :                 .finish(prev_key.unwrap().next(), ctx)
    1141         5968 :                 .await
    1142           84 :                 .map_err(CompactionError::Other)?;
    1143           84 :             let new_delta = Layer::finish_creating(self.conf, self, desc, &path)
    1144           84 :                 .map_err(CompactionError::Other)?;
    1145           84 :             new_layers.push(new_delta);
    1146            0 :         }
    1147              : 
    1148              :         // Sync layers
    1149           84 :         if !new_layers.is_empty() {
    1150              :             // Print a warning if the created layer is larger than double the target size
    1151              :             // Add two pages for potential overhead. This should in theory be already
    1152              :             // accounted for in the target calculation, but for very small targets,
    1153              :             // we still might easily hit the limit otherwise.
    1154           84 :             let warn_limit = target_file_size * 2 + page_cache::PAGE_SZ as u64 * 2;
    1155          924 :             for layer in new_layers.iter() {
    1156          924 :                 if layer.layer_desc().file_size > warn_limit {
    1157            0 :                     warn!(
    1158              :                         %layer,
    1159            0 :                         "created delta file of size {} larger than double of target of {target_file_size}", layer.layer_desc().file_size
    1160              :                     );
    1161          924 :                 }
    1162              :             }
    1163              : 
    1164              :             // The writer.finish() above already did the fsync of the inodes.
    1165              :             // We just need to fsync the directory in which these inodes are linked,
    1166              :             // which we know to be the timeline directory.
    1167              :             //
    1168              :             // We use fatal_err() below because after writer.finish() returns with success,
    1169              :             // the in-memory state of the filesystem already has the layer file in its final place,
    1170              :             // and subsequent pageserver code could think it's durable while it really isn't.
    1171           84 :             let timeline_dir = VirtualFile::open(
    1172           84 :                 &self
    1173           84 :                     .conf
    1174           84 :                     .timeline_path(&self.tenant_shard_id, &self.timeline_id),
    1175           84 :                 ctx,
    1176           84 :             )
    1177           42 :             .await
    1178           84 :             .fatal_err("VirtualFile::open for timeline dir fsync");
    1179           84 :             timeline_dir
    1180           84 :                 .sync_all()
    1181           42 :                 .await
    1182           84 :                 .fatal_err("VirtualFile::sync_all timeline dir");
    1183            0 :         }
    1184              : 
    1185           84 :         stats.write_layer_files_micros = stats.read_lock_drop_micros.till_now();
    1186           84 :         stats.new_deltas_count = Some(new_layers.len());
    1187          924 :         stats.new_deltas_size = Some(new_layers.iter().map(|l| l.layer_desc().file_size).sum());
    1188           84 : 
    1189           84 :         match TryInto::<CompactLevel0Phase1Stats>::try_into(stats)
    1190           84 :             .and_then(|stats| serde_json::to_string(&stats).context("serde_json::to_string"))
    1191              :         {
    1192           84 :             Ok(stats_json) => {
    1193           84 :                 info!(
    1194            0 :                     stats_json = stats_json.as_str(),
    1195            0 :                     "compact_level0_phase1 stats available"
    1196              :                 )
    1197              :             }
    1198            0 :             Err(e) => {
    1199            0 :                 warn!("compact_level0_phase1 stats failed to serialize: {:#}", e);
    1200              :             }
    1201              :         }
    1202              : 
    1203              :         // Without this, rustc complains about deltas_to_compact still
    1204              :         // being borrowed when we `.into_iter()` below.
    1205           84 :         drop(all_values_iter);
    1206           84 : 
    1207           84 :         Ok(CompactLevel0Phase1Result {
    1208           84 :             new_layers,
    1209           84 :             deltas_to_compact: deltas_to_compact
    1210           84 :                 .into_iter()
    1211         1206 :                 .map(|x| x.drop_eviction_guard())
    1212           84 :                 .collect::<Vec<_>>(),
    1213           84 :             fully_compacted,
    1214           84 :         })
    1215         1092 :     }
    1216              : }
    1217              : 
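// Illustrative sketch (added for exposition; not part of the original source file).
// It mirrors, in simplified form, the "should we close the current output delta
// layer?" decision inside `compact_level0_phase1` above. All names below are
// hypothetical; the real code additionally tracks `dup_start_lsn`/`dup_end_lsn` to
// implement the per-key LSN splitting shown in the ASCII diagrams.
#[allow(dead_code)]
fn should_flush_current_layer(
    written_size: u64,        // bytes already written to the open output layer
    next_key_total_size: u64, // estimated total size of the next key's values
    target_file_size: u64,
    splitting_hot_key: bool,  // a single key is being split along the LSN axis
    crosses_hole: bool,       // the next key lies beyond a known hole in the key space
) -> bool {
    // Close the current layer when adding the next key would overflow the target
    // size, when we are splitting one hot key into per-LSN slices, or when continuing
    // would make the layer's key range span a hole that has no WAL records.
    splitting_hot_key
        || written_size + next_key_total_size > target_file_size
        || crosses_hole
}
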
    1218              : #[derive(Default)]
    1219              : struct CompactLevel0Phase1Result {
    1220              :     new_layers: Vec<ResidentLayer>,
    1221              :     deltas_to_compact: Vec<Layer>,
    1222              :     // Whether we have included all L0 layers, or selected only part of them due to the
    1223              :     // L0 compaction size limit.
    1224              :     fully_compacted: bool,
    1225              : }
    1226              : 
    1227              : #[derive(Default)]
    1228              : struct CompactLevel0Phase1StatsBuilder {
    1229              :     version: Option<u64>,
    1230              :     tenant_id: Option<TenantShardId>,
    1231              :     timeline_id: Option<TimelineId>,
    1232              :     read_lock_acquisition_micros: DurationRecorder,
    1233              :     read_lock_held_spawn_blocking_startup_micros: DurationRecorder,
    1234              :     read_lock_held_key_sort_micros: DurationRecorder,
    1235              :     read_lock_held_prerequisites_micros: DurationRecorder,
    1236              :     read_lock_held_compute_holes_micros: DurationRecorder,
    1237              :     read_lock_drop_micros: DurationRecorder,
    1238              :     write_layer_files_micros: DurationRecorder,
    1239              :     level0_deltas_count: Option<usize>,
    1240              :     new_deltas_count: Option<usize>,
    1241              :     new_deltas_size: Option<u64>,
    1242              : }
    1243              : 
    1244              : #[derive(serde::Serialize)]
    1245              : struct CompactLevel0Phase1Stats {
    1246              :     version: u64,
    1247              :     tenant_id: TenantShardId,
    1248              :     timeline_id: TimelineId,
    1249              :     read_lock_acquisition_micros: RecordedDuration,
    1250              :     read_lock_held_spawn_blocking_startup_micros: RecordedDuration,
    1251              :     read_lock_held_key_sort_micros: RecordedDuration,
    1252              :     read_lock_held_prerequisites_micros: RecordedDuration,
    1253              :     read_lock_held_compute_holes_micros: RecordedDuration,
    1254              :     read_lock_drop_micros: RecordedDuration,
    1255              :     write_layer_files_micros: RecordedDuration,
    1256              :     level0_deltas_count: usize,
    1257              :     new_deltas_count: usize,
    1258              :     new_deltas_size: u64,
    1259              : }
    1260              : 
    1261              : impl TryFrom<CompactLevel0Phase1StatsBuilder> for CompactLevel0Phase1Stats {
    1262              :     type Error = anyhow::Error;
    1263              : 
    1264           84 :     fn try_from(value: CompactLevel0Phase1StatsBuilder) -> Result<Self, Self::Error> {
    1265           84 :         Ok(Self {
    1266           84 :             version: value.version.ok_or_else(|| anyhow!("version not set"))?,
    1267           84 :             tenant_id: value
    1268           84 :                 .tenant_id
    1269           84 :                 .ok_or_else(|| anyhow!("tenant_id not set"))?,
    1270           84 :             timeline_id: value
    1271           84 :                 .timeline_id
    1272           84 :                 .ok_or_else(|| anyhow!("timeline_id not set"))?,
    1273           84 :             read_lock_acquisition_micros: value
    1274           84 :                 .read_lock_acquisition_micros
    1275           84 :                 .into_recorded()
    1276           84 :                 .ok_or_else(|| anyhow!("read_lock_acquisition_micros not set"))?,
    1277           84 :             read_lock_held_spawn_blocking_startup_micros: value
    1278           84 :                 .read_lock_held_spawn_blocking_startup_micros
    1279           84 :                 .into_recorded()
    1280           84 :                 .ok_or_else(|| anyhow!("read_lock_held_spawn_blocking_startup_micros not set"))?,
    1281           84 :             read_lock_held_key_sort_micros: value
    1282           84 :                 .read_lock_held_key_sort_micros
    1283           84 :                 .into_recorded()
    1284           84 :                 .ok_or_else(|| anyhow!("read_lock_held_key_sort_micros not set"))?,
    1285           84 :             read_lock_held_prerequisites_micros: value
    1286           84 :                 .read_lock_held_prerequisites_micros
    1287           84 :                 .into_recorded()
    1288           84 :                 .ok_or_else(|| anyhow!("read_lock_held_prerequisites_micros not set"))?,
    1289           84 :             read_lock_held_compute_holes_micros: value
    1290           84 :                 .read_lock_held_compute_holes_micros
    1291           84 :                 .into_recorded()
    1292           84 :                 .ok_or_else(|| anyhow!("read_lock_held_compute_holes_micros not set"))?,
    1293           84 :             read_lock_drop_micros: value
    1294           84 :                 .read_lock_drop_micros
    1295           84 :                 .into_recorded()
    1296           84 :                 .ok_or_else(|| anyhow!("read_lock_drop_micros not set"))?,
    1297           84 :             write_layer_files_micros: value
    1298           84 :                 .write_layer_files_micros
    1299           84 :                 .into_recorded()
    1300           84 :                 .ok_or_else(|| anyhow!("write_layer_files_micros not set"))?,
    1301           84 :             level0_deltas_count: value
    1302           84 :                 .level0_deltas_count
    1303           84 :                 .ok_or_else(|| anyhow!("level0_deltas_count not set"))?,
    1304           84 :             new_deltas_count: value
    1305           84 :                 .new_deltas_count
    1306           84 :                 .ok_or_else(|| anyhow!("new_deltas_count not set"))?,
    1307           84 :             new_deltas_size: value
    1308           84 :                 .new_deltas_size
    1309           84 :                 .ok_or_else(|| anyhow!("new_deltas_size not set"))?,
    1310              :         })
    1311           84 :     }
    1312              : }
    1313              : 
    1314              : impl Timeline {
    1315              :     /// Entry point for new tiered compaction algorithm.
    1316              :     ///
    1317              :     /// All the real work is in the implementation in the pageserver_compaction
    1318              :     /// crate. The code here would apply to any algorithm implemented by the
    1319              :     /// same interface, but tiered is the only one at the moment.
    1320              :     ///
    1321              :     /// TODO: cancellation
    1322            0 :     pub(crate) async fn compact_tiered(
    1323            0 :         self: &Arc<Self>,
    1324            0 :         _cancel: &CancellationToken,
    1325            0 :         ctx: &RequestContext,
    1326            0 :     ) -> Result<(), CompactionError> {
    1327            0 :         let fanout = self.get_compaction_threshold() as u64;
    1328            0 :         let target_file_size = self.get_checkpoint_distance();
    1329              : 
    1330              :         // Find the top of the historical layers
    1331            0 :         let end_lsn = {
    1332            0 :             let guard = self.layers.read().await;
    1333            0 :             let layers = guard.layer_map()?;
    1334              : 
    1335            0 :             let l0_deltas = layers.level0_deltas();
    1336            0 : 
    1337            0 :             // As an optimization, if we find that there are too few L0 layers,
    1338            0 :             // bail out early. We know that the compaction algorithm would do
    1339            0 :             // nothing in that case.
    1340            0 :             if l0_deltas.len() < fanout as usize {
    1341              :                 // doesn't need compacting
    1342            0 :                 return Ok(());
    1343            0 :             }
    1344            0 :             l0_deltas.iter().map(|l| l.lsn_range.end).max().unwrap()
    1345            0 :         };
    1346            0 : 
    1347            0 :         // Is the timeline being deleted?
    1348            0 :         if self.is_stopping() {
    1349            0 :             trace!("Dropping out of compaction on timeline shutdown");
    1350            0 :             return Err(CompactionError::ShuttingDown);
    1351            0 :         }
    1352              : 
    1353            0 :         let (dense_ks, _sparse_ks) = self.collect_keyspace(end_lsn, ctx).await?;
    1354              :         // TODO(chi): ignore sparse_keyspace for now, compact it in the future.
    1355            0 :         let mut adaptor = TimelineAdaptor::new(self, (end_lsn, dense_ks));
    1356            0 : 
    1357            0 :         pageserver_compaction::compact_tiered::compact_tiered(
    1358            0 :             &mut adaptor,
    1359            0 :             end_lsn,
    1360            0 :             target_file_size,
    1361            0 :             fanout,
    1362            0 :             ctx,
    1363            0 :         )
    1364            0 :         .await
    1365              :         // TODO: compact_tiered needs to return CompactionError
    1366            0 :         .map_err(CompactionError::Other)?;
    1367              : 
    1368            0 :         adaptor.flush_updates().await?;
    1369            0 :         Ok(())
    1370            0 :     }
    1371              : 
    1372              :     /// Take a list of images and deltas, produce images and deltas according to GC horizon and retain_lsns.
    1373              :     ///
    1374              :     /// It takes a key, the values of the key within the compaction process, a GC horizon, and all retain_lsns below the horizon.
    1375              :     /// For now, it requires the `accumulated_values` contains the full history of the key (i.e., the key with the lowest LSN is
    1376              :     /// an image or a WAL not requiring a base image). This restriction will be removed once we implement gc-compaction on branch.
    1377              :     ///
    1378              :     /// The function returns the deltas and the base image that need to be placed at each of the retain LSN. For example, we have:
    1379              :     ///
    1380              :     /// A@0x10, +B@0x20, +C@0x30, +D@0x40, +E@0x50, +F@0x60
    1381              :     /// For now, it requires that `accumulated_values` contain the full history of the key (i.e., the record with the lowest LSN is
    1382              :     /// an image or a WAL record not requiring a base image). This restriction will be removed once we implement gc-compaction on branches.
    1383              :     /// The function will produce:
    1384              :     /// The function returns the deltas and the base image that need to be placed at each of the retain LSNs. For example, suppose we have:
    1385              :     /// ```plain
    1386              :     /// 0x20(retain_lsn) -> img=AB@0x20                  always produce a single image below the lowest retain LSN
    1387              :     /// 0x40(retain_lsn) -> deltas=[+C@0x30, +D@0x40]    two deltas since the last base image, keeping the deltas
    1388              :     /// 0x50(horizon)    -> deltas=[ABCDE@0x50]          three deltas since the last base image, generate an image but put it in the delta
    1389              :     /// above_horizon    -> deltas=[+F@0x60]             full history above the horizon
    1390              :     /// ```
    1391              :     ///
    1392              :     /// Note that `accumulated_values` must be sorted by LSN and should belong to a single key.
    1393         1290 :     pub(crate) async fn generate_key_retention(
    1394         1290 :         self: &Arc<Timeline>,
    1395         1290 :         key: Key,
    1396         1290 :         full_history: &[(Key, Lsn, Value)],
    1397         1290 :         horizon: Lsn,
    1398         1290 :         retain_lsn_below_horizon: &[Lsn],
    1399         1290 :         delta_threshold_cnt: usize,
    1400         1290 :         base_img_from_ancestor: Option<(Key, Lsn, Bytes)>,
    1401         1290 :     ) -> anyhow::Result<KeyHistoryRetention> {
    1402         1290 :         // Pre-checks for the invariants
    1403         1290 :         if cfg!(debug_assertions) {
    1404         3120 :             for (log_key, _, _) in full_history {
    1405         1830 :                 assert_eq!(log_key, &key, "mismatched key");
    1406              :             }
    1407         1290 :             for i in 1..full_history.len() {
    1408          540 :                 assert!(full_history[i - 1].1 <= full_history[i].1, "unordered LSN");
    1409          540 :                 if full_history[i - 1].1 == full_history[i].1 {
    1410            0 :                     assert!(
    1411            0 :                         matches!(full_history[i - 1].2, Value::Image(_)),
    1412            0 :                         "unordered delta/image, or duplicated delta"
    1413              :                     );
    1414          540 :                 }
    1415              :             }
    1416              :             // There used to be an assertion here that the first record in the history
    1417              :             // is `will_init` (i.e. needs no base image), but it was removed.
    1418              :             // This is explained in the test cases for generate_key_retention.
    1419              :             // Search "incomplete history" for more information.
    1420         3000 :             for lsn in retain_lsn_below_horizon {
    1421         1710 :                 assert!(lsn < &horizon, "retain lsn must be below horizon")
    1422              :             }
    1423         1290 :             for i in 1..retain_lsn_below_horizon.len() {
    1424          834 :                 assert!(
    1425          834 :                     retain_lsn_below_horizon[i - 1] <= retain_lsn_below_horizon[i],
    1426            0 :                     "unordered LSN"
    1427              :                 );
    1428              :             }
    1429            0 :         }
    1430         1290 :         let has_ancestor = base_img_from_ancestor.is_some();
    1431              :         // Step 1: split history into len(retain_lsn_below_horizon) + 2 buckets, where the last bucket is for all deltas above the horizon,
    1432              :         // and the second-to-last bucket is for the horizon. Each bucket contains the records with lsn_of_previous_bucket < lsn <= lsn_of_this_bucket.
    1433         1290 :         let (mut split_history, lsn_split_points) = {
    1434         1290 :             let mut split_history = Vec::new();
    1435         1290 :             split_history.resize_with(retain_lsn_below_horizon.len() + 2, Vec::new);
    1436         1290 :             let mut lsn_split_points = Vec::with_capacity(retain_lsn_below_horizon.len() + 1);
    1437         3000 :             for lsn in retain_lsn_below_horizon {
    1438         1710 :                 lsn_split_points.push(*lsn);
    1439         1710 :             }
    1440         1290 :             lsn_split_points.push(horizon);
    1441         1290 :             let mut current_idx = 0;
    1442         3120 :             for item @ (_, lsn, _) in full_history {
    1443         2316 :                 while current_idx < lsn_split_points.len() && *lsn > lsn_split_points[current_idx] {
    1444          486 :                     current_idx += 1;
    1445          486 :                 }
    1446         1830 :                 split_history[current_idx].push(item);
    1447              :             }
    1448         1290 :             (split_history, lsn_split_points)
    1449              :         };
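        // Added note (worked example, not in the original source): reusing the example from
        // the doc comment above, with retain_lsn_below_horizon = [0x20, 0x40] and horizon =
        // 0x50, lsn_split_points becomes [0x20, 0x40, 0x50] and the history
        // A@0x10, +B@0x20, +C@0x30, +D@0x40, +E@0x50, +F@0x60 is split into the buckets
        // [A@0x10, +B@0x20], [+C@0x30, +D@0x40], [+E@0x50], and [+F@0x60] (above horizon).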
    1450              :         // Step 2: filter out duplicated records due to the k-merge of image/delta layers
    1451         5580 :         for split_for_lsn in &mut split_history {
    1452         4290 :             let mut prev_lsn = None;
    1453         4290 :             let mut new_split_for_lsn = Vec::with_capacity(split_for_lsn.len());
    1454         4290 :             for record @ (_, lsn, _) in std::mem::take(split_for_lsn) {
    1455         1830 :                 if let Some(prev_lsn) = &prev_lsn {
    1456          198 :                     if *prev_lsn == lsn {
    1457              :                         // This is the case where we have an LSN with data from both the delta layer and the image layer. As
    1458              :                         // `ValueWrapper` ensures that an image is ordered before a delta at the same LSN, we simply
    1459              :                         // drop this delta and keep the image.
    1460              :                         //
    1461              :                         // For example, if we have delta layer entries key1@0x10 and key1@0x20, and an image layer entry key1@0x10, we will
    1462              :                         // keep the image for key1@0x10 and the delta for key1@0x20. The key1@0x10 delta will simply be
    1463              :                         // dropped.
    1464              :                         //
    1465              :                         // TODO: in case we have both delta + images for a given LSN and it does not exceed the delta
    1466              :                         // threshold, we could have kept delta instead to save space. This is an optimization for the future.
    1467            0 :                         continue;
    1468          198 :                     }
    1469         1632 :                 }
    1470         1830 :                 prev_lsn = Some(lsn);
    1471         1830 :                 new_split_for_lsn.push(record);
    1472              :             }
    1473         4290 :             *split_for_lsn = new_split_for_lsn;
    1474              :         }
    1475              :         // Step 3: generate images when necessary
    1476         1290 :         let mut retention = Vec::with_capacity(split_history.len());
    1477         1290 :         let mut records_since_last_image = 0;
    1478         1290 :         let batch_cnt = split_history.len();
    1479         1290 :         assert!(
    1480         1290 :             batch_cnt >= 2,
    1481            0 :             "should have at least below + above horizon batches"
    1482              :         );
    1483         1290 :         let mut replay_history: Vec<(Key, Lsn, Value)> = Vec::new();
    1484         1290 :         if let Some((key, lsn, img)) = base_img_from_ancestor {
    1485           54 :             replay_history.push((key, lsn, Value::Image(img)));
    1486         1236 :         }
    1487              : 
    1488              :         /// Generate debug information for the replay history
    1489            0 :         fn generate_history_trace(replay_history: &[(Key, Lsn, Value)]) -> String {
    1490              :             use std::fmt::Write;
    1491            0 :             let mut output = String::new();
    1492            0 :             if let Some((key, _, _)) = replay_history.first() {
    1493            0 :                 write!(output, "key={} ", key).unwrap();
    1494            0 :                 let mut cnt = 0;
    1495            0 :                 for (_, lsn, val) in replay_history {
    1496            0 :                     if val.is_image() {
    1497            0 :                         write!(output, "i@{} ", lsn).unwrap();
    1498            0 :                     } else if val.will_init() {
    1499            0 :                         write!(output, "di@{} ", lsn).unwrap();
    1500            0 :                     } else {
    1501            0 :                         write!(output, "d@{} ", lsn).unwrap();
    1502            0 :                     }
    1503            0 :                     cnt += 1;
    1504            0 :                     if cnt >= 128 {
    1505            0 :                         write!(output, "... and more").unwrap();
    1506            0 :                         break;
    1507            0 :                     }
    1508              :                 }
    1509            0 :             } else {
    1510            0 :                 write!(output, "<no history>").unwrap();
    1511            0 :             }
    1512            0 :             output
    1513            0 :         }
    1514              : 
    1515            0 :         fn generate_debug_trace(
    1516            0 :             replay_history: Option<&[(Key, Lsn, Value)]>,
    1517            0 :             full_history: &[(Key, Lsn, Value)],
    1518            0 :             lsns: &[Lsn],
    1519            0 :             horizon: Lsn,
    1520            0 :         ) -> String {
    1521              :             use std::fmt::Write;
    1522            0 :             let mut output = String::new();
    1523            0 :             if let Some(replay_history) = replay_history {
    1524            0 :                 writeln!(
    1525            0 :                     output,
    1526            0 :                     "replay_history: {}",
    1527            0 :                     generate_history_trace(replay_history)
    1528            0 :                 )
    1529            0 :                 .unwrap();
    1530            0 :             } else {
    1531            0 :                 writeln!(output, "replay_history: <disabled>",).unwrap();
    1532            0 :             }
    1533            0 :             writeln!(
    1534            0 :                 output,
    1535            0 :                 "full_history: {}",
    1536            0 :                 generate_history_trace(full_history)
    1537            0 :             )
    1538            0 :             .unwrap();
    1539            0 :             writeln!(
    1540            0 :                 output,
    1541            0 :                 "when processing: [{}] horizon={}",
    1542            0 :                 lsns.iter().map(|l| format!("{l}")).join(","),
    1543            0 :                 horizon
    1544            0 :             )
    1545            0 :             .unwrap();
    1546            0 :             output
    1547            0 :         }
    1548              : 
    1549         4290 :         for (i, split_for_lsn) in split_history.into_iter().enumerate() {
    1550              :             // TODO: there could be image keys inside the splits, and we can compute records_since_last_image accordingly.
    1551         4290 :             records_since_last_image += split_for_lsn.len();
    1552         4290 :             let generate_image = if i == 0 && !has_ancestor {
    1553              :                 // We always generate images for the first batch (below horizon / lowest retain_lsn)
    1554         1236 :                 true
    1555         3054 :             } else if i == batch_cnt - 1 {
    1556              :                 // Do not generate images for the last batch (above horizon)
    1557         1290 :                 false
    1558         1764 :             } else if records_since_last_image >= delta_threshold_cnt {
    1559              :                 // Generate images when there are too many records
    1560           18 :                 true
    1561              :             } else {
    1562         1746 :                 false
    1563              :             };
    1564         4290 :             replay_history.extend(split_for_lsn.iter().map(|x| (*x).clone()));
    1565              :             // Only retain the items after the last image record
    1566         5274 :             for idx in (0..replay_history.len()).rev() {
    1567         5274 :                 if replay_history[idx].2.will_init() {
    1568         4290 :                     replay_history = replay_history[idx..].to_vec();
    1569         4290 :                     break;
    1570          984 :                 }
    1571              :             }
    1572         4290 :             if let Some((_, _, val)) = replay_history.first() {
    1573         4290 :                 if !val.will_init() {
    1574            0 :                     return Err(anyhow::anyhow!("invalid history, no base image")).with_context(
    1575            0 :                         || {
    1576            0 :                             generate_debug_trace(
    1577            0 :                                 Some(&replay_history),
    1578            0 :                                 full_history,
    1579            0 :                                 retain_lsn_below_horizon,
    1580            0 :                                 horizon,
    1581            0 :                             )
    1582            0 :                         },
    1583            0 :                     );
    1584         4290 :                 }
    1585            0 :             }
    1586         4290 :             if generate_image && records_since_last_image > 0 {
    1587         1254 :                 records_since_last_image = 0;
    1588         1254 :                 let replay_history_for_debug = if cfg!(debug_assertions) {
    1589         1254 :                     Some(replay_history.clone())
    1590              :                 } else {
    1591            0 :                     None
    1592              :                 };
    1593         1254 :                 let replay_history_for_debug_ref = replay_history_for_debug.as_deref();
    1594         1254 :                 let history = std::mem::take(&mut replay_history);
    1595         1254 :                 let mut img = None;
    1596         1254 :                 let mut records = Vec::with_capacity(history.len());
    1597         1254 :                 if let (_, lsn, Value::Image(val)) = history.first().as_ref().unwrap() {
    1598         1254 :                     img = Some((*lsn, val.clone()));
    1599         1254 :                     for (_, lsn, val) in history.into_iter().skip(1) {
    1600          102 :                         let Value::WalRecord(rec) = val else {
    1601            0 :                             return Err(anyhow::anyhow!(
    1602            0 :                                 "invalid record, first record is image, expect walrecords"
    1603            0 :                             ))
    1604            0 :                             .with_context(|| {
    1605            0 :                                 generate_debug_trace(
    1606            0 :                                     replay_history_for_debug_ref,
    1607            0 :                                     full_history,
    1608            0 :                                     retain_lsn_below_horizon,
    1609            0 :                                     horizon,
    1610            0 :                                 )
    1611            0 :                             });
    1612              :                         };
    1613          102 :                         records.push((lsn, rec));
    1614              :                     }
    1615              :                 } else {
    1616            0 :                     for (_, lsn, val) in history.into_iter() {
    1617            0 :                         let Value::WalRecord(rec) = val else {
    1618            0 :                             return Err(anyhow::anyhow!("invalid record, first record is walrecord, expect rest are walrecord"))
    1619            0 :                                 .with_context(|| generate_debug_trace(
    1620            0 :                                     replay_history_for_debug_ref,
    1621            0 :                                     full_history,
    1622            0 :                                     retain_lsn_below_horizon,
    1623            0 :                                     horizon,
    1624            0 :                                 ));
    1625              :                         };
    1626            0 :                         records.push((lsn, rec));
    1627              :                     }
    1628              :                 }
    1629         1254 :                 records.reverse();
    1630         1254 :                 let state = ValueReconstructState { img, records };
    1631         1254 :                 let request_lsn = lsn_split_points[i]; // last batch does not generate image so i is always in range
    1632         1254 :                 let img = self.reconstruct_value(key, request_lsn, state).await?;
    1633         1254 :                 replay_history.push((key, request_lsn, Value::Image(img.clone())));
    1634         1254 :                 retention.push(vec![(request_lsn, Value::Image(img))]);
    1635         3036 :             } else {
    1636         3036 :                 let deltas = split_for_lsn
    1637         3036 :                     .iter()
    1638         3036 :                     .map(|(_, lsn, value)| (*lsn, value.clone()))
    1639         3036 :                     .collect_vec();
    1640         3036 :                 retention.push(deltas);
    1641         3036 :             }
    1642              :         }
    1643         1290 :         let mut result = Vec::with_capacity(retention.len());
    1644         1290 :         assert_eq!(retention.len(), lsn_split_points.len() + 1);
    1645         4290 :         for (idx, logs) in retention.into_iter().enumerate() {
    1646         4290 :             if idx == lsn_split_points.len() {
    1647         1290 :                 return Ok(KeyHistoryRetention {
    1648         1290 :                     below_horizon: result,
    1649         1290 :                     above_horizon: KeyLogAtLsn(logs),
    1650         1290 :                 });
    1651         3000 :             } else {
    1652         3000 :                 result.push((lsn_split_points[idx], KeyLogAtLsn(logs)));
    1653         3000 :             }
    1654              :         }
    1655            0 :         unreachable!("key retention is empty")
    1656         1290 :     }
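                       : 
                       :     /// Illustrative sketch only (not used by the code above): a simplified, std-only model of how a
                       :     /// single key's history is partitioned by `lsn_split_points`. The real retention additionally
                       :     /// replays WAL records into images per split point; here LSNs are modeled as plain `u64` and
                       :     /// values as `&str`. The resulting shape matches the assertion above: one "below horizon" bucket
                       :     /// per split point, plus one final "above horizon" bucket.
                       :     #[allow(dead_code)]
                       :     fn partition_history_sketch<'a>(
                       :         history: &[(u64, &'a str)],   // (lsn, value), sorted by LSN ascending
                       :         lsn_split_points: &[u64],     // sorted ascending; the last entry is the GC horizon
                       :     ) -> (Vec<(u64, Vec<(u64, &'a str)>)>, Vec<(u64, &'a str)>) {
                       :         let mut below_horizon = Vec::with_capacity(lsn_split_points.len());
                       :         let mut rest = history;
                       :         for &split_point in lsn_split_points {
                       :             // Everything at or below this split point (and above the previous one) forms one bucket.
                       :             let end = rest.partition_point(|&(lsn, _)| lsn <= split_point);
                       :             below_horizon.push((split_point, rest[..end].to_vec()));
                       :             rest = &rest[end..];
                       :         }
                       :         // Whatever remains lies above the horizon and is kept as-is.
                       :         (below_horizon, rest.to_vec())
                       :     }
                       :     // Example: with history [(10, "a"), (20, "b"), (45, "c")] and split points [20, 40], the buckets
                       :     // are [(20, [(10, "a"), (20, "b")]), (40, [])] below the horizon and [(45, "c")] above it.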
    1657              : 
    1658              :     /// An experimental compaction building block that combines compaction with garbage collection.
    1659              :     ///
     1660              :     /// The current implementation picks all delta and image layers that are below or intersect with
     1661              :     /// the GC horizon, without considering retain_lsns. It then runs a full compaction over all of these
     1662              :     /// delta and image layers, which generates image layers at the GC horizon, drops deltas below the GC
     1663              :     /// horizon, and creates delta layers containing all deltas >= the GC horizon.
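                       :     ///
                       :     /// A rough sketch of the intended effect, ignoring retain_lsns for simplicity. The layer names
                       :     /// and LSNs below are purely illustrative and not taken from any real layer map:
                       :     ///
                       :     /// ```text
                       :     /// before:  delta 0/10..0/40    delta 0/40..0/80    image @ 0/20      (GC horizon = 0/60)
                       :     /// after:   image @ 0/60        delta 0/60..0/80
                       :     /// ```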
    1664           78 :     pub(crate) async fn compact_with_gc(
    1665           78 :         self: &Arc<Self>,
    1666           78 :         cancel: &CancellationToken,
    1667           78 :         flags: EnumSet<CompactFlags>,
    1668           78 :         ctx: &RequestContext,
    1669           78 :     ) -> anyhow::Result<()> {
    1670              :         use std::collections::BTreeSet;
    1671              : 
     1672              :         // Block other compaction/GC tasks from running for now. GC-compaction could run alongside
     1673              :         // legacy compaction tasks in the future. Always ensure the lock order is compaction -> gc.
     1674              :         // Note that we already acquired the compaction lock when the outer `compact` function was called.
    1675              : 
    1676           78 :         let gc_lock = async {
    1677           78 :             tokio::select! {
    1678           78 :                 guard = self.gc_lock.lock() => Ok(guard),
    1679              :                 // TODO: refactor to CompactionError to correctly pass cancelled error
    1680           78 :                 _ = cancel.cancelled() => Err(anyhow!("cancelled")),
    1681              :             }
    1682           78 :         };
    1683              : 
    1684           78 :         let gc_lock = crate::timed(
    1685           78 :             gc_lock,
    1686           78 :             "acquires gc lock",
    1687           78 :             std::time::Duration::from_secs(5),
    1688           78 :         )
    1689            3 :         .await?;
    1690              : 
    1691           78 :         let dry_run = flags.contains(CompactFlags::DryRun);
    1692           78 : 
    1693           78 :         info!("running enhanced gc bottom-most compaction, dry_run={dry_run}");
    1694              : 
    1695           78 :         scopeguard::defer! {
    1696           78 :             info!("done enhanced gc bottom-most compaction");
    1697           78 :         };
    1698           78 : 
    1699           78 :         let mut stat = CompactionStatistics::default();
    1700              : 
     1701              :         // Step 0: pick all delta layers + image layers that are below or intersect with the GC horizon.
    1702              :         // The layer selection has the following properties:
    1703              :         // 1. If a layer is in the selection, all layers below it are in the selection.
     1704              :         // 2. It follows from (1) that, for each key in the layer selection, the value can be reconstructed using only layers in the selection.
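                       :         // For example (hypothetical LSNs): with gc_cutoff = 0/80, a delta layer spanning 0/60..0/90
                       :         // intersects the horizon and is therefore selected; every layer whose LSN range ends at or
                       :         // below the largest selected end (here 0/90) is then pulled in as well, so a read below the
                       :         // horizon never needs a layer outside the selection.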
    1705           78 :         let (layer_selection, gc_cutoff, retain_lsns_below_horizon) = {
    1706           78 :             let guard = self.layers.read().await;
    1707           78 :             let layers = guard.layer_map()?;
    1708           78 :             let gc_info = self.gc_info.read().unwrap();
    1709           78 :             let mut retain_lsns_below_horizon = Vec::new();
    1710           78 :             let gc_cutoff = gc_info.cutoffs.select_min();
    1711          102 :             for (lsn, _timeline_id) in &gc_info.retain_lsns {
    1712          102 :                 if lsn < &gc_cutoff {
    1713          102 :                     retain_lsns_below_horizon.push(*lsn);
    1714          102 :                 }
    1715              :             }
    1716           78 :             for lsn in gc_info.leases.keys() {
    1717            0 :                 if lsn < &gc_cutoff {
    1718            0 :                     retain_lsns_below_horizon.push(*lsn);
    1719            0 :                 }
    1720              :             }
    1721           78 :             let mut selected_layers = Vec::new();
    1722           78 :             drop(gc_info);
     1723              :             // Pick all the layers that intersect with or are below the gc_cutoff, and take the largest LSN among the selected layers.
    1724           78 :             let Some(max_layer_lsn) = layers
    1725           78 :                 .iter_historic_layers()
    1726          300 :                 .filter(|desc| desc.get_lsn_range().start <= gc_cutoff)
    1727          246 :                 .map(|desc| desc.get_lsn_range().end)
    1728           78 :                 .max()
    1729              :             else {
    1730            0 :                 info!("no layers to compact with gc");
    1731            0 :                 return Ok(());
    1732              :             };
     1733              :             // Then, pick all the layers whose LSN range ends at or below max_layer_lsn. This ensures we can also
     1734              :             // pick up all single-key layers to compact.
    1735          300 :             for desc in layers.iter_historic_layers() {
    1736          300 :                 if desc.get_lsn_range().end <= max_layer_lsn {
    1737          246 :                     selected_layers.push(guard.get_from_desc(&desc));
    1738          246 :                 }
    1739              :             }
    1740           78 :             if selected_layers.is_empty() {
    1741            0 :                 info!("no layers to compact with gc");
    1742            0 :                 return Ok(());
    1743           78 :             }
    1744           78 :             retain_lsns_below_horizon.sort();
    1745           78 :             (selected_layers, gc_cutoff, retain_lsns_below_horizon)
    1746              :         };
    1747           78 :         let lowest_retain_lsn = if self.ancestor_timeline.is_some() {
    1748            6 :             Lsn(self.ancestor_lsn.0 + 1)
    1749              :         } else {
    1750           72 :             let res = retain_lsns_below_horizon
    1751           72 :                 .first()
    1752           72 :                 .copied()
    1753           72 :                 .unwrap_or(gc_cutoff);
    1754           72 :             if cfg!(debug_assertions) {
    1755           72 :                 assert_eq!(
    1756           72 :                     res,
    1757           72 :                     retain_lsns_below_horizon
    1758           72 :                         .iter()
    1759           72 :                         .min()
    1760           72 :                         .copied()
    1761           72 :                         .unwrap_or(gc_cutoff)
    1762           72 :                 );
    1763            0 :             }
    1764           72 :             res
    1765              :         };
    1766           78 :         info!(
    1767            0 :             "picked {} layers for compaction with gc_cutoff={} lowest_retain_lsn={}",
    1768            0 :             layer_selection.len(),
    1769              :             gc_cutoff,
    1770              :             lowest_retain_lsn
    1771              :         );
    1772              :         // Step 1: (In the future) construct a k-merge iterator over all layers. For now, simply collect all keys + LSNs.
     1773              :         // Also, verify that the layer map can be split by drawing a horizontal line at every LSN start/end split point.
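                       :         // For example (hypothetical LSNs): multi-key deltas 0/10..0/20 and 0/20..0/30 yield the split
                       :         // points {0/10, 0/20, 0/30}; a multi-key delta 0/15..0/25 would straddle the split point 0/20
                       :         // and make gc-compaction bail out below. See also the standalone sketch after this impl block.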
    1774           78 :         let mut lsn_split_point = BTreeSet::new(); // TODO: use a better data structure (range tree / range set?)
    1775          324 :         for layer in &layer_selection {
    1776          246 :             let desc = layer.layer_desc();
    1777          246 :             if desc.is_delta() {
    1778              :                 // ignore single-key layer files
    1779          138 :                 if desc.key_range.start.next() != desc.key_range.end {
    1780          102 :                     let lsn_range = &desc.lsn_range;
    1781          102 :                     lsn_split_point.insert(lsn_range.start);
    1782          102 :                     lsn_split_point.insert(lsn_range.end);
    1783          102 :                 }
    1784          138 :                 stat.visit_delta_layer(desc.file_size());
    1785          108 :             } else {
    1786          108 :                 stat.visit_image_layer(desc.file_size());
    1787          108 :             }
    1788              :         }
    1789          324 :         for layer in &layer_selection {
    1790          246 :             let desc = layer.layer_desc();
    1791          246 :             let key_range = &desc.key_range;
    1792          246 :             if desc.is_delta() && key_range.start.next() != key_range.end {
    1793          102 :                 let lsn_range = desc.lsn_range.clone();
    1794          102 :                 let intersects = lsn_split_point.range(lsn_range).collect_vec();
    1795          102 :                 if intersects.len() > 1 {
    1796            0 :                     bail!(
    1797            0 :                         "cannot run gc-compaction because it violates the layer map LSN split assumption: layer {} intersects with LSN [{}]",
    1798            0 :                         desc.key(),
    1799            0 :                         intersects.into_iter().map(|lsn| lsn.to_string()).join(", ")
    1800            0 :                     );
    1801          102 :                 }
    1802          144 :             }
    1803              :         }
    1804              :         // The maximum LSN we are processing in this compaction loop
    1805           78 :         let end_lsn = layer_selection
    1806           78 :             .iter()
    1807          246 :             .map(|l| l.layer_desc().lsn_range.end)
    1808           78 :             .max()
    1809           78 :             .unwrap();
     1810           78 :         // We don't want any of the produced layers to cover the full key range (i.e., MIN..MAX) because it would
     1811           78 :         // then be recognized as an L0 layer.
    1812           78 :         let mut delta_layers = Vec::new();
    1813           78 :         let mut image_layers = Vec::new();
    1814           78 :         let mut downloaded_layers = Vec::new();
    1815          324 :         for layer in &layer_selection {
    1816          246 :             let resident_layer = layer.download_and_keep_resident().await?;
    1817          246 :             downloaded_layers.push(resident_layer);
    1818              :         }
    1819          324 :         for resident_layer in &downloaded_layers {
    1820          246 :             if resident_layer.layer_desc().is_delta() {
    1821          138 :                 let layer = resident_layer.get_as_delta(ctx).await?;
    1822          138 :                 delta_layers.push(layer);
    1823              :             } else {
    1824          108 :                 let layer = resident_layer.get_as_image(ctx).await?;
    1825          108 :                 image_layers.push(layer);
    1826              :             }
    1827              :         }
    1828           78 :         let mut merge_iter = MergeIterator::create(&delta_layers, &image_layers, ctx);
    1829           78 :         // Step 2: Produce images+deltas. TODO: ensure newly-produced delta does not overlap with other deltas.
     1830           78 :         // Values accumulated for the key currently being processed.
    1831           78 :         let mut accumulated_values = Vec::new();
    1832           78 :         let mut last_key: Option<Key> = None;
    1833              : 
     1834              :         // Only create image layers when there are no ancestor branches. TODO: create a covering image layer
     1835              :         // when certain conditions are met.
    1836           78 :         let mut image_layer_writer = if self.ancestor_timeline.is_none() {
    1837              :             Some(
    1838           72 :                 SplitImageLayerWriter::new(
    1839           72 :                     self.conf,
    1840           72 :                     self.timeline_id,
    1841           72 :                     self.tenant_shard_id,
    1842           72 :                     Key::MIN,
    1843           72 :                     lowest_retain_lsn,
    1844           72 :                     self.get_compaction_target_size(),
    1845           72 :                     ctx,
    1846           72 :                 )
    1847           36 :                 .await?,
    1848              :             )
    1849              :         } else {
    1850            6 :             None
    1851              :         };
    1852              : 
    1853           78 :         let mut delta_layer_writer = SplitDeltaLayerWriter::new(
    1854           78 :             self.conf,
    1855           78 :             self.timeline_id,
    1856           78 :             self.tenant_shard_id,
    1857           78 :             lowest_retain_lsn..end_lsn,
    1858           78 :             self.get_compaction_target_size(),
    1859           78 :         )
    1860            0 :         .await?;
    1861              : 
     1862              :         /// Returns `None` if there is no ancestor branch. Returns an error if the key is not found.
     1863              :         ///
     1864              :         /// Currently, we always fetch the ancestor image for each key in the child branch, regardless of whether
     1865              :         /// the image is needed for reconstruction. This should be fixed in the future.
     1866              :         ///
     1867              :         /// Furthermore, we should use a vectored get instead of individual gets, or better, a k-merge for
     1868              :         /// ancestor images.
    1869         1266 :         async fn get_ancestor_image(
    1870         1266 :             tline: &Arc<Timeline>,
    1871         1266 :             key: Key,
    1872         1266 :             ctx: &RequestContext,
    1873         1266 :         ) -> anyhow::Result<Option<(Key, Lsn, Bytes)>> {
    1874         1266 :             if tline.ancestor_timeline.is_none() {
    1875         1224 :                 return Ok(None);
    1876           42 :             };
     1877              :             // This function is implemented as a get on the current timeline at the ancestor LSN, so that we
     1878              :             // reuse as much existing code as possible.
    1879           42 :             let img = tline.get(key, tline.ancestor_lsn, ctx).await?;
    1880           42 :             Ok(Some((key, tline.ancestor_lsn, img)))
    1881         1266 :         }
    1882              : 
     1883              :         // Strictly speaking, we could decide not to write to the image layer at all at this point, because
     1884              :         // the key and LSN range are already determined. However, to keep things simple here, we still
     1885              :         // create this writer and discard it at the end.
    1886              : 
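                       :         // The loop below walks the k-merged stream, which yields values sorted by (key, LSN): values for
                       :         // the current key are buffered in `accumulated_values`, and when the key changes, the buffered
                       :         // history is turned into a per-key retention and piped into the writers. For a hypothetical
                       :         // stream (A, 0/10), (A, 0/20), (B, 0/5), key A is flushed when B is first seen, and the final
                       :         // key (B) is flushed after the loop.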
    1887         1758 :         while let Some((key, lsn, val)) = merge_iter.next().await? {
    1888         1680 :             if cancel.is_cancelled() {
    1889            0 :                 return Err(anyhow!("cancelled")); // TODO: refactor to CompactionError and pass cancel error
    1890         1680 :             }
    1891         1680 :             match val {
    1892         1260 :                 Value::Image(_) => stat.visit_image_key(&val),
    1893          420 :                 Value::WalRecord(_) => stat.visit_wal_key(&val),
    1894              :             }
    1895         1680 :             if last_key.is_none() || last_key.as_ref() == Some(&key) {
    1896          492 :                 if last_key.is_none() {
    1897           78 :                     last_key = Some(key);
    1898          414 :                 }
    1899          492 :                 accumulated_values.push((key, lsn, val));
    1900              :             } else {
    1901         1188 :                 let last_key = last_key.as_mut().unwrap();
    1902         1188 :                 stat.on_unique_key_visited();
    1903         1188 :                 let retention = self
    1904         1188 :                     .generate_key_retention(
    1905         1188 :                         *last_key,
    1906         1188 :                         &accumulated_values,
    1907         1188 :                         gc_cutoff,
    1908         1188 :                         &retain_lsns_below_horizon,
    1909         1188 :                         COMPACTION_DELTA_THRESHOLD,
    1910         1188 :                         get_ancestor_image(self, *last_key, ctx).await?,
    1911              :                     )
    1912            0 :                     .await?;
    1913              :                 // Put the image into the image layer. Currently we have a single big layer for the compaction.
    1914         1188 :                 retention
    1915         1188 :                     .pipe_to(
    1916         1188 :                         *last_key,
    1917         1188 :                         self,
    1918         1188 :                         &mut delta_layer_writer,
    1919         1188 :                         image_layer_writer.as_mut(),
    1920         1188 :                         &mut stat,
    1921         1188 :                         dry_run,
    1922         1188 :                         ctx,
    1923         1188 :                     )
    1924         1203 :                     .await?;
    1925         1188 :                 accumulated_values.clear();
    1926         1188 :                 *last_key = key;
    1927         1188 :                 accumulated_values.push((key, lsn, val));
    1928              :             }
    1929              :         }
    1930              : 
    1931           78 :         let last_key = last_key.expect("no keys produced during compaction");
    1932           78 :         // TODO: move this part to the loop body
    1933           78 :         stat.on_unique_key_visited();
    1934           78 :         let retention = self
    1935           78 :             .generate_key_retention(
    1936           78 :                 last_key,
    1937           78 :                 &accumulated_values,
    1938           78 :                 gc_cutoff,
    1939           78 :                 &retain_lsns_below_horizon,
    1940           78 :                 COMPACTION_DELTA_THRESHOLD,
    1941           78 :                 get_ancestor_image(self, last_key, ctx).await?,
    1942              :             )
    1943            0 :             .await?;
    1944              :         // Put the image into the image layer. Currently we have a single big layer for the compaction.
    1945           78 :         retention
    1946           78 :             .pipe_to(
    1947           78 :                 last_key,
    1948           78 :                 self,
    1949           78 :                 &mut delta_layer_writer,
    1950           78 :                 image_layer_writer.as_mut(),
    1951           78 :                 &mut stat,
    1952           78 :                 dry_run,
    1953           78 :                 ctx,
    1954           78 :             )
    1955           72 :             .await?;
    1956              : 
    1957          114 :         let discard = |key: &PersistentLayerKey| {
    1958          114 :             let key = key.clone();
    1959          114 :             async move { KeyHistoryRetention::discard_key(&key, self, dry_run).await }
    1960          114 :         };
    1961              : 
    1962           78 :         let produced_image_layers = if let Some(writer) = image_layer_writer {
    1963           72 :             if !dry_run {
    1964           60 :                 writer
    1965           60 :                     .finish_with_discard_fn(self, ctx, Key::MAX, discard)
    1966           72 :                     .await?
    1967              :             } else {
    1968           12 :                 let (layers, _) = writer.take()?;
    1969           12 :                 assert!(layers.is_empty(), "image layers produced in dry run mode?");
    1970           12 :                 Vec::new()
    1971              :             }
    1972              :         } else {
    1973            6 :             Vec::new()
    1974              :         };
    1975              : 
    1976           78 :         let produced_delta_layers = if !dry_run {
    1977           66 :             delta_layer_writer
    1978           66 :                 .finish_with_discard_fn(self, ctx, discard)
    1979           78 :                 .await?
    1980              :         } else {
    1981           12 :             let (layers, _) = delta_layer_writer.take()?;
    1982           12 :             assert!(layers.is_empty(), "delta layers produced in dry run mode?");
    1983           12 :             Vec::new()
    1984              :         };
    1985              : 
    1986           78 :         let mut compact_to = Vec::new();
    1987           78 :         let mut keep_layers = HashSet::new();
    1988           78 :         let produced_delta_layers_len = produced_delta_layers.len();
    1989           78 :         let produced_image_layers_len = produced_image_layers.len();
    1990          132 :         for action in produced_delta_layers {
    1991           54 :             match action {
    1992           30 :                 SplitWriterResult::Produced(layer) => {
    1993           30 :                     stat.produce_delta_layer(layer.layer_desc().file_size());
    1994           30 :                     compact_to.push(layer);
    1995           30 :                 }
    1996           24 :                 SplitWriterResult::Discarded(l) => {
    1997           24 :                     keep_layers.insert(l);
    1998           24 :                     stat.discard_delta_layer();
    1999           24 :                 }
    2000              :             }
    2001              :         }
    2002          138 :         for action in produced_image_layers {
    2003           60 :             match action {
    2004           36 :                 SplitWriterResult::Produced(layer) => {
    2005           36 :                     stat.produce_image_layer(layer.layer_desc().file_size());
    2006           36 :                     compact_to.push(layer);
    2007           36 :                 }
    2008           24 :                 SplitWriterResult::Discarded(l) => {
    2009           24 :                     keep_layers.insert(l);
    2010           24 :                     stat.discard_image_layer();
    2011           24 :                 }
    2012              :             }
    2013              :         }
    2014           78 :         let mut layer_selection = layer_selection;
    2015          246 :         layer_selection.retain(|x| !keep_layers.contains(&x.layer_desc().key()));
    2016           78 : 
    2017           78 :         info!(
    2018            0 :             "gc-compaction statistics: {}",
    2019            0 :             serde_json::to_string(&stat)?
    2020              :         );
    2021              : 
    2022           78 :         if dry_run {
    2023           12 :             return Ok(());
    2024           66 :         }
    2025           66 : 
    2026           66 :         info!(
    2027            0 :             "produced {} delta layers and {} image layers, {} layers are kept",
    2028            0 :             produced_delta_layers_len,
    2029            0 :             produced_image_layers_len,
    2030            0 :             layer_selection.len()
    2031              :         );
    2032              : 
    2033              :         // Step 3: Place back to the layer map.
    2034              :         {
    2035           66 :             let mut guard = self.layers.write().await;
    2036           66 :             guard
    2037           66 :                 .open_mut()?
    2038           66 :                 .finish_gc_compaction(&layer_selection, &compact_to, &self.metrics)
    2039           66 :         };
    2040           66 :         self.remote_client
    2041           66 :             .schedule_compaction_update(&layer_selection, &compact_to)?;
    2042              : 
    2043           66 :         drop(gc_lock);
    2044           66 : 
    2045           66 :         Ok(())
    2046           78 :     }
    2047              : }
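                       : 
                       : // Illustrative sketch only, not part of the pageserver crate: a std-only model of the layer map LSN
                       : // split-point check performed inside `compact_with_gc` above. Each multi-key delta layer contributes
                       : // its LSN range start and end as split points; a layer violates the assumption if any *other* split
                       : // point falls strictly inside its LSN range. LSNs are modeled as plain `u64` here.
                       : #[allow(dead_code)]
                       : fn lsn_split_assumption_holds(delta_lsn_ranges: &[std::ops::Range<u64>]) -> bool {
                       :     use std::collections::BTreeSet;
                       : 
                       :     let mut split_points = BTreeSet::new();
                       :     for r in delta_lsn_ranges {
                       :         split_points.insert(r.start);
                       :         split_points.insert(r.end);
                       :     }
                       :     // `range(r.clone())` always yields at least the layer's own `start`; anything beyond that is a
                       :     // foreign boundary strictly inside the layer, which is what the validation loop above bails out on.
                       :     delta_lsn_ranges
                       :         .iter()
                       :         .all(|r| split_points.range(r.clone()).count() <= 1)
                       : }
                       : // Example: [10..20, 20..30] holds (the split points 10, 20, 30 only touch range boundaries), while
                       : // adding 15..25 breaks the assumption because 20 falls strictly inside 15..25.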
    2048              : 
    2049              : struct TimelineAdaptor {
    2050              :     timeline: Arc<Timeline>,
    2051              : 
    2052              :     keyspace: (Lsn, KeySpace),
    2053              : 
    2054              :     new_deltas: Vec<ResidentLayer>,
    2055              :     new_images: Vec<ResidentLayer>,
    2056              :     layers_to_delete: Vec<Arc<PersistentLayerDesc>>,
    2057              : }
    2058              : 
    2059              : impl TimelineAdaptor {
    2060            0 :     pub fn new(timeline: &Arc<Timeline>, keyspace: (Lsn, KeySpace)) -> Self {
    2061            0 :         Self {
    2062            0 :             timeline: timeline.clone(),
    2063            0 :             keyspace,
    2064            0 :             new_images: Vec::new(),
    2065            0 :             new_deltas: Vec::new(),
    2066            0 :             layers_to_delete: Vec::new(),
    2067            0 :         }
    2068            0 :     }
    2069              : 
    2070            0 :     pub async fn flush_updates(&mut self) -> Result<(), CompactionError> {
    2071            0 :         let layers_to_delete = {
    2072            0 :             let guard = self.timeline.layers.read().await;
    2073            0 :             self.layers_to_delete
    2074            0 :                 .iter()
    2075            0 :                 .map(|x| guard.get_from_desc(x))
    2076            0 :                 .collect::<Vec<Layer>>()
    2077            0 :         };
    2078            0 :         self.timeline
    2079            0 :             .finish_compact_batch(&self.new_deltas, &self.new_images, &layers_to_delete)
    2080            0 :             .await?;
    2081              : 
    2082            0 :         self.timeline
    2083            0 :             .upload_new_image_layers(std::mem::take(&mut self.new_images))?;
    2084              : 
    2085            0 :         self.new_deltas.clear();
    2086            0 :         self.layers_to_delete.clear();
    2087            0 :         Ok(())
    2088            0 :     }
    2089              : }
    2090              : 
    2091              : #[derive(Clone)]
    2092              : struct ResidentDeltaLayer(ResidentLayer);
    2093              : #[derive(Clone)]
    2094              : struct ResidentImageLayer(ResidentLayer);
    2095              : 
    2096              : impl CompactionJobExecutor for TimelineAdaptor {
    2097              :     type Key = crate::repository::Key;
    2098              : 
    2099              :     type Layer = OwnArc<PersistentLayerDesc>;
    2100              :     type DeltaLayer = ResidentDeltaLayer;
    2101              :     type ImageLayer = ResidentImageLayer;
    2102              : 
    2103              :     type RequestContext = crate::context::RequestContext;
    2104              : 
    2105            0 :     fn get_shard_identity(&self) -> &ShardIdentity {
    2106            0 :         self.timeline.get_shard_identity()
    2107            0 :     }
    2108              : 
    2109            0 :     async fn get_layers(
    2110            0 :         &mut self,
    2111            0 :         key_range: &Range<Key>,
    2112            0 :         lsn_range: &Range<Lsn>,
    2113            0 :         _ctx: &RequestContext,
    2114            0 :     ) -> anyhow::Result<Vec<OwnArc<PersistentLayerDesc>>> {
    2115            0 :         self.flush_updates().await?;
    2116              : 
    2117            0 :         let guard = self.timeline.layers.read().await;
    2118            0 :         let layer_map = guard.layer_map()?;
    2119              : 
    2120            0 :         let result = layer_map
    2121            0 :             .iter_historic_layers()
    2122            0 :             .filter(|l| {
    2123            0 :                 overlaps_with(&l.lsn_range, lsn_range) && overlaps_with(&l.key_range, key_range)
    2124            0 :             })
    2125            0 :             .map(OwnArc)
    2126            0 :             .collect();
    2127            0 :         Ok(result)
    2128            0 :     }
    2129              : 
    2130            0 :     async fn get_keyspace(
    2131            0 :         &mut self,
    2132            0 :         key_range: &Range<Key>,
    2133            0 :         lsn: Lsn,
    2134            0 :         _ctx: &RequestContext,
    2135            0 :     ) -> anyhow::Result<Vec<Range<Key>>> {
    2136            0 :         if lsn == self.keyspace.0 {
    2137            0 :             Ok(pageserver_compaction::helpers::intersect_keyspace(
    2138            0 :                 &self.keyspace.1.ranges,
    2139            0 :                 key_range,
    2140            0 :             ))
    2141              :         } else {
    2142              :             // The current compaction implementation only ever requests the key space
    2143              :             // at the compaction end LSN.
    2144            0 :             anyhow::bail!("keyspace not available for requested lsn");
    2145              :         }
    2146            0 :     }
    2147              : 
    2148            0 :     async fn downcast_delta_layer(
    2149            0 :         &self,
    2150            0 :         layer: &OwnArc<PersistentLayerDesc>,
    2151            0 :     ) -> anyhow::Result<Option<ResidentDeltaLayer>> {
    2152            0 :         // this is a lot more complex than a simple downcast...
    2153            0 :         if layer.is_delta() {
    2154            0 :             let l = {
    2155            0 :                 let guard = self.timeline.layers.read().await;
    2156            0 :                 guard.get_from_desc(layer)
    2157              :             };
    2158            0 :             let result = l.download_and_keep_resident().await?;
    2159              : 
    2160            0 :             Ok(Some(ResidentDeltaLayer(result)))
    2161              :         } else {
    2162            0 :             Ok(None)
    2163              :         }
    2164            0 :     }
    2165              : 
    2166            0 :     async fn create_image(
    2167            0 :         &mut self,
    2168            0 :         lsn: Lsn,
    2169            0 :         key_range: &Range<Key>,
    2170            0 :         ctx: &RequestContext,
    2171            0 :     ) -> anyhow::Result<()> {
    2172            0 :         Ok(self.create_image_impl(lsn, key_range, ctx).await?)
    2173            0 :     }
    2174              : 
    2175            0 :     async fn create_delta(
    2176            0 :         &mut self,
    2177            0 :         lsn_range: &Range<Lsn>,
    2178            0 :         key_range: &Range<Key>,
    2179            0 :         input_layers: &[ResidentDeltaLayer],
    2180            0 :         ctx: &RequestContext,
    2181            0 :     ) -> anyhow::Result<()> {
    2182            0 :         debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end);
    2183              : 
    2184            0 :         let mut all_entries = Vec::new();
    2185            0 :         for dl in input_layers.iter() {
    2186            0 :             all_entries.extend(dl.load_keys(ctx).await?);
    2187              :         }
    2188              : 
     2189              :         // The current stdlib sorting implementation is designed such that it is
     2190              :         // particularly fast when the slice is made up of sorted sub-ranges.
    2191            0 :         all_entries.sort_by_key(|DeltaEntry { key, lsn, .. }| (*key, *lsn));
    2192              : 
    2193            0 :         let mut writer = DeltaLayerWriter::new(
    2194            0 :             self.timeline.conf,
    2195            0 :             self.timeline.timeline_id,
    2196            0 :             self.timeline.tenant_shard_id,
    2197            0 :             key_range.start,
    2198            0 :             lsn_range.clone(),
    2199            0 :             ctx,
    2200            0 :         )
    2201            0 :         .await?;
    2202              : 
    2203            0 :         let mut dup_values = 0;
    2204            0 : 
    2205            0 :         // This iterator walks through all key-value pairs from all the layers
    2206            0 :         // we're compacting, in key, LSN order.
    2207            0 :         let mut prev: Option<(Key, Lsn)> = None;
    2208              :         for &DeltaEntry {
    2209            0 :             key, lsn, ref val, ..
    2210            0 :         } in all_entries.iter()
    2211              :         {
    2212            0 :             if prev == Some((key, lsn)) {
    2213              :                 // This is a duplicate. Skip it.
    2214              :                 //
    2215              :                 // It can happen if compaction is interrupted after writing some
    2216              :                 // layers but not all, and we are compacting the range again.
    2217              :                 // The calculations in the algorithm assume that there are no
    2218              :                 // duplicates, so the math on targeted file size is likely off,
    2219              :                 // and we will create smaller files than expected.
    2220            0 :                 dup_values += 1;
    2221            0 :                 continue;
    2222            0 :             }
    2223              : 
    2224            0 :             let value = val.load(ctx).await?;
    2225              : 
    2226            0 :             writer.put_value(key, lsn, value, ctx).await?;
    2227              : 
    2228            0 :             prev = Some((key, lsn));
    2229              :         }
    2230              : 
    2231            0 :         if dup_values > 0 {
    2232            0 :             warn!("delta layer created with {} duplicate values", dup_values);
    2233            0 :         }
    2234              : 
    2235            0 :         fail_point!("delta-layer-writer-fail-before-finish", |_| {
    2236            0 :             Err(anyhow::anyhow!(
    2237            0 :                 "failpoint delta-layer-writer-fail-before-finish"
    2238            0 :             ))
    2239            0 :         });
    2240              : 
    2241            0 :         let (desc, path) = writer.finish(prev.unwrap().0.next(), ctx).await?;
    2242            0 :         let new_delta_layer =
    2243            0 :             Layer::finish_creating(self.timeline.conf, &self.timeline, desc, &path)?;
    2244              : 
    2245            0 :         self.new_deltas.push(new_delta_layer);
    2246            0 :         Ok(())
    2247            0 :     }
    2248              : 
    2249            0 :     async fn delete_layer(
    2250            0 :         &mut self,
    2251            0 :         layer: &OwnArc<PersistentLayerDesc>,
    2252            0 :         _ctx: &RequestContext,
    2253            0 :     ) -> anyhow::Result<()> {
    2254            0 :         self.layers_to_delete.push(layer.clone().0);
    2255            0 :         Ok(())
    2256            0 :     }
    2257              : }
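                       : 
                       : // Illustrative sketch only: a std-only model of the duplicate handling in `create_delta` above.
                       : // Entries from all input layers are sorted by (key, lsn); consecutive entries with the same (key, lsn)
                       : // can appear when a previous compaction was interrupted, and only the first occurrence is kept.
                       : // Keys and LSNs are modeled as plain `u64` pairs here.
                       : #[allow(dead_code)]
                       : fn dedup_sorted_entries(mut entries: Vec<(u64, u64, &'static str)>) -> Vec<(u64, u64, &'static str)> {
                       :     // Sorting is fast here because the input is a concatenation of already-sorted runs.
                       :     entries.sort_by_key(|&(key, lsn, _)| (key, lsn));
                       :     let mut prev: Option<(u64, u64)> = None;
                       :     entries.retain(|&(key, lsn, _)| {
                       :         let keep = prev != Some((key, lsn));
                       :         prev = Some((key, lsn));
                       :         keep
                       :     });
                       :     entries
                       : }
                       : // Example: [(1, 10, "a"), (1, 10, "a"), (1, 20, "b")] collapses to [(1, 10, "a"), (1, 20, "b")].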
    2258              : 
    2259              : impl TimelineAdaptor {
    2260            0 :     async fn create_image_impl(
    2261            0 :         &mut self,
    2262            0 :         lsn: Lsn,
    2263            0 :         key_range: &Range<Key>,
    2264            0 :         ctx: &RequestContext,
    2265            0 :     ) -> Result<(), CreateImageLayersError> {
    2266            0 :         let timer = self.timeline.metrics.create_images_time_histo.start_timer();
    2267              : 
    2268            0 :         let image_layer_writer = ImageLayerWriter::new(
    2269            0 :             self.timeline.conf,
    2270            0 :             self.timeline.timeline_id,
    2271            0 :             self.timeline.tenant_shard_id,
    2272            0 :             key_range,
    2273            0 :             lsn,
    2274            0 :             ctx,
    2275            0 :         )
    2276            0 :         .await?;
    2277              : 
    2278            0 :         fail_point!("image-layer-writer-fail-before-finish", |_| {
    2279            0 :             Err(CreateImageLayersError::Other(anyhow::anyhow!(
    2280            0 :                 "failpoint image-layer-writer-fail-before-finish"
    2281            0 :             )))
    2282            0 :         });
    2283              : 
    2284            0 :         let keyspace = KeySpace {
    2285            0 :             ranges: self.get_keyspace(key_range, lsn, ctx).await?,
    2286              :         };
    2287              :         // TODO set proper (stateful) start. The create_image_layer_for_rel_blocks function mostly
    2288            0 :         let start = Key::MIN;
    2289              :         let ImageLayerCreationOutcome {
    2290            0 :             image,
    2291              :             next_start_key: _,
    2292            0 :         } = self
    2293            0 :             .timeline
    2294            0 :             .create_image_layer_for_rel_blocks(
    2295            0 :                 &keyspace,
    2296            0 :                 image_layer_writer,
    2297            0 :                 lsn,
    2298            0 :                 ctx,
    2299            0 :                 key_range.clone(),
    2300            0 :                 start,
    2301            0 :             )
    2302            0 :             .await?;
    2303              : 
    2304            0 :         if let Some(image_layer) = image {
    2305            0 :             self.new_images.push(image_layer);
    2306            0 :         }
    2307              : 
    2308            0 :         timer.stop_and_record();
    2309            0 : 
    2310            0 :         Ok(())
    2311            0 :     }
    2312              : }
    2313              : 
    2314              : impl CompactionRequestContext for crate::context::RequestContext {}
    2315              : 
    2316              : #[derive(Debug, Clone)]
    2317              : pub struct OwnArc<T>(pub Arc<T>);
    2318              : 
    2319              : impl<T> Deref for OwnArc<T> {
    2320              :     type Target = <Arc<T> as Deref>::Target;
    2321            0 :     fn deref(&self) -> &Self::Target {
    2322            0 :         &self.0
    2323            0 :     }
    2324              : }
    2325              : 
    2326              : impl<T> AsRef<T> for OwnArc<T> {
    2327            0 :     fn as_ref(&self) -> &T {
    2328            0 :         self.0.as_ref()
    2329            0 :     }
    2330              : }
    2331              : 
    2332              : impl CompactionLayer<Key> for OwnArc<PersistentLayerDesc> {
    2333            0 :     fn key_range(&self) -> &Range<Key> {
    2334            0 :         &self.key_range
    2335            0 :     }
    2336            0 :     fn lsn_range(&self) -> &Range<Lsn> {
    2337            0 :         &self.lsn_range
    2338            0 :     }
    2339            0 :     fn file_size(&self) -> u64 {
    2340            0 :         self.file_size
    2341            0 :     }
    2342            0 :     fn short_id(&self) -> std::string::String {
    2343            0 :         self.as_ref().short_id().to_string()
    2344            0 :     }
    2345            0 :     fn is_delta(&self) -> bool {
    2346            0 :         self.as_ref().is_delta()
    2347            0 :     }
    2348              : }
    2349              : 
    2350              : impl CompactionLayer<Key> for OwnArc<DeltaLayer> {
    2351            0 :     fn key_range(&self) -> &Range<Key> {
    2352            0 :         &self.layer_desc().key_range
    2353            0 :     }
    2354            0 :     fn lsn_range(&self) -> &Range<Lsn> {
    2355            0 :         &self.layer_desc().lsn_range
    2356            0 :     }
    2357            0 :     fn file_size(&self) -> u64 {
    2358            0 :         self.layer_desc().file_size
    2359            0 :     }
    2360            0 :     fn short_id(&self) -> std::string::String {
    2361            0 :         self.layer_desc().short_id().to_string()
    2362            0 :     }
    2363            0 :     fn is_delta(&self) -> bool {
    2364            0 :         true
    2365            0 :     }
    2366              : }
    2367              : 
    2368              : use crate::tenant::timeline::DeltaEntry;
    2369              : 
    2370              : impl CompactionLayer<Key> for ResidentDeltaLayer {
    2371            0 :     fn key_range(&self) -> &Range<Key> {
    2372            0 :         &self.0.layer_desc().key_range
    2373            0 :     }
    2374            0 :     fn lsn_range(&self) -> &Range<Lsn> {
    2375            0 :         &self.0.layer_desc().lsn_range
    2376            0 :     }
    2377            0 :     fn file_size(&self) -> u64 {
    2378            0 :         self.0.layer_desc().file_size
    2379            0 :     }
    2380            0 :     fn short_id(&self) -> std::string::String {
    2381            0 :         self.0.layer_desc().short_id().to_string()
    2382            0 :     }
    2383            0 :     fn is_delta(&self) -> bool {
    2384            0 :         true
    2385            0 :     }
    2386              : }
    2387              : 
    2388              : impl CompactionDeltaLayer<TimelineAdaptor> for ResidentDeltaLayer {
    2389              :     type DeltaEntry<'a> = DeltaEntry<'a>;
    2390              : 
    2391            0 :     async fn load_keys<'a>(&self, ctx: &RequestContext) -> anyhow::Result<Vec<DeltaEntry<'_>>> {
    2392            0 :         self.0.load_keys(ctx).await
    2393            0 :     }
    2394              : }
    2395              : 
    2396              : impl CompactionLayer<Key> for ResidentImageLayer {
    2397            0 :     fn key_range(&self) -> &Range<Key> {
    2398            0 :         &self.0.layer_desc().key_range
    2399            0 :     }
    2400            0 :     fn lsn_range(&self) -> &Range<Lsn> {
    2401            0 :         &self.0.layer_desc().lsn_range
    2402            0 :     }
    2403            0 :     fn file_size(&self) -> u64 {
    2404            0 :         self.0.layer_desc().file_size
    2405            0 :     }
    2406            0 :     fn short_id(&self) -> std::string::String {
    2407            0 :         self.0.layer_desc().short_id().to_string()
    2408            0 :     }
    2409            0 :     fn is_delta(&self) -> bool {
    2410            0 :         false
    2411            0 :     }
    2412              : }
    2413              : impl CompactionImageLayer<TimelineAdaptor> for ResidentImageLayer {}
        

Generated by: LCOV version 2.1-beta