LCOV - code coverage report
Current view: top level - pageserver/src/tenant/timeline - compaction.rs (source / functions)
Test: 53437f7e869ac68c86c7d3e4c20964c0156f158c.info
Test Date: 2024-09-20 16:14:12
Coverage: Lines: 60.6 % (1023 of 1689) | Functions: 39.2 % (51 of 130)

            Line data    Source code
       1              : //! New compaction implementation. The algorithm itself is implemented in the
       2              : //! compaction crate. This file implements the callbacks and structs that allow
       3              : //! the algorithm to drive the process.
       4              : //!
       5              : //! The old legacy algorithm is implemented directly in `timeline.rs`.
       6              : 
       7              : use std::collections::{BinaryHeap, HashSet};
       8              : use std::ops::{Deref, Range};
       9              : use std::sync::Arc;
      10              : 
      11              : use super::layer_manager::LayerManager;
      12              : use super::{
      13              :     CompactFlags, CreateImageLayersError, DurationRecorder, ImageLayerCreationMode,
      14              :     RecordedDuration, Timeline,
      15              : };
      16              : 
      17              : use anyhow::{anyhow, bail, Context};
      18              : use bytes::Bytes;
      19              : use enumset::EnumSet;
      20              : use fail::fail_point;
      21              : use itertools::Itertools;
      22              : use pageserver_api::key::KEY_SIZE;
      23              : use pageserver_api::keyspace::ShardedRange;
      24              : use pageserver_api::shard::{ShardCount, ShardIdentity, TenantShardId};
      25              : use serde::Serialize;
      26              : use tokio_util::sync::CancellationToken;
      27              : use tracing::{debug, info, info_span, trace, warn, Instrument};
      28              : use utils::id::TimelineId;
      29              : 
      30              : use crate::context::{AccessStatsBehavior, RequestContext, RequestContextBuilder};
      31              : use crate::page_cache;
      32              : use crate::tenant::remote_timeline_client::WaitCompletionError;
      33              : use crate::tenant::storage_layer::merge_iterator::MergeIterator;
      34              : use crate::tenant::storage_layer::split_writer::{
      35              :     SplitDeltaLayerWriter, SplitImageLayerWriter, SplitWriterResult,
      36              : };
      37              : use crate::tenant::storage_layer::{
      38              :     AsLayerDesc, PersistentLayerDesc, PersistentLayerKey, ValueReconstructState,
      39              : };
      40              : use crate::tenant::timeline::ImageLayerCreationOutcome;
      41              : use crate::tenant::timeline::{drop_rlock, DeltaLayerWriter, ImageLayerWriter};
      42              : use crate::tenant::timeline::{Layer, ResidentLayer};
      43              : use crate::tenant::DeltaLayer;
      44              : use crate::virtual_file::{MaybeFatalIo, VirtualFile};
      45              : use pageserver_api::config::tenant_conf_defaults::{
      46              :     DEFAULT_CHECKPOINT_DISTANCE, DEFAULT_COMPACTION_THRESHOLD,
      47              : };
      48              : 
      49              : use crate::keyspace::KeySpace;
      50              : use crate::repository::{Key, Value};
      51              : use crate::walrecord::NeonWalRecord;
      52              : 
      53              : use utils::lsn::Lsn;
      54              : 
      55              : use pageserver_compaction::helpers::overlaps_with;
      56              : use pageserver_compaction::interface::*;
      57              : 
      58              : use super::CompactionError;
      59              : 
      60              : /// Maximum number of deltas before generating an image layer in bottom-most compaction.
      61              : const COMPACTION_DELTA_THRESHOLD: usize = 5;
      62              : 
      63              : /// The result of bottom-most compaction for a single key at each LSN.
      64              : #[derive(Debug)]
      65              : #[cfg_attr(test, derive(PartialEq))]
      66              : pub struct KeyLogAtLsn(pub Vec<(Lsn, Value)>);
      67              : 
      68              : /// The result of bottom-most compaction.
      69              : #[derive(Debug)]
      70              : #[cfg_attr(test, derive(PartialEq))]
      71              : pub(crate) struct KeyHistoryRetention {
      72              :     /// Stores logs to reconstruct the value at the given LSN, that is to say, logs <= LSN or image == LSN.
      73              :     pub(crate) below_horizon: Vec<(Lsn, KeyLogAtLsn)>,
      74              :     /// Stores logs to reconstruct the value at any LSN above the horizon, that is to say, log > LSN.
      75              :     pub(crate) above_horizon: KeyLogAtLsn,
      76              : }
      77              : 
      78              : impl KeyHistoryRetention {
       79              :     /// Hack: skip the delta layer if we need to produce a layer with the same key-lsn range.
      80              :     ///
      81              :     /// This can happen if we have removed some deltas in "the middle" of some existing layer's key-lsn-range.
      82              :     /// For example, consider the case where a single delta with range [0x10,0x50) exists.
      83              :     /// And we have branches at LSN 0x10, 0x20, 0x30.
      84              :     /// Then we delete branch @ 0x20.
      85              :     /// Bottom-most compaction may now delete the delta [0x20,0x30).
       86              :     /// And that wouldn't change the shape of the layer.
      87              :     ///
      88              :     /// Note that bottom-most-gc-compaction never _adds_ new data in that case, only removes.
      89              :     ///
      90              :     /// `discard_key` will only be called when the writer reaches its target (instead of for every key), so it's fine to grab a lock inside.
      91          114 :     async fn discard_key(key: &PersistentLayerKey, tline: &Arc<Timeline>, dry_run: bool) -> bool {
      92          114 :         if dry_run {
      93            0 :             return true;
      94          114 :         }
      95          114 :         let guard = tline.layers.read().await;
      96          114 :         if !guard.contains_key(key) {
      97           66 :             return false;
      98           48 :         }
      99           48 :         let layer_generation = guard.get_from_key(key).metadata().generation;
     100           48 :         drop(guard);
     101           48 :         if layer_generation == tline.generation {
     102           48 :             info!(
     103              :                 key=%key,
     104              :                 ?layer_generation,
     105            0 :                 "discard layer due to duplicated layer key in the same generation",
     106              :             );
     107           48 :             true
     108              :         } else {
     109            0 :             false
     110              :         }
     111          114 :     }
     112              : 
     113              :     /// Pipe a history of a single key to the writers.
     114              :     ///
     115              :     /// If `image_writer` is none, the images will be placed into the delta layers.
     116              :     /// The delta writer will contain all images and deltas (below and above the horizon) except the bottom-most images.
     117              :     #[allow(clippy::too_many_arguments)]
     118         1266 :     async fn pipe_to(
     119         1266 :         self,
     120         1266 :         key: Key,
     121         1266 :         tline: &Arc<Timeline>,
     122         1266 :         delta_writer: &mut SplitDeltaLayerWriter,
     123         1266 :         mut image_writer: Option<&mut SplitImageLayerWriter>,
     124         1266 :         stat: &mut CompactionStatistics,
     125         1266 :         dry_run: bool,
     126         1266 :         ctx: &RequestContext,
     127         1266 :     ) -> anyhow::Result<()> {
     128         1266 :         let mut first_batch = true;
     129         1266 :         let discard = |key: &PersistentLayerKey| {
     130            0 :             let key = key.clone();
     131            0 :             async move { Self::discard_key(&key, tline, dry_run).await }
     132            0 :         };
     133         4206 :         for (cutoff_lsn, KeyLogAtLsn(logs)) in self.below_horizon {
     134         2940 :             if first_batch {
     135         1266 :                 if logs.len() == 1 && logs[0].1.is_image() {
     136         1224 :                     let Value::Image(img) = &logs[0].1 else {
     137            0 :                         unreachable!()
     138              :                     };
     139         1224 :                     stat.produce_image_key(img);
     140         1224 :                     if let Some(image_writer) = image_writer.as_mut() {
     141         1224 :                         image_writer
     142         1224 :                             .put_image_with_discard_fn(key, img.clone(), tline, ctx, discard)
     143         1242 :                             .await?;
     144              :                     } else {
     145            0 :                         delta_writer
     146            0 :                             .put_value_with_discard_fn(
     147            0 :                                 key,
     148            0 :                                 cutoff_lsn,
     149            0 :                                 Value::Image(img.clone()),
     150            0 :                                 tline,
     151            0 :                                 ctx,
     152            0 :                                 discard,
     153            0 :                             )
     154            0 :                             .await?;
     155              :                     }
     156              :                 } else {
     157           84 :                     for (lsn, val) in logs {
     158           42 :                         stat.produce_key(&val);
     159           42 :                         delta_writer
     160           42 :                             .put_value_with_discard_fn(key, lsn, val, tline, ctx, discard)
     161            3 :                             .await?;
     162              :                     }
     163              :                 }
     164         1266 :                 first_batch = false;
     165              :             } else {
     166         1920 :                 for (lsn, val) in logs {
     167          246 :                     stat.produce_key(&val);
     168          246 :                     delta_writer
     169          246 :                         .put_value_with_discard_fn(key, lsn, val, tline, ctx, discard)
     170           24 :                         .await?;
     171              :                 }
     172              :             }
     173              :         }
     174         1266 :         let KeyLogAtLsn(above_horizon_logs) = self.above_horizon;
     175         1362 :         for (lsn, val) in above_horizon_logs {
     176           96 :             stat.produce_key(&val);
     177           96 :             delta_writer
     178           96 :                 .put_value_with_discard_fn(key, lsn, val, tline, ctx, discard)
     179            6 :                 .await?;
     180              :         }
     181         1266 :         Ok(())
     182         1266 :     }
     183              : }
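
The code that splits a key's full history into the `below_horizon` batches and the `above_horizon` remainder is not part of this excerpt. Below is a minimal, self-contained sketch of the intended semantics, with `u64`/`String` standing in for the real `Lsn`/`Value` types and `split_history` as a hypothetical helper:

    // Stand-ins for utils::lsn::Lsn and crate::repository::Value.
    type Lsn = u64;
    type Value = String;

    struct KeyLogAtLsn(Vec<(Lsn, Value)>);

    struct KeyHistoryRetention {
        below_horizon: Vec<(Lsn, KeyLogAtLsn)>,
        above_horizon: KeyLogAtLsn,
    }

    /// Hypothetical helper: partition one key's history into one batch per
    /// retention cutoff (entries with lsn <= cutoff, consumed in ascending
    /// cutoff order) plus whatever remains above the GC horizon.
    fn split_history(mut history: Vec<(Lsn, Value)>, cutoffs: &[Lsn]) -> KeyHistoryRetention {
        history.sort_by_key(|(lsn, _)| *lsn);
        let mut below_horizon = Vec::new();
        for &cutoff in cutoffs {
            let split = history.partition_point(|(lsn, _)| *lsn <= cutoff);
            below_horizon.push((cutoff, KeyLogAtLsn(history.drain(..split).collect())));
        }
        KeyHistoryRetention {
            below_horizon,
            above_horizon: KeyLogAtLsn(history),
        }
    }

In `pipe_to` above, the first below-horizon batch is flushed to the image writer when it consists of a single image value; every other entry goes to the delta writer.
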
     184              : 
     185              : #[derive(Debug, Serialize, Default)]
     186              : struct CompactionStatisticsNumSize {
     187              :     num: u64,
     188              :     size: u64,
     189              : }
     190              : 
     191              : #[derive(Debug, Serialize, Default)]
     192              : pub struct CompactionStatistics {
     193              :     delta_layer_visited: CompactionStatisticsNumSize,
     194              :     image_layer_visited: CompactionStatisticsNumSize,
     195              :     delta_layer_produced: CompactionStatisticsNumSize,
     196              :     image_layer_produced: CompactionStatisticsNumSize,
     197              :     num_delta_layer_discarded: usize,
     198              :     num_image_layer_discarded: usize,
     199              :     num_unique_keys_visited: usize,
     200              :     wal_keys_visited: CompactionStatisticsNumSize,
     201              :     image_keys_visited: CompactionStatisticsNumSize,
     202              :     wal_produced: CompactionStatisticsNumSize,
     203              :     image_produced: CompactionStatisticsNumSize,
     204              : }
     205              : 
     206              : impl CompactionStatistics {
     207         2058 :     fn estimated_size_of_value(val: &Value) -> usize {
     208          798 :         match val {
     209         1260 :             Value::Image(img) => img.len(),
     210            0 :             Value::WalRecord(NeonWalRecord::Postgres { rec, .. }) => rec.len(),
     211          798 :             _ => std::mem::size_of::<NeonWalRecord>(),
     212              :         }
     213         2058 :     }
     214         3288 :     fn estimated_size_of_key() -> usize {
     215         3288 :         KEY_SIZE // TODO: distinguish image layer and delta layer (count LSN in delta layer)
     216         3288 :     }
     217          138 :     fn visit_delta_layer(&mut self, size: u64) {
     218          138 :         self.delta_layer_visited.num += 1;
     219          138 :         self.delta_layer_visited.size += size;
     220          138 :     }
     221          108 :     fn visit_image_layer(&mut self, size: u64) {
     222          108 :         self.image_layer_visited.num += 1;
     223          108 :         self.image_layer_visited.size += size;
     224          108 :     }
     225         1266 :     fn on_unique_key_visited(&mut self) {
     226         1266 :         self.num_unique_keys_visited += 1;
     227         1266 :     }
     228          420 :     fn visit_wal_key(&mut self, val: &Value) {
     229          420 :         self.wal_keys_visited.num += 1;
     230          420 :         self.wal_keys_visited.size +=
     231          420 :             Self::estimated_size_of_value(val) as u64 + Self::estimated_size_of_key() as u64;
     232          420 :     }
     233         1260 :     fn visit_image_key(&mut self, val: &Value) {
     234         1260 :         self.image_keys_visited.num += 1;
     235         1260 :         self.image_keys_visited.size +=
     236         1260 :             Self::estimated_size_of_value(val) as u64 + Self::estimated_size_of_key() as u64;
     237         1260 :     }
     238          384 :     fn produce_key(&mut self, val: &Value) {
     239          384 :         match val {
     240            6 :             Value::Image(img) => self.produce_image_key(img),
     241          378 :             Value::WalRecord(_) => self.produce_wal_key(val),
     242              :         }
     243          384 :     }
     244          378 :     fn produce_wal_key(&mut self, val: &Value) {
     245          378 :         self.wal_produced.num += 1;
     246          378 :         self.wal_produced.size +=
     247          378 :             Self::estimated_size_of_value(val) as u64 + Self::estimated_size_of_key() as u64;
     248          378 :     }
     249         1230 :     fn produce_image_key(&mut self, val: &Bytes) {
     250         1230 :         self.image_produced.num += 1;
     251         1230 :         self.image_produced.size += val.len() as u64 + Self::estimated_size_of_key() as u64;
     252         1230 :     }
     253           24 :     fn discard_delta_layer(&mut self) {
     254           24 :         self.num_delta_layer_discarded += 1;
     255           24 :     }
     256           24 :     fn discard_image_layer(&mut self) {
     257           24 :         self.num_image_layer_discarded += 1;
     258           24 :     }
     259           30 :     fn produce_delta_layer(&mut self, size: u64) {
     260           30 :         self.delta_layer_produced.num += 1;
     261           30 :         self.delta_layer_produced.size += size;
     262           30 :     }
     263           36 :     fn produce_image_layer(&mut self, size: u64) {
     264           36 :         self.image_layer_produced.num += 1;
     265           36 :         self.image_layer_produced.size += size;
     266           36 :     }
     267              : }
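
These counters are private to this module; a rough sketch of how they are driven for a single key during a pass, assuming the `Postgres { will_init, rec }` variant shape of `NeonWalRecord` from the crate's WAL record definitions:

    let mut stat = CompactionStatistics::default();
    let img = Value::Image(Bytes::from_static(b"page image bytes"));
    let wal = Value::WalRecord(NeonWalRecord::Postgres {
        will_init: false,
        rec: Bytes::from_static(b"wal record bytes"),
    });
    stat.on_unique_key_visited();
    stat.visit_image_key(&img); // image_keys_visited.size += img.len() + KEY_SIZE
    stat.visit_wal_key(&wal);   // wal_keys_visited.size += rec.len() + KEY_SIZE
    stat.produce_key(&img);     // dispatches to produce_image_key()
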
     268              : 
     269              : impl Timeline {
     270              :     /// TODO: cancellation
     271              :     ///
     272              :     /// Returns whether the compaction has pending tasks.
     273         1092 :     pub(crate) async fn compact_legacy(
     274         1092 :         self: &Arc<Self>,
     275         1092 :         cancel: &CancellationToken,
     276         1092 :         flags: EnumSet<CompactFlags>,
     277         1092 :         ctx: &RequestContext,
     278         1092 :     ) -> Result<bool, CompactionError> {
     279         1092 :         if flags.contains(CompactFlags::EnhancedGcBottomMostCompaction) {
     280            0 :             self.compact_with_gc(cancel, flags, ctx)
     281            0 :                 .await
     282            0 :                 .map_err(CompactionError::Other)?;
     283            0 :             return Ok(false);
     284         1092 :         }
     285         1092 : 
     286         1092 :         if flags.contains(CompactFlags::DryRun) {
     287            0 :             return Err(CompactionError::Other(anyhow!(
     288            0 :                 "dry-run mode is not supported for legacy compaction for now"
     289            0 :             )));
     290         1092 :         }
     291         1092 : 
     292         1092 :         // High level strategy for compaction / image creation:
     293         1092 :         //
     294         1092 :         // 1. First, calculate the desired "partitioning" of the
     295         1092 :         // currently in-use key space. The goal is to partition the
     296         1092 :         // key space into roughly fixed-size chunks, but also take into
     297         1092 :         // account any existing image layers, and try to align the
     298         1092 :         // chunk boundaries with the existing image layers to avoid
     299         1092 :         // too much churn. Also try to align chunk boundaries with
     300         1092 :         // relation boundaries.  In principle, we don't know about
     301         1092 :         // relation boundaries here, we just deal with key-value
     302         1092 :         // pairs, and the code in pgdatadir_mapping.rs knows how to
     303         1092 :         // map relations into key-value pairs. But in practice we know
     304         1092 :         // that 'field6' is the block number, and the fields 1-5
     305         1092 :         // identify a relation. This is just an optimization,
     306         1092 :         // though.
     307         1092 :         //
     308         1092 :         // 2. Once we know the partitioning, for each partition,
     309         1092 :         // decide if it's time to create a new image layer. The
      310         1092 :         // criterion is: has there been too much "churn" since the last
      311         1092 :         // image layer? "Churn" is a fuzzy concept; it's a
      312         1092 :         // combination of too many delta files, or too much WAL in
      313         1092 :         // total in the delta files. Or perhaps: if creating an image
      314         1092 :         // file would allow us to delete some older files.
     315         1092 :         //
     316         1092 :         // 3. After that, we compact all level0 delta files if there
     317         1092 :         // are too many of them.  While compacting, we also garbage
     318         1092 :         // collect any page versions that are no longer needed because
     319         1092 :         // of the new image layers we created in step 2.
     320         1092 :         //
     321         1092 :         // TODO: This high level strategy hasn't been implemented yet.
     322         1092 :         // Below are functions compact_level0() and create_image_layers()
     323         1092 :         // but they are a bit ad hoc and don't quite work like it's explained
     324         1092 :         // above. Rewrite it.
     325         1092 : 
     326         1092 :         // Is the timeline being deleted?
     327         1092 :         if self.is_stopping() {
     328            0 :             trace!("Dropping out of compaction on timeline shutdown");
     329            0 :             return Err(CompactionError::ShuttingDown);
     330         1092 :         }
     331         1092 : 
     332         1092 :         let target_file_size = self.get_checkpoint_distance();
     333              : 
     334              :         // Define partitioning schema if needed
     335              : 
     336              :         // FIXME: the match should only cover repartitioning, not the next steps
     337         1092 :         let (partition_count, has_pending_tasks) = match self
     338         1092 :             .repartition(
     339         1092 :                 self.get_last_record_lsn(),
     340         1092 :                 self.get_compaction_target_size(),
     341         1092 :                 flags,
     342         1092 :                 ctx,
     343         1092 :             )
     344        47922 :             .await
     345              :         {
     346         1092 :             Ok(((dense_partitioning, sparse_partitioning), lsn)) => {
     347         1092 :                 // Disables access_stats updates, so that the files we read remain candidates for eviction after we're done with them
     348         1092 :                 let image_ctx = RequestContextBuilder::extend(ctx)
     349         1092 :                     .access_stats_behavior(AccessStatsBehavior::Skip)
     350         1092 :                     .build();
     351         1092 : 
     352         1092 :                 // 2. Compact
     353         1092 :                 let timer = self.metrics.compact_time_histo.start_timer();
     354        29654 :                 let fully_compacted = self.compact_level0(target_file_size, ctx).await?;
     355         1092 :                 timer.stop_and_record();
     356         1092 : 
     357         1092 :                 let mut partitioning = dense_partitioning;
     358         1092 :                 partitioning
     359         1092 :                     .parts
     360         1092 :                     .extend(sparse_partitioning.into_dense().parts);
     361         1092 : 
     362         1092 :                 // 3. Create new image layers for partitions that have been modified
     363         1092 :                 // "enough". Skip image layer creation if L0 compaction cannot keep up.
     364         1092 :                 if fully_compacted {
     365         1092 :                     let image_layers = self
     366         1092 :                         .create_image_layers(
     367         1092 :                             &partitioning,
     368         1092 :                             lsn,
     369         1092 :                             if flags.contains(CompactFlags::ForceImageLayerCreation) {
     370           42 :                                 ImageLayerCreationMode::Force
     371              :                             } else {
     372         1050 :                                 ImageLayerCreationMode::Try
     373              :                             },
      374            0 :                     info!("skipping image layer generation because L0 compaction did not include all layers.");
     375              :                         )
     376        40939 :                         .await?;
     377              : 
     378         1092 :                     self.upload_new_image_layers(image_layers)?;
     379              :                 } else {
     380            0 :                     info!("skipping image layer generation due to L0 compaction did not include all layers.");
     381              :                 }
     382         1092 :                 (partitioning.parts.len(), !fully_compacted)
     383              :             }
     384            0 :             Err(err) => {
     385            0 :                 // no partitioning? This is normal, if the timeline was just created
     386            0 :                 // as an empty timeline. Also in unit tests, when we use the timeline
     387            0 :                 // as a simple key-value store, ignoring the datadir layout. Log the
     388            0 :                 // error but continue.
     389            0 :                 //
     390            0 :                 // Suppress error when it's due to cancellation
     391            0 :                 if !self.cancel.is_cancelled() {
     392            0 :                     tracing::error!("could not compact, repartitioning keyspace failed: {err:?}");
     393            0 :                 }
     394            0 :                 (1, false)
     395              :             }
     396              :         };
     397              : 
     398         1092 :         if self.shard_identity.count >= ShardCount::new(2) {
     399              :             // Limit the number of layer rewrites to the number of partitions: this means its
     400              :             // runtime should be comparable to a full round of image layer creations, rather than
     401              :             // being potentially much longer.
     402            0 :             let rewrite_max = partition_count;
     403            0 : 
     404            0 :             self.compact_shard_ancestors(rewrite_max, ctx).await?;
     405         1092 :         }
     406              : 
     407         1092 :         Ok(has_pending_tasks)
     408         1092 :     }
     409              : 
      410              :     /// Check for layers that are eligible to be rewritten:
      411              :     /// - Shard splitting: after a shard split, rewrite ancestor layers beyond pitr_interval, so that
     412              :     ///   we don't indefinitely retain keys in this shard that aren't needed.
     413              :     /// - For future use: layers beyond pitr_interval that are in formats we would
     414              :     ///   rather not maintain compatibility with indefinitely.
     415              :     ///
     416              :     /// Note: this phase may read and write many gigabytes of data: use rewrite_max to bound
     417              :     /// how much work it will try to do in each compaction pass.
     418            0 :     async fn compact_shard_ancestors(
     419            0 :         self: &Arc<Self>,
     420            0 :         rewrite_max: usize,
     421            0 :         ctx: &RequestContext,
     422            0 :     ) -> Result<(), CompactionError> {
     423            0 :         let mut drop_layers = Vec::new();
     424            0 :         let mut layers_to_rewrite: Vec<Layer> = Vec::new();
     425            0 : 
     426            0 :         // We will use the Lsn cutoff of the last GC as a threshold for rewriting layers: if a
     427            0 :         // layer is behind this Lsn, it indicates that the layer is being retained beyond the
     428            0 :         // pitr_interval, for example because a branchpoint references it.
     429            0 :         //
     430            0 :         // Holding this read guard also blocks [`Self::gc_timeline`] from entering while we
     431            0 :         // are rewriting layers.
     432            0 :         let latest_gc_cutoff = self.get_latest_gc_cutoff_lsn();
     433            0 : 
     434            0 :         tracing::info!(
     435            0 :             "latest_gc_cutoff: {}, pitr cutoff {}",
     436            0 :             *latest_gc_cutoff,
     437            0 :             self.gc_info.read().unwrap().cutoffs.time
     438              :         );
     439              : 
     440            0 :         let layers = self.layers.read().await;
     441            0 :         for layer_desc in layers.layer_map()?.iter_historic_layers() {
     442            0 :             let layer = layers.get_from_desc(&layer_desc);
     443            0 :             if layer.metadata().shard.shard_count == self.shard_identity.count {
     444              :                 // This layer does not belong to a historic ancestor, no need to re-image it.
     445            0 :                 continue;
     446            0 :             }
     447            0 : 
     448            0 :             // This layer was created on an ancestor shard: check if it contains any data for this shard.
     449            0 :             let sharded_range = ShardedRange::new(layer_desc.get_key_range(), &self.shard_identity);
     450            0 :             let layer_local_page_count = sharded_range.page_count();
     451            0 :             let layer_raw_page_count = ShardedRange::raw_size(&layer_desc.get_key_range());
     452            0 :             if layer_local_page_count == 0 {
     453              :                 // This ancestral layer only covers keys that belong to other shards.
     454              :                 // We include the full metadata in the log: if we had some critical bug that caused
     455              :                 // us to incorrectly drop layers, this would simplify manually debugging + reinstating those layers.
     456            0 :                 info!(%layer, old_metadata=?layer.metadata(),
     457            0 :                     "dropping layer after shard split, contains no keys for this shard.",
     458              :                 );
     459              : 
     460            0 :                 if cfg!(debug_assertions) {
     461              :                     // Expensive, exhaustive check of keys in this layer: this guards against ShardedRange's calculations being
     462              :                     // wrong.  If ShardedRange claims the local page count is zero, then no keys in this layer
     463              :                     // should be !is_key_disposable()
     464            0 :                     let range = layer_desc.get_key_range();
     465            0 :                     let mut key = range.start;
     466            0 :                     while key < range.end {
     467            0 :                         debug_assert!(self.shard_identity.is_key_disposable(&key));
     468            0 :                         key = key.next();
     469              :                     }
     470            0 :                 }
     471              : 
     472            0 :                 drop_layers.push(layer);
     473            0 :                 continue;
     474            0 :             } else if layer_local_page_count != u32::MAX
     475            0 :                 && layer_local_page_count == layer_raw_page_count
     476              :             {
     477            0 :                 debug!(%layer,
     478            0 :                     "layer is entirely shard local ({} keys), no need to filter it",
     479              :                     layer_local_page_count
     480              :                 );
     481            0 :                 continue;
     482            0 :             }
     483            0 : 
     484            0 :             // Don't bother re-writing a layer unless it will at least halve its size
     485            0 :             if layer_local_page_count != u32::MAX
     486            0 :                 && layer_local_page_count > layer_raw_page_count / 2
     487              :             {
     488            0 :                 debug!(%layer,
     489            0 :                     "layer is already mostly local ({}/{}), not rewriting",
     490              :                     layer_local_page_count,
     491              :                     layer_raw_page_count
     492              :                 );
     493            0 :             }
     494              : 
     495              :             // Don't bother re-writing a layer if it is within the PITR window: it will age-out eventually
     496              :             // without incurring the I/O cost of a rewrite.
     497            0 :             if layer_desc.get_lsn_range().end >= *latest_gc_cutoff {
     498            0 :                 debug!(%layer, "Skipping rewrite of layer still in GC window ({} >= {})",
     499            0 :                     layer_desc.get_lsn_range().end, *latest_gc_cutoff);
     500            0 :                 continue;
     501            0 :             }
     502            0 : 
     503            0 :             if layer_desc.is_delta() {
     504              :                 // We do not yet implement rewrite of delta layers
     505            0 :                 debug!(%layer, "Skipping rewrite of delta layer");
     506            0 :                 continue;
     507            0 :             }
     508            0 : 
     509            0 :             // Only rewrite layers if their generations differ.  This guarantees:
     510            0 :             //  - that local rewrite is safe, as local layer paths will differ between existing layer and rewritten one
     511            0 :             //  - that the layer is persistent in remote storage, as we only see old-generation'd layer via loading from remote storage
     512            0 :             if layer.metadata().generation == self.generation {
     513            0 :                 debug!(%layer, "Skipping rewrite, is not from old generation");
     514            0 :                 continue;
     515            0 :             }
     516            0 : 
     517            0 :             if layers_to_rewrite.len() >= rewrite_max {
     518            0 :                 tracing::info!(%layer, "Will rewrite layer on a future compaction, already rewrote {}",
     519            0 :                     layers_to_rewrite.len()
     520              :                 );
     521            0 :                 continue;
     522            0 :             }
     523            0 : 
     524            0 :             // Fall through: all our conditions for doing a rewrite passed.
     525            0 :             layers_to_rewrite.push(layer);
     526              :         }
     527              : 
     528              :         // Drop read lock on layer map before we start doing time-consuming I/O
     529            0 :         drop(layers);
     530            0 : 
     531            0 :         let mut replace_image_layers = Vec::new();
     532              : 
     533            0 :         for layer in layers_to_rewrite {
     534            0 :             tracing::info!(layer=%layer, "Rewriting layer after shard split...");
     535            0 :             let mut image_layer_writer = ImageLayerWriter::new(
     536            0 :                 self.conf,
     537            0 :                 self.timeline_id,
     538            0 :                 self.tenant_shard_id,
     539            0 :                 &layer.layer_desc().key_range,
     540            0 :                 layer.layer_desc().image_layer_lsn(),
     541            0 :                 ctx,
     542            0 :             )
     543            0 :             .await
     544            0 :             .map_err(CompactionError::Other)?;
     545              : 
     546              :             // Safety of layer rewrites:
     547              :             // - We are writing to a different local file path than we are reading from, so the old Layer
     548              :             //   cannot interfere with the new one.
     549              :             // - In the page cache, contents for a particular VirtualFile are stored with a file_id that
     550              :             //   is different for two layers with the same name (in `ImageLayerInner::new` we always
     551              :             //   acquire a fresh id from [`crate::page_cache::next_file_id`].  So readers do not risk
     552              :             //   reading the index from one layer file, and then data blocks from the rewritten layer file.
     553              :             // - Any readers that have a reference to the old layer will keep it alive until they are done
     554              :             //   with it. If they are trying to promote from remote storage, that will fail, but this is the same
     555              :             //   as for compaction generally: compaction is allowed to delete layers that readers might be trying to use.
     556              :             // - We do not run concurrently with other kinds of compaction, so the only layer map writes we race with are:
     557              :             //    - GC, which at worst witnesses us "undelete" a layer that they just deleted.
     558              :             //    - ingestion, which only inserts layers, therefore cannot collide with us.
     559            0 :             let resident = layer.download_and_keep_resident().await?;
     560              : 
     561            0 :             let keys_written = resident
     562            0 :                 .filter(&self.shard_identity, &mut image_layer_writer, ctx)
     563            0 :                 .await?;
     564              : 
     565            0 :             if keys_written > 0 {
     566            0 :                 let (desc, path) = image_layer_writer
     567            0 :                     .finish(ctx)
     568            0 :                     .await
     569            0 :                     .map_err(CompactionError::Other)?;
     570            0 :                 let new_layer = Layer::finish_creating(self.conf, self, desc, &path)
     571            0 :                     .map_err(CompactionError::Other)?;
     572            0 :                 tracing::info!(layer=%new_layer, "Rewrote layer, {} -> {} bytes",
     573            0 :                     layer.metadata().file_size,
     574            0 :                     new_layer.metadata().file_size);
     575              : 
     576            0 :                 replace_image_layers.push((layer, new_layer));
     577            0 :             } else {
     578            0 :                 // Drop the old layer.  Usually for this case we would already have noticed that
     579            0 :                 // the layer has no data for us with the ShardedRange check above, but
      580            0 :                 // the layer has no data for us with the ShardedRange check above, but the page count estimate may be inexact, so we handle an empty rewrite here too.
     581            0 :             }
     582              :         }
     583              : 
     584              :         // At this point, we have replaced local layer files with their rewritten form, but not yet uploaded
     585              :         // metadata to reflect that. If we restart here, the replaced layer files will look invalid (size mismatch
     586              :         // to remote index) and be removed. This is inefficient but safe.
     587            0 :         fail::fail_point!("compact-shard-ancestors-localonly");
     588            0 : 
     589            0 :         // Update the LayerMap so that readers will use the new layers, and enqueue it for writing to remote storage
     590            0 :         self.rewrite_layers(replace_image_layers, drop_layers)
     591            0 :             .await?;
     592              : 
     593            0 :         fail::fail_point!("compact-shard-ancestors-enqueued");
     594            0 : 
     595            0 :         // We wait for all uploads to complete before finishing this compaction stage.  This is not
     596            0 :         // necessary for correctness, but it simplifies testing, and avoids proceeding with another
     597            0 :         // Timeline's compaction while this timeline's uploads may be generating lots of disk I/O
     598            0 :         // load.
     599            0 :         match self.remote_client.wait_completion().await {
     600            0 :             Ok(()) => (),
     601            0 :             Err(WaitCompletionError::NotInitialized(ni)) => return Err(CompactionError::from(ni)),
     602              :             Err(WaitCompletionError::UploadQueueShutDownOrStopped) => {
     603            0 :                 return Err(CompactionError::ShuttingDown)
     604              :             }
     605              :         }
     606              : 
     607            0 :         fail::fail_point!("compact-shard-ancestors-persistent");
     608            0 : 
     609            0 :         Ok(())
     610            0 :     }
     611              : 
     612              :     /// Update the LayerVisibilityHint of layers covered by image layers, based on whether there is
     613              :     /// an image layer between them and the most recent readable LSN (branch point or tip of timeline).  The
     614              :     /// purpose of the visibility hint is to record which layers need to be available to service reads.
     615              :     ///
     616              :     /// The result may be used as an input to eviction and secondary downloads to de-prioritize layers
     617              :     /// that we know won't be needed for reads.
     618          588 :     pub(super) async fn update_layer_visibility(
     619          588 :         &self,
     620          588 :     ) -> Result<(), super::layer_manager::Shutdown> {
     621          588 :         let head_lsn = self.get_last_record_lsn();
     622              : 
     623              :         // We will sweep through layers in reverse-LSN order.  We only do historic layers.  L0 deltas
     624              :         // are implicitly left visible, because LayerVisibilityHint's default is Visible, and we never modify it here.
     625              :         // Note that L0 deltas _can_ be covered by image layers, but we consider them 'visible' because we anticipate that
     626              :         // they will be subject to L0->L1 compaction in the near future.
     627          588 :         let layer_manager = self.layers.read().await;
     628          588 :         let layer_map = layer_manager.layer_map()?;
     629              : 
     630          588 :         let readable_points = {
     631          588 :             let children = self.gc_info.read().unwrap().retain_lsns.clone();
     632          588 : 
     633          588 :             let mut readable_points = Vec::with_capacity(children.len() + 1);
     634          588 :             for (child_lsn, _child_timeline_id) in &children {
     635            0 :                 readable_points.push(*child_lsn);
     636            0 :             }
     637          588 :             readable_points.push(head_lsn);
     638          588 :             readable_points
     639          588 :         };
     640          588 : 
     641          588 :         let (layer_visibility, covered) = layer_map.get_visibility(readable_points);
     642         1512 :         for (layer_desc, visibility) in layer_visibility {
      643          924 :             // FIXME: a more efficient bulk zip() through the layers rather than NlogN getting each one
     644          924 :             let layer = layer_manager.get_from_desc(&layer_desc);
     645          924 :             layer.set_visibility(visibility);
     646          924 :         }
     647              : 
     648              :         // TODO: publish our covered KeySpace to our parent, so that when they update their visibility, they can
     649              :         // avoid assuming that everything at a branch point is visible.
     650          588 :         drop(covered);
     651          588 :         Ok(())
     652          588 :     }
     653              : 
     654              :     /// Collect a bunch of Level 0 layer files, and compact and reshuffle them as
      655              :     /// Level 1 files. Returns whether the L0 layers are fully compacted.
     656         1092 :     async fn compact_level0(
     657         1092 :         self: &Arc<Self>,
     658         1092 :         target_file_size: u64,
     659         1092 :         ctx: &RequestContext,
     660         1092 :     ) -> Result<bool, CompactionError> {
     661              :         let CompactLevel0Phase1Result {
     662         1092 :             new_layers,
     663         1092 :             deltas_to_compact,
     664         1092 :             fully_compacted,
     665              :         } = {
     666         1092 :             let phase1_span = info_span!("compact_level0_phase1");
     667         1092 :             let ctx = ctx.attached_child();
     668         1092 :             let mut stats = CompactLevel0Phase1StatsBuilder {
     669         1092 :                 version: Some(2),
     670         1092 :                 tenant_id: Some(self.tenant_shard_id),
     671         1092 :                 timeline_id: Some(self.timeline_id),
     672         1092 :                 ..Default::default()
     673         1092 :             };
     674         1092 : 
     675         1092 :             let begin = tokio::time::Instant::now();
     676         1092 :             let phase1_layers_locked = self.layers.read().await;
     677         1092 :             let now = tokio::time::Instant::now();
     678         1092 :             stats.read_lock_acquisition_micros =
     679         1092 :                 DurationRecorder::Recorded(RecordedDuration(now - begin), now);
     680         1092 :             self.compact_level0_phase1(phase1_layers_locked, stats, target_file_size, &ctx)
     681         1092 :                 .instrument(phase1_span)
     682        29651 :                 .await?
     683              :         };
     684              : 
     685         1092 :         if new_layers.is_empty() && deltas_to_compact.is_empty() {
     686              :             // nothing to do
     687         1008 :             return Ok(true);
     688           84 :         }
     689           84 : 
     690           84 :         self.finish_compact_batch(&new_layers, &Vec::new(), &deltas_to_compact)
     691            3 :             .await?;
     692           84 :         Ok(fully_compacted)
     693         1092 :     }
     694              : 
      695              :     /// First phase of Level 0 compaction, explained in the [`Self::compact_legacy`] comment.
     696         1092 :     async fn compact_level0_phase1<'a>(
     697         1092 :         self: &'a Arc<Self>,
     698         1092 :         guard: tokio::sync::RwLockReadGuard<'a, LayerManager>,
     699         1092 :         mut stats: CompactLevel0Phase1StatsBuilder,
     700         1092 :         target_file_size: u64,
     701         1092 :         ctx: &RequestContext,
     702         1092 :     ) -> Result<CompactLevel0Phase1Result, CompactionError> {
     703         1092 :         stats.read_lock_held_spawn_blocking_startup_micros =
     704         1092 :             stats.read_lock_acquisition_micros.till_now(); // set by caller
     705         1092 :         let layers = guard.layer_map()?;
     706         1092 :         let level0_deltas = layers.level0_deltas();
     707         1092 :         stats.level0_deltas_count = Some(level0_deltas.len());
     708         1092 : 
     709         1092 :         // Only compact if enough layers have accumulated.
     710         1092 :         let threshold = self.get_compaction_threshold();
     711         1092 :         if level0_deltas.is_empty() || level0_deltas.len() < threshold {
     712         1008 :             debug!(
     713            0 :                 level0_deltas = level0_deltas.len(),
     714            0 :                 threshold, "too few deltas to compact"
     715              :             );
     716         1008 :             return Ok(CompactLevel0Phase1Result::default());
     717           84 :         }
     718           84 : 
     719           84 :         let mut level0_deltas = level0_deltas
     720           84 :             .iter()
     721         1206 :             .map(|x| guard.get_from_desc(x))
     722           84 :             .collect::<Vec<_>>();
     723           84 : 
     724           84 :         // Gather the files to compact in this iteration.
     725           84 :         //
     726           84 :         // Start with the oldest Level 0 delta file, and collect any other
     727           84 :         // level 0 files that form a contiguous sequence, such that the end
     728           84 :         // LSN of previous file matches the start LSN of the next file.
     729           84 :         //
     730           84 :         // Note that if the files don't form such a sequence, we might
     731           84 :         // "compact" just a single file. That's a bit pointless, but it allows
     732           84 :         // us to get rid of the level 0 file, and compact the other files on
     733           84 :         // the next iteration. This could probably made smarter, but such
      734           84 :         // the next iteration. This could probably be made smarter, but such
     735           84 :         // of a crash, partial download from cloud storage, or something like
     736           84 :         // that, so it's not a big deal in practice.
     737         2244 :         level0_deltas.sort_by_key(|l| l.layer_desc().lsn_range.start);
     738           84 :         let mut level0_deltas_iter = level0_deltas.iter();
     739           84 : 
     740           84 :         let first_level0_delta = level0_deltas_iter.next().unwrap();
     741           84 :         let mut prev_lsn_end = first_level0_delta.layer_desc().lsn_range.end;
     742           84 :         let mut deltas_to_compact = Vec::with_capacity(level0_deltas.len());
     743           84 : 
     744           84 :         // Accumulate the size of layers in `deltas_to_compact`
     745           84 :         let mut deltas_to_compact_bytes = 0;
     746           84 : 
     747           84 :         // Under normal circumstances, we will accumulate up to compaction_interval L0s of size
     748           84 :         // checkpoint_distance each.  To avoid edge cases using extra system resources, bound our
     749           84 :         // work in this function to only operate on this much delta data at once.
     750           84 :         //
     751           84 :         // Take the max of the configured value & the default, so that tests that configure tiny values
     752           84 :         // can still use a sensible amount of memory, but if a deployed system configures bigger values we
     753           84 :         // still let them compact a full stack of L0s in one go.
     754           84 :         let delta_size_limit = std::cmp::max(
     755           84 :             self.get_compaction_threshold(),
     756           84 :             DEFAULT_COMPACTION_THRESHOLD,
     757           84 :         ) as u64
     758           84 :             * std::cmp::max(self.get_checkpoint_distance(), DEFAULT_CHECKPOINT_DISTANCE);
     759           84 : 
     760           84 :         let mut fully_compacted = true;
     761           84 : 
     762           84 :         deltas_to_compact.push(first_level0_delta.download_and_keep_resident().await?);
     763         1206 :         for l in level0_deltas_iter {
     764         1122 :             let lsn_range = &l.layer_desc().lsn_range;
     765         1122 : 
     766         1122 :             if lsn_range.start != prev_lsn_end {
     767            0 :                 break;
     768         1122 :             }
     769         1122 :             deltas_to_compact.push(l.download_and_keep_resident().await?);
     770         1122 :             deltas_to_compact_bytes += l.metadata().file_size;
     771         1122 :             prev_lsn_end = lsn_range.end;
     772         1122 : 
     773         1122 :             if deltas_to_compact_bytes >= delta_size_limit {
     774            0 :                 info!(
     775            0 :                     l0_deltas_selected = deltas_to_compact.len(),
     776            0 :                     l0_deltas_total = level0_deltas.len(),
     777            0 :                     "L0 compaction picker hit max delta layer size limit: {}",
     778              :                     delta_size_limit
     779              :                 );
     780            0 :                 fully_compacted = false;
     781            0 : 
     782            0 :                 // Proceed with compaction, but only a subset of L0s
     783            0 :                 break;
     784         1122 :             }
     785              :         }
     786           84 :         let lsn_range = Range {
     787           84 :             start: deltas_to_compact
     788           84 :                 .first()
     789           84 :                 .unwrap()
     790           84 :                 .layer_desc()
     791           84 :                 .lsn_range
     792           84 :                 .start,
     793           84 :             end: deltas_to_compact.last().unwrap().layer_desc().lsn_range.end,
     794           84 :         };
     795           84 : 
     796           84 :         info!(
     797            0 :             "Starting Level0 compaction in LSN range {}-{} for {} layers ({} deltas in total)",
     798            0 :             lsn_range.start,
     799            0 :             lsn_range.end,
     800            0 :             deltas_to_compact.len(),
     801            0 :             level0_deltas.len()
     802              :         );
     803              : 
     804         1206 :         for l in deltas_to_compact.iter() {
     805         1206 :             info!("compact includes {l}");
     806              :         }
     807              : 
     808              :         // We don't need the original list of layers anymore. Drop it so that
     809              :         // we don't accidentally use it later in the function.
     810           84 :         drop(level0_deltas);
     811           84 : 
     812           84 :         stats.read_lock_held_prerequisites_micros = stats
     813           84 :             .read_lock_held_spawn_blocking_startup_micros
     814           84 :             .till_now();
     815              : 
     816              :         // TODO: replace with streaming k-merge
     817           84 :         let all_keys = {
     818           84 :             let mut all_keys = Vec::new();
     819         1206 :             for l in deltas_to_compact.iter() {
     820         1206 :                 if self.cancel.is_cancelled() {
     821            0 :                     return Err(CompactionError::ShuttingDown);
     822         1206 :                 }
     823         7083 :                 all_keys.extend(l.load_keys(ctx).await.map_err(CompactionError::Other)?);
     824              :             }
      825              :             // The current stdlib sorting implementation is designed such that it is
      826              :             // particularly fast when the slice is made up of sorted sub-ranges.
     827     13271402 :             all_keys.sort_by_key(|DeltaEntry { key, lsn, .. }| (*key, *lsn));
     828           84 :             all_keys
     829           84 :         };
     830           84 : 
     831           84 :         stats.read_lock_held_key_sort_micros = stats.read_lock_held_prerequisites_micros.till_now();
     832              : 
     833              :         // Determine N largest holes where N is number of compacted layers. The vec is sorted by key range start.
     834              :         //
     835              :         // A hole is a key range for which this compaction doesn't have any WAL records.
     836              :         // Our goal in this compaction iteration is to avoid creating L1s that, in terms of their key range,
     837              :         // cover the hole, but actually don't contain any WAL records for that key range.
     838              :         // The reason is that the mere stack of L1s (`count_deltas`) triggers image layer creation (`create_image_layers`).
     839              :         // That image layer creation would be useless for a hole range covered by L1s that don't contain any WAL records.
     840              :         //
     841              :         // The algorithm chooses holes as follows.
     842              :         // - Slide a two-key window over the keys in key order to get the hole range (= distance between two consecutive keys).
     843              :         // - Filter: min threshold on range length
     844              :         // - Rank: by coverage size (=number of image layers required to reconstruct each key in the range for which we have any data)
     845              :         //
     846              :         // For more details, intuition, and some ASCII art see https://github.com/neondatabase/neon/pull/3597#discussion_r1112704451
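                      :         // Illustrative sketch (hypothetical numbers): with an 8 KiB page size and a
                      :         // 128 MiB target file size, the minimum hole length works out to 16384 key
                      :         // slots. A gap at least that long between two consecutive keys, covered by at
                      :         // least `min_hole_coverage_size` image layers, becomes a `Hole` candidate, and
                      :         // only the `max_holes` candidates with the largest coverage are kept.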
     847              :         #[derive(PartialEq, Eq)]
     848              :         struct Hole {
     849              :             key_range: Range<Key>,
     850              :             coverage_size: usize,
     851              :         }
     852           84 :         let holes: Vec<Hole> = {
     853              :             use std::cmp::Ordering;
     854              :             impl Ord for Hole {
     855            0 :                 fn cmp(&self, other: &Self) -> Ordering {
     856            0 :                     self.coverage_size.cmp(&other.coverage_size).reverse()
     857            0 :                 }
     858              :             }
     859              :             impl PartialOrd for Hole {
     860            0 :                 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
     861            0 :                     Some(self.cmp(other))
     862            0 :                 }
     863              :             }
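                      :             // Note: the reversed comparison in `Ord` above turns std's max-heap
                      :             // `BinaryHeap` into a min-heap on `coverage_size`, so `heap.pop()` below
                      :             // evicts the candidate hole with the smallest coverage once the heap grows
                      :             // past `max_holes` entries.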
     864           84 :             let max_holes = deltas_to_compact.len();
     865           84 :             let last_record_lsn = self.get_last_record_lsn();
     866           84 :             let min_hole_range = (target_file_size / page_cache::PAGE_SZ as u64) as i128;
     867           84 :             let min_hole_coverage_size = 3; // TODO: something more flexible?
     868           84 :             // min-heap (reserve space for one more element added before eviction)
     869           84 :             let mut heap: BinaryHeap<Hole> = BinaryHeap::with_capacity(max_holes + 1);
     870           84 :             let mut prev: Option<Key> = None;
     871              : 
     872      6192114 :             for &DeltaEntry { key: next_key, .. } in all_keys.iter() {
     873      6192114 :                 if let Some(prev_key) = prev {
     874              :                     // Just a first, fast filter: do not create hole entries for metadata keys. The last hole in the
     875              :                     // compaction is the gap between the data keys and the metadata keys.
     876      6192030 :                     if next_key.to_i128() - prev_key.to_i128() >= min_hole_range
     877            0 :                         && !Key::is_metadata_key(&prev_key)
     878              :                     {
     879            0 :                         let key_range = prev_key..next_key;
     880            0 :                         // Measuring a hole by simply subtracting the i128 representations of the key range boundaries
     881            0 :                         // is not very meaningful, because the largest gaps correspond to field1/field2 changes.
     882            0 :                         // We are mostly interested in eliminating holes which cause generation of excessive image layers.
     883            0 :                         // That is why it is better to measure the size of a hole as the number of covering image layers.
     884            0 :                         let coverage_size =
     885            0 :                             layers.image_coverage(&key_range, last_record_lsn).len();
     886            0 :                         if coverage_size >= min_hole_coverage_size {
     887            0 :                             heap.push(Hole {
     888            0 :                                 key_range,
     889            0 :                                 coverage_size,
     890            0 :                             });
     891            0 :                             if heap.len() > max_holes {
     892            0 :                                 heap.pop(); // remove smallest hole
     893            0 :                             }
     894            0 :                         }
     895      6192030 :                     }
     896           84 :                 }
     897      6192114 :                 prev = Some(next_key.next());
     898              :             }
     899           84 :             let mut holes = heap.into_vec();
     900           84 :             holes.sort_unstable_by_key(|hole| hole.key_range.start);
     901           84 :             holes
     902           84 :         };
     903           84 :         stats.read_lock_held_compute_holes_micros = stats.read_lock_held_key_sort_micros.till_now();
     904           84 :         drop_rlock(guard);
     905           84 : 
     906           84 :         if self.cancel.is_cancelled() {
     907            0 :             return Err(CompactionError::ShuttingDown);
     908           84 :         }
     909           84 : 
     910           84 :         stats.read_lock_drop_micros = stats.read_lock_held_compute_holes_micros.till_now();
     911              : 
     912              :         // This iterator walks through all key-value pairs from all the layers
     913              :         // we're compacting, in key, LSN order.
     914              :         // If there's both a Value::Image and Value::WalRecord for the same (key,lsn),
     915              :         // then the Value::Image is ordered before Value::WalRecord.
     916           84 :         let mut all_values_iter = {
     917           84 :             let mut deltas = Vec::with_capacity(deltas_to_compact.len());
     918         1206 :             for l in deltas_to_compact.iter() {
     919         1206 :                 let l = l.get_as_delta(ctx).await.map_err(CompactionError::Other)?;
     920         1206 :                 deltas.push(l);
     921              :             }
     922           84 :             MergeIterator::create(&deltas, &[], ctx)
     923           84 :         };
     924           84 : 
     925           84 :         // This iterator walks through all keys and is needed to calculate size used by each key
     926           84 :         let mut all_keys_iter = all_keys
     927           84 :             .iter()
     928      6192114 :             .map(|DeltaEntry { key, lsn, size, .. }| (*key, *lsn, *size))
     929      6192030 :             .coalesce(|mut prev, cur| {
     930      6192030 :                 // Coalesce entries that belong to the same key.
     931      6192030 :                 // This ensures that compaction doesn't put them
     932      6192030 :                 // into different layer files.
     933      6192030 :                 // Still limit this by the target file size,
     934      6192030 :                 // so that we keep the size of the files in
     935      6192030 :                 // check.
     936      6192030 :                 if prev.0 == cur.0 && prev.2 < target_file_size {
     937       120114 :                     prev.2 += cur.2;
     938       120114 :                     Ok(prev)
     939              :                 } else {
     940      6071916 :                     Err((prev, cur))
     941              :                 }
     942      6192030 :             });
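                      :         // For example (hypothetical sizes): entries (K, 0x10, 4 KiB) and (K, 0x20, 4 KiB)
                      :         // for the same key K coalesce into (K, 0x10, 8 KiB) as long as the size accumulated
                      :         // so far is still below target_file_size; a different key, or exceeding the target,
                      :         // starts a new entry.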
     943           84 : 
     944           84 :         // Merge the contents of all the input delta layers into a new set
     945           84 :         // of delta layers, based on the current partitioning.
     946           84 :         //
     947           84 :         // We split the new delta layers on the key dimension. We iterate through the key space and, for each key, check whether including it in the current output layer would make the layer too large. If so, we flush the current output layer and start a new one.
     948           84 :         // It's possible that there is a single key with so many page versions that storing all of them in a single layer file
     949           84 :         // would be too large. In that case, we also split on the LSN dimension.
     950           84 :         //
     951           84 :         // LSN
     952           84 :         //  ^
     953           84 :         //  |
     954           84 :         //  | +-----------+            +--+--+--+--+
     955           84 :         //  | |           |            |  |  |  |  |
     956           84 :         //  | +-----------+            |  |  |  |  |
     957           84 :         //  | |           |            |  |  |  |  |
     958           84 :         //  | +-----------+     ==>    |  |  |  |  |
     959           84 :         //  | |           |            |  |  |  |  |
     960           84 :         //  | +-----------+            |  |  |  |  |
     961           84 :         //  | |           |            |  |  |  |  |
     962           84 :         //  | +-----------+            +--+--+--+--+
     963           84 :         //  |
     964           84 :         //  +--------------> key
     965           84 :         //
     966           84 :         //
     967           84 :         // If one key (X) has a lot of page versions:
     968           84 :         //
     969           84 :         // LSN
     970           84 :         //  ^
     971           84 :         //  |                                 (X)
     972           84 :         //  | +-----------+            +--+--+--+--+
     973           84 :         //  | |           |            |  |  |  |  |
     974           84 :         //  | +-----------+            |  |  +--+  |
     975           84 :         //  | |           |            |  |  |  |  |
     976           84 :         //  | +-----------+     ==>    |  |  |  |  |
     977           84 :         //  | |           |            |  |  +--+  |
     978           84 :         //  | +-----------+            |  |  |  |  |
     979           84 :         //  | |           |            |  |  |  |  |
     980           84 :         //  | +-----------+            +--+--+--+--+
     981           84 :         //  |
     982           84 :         //  +--------------> key
     983           84 :         // TODO: this actually divides the layers into fixed-size chunks, not
     984           84 :         // based on the partitioning.
     985           84 :         //
     986           84 :         // TODO: we should also opportunistically materialize and
     987           84 :         // garbage collect what we can.
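                      :         // Terminology used below: a layer that holds only a slice of a single key's
                      :         // versions (because that key alone would exceed the target file size) is a
                      :         // "dup" layer; its LSN range is tracked by dup_start_lsn..dup_end_lsn, while
                      :         // all other layers span the full lsn_range of the inputs.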
     988           84 :         let mut new_layers = Vec::new();
     989           84 :         let mut prev_key: Option<Key> = None;
     990           84 :         let mut writer: Option<DeltaLayerWriter> = None;
     991           84 :         let mut key_values_total_size = 0u64;
     992           84 :         let mut dup_start_lsn: Lsn = Lsn::INVALID; // start LSN of layer containing values of the single key
     993           84 :         let mut dup_end_lsn: Lsn = Lsn::INVALID; // end LSN of layer containing values of the single key
     994           84 :         let mut next_hole = 0; // index of next hole in holes vector
     995           84 : 
     996           84 :         let mut keys = 0;
     997              : 
     998      6192198 :         while let Some((key, lsn, value)) = all_values_iter
     999      6192198 :             .next()
    1000        10248 :             .await
    1001      6192198 :             .map_err(CompactionError::Other)?
    1002              :         {
    1003      6192114 :             keys += 1;
    1004      6192114 : 
    1005      6192114 :             if keys % 32_768 == 0 && self.cancel.is_cancelled() {
    1006              :                 // Avoid hitting the cancellation token on every key. In benches, we end up
    1007              :                 // shuffling on the order of a million keys per layer, so this means we'll
    1008              :                 // check it around tens of times per layer.
    1009            0 :                 return Err(CompactionError::ShuttingDown);
    1010      6192114 :             }
    1011      6192114 : 
    1012      6192114 :             let same_key = prev_key.map_or(false, |prev_key| prev_key == key);
    1013      6192114 :             // We need to check key boundaries once we reach the next key, or the end of a layer for the same key
    1014      6192114 :             if !same_key || lsn == dup_end_lsn {
    1015      6072000 :                 let mut next_key_size = 0u64;
    1016      6072000 :                 let is_dup_layer = dup_end_lsn.is_valid();
    1017      6072000 :                 dup_start_lsn = Lsn::INVALID;
    1018      6072000 :                 if !same_key {
    1019      6072000 :                     dup_end_lsn = Lsn::INVALID;
    1020      6072000 :                 }
    1021              :                 // Determine the size occupied by this key. We stop at the next key, or when the size becomes larger than target_file_size
    1022      6072000 :                 for (next_key, next_lsn, next_size) in all_keys_iter.by_ref() {
    1023      6072000 :                     next_key_size = next_size;
    1024      6072000 :                     if key != next_key {
    1025      6071916 :                         if dup_end_lsn.is_valid() {
    1026            0 :                             // We are writing a segment with duplicates:
    1027            0 :                             // place all remaining values of this key in a separate segment
    1028            0 :                             dup_start_lsn = dup_end_lsn; // the new segment starts where the old one stops
    1029            0 :                             dup_end_lsn = lsn_range.end; // there are no more values of this key till end of LSN range
    1030      6071916 :                         }
    1031      6071916 :                         break;
    1032           84 :                     }
    1033           84 :                     key_values_total_size += next_size;
    1034           84 :                     // Check if it is time to split the segment: i.e. if the total size of this key's values is larger than the target file size.
    1035           84 :                     // We need to avoid generating empty segments if next_size > target_file_size.
    1036           84 :                     if key_values_total_size > target_file_size && lsn != next_lsn {
    1037              :                         // Split key between multiple layers: such layer can contain only single key
    1038            0 :                         dup_start_lsn = if dup_end_lsn.is_valid() {
    1039            0 :                             dup_end_lsn // new segment with duplicates starts where old one stops
    1040              :                         } else {
    1041            0 :                             lsn // start with the first LSN for this key
    1042              :                         };
    1043            0 :                         dup_end_lsn = next_lsn; // upper LSN boundary is exclusive
    1044            0 :                         break;
    1045           84 :                     }
    1046              :                 }
    1047              :                 // Handle the case when the loop reaches the last key: in this case dup_end_lsn is valid but dup_start_lsn is not set.
    1048      6072000 :                 if dup_end_lsn.is_valid() && !dup_start_lsn.is_valid() {
    1049            0 :                     dup_start_lsn = dup_end_lsn;
    1050            0 :                     dup_end_lsn = lsn_range.end;
    1051      6072000 :                 }
    1052      6072000 :                 if writer.is_some() {
    1053      6071916 :                     let written_size = writer.as_mut().unwrap().size();
    1054      6071916 :                     let contains_hole =
    1055      6071916 :                         next_hole < holes.len() && key >= holes[next_hole].key_range.end;
    1056              :                     // check if this key causes the layer to overflow or the layer would cover a hole...
    1057      6071916 :                     if is_dup_layer
    1058      6071916 :                         || dup_end_lsn.is_valid()
    1059      6071916 :                         || written_size + key_values_total_size > target_file_size
    1060      6071076 :                         || contains_hole
    1061              :                     {
    1062              :                         // ... if so, flush the previous layer and prepare to write a new one
    1063          840 :                         let (desc, path) = writer
    1064          840 :                             .take()
    1065          840 :                             .unwrap()
    1066          840 :                             .finish(prev_key.unwrap().next(), ctx)
    1067         2130 :                             .await
    1068          840 :                             .map_err(CompactionError::Other)?;
    1069          840 :                         let new_delta = Layer::finish_creating(self.conf, self, desc, &path)
    1070          840 :                             .map_err(CompactionError::Other)?;
    1071              : 
    1072          840 :                         new_layers.push(new_delta);
    1073          840 :                         writer = None;
    1074          840 : 
    1075          840 :                         if contains_hole {
    1076            0 :                             // skip hole
    1077            0 :                             next_hole += 1;
    1078          840 :                         }
    1079      6071076 :                     }
    1080           84 :                 }
    1081              :                 // Remember size of key value because at next iteration we will access next item
    1082      6072000 :                 key_values_total_size = next_key_size;
    1083       120114 :             }
    1084      6192114 :             fail_point!("delta-layer-writer-fail-before-finish", |_| {
    1085            0 :                 Err(CompactionError::Other(anyhow::anyhow!(
    1086            0 :                     "failpoint delta-layer-writer-fail-before-finish"
    1087            0 :                 )))
    1088      6192114 :             });
    1089              : 
    1090      6192114 :             if !self.shard_identity.is_key_disposable(&key) {
    1091      6192114 :                 if writer.is_none() {
    1092          924 :                     if self.cancel.is_cancelled() {
    1093              :                         // to be somewhat responsive to cancellation, check for each new layer
    1094            0 :                         return Err(CompactionError::ShuttingDown);
    1095          924 :                     }
    1096              :                     // Create the writer if it is not initialized yet
    1097          924 :                     writer = Some(
    1098              :                         DeltaLayerWriter::new(
    1099          924 :                             self.conf,
    1100          924 :                             self.timeline_id,
    1101          924 :                             self.tenant_shard_id,
    1102          924 :                             key,
    1103          924 :                             if dup_end_lsn.is_valid() {
    1104              :                                 // this is a layer containing a slice of values of the same key
    1105            0 :                                 debug!("Create new dup layer {}..{}", dup_start_lsn, dup_end_lsn);
    1106            0 :                                 dup_start_lsn..dup_end_lsn
    1107              :                             } else {
    1108          924 :                                 debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end);
    1109          924 :                                 lsn_range.clone()
    1110              :                             },
    1111          924 :                             ctx,
    1112              :                         )
    1113          462 :                         .await
    1114          924 :                         .map_err(CompactionError::Other)?,
    1115              :                     );
    1116              : 
    1117          924 :                     keys = 0;
    1118      6191190 :                 }
    1119              : 
    1120      6192114 :                 writer
    1121      6192114 :                     .as_mut()
    1122      6192114 :                     .unwrap()
    1123      6192114 :                     .put_value(key, lsn, value, ctx)
    1124         3675 :                     .await
    1125      6192114 :                     .map_err(CompactionError::Other)?;
    1126              :             } else {
    1127            0 :                 debug!(
    1128            0 :                     "Dropping key {} during compaction (it belongs on shard {:?})",
    1129            0 :                     key,
    1130            0 :                     self.shard_identity.get_shard_number(&key)
    1131              :                 );
    1132              :             }
    1133              : 
    1134      6192114 :             if !new_layers.is_empty() {
    1135        59358 :                 fail_point!("after-timeline-compacted-first-L1");
    1136      6132756 :             }
    1137              : 
    1138      6192114 :             prev_key = Some(key);
    1139              :         }
    1140           84 :         if let Some(writer) = writer {
    1141           84 :             let (desc, path) = writer
    1142           84 :                 .finish(prev_key.unwrap().next(), ctx)
    1143         5969 :                 .await
    1144           84 :                 .map_err(CompactionError::Other)?;
    1145           84 :             let new_delta = Layer::finish_creating(self.conf, self, desc, &path)
    1146           84 :                 .map_err(CompactionError::Other)?;
    1147           84 :             new_layers.push(new_delta);
    1148            0 :         }
    1149              : 
    1150              :         // Sync layers
    1151           84 :         if !new_layers.is_empty() {
    1152              :             // Print a warning if the created layer is larger than double the target size
    1153              :             // Add two pages for potential overhead. This should in theory be already
    1154              :             // accounted for in the target calculation, but for very small targets,
    1155              :             // we still might easily hit the limit otherwise.
    1156           84 :             let warn_limit = target_file_size * 2 + page_cache::PAGE_SZ as u64 * 2;
    1157          924 :             for layer in new_layers.iter() {
    1158          924 :                 if layer.layer_desc().file_size > warn_limit {
    1159            0 :                     warn!(
    1160              :                         %layer,
    1161            0 :                         "created delta file of size {} larger than double of target of {target_file_size}", layer.layer_desc().file_size
    1162              :                     );
    1163          924 :                 }
    1164              :             }
    1165              : 
    1166              :             // The writer.finish() above already did the fsync of the inodes.
    1167              :             // We just need to fsync the directory in which these inodes are linked,
    1168              :             // which we know to be the timeline directory.
    1169              :             //
    1170              :             // We use fatal_err() below because, after writer.finish() returns with success,
    1171              :             // the in-memory state of the filesystem already has the layer file in its final place,
    1172              :             // and subsequent pageserver code could think it's durable while it really isn't.
    1173           84 :             let timeline_dir = VirtualFile::open(
    1174           84 :                 &self
    1175           84 :                     .conf
    1176           84 :                     .timeline_path(&self.tenant_shard_id, &self.timeline_id),
    1177           84 :                 ctx,
    1178           84 :             )
    1179           42 :             .await
    1180           84 :             .fatal_err("VirtualFile::open for timeline dir fsync");
    1181           84 :             timeline_dir
    1182           84 :                 .sync_all()
    1183           42 :                 .await
    1184           84 :                 .fatal_err("VirtualFile::sync_all timeline dir");
    1185            0 :         }
    1186              : 
    1187           84 :         stats.write_layer_files_micros = stats.read_lock_drop_micros.till_now();
    1188           84 :         stats.new_deltas_count = Some(new_layers.len());
    1189          924 :         stats.new_deltas_size = Some(new_layers.iter().map(|l| l.layer_desc().file_size).sum());
    1190           84 : 
    1191           84 :         match TryInto::<CompactLevel0Phase1Stats>::try_into(stats)
    1192           84 :             .and_then(|stats| serde_json::to_string(&stats).context("serde_json::to_string"))
    1193              :         {
    1194           84 :             Ok(stats_json) => {
    1195           84 :                 info!(
    1196            0 :                     stats_json = stats_json.as_str(),
    1197            0 :                     "compact_level0_phase1 stats available"
    1198              :                 )
    1199              :             }
    1200            0 :             Err(e) => {
    1201            0 :                 warn!("compact_level0_phase1 stats failed to serialize: {:#}", e);
    1202              :             }
    1203              :         }
    1204              : 
    1205              :         // Without this, rustc complains about deltas_to_compact still
    1206              :         // being borrowed when we `.into_iter()` below.
    1207           84 :         drop(all_values_iter);
    1208           84 : 
    1209           84 :         Ok(CompactLevel0Phase1Result {
    1210           84 :             new_layers,
    1211           84 :             deltas_to_compact: deltas_to_compact
    1212           84 :                 .into_iter()
    1213         1206 :                 .map(|x| x.drop_eviction_guard())
    1214           84 :                 .collect::<Vec<_>>(),
    1215           84 :             fully_compacted,
    1216           84 :         })
    1217         1092 :     }
    1218              : }
    1219              : 
    1220              : #[derive(Default)]
    1221              : struct CompactLevel0Phase1Result {
    1222              :     new_layers: Vec<ResidentLayer>,
    1223              :     deltas_to_compact: Vec<Layer>,
    1224              :     // Whether we have included all L0 layers, or selected only part of them due to the
    1225              :     // L0 compaction size limit.
    1226              :     fully_compacted: bool,
    1227              : }
    1228              : 
    1229              : #[derive(Default)]
    1230              : struct CompactLevel0Phase1StatsBuilder {
    1231              :     version: Option<u64>,
    1232              :     tenant_id: Option<TenantShardId>,
    1233              :     timeline_id: Option<TimelineId>,
    1234              :     read_lock_acquisition_micros: DurationRecorder,
    1235              :     read_lock_held_spawn_blocking_startup_micros: DurationRecorder,
    1236              :     read_lock_held_key_sort_micros: DurationRecorder,
    1237              :     read_lock_held_prerequisites_micros: DurationRecorder,
    1238              :     read_lock_held_compute_holes_micros: DurationRecorder,
    1239              :     read_lock_drop_micros: DurationRecorder,
    1240              :     write_layer_files_micros: DurationRecorder,
    1241              :     level0_deltas_count: Option<usize>,
    1242              :     new_deltas_count: Option<usize>,
    1243              :     new_deltas_size: Option<u64>,
    1244              : }
    1245              : 
    1246              : #[derive(serde::Serialize)]
    1247              : struct CompactLevel0Phase1Stats {
    1248              :     version: u64,
    1249              :     tenant_id: TenantShardId,
    1250              :     timeline_id: TimelineId,
    1251              :     read_lock_acquisition_micros: RecordedDuration,
    1252              :     read_lock_held_spawn_blocking_startup_micros: RecordedDuration,
    1253              :     read_lock_held_key_sort_micros: RecordedDuration,
    1254              :     read_lock_held_prerequisites_micros: RecordedDuration,
    1255              :     read_lock_held_compute_holes_micros: RecordedDuration,
    1256              :     read_lock_drop_micros: RecordedDuration,
    1257              :     write_layer_files_micros: RecordedDuration,
    1258              :     level0_deltas_count: usize,
    1259              :     new_deltas_count: usize,
    1260              :     new_deltas_size: u64,
    1261              : }
    1262              : 
    1263              : impl TryFrom<CompactLevel0Phase1StatsBuilder> for CompactLevel0Phase1Stats {
    1264              :     type Error = anyhow::Error;
    1265              : 
    1266           84 :     fn try_from(value: CompactLevel0Phase1StatsBuilder) -> Result<Self, Self::Error> {
    1267           84 :         Ok(Self {
    1268           84 :             version: value.version.ok_or_else(|| anyhow!("version not set"))?,
    1269           84 :             tenant_id: value
    1270           84 :                 .tenant_id
    1271           84 :                 .ok_or_else(|| anyhow!("tenant_id not set"))?,
    1272           84 :             timeline_id: value
    1273           84 :                 .timeline_id
    1274           84 :                 .ok_or_else(|| anyhow!("timeline_id not set"))?,
    1275           84 :             read_lock_acquisition_micros: value
    1276           84 :                 .read_lock_acquisition_micros
    1277           84 :                 .into_recorded()
    1278           84 :                 .ok_or_else(|| anyhow!("read_lock_acquisition_micros not set"))?,
    1279           84 :             read_lock_held_spawn_blocking_startup_micros: value
    1280           84 :                 .read_lock_held_spawn_blocking_startup_micros
    1281           84 :                 .into_recorded()
    1282           84 :                 .ok_or_else(|| anyhow!("read_lock_held_spawn_blocking_startup_micros not set"))?,
    1283           84 :             read_lock_held_key_sort_micros: value
    1284           84 :                 .read_lock_held_key_sort_micros
    1285           84 :                 .into_recorded()
    1286           84 :                 .ok_or_else(|| anyhow!("read_lock_held_key_sort_micros not set"))?,
    1287           84 :             read_lock_held_prerequisites_micros: value
    1288           84 :                 .read_lock_held_prerequisites_micros
    1289           84 :                 .into_recorded()
    1290           84 :                 .ok_or_else(|| anyhow!("read_lock_held_prerequisites_micros not set"))?,
    1291           84 :             read_lock_held_compute_holes_micros: value
    1292           84 :                 .read_lock_held_compute_holes_micros
    1293           84 :                 .into_recorded()
    1294           84 :                 .ok_or_else(|| anyhow!("read_lock_held_compute_holes_micros not set"))?,
    1295           84 :             read_lock_drop_micros: value
    1296           84 :                 .read_lock_drop_micros
    1297           84 :                 .into_recorded()
    1298           84 :                 .ok_or_else(|| anyhow!("read_lock_drop_micros not set"))?,
    1299           84 :             write_layer_files_micros: value
    1300           84 :                 .write_layer_files_micros
    1301           84 :                 .into_recorded()
    1302           84 :                 .ok_or_else(|| anyhow!("write_layer_files_micros not set"))?,
    1303           84 :             level0_deltas_count: value
    1304           84 :                 .level0_deltas_count
    1305           84 :                 .ok_or_else(|| anyhow!("level0_deltas_count not set"))?,
    1306           84 :             new_deltas_count: value
    1307           84 :                 .new_deltas_count
    1308           84 :                 .ok_or_else(|| anyhow!("new_deltas_count not set"))?,
    1309           84 :             new_deltas_size: value
    1310           84 :                 .new_deltas_size
    1311           84 :                 .ok_or_else(|| anyhow!("new_deltas_size not set"))?,
    1312              :         })
    1313           84 :     }
    1314              : }
    1315              : 
    1316              : impl Timeline {
    1317              :     /// Entry point for new tiered compaction algorithm.
    1318              :     ///
    1319              :     /// All the real work is in the implementation in the pageserver_compaction
    1320              :     /// crate. The code here would apply to any algorithm implemented by the
    1321              :     /// same interface, but tiered is the only one at the moment.
    1322              :     ///
    1323              :     /// TODO: cancellation
    1324            0 :     pub(crate) async fn compact_tiered(
    1325            0 :         self: &Arc<Self>,
    1326            0 :         _cancel: &CancellationToken,
    1327            0 :         ctx: &RequestContext,
    1328            0 :     ) -> Result<(), CompactionError> {
    1329            0 :         let fanout = self.get_compaction_threshold() as u64;
    1330            0 :         let target_file_size = self.get_checkpoint_distance();
    1331              : 
    1332              :         // Find the top of the historical layers
    1333            0 :         let end_lsn = {
    1334            0 :             let guard = self.layers.read().await;
    1335            0 :             let layers = guard.layer_map()?;
    1336              : 
    1337            0 :             let l0_deltas = layers.level0_deltas();
    1338            0 : 
    1339            0 :             // As an optimization, if we find that there are too few L0 layers,
    1340            0 :             // bail out early. We know that the compaction algorithm would do
    1341            0 :             // nothing in that case.
    1342            0 :             if l0_deltas.len() < fanout as usize {
    1343              :                 // doesn't need compacting
    1344            0 :                 return Ok(());
    1345            0 :             }
    1346            0 :             l0_deltas.iter().map(|l| l.lsn_range.end).max().unwrap()
    1347            0 :         };
    1348            0 : 
    1349            0 :         // Is the timeline being deleted?
    1350            0 :         if self.is_stopping() {
    1351            0 :             trace!("Dropping out of compaction on timeline shutdown");
    1352            0 :             return Err(CompactionError::ShuttingDown);
    1353            0 :         }
    1354              : 
    1355            0 :         let (dense_ks, _sparse_ks) = self.collect_keyspace(end_lsn, ctx).await?;
    1356              :         // TODO(chi): ignore sparse_keyspace for now, compact it in the future.
    1357            0 :         let mut adaptor = TimelineAdaptor::new(self, (end_lsn, dense_ks));
    1358            0 : 
    1359            0 :         pageserver_compaction::compact_tiered::compact_tiered(
    1360            0 :             &mut adaptor,
    1361            0 :             end_lsn,
    1362            0 :             target_file_size,
    1363            0 :             fanout,
    1364            0 :             ctx,
    1365            0 :         )
    1366            0 :         .await
    1367              :         // TODO: compact_tiered needs to return CompactionError
    1368            0 :         .map_err(CompactionError::Other)?;
    1369              : 
    1370            0 :         adaptor.flush_updates().await?;
    1371            0 :         Ok(())
    1372            0 :     }
    1373              : 
    1374              :     /// Take a list of images and deltas, produce images and deltas according to GC horizon and retain_lsns.
    1375              :     ///
    1376              :     /// It takes a key, the values of the key within the compaction process, a GC horizon, and all retain_lsns below the horizon.
    1377              :     /// For now, it requires that `accumulated_values` contain the full history of the key (i.e., the record with the lowest LSN is
    1378              :     /// an image or a WAL record not requiring a base image). This restriction will be removed once we implement gc-compaction on branches.
    1379              :     ///
    1380              :     /// The function returns the deltas and the base image that need to be placed at each of the retain LSNs. For example, given:
    1381              :     ///
    1382              :     /// A@0x10, +B@0x20, +C@0x30, +D@0x40, +E@0x50, +F@0x60
    1383              :     /// horizon = 0x50, retain_lsn = 0x20, 0x40, delta_threshold=3
    1384              :     ///
    1385              :     /// The function will produce:
    1386              :     ///
    1387              :     /// ```plain
    1388              :     /// 0x20(retain_lsn) -> img=AB@0x20                  always produce a single image below the lowest retain LSN
    1389              :     /// 0x40(retain_lsn) -> deltas=[+C@0x30, +D@0x40]    two deltas since the last base image, keeping the deltas
    1390              :     /// 0x50(horizon)    -> deltas=[ABCDE@0x50]          three deltas since the last base image, generate an image but put it in the delta
    1391              :     /// above_horizon    -> deltas=[+F@0x60]             full history above the horizon
    1392              :     /// ```
    1393              :     ///
    1394              :     /// Note that `accumulated_values` must be sorted by LSN and should belong to a single key.
    1395         1290 :     pub(crate) async fn generate_key_retention(
    1396         1290 :         self: &Arc<Timeline>,
    1397         1290 :         key: Key,
    1398         1290 :         full_history: &[(Key, Lsn, Value)],
    1399         1290 :         horizon: Lsn,
    1400         1290 :         retain_lsn_below_horizon: &[Lsn],
    1401         1290 :         delta_threshold_cnt: usize,
    1402         1290 :         base_img_from_ancestor: Option<(Key, Lsn, Bytes)>,
    1403         1290 :     ) -> anyhow::Result<KeyHistoryRetention> {
    1404         1290 :         // Pre-checks for the invariants
    1405         1290 :         if cfg!(debug_assertions) {
    1406         3120 :             for (log_key, _, _) in full_history {
    1407         1830 :                 assert_eq!(log_key, &key, "mismatched key");
    1408              :             }
    1409         1290 :             for i in 1..full_history.len() {
    1410          540 :                 assert!(full_history[i - 1].1 <= full_history[i].1, "unordered LSN");
    1411          540 :                 if full_history[i - 1].1 == full_history[i].1 {
    1412            0 :                     assert!(
    1413            0 :                         matches!(full_history[i - 1].2, Value::Image(_)),
    1414            0 :                         "unordered delta/image, or duplicated delta"
    1415              :                     );
    1416          540 :                 }
    1417              :             }
    1418              :             // There was an assertion for no base image that checks if the first
    1419              :             // record in the history is `will_init` before, but it was removed.
    1420              :             // This is explained in the test cases for generate_key_retention.
    1421              :             // Search "incomplete history" for more information.
    1422         3000 :             for lsn in retain_lsn_below_horizon {
    1423         1710 :                 assert!(lsn < &horizon, "retain lsn must be below horizon")
    1424              :             }
    1425         1290 :             for i in 1..retain_lsn_below_horizon.len() {
    1426          834 :                 assert!(
    1427          834 :                     retain_lsn_below_horizon[i - 1] <= retain_lsn_below_horizon[i],
    1428            0 :                     "unordered LSN"
    1429              :                 );
    1430              :             }
    1431            0 :         }
    1432         1290 :         let has_ancestor = base_img_from_ancestor.is_some();
    1433              :         // Step 1: split history into len(retain_lsn_below_horizon) + 2 buckets, where the last bucket is for all deltas above the horizon,
    1434              :         // and the second-to-last bucket is for the horizon. Each bucket contains lsn_last_bucket < deltas <= lsn_this_bucket.
    1435         1290 :         let (mut split_history, lsn_split_points) = {
    1436         1290 :             let mut split_history = Vec::new();
    1437         1290 :             split_history.resize_with(retain_lsn_below_horizon.len() + 2, Vec::new);
    1438         1290 :             let mut lsn_split_points = Vec::with_capacity(retain_lsn_below_horizon.len() + 1);
    1439         3000 :             for lsn in retain_lsn_below_horizon {
    1440         1710 :                 lsn_split_points.push(*lsn);
    1441         1710 :             }
    1442         1290 :             lsn_split_points.push(horizon);
    1443         1290 :             let mut current_idx = 0;
    1444         3120 :             for item @ (_, lsn, _) in full_history {
    1445         2316 :                 while current_idx < lsn_split_points.len() && *lsn > lsn_split_points[current_idx] {
    1446          486 :                     current_idx += 1;
    1447          486 :                 }
    1448         1830 :                 split_history[current_idx].push(item);
    1449              :             }
    1450         1290 :             (split_history, lsn_split_points)
    1451              :         };
    1452              :         // Step 2: filter out duplicated records due to the k-merge of image/delta layers
    1453         5580 :         for split_for_lsn in &mut split_history {
    1454         4290 :             let mut prev_lsn = None;
    1455         4290 :             let mut new_split_for_lsn = Vec::with_capacity(split_for_lsn.len());
    1456         4290 :             for record @ (_, lsn, _) in std::mem::take(split_for_lsn) {
    1457         1830 :                 if let Some(prev_lsn) = &prev_lsn {
    1458          198 :                     if *prev_lsn == lsn {
    1459              :                         // The case that we have an LSN with both data from the delta layer and the image layer. As
    1460              :                         // `ValueWrapper` ensures that an image is ordered before a delta at the same LSN, we simply
    1461              :                         // drop this delta and keep the image.
    1462              :                         //
    1463              :                         // For example, we have delta layer key1@0x10, key1@0x20, and image layer key1@0x10, we will
    1464              :                         // keep the image for key1@0x10 and the delta for key1@0x20. key1@0x10 delta will be simply
    1465              :                         // dropped.
    1466              :                         //
    1467              :                         // TODO: in case we have both delta + images for a given LSN and it does not exceed the delta
    1468              :                         // threshold, we could have kept delta instead to save space. This is an optimization for the future.
    1469            0 :                         continue;
    1470          198 :                     }
    1471         1632 :                 }
    1472         1830 :                 prev_lsn = Some(lsn);
    1473         1830 :                 new_split_for_lsn.push(record);
    1474              :             }
    1475         4290 :             *split_for_lsn = new_split_for_lsn;
    1476              :         }
    1477              :         // Step 3: generate images when necessary
    1478         1290 :         let mut retention = Vec::with_capacity(split_history.len());
    1479         1290 :         let mut records_since_last_image = 0;
    1480         1290 :         let batch_cnt = split_history.len();
    1481         1290 :         assert!(
    1482         1290 :             batch_cnt >= 2,
    1483            0 :             "should have at least below + above horizon batches"
    1484              :         );
    1485         1290 :         let mut replay_history: Vec<(Key, Lsn, Value)> = Vec::new();
    1486         1290 :         if let Some((key, lsn, img)) = base_img_from_ancestor {
    1487           54 :             replay_history.push((key, lsn, Value::Image(img)));
    1488         1236 :         }
    1489              : 
    1490              :         /// Generate debug information for the replay history
    1491            0 :         fn generate_history_trace(replay_history: &[(Key, Lsn, Value)]) -> String {
    1492              :             use std::fmt::Write;
    1493            0 :             let mut output = String::new();
    1494            0 :             if let Some((key, _, _)) = replay_history.first() {
    1495            0 :                 write!(output, "key={} ", key).unwrap();
    1496            0 :                 let mut cnt = 0;
    1497            0 :                 for (_, lsn, val) in replay_history {
    1498            0 :                     if val.is_image() {
    1499            0 :                         write!(output, "i@{} ", lsn).unwrap();
    1500            0 :                     } else if val.will_init() {
    1501            0 :                         write!(output, "di@{} ", lsn).unwrap();
    1502            0 :                     } else {
    1503            0 :                         write!(output, "d@{} ", lsn).unwrap();
    1504            0 :                     }
    1505            0 :                     cnt += 1;
    1506            0 :                     if cnt >= 128 {
    1507            0 :                         write!(output, "... and more").unwrap();
    1508            0 :                         break;
    1509            0 :                     }
    1510              :                 }
    1511            0 :             } else {
    1512            0 :                 write!(output, "<no history>").unwrap();
    1513            0 :             }
    1514            0 :             output
    1515            0 :         }
    1516              : 
    1517            0 :         fn generate_debug_trace(
    1518            0 :             replay_history: Option<&[(Key, Lsn, Value)]>,
    1519            0 :             full_history: &[(Key, Lsn, Value)],
    1520            0 :             lsns: &[Lsn],
    1521            0 :             horizon: Lsn,
    1522            0 :         ) -> String {
    1523              :             use std::fmt::Write;
    1524            0 :             let mut output = String::new();
    1525            0 :             if let Some(replay_history) = replay_history {
    1526            0 :                 writeln!(
    1527            0 :                     output,
    1528            0 :                     "replay_history: {}",
    1529            0 :                     generate_history_trace(replay_history)
    1530            0 :                 )
    1531            0 :                 .unwrap();
    1532            0 :             } else {
    1533            0 :                 writeln!(output, "replay_history: <disabled>",).unwrap();
    1534            0 :             }
    1535            0 :             writeln!(
    1536            0 :                 output,
    1537            0 :                 "full_history: {}",
    1538            0 :                 generate_history_trace(full_history)
    1539            0 :             )
    1540            0 :             .unwrap();
    1541            0 :             writeln!(
    1542            0 :                 output,
    1543            0 :                 "when processing: [{}] horizon={}",
    1544            0 :                 lsns.iter().map(|l| format!("{l}")).join(","),
    1545            0 :                 horizon
    1546            0 :             )
    1547            0 :             .unwrap();
    1548            0 :             output
    1549            0 :         }
    1550              : 
    1551         4290 :         for (i, split_for_lsn) in split_history.into_iter().enumerate() {
    1552              :             // TODO: there could be image keys inside the splits, and we can compute records_since_last_image accordingly.
    1553         4290 :             records_since_last_image += split_for_lsn.len();
    1554         4290 :             let generate_image = if i == 0 && !has_ancestor {
    1555              :                 // We always generate images for the first batch (below horizon / lowest retain_lsn)
    1556         1236 :                 true
    1557         3054 :             } else if i == batch_cnt - 1 {
    1558              :                 // Do not generate images for the last batch (above horizon)
    1559         1290 :                 false
    1560         1764 :             } else if records_since_last_image >= delta_threshold_cnt {
    1561              :                 // Generate images when there are too many records
    1562           18 :                 true
    1563              :             } else {
    1564         1746 :                 false
    1565              :             };
    1566         4290 :             replay_history.extend(split_for_lsn.iter().map(|x| (*x).clone()));
    1567              :             // Only retain the items after the last image record
    1568         5274 :             for idx in (0..replay_history.len()).rev() {
    1569         5274 :                 if replay_history[idx].2.will_init() {
    1570         4290 :                     replay_history = replay_history[idx..].to_vec();
    1571         4290 :                     break;
    1572          984 :                 }
    1573              :             }
    1574         4290 :             if let Some((_, _, val)) = replay_history.first() {
    1575         4290 :                 if !val.will_init() {
    1576            0 :                     return Err(anyhow::anyhow!("invalid history, no base image")).with_context(
    1577            0 :                         || {
    1578            0 :                             generate_debug_trace(
    1579            0 :                                 Some(&replay_history),
    1580            0 :                                 full_history,
    1581            0 :                                 retain_lsn_below_horizon,
    1582            0 :                                 horizon,
    1583            0 :                             )
    1584            0 :                         },
    1585            0 :                     );
    1586         4290 :                 }
    1587            0 :             }
    1588         4290 :             if generate_image && records_since_last_image > 0 {
    1589         1254 :                 records_since_last_image = 0;
    1590         1254 :                 let replay_history_for_debug = if cfg!(debug_assertions) {
    1591         1254 :                     Some(replay_history.clone())
    1592              :                 } else {
    1593            0 :                     None
    1594              :                 };
    1595         1254 :                 let replay_history_for_debug_ref = replay_history_for_debug.as_deref();
    1596         1254 :                 let history = std::mem::take(&mut replay_history);
    1597         1254 :                 let mut img = None;
    1598         1254 :                 let mut records = Vec::with_capacity(history.len());
    1599         1254 :                 if let (_, lsn, Value::Image(val)) = history.first().as_ref().unwrap() {
    1600         1254 :                     img = Some((*lsn, val.clone()));
    1601         1254 :                     for (_, lsn, val) in history.into_iter().skip(1) {
    1602          102 :                         let Value::WalRecord(rec) = val else {
    1603            0 :                             return Err(anyhow::anyhow!(
    1604            0 :                                 "invalid record, first record is image, expect walrecords"
    1605            0 :                             ))
    1606            0 :                             .with_context(|| {
    1607            0 :                                 generate_debug_trace(
    1608            0 :                                     replay_history_for_debug_ref,
    1609            0 :                                     full_history,
    1610            0 :                                     retain_lsn_below_horizon,
    1611            0 :                                     horizon,
    1612            0 :                                 )
    1613            0 :                             });
    1614              :                         };
    1615          102 :                         records.push((lsn, rec));
    1616              :                     }
    1617              :                 } else {
    1618            0 :                     for (_, lsn, val) in history.into_iter() {
    1619            0 :                         let Value::WalRecord(rec) = val else {
    1620            0 :                             return Err(anyhow::anyhow!("invalid record, first record is walrecord, expect rest are walrecord"))
    1621            0 :                                 .with_context(|| generate_debug_trace(
    1622            0 :                                     replay_history_for_debug_ref,
    1623            0 :                                     full_history,
    1624            0 :                                     retain_lsn_below_horizon,
    1625            0 :                                     horizon,
    1626            0 :                                 ));
    1627              :                         };
    1628            0 :                         records.push((lsn, rec));
    1629              :                     }
    1630              :                 }
    1631         1254 :                 records.reverse();
    1632         1254 :                 let state = ValueReconstructState { img, records };
    1633         1254 :                 let request_lsn = lsn_split_points[i]; // last batch does not generate image so i is always in range
    1634         1254 :                 let img = self.reconstruct_value(key, request_lsn, state).await?;
    1635         1254 :                 replay_history.push((key, request_lsn, Value::Image(img.clone())));
    1636         1254 :                 retention.push(vec![(request_lsn, Value::Image(img))]);
    1637         3036 :             } else {
    1638         3036 :                 let deltas = split_for_lsn
    1639         3036 :                     .iter()
    1640         3036 :                     .map(|(_, lsn, value)| (*lsn, value.clone()))
    1641         3036 :                     .collect_vec();
    1642         3036 :                 retention.push(deltas);
    1643         3036 :             }
    1644              :         }
    1645         1290 :         let mut result = Vec::with_capacity(retention.len());
    1646         1290 :         assert_eq!(retention.len(), lsn_split_points.len() + 1);
    1647         4290 :         for (idx, logs) in retention.into_iter().enumerate() {
    1648         4290 :             if idx == lsn_split_points.len() {
    1649         1290 :                 return Ok(KeyHistoryRetention {
    1650         1290 :                     below_horizon: result,
    1651         1290 :                     above_horizon: KeyLogAtLsn(logs),
    1652         1290 :                 });
    1653         3000 :             } else {
    1654         3000 :                 result.push((lsn_split_points[idx], KeyLogAtLsn(logs)));
    1655         3000 :             }
    1656              :         }
    1657            0 :         unreachable!("key retention is empty")
    1658         1290 :     }
    1659              : 
    1660              :     /// An experimental compaction building block that combines compaction with garbage collection.
    1661              :     ///
    1662              :     /// The current implementation picks all delta + image layers that are below or intersecting with
    1663              :     /// the GC horizon without considering retain_lsns. Then, it does a full compaction over all these delta
    1664              :     /// layers and image layers, which generates image layers at the gc horizon, drops deltas below the gc horizon,
    1665              :     /// and creates delta layers with all deltas >= the gc horizon.
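
A minimal sketch of the per-key effect described above, using stand-in types (u64 LSNs, string images, string append-deltas) instead of the real Key/Lsn/Value types: everything at or below the GC horizon collapses into a single image placed at the horizon, while entries above the horizon are kept as deltas. The extra images kept at each retain_lsn below the horizon are omitted, and the name collapse_below_horizon is illustrative only.

    // Simplified value type: either a full image or an "append this suffix" delta.
    #[derive(Clone, Debug, PartialEq)]
    enum Val {
        Image(String),
        Delta(String),
    }

    /// Collapse a single key's history (sorted by LSN, starting with an image)
    /// into one image at `horizon` plus the deltas above the horizon.
    fn collapse_below_horizon(history: &[(u64, Val)], horizon: u64) -> Vec<(u64, Val)> {
        let mut img = String::new();
        let mut above = Vec::new();
        for (lsn, val) in history {
            if *lsn <= horizon {
                match val {
                    Val::Image(i) => img = i.clone(),
                    Val::Delta(d) => img.push_str(d), // replay the delta onto the image
                }
            } else {
                above.push((*lsn, val.clone()));
            }
        }
        let mut result = vec![(horizon, Val::Image(img))];
        result.extend(above);
        result
    }

    fn main() {
        let history = vec![
            (10, Val::Image("a".into())),
            (20, Val::Delta("b".into())),
            (30, Val::Delta("c".into())),
            (40, Val::Delta("d".into())),
        ];
        // With the horizon at 30, LSNs 10..=30 become one image "abc" at LSN 30,
        // and the delta at LSN 40 survives above the horizon.
        assert_eq!(
            collapse_below_horizon(&history, 30),
            vec![(30, Val::Image("abc".into())), (40, Val::Delta("d".into()))]
        );
    }
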
    1666           78 :     pub(crate) async fn compact_with_gc(
    1667           78 :         self: &Arc<Self>,
    1668           78 :         cancel: &CancellationToken,
    1669           78 :         flags: EnumSet<CompactFlags>,
    1670           78 :         ctx: &RequestContext,
    1671           78 :     ) -> anyhow::Result<()> {
    1672              :         use std::collections::BTreeSet;
    1673              : 
    1674              :         // Block other compaction/GC tasks from running for now. GC-compaction could run alongside
    1675              :         // legacy compaction tasks in the future. Always ensure the lock order is compaction -> gc.
    1676              :         // Note that we already acquired the compaction lock when the outer `compact` function gets called.
    1677              : 
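
The same cancellable lock acquisition, shown as a standalone sketch built on plain tokio primitives; lock_or_cancel is an illustrative helper, not an API of this crate. The compaction -> gc order is preserved simply by never taking the gc lock before the compaction lock.

    use tokio::sync::{Mutex, MutexGuard};
    use tokio_util::sync::CancellationToken;

    /// Wait for the lock, but give up early if the cancellation token fires.
    async fn lock_or_cancel<'a, T>(
        lock: &'a Mutex<T>,
        cancel: &CancellationToken,
    ) -> anyhow::Result<MutexGuard<'a, T>> {
        tokio::select! {
            guard = lock.lock() => Ok(guard),
            _ = cancel.cancelled() => Err(anyhow::anyhow!("cancelled")),
        }
    }

    #[tokio::main]
    async fn main() -> anyhow::Result<()> {
        let compaction_lock = Mutex::new(());
        let gc_lock = Mutex::new(());
        let cancel = CancellationToken::new();

        // Lock order: compaction first, then gc (mirroring the comment above).
        let _compaction_guard = lock_or_cancel(&compaction_lock, &cancel).await?;
        let _gc_guard = lock_or_cancel(&gc_lock, &cancel).await?;
        // ... compaction + gc work would run here while both guards are held ...
        Ok(())
    }
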
    1678           78 :         let gc_lock = async {
    1679           78 :             tokio::select! {
    1680           78 :                 guard = self.gc_lock.lock() => Ok(guard),
    1681              :                 // TODO: refactor to CompactionError to correctly pass cancelled error
    1682           78 :                 _ = cancel.cancelled() => Err(anyhow!("cancelled")),
    1683              :             }
    1684           78 :         };
    1685              : 
    1686           78 :         let gc_lock = crate::timed(
    1687           78 :             gc_lock,
    1688           78 :             "acquires gc lock",
    1689           78 :             std::time::Duration::from_secs(5),
    1690           78 :         )
    1691            3 :         .await?;
    1692              : 
    1693           78 :         let dry_run = flags.contains(CompactFlags::DryRun);
    1694           78 : 
    1695           78 :         info!("running enhanced gc bottom-most compaction, dry_run={dry_run}");
    1696              : 
    1697           78 :         scopeguard::defer! {
    1698           78 :             info!("done enhanced gc bottom-most compaction");
    1699           78 :         };
    1700           78 : 
    1701           78 :         let mut stat = CompactionStatistics::default();
    1702              : 
    1703              :         // Step 0: pick all delta + image layers that are below or intersect with the GC horizon.
    1704              :         // The layer selection has the following properties:
    1705              :         // 1. If a layer is in the selection, all layers below it are in the selection.
    1706              :         // 2. Inferred from (1), for each key in the layer selection, the value can be reconstructed using only the layers in the selection.
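
A small sketch of this selection rule with layers reduced to bare LSN ranges (select_layers is an illustrative name): find the highest end LSN among layers that start at or below the cutoff, take every layer whose end LSN does not exceed that maximum, and check property (1) on the result.

    use std::ops::Range;

    fn select_layers(layers: &[Range<u64>], gc_cutoff: u64) -> Vec<Range<u64>> {
        // Highest end LSN among layers that are below or intersect the cutoff.
        let Some(max_layer_lsn) = layers
            .iter()
            .filter(|r| r.start <= gc_cutoff)
            .map(|r| r.end)
            .max()
        else {
            return Vec::new();
        };
        let selected: Vec<_> = layers
            .iter()
            .filter(|r| r.end <= max_layer_lsn)
            .cloned()
            .collect();
        // Property (1): if a layer is selected, every layer that ends at or below
        // its end LSN is selected as well (the selection is downward-closed).
        for sel in &selected {
            for layer in layers {
                if layer.end <= sel.end {
                    assert!(selected.contains(layer));
                }
            }
        }
        selected
    }

    fn main() {
        let layers = vec![0..10, 10..20, 20..30, 30..40];
        // A cutoff inside 20..30 selects the three lowest layers and leaves 30..40 alone.
        assert_eq!(select_layers(&layers, 25), vec![0..10, 10..20, 20..30]);
    }
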
    1707           78 :         let (layer_selection, gc_cutoff, retain_lsns_below_horizon) = {
    1708           78 :             let guard = self.layers.read().await;
    1709           78 :             let layers = guard.layer_map()?;
    1710           78 :             let gc_info = self.gc_info.read().unwrap();
    1711           78 :             let mut retain_lsns_below_horizon = Vec::new();
    1712           78 :             let gc_cutoff = gc_info.cutoffs.select_min();
    1713          102 :             for (lsn, _timeline_id) in &gc_info.retain_lsns {
    1714          102 :                 if lsn < &gc_cutoff {
    1715          102 :                     retain_lsns_below_horizon.push(*lsn);
    1716          102 :                 }
    1717              :             }
    1718           78 :             for lsn in gc_info.leases.keys() {
    1719            0 :                 if lsn < &gc_cutoff {
    1720            0 :                     retain_lsns_below_horizon.push(*lsn);
    1721            0 :                 }
    1722              :             }
    1723           78 :             let mut selected_layers = Vec::new();
    1724           78 :             drop(gc_info);
    1725              :             // Pick all the layers that intersect with or are below the gc_cutoff, and get the largest LSN among the selected layers.
    1726           78 :             let Some(max_layer_lsn) = layers
    1727           78 :                 .iter_historic_layers()
    1728          300 :                 .filter(|desc| desc.get_lsn_range().start <= gc_cutoff)
    1729          246 :                 .map(|desc| desc.get_lsn_range().end)
    1730           78 :                 .max()
    1731              :             else {
    1732            0 :                 info!("no layers to compact with gc");
    1733            0 :                 return Ok(());
    1734              :             };
    1735              :             // Then, pick all the layers whose LSN range ends at or below max_layer_lsn. This ensures we can
    1736              :             // pick all single-key layers to compact.
    1737          300 :             for desc in layers.iter_historic_layers() {
    1738          300 :                 if desc.get_lsn_range().end <= max_layer_lsn {
    1739          246 :                     selected_layers.push(guard.get_from_desc(&desc));
    1740          246 :                 }
    1741              :             }
    1742           78 :             if selected_layers.is_empty() {
    1743            0 :                 info!("no layers to compact with gc");
    1744            0 :                 return Ok(());
    1745           78 :             }
    1746           78 :             retain_lsns_below_horizon.sort();
    1747           78 :             (selected_layers, gc_cutoff, retain_lsns_below_horizon)
    1748              :         };
    1749           78 :         let lowest_retain_lsn = if self.ancestor_timeline.is_some() {
    1750            6 :             Lsn(self.ancestor_lsn.0 + 1)
    1751              :         } else {
    1752           72 :             let res = retain_lsns_below_horizon
    1753           72 :                 .first()
    1754           72 :                 .copied()
    1755           72 :                 .unwrap_or(gc_cutoff);
    1756           72 :             if cfg!(debug_assertions) {
    1757           72 :                 assert_eq!(
    1758           72 :                     res,
    1759           72 :                     retain_lsns_below_horizon
    1760           72 :                         .iter()
    1761           72 :                         .min()
    1762           72 :                         .copied()
    1763           72 :                         .unwrap_or(gc_cutoff)
    1764           72 :                 );
    1765            0 :             }
    1766           72 :             res
    1767              :         };
    1768           78 :         info!(
    1769            0 :             "picked {} layers for compaction with gc_cutoff={} lowest_retain_lsn={}",
    1770            0 :             layer_selection.len(),
    1771              :             gc_cutoff,
    1772              :             lowest_retain_lsn
    1773              :         );
    1774              :         // Step 1: (In the future) construct a k-merge iterator over all layers. For now, simply collect all keys + LSNs.
    1775              :         // Also, verify that the layer map can be split by drawing horizontal lines at every LSN start/end split point.
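
A sketch of the "horizontal line" check with layers again reduced to bare LSN ranges; the real code below additionally exempts single-key delta layers, which is omitted here, and check_lsn_splits is an illustrative name.

    use std::collections::BTreeSet;
    use std::ops::Range;

    fn check_lsn_splits(delta_layers: &[Range<u64>]) -> Result<(), String> {
        // Every delta layer's start and end LSN becomes a split point.
        let mut split_points = BTreeSet::new();
        for r in delta_layers {
            split_points.insert(r.start);
            split_points.insert(r.end);
        }
        for r in delta_layers {
            // `range(start..end)` always yields `start` itself (inserted above), so
            // more than one hit means a split point lies strictly inside this layer.
            if split_points.range(r.clone()).count() > 1 {
                return Err(format!("layer {:?} crosses an LSN split point", r));
            }
        }
        Ok(())
    }

    fn main() {
        // Boundaries line up exactly: every horizontal line falls between layers.
        assert!(check_lsn_splits(&[0..10, 10..20, 0..10]).is_ok());
        // 5..15 straddles the split point at 10 (and puts 5 inside 0..10), so the
        // check fails and gc-compaction would refuse to run.
        assert!(check_lsn_splits(&[0..10, 10..20, 5..15]).is_err());
    }
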
    1776           78 :         let mut lsn_split_point = BTreeSet::new(); // TODO: use a better data structure (range tree / range set?)
    1777          324 :         for layer in &layer_selection {
    1778          246 :             let desc = layer.layer_desc();
    1779          246 :             if desc.is_delta() {
    1780              :                 // ignore single-key layer files
    1781          138 :                 if desc.key_range.start.next() != desc.key_range.end {
    1782          102 :                     let lsn_range = &desc.lsn_range;
    1783          102 :                     lsn_split_point.insert(lsn_range.start);
    1784          102 :                     lsn_split_point.insert(lsn_range.end);
    1785          102 :                 }
    1786          138 :                 stat.visit_delta_layer(desc.file_size());
    1787          108 :             } else {
    1788          108 :                 stat.visit_image_layer(desc.file_size());
    1789          108 :             }
    1790              :         }
    1791          324 :         for layer in &layer_selection {
    1792          246 :             let desc = layer.layer_desc();
    1793          246 :             let key_range = &desc.key_range;
    1794          246 :             if desc.is_delta() && key_range.start.next() != key_range.end {
    1795          102 :                 let lsn_range = desc.lsn_range.clone();
    1796          102 :                 let intersects = lsn_split_point.range(lsn_range).collect_vec();
    1797          102 :                 if intersects.len() > 1 {
    1798            0 :                     bail!(
    1799            0 :                         "cannot run gc-compaction because it violates the layer map LSN split assumption: layer {} intersects with LSN [{}]",
    1800            0 :                         desc.key(),
    1801            0 :                         intersects.into_iter().map(|lsn| lsn.to_string()).join(", ")
    1802            0 :                     );
    1803          102 :                 }
    1804          144 :             }
    1805              :         }
    1806              :         // The maximum LSN we are processing in this compaction loop
    1807           78 :         let end_lsn = layer_selection
    1808           78 :             .iter()
    1809          246 :             .map(|l| l.layer_desc().lsn_range.end)
    1810           78 :             .max()
    1811           78 :             .unwrap();
    1812           78 :         // We don't want any of the produced layers to cover the full key range (i.e., MIN..MAX) because it will then be recognized
    1813           78 :         // as an L0 layer.
    1814           78 :         let mut delta_layers = Vec::new();
    1815           78 :         let mut image_layers = Vec::new();
    1816           78 :         let mut downloaded_layers = Vec::new();
    1817          324 :         for layer in &layer_selection {
    1818          246 :             let resident_layer = layer.download_and_keep_resident().await?;
    1819          246 :             downloaded_layers.push(resident_layer);
    1820              :         }
    1821          324 :         for resident_layer in &downloaded_layers {
    1822          246 :             if resident_layer.layer_desc().is_delta() {
    1823          138 :                 let layer = resident_layer.get_as_delta(ctx).await?;
    1824          138 :                 delta_layers.push(layer);
    1825              :             } else {
    1826          108 :                 let layer = resident_layer.get_as_image(ctx).await?;
    1827          108 :                 image_layers.push(layer);
    1828              :             }
    1829              :         }
    1830           78 :         let mut merge_iter = MergeIterator::create(&delta_layers, &image_layers, ctx);
    1831           78 :         // Step 2: Produce images+deltas. TODO: ensure newly-produced deltas do not overlap with other deltas.
    1832           78 :         // Accumulated values of the current key.
    1833           78 :         let mut accumulated_values = Vec::new();
    1834           78 :         let mut last_key: Option<Key> = None;
    1835              : 
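
A standalone sketch of the accumulate-and-flush pattern used by the loop further below: the merge iterator yields entries sorted by key, so all values of one key can be buffered and handed to a per-key handler (generate_key_retention plus pipe_to in the real code) as soon as the key changes, with one final flush for the last key. group_by_key is an illustrative helper, not part of this file.

    fn group_by_key<K: PartialEq + Copy, V>(
        sorted: impl IntoIterator<Item = (K, V)>,
        mut handle_key: impl FnMut(K, Vec<V>),
    ) {
        let mut last_key: Option<K> = None;
        let mut accumulated: Vec<V> = Vec::new();
        for (key, val) in sorted {
            if let Some(prev) = last_key {
                if prev != key {
                    // The key changed: flush everything buffered for the previous key.
                    handle_key(prev, std::mem::take(&mut accumulated));
                }
            }
            last_key = Some(key);
            accumulated.push(val);
        }
        // Final flush for the last key seen (mirrors the code after the loop below).
        if let Some(key) = last_key {
            handle_key(key, accumulated);
        }
    }

    fn main() {
        let sorted_stream = vec![(1, "img@10"), (1, "wal@20"), (2, "img@10"), (3, "wal@30")];
        group_by_key(sorted_stream, |key, vals| {
            println!("key {key}: {} value(s) to build retention for", vals.len());
        });
    }
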
    1836              :         // Only create image layers when there are no ancestor branches. TODO: create a covering image layer
    1837              :         // when certain conditions are met.
    1838           78 :         let mut image_layer_writer = if self.ancestor_timeline.is_none() {
    1839              :             Some(
    1840           72 :                 SplitImageLayerWriter::new(
    1841           72 :                     self.conf,
    1842           72 :                     self.timeline_id,
    1843           72 :                     self.tenant_shard_id,
    1844           72 :                     Key::MIN,
    1845           72 :                     lowest_retain_lsn,
    1846           72 :                     self.get_compaction_target_size(),
    1847           72 :                     ctx,
    1848           72 :                 )
    1849           36 :                 .await?,
    1850              :             )
    1851              :         } else {
    1852            6 :             None
    1853              :         };
    1854              : 
    1855           78 :         let mut delta_layer_writer = SplitDeltaLayerWriter::new(
    1856           78 :             self.conf,
    1857           78 :             self.timeline_id,
    1858           78 :             self.tenant_shard_id,
    1859           78 :             lowest_retain_lsn..end_lsn,
    1860           78 :             self.get_compaction_target_size(),
    1861           78 :         )
    1862            0 :         .await?;
    1863              : 
    1864              :         /// Returns None if there is no ancestor branch. Throws an error when the key is not found.
    1865              :         ///
    1866              :         /// Currently, we always get the ancestor image for each key in the child branch, regardless of whether the image
    1867              :         /// is needed for reconstruction. This should be fixed in the future.
    1868              :         ///
    1869              :         /// Furthermore, we should do a vectored get instead of a single get, or better, use k-merge for ancestor
    1870              :         /// images.
    1871         1266 :         async fn get_ancestor_image(
    1872         1266 :             tline: &Arc<Timeline>,
    1873         1266 :             key: Key,
    1874         1266 :             ctx: &RequestContext,
    1875         1266 :         ) -> anyhow::Result<Option<(Key, Lsn, Bytes)>> {
    1876         1266 :             if tline.ancestor_timeline.is_none() {
    1877         1224 :                 return Ok(None);
    1878           42 :             };
    1879              :             // This function is implemented as a get of the current timeline at the ancestor LSN, thereby reusing
    1880              :             // as much existing code as possible.
    1881           42 :             let img = tline.get(key, tline.ancestor_lsn, ctx).await?;
    1882           42 :             Ok(Some((key, tline.ancestor_lsn, img)))
    1883         1266 :         }
    1884              : 
    1885              :         // Actually, we can decide not to write to the image layer at all at this point because
    1886              :         // the key and LSN range are determined. However, to keep things simple here, we still
    1887              :         // create this writer and discard it at the end.
    1888              : 
    1889         1758 :         while let Some((key, lsn, val)) = merge_iter.next().await? {
    1890         1680 :             if cancel.is_cancelled() {
    1891            0 :                 return Err(anyhow!("cancelled")); // TODO: refactor to CompactionError and pass cancel error
    1892         1680 :             }
    1893         1680 :             match val {
    1894         1260 :                 Value::Image(_) => stat.visit_image_key(&val),
    1895          420 :                 Value::WalRecord(_) => stat.visit_wal_key(&val),
    1896              :             }
    1897         1680 :             if last_key.is_none() || last_key.as_ref() == Some(&key) {
    1898          492 :                 if last_key.is_none() {
    1899           78 :                     last_key = Some(key);
    1900          414 :                 }
    1901          492 :                 accumulated_values.push((key, lsn, val));
    1902              :             } else {
    1903         1188 :                 let last_key = last_key.as_mut().unwrap();
    1904         1188 :                 stat.on_unique_key_visited();
    1905         1188 :                 let retention = self
    1906         1188 :                     .generate_key_retention(
    1907         1188 :                         *last_key,
    1908         1188 :                         &accumulated_values,
    1909         1188 :                         gc_cutoff,
    1910         1188 :                         &retain_lsns_below_horizon,
    1911         1188 :                         COMPACTION_DELTA_THRESHOLD,
    1912         1188 :                         get_ancestor_image(self, *last_key, ctx).await?,
    1913              :                     )
    1914            0 :                     .await?;
    1915              :                 // Put the image into the image layer. Currently we have a single big layer for the compaction.
    1916         1188 :                 retention
    1917         1188 :                     .pipe_to(
    1918         1188 :                         *last_key,
    1919         1188 :                         self,
    1920         1188 :                         &mut delta_layer_writer,
    1921         1188 :                         image_layer_writer.as_mut(),
    1922         1188 :                         &mut stat,
    1923         1188 :                         dry_run,
    1924         1188 :                         ctx,
    1925         1188 :                     )
    1926         1203 :                     .await?;
    1927         1188 :                 accumulated_values.clear();
    1928         1188 :                 *last_key = key;
    1929         1188 :                 accumulated_values.push((key, lsn, val));
    1930              :             }
    1931              :         }
    1932              : 
    1933           78 :         let last_key = last_key.expect("no keys produced during compaction");
    1934           78 :         // TODO: move this part to the loop body
    1935           78 :         stat.on_unique_key_visited();
    1936           78 :         let retention = self
    1937           78 :             .generate_key_retention(
    1938           78 :                 last_key,
    1939           78 :                 &accumulated_values,
    1940           78 :                 gc_cutoff,
    1941           78 :                 &retain_lsns_below_horizon,
    1942           78 :                 COMPACTION_DELTA_THRESHOLD,
    1943           78 :                 get_ancestor_image(self, last_key, ctx).await?,
    1944              :             )
    1945            0 :             .await?;
    1946              :         // Put the image into the image layer. Currently we have a single big layer for the compaction.
    1947           78 :         retention
    1948           78 :             .pipe_to(
    1949           78 :                 last_key,
    1950           78 :                 self,
    1951           78 :                 &mut delta_layer_writer,
    1952           78 :                 image_layer_writer.as_mut(),
    1953           78 :                 &mut stat,
    1954           78 :                 dry_run,
    1955           78 :                 ctx,
    1956           78 :             )
    1957           72 :             .await?;
    1958              : 
    1959          114 :         let discard = |key: &PersistentLayerKey| {
    1960          114 :             let key = key.clone();
    1961          114 :             async move { KeyHistoryRetention::discard_key(&key, self, dry_run).await }
    1962          114 :         };
    1963              : 
    1964           78 :         let produced_image_layers = if let Some(writer) = image_layer_writer {
    1965           72 :             if !dry_run {
    1966           60 :                 writer
    1967           60 :                     .finish_with_discard_fn(self, ctx, Key::MAX, discard)
    1968           72 :                     .await?
    1969              :             } else {
    1970           12 :                 let (layers, _) = writer.take()?;
    1971           12 :                 assert!(layers.is_empty(), "image layers produced in dry run mode?");
    1972           12 :                 Vec::new()
    1973              :             }
    1974              :         } else {
    1975            6 :             Vec::new()
    1976              :         };
    1977              : 
    1978           78 :         let produced_delta_layers = if !dry_run {
    1979           66 :             delta_layer_writer
    1980           66 :                 .finish_with_discard_fn(self, ctx, discard)
    1981           78 :                 .await?
    1982              :         } else {
    1983           12 :             let (layers, _) = delta_layer_writer.take()?;
    1984           12 :             assert!(layers.is_empty(), "delta layers produced in dry run mode?");
    1985           12 :             Vec::new()
    1986              :         };
    1987              : 
    1988           78 :         let mut compact_to = Vec::new();
    1989           78 :         let mut keep_layers = HashSet::new();
    1990           78 :         let produced_delta_layers_len = produced_delta_layers.len();
    1991           78 :         let produced_image_layers_len = produced_image_layers.len();
    1992          132 :         for action in produced_delta_layers {
    1993           54 :             match action {
    1994           30 :                 SplitWriterResult::Produced(layer) => {
    1995           30 :                     stat.produce_delta_layer(layer.layer_desc().file_size());
    1996           30 :                     compact_to.push(layer);
    1997           30 :                 }
    1998           24 :                 SplitWriterResult::Discarded(l) => {
    1999           24 :                     keep_layers.insert(l);
    2000           24 :                     stat.discard_delta_layer();
    2001           24 :                 }
    2002              :             }
    2003              :         }
    2004          138 :         for action in produced_image_layers {
    2005           60 :             match action {
    2006           36 :                 SplitWriterResult::Produced(layer) => {
    2007           36 :                     stat.produce_image_layer(layer.layer_desc().file_size());
    2008           36 :                     compact_to.push(layer);
    2009           36 :                 }
    2010           24 :                 SplitWriterResult::Discarded(l) => {
    2011           24 :                     keep_layers.insert(l);
    2012           24 :                     stat.discard_image_layer();
    2013           24 :                 }
    2014              :             }
    2015              :         }
    2016           78 :         let mut layer_selection = layer_selection;
    2017          246 :         layer_selection.retain(|x| !keep_layers.contains(&x.layer_desc().key()));
    2018           78 : 
    2019           78 :         info!(
    2020            0 :             "gc-compaction statistics: {}",
    2021            0 :             serde_json::to_string(&stat)?
    2022              :         );
    2023              : 
    2024           78 :         if dry_run {
    2025           12 :             return Ok(());
    2026           66 :         }
    2027           66 : 
    2028           66 :         info!(
    2029            0 :             "produced {} delta layers and {} image layers, {} layers are kept",
    2030            0 :             produced_delta_layers_len,
    2031            0 :             produced_image_layers_len,
    2032            0 :             layer_selection.len()
    2033              :         );
    2034              : 
    2035              :         // Step 3: Place the produced layers back into the layer map.
    2036              :         {
    2037           66 :             let mut guard = self.layers.write().await;
    2038           66 :             guard
    2039           66 :                 .open_mut()?
    2040           66 :                 .finish_gc_compaction(&layer_selection, &compact_to, &self.metrics)
    2041           66 :         };
    2042           66 :         self.remote_client
    2043           66 :             .schedule_compaction_update(&layer_selection, &compact_to)?;
    2044              : 
    2045           66 :         drop(gc_lock);
    2046           66 : 
    2047           66 :         Ok(())
    2048           78 :     }
    2049              : }
    2050              : 
    2051              : struct TimelineAdaptor {
    2052              :     timeline: Arc<Timeline>,
    2053              : 
    2054              :     keyspace: (Lsn, KeySpace),
    2055              : 
    2056              :     new_deltas: Vec<ResidentLayer>,
    2057              :     new_images: Vec<ResidentLayer>,
    2058              :     layers_to_delete: Vec<Arc<PersistentLayerDesc>>,
    2059              : }
    2060              : 
    2061              : impl TimelineAdaptor {
    2062            0 :     pub fn new(timeline: &Arc<Timeline>, keyspace: (Lsn, KeySpace)) -> Self {
    2063            0 :         Self {
    2064            0 :             timeline: timeline.clone(),
    2065            0 :             keyspace,
    2066            0 :             new_images: Vec::new(),
    2067            0 :             new_deltas: Vec::new(),
    2068            0 :             layers_to_delete: Vec::new(),
    2069            0 :         }
    2070            0 :     }
    2071              : 
    2072            0 :     pub async fn flush_updates(&mut self) -> Result<(), CompactionError> {
    2073            0 :         let layers_to_delete = {
    2074            0 :             let guard = self.timeline.layers.read().await;
    2075            0 :             self.layers_to_delete
    2076            0 :                 .iter()
    2077            0 :                 .map(|x| guard.get_from_desc(x))
    2078            0 :                 .collect::<Vec<Layer>>()
    2079            0 :         };
    2080            0 :         self.timeline
    2081            0 :             .finish_compact_batch(&self.new_deltas, &self.new_images, &layers_to_delete)
    2082            0 :             .await?;
    2083              : 
    2084            0 :         self.timeline
    2085            0 :             .upload_new_image_layers(std::mem::take(&mut self.new_images))?;
    2086              : 
    2087            0 :         self.new_deltas.clear();
    2088            0 :         self.layers_to_delete.clear();
    2089            0 :         Ok(())
    2090            0 :     }
    2091              : }
    2092              : 
    2093              : #[derive(Clone)]
    2094              : struct ResidentDeltaLayer(ResidentLayer);
    2095              : #[derive(Clone)]
    2096              : struct ResidentImageLayer(ResidentLayer);
    2097              : 
    2098              : impl CompactionJobExecutor for TimelineAdaptor {
    2099              :     type Key = crate::repository::Key;
    2100              : 
    2101              :     type Layer = OwnArc<PersistentLayerDesc>;
    2102              :     type DeltaLayer = ResidentDeltaLayer;
    2103              :     type ImageLayer = ResidentImageLayer;
    2104              : 
    2105              :     type RequestContext = crate::context::RequestContext;
    2106              : 
    2107            0 :     fn get_shard_identity(&self) -> &ShardIdentity {
    2108            0 :         self.timeline.get_shard_identity()
    2109            0 :     }
    2110              : 
    2111            0 :     async fn get_layers(
    2112            0 :         &mut self,
    2113            0 :         key_range: &Range<Key>,
    2114            0 :         lsn_range: &Range<Lsn>,
    2115            0 :         _ctx: &RequestContext,
    2116            0 :     ) -> anyhow::Result<Vec<OwnArc<PersistentLayerDesc>>> {
    2117            0 :         self.flush_updates().await?;
    2118              : 
    2119            0 :         let guard = self.timeline.layers.read().await;
    2120            0 :         let layer_map = guard.layer_map()?;
    2121              : 
    2122            0 :         let result = layer_map
    2123            0 :             .iter_historic_layers()
    2124            0 :             .filter(|l| {
    2125            0 :                 overlaps_with(&l.lsn_range, lsn_range) && overlaps_with(&l.key_range, key_range)
    2126            0 :             })
    2127            0 :             .map(OwnArc)
    2128            0 :             .collect();
    2129            0 :         Ok(result)
    2130            0 :     }
    2131              : 
    2132            0 :     async fn get_keyspace(
    2133            0 :         &mut self,
    2134            0 :         key_range: &Range<Key>,
    2135            0 :         lsn: Lsn,
    2136            0 :         _ctx: &RequestContext,
    2137            0 :     ) -> anyhow::Result<Vec<Range<Key>>> {
    2138            0 :         if lsn == self.keyspace.0 {
    2139            0 :             Ok(pageserver_compaction::helpers::intersect_keyspace(
    2140            0 :                 &self.keyspace.1.ranges,
    2141            0 :                 key_range,
    2142            0 :             ))
    2143              :         } else {
    2144              :             // The current compaction implementation only ever requests the key space
    2145              :             // at the compaction end LSN.
    2146            0 :             anyhow::bail!("keyspace not available for requested lsn");
    2147              :         }
    2148            0 :     }
    2149              : 
    2150            0 :     async fn downcast_delta_layer(
    2151            0 :         &self,
    2152            0 :         layer: &OwnArc<PersistentLayerDesc>,
    2153            0 :     ) -> anyhow::Result<Option<ResidentDeltaLayer>> {
    2154            0 :         // this is a lot more complex than a simple downcast...
    2155            0 :         if layer.is_delta() {
    2156            0 :             let l = {
    2157            0 :                 let guard = self.timeline.layers.read().await;
    2158            0 :                 guard.get_from_desc(layer)
    2159              :             };
    2160            0 :             let result = l.download_and_keep_resident().await?;
    2161              : 
    2162            0 :             Ok(Some(ResidentDeltaLayer(result)))
    2163              :         } else {
    2164            0 :             Ok(None)
    2165              :         }
    2166            0 :     }
    2167              : 
    2168            0 :     async fn create_image(
    2169            0 :         &mut self,
    2170            0 :         lsn: Lsn,
    2171            0 :         key_range: &Range<Key>,
    2172            0 :         ctx: &RequestContext,
    2173            0 :     ) -> anyhow::Result<()> {
    2174            0 :         Ok(self.create_image_impl(lsn, key_range, ctx).await?)
    2175            0 :     }
    2176              : 
    2177            0 :     async fn create_delta(
    2178            0 :         &mut self,
    2179            0 :         lsn_range: &Range<Lsn>,
    2180            0 :         key_range: &Range<Key>,
    2181            0 :         input_layers: &[ResidentDeltaLayer],
    2182            0 :         ctx: &RequestContext,
    2183            0 :     ) -> anyhow::Result<()> {
    2184            0 :         debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end);
    2185              : 
    2186            0 :         let mut all_entries = Vec::new();
    2187            0 :         for dl in input_layers.iter() {
    2188            0 :             all_entries.extend(dl.load_keys(ctx).await?);
    2189              :         }
    2190              : 
    2191              :         // The current stdlib sorting implementation is designed in a way that makes it
    2192              :         // particularly fast when the slice is made up of sorted sub-ranges.
    2193            0 :         all_entries.sort_by_key(|DeltaEntry { key, lsn, .. }| (*key, *lsn));
    2194              : 
    2195            0 :         let mut writer = DeltaLayerWriter::new(
    2196            0 :             self.timeline.conf,
    2197            0 :             self.timeline.timeline_id,
    2198            0 :             self.timeline.tenant_shard_id,
    2199            0 :             key_range.start,
    2200            0 :             lsn_range.clone(),
    2201            0 :             ctx,
    2202            0 :         )
    2203            0 :         .await?;
    2204              : 
    2205            0 :         let mut dup_values = 0;
    2206            0 : 
    2207            0 :         // This iterator walks through all key-value pairs from all the layers
    2208            0 :         // we're compacting, in key, LSN order.
    2209            0 :         let mut prev: Option<(Key, Lsn)> = None;
    2210              :         for &DeltaEntry {
    2211            0 :             key, lsn, ref val, ..
    2212            0 :         } in all_entries.iter()
    2213              :         {
    2214            0 :             if prev == Some((key, lsn)) {
    2215              :                 // This is a duplicate. Skip it.
    2216              :                 //
    2217              :                 // It can happen if compaction is interrupted after writing some
    2218              :                 // layers but not all, and we are compacting the range again.
    2219              :                 // The calculations in the algorithm assume that there are no
    2220              :                 // duplicates, so the math on targeted file size is likely off,
    2221              :                 // and we will create smaller files than expected.
    2222            0 :                 dup_values += 1;
    2223            0 :                 continue;
    2224            0 :             }
    2225              : 
    2226            0 :             let value = val.load(ctx).await?;
    2227              : 
    2228            0 :             writer.put_value(key, lsn, value, ctx).await?;
    2229              : 
    2230            0 :             prev = Some((key, lsn));
    2231              :         }
    2232              : 
    2233            0 :         if dup_values > 0 {
    2234            0 :             warn!("delta layer created with {} duplicate values", dup_values);
    2235            0 :         }
    2236              : 
    2237            0 :         fail_point!("delta-layer-writer-fail-before-finish", |_| {
    2238            0 :             Err(anyhow::anyhow!(
    2239            0 :                 "failpoint delta-layer-writer-fail-before-finish"
    2240            0 :             ))
    2241            0 :         });
    2242              : 
    2243            0 :         let (desc, path) = writer.finish(prev.unwrap().0.next(), ctx).await?;
    2244            0 :         let new_delta_layer =
    2245            0 :             Layer::finish_creating(self.timeline.conf, &self.timeline, desc, &path)?;
    2246              : 
    2247            0 :         self.new_deltas.push(new_delta_layer);
    2248            0 :         Ok(())
    2249            0 :     }
    2250              : 
    2251            0 :     async fn delete_layer(
    2252            0 :         &mut self,
    2253            0 :         layer: &OwnArc<PersistentLayerDesc>,
    2254            0 :         _ctx: &RequestContext,
    2255            0 :     ) -> anyhow::Result<()> {
    2256            0 :         self.layers_to_delete.push(layer.clone().0);
    2257            0 :         Ok(())
    2258            0 :     }
    2259              : }
    2260              : 
    2261              : impl TimelineAdaptor {
    2262            0 :     async fn create_image_impl(
    2263            0 :         &mut self,
    2264            0 :         lsn: Lsn,
    2265            0 :         key_range: &Range<Key>,
    2266            0 :         ctx: &RequestContext,
    2267            0 :     ) -> Result<(), CreateImageLayersError> {
    2268            0 :         let timer = self.timeline.metrics.create_images_time_histo.start_timer();
    2269              : 
    2270            0 :         let image_layer_writer = ImageLayerWriter::new(
    2271            0 :             self.timeline.conf,
    2272            0 :             self.timeline.timeline_id,
    2273            0 :             self.timeline.tenant_shard_id,
    2274            0 :             key_range,
    2275            0 :             lsn,
    2276            0 :             ctx,
    2277            0 :         )
    2278            0 :         .await?;
    2279              : 
    2280            0 :         fail_point!("image-layer-writer-fail-before-finish", |_| {
    2281            0 :             Err(CreateImageLayersError::Other(anyhow::anyhow!(
    2282            0 :                 "failpoint image-layer-writer-fail-before-finish"
    2283            0 :             )))
    2284            0 :         });
    2285              : 
    2286            0 :         let keyspace = KeySpace {
    2287            0 :             ranges: self.get_keyspace(key_range, lsn, ctx).await?,
    2288              :         };
    2289              :         // TODO set proper (stateful) start. The create_image_layer_for_rel_blocks function mostly
    2290            0 :         let start = Key::MIN;
    2291              :         let ImageLayerCreationOutcome {
    2292            0 :             image,
    2293              :             next_start_key: _,
    2294            0 :         } = self
    2295            0 :             .timeline
    2296            0 :             .create_image_layer_for_rel_blocks(
    2297            0 :                 &keyspace,
    2298            0 :                 image_layer_writer,
    2299            0 :                 lsn,
    2300            0 :                 ctx,
    2301            0 :                 key_range.clone(),
    2302            0 :                 start,
    2303            0 :             )
    2304            0 :             .await?;
    2305              : 
    2306            0 :         if let Some(image_layer) = image {
    2307            0 :             self.new_images.push(image_layer);
    2308            0 :         }
    2309              : 
    2310            0 :         timer.stop_and_record();
    2311            0 : 
    2312            0 :         Ok(())
    2313            0 :     }
    2314              : }
    2315              : 
    2316              : impl CompactionRequestContext for crate::context::RequestContext {}
    2317              : 
    2318              : #[derive(Debug, Clone)]
    2319              : pub struct OwnArc<T>(pub Arc<T>);
    2320              : 
    2321              : impl<T> Deref for OwnArc<T> {
    2322              :     type Target = <Arc<T> as Deref>::Target;
    2323            0 :     fn deref(&self) -> &Self::Target {
    2324            0 :         &self.0
    2325            0 :     }
    2326              : }
    2327              : 
    2328              : impl<T> AsRef<T> for OwnArc<T> {
    2329            0 :     fn as_ref(&self) -> &T {
    2330            0 :         self.0.as_ref()
    2331            0 :     }
    2332              : }
    2333              : 
    2334              : impl CompactionLayer<Key> for OwnArc<PersistentLayerDesc> {
    2335            0 :     fn key_range(&self) -> &Range<Key> {
    2336            0 :         &self.key_range
    2337            0 :     }
    2338            0 :     fn lsn_range(&self) -> &Range<Lsn> {
    2339            0 :         &self.lsn_range
    2340            0 :     }
    2341            0 :     fn file_size(&self) -> u64 {
    2342            0 :         self.file_size
    2343            0 :     }
    2344            0 :     fn short_id(&self) -> std::string::String {
    2345            0 :         self.as_ref().short_id().to_string()
    2346            0 :     }
    2347            0 :     fn is_delta(&self) -> bool {
    2348            0 :         self.as_ref().is_delta()
    2349            0 :     }
    2350              : }
    2351              : 
    2352              : impl CompactionLayer<Key> for OwnArc<DeltaLayer> {
    2353            0 :     fn key_range(&self) -> &Range<Key> {
    2354            0 :         &self.layer_desc().key_range
    2355            0 :     }
    2356            0 :     fn lsn_range(&self) -> &Range<Lsn> {
    2357            0 :         &self.layer_desc().lsn_range
    2358            0 :     }
    2359            0 :     fn file_size(&self) -> u64 {
    2360            0 :         self.layer_desc().file_size
    2361            0 :     }
    2362            0 :     fn short_id(&self) -> std::string::String {
    2363            0 :         self.layer_desc().short_id().to_string()
    2364            0 :     }
    2365            0 :     fn is_delta(&self) -> bool {
    2366            0 :         true
    2367            0 :     }
    2368              : }
    2369              : 
    2370              : use crate::tenant::timeline::DeltaEntry;
    2371              : 
    2372              : impl CompactionLayer<Key> for ResidentDeltaLayer {
    2373            0 :     fn key_range(&self) -> &Range<Key> {
    2374            0 :         &self.0.layer_desc().key_range
    2375            0 :     }
    2376            0 :     fn lsn_range(&self) -> &Range<Lsn> {
    2377            0 :         &self.0.layer_desc().lsn_range
    2378            0 :     }
    2379            0 :     fn file_size(&self) -> u64 {
    2380            0 :         self.0.layer_desc().file_size
    2381            0 :     }
    2382            0 :     fn short_id(&self) -> std::string::String {
    2383            0 :         self.0.layer_desc().short_id().to_string()
    2384            0 :     }
    2385            0 :     fn is_delta(&self) -> bool {
    2386            0 :         true
    2387            0 :     }
    2388              : }
    2389              : 
    2390              : impl CompactionDeltaLayer<TimelineAdaptor> for ResidentDeltaLayer {
    2391              :     type DeltaEntry<'a> = DeltaEntry<'a>;
    2392              : 
    2393            0 :     async fn load_keys<'a>(&self, ctx: &RequestContext) -> anyhow::Result<Vec<DeltaEntry<'_>>> {
    2394            0 :         self.0.load_keys(ctx).await
    2395            0 :     }
    2396              : }
    2397              : 
    2398              : impl CompactionLayer<Key> for ResidentImageLayer {
    2399            0 :     fn key_range(&self) -> &Range<Key> {
    2400            0 :         &self.0.layer_desc().key_range
    2401            0 :     }
    2402            0 :     fn lsn_range(&self) -> &Range<Lsn> {
    2403            0 :         &self.0.layer_desc().lsn_range
    2404            0 :     }
    2405            0 :     fn file_size(&self) -> u64 {
    2406            0 :         self.0.layer_desc().file_size
    2407            0 :     }
    2408            0 :     fn short_id(&self) -> std::string::String {
    2409            0 :         self.0.layer_desc().short_id().to_string()
    2410            0 :     }
    2411            0 :     fn is_delta(&self) -> bool {
    2412            0 :         false
    2413            0 :     }
    2414              : }
    2415              : impl CompactionImageLayer<TimelineAdaptor> for ResidentImageLayer {}
        

Generated by: LCOV version 2.1-beta