LCOV - code coverage report
Current view: top level - pageserver/src/tenant/timeline - compaction.rs (source / functions) Coverage Total Hit
Test: 4f58e98c51285c7fa348e0b410c88a10caf68ad2.info Lines: 60.8 % 2037 1239
Test Date: 2025-01-07 20:58:07 Functions: 43.5 % 147 64

            Line data    Source code
       1              : //! New compaction implementation. The algorithm itself is implemented in the
       2              : //! compaction crate. This file implements the callbacks and structs that allow
       3              : //! the algorithm to drive the process.
       4              : //!
       5              : //! The old legacy algorithm is implemented directly in `timeline.rs`.
       6              : 
       7              : use std::collections::{BinaryHeap, HashMap, HashSet};
       8              : use std::ops::{Deref, Range};
       9              : use std::sync::Arc;
      10              : 
      11              : use super::layer_manager::LayerManager;
      12              : use super::{
      13              :     CompactFlags, CompactOptions, CreateImageLayersError, DurationRecorder, ImageLayerCreationMode,
      14              :     RecordedDuration, Timeline,
      15              : };
      16              : 
      17              : use anyhow::{anyhow, bail, Context};
      18              : use bytes::Bytes;
      19              : use fail::fail_point;
      20              : use itertools::Itertools;
      21              : use pageserver_api::key::KEY_SIZE;
      22              : use pageserver_api::keyspace::ShardedRange;
      23              : use pageserver_api::shard::{ShardCount, ShardIdentity, TenantShardId};
      24              : use serde::Serialize;
      25              : use tokio_util::sync::CancellationToken;
      26              : use tracing::{debug, info, info_span, trace, warn, Instrument};
      27              : use utils::id::TimelineId;
      28              : 
      29              : use crate::context::{AccessStatsBehavior, RequestContext, RequestContextBuilder};
      30              : use crate::page_cache;
      31              : use crate::statvfs::Statvfs;
      32              : use crate::tenant::checks::check_valid_layermap;
      33              : use crate::tenant::remote_timeline_client::WaitCompletionError;
      34              : use crate::tenant::storage_layer::batch_split_writer::{
      35              :     BatchWriterResult, SplitDeltaLayerWriter, SplitImageLayerWriter,
      36              : };
      37              : use crate::tenant::storage_layer::filter_iterator::FilterIterator;
      38              : use crate::tenant::storage_layer::merge_iterator::MergeIterator;
      39              : use crate::tenant::storage_layer::{
      40              :     AsLayerDesc, PersistentLayerDesc, PersistentLayerKey, ValueReconstructState,
      41              : };
      42              : use crate::tenant::timeline::ImageLayerCreationOutcome;
      43              : use crate::tenant::timeline::{drop_rlock, DeltaLayerWriter, ImageLayerWriter};
      44              : use crate::tenant::timeline::{Layer, ResidentLayer};
      45              : use crate::tenant::{gc_block, DeltaLayer, MaybeOffloaded};
      46              : use crate::virtual_file::{MaybeFatalIo, VirtualFile};
      47              : use pageserver_api::config::tenant_conf_defaults::{
      48              :     DEFAULT_CHECKPOINT_DISTANCE, DEFAULT_COMPACTION_THRESHOLD,
      49              : };
      50              : 
      51              : use pageserver_api::key::Key;
      52              : use pageserver_api::keyspace::KeySpace;
      53              : use pageserver_api::record::NeonWalRecord;
      54              : use pageserver_api::value::Value;
      55              : 
      56              : use utils::lsn::Lsn;
      57              : 
      58              : use pageserver_compaction::helpers::{fully_contains, overlaps_with};
      59              : use pageserver_compaction::interface::*;
      60              : 
      61              : use super::CompactionError;
      62              : 
      63              : /// Maximum number of deltas before generating an image layer in bottom-most compaction.
      64              : const COMPACTION_DELTA_THRESHOLD: usize = 5;
      65              : 
      66              : /// A scheduled compaction task.
      67              : pub(crate) struct ScheduledCompactionTask {
      68              :     /// It's unfortunate that we need to store a compact options struct here because the only outer
      69              :     /// API we can call here is `compact_with_options` which does a few setup calls before starting the
      70              :     /// actual compaction job... We should refactor this to store `GcCompactionJob` in the future.
      71              :     pub options: CompactOptions,
      72              :     /// The channel to send the compaction result. If this is a subcompaction, the last compaction job holds the sender.
      73              :     pub result_tx: Option<tokio::sync::oneshot::Sender<()>>,
      74              :     /// Hold the GC block. If this is a subcompaction, the last compaction job holds the gc block guard.
      75              :     pub gc_block: Option<gc_block::Guard>,
      76              : }
      77              : 
      78              : /// A job description for the gc-compaction job. This structure describes the rectangle range that the job will
      79              : /// process. The exact layers that need to be compacted/rewritten will be generated when `compact_with_gc` gets
      80              : /// called.
      81              : #[derive(Debug, Clone)]
      82              : pub(crate) struct GcCompactJob {
      83              :     pub dry_run: bool,
      84              :     /// The key range to be compacted. The compaction algorithm will only regenerate key-value pairs within this range
      85              :     /// [left inclusive, right exclusive), and other pairs will be rewritten into new files if necessary.
      86              :     pub compact_key_range: Range<Key>,
      87              :     /// The LSN range to be compacted. The compaction algorithm will use this range to determine the layers to be
      88              :     /// selected for the compaction, and it does not guarantee the generated layers will have exactly the same LSN range
      89              :     /// as specified here. The true range being compacted is `min_lsn/max_lsn` in [`GcCompactionJobDescription`].
      90              :     /// min_lsn will always <= the lower bound specified here, and max_lsn will always >= the upper bound specified here.
      91              :     pub compact_lsn_range: Range<Lsn>,
      92              : }
      93              : 
      94              : impl GcCompactJob {
      95           54 :     pub fn from_compact_options(options: CompactOptions) -> Self {
      96           54 :         GcCompactJob {
      97           54 :             dry_run: options.flags.contains(CompactFlags::DryRun),
      98           54 :             compact_key_range: options
      99           54 :                 .compact_key_range
     100           54 :                 .map(|x| x.into())
     101           54 :                 .unwrap_or(Key::MIN..Key::MAX),
     102           54 :             compact_lsn_range: options
     103           54 :                 .compact_lsn_range
     104           54 :                 .map(|x| x.into())
     105           54 :                 .unwrap_or(Lsn::INVALID..Lsn::MAX),
     106           54 :         }
     107           54 :     }
     108              : }
     109              : 
     110              : /// A job description for the gc-compaction job. This structure is generated when `compact_with_gc` is called
     111              : /// and contains the exact layers we want to compact.
     112              : pub struct GcCompactionJobDescription {
     113              :     /// All layers to read in the compaction job
     114              :     selected_layers: Vec<Layer>,
     115              :     /// GC cutoff of the job. This is the lowest LSN that will be accessed by the read/GC path and we need to
     116              :     /// keep all deltas <= this LSN or generate an image == this LSN.
     117              :     gc_cutoff: Lsn,
     118              :     /// LSNs to retain for the job. Read path will use this LSN so we need to keep deltas <= this LSN or
     119              :     /// generate an image == this LSN.
     120              :     retain_lsns_below_horizon: Vec<Lsn>,
     121              :     /// Maximum layer LSN processed in this compaction, that is max(end_lsn of layers). Exclusive. All data
     122              :     /// \>= this LSN will be kept and will not be rewritten.
     123              :     max_layer_lsn: Lsn,
     124              :     /// Minimum layer LSN processed in this compaction, that is min(start_lsn of layers). Inclusive.
     125              :     /// All access below (strict lower than `<`) this LSN will be routed through the normal read path instead of
     126              :     /// k-merge within gc-compaction.
     127              :     min_layer_lsn: Lsn,
     128              :     /// Only compact layers overlapping with this range.
     129              :     compaction_key_range: Range<Key>,
     130              :     /// When partial compaction is enabled, these layers need to be rewritten to ensure no overlap.
     131              :     /// This field is here solely for debugging. The field will not be read once the compaction
     132              :     /// description is generated.
     133              :     rewrite_layers: Vec<Arc<PersistentLayerDesc>>,
     134              : }
     135              : 
     136              : /// The result of bottom-most compaction for a single key at each LSN.
     137              : #[derive(Debug)]
     138              : #[cfg_attr(test, derive(PartialEq))]
     139              : pub struct KeyLogAtLsn(pub Vec<(Lsn, Value)>);
     140              : 
     141              : /// The result of bottom-most compaction.
     142              : #[derive(Debug)]
     143              : #[cfg_attr(test, derive(PartialEq))]
     144              : pub(crate) struct KeyHistoryRetention {
     145              :     /// Stores logs to reconstruct the value at the given LSN, that is to say, logs <= LSN or image == LSN.
     146              :     pub(crate) below_horizon: Vec<(Lsn, KeyLogAtLsn)>,
     147              :     /// Stores logs to reconstruct the value at any LSN above the horizon, that is to say, log > LSN.
     148              :     pub(crate) above_horizon: KeyLogAtLsn,
     149              : }
     150              : 
     151              : impl KeyHistoryRetention {
     152              :     /// Hack: skip delta layer if we need to produce a layer of a same key-lsn.
     153              :     ///
     154              :     /// This can happen if we have removed some deltas in "the middle" of some existing layer's key-lsn-range.
     155              :     /// For example, consider the case where a single delta with range [0x10,0x50) exists.
     156              :     /// And we have branches at LSN 0x10, 0x20, 0x30.
     157              :     /// Then we delete branch @ 0x20.
     158              :     /// Bottom-most compaction may now delete the delta [0x20,0x30).
     159              :     /// And that wouldnt' change the shape of the layer.
     160              :     ///
     161              :     /// Note that bottom-most-gc-compaction never _adds_ new data in that case, only removes.
     162              :     ///
     163              :     /// `discard_key` will only be called when the writer reaches its target (instead of for every key), so it's fine to grab a lock inside.
     164           74 :     async fn discard_key(key: &PersistentLayerKey, tline: &Arc<Timeline>, dry_run: bool) -> bool {
     165           74 :         if dry_run {
     166            0 :             return true;
     167           74 :         }
     168           74 :         let guard = tline.layers.read().await;
     169           74 :         if !guard.contains_key(key) {
     170           52 :             return false;
     171           22 :         }
     172           22 :         let layer_generation = guard.get_from_key(key).metadata().generation;
     173           22 :         drop(guard);
     174           22 :         if layer_generation == tline.generation {
     175           22 :             info!(
     176              :                 key=%key,
     177              :                 ?layer_generation,
     178            0 :                 "discard layer due to duplicated layer key in the same generation",
     179              :             );
     180           22 :             true
     181              :         } else {
     182            0 :             false
     183              :         }
     184           74 :     }
     185              : 
     186              :     /// Pipe a history of a single key to the writers.
     187              :     ///
     188              :     /// If `image_writer` is none, the images will be placed into the delta layers.
     189              :     /// The delta writer will contain all images and deltas (below and above the horizon) except the bottom-most images.
     190              :     #[allow(clippy::too_many_arguments)]
     191          622 :     async fn pipe_to(
     192          622 :         self,
     193          622 :         key: Key,
     194          622 :         delta_writer: &mut SplitDeltaLayerWriter,
     195          622 :         mut image_writer: Option<&mut SplitImageLayerWriter>,
     196          622 :         stat: &mut CompactionStatistics,
     197          622 :         ctx: &RequestContext,
     198          622 :     ) -> anyhow::Result<()> {
     199          622 :         let mut first_batch = true;
     200         2012 :         for (cutoff_lsn, KeyLogAtLsn(logs)) in self.below_horizon {
     201         1390 :             if first_batch {
     202          622 :                 if logs.len() == 1 && logs[0].1.is_image() {
     203          584 :                     let Value::Image(img) = &logs[0].1 else {
     204            0 :                         unreachable!()
     205              :                     };
     206          584 :                     stat.produce_image_key(img);
     207          584 :                     if let Some(image_writer) = image_writer.as_mut() {
     208          584 :                         image_writer.put_image(key, img.clone(), ctx).await?;
     209              :                     } else {
     210            0 :                         delta_writer
     211            0 :                             .put_value(key, cutoff_lsn, Value::Image(img.clone()), ctx)
     212            0 :                             .await?;
     213              :                     }
     214              :                 } else {
     215           66 :                     for (lsn, val) in logs {
     216           28 :                         stat.produce_key(&val);
     217           28 :                         delta_writer.put_value(key, lsn, val, ctx).await?;
     218              :                     }
     219              :                 }
     220          622 :                 first_batch = false;
     221              :             } else {
     222          884 :                 for (lsn, val) in logs {
     223          116 :                     stat.produce_key(&val);
     224          116 :                     delta_writer.put_value(key, lsn, val, ctx).await?;
     225              :                 }
     226              :             }
     227              :         }
     228          622 :         let KeyLogAtLsn(above_horizon_logs) = self.above_horizon;
     229          680 :         for (lsn, val) in above_horizon_logs {
     230           58 :             stat.produce_key(&val);
     231           58 :             delta_writer.put_value(key, lsn, val, ctx).await?;
     232              :         }
     233          622 :         Ok(())
     234          622 :     }
     235              : }
     236              : 
     237              : #[derive(Debug, Serialize, Default)]
     238              : struct CompactionStatisticsNumSize {
     239              :     num: u64,
     240              :     size: u64,
     241              : }
     242              : 
     243              : #[derive(Debug, Serialize, Default)]
     244              : pub struct CompactionStatistics {
     245              :     delta_layer_visited: CompactionStatisticsNumSize,
     246              :     image_layer_visited: CompactionStatisticsNumSize,
     247              :     delta_layer_produced: CompactionStatisticsNumSize,
     248              :     image_layer_produced: CompactionStatisticsNumSize,
     249              :     num_delta_layer_discarded: usize,
     250              :     num_image_layer_discarded: usize,
     251              :     num_unique_keys_visited: usize,
     252              :     wal_keys_visited: CompactionStatisticsNumSize,
     253              :     image_keys_visited: CompactionStatisticsNumSize,
     254              :     wal_produced: CompactionStatisticsNumSize,
     255              :     image_produced: CompactionStatisticsNumSize,
     256              : }
     257              : 
     258              : impl CompactionStatistics {
     259         1042 :     fn estimated_size_of_value(val: &Value) -> usize {
     260          432 :         match val {
     261          610 :             Value::Image(img) => img.len(),
     262            0 :             Value::WalRecord(NeonWalRecord::Postgres { rec, .. }) => rec.len(),
     263          432 :             _ => std::mem::size_of::<NeonWalRecord>(),
     264              :         }
     265         1042 :     }
     266         1636 :     fn estimated_size_of_key() -> usize {
     267         1636 :         KEY_SIZE // TODO: distinguish image layer and delta layer (count LSN in delta layer)
     268         1636 :     }
     269           86 :     fn visit_delta_layer(&mut self, size: u64) {
     270           86 :         self.delta_layer_visited.num += 1;
     271           86 :         self.delta_layer_visited.size += size;
     272           86 :     }
     273           66 :     fn visit_image_layer(&mut self, size: u64) {
     274           66 :         self.image_layer_visited.num += 1;
     275           66 :         self.image_layer_visited.size += size;
     276           66 :     }
     277          622 :     fn on_unique_key_visited(&mut self) {
     278          622 :         self.num_unique_keys_visited += 1;
     279          622 :     }
     280          240 :     fn visit_wal_key(&mut self, val: &Value) {
     281          240 :         self.wal_keys_visited.num += 1;
     282          240 :         self.wal_keys_visited.size +=
     283          240 :             Self::estimated_size_of_value(val) as u64 + Self::estimated_size_of_key() as u64;
     284          240 :     }
     285          610 :     fn visit_image_key(&mut self, val: &Value) {
     286          610 :         self.image_keys_visited.num += 1;
     287          610 :         self.image_keys_visited.size +=
     288          610 :             Self::estimated_size_of_value(val) as u64 + Self::estimated_size_of_key() as u64;
     289          610 :     }
     290          202 :     fn produce_key(&mut self, val: &Value) {
     291          202 :         match val {
     292           10 :             Value::Image(img) => self.produce_image_key(img),
     293          192 :             Value::WalRecord(_) => self.produce_wal_key(val),
     294              :         }
     295          202 :     }
     296          192 :     fn produce_wal_key(&mut self, val: &Value) {
     297          192 :         self.wal_produced.num += 1;
     298          192 :         self.wal_produced.size +=
     299          192 :             Self::estimated_size_of_value(val) as u64 + Self::estimated_size_of_key() as u64;
     300          192 :     }
     301          594 :     fn produce_image_key(&mut self, val: &Bytes) {
     302          594 :         self.image_produced.num += 1;
     303          594 :         self.image_produced.size += val.len() as u64 + Self::estimated_size_of_key() as u64;
     304          594 :     }
     305           14 :     fn discard_delta_layer(&mut self) {
     306           14 :         self.num_delta_layer_discarded += 1;
     307           14 :     }
     308            8 :     fn discard_image_layer(&mut self) {
     309            8 :         self.num_image_layer_discarded += 1;
     310            8 :     }
     311           22 :     fn produce_delta_layer(&mut self, size: u64) {
     312           22 :         self.delta_layer_produced.num += 1;
     313           22 :         self.delta_layer_produced.size += size;
     314           22 :     }
     315           30 :     fn produce_image_layer(&mut self, size: u64) {
     316           30 :         self.image_layer_produced.num += 1;
     317           30 :         self.image_layer_produced.size += size;
     318           30 :     }
     319              : }
     320              : 
     321              : impl Timeline {
     322              :     /// TODO: cancellation
     323              :     ///
     324              :     /// Returns whether the compaction has pending tasks.
     325          364 :     pub(crate) async fn compact_legacy(
     326          364 :         self: &Arc<Self>,
     327          364 :         cancel: &CancellationToken,
     328          364 :         options: CompactOptions,
     329          364 :         ctx: &RequestContext,
     330          364 :     ) -> Result<bool, CompactionError> {
     331          364 :         if options
     332          364 :             .flags
     333          364 :             .contains(CompactFlags::EnhancedGcBottomMostCompaction)
     334              :         {
     335            0 :             self.compact_with_gc(cancel, options, ctx)
     336            0 :                 .await
     337            0 :                 .map_err(CompactionError::Other)?;
     338            0 :             return Ok(false);
     339          364 :         }
     340          364 : 
     341          364 :         if options.flags.contains(CompactFlags::DryRun) {
     342            0 :             return Err(CompactionError::Other(anyhow!(
     343            0 :                 "dry-run mode is not supported for legacy compaction for now"
     344            0 :             )));
     345          364 :         }
     346          364 : 
     347          364 :         if options.compact_key_range.is_some() || options.compact_lsn_range.is_some() {
     348              :             // maybe useful in the future? could implement this at some point
     349            0 :             return Err(CompactionError::Other(anyhow!(
     350            0 :                 "compaction range is not supported for legacy compaction for now"
     351            0 :             )));
     352          364 :         }
     353          364 : 
     354          364 :         // High level strategy for compaction / image creation:
     355          364 :         //
     356          364 :         // 1. First, calculate the desired "partitioning" of the
     357          364 :         // currently in-use key space. The goal is to partition the
     358          364 :         // key space into roughly fixed-size chunks, but also take into
     359          364 :         // account any existing image layers, and try to align the
     360          364 :         // chunk boundaries with the existing image layers to avoid
     361          364 :         // too much churn. Also try to align chunk boundaries with
     362          364 :         // relation boundaries.  In principle, we don't know about
     363          364 :         // relation boundaries here, we just deal with key-value
     364          364 :         // pairs, and the code in pgdatadir_mapping.rs knows how to
     365          364 :         // map relations into key-value pairs. But in practice we know
     366          364 :         // that 'field6' is the block number, and the fields 1-5
     367          364 :         // identify a relation. This is just an optimization,
     368          364 :         // though.
     369          364 :         //
     370          364 :         // 2. Once we know the partitioning, for each partition,
     371          364 :         // decide if it's time to create a new image layer. The
     372          364 :         // criteria is: there has been too much "churn" since the last
     373          364 :         // image layer? The "churn" is fuzzy concept, it's a
     374          364 :         // combination of too many delta files, or too much WAL in
     375          364 :         // total in the delta file. Or perhaps: if creating an image
     376          364 :         // file would allow to delete some older files.
     377          364 :         //
     378          364 :         // 3. After that, we compact all level0 delta files if there
     379          364 :         // are too many of them.  While compacting, we also garbage
     380          364 :         // collect any page versions that are no longer needed because
     381          364 :         // of the new image layers we created in step 2.
     382          364 :         //
     383          364 :         // TODO: This high level strategy hasn't been implemented yet.
     384          364 :         // Below are functions compact_level0() and create_image_layers()
     385          364 :         // but they are a bit ad hoc and don't quite work like it's explained
     386          364 :         // above. Rewrite it.
     387          364 : 
     388          364 :         // Is the timeline being deleted?
     389          364 :         if self.is_stopping() {
     390            0 :             trace!("Dropping out of compaction on timeline shutdown");
     391            0 :             return Err(CompactionError::ShuttingDown);
     392          364 :         }
     393          364 : 
     394          364 :         let target_file_size = self.get_checkpoint_distance();
     395              : 
     396              :         // Define partitioning schema if needed
     397              : 
     398              :         // FIXME: the match should only cover repartitioning, not the next steps
     399          364 :         let (partition_count, has_pending_tasks) = match self
     400          364 :             .repartition(
     401          364 :                 self.get_last_record_lsn(),
     402          364 :                 self.get_compaction_target_size(),
     403          364 :                 options.flags,
     404          364 :                 ctx,
     405          364 :             )
     406          364 :             .await
     407              :         {
     408          364 :             Ok(((dense_partitioning, sparse_partitioning), lsn)) => {
     409          364 :                 // Disables access_stats updates, so that the files we read remain candidates for eviction after we're done with them
     410          364 :                 let image_ctx = RequestContextBuilder::extend(ctx)
     411          364 :                     .access_stats_behavior(AccessStatsBehavior::Skip)
     412          364 :                     .build();
     413          364 : 
     414          364 :                 // 2. Compact
     415          364 :                 let timer = self.metrics.compact_time_histo.start_timer();
     416          364 :                 let fully_compacted = self
     417          364 :                     .compact_level0(
     418          364 :                         target_file_size,
     419          364 :                         options.flags.contains(CompactFlags::ForceL0Compaction),
     420          364 :                         ctx,
     421          364 :                     )
     422          364 :                     .await?;
     423          364 :                 timer.stop_and_record();
     424          364 : 
     425          364 :                 let mut partitioning = dense_partitioning;
     426          364 :                 partitioning
     427          364 :                     .parts
     428          364 :                     .extend(sparse_partitioning.into_dense().parts);
     429          364 : 
     430          364 :                 // 3. Create new image layers for partitions that have been modified
     431          364 :                 // "enough". Skip image layer creation if L0 compaction cannot keep up.
     432          364 :                 if fully_compacted {
     433          364 :                     let image_layers = self
     434          364 :                         .create_image_layers(
     435          364 :                             &partitioning,
     436          364 :                             lsn,
     437          364 :                             if options
     438          364 :                                 .flags
     439          364 :                                 .contains(CompactFlags::ForceImageLayerCreation)
     440              :                             {
     441           14 :                                 ImageLayerCreationMode::Force
     442              :                             } else {
     443          350 :                                 ImageLayerCreationMode::Try
     444              :                             },
     445          364 :                             &image_ctx,
     446          364 :                         )
     447          364 :                         .await?;
     448              : 
     449          364 :                     self.upload_new_image_layers(image_layers)?;
     450              :                 } else {
     451            0 :                     info!("skipping image layer generation due to L0 compaction did not include all layers.");
     452              :                 }
     453          364 :                 (partitioning.parts.len(), !fully_compacted)
     454              :             }
     455            0 :             Err(err) => {
     456            0 :                 // no partitioning? This is normal, if the timeline was just created
     457            0 :                 // as an empty timeline. Also in unit tests, when we use the timeline
     458            0 :                 // as a simple key-value store, ignoring the datadir layout. Log the
     459            0 :                 // error but continue.
     460            0 :                 //
     461            0 :                 // Suppress error when it's due to cancellation
     462            0 :                 if !self.cancel.is_cancelled() && !err.is_cancelled() {
     463            0 :                     tracing::error!("could not compact, repartitioning keyspace failed: {err:?}");
     464            0 :                 }
     465            0 :                 (1, false)
     466              :             }
     467              :         };
     468              : 
     469          364 :         if self.shard_identity.count >= ShardCount::new(2) {
     470              :             // Limit the number of layer rewrites to the number of partitions: this means its
     471              :             // runtime should be comparable to a full round of image layer creations, rather than
     472              :             // being potentially much longer.
     473            0 :             let rewrite_max = partition_count;
     474            0 : 
     475            0 :             self.compact_shard_ancestors(rewrite_max, ctx).await?;
     476          364 :         }
     477              : 
     478          364 :         Ok(has_pending_tasks)
     479          364 :     }
     480              : 
     481              :     /// Check for layers that are elegible to be rewritten:
     482              :     /// - Shard splitting: After a shard split, ancestor layers beyond pitr_interval, so that
     483              :     ///   we don't indefinitely retain keys in this shard that aren't needed.
     484              :     /// - For future use: layers beyond pitr_interval that are in formats we would
     485              :     ///   rather not maintain compatibility with indefinitely.
     486              :     ///
     487              :     /// Note: this phase may read and write many gigabytes of data: use rewrite_max to bound
     488              :     /// how much work it will try to do in each compaction pass.
     489            0 :     async fn compact_shard_ancestors(
     490            0 :         self: &Arc<Self>,
     491            0 :         rewrite_max: usize,
     492            0 :         ctx: &RequestContext,
     493            0 :     ) -> Result<(), CompactionError> {
     494            0 :         let mut drop_layers = Vec::new();
     495            0 :         let mut layers_to_rewrite: Vec<Layer> = Vec::new();
     496            0 : 
     497            0 :         // We will use the Lsn cutoff of the last GC as a threshold for rewriting layers: if a
     498            0 :         // layer is behind this Lsn, it indicates that the layer is being retained beyond the
     499            0 :         // pitr_interval, for example because a branchpoint references it.
     500            0 :         //
     501            0 :         // Holding this read guard also blocks [`Self::gc_timeline`] from entering while we
     502            0 :         // are rewriting layers.
     503            0 :         let latest_gc_cutoff = self.get_latest_gc_cutoff_lsn();
     504            0 : 
     505            0 :         tracing::info!(
     506            0 :             "latest_gc_cutoff: {}, pitr cutoff {}",
     507            0 :             *latest_gc_cutoff,
     508            0 :             self.gc_info.read().unwrap().cutoffs.time
     509              :         );
     510              : 
     511            0 :         let layers = self.layers.read().await;
     512            0 :         for layer_desc in layers.layer_map()?.iter_historic_layers() {
     513            0 :             let layer = layers.get_from_desc(&layer_desc);
     514            0 :             if layer.metadata().shard.shard_count == self.shard_identity.count {
     515              :                 // This layer does not belong to a historic ancestor, no need to re-image it.
     516            0 :                 continue;
     517            0 :             }
     518            0 : 
     519            0 :             // This layer was created on an ancestor shard: check if it contains any data for this shard.
     520            0 :             let sharded_range = ShardedRange::new(layer_desc.get_key_range(), &self.shard_identity);
     521            0 :             let layer_local_page_count = sharded_range.page_count();
     522            0 :             let layer_raw_page_count = ShardedRange::raw_size(&layer_desc.get_key_range());
     523            0 :             if layer_local_page_count == 0 {
     524              :                 // This ancestral layer only covers keys that belong to other shards.
     525              :                 // We include the full metadata in the log: if we had some critical bug that caused
     526              :                 // us to incorrectly drop layers, this would simplify manually debugging + reinstating those layers.
     527            0 :                 info!(%layer, old_metadata=?layer.metadata(),
     528            0 :                     "dropping layer after shard split, contains no keys for this shard.",
     529              :                 );
     530              : 
     531            0 :                 if cfg!(debug_assertions) {
     532              :                     // Expensive, exhaustive check of keys in this layer: this guards against ShardedRange's calculations being
     533              :                     // wrong.  If ShardedRange claims the local page count is zero, then no keys in this layer
     534              :                     // should be !is_key_disposable()
     535            0 :                     let range = layer_desc.get_key_range();
     536            0 :                     let mut key = range.start;
     537            0 :                     while key < range.end {
     538            0 :                         debug_assert!(self.shard_identity.is_key_disposable(&key));
     539            0 :                         key = key.next();
     540              :                     }
     541            0 :                 }
     542              : 
     543            0 :                 drop_layers.push(layer);
     544            0 :                 continue;
     545            0 :             } else if layer_local_page_count != u32::MAX
     546            0 :                 && layer_local_page_count == layer_raw_page_count
     547              :             {
     548            0 :                 debug!(%layer,
     549            0 :                     "layer is entirely shard local ({} keys), no need to filter it",
     550              :                     layer_local_page_count
     551              :                 );
     552            0 :                 continue;
     553            0 :             }
     554            0 : 
     555            0 :             // Don't bother re-writing a layer unless it will at least halve its size
     556            0 :             if layer_local_page_count != u32::MAX
     557            0 :                 && layer_local_page_count > layer_raw_page_count / 2
     558              :             {
     559            0 :                 debug!(%layer,
     560            0 :                     "layer is already mostly local ({}/{}), not rewriting",
     561              :                     layer_local_page_count,
     562              :                     layer_raw_page_count
     563              :                 );
     564            0 :             }
     565              : 
     566              :             // Don't bother re-writing a layer if it is within the PITR window: it will age-out eventually
     567              :             // without incurring the I/O cost of a rewrite.
     568            0 :             if layer_desc.get_lsn_range().end >= *latest_gc_cutoff {
     569            0 :                 debug!(%layer, "Skipping rewrite of layer still in GC window ({} >= {})",
     570            0 :                     layer_desc.get_lsn_range().end, *latest_gc_cutoff);
     571            0 :                 continue;
     572            0 :             }
     573            0 : 
     574            0 :             if layer_desc.is_delta() {
     575              :                 // We do not yet implement rewrite of delta layers
     576            0 :                 debug!(%layer, "Skipping rewrite of delta layer");
     577            0 :                 continue;
     578            0 :             }
     579            0 : 
     580            0 :             // Only rewrite layers if their generations differ.  This guarantees:
     581            0 :             //  - that local rewrite is safe, as local layer paths will differ between existing layer and rewritten one
     582            0 :             //  - that the layer is persistent in remote storage, as we only see old-generation'd layer via loading from remote storage
     583            0 :             if layer.metadata().generation == self.generation {
     584            0 :                 debug!(%layer, "Skipping rewrite, is not from old generation");
     585            0 :                 continue;
     586            0 :             }
     587            0 : 
     588            0 :             if layers_to_rewrite.len() >= rewrite_max {
     589            0 :                 tracing::info!(%layer, "Will rewrite layer on a future compaction, already rewrote {}",
     590            0 :                     layers_to_rewrite.len()
     591              :                 );
     592            0 :                 continue;
     593            0 :             }
     594            0 : 
     595            0 :             // Fall through: all our conditions for doing a rewrite passed.
     596            0 :             layers_to_rewrite.push(layer);
     597              :         }
     598              : 
     599              :         // Drop read lock on layer map before we start doing time-consuming I/O
     600            0 :         drop(layers);
     601            0 : 
     602            0 :         let mut replace_image_layers = Vec::new();
     603              : 
     604            0 :         for layer in layers_to_rewrite {
     605            0 :             tracing::info!(layer=%layer, "Rewriting layer after shard split...");
     606            0 :             let mut image_layer_writer = ImageLayerWriter::new(
     607            0 :                 self.conf,
     608            0 :                 self.timeline_id,
     609            0 :                 self.tenant_shard_id,
     610            0 :                 &layer.layer_desc().key_range,
     611            0 :                 layer.layer_desc().image_layer_lsn(),
     612            0 :                 ctx,
     613            0 :             )
     614            0 :             .await
     615            0 :             .map_err(CompactionError::Other)?;
     616              : 
     617              :             // Safety of layer rewrites:
     618              :             // - We are writing to a different local file path than we are reading from, so the old Layer
     619              :             //   cannot interfere with the new one.
     620              :             // - In the page cache, contents for a particular VirtualFile are stored with a file_id that
     621              :             //   is different for two layers with the same name (in `ImageLayerInner::new` we always
     622              :             //   acquire a fresh id from [`crate::page_cache::next_file_id`].  So readers do not risk
     623              :             //   reading the index from one layer file, and then data blocks from the rewritten layer file.
     624              :             // - Any readers that have a reference to the old layer will keep it alive until they are done
     625              :             //   with it. If they are trying to promote from remote storage, that will fail, but this is the same
     626              :             //   as for compaction generally: compaction is allowed to delete layers that readers might be trying to use.
     627              :             // - We do not run concurrently with other kinds of compaction, so the only layer map writes we race with are:
     628              :             //    - GC, which at worst witnesses us "undelete" a layer that they just deleted.
     629              :             //    - ingestion, which only inserts layers, therefore cannot collide with us.
     630            0 :             let resident = layer.download_and_keep_resident().await?;
     631              : 
     632            0 :             let keys_written = resident
     633            0 :                 .filter(&self.shard_identity, &mut image_layer_writer, ctx)
     634            0 :                 .await?;
     635              : 
     636            0 :             if keys_written > 0 {
     637            0 :                 let (desc, path) = image_layer_writer
     638            0 :                     .finish(ctx)
     639            0 :                     .await
     640            0 :                     .map_err(CompactionError::Other)?;
     641            0 :                 let new_layer = Layer::finish_creating(self.conf, self, desc, &path)
     642            0 :                     .map_err(CompactionError::Other)?;
     643            0 :                 tracing::info!(layer=%new_layer, "Rewrote layer, {} -> {} bytes",
     644            0 :                     layer.metadata().file_size,
     645            0 :                     new_layer.metadata().file_size);
     646              : 
     647            0 :                 replace_image_layers.push((layer, new_layer));
     648            0 :             } else {
     649            0 :                 // Drop the old layer.  Usually for this case we would already have noticed that
     650            0 :                 // the layer has no data for us with the ShardedRange check above, but
     651            0 :                 drop_layers.push(layer);
     652            0 :             }
     653              :         }
     654              : 
     655              :         // At this point, we have replaced local layer files with their rewritten form, but not yet uploaded
     656              :         // metadata to reflect that. If we restart here, the replaced layer files will look invalid (size mismatch
     657              :         // to remote index) and be removed. This is inefficient but safe.
     658            0 :         fail::fail_point!("compact-shard-ancestors-localonly");
     659            0 : 
     660            0 :         // Update the LayerMap so that readers will use the new layers, and enqueue it for writing to remote storage
     661            0 :         self.rewrite_layers(replace_image_layers, drop_layers)
     662            0 :             .await?;
     663              : 
     664            0 :         fail::fail_point!("compact-shard-ancestors-enqueued");
     665            0 : 
     666            0 :         // We wait for all uploads to complete before finishing this compaction stage.  This is not
     667            0 :         // necessary for correctness, but it simplifies testing, and avoids proceeding with another
     668            0 :         // Timeline's compaction while this timeline's uploads may be generating lots of disk I/O
     669            0 :         // load.
     670            0 :         match self.remote_client.wait_completion().await {
     671            0 :             Ok(()) => (),
     672            0 :             Err(WaitCompletionError::NotInitialized(ni)) => return Err(CompactionError::from(ni)),
     673              :             Err(WaitCompletionError::UploadQueueShutDownOrStopped) => {
     674            0 :                 return Err(CompactionError::ShuttingDown)
     675              :             }
     676              :         }
     677              : 
     678            0 :         fail::fail_point!("compact-shard-ancestors-persistent");
     679            0 : 
     680            0 :         Ok(())
     681            0 :     }
     682              : 
     683              :     /// Update the LayerVisibilityHint of layers covered by image layers, based on whether there is
     684              :     /// an image layer between them and the most recent readable LSN (branch point or tip of timeline).  The
     685              :     /// purpose of the visibility hint is to record which layers need to be available to service reads.
     686              :     ///
     687              :     /// The result may be used as an input to eviction and secondary downloads to de-prioritize layers
     688              :     /// that we know won't be needed for reads.
     689          202 :     pub(super) async fn update_layer_visibility(
     690          202 :         &self,
     691          202 :     ) -> Result<(), super::layer_manager::Shutdown> {
     692          202 :         let head_lsn = self.get_last_record_lsn();
     693              : 
     694              :         // We will sweep through layers in reverse-LSN order.  We only do historic layers.  L0 deltas
     695              :         // are implicitly left visible, because LayerVisibilityHint's default is Visible, and we never modify it here.
     696              :         // Note that L0 deltas _can_ be covered by image layers, but we consider them 'visible' because we anticipate that
     697              :         // they will be subject to L0->L1 compaction in the near future.
     698          202 :         let layer_manager = self.layers.read().await;
     699          202 :         let layer_map = layer_manager.layer_map()?;
     700              : 
     701          202 :         let readable_points = {
     702          202 :             let children = self.gc_info.read().unwrap().retain_lsns.clone();
     703          202 : 
     704          202 :             let mut readable_points = Vec::with_capacity(children.len() + 1);
     705          202 :             for (child_lsn, _child_timeline_id, is_offloaded) in &children {
     706            0 :                 if *is_offloaded == MaybeOffloaded::Yes {
     707            0 :                     continue;
     708            0 :                 }
     709            0 :                 readable_points.push(*child_lsn);
     710              :             }
     711          202 :             readable_points.push(head_lsn);
     712          202 :             readable_points
     713          202 :         };
     714          202 : 
     715          202 :         let (layer_visibility, covered) = layer_map.get_visibility(readable_points);
     716          524 :         for (layer_desc, visibility) in layer_visibility {
     717          322 :             // FIXME: a more efficiency bulk zip() through the layers rather than NlogN getting each one
     718          322 :             let layer = layer_manager.get_from_desc(&layer_desc);
     719          322 :             layer.set_visibility(visibility);
     720          322 :         }
     721              : 
     722              :         // TODO: publish our covered KeySpace to our parent, so that when they update their visibility, they can
     723              :         // avoid assuming that everything at a branch point is visible.
     724          202 :         drop(covered);
     725          202 :         Ok(())
     726          202 :     }
     727              : 
     728              :     /// Collect a bunch of Level 0 layer files, and compact and reshuffle them as
     729              :     /// as Level 1 files. Returns whether the L0 layers are fully compacted.
     730          364 :     async fn compact_level0(
     731          364 :         self: &Arc<Self>,
     732          364 :         target_file_size: u64,
     733          364 :         force_compaction_ignore_threshold: bool,
     734          364 :         ctx: &RequestContext,
     735          364 :     ) -> Result<bool, CompactionError> {
     736              :         let CompactLevel0Phase1Result {
     737          364 :             new_layers,
     738          364 :             deltas_to_compact,
     739          364 :             fully_compacted,
     740              :         } = {
     741          364 :             let phase1_span = info_span!("compact_level0_phase1");
     742          364 :             let ctx = ctx.attached_child();
     743          364 :             let mut stats = CompactLevel0Phase1StatsBuilder {
     744          364 :                 version: Some(2),
     745          364 :                 tenant_id: Some(self.tenant_shard_id),
     746          364 :                 timeline_id: Some(self.timeline_id),
     747          364 :                 ..Default::default()
     748          364 :             };
     749          364 : 
     750          364 :             let begin = tokio::time::Instant::now();
     751          364 :             let phase1_layers_locked = self.layers.read().await;
     752          364 :             let now = tokio::time::Instant::now();
     753          364 :             stats.read_lock_acquisition_micros =
     754          364 :                 DurationRecorder::Recorded(RecordedDuration(now - begin), now);
     755          364 :             self.compact_level0_phase1(
     756          364 :                 phase1_layers_locked,
     757          364 :                 stats,
     758          364 :                 target_file_size,
     759          364 :                 force_compaction_ignore_threshold,
     760          364 :                 &ctx,
     761          364 :             )
     762          364 :             .instrument(phase1_span)
     763          364 :             .await?
     764              :         };
     765              : 
     766          364 :         if new_layers.is_empty() && deltas_to_compact.is_empty() {
     767              :             // nothing to do
     768          336 :             return Ok(true);
     769           28 :         }
     770           28 : 
     771           28 :         self.finish_compact_batch(&new_layers, &Vec::new(), &deltas_to_compact)
     772           28 :             .await?;
     773           28 :         Ok(fully_compacted)
     774          364 :     }
     775              : 
     776              :     /// Level0 files first phase of compaction, explained in the [`Self::compact_legacy`] comment.
     777          364 :     async fn compact_level0_phase1<'a>(
     778          364 :         self: &'a Arc<Self>,
     779          364 :         guard: tokio::sync::RwLockReadGuard<'a, LayerManager>,
     780          364 :         mut stats: CompactLevel0Phase1StatsBuilder,
     781          364 :         target_file_size: u64,
     782          364 :         force_compaction_ignore_threshold: bool,
     783          364 :         ctx: &RequestContext,
     784          364 :     ) -> Result<CompactLevel0Phase1Result, CompactionError> {
     785          364 :         stats.read_lock_held_spawn_blocking_startup_micros =
     786          364 :             stats.read_lock_acquisition_micros.till_now(); // set by caller
     787          364 :         let layers = guard.layer_map()?;
     788          364 :         let level0_deltas = layers.level0_deltas();
     789          364 :         stats.level0_deltas_count = Some(level0_deltas.len());
     790          364 : 
     791          364 :         // Only compact if enough layers have accumulated.
     792          364 :         let threshold = self.get_compaction_threshold();
     793          364 :         if level0_deltas.is_empty() || level0_deltas.len() < threshold {
     794          336 :             if force_compaction_ignore_threshold {
     795            0 :                 if !level0_deltas.is_empty() {
     796            0 :                     info!(
     797            0 :                         level0_deltas = level0_deltas.len(),
     798            0 :                         threshold, "too few deltas to compact, but forcing compaction"
     799              :                     );
     800              :                 } else {
     801            0 :                     info!(
     802            0 :                         level0_deltas = level0_deltas.len(),
     803            0 :                         threshold, "too few deltas to compact, cannot force compaction"
     804              :                     );
     805            0 :                     return Ok(CompactLevel0Phase1Result::default());
     806              :                 }
     807              :             } else {
     808          336 :                 debug!(
     809            0 :                     level0_deltas = level0_deltas.len(),
     810            0 :                     threshold, "too few deltas to compact"
     811              :                 );
     812          336 :                 return Ok(CompactLevel0Phase1Result::default());
     813              :             }
     814           28 :         }
     815              : 
     816           28 :         let mut level0_deltas = level0_deltas
     817           28 :             .iter()
     818          402 :             .map(|x| guard.get_from_desc(x))
     819           28 :             .collect::<Vec<_>>();
     820           28 : 
     821           28 :         // Gather the files to compact in this iteration.
     822           28 :         //
     823           28 :         // Start with the oldest Level 0 delta file, and collect any other
     824           28 :         // level 0 files that form a contiguous sequence, such that the end
     825           28 :         // LSN of previous file matches the start LSN of the next file.
     826           28 :         //
     827           28 :         // Note that if the files don't form such a sequence, we might
     828           28 :         // "compact" just a single file. That's a bit pointless, but it allows
     829           28 :         // us to get rid of the level 0 file, and compact the other files on
     830           28 :         // the next iteration. This could probably made smarter, but such
     831           28 :         // "gaps" in the sequence of level 0 files should only happen in case
     832           28 :         // of a crash, partial download from cloud storage, or something like
     833           28 :         // that, so it's not a big deal in practice.
     834          748 :         level0_deltas.sort_by_key(|l| l.layer_desc().lsn_range.start);
     835           28 :         let mut level0_deltas_iter = level0_deltas.iter();
     836           28 : 
     837           28 :         let first_level0_delta = level0_deltas_iter.next().unwrap();
     838           28 :         let mut prev_lsn_end = first_level0_delta.layer_desc().lsn_range.end;
     839           28 :         let mut deltas_to_compact = Vec::with_capacity(level0_deltas.len());
     840           28 : 
     841           28 :         // Accumulate the size of layers in `deltas_to_compact`
     842           28 :         let mut deltas_to_compact_bytes = 0;
     843           28 : 
     844           28 :         // Under normal circumstances, we will accumulate up to compaction_interval L0s of size
     845           28 :         // checkpoint_distance each.  To avoid edge cases using extra system resources, bound our
     846           28 :         // work in this function to only operate on this much delta data at once.
     847           28 :         //
     848           28 :         // Take the max of the configured value & the default, so that tests that configure tiny values
     849           28 :         // can still use a sensible amount of memory, but if a deployed system configures bigger values we
     850           28 :         // still let them compact a full stack of L0s in one go.
     851           28 :         let delta_size_limit = std::cmp::max(
     852           28 :             self.get_compaction_threshold(),
     853           28 :             DEFAULT_COMPACTION_THRESHOLD,
     854           28 :         ) as u64
     855           28 :             * std::cmp::max(self.get_checkpoint_distance(), DEFAULT_CHECKPOINT_DISTANCE);
     856           28 : 
     857           28 :         let mut fully_compacted = true;
     858           28 : 
     859           28 :         deltas_to_compact.push(first_level0_delta.download_and_keep_resident().await?);
     860          402 :         for l in level0_deltas_iter {
     861          374 :             let lsn_range = &l.layer_desc().lsn_range;
     862          374 : 
     863          374 :             if lsn_range.start != prev_lsn_end {
     864            0 :                 break;
     865          374 :             }
     866          374 :             deltas_to_compact.push(l.download_and_keep_resident().await?);
     867          374 :             deltas_to_compact_bytes += l.metadata().file_size;
     868          374 :             prev_lsn_end = lsn_range.end;
     869          374 : 
     870          374 :             if deltas_to_compact_bytes >= delta_size_limit {
     871            0 :                 info!(
     872            0 :                     l0_deltas_selected = deltas_to_compact.len(),
     873            0 :                     l0_deltas_total = level0_deltas.len(),
     874            0 :                     "L0 compaction picker hit max delta layer size limit: {}",
     875              :                     delta_size_limit
     876              :                 );
     877            0 :                 fully_compacted = false;
     878            0 : 
     879            0 :                 // Proceed with compaction, but only a subset of L0s
     880            0 :                 break;
     881          374 :             }
     882              :         }
     883           28 :         let lsn_range = Range {
     884           28 :             start: deltas_to_compact
     885           28 :                 .first()
     886           28 :                 .unwrap()
     887           28 :                 .layer_desc()
     888           28 :                 .lsn_range
     889           28 :                 .start,
     890           28 :             end: deltas_to_compact.last().unwrap().layer_desc().lsn_range.end,
     891           28 :         };
     892           28 : 
     893           28 :         info!(
     894            0 :             "Starting Level0 compaction in LSN range {}-{} for {} layers ({} deltas in total)",
     895            0 :             lsn_range.start,
     896            0 :             lsn_range.end,
     897            0 :             deltas_to_compact.len(),
     898            0 :             level0_deltas.len()
     899              :         );
     900              : 
     901          402 :         for l in deltas_to_compact.iter() {
     902          402 :             info!("compact includes {l}");
     903              :         }
     904              : 
     905              :         // We don't need the original list of layers anymore. Drop it so that
     906              :         // we don't accidentally use it later in the function.
     907           28 :         drop(level0_deltas);
     908           28 : 
     909           28 :         stats.read_lock_held_prerequisites_micros = stats
     910           28 :             .read_lock_held_spawn_blocking_startup_micros
     911           28 :             .till_now();
     912              : 
     913              :         // TODO: replace with streaming k-merge
     914           28 :         let all_keys = {
     915           28 :             let mut all_keys = Vec::new();
     916          402 :             for l in deltas_to_compact.iter() {
     917          402 :                 if self.cancel.is_cancelled() {
     918            0 :                     return Err(CompactionError::ShuttingDown);
     919          402 :                 }
     920          402 :                 let delta = l.get_as_delta(ctx).await.map_err(CompactionError::Other)?;
     921          402 :                 let keys = delta
     922          402 :                     .index_entries(ctx)
     923          402 :                     .await
     924          402 :                     .map_err(CompactionError::Other)?;
     925          402 :                 all_keys.extend(keys);
     926              :             }
     927              :             // The current stdlib sorting implementation is designed in a way where it is
     928              :             // particularly fast where the slice is made up of sorted sub-ranges.
     929      4423784 :             all_keys.sort_by_key(|DeltaEntry { key, lsn, .. }| (*key, *lsn));
     930           28 :             all_keys
     931           28 :         };
     932           28 : 
     933           28 :         stats.read_lock_held_key_sort_micros = stats.read_lock_held_prerequisites_micros.till_now();
     934              : 
     935              :         // Determine N largest holes where N is number of compacted layers. The vec is sorted by key range start.
     936              :         //
     937              :         // A hole is a key range for which this compaction doesn't have any WAL records.
     938              :         // Our goal in this compaction iteration is to avoid creating L1s that, in terms of their key range,
     939              :         // cover the hole, but actually don't contain any WAL records for that key range.
     940              :         // The reason is that the mere stack of L1s (`count_deltas`) triggers image layer creation (`create_image_layers`).
     941              :         // That image layer creation would be useless for a hole range covered by L1s that don't contain any WAL records.
     942              :         //
     943              :         // The algorithm chooses holes as follows.
     944              :         // - Slide a 2-window over the keys in key orde to get the hole range (=distance between two keys).
     945              :         // - Filter: min threshold on range length
     946              :         // - Rank: by coverage size (=number of image layers required to reconstruct each key in the range for which we have any data)
     947              :         //
     948              :         // For more details, intuition, and some ASCII art see https://github.com/neondatabase/neon/pull/3597#discussion_r1112704451
     949              :         #[derive(PartialEq, Eq)]
     950              :         struct Hole {
     951              :             key_range: Range<Key>,
     952              :             coverage_size: usize,
     953              :         }
     954           28 :         let holes: Vec<Hole> = {
     955              :             use std::cmp::Ordering;
     956              :             impl Ord for Hole {
     957            0 :                 fn cmp(&self, other: &Self) -> Ordering {
     958            0 :                     self.coverage_size.cmp(&other.coverage_size).reverse()
     959            0 :                 }
     960              :             }
     961              :             impl PartialOrd for Hole {
     962            0 :                 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
     963            0 :                     Some(self.cmp(other))
     964            0 :                 }
     965              :             }
     966           28 :             let max_holes = deltas_to_compact.len();
     967           28 :             let last_record_lsn = self.get_last_record_lsn();
     968           28 :             let min_hole_range = (target_file_size / page_cache::PAGE_SZ as u64) as i128;
     969           28 :             let min_hole_coverage_size = 3; // TODO: something more flexible?
     970           28 :                                             // min-heap (reserve space for one more element added before eviction)
     971           28 :             let mut heap: BinaryHeap<Hole> = BinaryHeap::with_capacity(max_holes + 1);
     972           28 :             let mut prev: Option<Key> = None;
     973              : 
     974      2064038 :             for &DeltaEntry { key: next_key, .. } in all_keys.iter() {
     975      2064038 :                 if let Some(prev_key) = prev {
     976              :                     // just first fast filter, do not create hole entries for metadata keys. The last hole in the
     977              :                     // compaction is the gap between data key and metadata keys.
     978      2064010 :                     if next_key.to_i128() - prev_key.to_i128() >= min_hole_range
     979            0 :                         && !Key::is_metadata_key(&prev_key)
     980              :                     {
     981            0 :                         let key_range = prev_key..next_key;
     982            0 :                         // Measuring hole by just subtraction of i128 representation of key range boundaries
     983            0 :                         // has not so much sense, because largest holes will corresponds field1/field2 changes.
     984            0 :                         // But we are mostly interested to eliminate holes which cause generation of excessive image layers.
     985            0 :                         // That is why it is better to measure size of hole as number of covering image layers.
     986            0 :                         let coverage_size =
     987            0 :                             layers.image_coverage(&key_range, last_record_lsn).len();
     988            0 :                         if coverage_size >= min_hole_coverage_size {
     989            0 :                             heap.push(Hole {
     990            0 :                                 key_range,
     991            0 :                                 coverage_size,
     992            0 :                             });
     993            0 :                             if heap.len() > max_holes {
     994            0 :                                 heap.pop(); // remove smallest hole
     995            0 :                             }
     996            0 :                         }
     997      2064010 :                     }
     998           28 :                 }
     999      2064038 :                 prev = Some(next_key.next());
    1000              :             }
    1001           28 :             let mut holes = heap.into_vec();
    1002           28 :             holes.sort_unstable_by_key(|hole| hole.key_range.start);
    1003           28 :             holes
    1004           28 :         };
    1005           28 :         stats.read_lock_held_compute_holes_micros = stats.read_lock_held_key_sort_micros.till_now();
    1006           28 :         drop_rlock(guard);
    1007           28 : 
    1008           28 :         if self.cancel.is_cancelled() {
    1009            0 :             return Err(CompactionError::ShuttingDown);
    1010           28 :         }
    1011           28 : 
    1012           28 :         stats.read_lock_drop_micros = stats.read_lock_held_compute_holes_micros.till_now();
    1013              : 
    1014              :         // This iterator walks through all key-value pairs from all the layers
    1015              :         // we're compacting, in key, LSN order.
    1016              :         // If there's both a Value::Image and Value::WalRecord for the same (key,lsn),
    1017              :         // then the Value::Image is ordered before Value::WalRecord.
    1018           28 :         let mut all_values_iter = {
    1019           28 :             let mut deltas = Vec::with_capacity(deltas_to_compact.len());
    1020          402 :             for l in deltas_to_compact.iter() {
    1021          402 :                 let l = l.get_as_delta(ctx).await.map_err(CompactionError::Other)?;
    1022          402 :                 deltas.push(l);
    1023              :             }
    1024           28 :             MergeIterator::create(&deltas, &[], ctx)
    1025           28 :         };
    1026           28 : 
    1027           28 :         // This iterator walks through all keys and is needed to calculate size used by each key
    1028           28 :         let mut all_keys_iter = all_keys
    1029           28 :             .iter()
    1030      2064038 :             .map(|DeltaEntry { key, lsn, size, .. }| (*key, *lsn, *size))
    1031      2064010 :             .coalesce(|mut prev, cur| {
    1032      2064010 :                 // Coalesce keys that belong to the same key pair.
    1033      2064010 :                 // This ensures that compaction doesn't put them
    1034      2064010 :                 // into different layer files.
    1035      2064010 :                 // Still limit this by the target file size,
    1036      2064010 :                 // so that we keep the size of the files in
    1037      2064010 :                 // check.
    1038      2064010 :                 if prev.0 == cur.0 && prev.2 < target_file_size {
    1039        40038 :                     prev.2 += cur.2;
    1040        40038 :                     Ok(prev)
    1041              :                 } else {
    1042      2023972 :                     Err((prev, cur))
    1043              :                 }
    1044      2064010 :             });
    1045           28 : 
    1046           28 :         // Merge the contents of all the input delta layers into a new set
    1047           28 :         // of delta layers, based on the current partitioning.
    1048           28 :         //
    1049           28 :         // We split the new delta layers on the key dimension. We iterate through the key space, and for each key, check if including the next key to the current output layer we're building would cause the layer to become too large. If so, dump the current output layer and start new one.
    1050           28 :         // It's possible that there is a single key with so many page versions that storing all of them in a single layer file
    1051           28 :         // would be too large. In that case, we also split on the LSN dimension.
    1052           28 :         //
    1053           28 :         // LSN
    1054           28 :         //  ^
    1055           28 :         //  |
    1056           28 :         //  | +-----------+            +--+--+--+--+
    1057           28 :         //  | |           |            |  |  |  |  |
    1058           28 :         //  | +-----------+            |  |  |  |  |
    1059           28 :         //  | |           |            |  |  |  |  |
    1060           28 :         //  | +-----------+     ==>    |  |  |  |  |
    1061           28 :         //  | |           |            |  |  |  |  |
    1062           28 :         //  | +-----------+            |  |  |  |  |
    1063           28 :         //  | |           |            |  |  |  |  |
    1064           28 :         //  | +-----------+            +--+--+--+--+
    1065           28 :         //  |
    1066           28 :         //  +--------------> key
    1067           28 :         //
    1068           28 :         //
    1069           28 :         // If one key (X) has a lot of page versions:
    1070           28 :         //
    1071           28 :         // LSN
    1072           28 :         //  ^
    1073           28 :         //  |                                 (X)
    1074           28 :         //  | +-----------+            +--+--+--+--+
    1075           28 :         //  | |           |            |  |  |  |  |
    1076           28 :         //  | +-----------+            |  |  +--+  |
    1077           28 :         //  | |           |            |  |  |  |  |
    1078           28 :         //  | +-----------+     ==>    |  |  |  |  |
    1079           28 :         //  | |           |            |  |  +--+  |
    1080           28 :         //  | +-----------+            |  |  |  |  |
    1081           28 :         //  | |           |            |  |  |  |  |
    1082           28 :         //  | +-----------+            +--+--+--+--+
    1083           28 :         //  |
    1084           28 :         //  +--------------> key
    1085           28 :         // TODO: this actually divides the layers into fixed-size chunks, not
    1086           28 :         // based on the partitioning.
    1087           28 :         //
    1088           28 :         // TODO: we should also opportunistically materialize and
    1089           28 :         // garbage collect what we can.
    1090           28 :         let mut new_layers = Vec::new();
    1091           28 :         let mut prev_key: Option<Key> = None;
    1092           28 :         let mut writer: Option<DeltaLayerWriter> = None;
    1093           28 :         let mut key_values_total_size = 0u64;
    1094           28 :         let mut dup_start_lsn: Lsn = Lsn::INVALID; // start LSN of layer containing values of the single key
    1095           28 :         let mut dup_end_lsn: Lsn = Lsn::INVALID; // end LSN of layer containing values of the single key
    1096           28 :         let mut next_hole = 0; // index of next hole in holes vector
    1097           28 : 
    1098           28 :         let mut keys = 0;
    1099              : 
    1100      2064066 :         while let Some((key, lsn, value)) = all_values_iter
    1101      2064066 :             .next()
    1102      2064066 :             .await
    1103      2064066 :             .map_err(CompactionError::Other)?
    1104              :         {
    1105      2064038 :             keys += 1;
    1106      2064038 : 
    1107      2064038 :             if keys % 32_768 == 0 && self.cancel.is_cancelled() {
    1108              :                 // avoid hitting the cancellation token on every key. in benches, we end up
    1109              :                 // shuffling an order of million keys per layer, this means we'll check it
    1110              :                 // around tens of times per layer.
    1111            0 :                 return Err(CompactionError::ShuttingDown);
    1112      2064038 :             }
    1113      2064038 : 
    1114      2064038 :             let same_key = prev_key == Some(key);
    1115      2064038 :             // We need to check key boundaries once we reach next key or end of layer with the same key
    1116      2064038 :             if !same_key || lsn == dup_end_lsn {
    1117      2024000 :                 let mut next_key_size = 0u64;
    1118      2024000 :                 let is_dup_layer = dup_end_lsn.is_valid();
    1119      2024000 :                 dup_start_lsn = Lsn::INVALID;
    1120      2024000 :                 if !same_key {
    1121      2024000 :                     dup_end_lsn = Lsn::INVALID;
    1122      2024000 :                 }
    1123              :                 // Determine size occupied by this key. We stop at next key or when size becomes larger than target_file_size
    1124      2024000 :                 for (next_key, next_lsn, next_size) in all_keys_iter.by_ref() {
    1125      2024000 :                     next_key_size = next_size;
    1126      2024000 :                     if key != next_key {
    1127      2023972 :                         if dup_end_lsn.is_valid() {
    1128            0 :                             // We are writting segment with duplicates:
    1129            0 :                             // place all remaining values of this key in separate segment
    1130            0 :                             dup_start_lsn = dup_end_lsn; // new segments starts where old stops
    1131            0 :                             dup_end_lsn = lsn_range.end; // there are no more values of this key till end of LSN range
    1132      2023972 :                         }
    1133      2023972 :                         break;
    1134           28 :                     }
    1135           28 :                     key_values_total_size += next_size;
    1136           28 :                     // Check if it is time to split segment: if total keys size is larger than target file size.
    1137           28 :                     // We need to avoid generation of empty segments if next_size > target_file_size.
    1138           28 :                     if key_values_total_size > target_file_size && lsn != next_lsn {
    1139              :                         // Split key between multiple layers: such layer can contain only single key
    1140            0 :                         dup_start_lsn = if dup_end_lsn.is_valid() {
    1141            0 :                             dup_end_lsn // new segment with duplicates starts where old one stops
    1142              :                         } else {
    1143            0 :                             lsn // start with the first LSN for this key
    1144              :                         };
    1145            0 :                         dup_end_lsn = next_lsn; // upper LSN boundary is exclusive
    1146            0 :                         break;
    1147           28 :                     }
    1148              :                 }
    1149              :                 // handle case when loop reaches last key: in this case dup_end is non-zero but dup_start is not set.
    1150      2024000 :                 if dup_end_lsn.is_valid() && !dup_start_lsn.is_valid() {
    1151            0 :                     dup_start_lsn = dup_end_lsn;
    1152            0 :                     dup_end_lsn = lsn_range.end;
    1153      2024000 :                 }
    1154      2024000 :                 if writer.is_some() {
    1155      2023972 :                     let written_size = writer.as_mut().unwrap().size();
    1156      2023972 :                     let contains_hole =
    1157      2023972 :                         next_hole < holes.len() && key >= holes[next_hole].key_range.end;
    1158              :                     // check if key cause layer overflow or contains hole...
    1159      2023972 :                     if is_dup_layer
    1160      2023972 :                         || dup_end_lsn.is_valid()
    1161      2023972 :                         || written_size + key_values_total_size > target_file_size
    1162      2023692 :                         || contains_hole
    1163              :                     {
    1164              :                         // ... if so, flush previous layer and prepare to write new one
    1165          280 :                         let (desc, path) = writer
    1166          280 :                             .take()
    1167          280 :                             .unwrap()
    1168          280 :                             .finish(prev_key.unwrap().next(), ctx)
    1169          280 :                             .await
    1170          280 :                             .map_err(CompactionError::Other)?;
    1171          280 :                         let new_delta = Layer::finish_creating(self.conf, self, desc, &path)
    1172          280 :                             .map_err(CompactionError::Other)?;
    1173              : 
    1174          280 :                         new_layers.push(new_delta);
    1175          280 :                         writer = None;
    1176          280 : 
    1177          280 :                         if contains_hole {
    1178            0 :                             // skip hole
    1179            0 :                             next_hole += 1;
    1180          280 :                         }
    1181      2023692 :                     }
    1182           28 :                 }
    1183              :                 // Remember size of key value because at next iteration we will access next item
    1184      2024000 :                 key_values_total_size = next_key_size;
    1185        40038 :             }
    1186      2064038 :             fail_point!("delta-layer-writer-fail-before-finish", |_| {
    1187            0 :                 Err(CompactionError::Other(anyhow::anyhow!(
    1188            0 :                     "failpoint delta-layer-writer-fail-before-finish"
    1189            0 :                 )))
    1190      2064038 :             });
    1191              : 
    1192      2064038 :             if !self.shard_identity.is_key_disposable(&key) {
    1193      2064038 :                 if writer.is_none() {
    1194          308 :                     if self.cancel.is_cancelled() {
    1195              :                         // to be somewhat responsive to cancellation, check for each new layer
    1196            0 :                         return Err(CompactionError::ShuttingDown);
    1197          308 :                     }
    1198              :                     // Create writer if not initiaized yet
    1199          308 :                     writer = Some(
    1200              :                         DeltaLayerWriter::new(
    1201          308 :                             self.conf,
    1202          308 :                             self.timeline_id,
    1203          308 :                             self.tenant_shard_id,
    1204          308 :                             key,
    1205          308 :                             if dup_end_lsn.is_valid() {
    1206              :                                 // this is a layer containing slice of values of the same key
    1207            0 :                                 debug!("Create new dup layer {}..{}", dup_start_lsn, dup_end_lsn);
    1208            0 :                                 dup_start_lsn..dup_end_lsn
    1209              :                             } else {
    1210          308 :                                 debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end);
    1211          308 :                                 lsn_range.clone()
    1212              :                             },
    1213          308 :                             ctx,
    1214          308 :                         )
    1215          308 :                         .await
    1216          308 :                         .map_err(CompactionError::Other)?,
    1217              :                     );
    1218              : 
    1219          308 :                     keys = 0;
    1220      2063730 :                 }
    1221              : 
    1222      2064038 :                 writer
    1223      2064038 :                     .as_mut()
    1224      2064038 :                     .unwrap()
    1225      2064038 :                     .put_value(key, lsn, value, ctx)
    1226      2064038 :                     .await
    1227      2064038 :                     .map_err(CompactionError::Other)?;
    1228              :             } else {
    1229            0 :                 let shard = self.shard_identity.shard_index();
    1230            0 :                 let owner = self.shard_identity.get_shard_number(&key);
    1231            0 :                 if cfg!(debug_assertions) {
    1232            0 :                     panic!("key {key} does not belong on shard {shard}, owned by {owner}");
    1233            0 :                 }
    1234            0 :                 debug!("dropping key {key} during compaction (it belongs on shard {owner})");
    1235              :             }
    1236              : 
    1237      2064038 :             if !new_layers.is_empty() {
    1238        19786 :                 fail_point!("after-timeline-compacted-first-L1");
    1239      2044252 :             }
    1240              : 
    1241      2064038 :             prev_key = Some(key);
    1242              :         }
    1243           28 :         if let Some(writer) = writer {
    1244           28 :             let (desc, path) = writer
    1245           28 :                 .finish(prev_key.unwrap().next(), ctx)
    1246           28 :                 .await
    1247           28 :                 .map_err(CompactionError::Other)?;
    1248           28 :             let new_delta = Layer::finish_creating(self.conf, self, desc, &path)
    1249           28 :                 .map_err(CompactionError::Other)?;
    1250           28 :             new_layers.push(new_delta);
    1251            0 :         }
    1252              : 
    1253              :         // Sync layers
    1254           28 :         if !new_layers.is_empty() {
    1255              :             // Print a warning if the created layer is larger than double the target size
    1256              :             // Add two pages for potential overhead. This should in theory be already
    1257              :             // accounted for in the target calculation, but for very small targets,
    1258              :             // we still might easily hit the limit otherwise.
    1259           28 :             let warn_limit = target_file_size * 2 + page_cache::PAGE_SZ as u64 * 2;
    1260          308 :             for layer in new_layers.iter() {
    1261          308 :                 if layer.layer_desc().file_size > warn_limit {
    1262            0 :                     warn!(
    1263              :                         %layer,
    1264            0 :                         "created delta file of size {} larger than double of target of {target_file_size}", layer.layer_desc().file_size
    1265              :                     );
    1266          308 :                 }
    1267              :             }
    1268              : 
    1269              :             // The writer.finish() above already did the fsync of the inodes.
    1270              :             // We just need to fsync the directory in which these inodes are linked,
    1271              :             // which we know to be the timeline directory.
    1272              :             //
    1273              :             // We use fatal_err() below because the after writer.finish() returns with success,
    1274              :             // the in-memory state of the filesystem already has the layer file in its final place,
    1275              :             // and subsequent pageserver code could think it's durable while it really isn't.
    1276           28 :             let timeline_dir = VirtualFile::open(
    1277           28 :                 &self
    1278           28 :                     .conf
    1279           28 :                     .timeline_path(&self.tenant_shard_id, &self.timeline_id),
    1280           28 :                 ctx,
    1281           28 :             )
    1282           28 :             .await
    1283           28 :             .fatal_err("VirtualFile::open for timeline dir fsync");
    1284           28 :             timeline_dir
    1285           28 :                 .sync_all()
    1286           28 :                 .await
    1287           28 :                 .fatal_err("VirtualFile::sync_all timeline dir");
    1288            0 :         }
    1289              : 
    1290           28 :         stats.write_layer_files_micros = stats.read_lock_drop_micros.till_now();
    1291           28 :         stats.new_deltas_count = Some(new_layers.len());
    1292          308 :         stats.new_deltas_size = Some(new_layers.iter().map(|l| l.layer_desc().file_size).sum());
    1293           28 : 
    1294           28 :         match TryInto::<CompactLevel0Phase1Stats>::try_into(stats)
    1295           28 :             .and_then(|stats| serde_json::to_string(&stats).context("serde_json::to_string"))
    1296              :         {
    1297           28 :             Ok(stats_json) => {
    1298           28 :                 info!(
    1299            0 :                     stats_json = stats_json.as_str(),
    1300            0 :                     "compact_level0_phase1 stats available"
    1301              :                 )
    1302              :             }
    1303            0 :             Err(e) => {
    1304            0 :                 warn!("compact_level0_phase1 stats failed to serialize: {:#}", e);
    1305              :             }
    1306              :         }
    1307              : 
    1308              :         // Without this, rustc complains about deltas_to_compact still
    1309              :         // being borrowed when we `.into_iter()` below.
    1310           28 :         drop(all_values_iter);
    1311           28 : 
    1312           28 :         Ok(CompactLevel0Phase1Result {
    1313           28 :             new_layers,
    1314           28 :             deltas_to_compact: deltas_to_compact
    1315           28 :                 .into_iter()
    1316          402 :                 .map(|x| x.drop_eviction_guard())
    1317           28 :                 .collect::<Vec<_>>(),
    1318           28 :             fully_compacted,
    1319           28 :         })
    1320          364 :     }
    1321              : }
    1322              : 
    1323              : #[derive(Default)]
    1324              : struct CompactLevel0Phase1Result {
    1325              :     new_layers: Vec<ResidentLayer>,
    1326              :     deltas_to_compact: Vec<Layer>,
    1327              :     // Whether we have included all L0 layers, or selected only part of them due to the
    1328              :     // L0 compaction size limit.
    1329              :     fully_compacted: bool,
    1330              : }
    1331              : 
    1332              : #[derive(Default)]
    1333              : struct CompactLevel0Phase1StatsBuilder {
    1334              :     version: Option<u64>,
    1335              :     tenant_id: Option<TenantShardId>,
    1336              :     timeline_id: Option<TimelineId>,
    1337              :     read_lock_acquisition_micros: DurationRecorder,
    1338              :     read_lock_held_spawn_blocking_startup_micros: DurationRecorder,
    1339              :     read_lock_held_key_sort_micros: DurationRecorder,
    1340              :     read_lock_held_prerequisites_micros: DurationRecorder,
    1341              :     read_lock_held_compute_holes_micros: DurationRecorder,
    1342              :     read_lock_drop_micros: DurationRecorder,
    1343              :     write_layer_files_micros: DurationRecorder,
    1344              :     level0_deltas_count: Option<usize>,
    1345              :     new_deltas_count: Option<usize>,
    1346              :     new_deltas_size: Option<u64>,
    1347              : }
    1348              : 
    1349              : #[derive(serde::Serialize)]
    1350              : struct CompactLevel0Phase1Stats {
    1351              :     version: u64,
    1352              :     tenant_id: TenantShardId,
    1353              :     timeline_id: TimelineId,
    1354              :     read_lock_acquisition_micros: RecordedDuration,
    1355              :     read_lock_held_spawn_blocking_startup_micros: RecordedDuration,
    1356              :     read_lock_held_key_sort_micros: RecordedDuration,
    1357              :     read_lock_held_prerequisites_micros: RecordedDuration,
    1358              :     read_lock_held_compute_holes_micros: RecordedDuration,
    1359              :     read_lock_drop_micros: RecordedDuration,
    1360              :     write_layer_files_micros: RecordedDuration,
    1361              :     level0_deltas_count: usize,
    1362              :     new_deltas_count: usize,
    1363              :     new_deltas_size: u64,
    1364              : }
    1365              : 
    1366              : impl TryFrom<CompactLevel0Phase1StatsBuilder> for CompactLevel0Phase1Stats {
    1367              :     type Error = anyhow::Error;
    1368              : 
    1369           28 :     fn try_from(value: CompactLevel0Phase1StatsBuilder) -> Result<Self, Self::Error> {
    1370           28 :         Ok(Self {
    1371           28 :             version: value.version.ok_or_else(|| anyhow!("version not set"))?,
    1372           28 :             tenant_id: value
    1373           28 :                 .tenant_id
    1374           28 :                 .ok_or_else(|| anyhow!("tenant_id not set"))?,
    1375           28 :             timeline_id: value
    1376           28 :                 .timeline_id
    1377           28 :                 .ok_or_else(|| anyhow!("timeline_id not set"))?,
    1378           28 :             read_lock_acquisition_micros: value
    1379           28 :                 .read_lock_acquisition_micros
    1380           28 :                 .into_recorded()
    1381           28 :                 .ok_or_else(|| anyhow!("read_lock_acquisition_micros not set"))?,
    1382           28 :             read_lock_held_spawn_blocking_startup_micros: value
    1383           28 :                 .read_lock_held_spawn_blocking_startup_micros
    1384           28 :                 .into_recorded()
    1385           28 :                 .ok_or_else(|| anyhow!("read_lock_held_spawn_blocking_startup_micros not set"))?,
    1386           28 :             read_lock_held_key_sort_micros: value
    1387           28 :                 .read_lock_held_key_sort_micros
    1388           28 :                 .into_recorded()
    1389           28 :                 .ok_or_else(|| anyhow!("read_lock_held_key_sort_micros not set"))?,
    1390           28 :             read_lock_held_prerequisites_micros: value
    1391           28 :                 .read_lock_held_prerequisites_micros
    1392           28 :                 .into_recorded()
    1393           28 :                 .ok_or_else(|| anyhow!("read_lock_held_prerequisites_micros not set"))?,
    1394           28 :             read_lock_held_compute_holes_micros: value
    1395           28 :                 .read_lock_held_compute_holes_micros
    1396           28 :                 .into_recorded()
    1397           28 :                 .ok_or_else(|| anyhow!("read_lock_held_compute_holes_micros not set"))?,
    1398           28 :             read_lock_drop_micros: value
    1399           28 :                 .read_lock_drop_micros
    1400           28 :                 .into_recorded()
    1401           28 :                 .ok_or_else(|| anyhow!("read_lock_drop_micros not set"))?,
    1402           28 :             write_layer_files_micros: value
    1403           28 :                 .write_layer_files_micros
    1404           28 :                 .into_recorded()
    1405           28 :                 .ok_or_else(|| anyhow!("write_layer_files_micros not set"))?,
    1406           28 :             level0_deltas_count: value
    1407           28 :                 .level0_deltas_count
    1408           28 :                 .ok_or_else(|| anyhow!("level0_deltas_count not set"))?,
    1409           28 :             new_deltas_count: value
    1410           28 :                 .new_deltas_count
    1411           28 :                 .ok_or_else(|| anyhow!("new_deltas_count not set"))?,
    1412           28 :             new_deltas_size: value
    1413           28 :                 .new_deltas_size
    1414           28 :                 .ok_or_else(|| anyhow!("new_deltas_size not set"))?,
    1415              :         })
    1416           28 :     }
    1417              : }
    1418              : 
    1419              : impl Timeline {
    1420              :     /// Entry point for new tiered compaction algorithm.
    1421              :     ///
    1422              :     /// All the real work is in the implementation in the pageserver_compaction
    1423              :     /// crate. The code here would apply to any algorithm implemented by the
    1424              :     /// same interface, but tiered is the only one at the moment.
    1425              :     ///
    1426              :     /// TODO: cancellation
    1427            0 :     pub(crate) async fn compact_tiered(
    1428            0 :         self: &Arc<Self>,
    1429            0 :         _cancel: &CancellationToken,
    1430            0 :         ctx: &RequestContext,
    1431            0 :     ) -> Result<(), CompactionError> {
    1432            0 :         let fanout = self.get_compaction_threshold() as u64;
    1433            0 :         let target_file_size = self.get_checkpoint_distance();
    1434              : 
    1435              :         // Find the top of the historical layers
    1436            0 :         let end_lsn = {
    1437            0 :             let guard = self.layers.read().await;
    1438            0 :             let layers = guard.layer_map()?;
    1439              : 
    1440            0 :             let l0_deltas = layers.level0_deltas();
    1441            0 : 
    1442            0 :             // As an optimization, if we find that there are too few L0 layers,
    1443            0 :             // bail out early. We know that the compaction algorithm would do
    1444            0 :             // nothing in that case.
    1445            0 :             if l0_deltas.len() < fanout as usize {
    1446              :                 // doesn't need compacting
    1447            0 :                 return Ok(());
    1448            0 :             }
    1449            0 :             l0_deltas.iter().map(|l| l.lsn_range.end).max().unwrap()
    1450            0 :         };
    1451            0 : 
    1452            0 :         // Is the timeline being deleted?
    1453            0 :         if self.is_stopping() {
    1454            0 :             trace!("Dropping out of compaction on timeline shutdown");
    1455            0 :             return Err(CompactionError::ShuttingDown);
    1456            0 :         }
    1457              : 
    1458            0 :         let (dense_ks, _sparse_ks) = self.collect_keyspace(end_lsn, ctx).await?;
    1459              :         // TODO(chi): ignore sparse_keyspace for now, compact it in the future.
    1460            0 :         let mut adaptor = TimelineAdaptor::new(self, (end_lsn, dense_ks));
    1461            0 : 
    1462            0 :         pageserver_compaction::compact_tiered::compact_tiered(
    1463            0 :             &mut adaptor,
    1464            0 :             end_lsn,
    1465            0 :             target_file_size,
    1466            0 :             fanout,
    1467            0 :             ctx,
    1468            0 :         )
    1469            0 :         .await
    1470              :         // TODO: compact_tiered needs to return CompactionError
    1471            0 :         .map_err(CompactionError::Other)?;
    1472              : 
    1473            0 :         adaptor.flush_updates().await?;
    1474            0 :         Ok(())
    1475            0 :     }
    1476              : 
    1477              :     /// Take a list of images and deltas, produce images and deltas according to GC horizon and retain_lsns.
    1478              :     ///
    1479              :     /// It takes a key, the values of the key within the compaction process, a GC horizon, and all retain_lsns below the horizon.
    1480              :     /// For now, it requires the `accumulated_values` contains the full history of the key (i.e., the key with the lowest LSN is
    1481              :     /// an image or a WAL not requiring a base image). This restriction will be removed once we implement gc-compaction on branch.
    1482              :     ///
    1483              :     /// The function returns the deltas and the base image that need to be placed at each of the retain LSN. For example, we have:
    1484              :     ///
    1485              :     /// A@0x10, +B@0x20, +C@0x30, +D@0x40, +E@0x50, +F@0x60
    1486              :     /// horizon = 0x50, retain_lsn = 0x20, 0x40, delta_threshold=3
    1487              :     ///
    1488              :     /// The function will produce:
    1489              :     ///
    1490              :     /// ```plain
    1491              :     /// 0x20(retain_lsn) -> img=AB@0x20                  always produce a single image below the lowest retain LSN
    1492              :     /// 0x40(retain_lsn) -> deltas=[+C@0x30, +D@0x40]    two deltas since the last base image, keeping the deltas
    1493              :     /// 0x50(horizon)    -> deltas=[ABCDE@0x50]          three deltas since the last base image, generate an image but put it in the delta
    1494              :     /// above_horizon    -> deltas=[+F@0x60]             full history above the horizon
    1495              :     /// ```
    1496              :     ///
    1497              :     /// Note that `accumulated_values` must be sorted by LSN and should belong to a single key.
    1498          630 :     pub(crate) async fn generate_key_retention(
    1499          630 :         self: &Arc<Timeline>,
    1500          630 :         key: Key,
    1501          630 :         full_history: &[(Key, Lsn, Value)],
    1502          630 :         horizon: Lsn,
    1503          630 :         retain_lsn_below_horizon: &[Lsn],
    1504          630 :         delta_threshold_cnt: usize,
    1505          630 :         base_img_from_ancestor: Option<(Key, Lsn, Bytes)>,
    1506          630 :     ) -> anyhow::Result<KeyHistoryRetention> {
    1507          630 :         // Pre-checks for the invariants
    1508          630 :         if cfg!(debug_assertions) {
    1509         1530 :             for (log_key, _, _) in full_history {
    1510          900 :                 assert_eq!(log_key, &key, "mismatched key");
    1511              :             }
    1512          630 :             for i in 1..full_history.len() {
    1513          270 :                 assert!(full_history[i - 1].1 <= full_history[i].1, "unordered LSN");
    1514          270 :                 if full_history[i - 1].1 == full_history[i].1 {
    1515            0 :                     assert!(
    1516            0 :                         matches!(full_history[i - 1].2, Value::Image(_)),
    1517            0 :                         "unordered delta/image, or duplicated delta"
    1518              :                     );
    1519          270 :                 }
    1520              :             }
    1521              :             // There was an assertion for no base image that checks if the first
    1522              :             // record in the history is `will_init` before, but it was removed.
    1523              :             // This is explained in the test cases for generate_key_retention.
    1524              :             // Search "incomplete history" for more information.
    1525         1410 :             for lsn in retain_lsn_below_horizon {
    1526          780 :                 assert!(lsn < &horizon, "retain lsn must be below horizon")
    1527              :             }
    1528          630 :             for i in 1..retain_lsn_below_horizon.len() {
    1529          356 :                 assert!(
    1530          356 :                     retain_lsn_below_horizon[i - 1] <= retain_lsn_below_horizon[i],
    1531            0 :                     "unordered LSN"
    1532              :                 );
    1533              :             }
    1534            0 :         }
    1535          630 :         let has_ancestor = base_img_from_ancestor.is_some();
    1536              :         // Step 1: split history into len(retain_lsn_below_horizon) + 2 buckets, where the last bucket is for all deltas above the horizon,
    1537              :         // and the second-to-last bucket is for the horizon. Each bucket contains lsn_last_bucket < deltas <= lsn_this_bucket.
    1538          630 :         let (mut split_history, lsn_split_points) = {
    1539          630 :             let mut split_history = Vec::new();
    1540          630 :             split_history.resize_with(retain_lsn_below_horizon.len() + 2, Vec::new);
    1541          630 :             let mut lsn_split_points = Vec::with_capacity(retain_lsn_below_horizon.len() + 1);
    1542         1410 :             for lsn in retain_lsn_below_horizon {
    1543          780 :                 lsn_split_points.push(*lsn);
    1544          780 :             }
    1545          630 :             lsn_split_points.push(horizon);
    1546          630 :             let mut current_idx = 0;
    1547         1530 :             for item @ (_, lsn, _) in full_history {
    1548         1144 :                 while current_idx < lsn_split_points.len() && *lsn > lsn_split_points[current_idx] {
    1549          244 :                     current_idx += 1;
    1550          244 :                 }
    1551          900 :                 split_history[current_idx].push(item);
    1552              :             }
    1553          630 :             (split_history, lsn_split_points)
    1554              :         };
    1555              :         // Step 2: filter out duplicated records due to the k-merge of image/delta layers
    1556         2670 :         for split_for_lsn in &mut split_history {
    1557         2040 :             let mut prev_lsn = None;
    1558         2040 :             let mut new_split_for_lsn = Vec::with_capacity(split_for_lsn.len());
    1559         2040 :             for record @ (_, lsn, _) in std::mem::take(split_for_lsn) {
    1560          900 :                 if let Some(prev_lsn) = &prev_lsn {
    1561          118 :                     if *prev_lsn == lsn {
    1562              :                         // The case that we have an LSN with both data from the delta layer and the image layer. As
    1563              :                         // `ValueWrapper` ensures that an image is ordered before a delta at the same LSN, we simply
    1564              :                         // drop this delta and keep the image.
    1565              :                         //
    1566              :                         // For example, we have delta layer key1@0x10, key1@0x20, and image layer key1@0x10, we will
    1567              :                         // keep the image for key1@0x10 and the delta for key1@0x20. key1@0x10 delta will be simply
    1568              :                         // dropped.
    1569              :                         //
    1570              :                         // TODO: in case we have both delta + images for a given LSN and it does not exceed the delta
    1571              :                         // threshold, we could have kept delta instead to save space. This is an optimization for the future.
    1572            0 :                         continue;
    1573          118 :                     }
    1574          782 :                 }
    1575          900 :                 prev_lsn = Some(lsn);
    1576          900 :                 new_split_for_lsn.push(record);
    1577              :             }
    1578         2040 :             *split_for_lsn = new_split_for_lsn;
    1579              :         }
    1580              :         // Step 3: generate images when necessary
    1581          630 :         let mut retention = Vec::with_capacity(split_history.len());
    1582          630 :         let mut records_since_last_image = 0;
    1583          630 :         let batch_cnt = split_history.len();
    1584          630 :         assert!(
    1585          630 :             batch_cnt >= 2,
    1586            0 :             "should have at least below + above horizon batches"
    1587              :         );
    1588          630 :         let mut replay_history: Vec<(Key, Lsn, Value)> = Vec::new();
    1589          630 :         if let Some((key, lsn, img)) = base_img_from_ancestor {
    1590           42 :             replay_history.push((key, lsn, Value::Image(img)));
    1591          588 :         }
    1592              : 
    1593              :         /// Generate debug information for the replay history
    1594            0 :         fn generate_history_trace(replay_history: &[(Key, Lsn, Value)]) -> String {
    1595              :             use std::fmt::Write;
    1596            0 :             let mut output = String::new();
    1597            0 :             if let Some((key, _, _)) = replay_history.first() {
    1598            0 :                 write!(output, "key={} ", key).unwrap();
    1599            0 :                 let mut cnt = 0;
    1600            0 :                 for (_, lsn, val) in replay_history {
    1601            0 :                     if val.is_image() {
    1602            0 :                         write!(output, "i@{} ", lsn).unwrap();
    1603            0 :                     } else if val.will_init() {
    1604            0 :                         write!(output, "di@{} ", lsn).unwrap();
    1605            0 :                     } else {
    1606            0 :                         write!(output, "d@{} ", lsn).unwrap();
    1607            0 :                     }
    1608            0 :                     cnt += 1;
    1609            0 :                     if cnt >= 128 {
    1610            0 :                         write!(output, "... and more").unwrap();
    1611            0 :                         break;
    1612            0 :                     }
    1613              :                 }
    1614            0 :             } else {
    1615            0 :                 write!(output, "<no history>").unwrap();
    1616            0 :             }
    1617            0 :             output
    1618            0 :         }
    1619              : 
    1620            0 :         fn generate_debug_trace(
    1621            0 :             replay_history: Option<&[(Key, Lsn, Value)]>,
    1622            0 :             full_history: &[(Key, Lsn, Value)],
    1623            0 :             lsns: &[Lsn],
    1624            0 :             horizon: Lsn,
    1625            0 :         ) -> String {
    1626              :             use std::fmt::Write;
    1627            0 :             let mut output = String::new();
    1628            0 :             if let Some(replay_history) = replay_history {
    1629            0 :                 writeln!(
    1630            0 :                     output,
    1631            0 :                     "replay_history: {}",
    1632            0 :                     generate_history_trace(replay_history)
    1633            0 :                 )
    1634            0 :                 .unwrap();
    1635            0 :             } else {
    1636            0 :                 writeln!(output, "replay_history: <disabled>",).unwrap();
    1637            0 :             }
    1638            0 :             writeln!(
    1639            0 :                 output,
    1640            0 :                 "full_history: {}",
    1641            0 :                 generate_history_trace(full_history)
    1642            0 :             )
    1643            0 :             .unwrap();
    1644            0 :             writeln!(
    1645            0 :                 output,
    1646            0 :                 "when processing: [{}] horizon={}",
    1647            0 :                 lsns.iter().map(|l| format!("{l}")).join(","),
    1648            0 :                 horizon
    1649            0 :             )
    1650            0 :             .unwrap();
    1651            0 :             output
    1652            0 :         }
    1653              : 
    1654         2040 :         for (i, split_for_lsn) in split_history.into_iter().enumerate() {
    1655              :             // TODO: there could be image keys inside the splits, and we can compute records_since_last_image accordingly.
    1656         2040 :             records_since_last_image += split_for_lsn.len();
    1657         2040 :             let generate_image = if i == 0 && !has_ancestor {
    1658              :                 // We always generate images for the first batch (below horizon / lowest retain_lsn)
    1659          588 :                 true
    1660         1452 :             } else if i == batch_cnt - 1 {
    1661              :                 // Do not generate images for the last batch (above horizon)
    1662          630 :                 false
    1663          822 :             } else if records_since_last_image >= delta_threshold_cnt {
    1664              :                 // Generate images when there are too many records
    1665            6 :                 true
    1666              :             } else {
    1667          816 :                 false
    1668              :             };
    1669         2040 :             replay_history.extend(split_for_lsn.iter().map(|x| (*x).clone()));
    1670              :             // Only retain the items after the last image record
    1671         2514 :             for idx in (0..replay_history.len()).rev() {
    1672         2514 :                 if replay_history[idx].2.will_init() {
    1673         2040 :                     replay_history = replay_history[idx..].to_vec();
    1674         2040 :                     break;
    1675          474 :                 }
    1676              :             }
    1677         2040 :             if let Some((_, _, val)) = replay_history.first() {
    1678         2040 :                 if !val.will_init() {
    1679            0 :                     return Err(anyhow::anyhow!("invalid history, no base image")).with_context(
    1680            0 :                         || {
    1681            0 :                             generate_debug_trace(
    1682            0 :                                 Some(&replay_history),
    1683            0 :                                 full_history,
    1684            0 :                                 retain_lsn_below_horizon,
    1685            0 :                                 horizon,
    1686            0 :                             )
    1687            0 :                         },
    1688            0 :                     );
    1689         2040 :                 }
    1690            0 :             }
    1691         2040 :             if generate_image && records_since_last_image > 0 {
    1692          594 :                 records_since_last_image = 0;
    1693          594 :                 let replay_history_for_debug = if cfg!(debug_assertions) {
    1694          594 :                     Some(replay_history.clone())
    1695              :                 } else {
    1696            0 :                     None
    1697              :                 };
    1698          594 :                 let replay_history_for_debug_ref = replay_history_for_debug.as_deref();
    1699          594 :                 let history = std::mem::take(&mut replay_history);
    1700          594 :                 let mut img = None;
    1701          594 :                 let mut records = Vec::with_capacity(history.len());
    1702          594 :                 if let (_, lsn, Value::Image(val)) = history.first().as_ref().unwrap() {
    1703          572 :                     img = Some((*lsn, val.clone()));
    1704          572 :                     for (_, lsn, val) in history.into_iter().skip(1) {
    1705           34 :                         let Value::WalRecord(rec) = val else {
    1706            0 :                             return Err(anyhow::anyhow!(
    1707            0 :                                 "invalid record, first record is image, expect walrecords"
    1708            0 :                             ))
    1709            0 :                             .with_context(|| {
    1710            0 :                                 generate_debug_trace(
    1711            0 :                                     replay_history_for_debug_ref,
    1712            0 :                                     full_history,
    1713            0 :                                     retain_lsn_below_horizon,
    1714            0 :                                     horizon,
    1715            0 :                                 )
    1716            0 :                             });
    1717              :                         };
    1718           34 :                         records.push((lsn, rec));
    1719              :                     }
    1720              :                 } else {
    1721           36 :                     for (_, lsn, val) in history.into_iter() {
    1722           36 :                         let Value::WalRecord(rec) = val else {
    1723            0 :                             return Err(anyhow::anyhow!("invalid record, first record is walrecord, expect rest are walrecord"))
    1724            0 :                                 .with_context(|| generate_debug_trace(
    1725            0 :                                     replay_history_for_debug_ref,
    1726            0 :                                     full_history,
    1727            0 :                                     retain_lsn_below_horizon,
    1728            0 :                                     horizon,
    1729            0 :                                 ));
    1730              :                         };
    1731           36 :                         records.push((lsn, rec));
    1732              :                     }
    1733              :                 }
    1734          594 :                 records.reverse();
    1735          594 :                 let state = ValueReconstructState { img, records };
    1736          594 :                 let request_lsn = lsn_split_points[i]; // last batch does not generate image so i is always in range
    1737          594 :                 let img = self.reconstruct_value(key, request_lsn, state).await?;
    1738          594 :                 replay_history.push((key, request_lsn, Value::Image(img.clone())));
    1739          594 :                 retention.push(vec![(request_lsn, Value::Image(img))]);
    1740         1446 :             } else {
    1741         1446 :                 let deltas = split_for_lsn
    1742         1446 :                     .iter()
    1743         1446 :                     .map(|(_, lsn, value)| (*lsn, value.clone()))
    1744         1446 :                     .collect_vec();
    1745         1446 :                 retention.push(deltas);
    1746         1446 :             }
    1747              :         }
    1748          630 :         let mut result = Vec::with_capacity(retention.len());
    1749          630 :         assert_eq!(retention.len(), lsn_split_points.len() + 1);
    1750         2040 :         for (idx, logs) in retention.into_iter().enumerate() {
    1751         2040 :             if idx == lsn_split_points.len() {
    1752          630 :                 return Ok(KeyHistoryRetention {
    1753          630 :                     below_horizon: result,
    1754          630 :                     above_horizon: KeyLogAtLsn(logs),
    1755          630 :                 });
    1756         1410 :             } else {
    1757         1410 :                 result.push((lsn_split_points[idx], KeyLogAtLsn(logs)));
    1758         1410 :             }
    1759              :         }
    1760            0 :         unreachable!("key retention is empty")
    1761          630 :     }
    1762              : 
    1763              :     /// Check how much space is left on the disk
    1764           52 :     async fn check_available_space(self: &Arc<Self>) -> anyhow::Result<u64> {
    1765           52 :         let tenants_dir = self.conf.tenants_path();
    1766              : 
    1767           52 :         let stat = Statvfs::get(&tenants_dir, None)
    1768           52 :             .context("statvfs failed, presumably directory got unlinked")?;
    1769              : 
    1770           52 :         let (avail_bytes, _) = stat.get_avail_total_bytes();
    1771           52 : 
    1772           52 :         Ok(avail_bytes)
    1773           52 :     }
    1774              : 
    1775              :     /// Check if the compaction can proceed safely without running out of space. We assume the size
    1776              :     /// upper bound of the produced files of a compaction job is the same as all layers involved in
    1777              :     /// the compaction. Therefore, we need `2 * layers_to_be_compacted_size` at least to do a
    1778              :     /// compaction.
    1779           52 :     async fn check_compaction_space(
    1780           52 :         self: &Arc<Self>,
    1781           52 :         layer_selection: &[Layer],
    1782           52 :     ) -> anyhow::Result<()> {
    1783           52 :         let available_space = self.check_available_space().await?;
    1784           52 :         let mut remote_layer_size = 0;
    1785           52 :         let mut all_layer_size = 0;
    1786          204 :         for layer in layer_selection {
    1787          152 :             let needs_download = layer.needs_download().await?;
    1788          152 :             if needs_download.is_some() {
    1789            0 :                 remote_layer_size += layer.layer_desc().file_size;
    1790          152 :             }
    1791          152 :             all_layer_size += layer.layer_desc().file_size;
    1792              :         }
    1793           52 :         let allocated_space = (available_space as f64 * 0.8) as u64; /* reserve 20% space for other tasks */
    1794           52 :         if all_layer_size /* space needed for newly-generated file */ + remote_layer_size /* space for downloading layers */ > allocated_space
    1795              :         {
    1796            0 :             return Err(anyhow!("not enough space for compaction: available_space={}, allocated_space={}, all_layer_size={}, remote_layer_size={}, required_space={}",
    1797            0 :                 available_space, allocated_space, all_layer_size, remote_layer_size, all_layer_size + remote_layer_size));
    1798           52 :         }
    1799           52 :         Ok(())
    1800           52 :     }
    1801              : 
    1802              :     /// Get a watermark for gc-compaction, that is the lowest LSN that we can use as the `gc_horizon` for
    1803              :     /// the compaction algorithm. It is min(space_cutoff, time_cutoff, latest_gc_cutoff, standby_horizon).
    1804              :     /// Leases and retain_lsns are considered in the gc-compaction job itself so we don't need to account for them
    1805              :     /// here.
    1806           54 :     pub(crate) fn get_gc_compaction_watermark(self: &Arc<Self>) -> Lsn {
    1807           54 :         let gc_cutoff_lsn = {
    1808           54 :             let gc_info = self.gc_info.read().unwrap();
    1809           54 :             gc_info.min_cutoff()
    1810           54 :         };
    1811           54 : 
    1812           54 :         // TODO: standby horizon should use leases so we don't really need to consider it here.
    1813           54 :         // let watermark = watermark.min(self.standby_horizon.load());
    1814           54 : 
    1815           54 :         // TODO: ensure the child branches will not use anything below the watermark, or consider
    1816           54 :         // them when computing the watermark.
    1817           54 :         gc_cutoff_lsn.min(*self.get_latest_gc_cutoff_lsn())
    1818           54 :     }
    1819              : 
    1820              :     /// Split a gc-compaction job into multiple compaction jobs. The split is based on the key range and the estimated size of the compaction job.
    1821              :     /// The function returns a list of compaction jobs that can be executed separately. If the upper bound of the compact LSN
    1822              :     /// range is not specified, we will use the latest gc_cutoff as the upper bound, so that all jobs in the jobset acts
    1823              :     /// like a full compaction of the specified keyspace.
    1824            0 :     pub(crate) async fn gc_compaction_split_jobs(
    1825            0 :         self: &Arc<Self>,
    1826            0 :         job: GcCompactJob,
    1827            0 :         sub_compaction_max_job_size_mb: Option<u64>,
    1828            0 :     ) -> anyhow::Result<Vec<GcCompactJob>> {
    1829            0 :         let compact_below_lsn = if job.compact_lsn_range.end != Lsn::MAX {
    1830            0 :             job.compact_lsn_range.end
    1831              :         } else {
    1832            0 :             self.get_gc_compaction_watermark()
    1833              :         };
    1834              : 
    1835              :         // Split compaction job to about 4GB each
    1836              :         const GC_COMPACT_MAX_SIZE_MB: u64 = 4 * 1024;
    1837            0 :         let sub_compaction_max_job_size_mb =
    1838            0 :             sub_compaction_max_job_size_mb.unwrap_or(GC_COMPACT_MAX_SIZE_MB);
    1839            0 : 
    1840            0 :         let mut compact_jobs = Vec::new();
    1841              :         // For now, we simply use the key partitioning information; we should do a more fine-grained partitioning
    1842              :         // by estimating the amount of files read for a compaction job. We should also partition on LSN.
    1843            0 :         let ((dense_ks, sparse_ks), _) = {
    1844            0 :             let Ok(partition) = self.partitioning.try_lock() else {
    1845            0 :                 bail!("failed to acquire partition lock during gc-compaction");
    1846              :             };
    1847            0 :             partition.clone()
    1848              :         };
    1849              :         // Truncate the key range to be within user specified compaction range.
    1850            0 :         fn truncate_to(
    1851            0 :             source_start: &Key,
    1852            0 :             source_end: &Key,
    1853            0 :             target_start: &Key,
    1854            0 :             target_end: &Key,
    1855            0 :         ) -> Option<(Key, Key)> {
    1856            0 :             let start = source_start.max(target_start);
    1857            0 :             let end = source_end.min(target_end);
    1858            0 :             if start < end {
    1859            0 :                 Some((*start, *end))
    1860              :             } else {
    1861            0 :                 None
    1862              :             }
    1863            0 :         }
    1864            0 :         let mut split_key_ranges = Vec::new();
    1865            0 :         let ranges = dense_ks
    1866            0 :             .parts
    1867            0 :             .iter()
    1868            0 :             .map(|partition| partition.ranges.iter())
    1869            0 :             .chain(sparse_ks.parts.iter().map(|x| x.0.ranges.iter()))
    1870            0 :             .flatten()
    1871            0 :             .cloned()
    1872            0 :             .collect_vec();
    1873            0 :         for range in ranges.iter() {
    1874            0 :             let Some((start, end)) = truncate_to(
    1875            0 :                 &range.start,
    1876            0 :                 &range.end,
    1877            0 :                 &job.compact_key_range.start,
    1878            0 :                 &job.compact_key_range.end,
    1879            0 :             ) else {
    1880            0 :                 continue;
    1881              :             };
    1882            0 :             split_key_ranges.push((start, end));
    1883              :         }
    1884            0 :         split_key_ranges.sort();
    1885            0 :         let guard = self.layers.read().await;
    1886            0 :         let layer_map = guard.layer_map()?;
    1887            0 :         let mut current_start = None;
    1888            0 :         let ranges_num = split_key_ranges.len();
    1889            0 :         for (idx, (start, end)) in split_key_ranges.into_iter().enumerate() {
    1890            0 :             if current_start.is_none() {
    1891            0 :                 current_start = Some(start);
    1892            0 :             }
    1893            0 :             let start = current_start.unwrap();
    1894            0 :             if start >= end {
    1895              :                 // We have already processed this partition.
    1896            0 :                 continue;
    1897            0 :             }
    1898            0 :             let res = layer_map.range_search(start..end, compact_below_lsn);
    1899            0 :             let total_size = res.found.keys().map(|x| x.layer.file_size()).sum::<u64>();
    1900            0 :             if total_size > sub_compaction_max_job_size_mb * 1024 * 1024 || ranges_num == idx + 1 {
    1901              :                 // Try to extend the compaction range so that we include at least one full layer file.
    1902            0 :                 let extended_end = res
    1903            0 :                     .found
    1904            0 :                     .keys()
    1905            0 :                     .map(|layer| layer.layer.key_range.end)
    1906            0 :                     .min();
    1907              :                 // It is possible that the search range does not contain any layer files when we reach the end of the loop.
    1908              :                 // In this case, we simply use the specified key range end.
    1909            0 :                 let end = if let Some(extended_end) = extended_end {
    1910            0 :                     extended_end.max(end)
    1911              :                 } else {
    1912            0 :                     end
    1913              :                 };
    1914            0 :                 info!(
    1915            0 :                     "splitting compaction job: {}..{}, estimated_size={}",
    1916              :                     start, end, total_size
    1917              :                 );
    1918            0 :                 compact_jobs.push(GcCompactJob {
    1919            0 :                     dry_run: job.dry_run,
    1920            0 :                     compact_key_range: start..end,
    1921            0 :                     compact_lsn_range: job.compact_lsn_range.start..compact_below_lsn,
    1922            0 :                 });
    1923            0 :                 current_start = Some(end);
    1924            0 :             }
    1925              :         }
    1926            0 :         drop(guard);
    1927            0 :         Ok(compact_jobs)
    1928            0 :     }
    1929              : 
    1930              :     /// An experimental compaction building block that combines compaction with garbage collection.
    1931              :     ///
    1932              :     /// The current implementation picks all delta + image layers that are below or intersecting with
    1933              :     /// the GC horizon without considering retain_lsns. Then, it does a full compaction over all these delta
    1934              :     /// layers and image layers, which generates image layers on the gc horizon, drop deltas below gc horizon,
    1935              :     /// and create delta layers with all deltas >= gc horizon.
    1936              :     ///
    1937              :     /// If `options.compact_range` is provided, it will only compact the keys within the range, aka partial compaction.
    1938              :     /// Partial compaction will read and process all layers overlapping with the key range, even if it might
    1939              :     /// contain extra keys. After the gc-compaction phase completes, delta layers that are not fully contained
    1940              :     /// within the key range will be rewritten to ensure they do not overlap with the delta layers. Providing
    1941              :     /// Key::MIN..Key..MAX to the function indicates a full compaction, though technically, `Key::MAX` is not
    1942              :     /// part of the range.
    1943              :     ///
    1944              :     /// If `options.compact_lsn_range.end` is provided, the compaction will only compact layers below or intersect with
    1945              :     /// the LSN. Otherwise, it will use the gc cutoff by default.
    1946           54 :     pub(crate) async fn compact_with_gc(
    1947           54 :         self: &Arc<Self>,
    1948           54 :         cancel: &CancellationToken,
    1949           54 :         options: CompactOptions,
    1950           54 :         ctx: &RequestContext,
    1951           54 :     ) -> anyhow::Result<()> {
    1952           54 :         let sub_compaction = options.sub_compaction;
    1953           54 :         let job = GcCompactJob::from_compact_options(options.clone());
    1954           54 :         if sub_compaction {
    1955            0 :             info!("running enhanced gc bottom-most compaction with sub-compaction, splitting compaction jobs");
    1956            0 :             let jobs = self
    1957            0 :                 .gc_compaction_split_jobs(job, options.sub_compaction_max_job_size_mb)
    1958            0 :                 .await?;
    1959            0 :             let jobs_len = jobs.len();
    1960            0 :             for (idx, job) in jobs.into_iter().enumerate() {
    1961            0 :                 info!(
    1962            0 :                     "running enhanced gc bottom-most compaction, sub-compaction {}/{}",
    1963            0 :                     idx + 1,
    1964              :                     jobs_len
    1965              :                 );
    1966            0 :                 self.compact_with_gc_inner(cancel, job, ctx).await?;
    1967              :             }
    1968            0 :             if jobs_len == 0 {
    1969            0 :                 info!("no jobs to run, skipping gc bottom-most compaction");
    1970            0 :             }
    1971            0 :             return Ok(());
    1972           54 :         }
    1973           54 :         self.compact_with_gc_inner(cancel, job, ctx).await
    1974           54 :     }
    1975              : 
    1976           54 :     async fn compact_with_gc_inner(
    1977           54 :         self: &Arc<Self>,
    1978           54 :         cancel: &CancellationToken,
    1979           54 :         job: GcCompactJob,
    1980           54 :         ctx: &RequestContext,
    1981           54 :     ) -> anyhow::Result<()> {
    1982           54 :         // Block other compaction/GC tasks from running for now. GC-compaction could run along
    1983           54 :         // with legacy compaction tasks in the future. Always ensure the lock order is compaction -> gc.
    1984           54 :         // Note that we already acquired the compaction lock when the outer `compact` function gets called.
    1985           54 : 
    1986           54 :         let gc_lock = async {
    1987           54 :             tokio::select! {
    1988           54 :                 guard = self.gc_lock.lock() => Ok(guard),
    1989              :                 // TODO: refactor to CompactionError to correctly pass cancelled error
    1990           54 :                 _ = cancel.cancelled() => Err(anyhow!("cancelled")),
    1991              :             }
    1992           54 :         };
    1993              : 
    1994           54 :         let gc_lock = crate::timed(
    1995           54 :             gc_lock,
    1996           54 :             "acquires gc lock",
    1997           54 :             std::time::Duration::from_secs(5),
    1998           54 :         )
    1999           54 :         .await?;
    2000              : 
    2001           54 :         let dry_run = job.dry_run;
    2002           54 :         let compact_key_range = job.compact_key_range;
    2003           54 :         let compact_lsn_range = job.compact_lsn_range;
    2004           54 : 
    2005           54 :         info!("running enhanced gc bottom-most compaction, dry_run={dry_run}, compact_key_range={}..{}, compact_lsn_range={}..{}", compact_key_range.start, compact_key_range.end, compact_lsn_range.start, compact_lsn_range.end);
    2006              : 
    2007           54 :         scopeguard::defer! {
    2008           54 :             info!("done enhanced gc bottom-most compaction");
    2009           54 :         };
    2010           54 : 
    2011           54 :         let mut stat = CompactionStatistics::default();
    2012              : 
    2013              :         // Step 0: pick all delta layers + image layers below/intersect with the GC horizon.
    2014              :         // The layer selection has the following properties:
    2015              :         // 1. If a layer is in the selection, all layers below it are in the selection.
    2016              :         // 2. Inferred from (1), for each key in the layer selection, the value can be reconstructed only with the layers in the layer selection.
    2017           52 :         let job_desc = {
    2018           54 :             let guard = self.layers.read().await;
    2019           54 :             let layers = guard.layer_map()?;
    2020           54 :             let gc_info = self.gc_info.read().unwrap();
    2021           54 :             let mut retain_lsns_below_horizon = Vec::new();
    2022           54 :             let gc_cutoff = {
    2023              :                 // Currently, gc-compaction only kicks in after the legacy gc has updated the gc_cutoff.
    2024              :                 // Therefore, it can only clean up data that cannot be cleaned up with legacy gc, instead of
    2025              :                 // cleaning everything that theoritically it could. In the future, it should use `self.gc_info`
    2026              :                 // to get the truth data.
    2027           54 :                 let real_gc_cutoff = self.get_gc_compaction_watermark();
    2028              :                 // The compaction algorithm will keep all keys above the gc_cutoff while keeping only necessary keys below the gc_cutoff for
    2029              :                 // each of the retain_lsn. Therefore, if the user-provided `compact_lsn_range.end` is larger than the real gc cutoff, we will use
    2030              :                 // the real cutoff.
    2031           54 :                 let mut gc_cutoff = if compact_lsn_range.end == Lsn::MAX {
    2032           48 :                     real_gc_cutoff
    2033              :                 } else {
    2034            6 :                     compact_lsn_range.end
    2035              :                 };
    2036           54 :                 if gc_cutoff > real_gc_cutoff {
    2037            4 :                     warn!("provided compact_lsn_range.end={} is larger than the real_gc_cutoff={}, using the real gc cutoff", gc_cutoff, real_gc_cutoff);
    2038            4 :                     gc_cutoff = real_gc_cutoff;
    2039           50 :                 }
    2040           54 :                 gc_cutoff
    2041              :             };
    2042           70 :             for (lsn, _timeline_id, _is_offloaded) in &gc_info.retain_lsns {
    2043           70 :                 if lsn < &gc_cutoff {
    2044           70 :                     retain_lsns_below_horizon.push(*lsn);
    2045           70 :                 }
    2046              :             }
    2047           54 :             for lsn in gc_info.leases.keys() {
    2048            0 :                 if lsn < &gc_cutoff {
    2049            0 :                     retain_lsns_below_horizon.push(*lsn);
    2050            0 :                 }
    2051              :             }
    2052           54 :             let mut selected_layers: Vec<Layer> = Vec::new();
    2053           54 :             drop(gc_info);
    2054              :             // Firstly, pick all the layers intersect or below the gc_cutoff, get the largest LSN in the selected layers.
    2055           54 :             let Some(max_layer_lsn) = layers
    2056           54 :                 .iter_historic_layers()
    2057          244 :                 .filter(|desc| desc.get_lsn_range().start <= gc_cutoff)
    2058          208 :                 .map(|desc| desc.get_lsn_range().end)
    2059           54 :                 .max()
    2060              :             else {
    2061            0 :                 info!("no layers to compact with gc: no historic layers below gc_cutoff, gc_cutoff={}", gc_cutoff);
    2062            0 :                 return Ok(());
    2063              :             };
    2064              :             // Next, if the user specifies compact_lsn_range.start, we need to filter some layers out. All the layers (strictly) below
    2065              :             // the min_layer_lsn computed as below will be filtered out and the data will be accessed using the normal read path, as if
    2066              :             // it is a branch.
    2067           54 :             let Some(min_layer_lsn) = layers
    2068           54 :                 .iter_historic_layers()
    2069          244 :                 .filter(|desc| {
    2070          244 :                     if compact_lsn_range.start == Lsn::INVALID {
    2071          198 :                         true // select all layers below if start == Lsn(0)
    2072              :                     } else {
    2073           46 :                         desc.get_lsn_range().end > compact_lsn_range.start // strictly larger than compact_above_lsn
    2074              :                     }
    2075          244 :                 })
    2076          226 :                 .map(|desc| desc.get_lsn_range().start)
    2077           54 :                 .min()
    2078              :             else {
    2079            0 :                 info!("no layers to compact with gc: no historic layers above compact_above_lsn, compact_above_lsn={}", compact_lsn_range.end);
    2080            0 :                 return Ok(());
    2081              :             };
    2082              :             // Then, pick all the layers that are below the max_layer_lsn. This is to ensure we can pick all single-key
    2083              :             // layers to compact.
    2084           54 :             let mut rewrite_layers = Vec::new();
    2085          244 :             for desc in layers.iter_historic_layers() {
    2086          244 :                 if desc.get_lsn_range().end <= max_layer_lsn
    2087          208 :                     && desc.get_lsn_range().start >= min_layer_lsn
    2088          190 :                     && overlaps_with(&desc.get_key_range(), &compact_key_range)
    2089              :                 {
    2090              :                     // If the layer overlaps with the compaction key range, we need to read it to obtain all keys within the range,
    2091              :                     // even if it might contain extra keys
    2092          152 :                     selected_layers.push(guard.get_from_desc(&desc));
    2093          152 :                     // If the layer is not fully contained within the key range, we need to rewrite it if it's a delta layer (it's fine
    2094          152 :                     // to overlap image layers)
    2095          152 :                     if desc.is_delta() && !fully_contains(&compact_key_range, &desc.get_key_range())
    2096            2 :                     {
    2097            2 :                         rewrite_layers.push(desc);
    2098          150 :                     }
    2099           92 :                 }
    2100              :             }
    2101           54 :             if selected_layers.is_empty() {
    2102            2 :                 info!("no layers to compact with gc: no layers within the key range, gc_cutoff={}, key_range={}..{}", gc_cutoff, compact_key_range.start, compact_key_range.end);
    2103            2 :                 return Ok(());
    2104           52 :             }
    2105           52 :             retain_lsns_below_horizon.sort();
    2106           52 :             GcCompactionJobDescription {
    2107           52 :                 selected_layers,
    2108           52 :                 gc_cutoff,
    2109           52 :                 retain_lsns_below_horizon,
    2110           52 :                 min_layer_lsn,
    2111           52 :                 max_layer_lsn,
    2112           52 :                 compaction_key_range: compact_key_range,
    2113           52 :                 rewrite_layers,
    2114           52 :             }
    2115              :         };
    2116           52 :         let (has_data_below, lowest_retain_lsn) = if compact_lsn_range.start != Lsn::INVALID {
    2117              :             // If we only compact above some LSN, we should get the history from the current branch below the specified LSN.
    2118              :             // We use job_desc.min_layer_lsn as if it's the lowest branch point.
    2119            8 :             (true, job_desc.min_layer_lsn)
    2120           44 :         } else if self.ancestor_timeline.is_some() {
    2121              :             // In theory, we can also use min_layer_lsn here, but using ancestor LSN makes sure the delta layers cover the
    2122              :             // LSN ranges all the way to the ancestor timeline.
    2123            2 :             (true, self.ancestor_lsn)
    2124              :         } else {
    2125           42 :             let res = job_desc
    2126           42 :                 .retain_lsns_below_horizon
    2127           42 :                 .first()
    2128           42 :                 .copied()
    2129           42 :                 .unwrap_or(job_desc.gc_cutoff);
    2130           42 :             if cfg!(debug_assertions) {
    2131           42 :                 assert_eq!(
    2132           42 :                     res,
    2133           42 :                     job_desc
    2134           42 :                         .retain_lsns_below_horizon
    2135           42 :                         .iter()
    2136           42 :                         .min()
    2137           42 :                         .copied()
    2138           42 :                         .unwrap_or(job_desc.gc_cutoff)
    2139           42 :                 );
    2140            0 :             }
    2141           42 :             (false, res)
    2142              :         };
    2143           52 :         info!(
    2144            0 :             "picked {} layers for compaction ({} layers need rewriting) with max_layer_lsn={} min_layer_lsn={} gc_cutoff={} lowest_retain_lsn={}, key_range={}..{}, has_data_below={}",
    2145            0 :             job_desc.selected_layers.len(),
    2146            0 :             job_desc.rewrite_layers.len(),
    2147              :             job_desc.max_layer_lsn,
    2148              :             job_desc.min_layer_lsn,
    2149              :             job_desc.gc_cutoff,
    2150              :             lowest_retain_lsn,
    2151              :             job_desc.compaction_key_range.start,
    2152              :             job_desc.compaction_key_range.end,
    2153              :             has_data_below,
    2154              :         );
    2155              : 
    2156          204 :         for layer in &job_desc.selected_layers {
    2157          152 :             debug!("read layer: {}", layer.layer_desc().key());
    2158              :         }
    2159           54 :         for layer in &job_desc.rewrite_layers {
    2160            2 :             debug!("rewrite layer: {}", layer.key());
    2161              :         }
    2162              : 
    2163           52 :         self.check_compaction_space(&job_desc.selected_layers)
    2164           52 :             .await?;
    2165              : 
    2166              :         // Generate statistics for the compaction
    2167          204 :         for layer in &job_desc.selected_layers {
    2168          152 :             let desc = layer.layer_desc();
    2169          152 :             if desc.is_delta() {
    2170           86 :                 stat.visit_delta_layer(desc.file_size());
    2171           86 :             } else {
    2172           66 :                 stat.visit_image_layer(desc.file_size());
    2173           66 :             }
    2174              :         }
    2175              : 
    2176              :         // Step 1: construct a k-merge iterator over all layers.
    2177              :         // Also, verify if the layer map can be split by drawing a horizontal line at every LSN start/end split point.
    2178           52 :         let layer_names = job_desc
    2179           52 :             .selected_layers
    2180           52 :             .iter()
    2181          152 :             .map(|layer| layer.layer_desc().layer_name())
    2182           52 :             .collect_vec();
    2183           52 :         if let Some(err) = check_valid_layermap(&layer_names) {
    2184            0 :             bail!("gc-compaction layer map check failed because {}, cannot proceed with compaction due to potential data loss", err);
    2185           52 :         }
    2186           52 :         // The maximum LSN we are processing in this compaction loop
    2187           52 :         let end_lsn = job_desc
    2188           52 :             .selected_layers
    2189           52 :             .iter()
    2190          152 :             .map(|l| l.layer_desc().lsn_range.end)
    2191           52 :             .max()
    2192           52 :             .unwrap();
    2193           52 :         let mut delta_layers = Vec::new();
    2194           52 :         let mut image_layers = Vec::new();
    2195           52 :         let mut downloaded_layers = Vec::new();
    2196           52 :         let mut total_downloaded_size = 0;
    2197           52 :         let mut total_layer_size = 0;
    2198          204 :         for layer in &job_desc.selected_layers {
    2199          152 :             if layer.needs_download().await?.is_some() {
    2200            0 :                 total_downloaded_size += layer.layer_desc().file_size;
    2201          152 :             }
    2202          152 :             total_layer_size += layer.layer_desc().file_size;
    2203          152 :             let resident_layer = layer.download_and_keep_resident().await?;
    2204          152 :             downloaded_layers.push(resident_layer);
    2205              :         }
    2206           52 :         info!(
    2207            0 :             "finish downloading layers, downloaded={}, total={}, ratio={:.2}",
    2208            0 :             total_downloaded_size,
    2209            0 :             total_layer_size,
    2210            0 :             total_downloaded_size as f64 / total_layer_size as f64
    2211              :         );
    2212          204 :         for resident_layer in &downloaded_layers {
    2213          152 :             if resident_layer.layer_desc().is_delta() {
    2214           86 :                 let layer = resident_layer.get_as_delta(ctx).await?;
    2215           86 :                 delta_layers.push(layer);
    2216              :             } else {
    2217           66 :                 let layer = resident_layer.get_as_image(ctx).await?;
    2218           66 :                 image_layers.push(layer);
    2219              :             }
    2220              :         }
    2221           52 :         let (dense_ks, sparse_ks) = self.collect_gc_compaction_keyspace().await?;
    2222           52 :         let mut merge_iter = FilterIterator::create(
    2223           52 :             MergeIterator::create(&delta_layers, &image_layers, ctx),
    2224           52 :             dense_ks,
    2225           52 :             sparse_ks,
    2226           52 :         )?;
    2227              : 
    2228              :         // Step 2: Produce images+deltas.
    2229           52 :         let mut accumulated_values = Vec::new();
    2230           52 :         let mut last_key: Option<Key> = None;
    2231              : 
    2232              :         // Only create image layers when there is no ancestor branches. TODO: create covering image layer
    2233              :         // when some condition meet.
    2234           52 :         let mut image_layer_writer = if !has_data_below {
    2235              :             Some(
    2236           42 :                 SplitImageLayerWriter::new(
    2237           42 :                     self.conf,
    2238           42 :                     self.timeline_id,
    2239           42 :                     self.tenant_shard_id,
    2240           42 :                     job_desc.compaction_key_range.start,
    2241           42 :                     lowest_retain_lsn,
    2242           42 :                     self.get_compaction_target_size(),
    2243           42 :                     ctx,
    2244           42 :                 )
    2245           42 :                 .await?,
    2246              :             )
    2247              :         } else {
    2248           10 :             None
    2249              :         };
    2250              : 
    2251           52 :         let mut delta_layer_writer = SplitDeltaLayerWriter::new(
    2252           52 :             self.conf,
    2253           52 :             self.timeline_id,
    2254           52 :             self.tenant_shard_id,
    2255           52 :             lowest_retain_lsn..end_lsn,
    2256           52 :             self.get_compaction_target_size(),
    2257           52 :         )
    2258           52 :         .await?;
    2259              : 
    2260              :         #[derive(Default)]
    2261              :         struct RewritingLayers {
    2262              :             before: Option<DeltaLayerWriter>,
    2263              :             after: Option<DeltaLayerWriter>,
    2264              :         }
    2265           52 :         let mut delta_layer_rewriters = HashMap::<Arc<PersistentLayerKey>, RewritingLayers>::new();
    2266              : 
    2267              :         /// When compacting not at a bottom range (=`[0,X)`) of the root branch, we "have data below" (`has_data_below=true`).
    2268              :         /// The two cases are compaction in ancestor branches and when `compact_lsn_range.start` is set.
    2269              :         /// In those cases, we need to pull up data from below the LSN range we're compaction.
    2270              :         ///
    2271              :         /// This function unifies the cases so that later code doesn't have to think about it.
    2272              :         ///
    2273              :         /// Currently, we always get the ancestor image for each key in the child branch no matter whether the image
    2274              :         /// is needed for reconstruction. This should be fixed in the future.
    2275              :         ///
    2276              :         /// Furthermore, we should do vectored get instead of a single get, or better, use k-merge for ancestor
    2277              :         /// images.
    2278          622 :         async fn get_ancestor_image(
    2279          622 :             this_tline: &Arc<Timeline>,
    2280          622 :             key: Key,
    2281          622 :             ctx: &RequestContext,
    2282          622 :             has_data_below: bool,
    2283          622 :             history_lsn_point: Lsn,
    2284          622 :         ) -> anyhow::Result<Option<(Key, Lsn, Bytes)>> {
    2285          622 :             if !has_data_below {
    2286          584 :                 return Ok(None);
    2287           38 :             };
    2288              :             // This function is implemented as a get of the current timeline at ancestor LSN, therefore reusing
    2289              :             // as much existing code as possible.
    2290           38 :             let img = this_tline.get(key, history_lsn_point, ctx).await?;
    2291           38 :             Ok(Some((key, history_lsn_point, img)))
    2292          622 :         }
    2293              : 
    2294              :         // Actually, we can decide not to write to the image layer at all at this point because
    2295              :         // the key and LSN range are determined. However, to keep things simple here, we still
    2296              :         // create this writer, and discard the writer in the end.
    2297              : 
    2298          966 :         while let Some(((key, lsn, val), desc)) = merge_iter.next_with_trace().await? {
    2299          914 :             if cancel.is_cancelled() {
    2300            0 :                 return Err(anyhow!("cancelled")); // TODO: refactor to CompactionError and pass cancel error
    2301          914 :             }
    2302          914 :             if self.shard_identity.is_key_disposable(&key) {
    2303              :                 // If this shard does not need to store this key, simply skip it.
    2304              :                 //
    2305              :                 // This is not handled in the filter iterator because shard is determined by hash.
    2306              :                 // Therefore, it does not give us any performance benefit to do things like skip
    2307              :                 // a whole layer file as handling key spaces (ranges).
    2308            0 :                 if cfg!(debug_assertions) {
    2309            0 :                     let shard = self.shard_identity.shard_index();
    2310            0 :                     let owner = self.shard_identity.get_shard_number(&key);
    2311            0 :                     panic!("key {key} does not belong on shard {shard}, owned by {owner}");
    2312            0 :                 }
    2313            0 :                 continue;
    2314          914 :             }
    2315          914 :             if !job_desc.compaction_key_range.contains(&key) {
    2316           64 :                 if !desc.is_delta {
    2317           60 :                     continue;
    2318            4 :                 }
    2319            4 :                 let rewriter = delta_layer_rewriters.entry(desc.clone()).or_default();
    2320            4 :                 let rewriter = if key < job_desc.compaction_key_range.start {
    2321            0 :                     if rewriter.before.is_none() {
    2322            0 :                         rewriter.before = Some(
    2323            0 :                             DeltaLayerWriter::new(
    2324            0 :                                 self.conf,
    2325            0 :                                 self.timeline_id,
    2326            0 :                                 self.tenant_shard_id,
    2327            0 :                                 desc.key_range.start,
    2328            0 :                                 desc.lsn_range.clone(),
    2329            0 :                                 ctx,
    2330            0 :                             )
    2331            0 :                             .await?,
    2332              :                         );
    2333            0 :                     }
    2334            0 :                     rewriter.before.as_mut().unwrap()
    2335            4 :                 } else if key >= job_desc.compaction_key_range.end {
    2336            4 :                     if rewriter.after.is_none() {
    2337            2 :                         rewriter.after = Some(
    2338            2 :                             DeltaLayerWriter::new(
    2339            2 :                                 self.conf,
    2340            2 :                                 self.timeline_id,
    2341            2 :                                 self.tenant_shard_id,
    2342            2 :                                 job_desc.compaction_key_range.end,
    2343            2 :                                 desc.lsn_range.clone(),
    2344            2 :                                 ctx,
    2345            2 :                             )
    2346            2 :                             .await?,
    2347              :                         );
    2348            2 :                     }
    2349            4 :                     rewriter.after.as_mut().unwrap()
    2350              :                 } else {
    2351            0 :                     unreachable!()
    2352              :                 };
    2353            4 :                 rewriter.put_value(key, lsn, val, ctx).await?;
    2354            4 :                 continue;
    2355          850 :             }
    2356          850 :             match val {
    2357          610 :                 Value::Image(_) => stat.visit_image_key(&val),
    2358          240 :                 Value::WalRecord(_) => stat.visit_wal_key(&val),
    2359              :             }
    2360          850 :             if last_key.is_none() || last_key.as_ref() == Some(&key) {
    2361          280 :                 if last_key.is_none() {
    2362           52 :                     last_key = Some(key);
    2363          228 :                 }
    2364          280 :                 accumulated_values.push((key, lsn, val));
    2365              :             } else {
    2366          570 :                 let last_key: &mut Key = last_key.as_mut().unwrap();
    2367          570 :                 stat.on_unique_key_visited(); // TODO: adjust statistics for partial compaction
    2368          570 :                 let retention = self
    2369          570 :                     .generate_key_retention(
    2370          570 :                         *last_key,
    2371          570 :                         &accumulated_values,
    2372          570 :                         job_desc.gc_cutoff,
    2373          570 :                         &job_desc.retain_lsns_below_horizon,
    2374          570 :                         COMPACTION_DELTA_THRESHOLD,
    2375          570 :                         get_ancestor_image(self, *last_key, ctx, has_data_below, lowest_retain_lsn)
    2376          570 :                             .await?,
    2377              :                     )
    2378          570 :                     .await?;
    2379          570 :                 retention
    2380          570 :                     .pipe_to(
    2381          570 :                         *last_key,
    2382          570 :                         &mut delta_layer_writer,
    2383          570 :                         image_layer_writer.as_mut(),
    2384          570 :                         &mut stat,
    2385          570 :                         ctx,
    2386          570 :                     )
    2387          570 :                     .await?;
    2388          570 :                 accumulated_values.clear();
    2389          570 :                 *last_key = key;
    2390          570 :                 accumulated_values.push((key, lsn, val));
    2391              :             }
    2392              :         }
    2393              : 
    2394              :         // TODO: move the below part to the loop body
    2395           52 :         let last_key = last_key.expect("no keys produced during compaction");
    2396           52 :         stat.on_unique_key_visited();
    2397              : 
    2398           52 :         let retention = self
    2399           52 :             .generate_key_retention(
    2400           52 :                 last_key,
    2401           52 :                 &accumulated_values,
    2402           52 :                 job_desc.gc_cutoff,
    2403           52 :                 &job_desc.retain_lsns_below_horizon,
    2404           52 :                 COMPACTION_DELTA_THRESHOLD,
    2405           52 :                 get_ancestor_image(self, last_key, ctx, has_data_below, lowest_retain_lsn).await?,
    2406              :             )
    2407           52 :             .await?;
    2408           52 :         retention
    2409           52 :             .pipe_to(
    2410           52 :                 last_key,
    2411           52 :                 &mut delta_layer_writer,
    2412           52 :                 image_layer_writer.as_mut(),
    2413           52 :                 &mut stat,
    2414           52 :                 ctx,
    2415           52 :             )
    2416           52 :             .await?;
    2417              :         // end: move the above part to the loop body
    2418              : 
    2419           52 :         let mut rewrote_delta_layers = Vec::new();
    2420           54 :         for (key, writers) in delta_layer_rewriters {
    2421            2 :             if let Some(delta_writer_before) = writers.before {
    2422            0 :                 let (desc, path) = delta_writer_before
    2423            0 :                     .finish(job_desc.compaction_key_range.start, ctx)
    2424            0 :                     .await?;
    2425            0 :                 let layer = Layer::finish_creating(self.conf, self, desc, &path)?;
    2426            0 :                 rewrote_delta_layers.push(layer);
    2427            2 :             }
    2428            2 :             if let Some(delta_writer_after) = writers.after {
    2429            2 :                 let (desc, path) = delta_writer_after.finish(key.key_range.end, ctx).await?;
    2430            2 :                 let layer = Layer::finish_creating(self.conf, self, desc, &path)?;
    2431            2 :                 rewrote_delta_layers.push(layer);
    2432            0 :             }
    2433              :         }
    2434              : 
    2435           74 :         let discard = |key: &PersistentLayerKey| {
    2436           74 :             let key = key.clone();
    2437           74 :             async move { KeyHistoryRetention::discard_key(&key, self, dry_run).await }
    2438           74 :         };
    2439              : 
    2440           52 :         let produced_image_layers = if let Some(writer) = image_layer_writer {
    2441           42 :             if !dry_run {
    2442           38 :                 let end_key = job_desc.compaction_key_range.end;
    2443           38 :                 writer
    2444           38 :                     .finish_with_discard_fn(self, ctx, end_key, discard)
    2445           38 :                     .await?
    2446              :             } else {
    2447            4 :                 drop(writer);
    2448            4 :                 Vec::new()
    2449              :             }
    2450              :         } else {
    2451           10 :             Vec::new()
    2452              :         };
    2453              : 
    2454           52 :         let produced_delta_layers = if !dry_run {
    2455           48 :             delta_layer_writer
    2456           48 :                 .finish_with_discard_fn(self, ctx, discard)
    2457           48 :                 .await?
    2458              :         } else {
    2459            4 :             drop(delta_layer_writer);
    2460            4 :             Vec::new()
    2461              :         };
    2462              : 
    2463              :         // TODO: make image/delta/rewrote_delta layers generation atomic. At this point, we already generated resident layers, and if
    2464              :         // compaction is cancelled at this point, we might have some layers that are not cleaned up.
    2465           52 :         let mut compact_to = Vec::new();
    2466           52 :         let mut keep_layers = HashSet::new();
    2467           52 :         let produced_delta_layers_len = produced_delta_layers.len();
    2468           52 :         let produced_image_layers_len = produced_image_layers.len();
    2469           88 :         for action in produced_delta_layers {
    2470           36 :             match action {
    2471           22 :                 BatchWriterResult::Produced(layer) => {
    2472           22 :                     if cfg!(debug_assertions) {
    2473           22 :                         info!("produced delta layer: {}", layer.layer_desc().key());
    2474            0 :                     }
    2475           22 :                     stat.produce_delta_layer(layer.layer_desc().file_size());
    2476           22 :                     compact_to.push(layer);
    2477              :                 }
    2478           14 :                 BatchWriterResult::Discarded(l) => {
    2479           14 :                     if cfg!(debug_assertions) {
    2480           14 :                         info!("discarded delta layer: {}", l);
    2481            0 :                     }
    2482           14 :                     keep_layers.insert(l);
    2483           14 :                     stat.discard_delta_layer();
    2484              :                 }
    2485              :             }
    2486              :         }
    2487           54 :         for layer in &rewrote_delta_layers {
    2488            2 :             debug!(
    2489            0 :                 "produced rewritten delta layer: {}",
    2490            0 :                 layer.layer_desc().key()
    2491              :             );
    2492              :         }
    2493           52 :         compact_to.extend(rewrote_delta_layers);
    2494           90 :         for action in produced_image_layers {
    2495           38 :             match action {
    2496           30 :                 BatchWriterResult::Produced(layer) => {
    2497           30 :                     debug!("produced image layer: {}", layer.layer_desc().key());
    2498           30 :                     stat.produce_image_layer(layer.layer_desc().file_size());
    2499           30 :                     compact_to.push(layer);
    2500              :                 }
    2501            8 :                 BatchWriterResult::Discarded(l) => {
    2502            8 :                     debug!("discarded image layer: {}", l);
    2503            8 :                     keep_layers.insert(l);
    2504            8 :                     stat.discard_image_layer();
    2505              :                 }
    2506              :             }
    2507              :         }
    2508              : 
    2509           52 :         let mut layer_selection = job_desc.selected_layers;
    2510              : 
    2511              :         // Partial compaction might select more data than it processes, e.g., if
    2512              :         // the compaction_key_range only partially overlaps:
    2513              :         //
    2514              :         //         [---compaction_key_range---]
    2515              :         //   [---A----][----B----][----C----][----D----]
    2516              :         //
    2517              :         // For delta layers, we will rewrite the layers so that it is cut exactly at
    2518              :         // the compaction key range, so we can always discard them. However, for image
    2519              :         // layers, as we do not rewrite them for now, we need to handle them differently.
    2520              :         // Assume image layers  A, B, C, D are all in the `layer_selection`.
    2521              :         //
    2522              :         // The created image layers contain whatever is needed from B, C, and from
    2523              :         // `----]` of A, and from  `[---` of D.
    2524              :         //
    2525              :         // In contrast, `[---A` and `D----]` have not been processed, so, we must
    2526              :         // keep that data.
    2527              :         //
    2528              :         // The solution for now is to keep A and D completely if they are image layers.
    2529              :         // (layer_selection is what we'll remove from the layer map, so, retain what
    2530              :         // is _not_ fully covered by compaction_key_range).
    2531          204 :         for layer in &layer_selection {
    2532          152 :             if !layer.layer_desc().is_delta() {
    2533           66 :                 if !overlaps_with(
    2534           66 :                     &layer.layer_desc().key_range,
    2535           66 :                     &job_desc.compaction_key_range,
    2536           66 :                 ) {
    2537            0 :                     bail!("violated constraint: image layer outside of compaction key range");
    2538           66 :                 }
    2539           66 :                 if !fully_contains(
    2540           66 :                     &job_desc.compaction_key_range,
    2541           66 :                     &layer.layer_desc().key_range,
    2542           66 :                 ) {
    2543            8 :                     keep_layers.insert(layer.layer_desc().key());
    2544           58 :                 }
    2545           86 :             }
    2546              :         }
    2547              : 
    2548          152 :         layer_selection.retain(|x| !keep_layers.contains(&x.layer_desc().key()));
    2549           52 : 
    2550           52 :         info!(
    2551            0 :             "gc-compaction statistics: {}",
    2552            0 :             serde_json::to_string(&stat)?
    2553              :         );
    2554              : 
    2555           52 :         if dry_run {
    2556            4 :             return Ok(());
    2557           48 :         }
    2558           48 : 
    2559           48 :         info!(
    2560            0 :             "produced {} delta layers and {} image layers, {} layers are kept",
    2561            0 :             produced_delta_layers_len,
    2562            0 :             produced_image_layers_len,
    2563            0 :             layer_selection.len()
    2564              :         );
    2565              : 
    2566              :         // Step 3: Place back to the layer map.
    2567              : 
    2568              :         // First, do a sanity check to ensure the newly-created layer map does not contain overlaps.
    2569           48 :         let all_layers = {
    2570           48 :             let guard = self.layers.read().await;
    2571           48 :             let layer_map = guard.layer_map()?;
    2572           48 :             layer_map.iter_historic_layers().collect_vec()
    2573           48 :         };
    2574           48 : 
    2575           48 :         let mut final_layers = all_layers
    2576           48 :             .iter()
    2577          214 :             .map(|layer| layer.layer_name())
    2578           48 :             .collect::<HashSet<_>>();
    2579          152 :         for layer in &layer_selection {
    2580          104 :             final_layers.remove(&layer.layer_desc().layer_name());
    2581          104 :         }
    2582          102 :         for layer in &compact_to {
    2583           54 :             final_layers.insert(layer.layer_desc().layer_name());
    2584           54 :         }
    2585           48 :         let final_layers = final_layers.into_iter().collect_vec();
    2586              : 
    2587              :         // TODO: move this check before we call `finish` on image layer writers. However, this will require us to get the layer name before we finish
    2588              :         // the writer, so potentially, we will need a function like `ImageLayerBatchWriter::get_all_pending_layer_keys` to get all the keys that are
    2589              :         // in the writer before finalizing the persistent layers. Now we would leave some dangling layers on the disk if the check fails.
    2590           48 :         if let Some(err) = check_valid_layermap(&final_layers) {
    2591            0 :             bail!("gc-compaction layer map check failed after compaction because {}, compaction result not applied to the layer map due to potential data loss", err);
    2592           48 :         }
    2593              : 
    2594              :         // Between the sanity check and this compaction update, there could be new layers being flushed, but it should be fine because we only
    2595              :         // operate on L1 layers.
    2596              :         {
    2597           48 :             let mut guard = self.layers.write().await;
    2598           48 :             guard
    2599           48 :                 .open_mut()?
    2600           48 :                 .finish_gc_compaction(&layer_selection, &compact_to, &self.metrics)
    2601           48 :         };
    2602           48 : 
    2603           48 :         // Schedule an index-only upload to update the `latest_gc_cutoff` in the index_part.json.
    2604           48 :         // Otherwise, after restart, the index_part only contains the old `latest_gc_cutoff` and
    2605           48 :         // find_gc_cutoffs will try accessing things below the cutoff. TODO: ideally, this should
    2606           48 :         // be batched into `schedule_compaction_update`.
    2607           48 :         let disk_consistent_lsn = self.disk_consistent_lsn.load();
    2608           48 :         self.schedule_uploads(disk_consistent_lsn, None)?;
    2609           48 :         self.remote_client
    2610           48 :             .schedule_compaction_update(&layer_selection, &compact_to)?;
    2611              : 
    2612           48 :         drop(gc_lock);
    2613           48 : 
    2614           48 :         Ok(())
    2615           54 :     }
    2616              : }
    2617              : 
    2618              : struct TimelineAdaptor {
    2619              :     timeline: Arc<Timeline>,
    2620              : 
    2621              :     keyspace: (Lsn, KeySpace),
    2622              : 
    2623              :     new_deltas: Vec<ResidentLayer>,
    2624              :     new_images: Vec<ResidentLayer>,
    2625              :     layers_to_delete: Vec<Arc<PersistentLayerDesc>>,
    2626              : }
    2627              : 
    2628              : impl TimelineAdaptor {
    2629            0 :     pub fn new(timeline: &Arc<Timeline>, keyspace: (Lsn, KeySpace)) -> Self {
    2630            0 :         Self {
    2631            0 :             timeline: timeline.clone(),
    2632            0 :             keyspace,
    2633            0 :             new_images: Vec::new(),
    2634            0 :             new_deltas: Vec::new(),
    2635            0 :             layers_to_delete: Vec::new(),
    2636            0 :         }
    2637            0 :     }
    2638              : 
    2639            0 :     pub async fn flush_updates(&mut self) -> Result<(), CompactionError> {
    2640            0 :         let layers_to_delete = {
    2641            0 :             let guard = self.timeline.layers.read().await;
    2642            0 :             self.layers_to_delete
    2643            0 :                 .iter()
    2644            0 :                 .map(|x| guard.get_from_desc(x))
    2645            0 :                 .collect::<Vec<Layer>>()
    2646            0 :         };
    2647            0 :         self.timeline
    2648            0 :             .finish_compact_batch(&self.new_deltas, &self.new_images, &layers_to_delete)
    2649            0 :             .await?;
    2650              : 
    2651            0 :         self.timeline
    2652            0 :             .upload_new_image_layers(std::mem::take(&mut self.new_images))?;
    2653              : 
    2654            0 :         self.new_deltas.clear();
    2655            0 :         self.layers_to_delete.clear();
    2656            0 :         Ok(())
    2657            0 :     }
    2658              : }
    2659              : 
    2660              : #[derive(Clone)]
    2661              : struct ResidentDeltaLayer(ResidentLayer);
    2662              : #[derive(Clone)]
    2663              : struct ResidentImageLayer(ResidentLayer);
    2664              : 
    2665              : impl CompactionJobExecutor for TimelineAdaptor {
    2666              :     type Key = pageserver_api::key::Key;
    2667              : 
    2668              :     type Layer = OwnArc<PersistentLayerDesc>;
    2669              :     type DeltaLayer = ResidentDeltaLayer;
    2670              :     type ImageLayer = ResidentImageLayer;
    2671              : 
    2672              :     type RequestContext = crate::context::RequestContext;
    2673              : 
    2674            0 :     fn get_shard_identity(&self) -> &ShardIdentity {
    2675            0 :         self.timeline.get_shard_identity()
    2676            0 :     }
    2677              : 
    2678            0 :     async fn get_layers(
    2679            0 :         &mut self,
    2680            0 :         key_range: &Range<Key>,
    2681            0 :         lsn_range: &Range<Lsn>,
    2682            0 :         _ctx: &RequestContext,
    2683            0 :     ) -> anyhow::Result<Vec<OwnArc<PersistentLayerDesc>>> {
    2684            0 :         self.flush_updates().await?;
    2685              : 
    2686            0 :         let guard = self.timeline.layers.read().await;
    2687            0 :         let layer_map = guard.layer_map()?;
    2688              : 
    2689            0 :         let result = layer_map
    2690            0 :             .iter_historic_layers()
    2691            0 :             .filter(|l| {
    2692            0 :                 overlaps_with(&l.lsn_range, lsn_range) && overlaps_with(&l.key_range, key_range)
    2693            0 :             })
    2694            0 :             .map(OwnArc)
    2695            0 :             .collect();
    2696            0 :         Ok(result)
    2697            0 :     }
    2698              : 
    2699            0 :     async fn get_keyspace(
    2700            0 :         &mut self,
    2701            0 :         key_range: &Range<Key>,
    2702            0 :         lsn: Lsn,
    2703            0 :         _ctx: &RequestContext,
    2704            0 :     ) -> anyhow::Result<Vec<Range<Key>>> {
    2705            0 :         if lsn == self.keyspace.0 {
    2706            0 :             Ok(pageserver_compaction::helpers::intersect_keyspace(
    2707            0 :                 &self.keyspace.1.ranges,
    2708            0 :                 key_range,
    2709            0 :             ))
    2710              :         } else {
    2711              :             // The current compaction implementation only ever requests the key space
    2712              :             // at the compaction end LSN.
    2713            0 :             anyhow::bail!("keyspace not available for requested lsn");
    2714              :         }
    2715            0 :     }
    2716              : 
    2717            0 :     async fn downcast_delta_layer(
    2718            0 :         &self,
    2719            0 :         layer: &OwnArc<PersistentLayerDesc>,
    2720            0 :     ) -> anyhow::Result<Option<ResidentDeltaLayer>> {
    2721            0 :         // this is a lot more complex than a simple downcast...
    2722            0 :         if layer.is_delta() {
    2723            0 :             let l = {
    2724            0 :                 let guard = self.timeline.layers.read().await;
    2725            0 :                 guard.get_from_desc(layer)
    2726              :             };
    2727            0 :             let result = l.download_and_keep_resident().await?;
    2728              : 
    2729            0 :             Ok(Some(ResidentDeltaLayer(result)))
    2730              :         } else {
    2731            0 :             Ok(None)
    2732              :         }
    2733            0 :     }
    2734              : 
    2735            0 :     async fn create_image(
    2736            0 :         &mut self,
    2737            0 :         lsn: Lsn,
    2738            0 :         key_range: &Range<Key>,
    2739            0 :         ctx: &RequestContext,
    2740            0 :     ) -> anyhow::Result<()> {
    2741            0 :         Ok(self.create_image_impl(lsn, key_range, ctx).await?)
    2742            0 :     }
    2743              : 
    2744            0 :     async fn create_delta(
    2745            0 :         &mut self,
    2746            0 :         lsn_range: &Range<Lsn>,
    2747            0 :         key_range: &Range<Key>,
    2748            0 :         input_layers: &[ResidentDeltaLayer],
    2749            0 :         ctx: &RequestContext,
    2750            0 :     ) -> anyhow::Result<()> {
    2751            0 :         debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end);
    2752              : 
    2753            0 :         let mut all_entries = Vec::new();
    2754            0 :         for dl in input_layers.iter() {
    2755            0 :             all_entries.extend(dl.load_keys(ctx).await?);
    2756              :         }
    2757              : 
    2758              :         // The current stdlib sorting implementation is designed in a way where it is
    2759              :         // particularly fast where the slice is made up of sorted sub-ranges.
    2760            0 :         all_entries.sort_by_key(|DeltaEntry { key, lsn, .. }| (*key, *lsn));
    2761              : 
    2762            0 :         let mut writer = DeltaLayerWriter::new(
    2763            0 :             self.timeline.conf,
    2764            0 :             self.timeline.timeline_id,
    2765            0 :             self.timeline.tenant_shard_id,
    2766            0 :             key_range.start,
    2767            0 :             lsn_range.clone(),
    2768            0 :             ctx,
    2769            0 :         )
    2770            0 :         .await?;
    2771              : 
    2772            0 :         let mut dup_values = 0;
    2773            0 : 
    2774            0 :         // This iterator walks through all key-value pairs from all the layers
    2775            0 :         // we're compacting, in key, LSN order.
    2776            0 :         let mut prev: Option<(Key, Lsn)> = None;
    2777              :         for &DeltaEntry {
    2778            0 :             key, lsn, ref val, ..
    2779            0 :         } in all_entries.iter()
    2780              :         {
    2781            0 :             if prev == Some((key, lsn)) {
    2782              :                 // This is a duplicate. Skip it.
    2783              :                 //
    2784              :                 // It can happen if compaction is interrupted after writing some
    2785              :                 // layers but not all, and we are compacting the range again.
    2786              :                 // The calculations in the algorithm assume that there are no
    2787              :                 // duplicates, so the math on targeted file size is likely off,
    2788              :                 // and we will create smaller files than expected.
    2789            0 :                 dup_values += 1;
    2790            0 :                 continue;
    2791            0 :             }
    2792              : 
    2793            0 :             let value = val.load(ctx).await?;
    2794              : 
    2795            0 :             writer.put_value(key, lsn, value, ctx).await?;
    2796              : 
    2797            0 :             prev = Some((key, lsn));
    2798              :         }
    2799              : 
    2800            0 :         if dup_values > 0 {
    2801            0 :             warn!("delta layer created with {} duplicate values", dup_values);
    2802            0 :         }
    2803              : 
    2804            0 :         fail_point!("delta-layer-writer-fail-before-finish", |_| {
    2805            0 :             Err(anyhow::anyhow!(
    2806            0 :                 "failpoint delta-layer-writer-fail-before-finish"
    2807            0 :             ))
    2808            0 :         });
    2809              : 
    2810            0 :         let (desc, path) = writer.finish(prev.unwrap().0.next(), ctx).await?;
    2811            0 :         let new_delta_layer =
    2812            0 :             Layer::finish_creating(self.timeline.conf, &self.timeline, desc, &path)?;
    2813              : 
    2814            0 :         self.new_deltas.push(new_delta_layer);
    2815            0 :         Ok(())
    2816            0 :     }
    2817              : 
    2818            0 :     async fn delete_layer(
    2819            0 :         &mut self,
    2820            0 :         layer: &OwnArc<PersistentLayerDesc>,
    2821            0 :         _ctx: &RequestContext,
    2822            0 :     ) -> anyhow::Result<()> {
    2823            0 :         self.layers_to_delete.push(layer.clone().0);
    2824            0 :         Ok(())
    2825            0 :     }
    2826              : }
    2827              : 
    2828              : impl TimelineAdaptor {
    2829            0 :     async fn create_image_impl(
    2830            0 :         &mut self,
    2831            0 :         lsn: Lsn,
    2832            0 :         key_range: &Range<Key>,
    2833            0 :         ctx: &RequestContext,
    2834            0 :     ) -> Result<(), CreateImageLayersError> {
    2835            0 :         let timer = self.timeline.metrics.create_images_time_histo.start_timer();
    2836              : 
    2837            0 :         let image_layer_writer = ImageLayerWriter::new(
    2838            0 :             self.timeline.conf,
    2839            0 :             self.timeline.timeline_id,
    2840            0 :             self.timeline.tenant_shard_id,
    2841            0 :             key_range,
    2842            0 :             lsn,
    2843            0 :             ctx,
    2844            0 :         )
    2845            0 :         .await?;
    2846              : 
    2847            0 :         fail_point!("image-layer-writer-fail-before-finish", |_| {
    2848            0 :             Err(CreateImageLayersError::Other(anyhow::anyhow!(
    2849            0 :                 "failpoint image-layer-writer-fail-before-finish"
    2850            0 :             )))
    2851            0 :         });
    2852              : 
    2853            0 :         let keyspace = KeySpace {
    2854            0 :             ranges: self.get_keyspace(key_range, lsn, ctx).await?,
    2855              :         };
    2856              :         // TODO set proper (stateful) start. The create_image_layer_for_rel_blocks function mostly
    2857            0 :         let start = Key::MIN;
    2858              :         let ImageLayerCreationOutcome {
    2859            0 :             image,
    2860              :             next_start_key: _,
    2861            0 :         } = self
    2862            0 :             .timeline
    2863            0 :             .create_image_layer_for_rel_blocks(
    2864            0 :                 &keyspace,
    2865            0 :                 image_layer_writer,
    2866            0 :                 lsn,
    2867            0 :                 ctx,
    2868            0 :                 key_range.clone(),
    2869            0 :                 start,
    2870            0 :             )
    2871            0 :             .await?;
    2872              : 
    2873            0 :         if let Some(image_layer) = image {
    2874            0 :             self.new_images.push(image_layer);
    2875            0 :         }
    2876              : 
    2877            0 :         timer.stop_and_record();
    2878            0 : 
    2879            0 :         Ok(())
    2880            0 :     }
    2881              : }
    2882              : 
    2883              : impl CompactionRequestContext for crate::context::RequestContext {}
    2884              : 
    2885              : #[derive(Debug, Clone)]
    2886              : pub struct OwnArc<T>(pub Arc<T>);
    2887              : 
    2888              : impl<T> Deref for OwnArc<T> {
    2889              :     type Target = <Arc<T> as Deref>::Target;
    2890            0 :     fn deref(&self) -> &Self::Target {
    2891            0 :         &self.0
    2892            0 :     }
    2893              : }
    2894              : 
    2895              : impl<T> AsRef<T> for OwnArc<T> {
    2896            0 :     fn as_ref(&self) -> &T {
    2897            0 :         self.0.as_ref()
    2898            0 :     }
    2899              : }
    2900              : 
    2901              : impl CompactionLayer<Key> for OwnArc<PersistentLayerDesc> {
    2902            0 :     fn key_range(&self) -> &Range<Key> {
    2903            0 :         &self.key_range
    2904            0 :     }
    2905            0 :     fn lsn_range(&self) -> &Range<Lsn> {
    2906            0 :         &self.lsn_range
    2907            0 :     }
    2908            0 :     fn file_size(&self) -> u64 {
    2909            0 :         self.file_size
    2910            0 :     }
    2911            0 :     fn short_id(&self) -> std::string::String {
    2912            0 :         self.as_ref().short_id().to_string()
    2913            0 :     }
    2914            0 :     fn is_delta(&self) -> bool {
    2915            0 :         self.as_ref().is_delta()
    2916            0 :     }
    2917              : }
    2918              : 
    2919              : impl CompactionLayer<Key> for OwnArc<DeltaLayer> {
    2920            0 :     fn key_range(&self) -> &Range<Key> {
    2921            0 :         &self.layer_desc().key_range
    2922            0 :     }
    2923            0 :     fn lsn_range(&self) -> &Range<Lsn> {
    2924            0 :         &self.layer_desc().lsn_range
    2925            0 :     }
    2926            0 :     fn file_size(&self) -> u64 {
    2927            0 :         self.layer_desc().file_size
    2928            0 :     }
    2929            0 :     fn short_id(&self) -> std::string::String {
    2930            0 :         self.layer_desc().short_id().to_string()
    2931            0 :     }
    2932            0 :     fn is_delta(&self) -> bool {
    2933            0 :         true
    2934            0 :     }
    2935              : }
    2936              : 
    2937              : use crate::tenant::timeline::DeltaEntry;
    2938              : 
    2939              : impl CompactionLayer<Key> for ResidentDeltaLayer {
    2940            0 :     fn key_range(&self) -> &Range<Key> {
    2941            0 :         &self.0.layer_desc().key_range
    2942            0 :     }
    2943            0 :     fn lsn_range(&self) -> &Range<Lsn> {
    2944            0 :         &self.0.layer_desc().lsn_range
    2945            0 :     }
    2946            0 :     fn file_size(&self) -> u64 {
    2947            0 :         self.0.layer_desc().file_size
    2948            0 :     }
    2949            0 :     fn short_id(&self) -> std::string::String {
    2950            0 :         self.0.layer_desc().short_id().to_string()
    2951            0 :     }
    2952            0 :     fn is_delta(&self) -> bool {
    2953            0 :         true
    2954            0 :     }
    2955              : }
    2956              : 
    2957              : impl CompactionDeltaLayer<TimelineAdaptor> for ResidentDeltaLayer {
    2958              :     type DeltaEntry<'a> = DeltaEntry<'a>;
    2959              : 
    2960            0 :     async fn load_keys(&self, ctx: &RequestContext) -> anyhow::Result<Vec<DeltaEntry<'_>>> {
    2961            0 :         self.0.get_as_delta(ctx).await?.index_entries(ctx).await
    2962            0 :     }
    2963              : }
    2964              : 
    2965              : impl CompactionLayer<Key> for ResidentImageLayer {
    2966            0 :     fn key_range(&self) -> &Range<Key> {
    2967            0 :         &self.0.layer_desc().key_range
    2968            0 :     }
    2969            0 :     fn lsn_range(&self) -> &Range<Lsn> {
    2970            0 :         &self.0.layer_desc().lsn_range
    2971            0 :     }
    2972            0 :     fn file_size(&self) -> u64 {
    2973            0 :         self.0.layer_desc().file_size
    2974            0 :     }
    2975            0 :     fn short_id(&self) -> std::string::String {
    2976            0 :         self.0.layer_desc().short_id().to_string()
    2977            0 :     }
    2978            0 :     fn is_delta(&self) -> bool {
    2979            0 :         false
    2980            0 :     }
    2981              : }
    2982              : impl CompactionImageLayer<TimelineAdaptor> for ResidentImageLayer {}
        

Generated by: LCOV version 2.1-beta