LCOV - code coverage report
Current view: top level - pageserver/src/tenant/timeline - compaction.rs (source / functions)
Test: 71d97c4519b9017c5903db4dfe4edf4a84645500.info
Test Date: 2024-12-19 16:48:20
Coverage: Lines: 60.0 % (1192 of 1988 hit), Functions: 42.4 % (61 of 144 hit)

            Line data    Source code
       1              : //! New compaction implementation. The algorithm itself is implemented in the
       2              : //! compaction crate. This file implements the callbacks and structs that allow
       3              : //! the algorithm to drive the process.
       4              : //!
       5              : //! The old legacy algorithm is implemented directly in `timeline.rs`.
       6              : 
       7              : use std::collections::{BinaryHeap, HashMap, HashSet};
       8              : use std::ops::{Deref, Range};
       9              : use std::sync::Arc;
      10              : 
      11              : use super::layer_manager::LayerManager;
      12              : use super::{
      13              :     CompactFlags, CompactOptions, CreateImageLayersError, DurationRecorder, ImageLayerCreationMode,
      14              :     RecordedDuration, Timeline,
      15              : };
      16              : 
      17              : use anyhow::{anyhow, bail, Context};
      18              : use bytes::Bytes;
      19              : use fail::fail_point;
      20              : use itertools::Itertools;
      21              : use pageserver_api::key::KEY_SIZE;
      22              : use pageserver_api::keyspace::ShardedRange;
      23              : use pageserver_api::shard::{ShardCount, ShardIdentity, TenantShardId};
      24              : use serde::Serialize;
      25              : use tokio_util::sync::CancellationToken;
      26              : use tracing::{debug, info, info_span, trace, warn, Instrument};
      27              : use utils::id::TimelineId;
      28              : 
      29              : use crate::context::{AccessStatsBehavior, RequestContext, RequestContextBuilder};
      30              : use crate::page_cache;
      31              : use crate::statvfs::Statvfs;
      32              : use crate::tenant::remote_timeline_client::WaitCompletionError;
      33              : use crate::tenant::storage_layer::batch_split_writer::{
      34              :     BatchWriterResult, SplitDeltaLayerWriter, SplitImageLayerWriter,
      35              : };
      36              : use crate::tenant::storage_layer::filter_iterator::FilterIterator;
      37              : use crate::tenant::storage_layer::merge_iterator::MergeIterator;
      38              : use crate::tenant::storage_layer::{
      39              :     AsLayerDesc, PersistentLayerDesc, PersistentLayerKey, ValueReconstructState,
      40              : };
      41              : use crate::tenant::timeline::ImageLayerCreationOutcome;
      42              : use crate::tenant::timeline::{drop_rlock, DeltaLayerWriter, ImageLayerWriter};
      43              : use crate::tenant::timeline::{Layer, ResidentLayer};
      44              : use crate::tenant::{gc_block, DeltaLayer, MaybeOffloaded};
      45              : use crate::virtual_file::{MaybeFatalIo, VirtualFile};
      46              : use pageserver_api::config::tenant_conf_defaults::{
      47              :     DEFAULT_CHECKPOINT_DISTANCE, DEFAULT_COMPACTION_THRESHOLD,
      48              : };
      49              : 
      50              : use pageserver_api::key::Key;
      51              : use pageserver_api::keyspace::KeySpace;
      52              : use pageserver_api::record::NeonWalRecord;
      53              : use pageserver_api::value::Value;
      54              : 
      55              : use utils::lsn::Lsn;
      56              : 
      57              : use pageserver_compaction::helpers::{fully_contains, overlaps_with};
      58              : use pageserver_compaction::interface::*;
      59              : 
      60              : use super::CompactionError;
      61              : 
      62              : /// Maximum number of deltas before generating an image layer in bottom-most compaction.
      63              : const COMPACTION_DELTA_THRESHOLD: usize = 5;
      64              : 
      65              : /// A scheduled compaction task.
      66              : pub(crate) struct ScheduledCompactionTask {
      67              :     /// It's unfortunate that we need to store a compact options struct here, because the only outer
      68              :     /// API we can call is `compact_with_options`, which does a few setup calls before starting the
      69              :     /// actual compaction job... We should refactor this to store `GcCompactionJob` in the future.
      70              :     pub options: CompactOptions,
      71              :     /// The channel to send the compaction result. If this is a subcompaction, the last compaction job holds the sender.
      72              :     pub result_tx: Option<tokio::sync::oneshot::Sender<()>>,
      73              :     /// Hold the GC block. If this is a subcompaction, the last compaction job holds the gc block guard.
      74              :     pub gc_block: Option<gc_block::Guard>,
      75              : }
      76              : 
      77              : /// A job description for the gc-compaction job. This structure describes the rectangle range that the job will
      78              : /// process. The exact layers that need to be compacted/rewritten will be generated when `compact_with_gc` gets
      79              : /// called.
      80              : #[derive(Debug, Clone)]
      81              : pub(crate) struct GcCompactJob {
      82              :     pub dry_run: bool,
      83              :     /// The key range to be compacted. The compaction algorithm will only regenerate key-value pairs within this range
      84              :     /// [left inclusive, right exclusive), and other pairs will be rewritten into new files if necessary.
      85              :     pub compact_key_range: Range<Key>,
      86              :     /// The LSN range to be compacted. The compaction algorithm will use this range to determine the layers to be
      87              :     /// selected for the compaction, and it does not guarantee the generated layers will have exactly the same LSN range
      88              :     /// as specified here. The true range being compacted is `min_lsn/max_lsn` in [`GcCompactionJobDescription`].
      89              :     /// min_lsn will always be <= the lower bound specified here, and max_lsn will always be >= the upper bound specified here.
      90              :     pub compact_lsn_range: Range<Lsn>,
      91              : }
      92              : 
      93              : impl GcCompactJob {
      94           54 :     pub fn from_compact_options(options: CompactOptions) -> Self {
      95           54 :         GcCompactJob {
      96           54 :             dry_run: options.flags.contains(CompactFlags::DryRun),
      97           54 :             compact_key_range: options
      98           54 :                 .compact_key_range
      99           54 :                 .map(|x| x.into())
     100           54 :                 .unwrap_or(Key::MIN..Key::MAX),
     101           54 :             compact_lsn_range: options
     102           54 :                 .compact_lsn_range
     103           54 :                 .map(|x| x.into())
     104           54 :                 .unwrap_or(Lsn::INVALID..Lsn::MAX),
     105           54 :         }
     106           54 :     }
     107              : }
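
As a minimal standalone illustration of the defaulting behaviour documented above (a missing key or LSN range widens to "compact everything"), here is a hypothetical sketch using plain u64 bounds instead of the pageserver `Key`/`Lsn` types; only the `Option` → full-range shape mirrors `from_compact_options`.

use std::ops::Range;

// Hypothetical stand-in for CompactOptions: both ranges are optional.
struct SketchOptions {
    key_range: Option<Range<u64>>,
    lsn_range: Option<Range<u64>>,
}

// Hypothetical stand-in for GcCompactJob: absent ranges default to "everything".
struct SketchJob {
    key_range: Range<u64>,
    lsn_range: Range<u64>,
}

impl SketchJob {
    fn from_options(options: SketchOptions) -> Self {
        SketchJob {
            // Half-open [start, end): no key range requested means "all keys".
            key_range: options.key_range.unwrap_or(u64::MIN..u64::MAX),
            // No LSN range requested means "from the start of history onwards".
            lsn_range: options.lsn_range.unwrap_or(u64::MIN..u64::MAX),
        }
    }
}

fn main() {
    let job = SketchJob::from_options(SketchOptions {
        key_range: Some(16..32),
        lsn_range: None,
    });
    assert_eq!(job.key_range, 16..32);
    assert_eq!(job.lsn_range, u64::MIN..u64::MAX);
}
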
     108              : 
     109              : /// A job description for the gc-compaction job. This structure is generated when `compact_with_gc` is called
     110              : /// and contains the exact layers we want to compact.
     111              : pub struct GcCompactionJobDescription {
     112              :     /// All layers to read in the compaction job
     113              :     selected_layers: Vec<Layer>,
     114              :     /// GC cutoff of the job. This is the lowest LSN that will be accessed by the read/GC path and we need to
     115              :     /// keep all deltas <= this LSN or generate an image == this LSN.
     116              :     gc_cutoff: Lsn,
     117              :     /// LSNs to retain for the job. Read path will use this LSN so we need to keep deltas <= this LSN or
     118              :     /// generate an image == this LSN.
     119              :     retain_lsns_below_horizon: Vec<Lsn>,
     120              :     /// Maximum layer LSN processed in this compaction, that is max(end_lsn of layers). Exclusive. All data
     121              :     /// \>= this LSN will be kept and will not be rewritten.
     122              :     max_layer_lsn: Lsn,
     123              :     /// Minimum layer LSN processed in this compaction, that is min(start_lsn of layers). Inclusive.
     124              :     /// All accesses below (strictly lower than, i.e. `<`) this LSN will be routed through the normal read path instead of
     125              :     /// k-merge within gc-compaction.
     126              :     min_layer_lsn: Lsn,
     127              :     /// Only compact layers overlapping with this range.
     128              :     compaction_key_range: Range<Key>,
     129              :     /// When partial compaction is enabled, these layers need to be rewritten to ensure no overlap.
     130              :     /// This field is here solely for debugging. The field will not be read once the compaction
     131              :     /// description is generated.
     132              :     rewrite_layers: Vec<Arc<PersistentLayerDesc>>,
     133              : }
     134              : 
     135              : /// The result of bottom-most compaction for a single key at each LSN.
     136              : #[derive(Debug)]
     137              : #[cfg_attr(test, derive(PartialEq))]
     138              : pub struct KeyLogAtLsn(pub Vec<(Lsn, Value)>);
     139              : 
     140              : /// The result of bottom-most compaction.
     141              : #[derive(Debug)]
     142              : #[cfg_attr(test, derive(PartialEq))]
     143              : pub(crate) struct KeyHistoryRetention {
     144              :     /// Stores logs to reconstruct the value at the given LSN, that is to say, logs <= LSN or image == LSN.
     145              :     pub(crate) below_horizon: Vec<(Lsn, KeyLogAtLsn)>,
     146              :     /// Stores logs to reconstruct the value at any LSN above the horizon, that is to say, log > LSN.
     147              :     pub(crate) above_horizon: KeyLogAtLsn,
     148              : }
     149              : 
     150              : impl KeyHistoryRetention {
     151              :     /// Hack: skip the delta layer if we need to produce a layer of the same key-lsn.
     152              :     ///
     153              :     /// This can happen if we have removed some deltas in "the middle" of some existing layer's key-lsn-range.
     154              :     /// For example, consider the case where a single delta with range [0x10,0x50) exists.
     155              :     /// And we have branches at LSN 0x10, 0x20, 0x30.
     156              :     /// Then we delete branch @ 0x20.
     157              :     /// Bottom-most compaction may now delete the delta [0x20,0x30).
     158              :     /// And that wouldn't change the shape of the layer.
     159              :     ///
     160              :     /// Note that bottom-most-gc-compaction never _adds_ new data in that case, only removes.
     161              :     ///
     162              :     /// `discard_key` will only be called when the writer reaches its target (instead of for every key), so it's fine to grab a lock inside.
     163           74 :     async fn discard_key(key: &PersistentLayerKey, tline: &Arc<Timeline>, dry_run: bool) -> bool {
     164           74 :         if dry_run {
     165            0 :             return true;
     166           74 :         }
     167           74 :         let guard = tline.layers.read().await;
     168           74 :         if !guard.contains_key(key) {
     169           52 :             return false;
     170           22 :         }
     171           22 :         let layer_generation = guard.get_from_key(key).metadata().generation;
     172           22 :         drop(guard);
     173           22 :         if layer_generation == tline.generation {
     174           22 :             info!(
     175              :                 key=%key,
     176              :                 ?layer_generation,
     177            0 :                 "discard layer due to duplicated layer key in the same generation",
     178              :             );
     179           22 :             true
     180              :         } else {
     181            0 :             false
     182              :         }
     183           74 :     }
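
The dedup rule above (drop a freshly produced layer only if an identical layer key already exists and was written by our own generation) can be shown in isolation. This is a hypothetical sketch with a plain HashMap standing in for the layer map; it does not model the pageserver's locking or its `PersistentLayerKey` type.

use std::collections::HashMap;

// Hypothetical: layer keys collapsed to strings, generations to plain u32s.
fn should_discard(
    existing: &HashMap<String, u32>, // layer key -> generation that wrote it
    key: &str,
    my_generation: u32,
    dry_run: bool,
) -> bool {
    if dry_run {
        return true; // dry-run mode discards every produced layer
    }
    match existing.get(key) {
        // Same key already written by our generation: the new layer is a duplicate.
        Some(&generation) if generation == my_generation => true,
        // Absent, or written by an older generation: keep the new layer.
        _ => false,
    }
}

fn main() {
    let mut layers = HashMap::new();
    layers.insert("key_0x10-0x50".to_string(), 7);
    assert!(should_discard(&layers, "key_0x10-0x50", 7, false));
    assert!(!should_discard(&layers, "key_0x10-0x50", 8, false));
    assert!(!should_discard(&layers, "other", 7, false));
}
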
     184              : 
     185              :     /// Pipe a history of a single key to the writers.
     186              :     ///
     187              :     /// If `image_writer` is `None`, the images will be placed into the delta layers.
     188              :     /// The delta writer will contain all images and deltas (below and above the horizon) except the bottom-most images.
     189              :     #[allow(clippy::too_many_arguments)]
     190          622 :     async fn pipe_to(
     191          622 :         self,
     192          622 :         key: Key,
     193          622 :         delta_writer: &mut SplitDeltaLayerWriter,
     194          622 :         mut image_writer: Option<&mut SplitImageLayerWriter>,
     195          622 :         stat: &mut CompactionStatistics,
     196          622 :         ctx: &RequestContext,
     197          622 :     ) -> anyhow::Result<()> {
     198          622 :         let mut first_batch = true;
     199         2012 :         for (cutoff_lsn, KeyLogAtLsn(logs)) in self.below_horizon {
     200         1390 :             if first_batch {
     201          622 :                 if logs.len() == 1 && logs[0].1.is_image() {
     202          584 :                     let Value::Image(img) = &logs[0].1 else {
     203            0 :                         unreachable!()
     204              :                     };
     205          584 :                     stat.produce_image_key(img);
     206          584 :                     if let Some(image_writer) = image_writer.as_mut() {
     207          584 :                         image_writer.put_image(key, img.clone(), ctx).await?;
     208              :                     } else {
     209            0 :                         delta_writer
     210            0 :                             .put_value(key, cutoff_lsn, Value::Image(img.clone()), ctx)
     211            0 :                             .await?;
     212              :                     }
     213              :                 } else {
     214           66 :                     for (lsn, val) in logs {
     215           28 :                         stat.produce_key(&val);
     216           28 :                         delta_writer.put_value(key, lsn, val, ctx).await?;
     217              :                     }
     218              :                 }
     219          622 :                 first_batch = false;
     220              :             } else {
     221          884 :                 for (lsn, val) in logs {
     222          116 :                     stat.produce_key(&val);
     223          116 :                     delta_writer.put_value(key, lsn, val, ctx).await?;
     224              :                 }
     225              :             }
     226              :         }
     227          622 :         let KeyLogAtLsn(above_horizon_logs) = self.above_horizon;
     228          680 :         for (lsn, val) in above_horizon_logs {
     229           58 :             stat.produce_key(&val);
     230           58 :             delta_writer.put_value(key, lsn, val, ctx).await?;
     231              :         }
     232          622 :         Ok(())
     233          622 :     }
     234              : }
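
To make the routing rule in `pipe_to` easier to follow, here is a hypothetical, self-contained sketch of the same idea: only a bottom-most batch consisting of a single image goes to the image writer, while all other below-horizon entries and everything above the horizon go to the delta writer. The optional image writer and the statistics bookkeeping are omitted.

// Hypothetical simplified value: either a page image or a WAL delta record.
#[derive(Clone, Debug, PartialEq)]
enum Val {
    Image(Vec<u8>),
    Delta(Vec<u8>),
}

fn route(
    below_horizon: Vec<Vec<(u64, Val)>>, // one (lsn, value) batch per retain-LSN cutoff
    above_horizon: Vec<(u64, Val)>,
    image_out: &mut Vec<Val>,        // stand-in for the image layer writer
    delta_out: &mut Vec<(u64, Val)>, // stand-in for the delta layer writer
) {
    let mut first_batch = true;
    for logs in below_horizon {
        if first_batch && logs.len() == 1 && matches!(logs[0].1, Val::Image(_)) {
            // The bottom-most single image becomes part of an image layer.
            image_out.push(logs[0].1.clone());
        } else {
            // Everything else is written out as deltas.
            delta_out.extend(logs);
        }
        first_batch = false;
    }
    delta_out.extend(above_horizon);
}

fn main() {
    let (mut images, mut deltas) = (Vec::new(), Vec::new());
    route(
        vec![
            vec![(0x10, Val::Image(vec![1]))], // bottom-most image at the GC horizon
            vec![(0x20, Val::Delta(vec![2]))], // history kept for a branch point
        ],
        vec![(0x30, Val::Delta(vec![3]))], // history above the horizon
        &mut images,
        &mut deltas,
    );
    assert_eq!(images.len(), 1);
    assert_eq!(deltas.len(), 2);
}
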
     235              : 
     236              : #[derive(Debug, Serialize, Default)]
     237              : struct CompactionStatisticsNumSize {
     238              :     num: u64,
     239              :     size: u64,
     240              : }
     241              : 
     242              : #[derive(Debug, Serialize, Default)]
     243              : pub struct CompactionStatistics {
     244              :     delta_layer_visited: CompactionStatisticsNumSize,
     245              :     image_layer_visited: CompactionStatisticsNumSize,
     246              :     delta_layer_produced: CompactionStatisticsNumSize,
     247              :     image_layer_produced: CompactionStatisticsNumSize,
     248              :     num_delta_layer_discarded: usize,
     249              :     num_image_layer_discarded: usize,
     250              :     num_unique_keys_visited: usize,
     251              :     wal_keys_visited: CompactionStatisticsNumSize,
     252              :     image_keys_visited: CompactionStatisticsNumSize,
     253              :     wal_produced: CompactionStatisticsNumSize,
     254              :     image_produced: CompactionStatisticsNumSize,
     255              : }
     256              : 
     257              : impl CompactionStatistics {
     258         1042 :     fn estimated_size_of_value(val: &Value) -> usize {
     259          432 :         match val {
     260          610 :             Value::Image(img) => img.len(),
     261            0 :             Value::WalRecord(NeonWalRecord::Postgres { rec, .. }) => rec.len(),
     262          432 :             _ => std::mem::size_of::<NeonWalRecord>(),
     263              :         }
     264         1042 :     }
     265         1636 :     fn estimated_size_of_key() -> usize {
     266         1636 :         KEY_SIZE // TODO: distinguish image layer and delta layer (count LSN in delta layer)
     267         1636 :     }
     268           86 :     fn visit_delta_layer(&mut self, size: u64) {
     269           86 :         self.delta_layer_visited.num += 1;
     270           86 :         self.delta_layer_visited.size += size;
     271           86 :     }
     272           66 :     fn visit_image_layer(&mut self, size: u64) {
     273           66 :         self.image_layer_visited.num += 1;
     274           66 :         self.image_layer_visited.size += size;
     275           66 :     }
     276          622 :     fn on_unique_key_visited(&mut self) {
     277          622 :         self.num_unique_keys_visited += 1;
     278          622 :     }
     279          240 :     fn visit_wal_key(&mut self, val: &Value) {
     280          240 :         self.wal_keys_visited.num += 1;
     281          240 :         self.wal_keys_visited.size +=
     282          240 :             Self::estimated_size_of_value(val) as u64 + Self::estimated_size_of_key() as u64;
     283          240 :     }
     284          610 :     fn visit_image_key(&mut self, val: &Value) {
     285          610 :         self.image_keys_visited.num += 1;
     286          610 :         self.image_keys_visited.size +=
     287          610 :             Self::estimated_size_of_value(val) as u64 + Self::estimated_size_of_key() as u64;
     288          610 :     }
     289          202 :     fn produce_key(&mut self, val: &Value) {
     290          202 :         match val {
     291           10 :             Value::Image(img) => self.produce_image_key(img),
     292          192 :             Value::WalRecord(_) => self.produce_wal_key(val),
     293              :         }
     294          202 :     }
     295          192 :     fn produce_wal_key(&mut self, val: &Value) {
     296          192 :         self.wal_produced.num += 1;
     297          192 :         self.wal_produced.size +=
     298          192 :             Self::estimated_size_of_value(val) as u64 + Self::estimated_size_of_key() as u64;
     299          192 :     }
     300          594 :     fn produce_image_key(&mut self, val: &Bytes) {
     301          594 :         self.image_produced.num += 1;
     302          594 :         self.image_produced.size += val.len() as u64 + Self::estimated_size_of_key() as u64;
     303          594 :     }
     304           14 :     fn discard_delta_layer(&mut self) {
     305           14 :         self.num_delta_layer_discarded += 1;
     306           14 :     }
     307            8 :     fn discard_image_layer(&mut self) {
     308            8 :         self.num_image_layer_discarded += 1;
     309            8 :     }
     310           22 :     fn produce_delta_layer(&mut self, size: u64) {
     311           22 :         self.delta_layer_produced.num += 1;
     312           22 :         self.delta_layer_produced.size += size;
     313           22 :     }
     314           30 :     fn produce_image_layer(&mut self, size: u64) {
     315           30 :         self.image_layer_produced.num += 1;
     316           30 :         self.image_layer_produced.size += size;
     317           30 :     }
     318              : }
     319              : 
     320              : impl Timeline {
     321              :     /// TODO: cancellation
     322              :     ///
     323              :     /// Returns whether the compaction has pending tasks.
     324          364 :     pub(crate) async fn compact_legacy(
     325          364 :         self: &Arc<Self>,
     326          364 :         cancel: &CancellationToken,
     327          364 :         options: CompactOptions,
     328          364 :         ctx: &RequestContext,
     329          364 :     ) -> Result<bool, CompactionError> {
     330          364 :         if options
     331          364 :             .flags
     332          364 :             .contains(CompactFlags::EnhancedGcBottomMostCompaction)
     333              :         {
     334            0 :             self.compact_with_gc(cancel, options, ctx)
     335            0 :                 .await
     336            0 :                 .map_err(CompactionError::Other)?;
     337            0 :             return Ok(false);
     338          364 :         }
     339          364 : 
     340          364 :         if options.flags.contains(CompactFlags::DryRun) {
     341            0 :             return Err(CompactionError::Other(anyhow!(
     342            0 :                 "dry-run mode is not supported for legacy compaction for now"
     343            0 :             )));
     344          364 :         }
     345          364 : 
     346          364 :         if options.compact_key_range.is_some() || options.compact_lsn_range.is_some() {
     347              :             // maybe useful in the future? could implement this at some point
     348            0 :             return Err(CompactionError::Other(anyhow!(
     349            0 :                 "compaction range is not supported for legacy compaction for now"
     350            0 :             )));
     351          364 :         }
     352          364 : 
     353          364 :         // High level strategy for compaction / image creation:
     354          364 :         //
     355          364 :         // 1. First, calculate the desired "partitioning" of the
     356          364 :         // currently in-use key space. The goal is to partition the
     357          364 :         // key space into roughly fixed-size chunks, but also take into
     358          364 :         // account any existing image layers, and try to align the
     359          364 :         // chunk boundaries with the existing image layers to avoid
     360          364 :         // too much churn. Also try to align chunk boundaries with
     361          364 :         // relation boundaries.  In principle, we don't know about
     362          364 :         // relation boundaries here, we just deal with key-value
     363          364 :         // pairs, and the code in pgdatadir_mapping.rs knows how to
     364          364 :         // map relations into key-value pairs. But in practice we know
     365          364 :         // that 'field6' is the block number, and the fields 1-5
     366          364 :         // identify a relation. This is just an optimization,
     367          364 :         // though.
     368          364 :         //
     369          364 :         // 2. Once we know the partitioning, for each partition,
     370          364 :         // decide if it's time to create a new image layer. The
     371          364 :         // criterion is: has there been too much "churn" since the last
     372          364 :         // image layer? The "churn" is a fuzzy concept: it's a
     373          364 :         // combination of too many delta files, or too much WAL in
     374          364 :         // total in the delta files. Or perhaps: if creating an image
     375          364 :         // file would allow us to delete some older files.
     376          364 :         //
     377          364 :         // 3. After that, we compact all level0 delta files if there
     378          364 :         // are too many of them.  While compacting, we also garbage
     379          364 :         // collect any page versions that are no longer needed because
     380          364 :         // of the new image layers we created in step 2.
     381          364 :         //
     382          364 :         // TODO: This high level strategy hasn't been implemented yet.
     383          364 :         // Below are functions compact_level0() and create_image_layers()
     384          364 :         // but they are a bit ad hoc and don't quite work like it's explained
     385          364 :         // above. Rewrite it.
     386          364 : 
     387          364 :         // Is the timeline being deleted?
     388          364 :         if self.is_stopping() {
     389            0 :             trace!("Dropping out of compaction on timeline shutdown");
     390            0 :             return Err(CompactionError::ShuttingDown);
     391          364 :         }
     392          364 : 
     393          364 :         let target_file_size = self.get_checkpoint_distance();
     394              : 
     395              :         // Define partitioning schema if needed
     396              : 
     397              :         // FIXME: the match should only cover repartitioning, not the next steps
     398          364 :         let (partition_count, has_pending_tasks) = match self
     399          364 :             .repartition(
     400          364 :                 self.get_last_record_lsn(),
     401          364 :                 self.get_compaction_target_size(),
     402          364 :                 options.flags,
     403          364 :                 ctx,
     404          364 :             )
     405          364 :             .await
     406              :         {
     407          364 :             Ok(((dense_partitioning, sparse_partitioning), lsn)) => {
     408          364 :                 // Disables access_stats updates, so that the files we read remain candidates for eviction after we're done with them
     409          364 :                 let image_ctx = RequestContextBuilder::extend(ctx)
     410          364 :                     .access_stats_behavior(AccessStatsBehavior::Skip)
     411          364 :                     .build();
     412          364 : 
     413          364 :                 // 2. Compact
     414          364 :                 let timer = self.metrics.compact_time_histo.start_timer();
     415          364 :                 let fully_compacted = self
     416          364 :                     .compact_level0(
     417          364 :                         target_file_size,
     418          364 :                         options.flags.contains(CompactFlags::ForceL0Compaction),
     419          364 :                         ctx,
     420          364 :                     )
     421          364 :                     .await?;
     422          364 :                 timer.stop_and_record();
     423          364 : 
     424          364 :                 let mut partitioning = dense_partitioning;
     425          364 :                 partitioning
     426          364 :                     .parts
     427          364 :                     .extend(sparse_partitioning.into_dense().parts);
     428          364 : 
     429          364 :                 // 3. Create new image layers for partitions that have been modified
     430          364 :                 // "enough". Skip image layer creation if L0 compaction cannot keep up.
     431          364 :                 if fully_compacted {
     432          364 :                     let image_layers = self
     433          364 :                         .create_image_layers(
     434          364 :                             &partitioning,
     435          364 :                             lsn,
     436          364 :                             if options
     437          364 :                                 .flags
     438          364 :                                 .contains(CompactFlags::ForceImageLayerCreation)
     439              :                             {
     440           14 :                                 ImageLayerCreationMode::Force
     441              :                             } else {
     442          350 :                                 ImageLayerCreationMode::Try
     443              :                             },
     444          364 :                             &image_ctx,
     445          364 :                         )
     446          364 :                         .await?;
     447              : 
     448          364 :                     self.upload_new_image_layers(image_layers)?;
     449              :                 } else {
     450            0 :                     info!("skipping image layer generation because L0 compaction did not include all layers.");
     451              :                 }
     452          364 :                 (partitioning.parts.len(), !fully_compacted)
     453              :             }
     454            0 :             Err(err) => {
     455            0 :                 // no partitioning? This is normal, if the timeline was just created
     456            0 :                 // as an empty timeline. Also in unit tests, when we use the timeline
     457            0 :                 // as a simple key-value store, ignoring the datadir layout. Log the
     458            0 :                 // error but continue.
     459            0 :                 //
     460            0 :                 // Suppress error when it's due to cancellation
     461            0 :                 if !self.cancel.is_cancelled() && !err.is_cancelled() {
     462            0 :                     tracing::error!("could not compact, repartitioning keyspace failed: {err:?}");
     463            0 :                 }
     464            0 :                 (1, false)
     465              :             }
     466              :         };
     467              : 
     468          364 :         if self.shard_identity.count >= ShardCount::new(2) {
     469              :             // Limit the number of layer rewrites to the number of partitions: this means its
     470              :             // runtime should be comparable to a full round of image layer creations, rather than
     471              :             // being potentially much longer.
     472            0 :             let rewrite_max = partition_count;
     473            0 : 
     474            0 :             self.compact_shard_ancestors(rewrite_max, ctx).await?;
     475          364 :         }
     476              : 
     477          364 :         Ok(has_pending_tasks)
     478          364 :     }
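
The shape of `compact_legacy` above (repartition the key space, compact L0 deltas, and only then create image layers, skipping them when L0 compaction could not keep up) can be summarised as plain control flow. The outline below is hypothetical, with stub phases that return canned values; it is not the pageserver API.

// Hypothetical stub phases standing in for repartition(), compact_level0() and
// create_image_layers(); each returns a canned value for illustration only.
fn repartition() -> Result<Vec<&'static str>, String> {
    Ok(vec!["partition-a", "partition-b"])
}
fn compact_level0() -> bool {
    true // whether every L0 delta was included in this pass
}
fn create_image_layers(partitions: &[&str]) -> usize {
    partitions.len()
}

/// Returns whether compaction still has pending work, mirroring compact_legacy's result.
fn compact_once() -> bool {
    match repartition() {
        Ok(partitions) => {
            let fully_compacted = compact_level0();
            if fully_compacted {
                // Image creation only runs when L0 compaction kept up.
                let _created = create_image_layers(&partitions);
            }
            !fully_compacted
        }
        // No partitioning yet (e.g. an empty timeline): log-and-continue in the real code.
        Err(_) => false,
    }
}

fn main() {
    assert!(!compact_once());
}
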
     479              : 
     480              :     /// Check for layers that are eligible to be rewritten:
     481              :     /// - Shard splitting: after a shard split, rewrite ancestor layers beyond pitr_interval, so that
     482              :     ///   we don't indefinitely retain keys in this shard that aren't needed.
     483              :     /// - For future use: layers beyond pitr_interval that are in formats we would
     484              :     ///   rather not maintain compatibility with indefinitely.
     485              :     ///
     486              :     /// Note: this phase may read and write many gigabytes of data: use rewrite_max to bound
     487              :     /// how much work it will try to do in each compaction pass.
     488            0 :     async fn compact_shard_ancestors(
     489            0 :         self: &Arc<Self>,
     490            0 :         rewrite_max: usize,
     491            0 :         ctx: &RequestContext,
     492            0 :     ) -> Result<(), CompactionError> {
     493            0 :         let mut drop_layers = Vec::new();
     494            0 :         let mut layers_to_rewrite: Vec<Layer> = Vec::new();
     495            0 : 
     496            0 :         // We will use the Lsn cutoff of the last GC as a threshold for rewriting layers: if a
     497            0 :         // layer is behind this Lsn, it indicates that the layer is being retained beyond the
     498            0 :         // pitr_interval, for example because a branchpoint references it.
     499            0 :         //
     500            0 :         // Holding this read guard also blocks [`Self::gc_timeline`] from entering while we
     501            0 :         // are rewriting layers.
     502            0 :         let latest_gc_cutoff = self.get_latest_gc_cutoff_lsn();
     503            0 : 
     504            0 :         tracing::info!(
     505            0 :             "latest_gc_cutoff: {}, pitr cutoff {}",
     506            0 :             *latest_gc_cutoff,
     507            0 :             self.gc_info.read().unwrap().cutoffs.time
     508              :         );
     509              : 
     510            0 :         let layers = self.layers.read().await;
     511            0 :         for layer_desc in layers.layer_map()?.iter_historic_layers() {
     512            0 :             let layer = layers.get_from_desc(&layer_desc);
     513            0 :             if layer.metadata().shard.shard_count == self.shard_identity.count {
     514              :                 // This layer does not belong to a historic ancestor, no need to re-image it.
     515            0 :                 continue;
     516            0 :             }
     517            0 : 
     518            0 :             // This layer was created on an ancestor shard: check if it contains any data for this shard.
     519            0 :             let sharded_range = ShardedRange::new(layer_desc.get_key_range(), &self.shard_identity);
     520            0 :             let layer_local_page_count = sharded_range.page_count();
     521            0 :             let layer_raw_page_count = ShardedRange::raw_size(&layer_desc.get_key_range());
     522            0 :             if layer_local_page_count == 0 {
     523              :                 // This ancestral layer only covers keys that belong to other shards.
     524              :                 // We include the full metadata in the log: if we had some critical bug that caused
     525              :                 // us to incorrectly drop layers, this would simplify manually debugging + reinstating those layers.
     526            0 :                 info!(%layer, old_metadata=?layer.metadata(),
     527            0 :                     "dropping layer after shard split, contains no keys for this shard.",
     528              :                 );
     529              : 
     530            0 :                 if cfg!(debug_assertions) {
     531              :                     // Expensive, exhaustive check of keys in this layer: this guards against ShardedRange's calculations being
     532              :                     // wrong.  If ShardedRange claims the local page count is zero, then no keys in this layer
     533              :                     // should be !is_key_disposable()
     534            0 :                     let range = layer_desc.get_key_range();
     535            0 :                     let mut key = range.start;
     536            0 :                     while key < range.end {
     537            0 :                         debug_assert!(self.shard_identity.is_key_disposable(&key));
     538            0 :                         key = key.next();
     539              :                     }
     540            0 :                 }
     541              : 
     542            0 :                 drop_layers.push(layer);
     543            0 :                 continue;
     544            0 :             } else if layer_local_page_count != u32::MAX
     545            0 :                 && layer_local_page_count == layer_raw_page_count
     546              :             {
     547            0 :                 debug!(%layer,
     548            0 :                     "layer is entirely shard local ({} keys), no need to filter it",
     549              :                     layer_local_page_count
     550              :                 );
     551            0 :                 continue;
     552            0 :             }
     553            0 : 
     554            0 :             // Don't bother re-writing a layer unless it will at least halve its size
     555            0 :             if layer_local_page_count != u32::MAX
     556            0 :                 && layer_local_page_count > layer_raw_page_count / 2
     557              :             {
     558            0 :                 debug!(%layer,
     559            0 :                     "layer is already mostly local ({}/{}), not rewriting",
     560              :                     layer_local_page_count,
     561              :                     layer_raw_page_count
     562              :                 );
     563            0 :             }
     564              : 
     565              :             // Don't bother re-writing a layer if it is within the PITR window: it will age-out eventually
     566              :             // without incurring the I/O cost of a rewrite.
     567            0 :             if layer_desc.get_lsn_range().end >= *latest_gc_cutoff {
     568            0 :                 debug!(%layer, "Skipping rewrite of layer still in GC window ({} >= {})",
     569            0 :                     layer_desc.get_lsn_range().end, *latest_gc_cutoff);
     570            0 :                 continue;
     571            0 :             }
     572            0 : 
     573            0 :             if layer_desc.is_delta() {
     574              :                 // We do not yet implement rewrite of delta layers
     575            0 :                 debug!(%layer, "Skipping rewrite of delta layer");
     576            0 :                 continue;
     577            0 :             }
     578            0 : 
     579            0 :             // Only rewrite layers if their generations differ.  This guarantees:
     580            0 :             //  - that local rewrite is safe, as local layer paths will differ between existing layer and rewritten one
     581            0 :             //  - that the layer is persistent in remote storage, as we only see old-generation'd layer via loading from remote storage
     582            0 :             if layer.metadata().generation == self.generation {
     583            0 :                 debug!(%layer, "Skipping rewrite, is not from old generation");
     584            0 :                 continue;
     585            0 :             }
     586            0 : 
     587            0 :             if layers_to_rewrite.len() >= rewrite_max {
     588            0 :                 tracing::info!(%layer, "Will rewrite layer on a future compaction, already rewrote {}",
     589            0 :                     layers_to_rewrite.len()
     590              :                 );
     591            0 :                 continue;
     592            0 :             }
     593            0 : 
     594            0 :             // Fall through: all our conditions for doing a rewrite passed.
     595            0 :             layers_to_rewrite.push(layer);
     596              :         }
     597              : 
     598              :         // Drop read lock on layer map before we start doing time-consuming I/O
     599            0 :         drop(layers);
     600            0 : 
     601            0 :         let mut replace_image_layers = Vec::new();
     602              : 
     603            0 :         for layer in layers_to_rewrite {
     604            0 :             tracing::info!(layer=%layer, "Rewriting layer after shard split...");
     605            0 :             let mut image_layer_writer = ImageLayerWriter::new(
     606            0 :                 self.conf,
     607            0 :                 self.timeline_id,
     608            0 :                 self.tenant_shard_id,
     609            0 :                 &layer.layer_desc().key_range,
     610            0 :                 layer.layer_desc().image_layer_lsn(),
     611            0 :                 ctx,
     612            0 :             )
     613            0 :             .await
     614            0 :             .map_err(CompactionError::Other)?;
     615              : 
     616              :             // Safety of layer rewrites:
     617              :             // - We are writing to a different local file path than we are reading from, so the old Layer
     618              :             //   cannot interfere with the new one.
     619              :             // - In the page cache, contents for a particular VirtualFile are stored with a file_id that
     620              :             //   is different for two layers with the same name (in `ImageLayerInner::new` we always
     621              :             //   acquire a fresh id from [`crate::page_cache::next_file_id`].  So readers do not risk
     622              :             //   reading the index from one layer file, and then data blocks from the rewritten layer file.
     623              :             // - Any readers that have a reference to the old layer will keep it alive until they are done
     624              :             //   with it. If they are trying to promote from remote storage, that will fail, but this is the same
     625              :             //   as for compaction generally: compaction is allowed to delete layers that readers might be trying to use.
     626              :             // - We do not run concurrently with other kinds of compaction, so the only layer map writes we race with are:
     627              :             //    - GC, which at worst witnesses us "undelete" a layer that they just deleted.
     628              :             //    - ingestion, which only inserts layers, therefore cannot collide with us.
     629            0 :             let resident = layer.download_and_keep_resident().await?;
     630              : 
     631            0 :             let keys_written = resident
     632            0 :                 .filter(&self.shard_identity, &mut image_layer_writer, ctx)
     633            0 :                 .await?;
     634              : 
     635            0 :             if keys_written > 0 {
     636            0 :                 let (desc, path) = image_layer_writer
     637            0 :                     .finish(ctx)
     638            0 :                     .await
     639            0 :                     .map_err(CompactionError::Other)?;
     640            0 :                 let new_layer = Layer::finish_creating(self.conf, self, desc, &path)
     641            0 :                     .map_err(CompactionError::Other)?;
     642            0 :                 tracing::info!(layer=%new_layer, "Rewrote layer, {} -> {} bytes",
     643            0 :                     layer.metadata().file_size,
     644            0 :                     new_layer.metadata().file_size);
     645              : 
     646            0 :                 replace_image_layers.push((layer, new_layer));
     647            0 :             } else {
     648            0 :                 // Drop the old layer.  Usually for this case we would already have noticed that
     649            0 :                 // the layer has no data for us with the ShardedRange check above, but
     650            0 :                 drop_layers.push(layer);
     651            0 :             }
     652              :         }
     653              : 
     654              :         // At this point, we have replaced local layer files with their rewritten form, but not yet uploaded
     655              :         // metadata to reflect that. If we restart here, the replaced layer files will look invalid (size mismatch
     656              :         // to remote index) and be removed. This is inefficient but safe.
     657            0 :         fail::fail_point!("compact-shard-ancestors-localonly");
     658            0 : 
     659            0 :         // Update the LayerMap so that readers will use the new layers, and enqueue it for writing to remote storage
     660            0 :         self.rewrite_layers(replace_image_layers, drop_layers)
     661            0 :             .await?;
     662              : 
     663            0 :         fail::fail_point!("compact-shard-ancestors-enqueued");
     664            0 : 
     665            0 :         // We wait for all uploads to complete before finishing this compaction stage.  This is not
     666            0 :         // necessary for correctness, but it simplifies testing, and avoids proceeding with another
     667            0 :         // Timeline's compaction while this timeline's uploads may be generating lots of disk I/O
     668            0 :         // load.
     669            0 :         match self.remote_client.wait_completion().await {
     670            0 :             Ok(()) => (),
     671            0 :             Err(WaitCompletionError::NotInitialized(ni)) => return Err(CompactionError::from(ni)),
     672              :             Err(WaitCompletionError::UploadQueueShutDownOrStopped) => {
     673            0 :                 return Err(CompactionError::ShuttingDown)
     674              :             }
     675              :         }
     676              : 
     677            0 :         fail::fail_point!("compact-shard-ancestors-persistent");
     678            0 : 
     679            0 :         Ok(())
     680            0 :     }
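
The per-layer decision sequence inside `compact_shard_ancestors` can be read as a small classifier: keep layers that already belong to this shard count, drop ancestor layers with no local keys, and rewrite ancestor image layers that are only partially local, old-generation, and outside the PITR window. The sketch below is hypothetical and simplified; the rewrite_max budget and the "at least halve its size" heuristic are omitted.

// Hypothetical, flattened view of the fields the real code reads from a layer.
struct LayerInfo {
    from_ancestor_shard: bool, // created before the shard split?
    local_pages: u32,          // pages belonging to this shard
    total_pages: u32,
    end_lsn: u64,
    is_delta: bool,
    same_generation: bool, // written by the current generation?
}

#[derive(Debug, PartialEq)]
enum Decision {
    Keep,    // leave the layer alone
    Drop,    // contains no keys for this shard: delete it
    Rewrite, // re-image it so it only contains this shard's keys
}

fn classify(layer: &LayerInfo, gc_cutoff: u64) -> Decision {
    if !layer.from_ancestor_shard {
        return Decision::Keep; // not inherited from a parent shard
    }
    if layer.local_pages == 0 {
        return Decision::Drop; // only covers other shards' keys
    }
    if layer.local_pages == layer.total_pages {
        return Decision::Keep; // already entirely local, nothing to filter
    }
    if layer.end_lsn >= gc_cutoff {
        return Decision::Keep; // still inside the PITR window; it will age out
    }
    if layer.is_delta || layer.same_generation {
        return Decision::Keep; // delta rewrite unimplemented; generations must differ
    }
    Decision::Rewrite
}

fn main() {
    let layer = LayerInfo {
        from_ancestor_shard: true,
        local_pages: 10,
        total_pages: 100,
        end_lsn: 50,
        is_delta: false,
        same_generation: false,
    };
    assert_eq!(classify(&layer, 100), Decision::Rewrite);
    assert_eq!(classify(&LayerInfo { local_pages: 0, ..layer }, 100), Decision::Drop);
}
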
     681              : 
     682              :     /// Update the LayerVisibilityHint of layers covered by image layers, based on whether there is
     683              :     /// an image layer between them and the most recent readable LSN (branch point or tip of timeline).  The
     684              :     /// purpose of the visibility hint is to record which layers need to be available to service reads.
     685              :     ///
     686              :     /// The result may be used as an input to eviction and secondary downloads to de-prioritize layers
     687              :     /// that we know won't be needed for reads.
     688          202 :     pub(super) async fn update_layer_visibility(
     689          202 :         &self,
     690          202 :     ) -> Result<(), super::layer_manager::Shutdown> {
     691          202 :         let head_lsn = self.get_last_record_lsn();
     692              : 
     693              :         // We will sweep through layers in reverse-LSN order.  We only do historic layers.  L0 deltas
     694              :         // are implicitly left visible, because LayerVisibilityHint's default is Visible, and we never modify it here.
     695              :         // Note that L0 deltas _can_ be covered by image layers, but we consider them 'visible' because we anticipate that
     696              :         // they will be subject to L0->L1 compaction in the near future.
     697          202 :         let layer_manager = self.layers.read().await;
     698          202 :         let layer_map = layer_manager.layer_map()?;
     699              : 
     700          202 :         let readable_points = {
     701          202 :             let children = self.gc_info.read().unwrap().retain_lsns.clone();
     702          202 : 
     703          202 :             let mut readable_points = Vec::with_capacity(children.len() + 1);
     704          202 :             for (child_lsn, _child_timeline_id, is_offloaded) in &children {
     705            0 :                 if *is_offloaded == MaybeOffloaded::Yes {
     706            0 :                     continue;
     707            0 :                 }
     708            0 :                 readable_points.push(*child_lsn);
     709              :             }
     710          202 :             readable_points.push(head_lsn);
     711          202 :             readable_points
     712          202 :         };
     713          202 : 
     714          202 :         let (layer_visibility, covered) = layer_map.get_visibility(readable_points);
     715          524 :         for (layer_desc, visibility) in layer_visibility {
     716          322 :             // FIXME: do a more efficient bulk zip() through the layers rather than an N log N lookup of each one
     717          322 :             let layer = layer_manager.get_from_desc(&layer_desc);
     718          322 :             layer.set_visibility(visibility);
     719          322 :         }
     720              : 
     721              :         // TODO: publish our covered KeySpace to our parent, so that when they update their visibility, they can
     722              :         // avoid assuming that everything at a branch point is visible.
     723          202 :         drop(covered);
     724          202 :         Ok(())
     725          202 :     }
     726              : 
     727              :     /// Collect a bunch of Level 0 layer files, and compact and reshuffle them
     728              :     /// as Level 1 files. Returns whether the L0 layers are fully compacted.
     729          364 :     async fn compact_level0(
     730          364 :         self: &Arc<Self>,
     731          364 :         target_file_size: u64,
     732          364 :         force_compaction_ignore_threshold: bool,
     733          364 :         ctx: &RequestContext,
     734          364 :     ) -> Result<bool, CompactionError> {
     735              :         let CompactLevel0Phase1Result {
     736          364 :             new_layers,
     737          364 :             deltas_to_compact,
     738          364 :             fully_compacted,
     739              :         } = {
     740          364 :             let phase1_span = info_span!("compact_level0_phase1");
     741          364 :             let ctx = ctx.attached_child();
     742          364 :             let mut stats = CompactLevel0Phase1StatsBuilder {
     743          364 :                 version: Some(2),
     744          364 :                 tenant_id: Some(self.tenant_shard_id),
     745          364 :                 timeline_id: Some(self.timeline_id),
     746          364 :                 ..Default::default()
     747          364 :             };
     748          364 : 
     749          364 :             let begin = tokio::time::Instant::now();
     750          364 :             let phase1_layers_locked = self.layers.read().await;
     751          364 :             let now = tokio::time::Instant::now();
     752          364 :             stats.read_lock_acquisition_micros =
     753          364 :                 DurationRecorder::Recorded(RecordedDuration(now - begin), now);
     754          364 :             self.compact_level0_phase1(
     755          364 :                 phase1_layers_locked,
     756          364 :                 stats,
     757          364 :                 target_file_size,
     758          364 :                 force_compaction_ignore_threshold,
     759          364 :                 &ctx,
     760          364 :             )
     761          364 :             .instrument(phase1_span)
     762          364 :             .await?
     763              :         };
     764              : 
     765          364 :         if new_layers.is_empty() && deltas_to_compact.is_empty() {
     766              :             // nothing to do
     767          336 :             return Ok(true);
     768           28 :         }
     769           28 : 
     770           28 :         self.finish_compact_batch(&new_layers, &Vec::new(), &deltas_to_compact)
     771           28 :             .await?;
     772           28 :         Ok(fully_compacted)
     773          364 :     }
     774              : 
     775              :     /// First phase of Level 0 file compaction, explained in the [`Self::compact_legacy`] comment.
     776          364 :     async fn compact_level0_phase1<'a>(
     777          364 :         self: &'a Arc<Self>,
     778          364 :         guard: tokio::sync::RwLockReadGuard<'a, LayerManager>,
     779          364 :         mut stats: CompactLevel0Phase1StatsBuilder,
     780          364 :         target_file_size: u64,
     781          364 :         force_compaction_ignore_threshold: bool,
     782          364 :         ctx: &RequestContext,
     783          364 :     ) -> Result<CompactLevel0Phase1Result, CompactionError> {
     784          364 :         stats.read_lock_held_spawn_blocking_startup_micros =
     785          364 :             stats.read_lock_acquisition_micros.till_now(); // set by caller
     786          364 :         let layers = guard.layer_map()?;
     787          364 :         let level0_deltas = layers.level0_deltas();
     788          364 :         stats.level0_deltas_count = Some(level0_deltas.len());
     789          364 : 
     790          364 :         // Only compact if enough layers have accumulated.
     791          364 :         let threshold = self.get_compaction_threshold();
     792          364 :         if level0_deltas.is_empty() || level0_deltas.len() < threshold {
     793          336 :             if force_compaction_ignore_threshold {
     794            0 :                 if !level0_deltas.is_empty() {
     795            0 :                     info!(
     796            0 :                         level0_deltas = level0_deltas.len(),
     797            0 :                         threshold, "too few deltas to compact, but forcing compaction"
     798              :                     );
     799              :                 } else {
     800            0 :                     info!(
     801            0 :                         level0_deltas = level0_deltas.len(),
     802            0 :                         threshold, "too few deltas to compact, cannot force compaction"
     803              :                     );
     804            0 :                     return Ok(CompactLevel0Phase1Result::default());
     805              :                 }
     806              :             } else {
     807          336 :                 debug!(
     808            0 :                     level0_deltas = level0_deltas.len(),
     809            0 :                     threshold, "too few deltas to compact"
     810              :                 );
     811          336 :                 return Ok(CompactLevel0Phase1Result::default());
     812              :             }
     813           28 :         }
     814              : 
     815           28 :         let mut level0_deltas = level0_deltas
     816           28 :             .iter()
     817          402 :             .map(|x| guard.get_from_desc(x))
     818           28 :             .collect::<Vec<_>>();
     819           28 : 
     820           28 :         // Gather the files to compact in this iteration.
     821           28 :         //
     822           28 :         // Start with the oldest Level 0 delta file, and collect any other
     823           28 :         // level 0 files that form a contiguous sequence, such that the end
     824           28 :         // LSN of previous file matches the start LSN of the next file.
     825           28 :         //
     826           28 :         // Note that if the files don't form such a sequence, we might
     827           28 :         // "compact" just a single file. That's a bit pointless, but it allows
     828           28 :         // us to get rid of the level 0 file, and compact the other files on
     829           28 :         // the next iteration. This could probably be made smarter, but such
     830           28 :         // "gaps" in the sequence of level 0 files should only happen in case
     831           28 :         // of a crash, partial download from cloud storage, or something like
     832           28 :         // that, so it's not a big deal in practice.
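                      :         //
                      :         // For illustration (hypothetical LSN ranges): given L0s covering 0/100..0/200,
                      :         // 0/200..0/300, and 0/400..0/500, only the first two form a contiguous sequence
                      :         // and are picked; the file after the gap is left for a later iteration.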
     833          748 :         level0_deltas.sort_by_key(|l| l.layer_desc().lsn_range.start);
     834           28 :         let mut level0_deltas_iter = level0_deltas.iter();
     835           28 : 
     836           28 :         let first_level0_delta = level0_deltas_iter.next().unwrap();
     837           28 :         let mut prev_lsn_end = first_level0_delta.layer_desc().lsn_range.end;
     838           28 :         let mut deltas_to_compact = Vec::with_capacity(level0_deltas.len());
     839           28 : 
     840           28 :         // Accumulate the size of layers in `deltas_to_compact`
     841           28 :         let mut deltas_to_compact_bytes = 0;
     842           28 : 
     843           28 :         // Under normal circumstances, we will accumulate up to compaction_interval L0s of size
     844           28 :         // checkpoint_distance each.  To avoid edge cases using extra system resources, bound our
     845           28 :         // work in this function to only operate on this much delta data at once.
     846           28 :         //
     847           28 :         // Take the max of the configured value & the default, so that tests that configure tiny values
     848           28 :         // can still use a sensible amount of memory, but if a deployed system configures bigger values we
     849           28 :         // still let them compact a full stack of L0s in one go.
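                      :         //
                      :         // For illustration (hypothetical values): assuming the configured compaction
                      :         // threshold (10) and checkpoint distance (256 MiB) are both at least the defaults,
                      :         // one pass of this function is bounded to 10 * 256 MiB = 2.5 GiB of L0 delta data.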
     850           28 :         let delta_size_limit = std::cmp::max(
     851           28 :             self.get_compaction_threshold(),
     852           28 :             DEFAULT_COMPACTION_THRESHOLD,
     853           28 :         ) as u64
     854           28 :             * std::cmp::max(self.get_checkpoint_distance(), DEFAULT_CHECKPOINT_DISTANCE);
     855           28 : 
     856           28 :         let mut fully_compacted = true;
     857           28 : 
     858           28 :         deltas_to_compact.push(first_level0_delta.download_and_keep_resident().await?);
     859          402 :         for l in level0_deltas_iter {
     860          374 :             let lsn_range = &l.layer_desc().lsn_range;
     861          374 : 
     862          374 :             if lsn_range.start != prev_lsn_end {
     863            0 :                 break;
     864          374 :             }
     865          374 :             deltas_to_compact.push(l.download_and_keep_resident().await?);
     866          374 :             deltas_to_compact_bytes += l.metadata().file_size;
     867          374 :             prev_lsn_end = lsn_range.end;
     868          374 : 
     869          374 :             if deltas_to_compact_bytes >= delta_size_limit {
     870            0 :                 info!(
     871            0 :                     l0_deltas_selected = deltas_to_compact.len(),
     872            0 :                     l0_deltas_total = level0_deltas.len(),
     873            0 :                     "L0 compaction picker hit max delta layer size limit: {}",
     874              :                     delta_size_limit
     875              :                 );
     876            0 :                 fully_compacted = false;
     877            0 : 
     878            0 :                 // Proceed with compaction, but only a subset of L0s
     879            0 :                 break;
     880          374 :             }
     881              :         }
     882           28 :         let lsn_range = Range {
     883           28 :             start: deltas_to_compact
     884           28 :                 .first()
     885           28 :                 .unwrap()
     886           28 :                 .layer_desc()
     887           28 :                 .lsn_range
     888           28 :                 .start,
     889           28 :             end: deltas_to_compact.last().unwrap().layer_desc().lsn_range.end,
     890           28 :         };
     891           28 : 
     892           28 :         info!(
     893            0 :             "Starting Level0 compaction in LSN range {}-{} for {} layers ({} deltas in total)",
     894            0 :             lsn_range.start,
     895            0 :             lsn_range.end,
     896            0 :             deltas_to_compact.len(),
     897            0 :             level0_deltas.len()
     898              :         );
     899              : 
     900          402 :         for l in deltas_to_compact.iter() {
     901          402 :             info!("compact includes {l}");
     902              :         }
     903              : 
     904              :         // We don't need the original list of layers anymore. Drop it so that
     905              :         // we don't accidentally use it later in the function.
     906           28 :         drop(level0_deltas);
     907           28 : 
     908           28 :         stats.read_lock_held_prerequisites_micros = stats
     909           28 :             .read_lock_held_spawn_blocking_startup_micros
     910           28 :             .till_now();
     911              : 
     912              :         // TODO: replace with streaming k-merge
     913           28 :         let all_keys = {
     914           28 :             let mut all_keys = Vec::new();
     915          402 :             for l in deltas_to_compact.iter() {
     916          402 :                 if self.cancel.is_cancelled() {
     917            0 :                     return Err(CompactionError::ShuttingDown);
     918          402 :                 }
     919          402 :                 let delta = l.get_as_delta(ctx).await.map_err(CompactionError::Other)?;
     920          402 :                 let keys = delta
     921          402 :                     .index_entries(ctx)
     922          402 :                     .await
     923          402 :                     .map_err(CompactionError::Other)?;
     924          402 :                 all_keys.extend(keys);
     925              :             }
     926              :             // The current stdlib sort implementation is designed such that it is
     927              :             // particularly fast when the slice is made up of already-sorted sub-ranges.
     928      4423826 :             all_keys.sort_by_key(|DeltaEntry { key, lsn, .. }| (*key, *lsn));
     929           28 :             all_keys
     930           28 :         };
     931           28 : 
     932           28 :         stats.read_lock_held_key_sort_micros = stats.read_lock_held_prerequisites_micros.till_now();
     933              : 
     934              :         // Determine N largest holes where N is number of compacted layers. The vec is sorted by key range start.
     935              :         //
     936              :         // A hole is a key range for which this compaction doesn't have any WAL records.
     937              :         // Our goal in this compaction iteration is to avoid creating L1s that, in terms of their key range,
     938              :         // cover the hole, but actually don't contain any WAL records for that key range.
     939              :         // The reason is that the mere stack of L1s (`count_deltas`) triggers image layer creation (`create_image_layers`).
     940              :         // That image layer creation would be useless for a hole range covered by L1s that don't contain any WAL records.
     941              :         //
     942              :         // The algorithm chooses holes as follows.
     943              :         // - Slide a 2-element window over the keys in key order to get the hole range (=distance between two keys).
     944              :         // - Filter: min threshold on range length
     945              :         // - Rank: by coverage size (=number of image layers required to reconstruct each key in the range for which we have any data)
     946              :         //
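                      :         // Illustration (hypothetical keys, assuming min_hole_range = 8): for the sorted keys
                      :         // [10, 11, 12, 500, 900, 901], the candidate holes are 13..500 and 501..900; each is
                      :         // kept only if at least `min_hole_coverage_size` image layers cover it, and once more
                      :         // than `max_holes` candidates accumulate, the one with the smallest coverage is dropped.
                      :         //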
     947              :         // For more details, intuition, and some ASCII art see https://github.com/neondatabase/neon/pull/3597#discussion_r1112704451
     948              :         #[derive(PartialEq, Eq)]
     949              :         struct Hole {
     950              :             key_range: Range<Key>,
     951              :             coverage_size: usize,
     952              :         }
     953           28 :         let holes: Vec<Hole> = {
     954              :             use std::cmp::Ordering;
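                      :             // `BinaryHeap` is a max-heap; reversing the comparison on `coverage_size` makes it
                      :             // behave as a min-heap, so `pop()` evicts the smallest candidate and only the
                      :             // `max_holes` largest holes survive.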
     955              :             impl Ord for Hole {
     956            0 :                 fn cmp(&self, other: &Self) -> Ordering {
     957            0 :                     self.coverage_size.cmp(&other.coverage_size).reverse()
     958            0 :                 }
     959              :             }
     960              :             impl PartialOrd for Hole {
     961            0 :                 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
     962            0 :                     Some(self.cmp(other))
     963            0 :                 }
     964              :             }
     965           28 :             let max_holes = deltas_to_compact.len();
     966           28 :             let last_record_lsn = self.get_last_record_lsn();
     967           28 :             let min_hole_range = (target_file_size / page_cache::PAGE_SZ as u64) as i128;
     968           28 :             let min_hole_coverage_size = 3; // TODO: something more flexible?
     969           28 :                                             // min-heap (reserve space for one more element added before eviction)
     970           28 :             let mut heap: BinaryHeap<Hole> = BinaryHeap::with_capacity(max_holes + 1);
     971           28 :             let mut prev: Option<Key> = None;
     972              : 
     973      2064038 :             for &DeltaEntry { key: next_key, .. } in all_keys.iter() {
     974      2064038 :                 if let Some(prev_key) = prev {
     975              :                     // Just a first, fast filter: do not create hole entries for metadata keys. The last hole in the
     976              :                     // compaction is the gap between the data keys and the metadata keys.
     977      2064010 :                     if next_key.to_i128() - prev_key.to_i128() >= min_hole_range
     978            0 :                         && !Key::is_metadata_key(&prev_key)
     979              :                     {
     980            0 :                         let key_range = prev_key..next_key;
     981            0 :                         // Measuring a hole by simply subtracting the i128 representations of the key range boundaries
     982            0 :                         // doesn't make much sense, because the largest holes would correspond to field1/field2 changes.
     983            0 :                         // But we are mostly interested in eliminating holes which cause the generation of excessive image layers.
     984            0 :                         // That is why it is better to measure the size of a hole as the number of covering image layers.
     985            0 :                         let coverage_size =
     986            0 :                             layers.image_coverage(&key_range, last_record_lsn).len();
     987            0 :                         if coverage_size >= min_hole_coverage_size {
     988            0 :                             heap.push(Hole {
     989            0 :                                 key_range,
     990            0 :                                 coverage_size,
     991            0 :                             });
     992            0 :                             if heap.len() > max_holes {
     993            0 :                                 heap.pop(); // remove smallest hole
     994            0 :                             }
     995            0 :                         }
     996      2064010 :                     }
     997           28 :                 }
     998      2064038 :                 prev = Some(next_key.next());
     999              :             }
    1000           28 :             let mut holes = heap.into_vec();
    1001           28 :             holes.sort_unstable_by_key(|hole| hole.key_range.start);
    1002           28 :             holes
    1003           28 :         };
    1004           28 :         stats.read_lock_held_compute_holes_micros = stats.read_lock_held_key_sort_micros.till_now();
    1005           28 :         drop_rlock(guard);
    1006           28 : 
    1007           28 :         if self.cancel.is_cancelled() {
    1008            0 :             return Err(CompactionError::ShuttingDown);
    1009           28 :         }
    1010           28 : 
    1011           28 :         stats.read_lock_drop_micros = stats.read_lock_held_compute_holes_micros.till_now();
    1012              : 
    1013              :         // This iterator walks through all key-value pairs from all the layers
    1014              :         // we're compacting, in key, LSN order.
    1015              :         // If there's both a Value::Image and Value::WalRecord for the same (key,lsn),
    1016              :         // then the Value::Image is ordered before Value::WalRecord.
    1017           28 :         let mut all_values_iter = {
    1018           28 :             let mut deltas = Vec::with_capacity(deltas_to_compact.len());
    1019          402 :             for l in deltas_to_compact.iter() {
    1020          402 :                 let l = l.get_as_delta(ctx).await.map_err(CompactionError::Other)?;
    1021          402 :                 deltas.push(l);
    1022              :             }
    1023           28 :             MergeIterator::create(&deltas, &[], ctx)
    1024           28 :         };
    1025           28 : 
    1026           28 :         // This iterator walks through all keys and is needed to calculate size used by each key
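                      :         // (For illustration with hypothetical sizes: entries (K, 0x10, 4096) and (K, 0x20, 4096)
                      :         // coalesce into (K, 0x10, 8192) while the accumulated size stays below target_file_size;
                      :         // a different key, or hitting the target, starts a new group.)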
    1027           28 :         let mut all_keys_iter = all_keys
    1028           28 :             .iter()
    1029      2064038 :             .map(|DeltaEntry { key, lsn, size, .. }| (*key, *lsn, *size))
    1030      2064010 :             .coalesce(|mut prev, cur| {
    1031      2064010 :                 // Coalesce keys that belong to the same key pair.
    1032      2064010 :                 // This ensures that compaction doesn't put them
    1033      2064010 :                 // into different layer files.
    1034      2064010 :                 // Still limit this by the target file size,
    1035      2064010 :                 // so that we keep the size of the files in
    1036      2064010 :                 // check.
    1037      2064010 :                 if prev.0 == cur.0 && prev.2 < target_file_size {
    1038        40038 :                     prev.2 += cur.2;
    1039        40038 :                     Ok(prev)
    1040              :                 } else {
    1041      2023972 :                     Err((prev, cur))
    1042              :                 }
    1043      2064010 :             });
    1044           28 : 
    1045           28 :         // Merge the contents of all the input delta layers into a new set
    1046           28 :         // of delta layers, based on the current partitioning.
    1047           28 :         //
    1048           28 :         // We split the new delta layers on the key dimension. We iterate through the key space, and for each key, check whether including the next key in the output layer we're currently building would make the layer too large. If so, flush the current output layer and start a new one.
    1049           28 :         // It's possible that there is a single key with so many page versions that storing all of them in a single layer file
    1050           28 :         // would be too large. In that case, we also split on the LSN dimension.
    1051           28 :         //
    1052           28 :         // LSN
    1053           28 :         //  ^
    1054           28 :         //  |
    1055           28 :         //  | +-----------+            +--+--+--+--+
    1056           28 :         //  | |           |            |  |  |  |  |
    1057           28 :         //  | +-----------+            |  |  |  |  |
    1058           28 :         //  | |           |            |  |  |  |  |
    1059           28 :         //  | +-----------+     ==>    |  |  |  |  |
    1060           28 :         //  | |           |            |  |  |  |  |
    1061           28 :         //  | +-----------+            |  |  |  |  |
    1062           28 :         //  | |           |            |  |  |  |  |
    1063           28 :         //  | +-----------+            +--+--+--+--+
    1064           28 :         //  |
    1065           28 :         //  +--------------> key
    1066           28 :         //
    1067           28 :         //
    1068           28 :         // If one key (X) has a lot of page versions:
    1069           28 :         //
    1070           28 :         // LSN
    1071           28 :         //  ^
    1072           28 :         //  |                                 (X)
    1073           28 :         //  | +-----------+            +--+--+--+--+
    1074           28 :         //  | |           |            |  |  |  |  |
    1075           28 :         //  | +-----------+            |  |  +--+  |
    1076           28 :         //  | |           |            |  |  |  |  |
    1077           28 :         //  | +-----------+     ==>    |  |  |  |  |
    1078           28 :         //  | |           |            |  |  +--+  |
    1079           28 :         //  | +-----------+            |  |  |  |  |
    1080           28 :         //  | |           |            |  |  |  |  |
    1081           28 :         //  | +-----------+            +--+--+--+--+
    1082           28 :         //  |
    1083           28 :         //  +--------------> key
    1084           28 :         // TODO: this actually divides the layers into fixed-size chunks, not
    1085           28 :         // based on the partitioning.
    1086           28 :         //
    1087           28 :         // TODO: we should also opportunistically materialize and
    1088           28 :         // garbage collect what we can.
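                      :         //
                      :         // State for the loop below: `writer` is the delta layer currently being built,
                      :         // `dup_start_lsn`/`dup_end_lsn` bound the LSN slice when a single key has to be split
                      :         // across layers, and `next_hole` tracks which of the precomputed holes we have passed.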
    1089           28 :         let mut new_layers = Vec::new();
    1090           28 :         let mut prev_key: Option<Key> = None;
    1091           28 :         let mut writer: Option<DeltaLayerWriter> = None;
    1092           28 :         let mut key_values_total_size = 0u64;
    1093           28 :         let mut dup_start_lsn: Lsn = Lsn::INVALID; // start LSN of layer containing values of the single key
    1094           28 :         let mut dup_end_lsn: Lsn = Lsn::INVALID; // end LSN of layer containing values of the single key
    1095           28 :         let mut next_hole = 0; // index of next hole in holes vector
    1096           28 : 
    1097           28 :         let mut keys = 0;
    1098              : 
    1099      2064066 :         while let Some((key, lsn, value)) = all_values_iter
    1100      2064066 :             .next()
    1101      2064066 :             .await
    1102      2064066 :             .map_err(CompactionError::Other)?
    1103              :         {
    1104      2064038 :             keys += 1;
    1105      2064038 : 
    1106      2064038 :             if keys % 32_768 == 0 && self.cancel.is_cancelled() {
    1107              :                 // Avoid hitting the cancellation token on every key. In benches, we end up
    1108              :                 // shuffling on the order of a million keys per layer, so this means we'll
    1109              :                 // check it around tens of times per layer.
    1110            0 :                 return Err(CompactionError::ShuttingDown);
    1111      2064038 :             }
    1112      2064038 : 
    1113      2064038 :             let same_key = prev_key == Some(key);
    1114      2064038 :             // We need to check key boundaries once we reach the next key or the end of the layer for the same key
    1115      2064038 :             if !same_key || lsn == dup_end_lsn {
    1116      2024000 :                 let mut next_key_size = 0u64;
    1117      2024000 :                 let is_dup_layer = dup_end_lsn.is_valid();
    1118      2024000 :                 dup_start_lsn = Lsn::INVALID;
    1119      2024000 :                 if !same_key {
    1120      2024000 :                     dup_end_lsn = Lsn::INVALID;
    1121      2024000 :                 }
    1122              :                 // Determine the size occupied by this key. We stop at the next key or when the size becomes larger than target_file_size
    1123      2024000 :                 for (next_key, next_lsn, next_size) in all_keys_iter.by_ref() {
    1124      2024000 :                     next_key_size = next_size;
    1125      2024000 :                     if key != next_key {
    1126      2023972 :                         if dup_end_lsn.is_valid() {
    1127            0 :                             // We are writing a segment with duplicates:
    1128            0 :                             // place all remaining values of this key in a separate segment
    1129            0 :                             dup_start_lsn = dup_end_lsn; // the new segment starts where the old one stops
    1130            0 :                             dup_end_lsn = lsn_range.end; // there are no more values of this key till end of LSN range
    1131      2023972 :                         }
    1132      2023972 :                         break;
    1133           28 :                     }
    1134           28 :                     key_values_total_size += next_size;
    1135           28 :                     // Check if it is time to split the segment: i.e. the accumulated size of this key's values is larger than the target file size.
    1136           28 :                     // We need to avoid generating empty segments if next_size > target_file_size.
    1137           28 :                     if key_values_total_size > target_file_size && lsn != next_lsn {
    1138              :                         // Split key between multiple layers: such layer can contain only single key
    1139            0 :                         dup_start_lsn = if dup_end_lsn.is_valid() {
    1140            0 :                             dup_end_lsn // new segment with duplicates starts where old one stops
    1141              :                         } else {
    1142            0 :                             lsn // start with the first LSN for this key
    1143              :                         };
    1144            0 :                         dup_end_lsn = next_lsn; // upper LSN boundary is exclusive
    1145            0 :                         break;
    1146           28 :                     }
    1147              :                 }
    1148              :                 // Handle the case when the loop reaches the last key: in this case dup_end_lsn is valid but dup_start_lsn is not set.
    1149      2024000 :                 if dup_end_lsn.is_valid() && !dup_start_lsn.is_valid() {
    1150            0 :                     dup_start_lsn = dup_end_lsn;
    1151            0 :                     dup_end_lsn = lsn_range.end;
    1152      2024000 :                 }
    1153      2024000 :                 if writer.is_some() {
    1154      2023972 :                     let written_size = writer.as_mut().unwrap().size();
    1155      2023972 :                     let contains_hole =
    1156      2023972 :                         next_hole < holes.len() && key >= holes[next_hole].key_range.end;
    1157              :                     // Check if this key causes the layer to overflow or crosses a hole...
    1158      2023972 :                     if is_dup_layer
    1159      2023972 :                         || dup_end_lsn.is_valid()
    1160      2023972 :                         || written_size + key_values_total_size > target_file_size
    1161      2023692 :                         || contains_hole
    1162              :                     {
    1163              :                         // ... if so, flush the previous layer and prepare to write a new one
    1164          280 :                         let (desc, path) = writer
    1165          280 :                             .take()
    1166          280 :                             .unwrap()
    1167          280 :                             .finish(prev_key.unwrap().next(), ctx)
    1168          280 :                             .await
    1169          280 :                             .map_err(CompactionError::Other)?;
    1170          280 :                         let new_delta = Layer::finish_creating(self.conf, self, desc, &path)
    1171          280 :                             .map_err(CompactionError::Other)?;
    1172              : 
    1173          280 :                         new_layers.push(new_delta);
    1174          280 :                         writer = None;
    1175          280 : 
    1176          280 :                         if contains_hole {
    1177            0 :                             // skip hole
    1178            0 :                             next_hole += 1;
    1179          280 :                         }
    1180      2023692 :                     }
    1181           28 :                 }
    1182              :                 // Remember size of key value because at next iteration we will access next item
    1183      2024000 :                 key_values_total_size = next_key_size;
    1184        40038 :             }
    1185      2064038 :             fail_point!("delta-layer-writer-fail-before-finish", |_| {
    1186            0 :                 Err(CompactionError::Other(anyhow::anyhow!(
    1187            0 :                     "failpoint delta-layer-writer-fail-before-finish"
    1188            0 :                 )))
    1189      2064038 :             });
    1190              : 
    1191      2064038 :             if !self.shard_identity.is_key_disposable(&key) {
    1192      2064038 :                 if writer.is_none() {
    1193          308 :                     if self.cancel.is_cancelled() {
    1194              :                         // to be somewhat responsive to cancellation, check for each new layer
    1195            0 :                         return Err(CompactionError::ShuttingDown);
    1196          308 :                     }
    1197              :                     // Create the writer if it is not initialized yet
    1198          308 :                     writer = Some(
    1199              :                         DeltaLayerWriter::new(
    1200          308 :                             self.conf,
    1201          308 :                             self.timeline_id,
    1202          308 :                             self.tenant_shard_id,
    1203          308 :                             key,
    1204          308 :                             if dup_end_lsn.is_valid() {
    1205              :                                 // this is a layer containing slice of values of the same key
    1206            0 :                                 debug!("Create new dup layer {}..{}", dup_start_lsn, dup_end_lsn);
    1207            0 :                                 dup_start_lsn..dup_end_lsn
    1208              :                             } else {
    1209          308 :                                 debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end);
    1210          308 :                                 lsn_range.clone()
    1211              :                             },
    1212          308 :                             ctx,
    1213          308 :                         )
    1214          308 :                         .await
    1215          308 :                         .map_err(CompactionError::Other)?,
    1216              :                     );
    1217              : 
    1218          308 :                     keys = 0;
    1219      2063730 :                 }
    1220              : 
    1221      2064038 :                 writer
    1222      2064038 :                     .as_mut()
    1223      2064038 :                     .unwrap()
    1224      2064038 :                     .put_value(key, lsn, value, ctx)
    1225      2064038 :                     .await
    1226      2064038 :                     .map_err(CompactionError::Other)?;
    1227              :             } else {
    1228            0 :                 let shard = self.shard_identity.shard_index();
    1229            0 :                 let owner = self.shard_identity.get_shard_number(&key);
    1230            0 :                 if cfg!(debug_assertions) {
    1231            0 :                     panic!("key {key} does not belong on shard {shard}, owned by {owner}");
    1232            0 :                 }
    1233            0 :                 debug!("dropping key {key} during compaction (it belongs on shard {owner})");
    1234              :             }
    1235              : 
    1236      2064038 :             if !new_layers.is_empty() {
    1237        19786 :                 fail_point!("after-timeline-compacted-first-L1");
    1238      2044252 :             }
    1239              : 
    1240      2064038 :             prev_key = Some(key);
    1241              :         }
    1242           28 :         if let Some(writer) = writer {
    1243           28 :             let (desc, path) = writer
    1244           28 :                 .finish(prev_key.unwrap().next(), ctx)
    1245           28 :                 .await
    1246           28 :                 .map_err(CompactionError::Other)?;
    1247           28 :             let new_delta = Layer::finish_creating(self.conf, self, desc, &path)
    1248           28 :                 .map_err(CompactionError::Other)?;
    1249           28 :             new_layers.push(new_delta);
    1250            0 :         }
    1251              : 
    1252              :         // Sync layers
    1253           28 :         if !new_layers.is_empty() {
    1254              :             // Print a warning if the created layer is larger than double the target size.
    1255              :             // We add two pages for potential overhead. This should in theory already be
    1256              :             // accounted for in the target calculation, but for very small targets,
    1257              :             // we might still easily hit the limit otherwise.
    1258           28 :             let warn_limit = target_file_size * 2 + page_cache::PAGE_SZ as u64 * 2;
    1259          308 :             for layer in new_layers.iter() {
    1260          308 :                 if layer.layer_desc().file_size > warn_limit {
    1261            0 :                     warn!(
    1262              :                         %layer,
    1263            0 :                         "created delta file of size {} larger than double of target of {target_file_size}", layer.layer_desc().file_size
    1264              :                     );
    1265          308 :                 }
    1266              :             }
    1267              : 
    1268              :             // The writer.finish() above already did the fsync of the inodes.
    1269              :             // We just need to fsync the directory in which these inodes are linked,
    1270              :             // which we know to be the timeline directory.
    1271              :             //
    1272              :             // We use fatal_err() below because after writer.finish() returns with success,
    1273              :             // the in-memory state of the filesystem already has the layer file in its final place,
    1274              :             // and subsequent pageserver code could think it's durable while it really isn't.
    1275           28 :             let timeline_dir = VirtualFile::open(
    1276           28 :                 &self
    1277           28 :                     .conf
    1278           28 :                     .timeline_path(&self.tenant_shard_id, &self.timeline_id),
    1279           28 :                 ctx,
    1280           28 :             )
    1281           28 :             .await
    1282           28 :             .fatal_err("VirtualFile::open for timeline dir fsync");
    1283           28 :             timeline_dir
    1284           28 :                 .sync_all()
    1285           28 :                 .await
    1286           28 :                 .fatal_err("VirtualFile::sync_all timeline dir");
    1287            0 :         }
    1288              : 
    1289           28 :         stats.write_layer_files_micros = stats.read_lock_drop_micros.till_now();
    1290           28 :         stats.new_deltas_count = Some(new_layers.len());
    1291          308 :         stats.new_deltas_size = Some(new_layers.iter().map(|l| l.layer_desc().file_size).sum());
    1292           28 : 
    1293           28 :         match TryInto::<CompactLevel0Phase1Stats>::try_into(stats)
    1294           28 :             .and_then(|stats| serde_json::to_string(&stats).context("serde_json::to_string"))
    1295              :         {
    1296           28 :             Ok(stats_json) => {
    1297           28 :                 info!(
    1298            0 :                     stats_json = stats_json.as_str(),
    1299            0 :                     "compact_level0_phase1 stats available"
    1300              :                 )
    1301              :             }
    1302            0 :             Err(e) => {
    1303            0 :                 warn!("compact_level0_phase1 stats failed to serialize: {:#}", e);
    1304              :             }
    1305              :         }
    1306              : 
    1307              :         // Without this, rustc complains about deltas_to_compact still
    1308              :         // being borrowed when we `.into_iter()` below.
    1309           28 :         drop(all_values_iter);
    1310           28 : 
    1311           28 :         Ok(CompactLevel0Phase1Result {
    1312           28 :             new_layers,
    1313           28 :             deltas_to_compact: deltas_to_compact
    1314           28 :                 .into_iter()
    1315          402 :                 .map(|x| x.drop_eviction_guard())
    1316           28 :                 .collect::<Vec<_>>(),
    1317           28 :             fully_compacted,
    1318           28 :         })
    1319          364 :     }
    1320              : }
    1321              : 
    1322              : #[derive(Default)]
    1323              : struct CompactLevel0Phase1Result {
    1324              :     new_layers: Vec<ResidentLayer>,
    1325              :     deltas_to_compact: Vec<Layer>,
    1326              :     // Whether we have included all L0 layers, or selected only part of them due to the
    1327              :     // L0 compaction size limit.
    1328              :     fully_compacted: bool,
    1329              : }
    1330              : 
    1331              : #[derive(Default)]
    1332              : struct CompactLevel0Phase1StatsBuilder {
    1333              :     version: Option<u64>,
    1334              :     tenant_id: Option<TenantShardId>,
    1335              :     timeline_id: Option<TimelineId>,
    1336              :     read_lock_acquisition_micros: DurationRecorder,
    1337              :     read_lock_held_spawn_blocking_startup_micros: DurationRecorder,
    1338              :     read_lock_held_key_sort_micros: DurationRecorder,
    1339              :     read_lock_held_prerequisites_micros: DurationRecorder,
    1340              :     read_lock_held_compute_holes_micros: DurationRecorder,
    1341              :     read_lock_drop_micros: DurationRecorder,
    1342              :     write_layer_files_micros: DurationRecorder,
    1343              :     level0_deltas_count: Option<usize>,
    1344              :     new_deltas_count: Option<usize>,
    1345              :     new_deltas_size: Option<u64>,
    1346              : }
    1347              : 
    1348              : #[derive(serde::Serialize)]
    1349              : struct CompactLevel0Phase1Stats {
    1350              :     version: u64,
    1351              :     tenant_id: TenantShardId,
    1352              :     timeline_id: TimelineId,
    1353              :     read_lock_acquisition_micros: RecordedDuration,
    1354              :     read_lock_held_spawn_blocking_startup_micros: RecordedDuration,
    1355              :     read_lock_held_key_sort_micros: RecordedDuration,
    1356              :     read_lock_held_prerequisites_micros: RecordedDuration,
    1357              :     read_lock_held_compute_holes_micros: RecordedDuration,
    1358              :     read_lock_drop_micros: RecordedDuration,
    1359              :     write_layer_files_micros: RecordedDuration,
    1360              :     level0_deltas_count: usize,
    1361              :     new_deltas_count: usize,
    1362              :     new_deltas_size: u64,
    1363              : }
    1364              : 
    1365              : impl TryFrom<CompactLevel0Phase1StatsBuilder> for CompactLevel0Phase1Stats {
    1366              :     type Error = anyhow::Error;
    1367              : 
    1368           28 :     fn try_from(value: CompactLevel0Phase1StatsBuilder) -> Result<Self, Self::Error> {
    1369           28 :         Ok(Self {
    1370           28 :             version: value.version.ok_or_else(|| anyhow!("version not set"))?,
    1371           28 :             tenant_id: value
    1372           28 :                 .tenant_id
    1373           28 :                 .ok_or_else(|| anyhow!("tenant_id not set"))?,
    1374           28 :             timeline_id: value
    1375           28 :                 .timeline_id
    1376           28 :                 .ok_or_else(|| anyhow!("timeline_id not set"))?,
    1377           28 :             read_lock_acquisition_micros: value
    1378           28 :                 .read_lock_acquisition_micros
    1379           28 :                 .into_recorded()
    1380           28 :                 .ok_or_else(|| anyhow!("read_lock_acquisition_micros not set"))?,
    1381           28 :             read_lock_held_spawn_blocking_startup_micros: value
    1382           28 :                 .read_lock_held_spawn_blocking_startup_micros
    1383           28 :                 .into_recorded()
    1384           28 :                 .ok_or_else(|| anyhow!("read_lock_held_spawn_blocking_startup_micros not set"))?,
    1385           28 :             read_lock_held_key_sort_micros: value
    1386           28 :                 .read_lock_held_key_sort_micros
    1387           28 :                 .into_recorded()
    1388           28 :                 .ok_or_else(|| anyhow!("read_lock_held_key_sort_micros not set"))?,
    1389           28 :             read_lock_held_prerequisites_micros: value
    1390           28 :                 .read_lock_held_prerequisites_micros
    1391           28 :                 .into_recorded()
    1392           28 :                 .ok_or_else(|| anyhow!("read_lock_held_prerequisites_micros not set"))?,
    1393           28 :             read_lock_held_compute_holes_micros: value
    1394           28 :                 .read_lock_held_compute_holes_micros
    1395           28 :                 .into_recorded()
    1396           28 :                 .ok_or_else(|| anyhow!("read_lock_held_compute_holes_micros not set"))?,
    1397           28 :             read_lock_drop_micros: value
    1398           28 :                 .read_lock_drop_micros
    1399           28 :                 .into_recorded()
    1400           28 :                 .ok_or_else(|| anyhow!("read_lock_drop_micros not set"))?,
    1401           28 :             write_layer_files_micros: value
    1402           28 :                 .write_layer_files_micros
    1403           28 :                 .into_recorded()
    1404           28 :                 .ok_or_else(|| anyhow!("write_layer_files_micros not set"))?,
    1405           28 :             level0_deltas_count: value
    1406           28 :                 .level0_deltas_count
    1407           28 :                 .ok_or_else(|| anyhow!("level0_deltas_count not set"))?,
    1408           28 :             new_deltas_count: value
    1409           28 :                 .new_deltas_count
    1410           28 :                 .ok_or_else(|| anyhow!("new_deltas_count not set"))?,
    1411           28 :             new_deltas_size: value
    1412           28 :                 .new_deltas_size
    1413           28 :                 .ok_or_else(|| anyhow!("new_deltas_size not set"))?,
    1414              :         })
    1415           28 :     }
    1416              : }
    1417              : 
    1418              : impl Timeline {
    1419              :     /// Entry point for new tiered compaction algorithm.
    1420              :     ///
    1421              :     /// All the real work is in the implementation in the pageserver_compaction
    1422              :     /// crate. The code here would apply to any algorithm implemented by the
    1423              :     /// same interface, but tiered is the only one at the moment.
    1424              :     ///
    1425              :     /// TODO: cancellation
    1426            0 :     pub(crate) async fn compact_tiered(
    1427            0 :         self: &Arc<Self>,
    1428            0 :         _cancel: &CancellationToken,
    1429            0 :         ctx: &RequestContext,
    1430            0 :     ) -> Result<(), CompactionError> {
    1431            0 :         let fanout = self.get_compaction_threshold() as u64;
    1432            0 :         let target_file_size = self.get_checkpoint_distance();
    1433              : 
    1434              :         // Find the top of the historical layers
    1435            0 :         let end_lsn = {
    1436            0 :             let guard = self.layers.read().await;
    1437            0 :             let layers = guard.layer_map()?;
    1438              : 
    1439            0 :             let l0_deltas = layers.level0_deltas();
    1440            0 : 
    1441            0 :             // As an optimization, if we find that there are too few L0 layers,
    1442            0 :             // bail out early. We know that the compaction algorithm would do
    1443            0 :             // nothing in that case.
    1444            0 :             if l0_deltas.len() < fanout as usize {
    1445              :                 // doesn't need compacting
    1446            0 :                 return Ok(());
    1447            0 :             }
    1448            0 :             l0_deltas.iter().map(|l| l.lsn_range.end).max().unwrap()
    1449            0 :         };
    1450            0 : 
    1451            0 :         // Is the timeline being deleted?
    1452            0 :         if self.is_stopping() {
    1453            0 :             trace!("Dropping out of compaction on timeline shutdown");
    1454            0 :             return Err(CompactionError::ShuttingDown);
    1455            0 :         }
    1456              : 
    1457            0 :         let (dense_ks, _sparse_ks) = self.collect_keyspace(end_lsn, ctx).await?;
    1458              :         // TODO(chi): ignore sparse_keyspace for now, compact it in the future.
    1459            0 :         let mut adaptor = TimelineAdaptor::new(self, (end_lsn, dense_ks));
    1460            0 : 
    1461            0 :         pageserver_compaction::compact_tiered::compact_tiered(
    1462            0 :             &mut adaptor,
    1463            0 :             end_lsn,
    1464            0 :             target_file_size,
    1465            0 :             fanout,
    1466            0 :             ctx,
    1467            0 :         )
    1468            0 :         .await
    1469              :         // TODO: compact_tiered needs to return CompactionError
    1470            0 :         .map_err(CompactionError::Other)?;
    1471              : 
    1472            0 :         adaptor.flush_updates().await?;
    1473            0 :         Ok(())
    1474            0 :     }
    1475              : 
    1476              :     /// Take a list of images and deltas, produce images and deltas according to GC horizon and retain_lsns.
    1477              :     ///
    1478              :     /// It takes a key, the values of the key within the compaction process, a GC horizon, and all retain_lsns below the horizon.
    1479              :     /// For now, it requires that `accumulated_values` contain the full history of the key (i.e., the entry with the lowest LSN is
    1480              :     /// an image or a WAL record not requiring a base image). This restriction will be removed once we implement gc-compaction on branches.
    1481              :     ///
    1482              :     /// The function returns the deltas and the base image that need to be placed at each of the retain LSN. For example, we have:
    1483              :     ///
    1484              :     /// A@0x10, +B@0x20, +C@0x30, +D@0x40, +E@0x50, +F@0x60
    1485              :     /// horizon = 0x50, retain_lsn = 0x20, 0x40, delta_threshold=3
    1486              :     ///
    1487              :     /// The function will produce:
    1488              :     ///
    1489              :     /// ```plain
    1490              :     /// 0x20(retain_lsn) -> img=AB@0x20                  always produce a single image below the lowest retain LSN
    1491              :     /// 0x40(retain_lsn) -> deltas=[+C@0x30, +D@0x40]    two deltas since the last base image, keeping the deltas
    1492              :     /// 0x50(horizon)    -> deltas=[ABCDE@0x50]          three deltas since the last base image, generate an image but put it in the delta
    1493              :     /// above_horizon    -> deltas=[+F@0x60]             full history above the horizon
    1494              :     /// ```
    1495              :     ///
    1496              :     /// Note that `accumulated_values` must be sorted by LSN and should belong to a single key.
    1497          630 :     pub(crate) async fn generate_key_retention(
    1498          630 :         self: &Arc<Timeline>,
    1499          630 :         key: Key,
    1500          630 :         full_history: &[(Key, Lsn, Value)],
    1501          630 :         horizon: Lsn,
    1502          630 :         retain_lsn_below_horizon: &[Lsn],
    1503          630 :         delta_threshold_cnt: usize,
    1504          630 :         base_img_from_ancestor: Option<(Key, Lsn, Bytes)>,
    1505          630 :     ) -> anyhow::Result<KeyHistoryRetention> {
    1506          630 :         // Pre-checks for the invariants
    1507          630 :         if cfg!(debug_assertions) {
    1508         1530 :             for (log_key, _, _) in full_history {
    1509          900 :                 assert_eq!(log_key, &key, "mismatched key");
    1510              :             }
    1511          630 :             for i in 1..full_history.len() {
    1512          270 :                 assert!(full_history[i - 1].1 <= full_history[i].1, "unordered LSN");
    1513          270 :                 if full_history[i - 1].1 == full_history[i].1 {
    1514            0 :                     assert!(
    1515            0 :                         matches!(full_history[i - 1].2, Value::Image(_)),
    1516            0 :                         "unordered delta/image, or duplicated delta"
    1517              :                     );
    1518          270 :                 }
    1519              :             }
    1520              :             // There used to be an assertion (for the no-base-image case) checking that the first
    1521              :             // record in the history is `will_init`, but it was removed.
    1522              :             // This is explained in the test cases for generate_key_retention.
    1523              :             // Search "incomplete history" for more information.
    1524         1410 :             for lsn in retain_lsn_below_horizon {
    1525          780 :                 assert!(lsn < &horizon, "retain lsn must be below horizon")
    1526              :             }
    1527          630 :             for i in 1..retain_lsn_below_horizon.len() {
    1528          356 :                 assert!(
    1529          356 :                     retain_lsn_below_horizon[i - 1] <= retain_lsn_below_horizon[i],
    1530            0 :                     "unordered LSN"
    1531              :                 );
    1532              :             }
    1533            0 :         }
    1534          630 :         let has_ancestor = base_img_from_ancestor.is_some();
    1535              :         // Step 1: split history into len(retain_lsn_below_horizon) + 2 buckets, where the last bucket is for all deltas above the horizon,
    1536              :         // and the second-to-last bucket is for the horizon. Each bucket contains lsn_last_bucket < deltas <= lsn_this_bucket.
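                      :         //
                      :         // Using the example from the doc comment above: with retain_lsns = [0x20, 0x40] and
                      :         // horizon = 0x50, the split points are [0x20, 0x40, 0x50], giving four buckets:
                      :         // (..=0x20], (0x20..=0x40], (0x40..=0x50], and everything above the horizon.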
    1537          630 :         let (mut split_history, lsn_split_points) = {
    1538          630 :             let mut split_history = Vec::new();
    1539          630 :             split_history.resize_with(retain_lsn_below_horizon.len() + 2, Vec::new);
    1540          630 :             let mut lsn_split_points = Vec::with_capacity(retain_lsn_below_horizon.len() + 1);
    1541         1410 :             for lsn in retain_lsn_below_horizon {
    1542          780 :                 lsn_split_points.push(*lsn);
    1543          780 :             }
    1544          630 :             lsn_split_points.push(horizon);
    1545          630 :             let mut current_idx = 0;
    1546         1530 :             for item @ (_, lsn, _) in full_history {
    1547         1144 :                 while current_idx < lsn_split_points.len() && *lsn > lsn_split_points[current_idx] {
    1548          244 :                     current_idx += 1;
    1549          244 :                 }
    1550          900 :                 split_history[current_idx].push(item);
    1551              :             }
    1552          630 :             (split_history, lsn_split_points)
    1553              :         };
    1554              :         // Step 2: filter out duplicated records due to the k-merge of image/delta layers
    1555         2670 :         for split_for_lsn in &mut split_history {
    1556         2040 :             let mut prev_lsn = None;
    1557         2040 :             let mut new_split_for_lsn = Vec::with_capacity(split_for_lsn.len());
    1558         2040 :             for record @ (_, lsn, _) in std::mem::take(split_for_lsn) {
    1559          900 :                 if let Some(prev_lsn) = &prev_lsn {
    1560          118 :                     if *prev_lsn == lsn {
    1561              :                         // The case that we have an LSN with both data from the delta layer and the image layer. As
    1562              :                         // `ValueWrapper` ensures that an image is ordered before a delta at the same LSN, we simply
    1563              :                         // drop this delta and keep the image.
    1564              :                         //
    1565              :                         // For example, we have delta layer key1@0x10, key1@0x20, and image layer key1@0x10, we will
    1566              :                         // keep the image for key1@0x10 and the delta for key1@0x20. key1@0x10 delta will be simply
    1567              :                         // dropped.
    1568              :                         //
    1569              :                         // TODO: in case we have both delta + images for a given LSN and it does not exceed the delta
    1570              :                         // threshold, we could have kept delta instead to save space. This is an optimization for the future.
    1571            0 :                         continue;
    1572          118 :                     }
    1573          782 :                 }
    1574          900 :                 prev_lsn = Some(lsn);
    1575          900 :                 new_split_for_lsn.push(record);
    1576              :             }
    1577         2040 :             *split_for_lsn = new_split_for_lsn;
    1578              :         }
    1579              :         // Step 3: generate images when necessary
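                      :         //
                      :         // With the doc-comment example (delta_threshold = 3): the first batch always gets an
                      :         // image (AB@0x20), the 0x40 batch is below the threshold so its deltas are kept, the
                      :         // 0x50 (horizon) batch reaches the threshold so an image is generated for it, and
                      :         // images are never generated for the above-horizon batch.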
    1580          630 :         let mut retention = Vec::with_capacity(split_history.len());
    1581          630 :         let mut records_since_last_image = 0;
    1582          630 :         let batch_cnt = split_history.len();
    1583          630 :         assert!(
    1584          630 :             batch_cnt >= 2,
    1585            0 :             "should have at least below + above horizon batches"
    1586              :         );
    1587          630 :         let mut replay_history: Vec<(Key, Lsn, Value)> = Vec::new();
    1588          630 :         if let Some((key, lsn, img)) = base_img_from_ancestor {
    1589           42 :             replay_history.push((key, lsn, Value::Image(img)));
    1590          588 :         }
    1591              : 
    1592              :         /// Generate debug information for the replay history
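                      :         /// The output looks like `key=<key> i@<lsn> d@<lsn> ...`, where `i` marks an image,
                      :         /// `di` a delta that will init the page, and `d` a plain delta; it is truncated after
                      :         /// 128 entries.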
    1593            0 :         fn generate_history_trace(replay_history: &[(Key, Lsn, Value)]) -> String {
    1594              :             use std::fmt::Write;
    1595            0 :             let mut output = String::new();
    1596            0 :             if let Some((key, _, _)) = replay_history.first() {
    1597            0 :                 write!(output, "key={} ", key).unwrap();
    1598            0 :                 let mut cnt = 0;
    1599            0 :                 for (_, lsn, val) in replay_history {
    1600            0 :                     if val.is_image() {
    1601            0 :                         write!(output, "i@{} ", lsn).unwrap();
    1602            0 :                     } else if val.will_init() {
    1603            0 :                         write!(output, "di@{} ", lsn).unwrap();
    1604            0 :                     } else {
    1605            0 :                         write!(output, "d@{} ", lsn).unwrap();
    1606            0 :                     }
    1607            0 :                     cnt += 1;
    1608            0 :                     if cnt >= 128 {
    1609            0 :                         write!(output, "... and more").unwrap();
    1610            0 :                         break;
    1611            0 :                     }
    1612              :                 }
    1613            0 :             } else {
    1614            0 :                 write!(output, "<no history>").unwrap();
    1615            0 :             }
    1616            0 :             output
    1617            0 :         }
    1618              : 
    1619            0 :         fn generate_debug_trace(
    1620            0 :             replay_history: Option<&[(Key, Lsn, Value)]>,
    1621            0 :             full_history: &[(Key, Lsn, Value)],
    1622            0 :             lsns: &[Lsn],
    1623            0 :             horizon: Lsn,
    1624            0 :         ) -> String {
    1625              :             use std::fmt::Write;
    1626            0 :             let mut output = String::new();
    1627            0 :             if let Some(replay_history) = replay_history {
    1628            0 :                 writeln!(
    1629            0 :                     output,
    1630            0 :                     "replay_history: {}",
    1631            0 :                     generate_history_trace(replay_history)
    1632            0 :                 )
    1633            0 :                 .unwrap();
    1634            0 :             } else {
    1635            0 :                 writeln!(output, "replay_history: <disabled>",).unwrap();
    1636            0 :             }
    1637            0 :             writeln!(
    1638            0 :                 output,
    1639            0 :                 "full_history: {}",
    1640            0 :                 generate_history_trace(full_history)
    1641            0 :             )
    1642            0 :             .unwrap();
    1643            0 :             writeln!(
    1644            0 :                 output,
    1645            0 :                 "when processing: [{}] horizon={}",
    1646            0 :                 lsns.iter().map(|l| format!("{l}")).join(","),
    1647            0 :                 horizon
    1648            0 :             )
    1649            0 :             .unwrap();
    1650            0 :             output
    1651            0 :         }
    1652              : 
    1653         2040 :         for (i, split_for_lsn) in split_history.into_iter().enumerate() {
    1654              :             // TODO: there could be image keys inside the splits, and we can compute records_since_last_image accordingly.
    1655         2040 :             records_since_last_image += split_for_lsn.len();
    1656         2040 :             let generate_image = if i == 0 && !has_ancestor {
    1657              :                 // We always generate images for the first batch (below horizon / lowest retain_lsn)
    1658          588 :                 true
    1659         1452 :             } else if i == batch_cnt - 1 {
    1660              :                 // Do not generate images for the last batch (above horizon)
    1661          630 :                 false
    1662          822 :             } else if records_since_last_image >= delta_threshold_cnt {
    1663              :                 // Generate images when there are too many records
    1664            6 :                 true
    1665              :             } else {
    1666          816 :                 false
    1667              :             };
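                      :             // Illustration of the decision above (hypothetical numbers): with batches
                      :             // [below-horizon, retain_lsn_1, retain_lsn_2, above-horizon] and delta_threshold_cnt = 10,
                      :             // the first batch always gets an image when there is no ancestor, the middle batches
                      :             // only get one once 10 or more records have accumulated since the last image, and the
                      :             // last (above-horizon) batch never does.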
    1668         2040 :             replay_history.extend(split_for_lsn.iter().map(|x| (*x).clone()));
    1669              :             // Only retain the items after the last image record
    1670         2514 :             for idx in (0..replay_history.len()).rev() {
    1671         2514 :                 if replay_history[idx].2.will_init() {
    1672         2040 :                     replay_history = replay_history[idx..].to_vec();
    1673         2040 :                     break;
    1674          474 :                 }
    1675              :             }
    1676         2040 :             if let Some((_, _, val)) = replay_history.first() {
    1677         2040 :                 if !val.will_init() {
    1678            0 :                     return Err(anyhow::anyhow!("invalid history, no base image")).with_context(
    1679            0 :                         || {
    1680            0 :                             generate_debug_trace(
    1681            0 :                                 Some(&replay_history),
    1682            0 :                                 full_history,
    1683            0 :                                 retain_lsn_below_horizon,
    1684            0 :                                 horizon,
    1685            0 :                             )
    1686            0 :                         },
    1687            0 :                     );
    1688         2040 :                 }
    1689            0 :             }
    1690         2040 :             if generate_image && records_since_last_image > 0 {
    1691          594 :                 records_since_last_image = 0;
    1692          594 :                 let replay_history_for_debug = if cfg!(debug_assertions) {
    1693          594 :                     Some(replay_history.clone())
    1694              :                 } else {
    1695            0 :                     None
    1696              :                 };
    1697          594 :                 let replay_history_for_debug_ref = replay_history_for_debug.as_deref();
    1698          594 :                 let history = std::mem::take(&mut replay_history);
    1699          594 :                 let mut img = None;
    1700          594 :                 let mut records = Vec::with_capacity(history.len());
    1701          594 :                 if let (_, lsn, Value::Image(val)) = history.first().as_ref().unwrap() {
    1702          572 :                     img = Some((*lsn, val.clone()));
    1703          572 :                     for (_, lsn, val) in history.into_iter().skip(1) {
    1704           34 :                         let Value::WalRecord(rec) = val else {
    1705            0 :                             return Err(anyhow::anyhow!(
    1706            0 :                                 "invalid record, first record is image, expect walrecords"
    1707            0 :                             ))
    1708            0 :                             .with_context(|| {
    1709            0 :                                 generate_debug_trace(
    1710            0 :                                     replay_history_for_debug_ref,
    1711            0 :                                     full_history,
    1712            0 :                                     retain_lsn_below_horizon,
    1713            0 :                                     horizon,
    1714            0 :                                 )
    1715            0 :                             });
    1716              :                         };
    1717           34 :                         records.push((lsn, rec));
    1718              :                     }
    1719              :                 } else {
    1720           36 :                     for (_, lsn, val) in history.into_iter() {
    1721           36 :                         let Value::WalRecord(rec) = val else {
    1722            0 :                             return Err(anyhow::anyhow!("invalid record, first record is walrecord, expect rest are walrecord"))
    1723            0 :                                 .with_context(|| generate_debug_trace(
    1724            0 :                                     replay_history_for_debug_ref,
    1725            0 :                                     full_history,
    1726            0 :                                     retain_lsn_below_horizon,
    1727            0 :                                     horizon,
    1728            0 :                                 ));
    1729              :                         };
    1730           36 :                         records.push((lsn, rec));
    1731              :                     }
    1732              :                 }
    1733          594 :                 records.reverse();
    1734          594 :                 let state = ValueReconstructState { img, records };
    1735          594 :                 let request_lsn = lsn_split_points[i]; // last batch does not generate image so i is always in range
    1736          594 :                 let img = self.reconstruct_value(key, request_lsn, state).await?;
    1737          594 :                 replay_history.push((key, request_lsn, Value::Image(img.clone())));
    1738          594 :                 retention.push(vec![(request_lsn, Value::Image(img))]);
    1739         1446 :             } else {
    1740         1446 :                 let deltas = split_for_lsn
    1741         1446 :                     .iter()
    1742         1446 :                     .map(|(_, lsn, value)| (*lsn, value.clone()))
    1743         1446 :                     .collect_vec();
    1744         1446 :                 retention.push(deltas);
    1745         1446 :             }
    1746              :         }
    1747          630 :         let mut result = Vec::with_capacity(retention.len());
    1748          630 :         assert_eq!(retention.len(), lsn_split_points.len() + 1);
    1749         2040 :         for (idx, logs) in retention.into_iter().enumerate() {
    1750         2040 :             if idx == lsn_split_points.len() {
    1751          630 :                 return Ok(KeyHistoryRetention {
    1752          630 :                     below_horizon: result,
    1753          630 :                     above_horizon: KeyLogAtLsn(logs),
    1754          630 :                 });
    1755         1410 :             } else {
    1756         1410 :                 result.push((lsn_split_points[idx], KeyLogAtLsn(logs)));
    1757         1410 :             }
    1758              :         }
    1759            0 :         unreachable!("key retention is empty")
    1760          630 :     }
    1761              : 
    1762              :     /// Check how much space is left on the disk
    1763           52 :     async fn check_available_space(self: &Arc<Self>) -> anyhow::Result<u64> {
    1764           52 :         let tenants_dir = self.conf.tenants_path();
    1765              : 
    1766           52 :         let stat = Statvfs::get(&tenants_dir, None)
    1767           52 :             .context("statvfs failed, presumably directory got unlinked")?;
    1768              : 
    1769           52 :         let (avail_bytes, _) = stat.get_avail_total_bytes();
    1770           52 : 
    1771           52 :         Ok(avail_bytes)
    1772           52 :     }
    1773              : 
    1774              :     /// Check if the compaction can proceed safely without running out of space. We assume the size
    1775              :     /// upper bound of the files produced by a compaction job is the same as the total size of all layers involved in
    1776              :     /// the compaction. Therefore, we need at least `2 * layers_to_be_compacted_size` of free space to do a
    1777              :     /// compaction.
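                      :     ///
                      :     /// Worked example (hypothetical numbers): with 100 GiB available, the budget below is
                      :     /// 80 GiB (20% is reserved for other tasks). If the selected layers total 50 GiB and
                      :     /// 10 GiB of them still need downloading, the required 60 GiB fits within the budget
                      :     /// and compaction may proceed.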
    1778           52 :     async fn check_compaction_space(
    1779           52 :         self: &Arc<Self>,
    1780           52 :         layer_selection: &[Layer],
    1781           52 :     ) -> anyhow::Result<()> {
    1782           52 :         let available_space = self.check_available_space().await?;
    1783           52 :         let mut remote_layer_size = 0;
    1784           52 :         let mut all_layer_size = 0;
    1785          204 :         for layer in layer_selection {
    1786          152 :             let needs_download = layer.needs_download().await?;
    1787          152 :             if needs_download.is_some() {
    1788            0 :                 remote_layer_size += layer.layer_desc().file_size;
    1789          152 :             }
    1790          152 :             all_layer_size += layer.layer_desc().file_size;
    1791              :         }
    1792           52 :         let allocated_space = (available_space as f64 * 0.8) as u64; /* reserve 20% space for other tasks */
    1793           52 :         if all_layer_size /* space needed for newly-generated file */ + remote_layer_size /* space for downloading layers */ > allocated_space
    1794              :         {
    1795            0 :             return Err(anyhow!("not enough space for compaction: available_space={}, allocated_space={}, all_layer_size={}, remote_layer_size={}, required_space={}",
    1796            0 :                 available_space, allocated_space, all_layer_size, remote_layer_size, all_layer_size + remote_layer_size));
    1797           52 :         }
    1798           52 :         Ok(())
    1799           52 :     }
    1800              : 
    1801              :     /// Split a gc-compaction job into multiple compaction jobs. The split is based on the key range and the estimated size of the compaction job.
    1802              :     /// The function returns a list of compaction jobs that can be executed separately. If the upper bound of the compact LSN
    1803              :     /// range is not specified, we will use the latest gc_cutoff as the upper bound, so that all jobs in the jobset act
    1804              :     /// like a full compaction of the specified keyspace.
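                      :     ///
                      :     /// Hypothetical usage sketch (the 2 GiB cap is an example value, not a default):
                      :     /// ```ignore
                      :     /// let jobs = timeline
                      :     ///     .gc_compaction_split_jobs(job, Some(2048) /* split into ~2 GiB jobs */)
                      :     ///     .await?;
                      :     /// for job in jobs {
                      :     ///     timeline.compact_with_gc_inner(&cancel, job, &ctx).await?;
                      :     /// }
                      :     /// ```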
    1805            0 :     pub(crate) async fn gc_compaction_split_jobs(
    1806            0 :         self: &Arc<Self>,
    1807            0 :         job: GcCompactJob,
    1808            0 :         sub_compaction_max_job_size_mb: Option<u64>,
    1809            0 :     ) -> anyhow::Result<Vec<GcCompactJob>> {
    1810            0 :         let compact_below_lsn = if job.compact_lsn_range.end != Lsn::MAX {
    1811            0 :             job.compact_lsn_range.end
    1812              :         } else {
    1813            0 :             *self.get_latest_gc_cutoff_lsn() // use the real gc cutoff
    1814              :         };
    1815              : 
    1816              :         // Split compaction job to about 4GB each
    1817              :         const GC_COMPACT_MAX_SIZE_MB: u64 = 4 * 1024;
    1818            0 :         let sub_compaction_max_job_size_mb =
    1819            0 :             sub_compaction_max_job_size_mb.unwrap_or(GC_COMPACT_MAX_SIZE_MB);
    1820            0 : 
    1821            0 :         let mut compact_jobs = Vec::new();
    1822              :         // For now, we simply use the key partitioning information; we should do a more fine-grained partitioning
    1823              :         // by estimating the amount of files read for a compaction job. We should also partition on LSN.
    1824            0 :         let ((dense_ks, sparse_ks), _) = {
    1825            0 :             let Ok(partition) = self.partitioning.try_lock() else {
    1826            0 :                 bail!("failed to acquire partition lock during gc-compaction");
    1827              :             };
    1828            0 :             partition.clone()
    1829              :         };
    1830              :         // Truncate the key range to be within user specified compaction range.
    1831            0 :         fn truncate_to(
    1832            0 :             source_start: &Key,
    1833            0 :             source_end: &Key,
    1834            0 :             target_start: &Key,
    1835            0 :             target_end: &Key,
    1836            0 :         ) -> Option<(Key, Key)> {
    1837            0 :             let start = source_start.max(target_start);
    1838            0 :             let end = source_end.min(target_end);
    1839            0 :             if start < end {
    1840            0 :                 Some((*start, *end))
    1841              :             } else {
    1842            0 :                 None
    1843              :             }
    1844            0 :         }
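                      :         // For example (hypothetical, schematic keys): truncating the partition range 10..50 to a
                      :         // compaction range of 30..80 yields Some((30, 50)), while truncating 10..20 yields None
                      :         // because the two ranges do not overlap.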
    1845            0 :         let mut split_key_ranges = Vec::new();
    1846            0 :         let ranges = dense_ks
    1847            0 :             .parts
    1848            0 :             .iter()
    1849            0 :             .map(|partition| partition.ranges.iter())
    1850            0 :             .chain(sparse_ks.parts.iter().map(|x| x.0.ranges.iter()))
    1851            0 :             .flatten()
    1852            0 :             .cloned()
    1853            0 :             .collect_vec();
    1854            0 :         for range in ranges.iter() {
    1855            0 :             let Some((start, end)) = truncate_to(
    1856            0 :                 &range.start,
    1857            0 :                 &range.end,
    1858            0 :                 &job.compact_key_range.start,
    1859            0 :                 &job.compact_key_range.end,
    1860            0 :             ) else {
    1861            0 :                 continue;
    1862              :             };
    1863            0 :             split_key_ranges.push((start, end));
    1864              :         }
    1865            0 :         split_key_ranges.sort();
    1866            0 :         let guard = self.layers.read().await;
    1867            0 :         let layer_map = guard.layer_map()?;
    1868            0 :         let mut current_start = None;
    1869            0 :         let ranges_num = split_key_ranges.len();
    1870            0 :         for (idx, (start, end)) in split_key_ranges.into_iter().enumerate() {
    1871            0 :             if current_start.is_none() {
    1872            0 :                 current_start = Some(start);
    1873            0 :             }
    1874            0 :             let start = current_start.unwrap();
    1875            0 :             if start >= end {
    1876              :                 // We have already processed this partition.
    1877            0 :                 continue;
    1878            0 :             }
    1879            0 :             let res = layer_map.range_search(start..end, compact_below_lsn);
    1880            0 :             let total_size = res.found.keys().map(|x| x.layer.file_size()).sum::<u64>();
    1881            0 :             if total_size > sub_compaction_max_job_size_mb * 1024 * 1024 || ranges_num == idx + 1 {
    1882              :                 // Try to extend the compaction range so that we include at least one full layer file.
    1883            0 :                 let extended_end = res
    1884            0 :                     .found
    1885            0 :                     .keys()
    1886            0 :                     .map(|layer| layer.layer.key_range.end)
    1887            0 :                     .min();
    1888              :                 // It is possible that the search range does not contain any layer files when we reach the end of the loop.
    1889              :                 // In this case, we simply use the specified key range end.
    1890            0 :                 let end = if let Some(extended_end) = extended_end {
    1891            0 :                     extended_end.max(end)
    1892              :                 } else {
    1893            0 :                     end
    1894              :                 };
    1895            0 :                 info!(
    1896            0 :                     "splitting compaction job: {}..{}, estimated_size={}",
    1897              :                     start, end, total_size
    1898              :                 );
    1899            0 :                 compact_jobs.push(GcCompactJob {
    1900            0 :                     dry_run: job.dry_run,
    1901            0 :                     compact_key_range: start..end,
    1902            0 :                     compact_lsn_range: job.compact_lsn_range.start..compact_below_lsn,
    1903            0 :                 });
    1904            0 :                 current_start = Some(end);
    1905            0 :             }
    1906              :         }
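                      :         // Worked example of the loop above (hypothetical sizes, 4 GiB threshold): as the accumulated
                      :         // key range grows partition by partition, suppose the overlapping layers total 1 GiB, then
                      :         // 3 GiB, then 6 GiB. The 6 GiB search crosses the threshold, so a single job covering all
                      :         // three partitions is emitted and accumulation restarts at the next partition.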
    1907            0 :         drop(guard);
    1908            0 :         Ok(compact_jobs)
    1909            0 :     }
    1910              : 
    1911              :     /// An experimental compaction building block that combines compaction with garbage collection.
    1912              :     ///
    1913              :     /// The current implementation picks all delta + image layers that are below or intersecting with
    1914              :     /// the GC horizon without considering retain_lsns. Then, it does a full compaction over all these delta
    1915              :     /// layers and image layers, which generates image layers at the gc horizon, drops deltas below the gc horizon,
    1916              :     /// and creates delta layers with all deltas >= gc horizon.
    1917              :     ///
    1918              :     /// If `options.compact_range` is provided, it will only compact the keys within the range, aka partial compaction.
    1919              :     /// Partial compaction will read and process all layers overlapping with the key range, even if it might
    1920              :     /// contain extra keys. After the gc-compaction phase completes, delta layers that are not fully contained
    1921              :     /// within the key range will be rewritten to ensure they do not overlap with the newly-produced delta layers. Providing
    1922              :     /// Key::MIN..Key::MAX to the function indicates a full compaction, though technically, `Key::MAX` is not
    1923              :     /// part of the range.
    1924              :     ///
    1925              :     /// If `options.compact_lsn_range.end` is provided, the compaction will only compact layers below or intersecting with
    1926              :     /// the LSN. Otherwise, it will use the gc cutoff by default.
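                      :     ///
                      :     /// Hypothetical usage sketch (`options` is assumed to be built elsewhere; a key range of
                      :     /// `Key::MIN..Key::MAX` requests a full compaction):
                      :     /// ```ignore
                      :     /// timeline.compact_with_gc(&cancel, options, &ctx).await?;
                      :     /// ```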
    1927           54 :     pub(crate) async fn compact_with_gc(
    1928           54 :         self: &Arc<Self>,
    1929           54 :         cancel: &CancellationToken,
    1930           54 :         options: CompactOptions,
    1931           54 :         ctx: &RequestContext,
    1932           54 :     ) -> anyhow::Result<()> {
    1933           54 :         let sub_compaction = options.sub_compaction;
    1934           54 :         let job = GcCompactJob::from_compact_options(options.clone());
    1935           54 :         if sub_compaction {
    1936            0 :             info!("running enhanced gc bottom-most compaction with sub-compaction, splitting compaction jobs");
    1937            0 :             let jobs = self
    1938            0 :                 .gc_compaction_split_jobs(job, options.sub_compaction_max_job_size_mb)
    1939            0 :                 .await?;
    1940            0 :             let jobs_len = jobs.len();
    1941            0 :             for (idx, job) in jobs.into_iter().enumerate() {
    1942            0 :                 info!(
    1943            0 :                     "running enhanced gc bottom-most compaction, sub-compaction {}/{}",
    1944            0 :                     idx + 1,
    1945              :                     jobs_len
    1946              :                 );
    1947            0 :                 self.compact_with_gc_inner(cancel, job, ctx).await?;
    1948              :             }
    1949            0 :             if jobs_len == 0 {
    1950            0 :                 info!("no jobs to run, skipping gc bottom-most compaction");
    1951            0 :             }
    1952            0 :             return Ok(());
    1953           54 :         }
    1954           54 :         self.compact_with_gc_inner(cancel, job, ctx).await
    1955           54 :     }
    1956              : 
    1957           54 :     async fn compact_with_gc_inner(
    1958           54 :         self: &Arc<Self>,
    1959           54 :         cancel: &CancellationToken,
    1960           54 :         job: GcCompactJob,
    1961           54 :         ctx: &RequestContext,
    1962           54 :     ) -> anyhow::Result<()> {
    1963           54 :         // Block other compaction/GC tasks from running for now. GC-compaction could run along
    1964           54 :         // with legacy compaction tasks in the future. Always ensure the lock order is compaction -> gc.
    1965           54 :         // Note that we already acquired the compaction lock when the outer `compact` function gets called.
    1966           54 : 
    1967           54 :         let gc_lock = async {
    1968           54 :             tokio::select! {
    1969           54 :                 guard = self.gc_lock.lock() => Ok(guard),
    1970              :                 // TODO: refactor to CompactionError to correctly pass cancelled error
    1971           54 :                 _ = cancel.cancelled() => Err(anyhow!("cancelled")),
    1972              :             }
    1973           54 :         };
    1974              : 
    1975           54 :         let gc_lock = crate::timed(
    1976           54 :             gc_lock,
    1977           54 :             "acquires gc lock",
    1978           54 :             std::time::Duration::from_secs(5),
    1979           54 :         )
    1980           54 :         .await?;
    1981              : 
    1982           54 :         let dry_run = job.dry_run;
    1983           54 :         let compact_key_range = job.compact_key_range;
    1984           54 :         let compact_lsn_range = job.compact_lsn_range;
    1985           54 : 
    1986           54 :         info!("running enhanced gc bottom-most compaction, dry_run={dry_run}, compact_key_range={}..{}, compact_lsn_range={}..{}", compact_key_range.start, compact_key_range.end, compact_lsn_range.start, compact_lsn_range.end);
    1987              : 
    1988           54 :         scopeguard::defer! {
    1989           54 :             info!("done enhanced gc bottom-most compaction");
    1990           54 :         };
    1991           54 : 
    1992           54 :         let mut stat = CompactionStatistics::default();
    1993              : 
    1994              :         // Step 0: pick all delta layers + image layers below/intersect with the GC horizon.
    1995              :         // The layer selection has the following properties:
    1996              :         // 1. If a layer is in the selection, all layers below it are in the selection.
    1997              :         // 2. Inferred from (1), for each key in the layer selection, the value can be reconstructed only with the layers in the layer selection.
    1998           52 :         let job_desc = {
    1999           54 :             let guard = self.layers.read().await;
    2000           54 :             let layers = guard.layer_map()?;
    2001           54 :             let gc_info = self.gc_info.read().unwrap();
    2002           54 :             let mut retain_lsns_below_horizon = Vec::new();
    2003           54 :             let gc_cutoff = {
    2004              :                 // Currently, gc-compaction only kicks in after the legacy gc has updated the gc_cutoff.
    2005              :                 // Therefore, it can only clean up data that cannot be cleaned up with legacy gc, instead of
    2006              :                 // cleaning everything that it theoretically could. In the future, it should use `self.gc_info`
    2007              :                 // to get the truth data.
    2008           54 :                 let real_gc_cutoff = *self.get_latest_gc_cutoff_lsn();
    2009              :                 // The compaction algorithm will keep all keys above the gc_cutoff while keeping only necessary keys below the gc_cutoff for
    2010              :                 // each of the retain_lsns. Therefore, if the user-provided `compact_lsn_range.end` is larger than the real gc cutoff, we will use
    2011              :                 // the real cutoff.
    2012           54 :                 let mut gc_cutoff = if compact_lsn_range.end == Lsn::MAX {
    2013           48 :                     real_gc_cutoff
    2014              :                 } else {
    2015            6 :                     compact_lsn_range.end
    2016              :                 };
    2017           54 :                 if gc_cutoff > real_gc_cutoff {
    2018            4 :                     warn!("provided compact_lsn_range.end={} is larger than the real_gc_cutoff={}, using the real gc cutoff", gc_cutoff, real_gc_cutoff);
    2019            4 :                     gc_cutoff = real_gc_cutoff;
    2020           50 :                 }
    2021           54 :                 gc_cutoff
    2022              :             };
    2023           70 :             for (lsn, _timeline_id, _is_offloaded) in &gc_info.retain_lsns {
    2024           70 :                 if lsn < &gc_cutoff {
    2025           70 :                     retain_lsns_below_horizon.push(*lsn);
    2026           70 :                 }
    2027              :             }
    2028           54 :             for lsn in gc_info.leases.keys() {
    2029            0 :                 if lsn < &gc_cutoff {
    2030            0 :                     retain_lsns_below_horizon.push(*lsn);
    2031            0 :                 }
    2032              :             }
    2033           54 :             let mut selected_layers: Vec<Layer> = Vec::new();
    2034           54 :             drop(gc_info);
    2035              :             // Firstly, pick all the layers intersect or below the gc_cutoff, get the largest LSN in the selected layers.
    2036           54 :             let Some(max_layer_lsn) = layers
    2037           54 :                 .iter_historic_layers()
    2038          244 :                 .filter(|desc| desc.get_lsn_range().start <= gc_cutoff)
    2039          208 :                 .map(|desc| desc.get_lsn_range().end)
    2040           54 :                 .max()
    2041              :             else {
    2042            0 :                 info!("no layers to compact with gc: no historic layers below gc_cutoff, gc_cutoff={}", gc_cutoff);
    2043            0 :                 return Ok(());
    2044              :             };
    2045              :             // Next, if the user specifies compact_lsn_range.start, we need to filter some layers out. All the layers (strictly) below
    2046              :             // the min_layer_lsn computed below will be filtered out, and the data will be accessed using the normal read path, as if
    2047              :             // it were a branch.
    2048           54 :             let Some(min_layer_lsn) = layers
    2049           54 :                 .iter_historic_layers()
    2050          244 :                 .filter(|desc| {
    2051          244 :                     if compact_lsn_range.start == Lsn::INVALID {
    2052          198 :                         true // select all layers below if start == Lsn(0)
    2053              :                     } else {
    2054           46 :                         desc.get_lsn_range().end > compact_lsn_range.start // strictly larger than compact_above_lsn
    2055              :                     }
    2056          244 :                 })
    2057          226 :                 .map(|desc| desc.get_lsn_range().start)
    2058           54 :                 .min()
    2059              :             else {
    2060            0 :                 info!("no layers to compact with gc: no historic layers above compact_above_lsn, compact_above_lsn={}", compact_lsn_range.start);
    2061            0 :                 return Ok(());
    2062              :             };
    2063              :             // Then, pick all the layers that are below the max_layer_lsn. This is to ensure we can pick all single-key
    2064              :             // layers to compact.
    2065           54 :             let mut rewrite_layers = Vec::new();
    2066          244 :             for desc in layers.iter_historic_layers() {
    2067          244 :                 if desc.get_lsn_range().end <= max_layer_lsn
    2068          208 :                     && desc.get_lsn_range().start >= min_layer_lsn
    2069          190 :                     && overlaps_with(&desc.get_key_range(), &compact_key_range)
    2070              :                 {
    2071              :                     // If the layer overlaps with the compaction key range, we need to read it to obtain all keys within the range,
    2072              :                     // even if it might contain extra keys
    2073          152 :                     selected_layers.push(guard.get_from_desc(&desc));
    2074          152 :                     // If the layer is not fully contained within the key range, we need to rewrite it if it's a delta layer (it's fine
    2075          152 :                     // to overlap image layers)
    2076          152 :                     if desc.is_delta() && !fully_contains(&compact_key_range, &desc.get_key_range())
    2077            2 :                     {
    2078            2 :                         rewrite_layers.push(desc);
    2079          150 :                     }
    2080           92 :                 }
    2081              :             }
    2082           54 :             if selected_layers.is_empty() {
    2083            2 :                 info!("no layers to compact with gc: no layers within the key range, gc_cutoff={}, key_range={}..{}", gc_cutoff, compact_key_range.start, compact_key_range.end);
    2084            2 :                 return Ok(());
    2085           52 :             }
    2086           52 :             retain_lsns_below_horizon.sort();
    2087           52 :             GcCompactionJobDescription {
    2088           52 :                 selected_layers,
    2089           52 :                 gc_cutoff,
    2090           52 :                 retain_lsns_below_horizon,
    2091           52 :                 min_layer_lsn,
    2092           52 :                 max_layer_lsn,
    2093           52 :                 compaction_key_range: compact_key_range,
    2094           52 :                 rewrite_layers,
    2095           52 :             }
    2096              :         };
    2097           52 :         let (has_data_below, lowest_retain_lsn) = if compact_lsn_range.start != Lsn::INVALID {
    2098              :             // If we only compact above some LSN, we should get the history from the current branch below the specified LSN.
    2099              :             // We use job_desc.min_layer_lsn as if it's the lowest branch point.
    2100            8 :             (true, job_desc.min_layer_lsn)
    2101           44 :         } else if self.ancestor_timeline.is_some() {
    2102              :             // In theory, we can also use min_layer_lsn here, but using ancestor LSN makes sure the delta layers cover the
    2103              :             // LSN ranges all the way to the ancestor timeline.
    2104            2 :             (true, self.ancestor_lsn)
    2105              :         } else {
    2106           42 :             let res = job_desc
    2107           42 :                 .retain_lsns_below_horizon
    2108           42 :                 .first()
    2109           42 :                 .copied()
    2110           42 :                 .unwrap_or(job_desc.gc_cutoff);
    2111           42 :             if cfg!(debug_assertions) {
    2112           42 :                 assert_eq!(
    2113           42 :                     res,
    2114           42 :                     job_desc
    2115           42 :                         .retain_lsns_below_horizon
    2116           42 :                         .iter()
    2117           42 :                         .min()
    2118           42 :                         .copied()
    2119           42 :                         .unwrap_or(job_desc.gc_cutoff)
    2120           42 :                 );
    2121            0 :             }
    2122           42 :             (false, res)
    2123              :         };
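                      :         // Illustration (hypothetical LSNs): on a root branch with retain_lsns [0/80, 0/90] below a
                      :         // gc_cutoff of 0/100, `lowest_retain_lsn` is 0/80 and `has_data_below` is false; on a child
                      :         // branch, `lowest_retain_lsn` is instead the branch's `ancestor_lsn` and `has_data_below`
                      :         // is true.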
    2124           52 :         info!(
    2125            0 :             "picked {} layers for compaction ({} layers need rewriting) with max_layer_lsn={} min_layer_lsn={} gc_cutoff={} lowest_retain_lsn={}, key_range={}..{}, has_data_below={}",
    2126            0 :             job_desc.selected_layers.len(),
    2127            0 :             job_desc.rewrite_layers.len(),
    2128              :             job_desc.max_layer_lsn,
    2129              :             job_desc.min_layer_lsn,
    2130              :             job_desc.gc_cutoff,
    2131              :             lowest_retain_lsn,
    2132              :             job_desc.compaction_key_range.start,
    2133              :             job_desc.compaction_key_range.end,
    2134              :             has_data_below,
    2135              :         );
    2136              : 
    2137          204 :         for layer in &job_desc.selected_layers {
    2138          152 :             debug!("read layer: {}", layer.layer_desc().key());
    2139              :         }
    2140           54 :         for layer in &job_desc.rewrite_layers {
    2141            2 :             debug!("rewrite layer: {}", layer.key());
    2142              :         }
    2143              : 
    2144           52 :         self.check_compaction_space(&job_desc.selected_layers)
    2145           52 :             .await?;
    2146              : 
    2147              :         // Generate statistics for the compaction
    2148          204 :         for layer in &job_desc.selected_layers {
    2149          152 :             let desc = layer.layer_desc();
    2150          152 :             if desc.is_delta() {
    2151           86 :                 stat.visit_delta_layer(desc.file_size());
    2152           86 :             } else {
    2153           66 :                 stat.visit_image_layer(desc.file_size());
    2154           66 :             }
    2155              :         }
    2156              : 
    2157              :         // Step 1: construct a k-merge iterator over all layers.
    2158              :         // Also, verify if the layer map can be split by drawing a horizontal line at every LSN start/end split point.
    2159              :         // The check is disabled for now because it needs to be adjusted for partial compactions; it will be re-enabled later.
    2160              :         // let layer_names = job_desc
    2161              :         //     .selected_layers
    2162              :         //     .iter()
    2163              :         //     .map(|layer| layer.layer_desc().layer_name())
    2164              :         //     .collect_vec();
    2165              :         // if let Some(err) = check_valid_layermap(&layer_names) {
    2166              :         //     warn!("gc-compaction layer map check failed because {}, this is normal if partial compaction is not finished yet", err);
    2167              :         // }
    2168              :         // The maximum LSN we are processing in this compaction loop
    2169           52 :         let end_lsn = job_desc
    2170           52 :             .selected_layers
    2171           52 :             .iter()
    2172          152 :             .map(|l| l.layer_desc().lsn_range.end)
    2173           52 :             .max()
    2174           52 :             .unwrap();
    2175           52 :         let mut delta_layers = Vec::new();
    2176           52 :         let mut image_layers = Vec::new();
    2177           52 :         let mut downloaded_layers = Vec::new();
    2178           52 :         let mut total_downloaded_size = 0;
    2179           52 :         let mut total_layer_size = 0;
    2180          204 :         for layer in &job_desc.selected_layers {
    2181          152 :             if layer.needs_download().await?.is_some() {
    2182            0 :                 total_downloaded_size += layer.layer_desc().file_size;
    2183          152 :             }
    2184          152 :             total_layer_size += layer.layer_desc().file_size;
    2185          152 :             let resident_layer = layer.download_and_keep_resident().await?;
    2186          152 :             downloaded_layers.push(resident_layer);
    2187              :         }
    2188           52 :         info!(
    2189            0 :             "finish downloading layers, downloaded={}, total={}, ratio={:.2}",
    2190            0 :             total_downloaded_size,
    2191            0 :             total_layer_size,
    2192            0 :             total_downloaded_size as f64 / total_layer_size as f64
    2193              :         );
    2194          204 :         for resident_layer in &downloaded_layers {
    2195          152 :             if resident_layer.layer_desc().is_delta() {
    2196           86 :                 let layer = resident_layer.get_as_delta(ctx).await?;
    2197           86 :                 delta_layers.push(layer);
    2198              :             } else {
    2199           66 :                 let layer = resident_layer.get_as_image(ctx).await?;
    2200           66 :                 image_layers.push(layer);
    2201              :             }
    2202              :         }
    2203           52 :         let (dense_ks, sparse_ks) = self.collect_gc_compaction_keyspace().await?;
    2204           52 :         let mut merge_iter = FilterIterator::create(
    2205           52 :             MergeIterator::create(&delta_layers, &image_layers, ctx),
    2206           52 :             dense_ks,
    2207           52 :             sparse_ks,
    2208           52 :         )?;
    2209              : 
    2210              :         // Step 2: Produce images+deltas.
    2211           52 :         let mut accumulated_values = Vec::new();
    2212           52 :         let mut last_key: Option<Key> = None;
    2213              : 
    2214              :         // Only create image layers when there are no ancestor branches. TODO: create a covering image layer
    2215              :         // when some conditions are met.
    2216           52 :         let mut image_layer_writer = if !has_data_below {
    2217              :             Some(
    2218           42 :                 SplitImageLayerWriter::new(
    2219           42 :                     self.conf,
    2220           42 :                     self.timeline_id,
    2221           42 :                     self.tenant_shard_id,
    2222           42 :                     job_desc.compaction_key_range.start,
    2223           42 :                     lowest_retain_lsn,
    2224           42 :                     self.get_compaction_target_size(),
    2225           42 :                     ctx,
    2226           42 :                 )
    2227           42 :                 .await?,
    2228              :             )
    2229              :         } else {
    2230           10 :             None
    2231              :         };
    2232              : 
    2233           52 :         let mut delta_layer_writer = SplitDeltaLayerWriter::new(
    2234           52 :             self.conf,
    2235           52 :             self.timeline_id,
    2236           52 :             self.tenant_shard_id,
    2237           52 :             lowest_retain_lsn..end_lsn,
    2238           52 :             self.get_compaction_target_size(),
    2239           52 :         )
    2240           52 :         .await?;
    2241              : 
    2242              :         #[derive(Default)]
    2243              :         struct RewritingLayers {
    2244              :             before: Option<DeltaLayerWriter>,
    2245              :             after: Option<DeltaLayerWriter>,
    2246              :         }
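                      :         // Keyed by the descriptor of the original delta layer being rewritten; `before` collects
                      :         // values below the compaction key range and `after` collects values at or above its end.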
    2247           52 :         let mut delta_layer_rewriters = HashMap::<Arc<PersistentLayerKey>, RewritingLayers>::new();
    2248              : 
    2249              :         /// When the compacted LSN range is not a bottom range (i.e. `[0, X)`) of the root branch, we "have data below" (`has_data_below=true`).
    2250              :         /// The two cases are compaction on a branch that has an ancestor, and compaction with `compact_lsn_range.start` set.
    2251              :         /// In those cases, we need to pull up data from below the LSN range we're compaction.
    2252              :         ///
    2253              :         /// This function unifies the cases so that later code doesn't have to think about it.
    2254              :         ///
    2255              :         /// Currently, we always get the ancestor image for each key in the child branch no matter whether the image
    2256              :         /// is needed for reconstruction. This should be fixed in the future.
    2257              :         /// In those cases, we need to pull up data from below the LSN range we're compacting.
    2258              :         /// Furthermore, we should do vectored get instead of a single get, or better, use k-merge for ancestor
    2259              :         /// images.
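                      :         ///
                      :         /// Illustration (hypothetical LSNs): when compacting a child branch whose `ancestor_lsn` is
                      :         /// 0/100, each key's history is seeded with the image read from this timeline at LSN 0/100,
                      :         /// so the k-merge over the selected layers never has to descend into ancestor layers itself.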
    2260          622 :         async fn get_ancestor_image(
    2261          622 :             this_tline: &Arc<Timeline>,
    2262          622 :             key: Key,
    2263          622 :             ctx: &RequestContext,
    2264          622 :             has_data_below: bool,
    2265          622 :             history_lsn_point: Lsn,
    2266          622 :         ) -> anyhow::Result<Option<(Key, Lsn, Bytes)>> {
    2267          622 :             if !has_data_below {
    2268          584 :                 return Ok(None);
    2269           38 :             };
    2270              :             // This function is implemented as a get of the current timeline at ancestor LSN, therefore reusing
    2271              :             // as much existing code as possible.
    2272           38 :             let img = this_tline.get(key, history_lsn_point, ctx).await?;
    2273           38 :             Ok(Some((key, history_lsn_point, img)))
    2274          622 :         }
    2275              : 
    2276              :         // Actually, we can decide not to write to the image layer at all at this point because
    2277              :         // the key and LSN range are determined. However, to keep things simple here, we still
    2278              :         // create this writer and discard it at the end.
    2279              : 
    2280          966 :         while let Some(((key, lsn, val), desc)) = merge_iter.next_with_trace().await? {
    2281          914 :             if cancel.is_cancelled() {
    2282            0 :                 return Err(anyhow!("cancelled")); // TODO: refactor to CompactionError and pass cancel error
    2283          914 :             }
    2284          914 :             if self.shard_identity.is_key_disposable(&key) {
    2285              :                 // If this shard does not need to store this key, simply skip it.
    2286              :                 //
    2287              :                 // This is not handled in the filter iterator because shard is determined by hash.
    2288              :                 // Therefore, unlike filtering on key spaces (ranges), it would not give us any
    2289              :                 // performance benefit such as skipping a whole layer file.
    2290            0 :                 if cfg!(debug_assertions) {
    2291            0 :                     let shard = self.shard_identity.shard_index();
    2292            0 :                     let owner = self.shard_identity.get_shard_number(&key);
    2293            0 :                     panic!("key {key} does not belong on shard {shard}, owned by {owner}");
    2294            0 :                 }
    2295            0 :                 continue;
    2296          914 :             }
    2297          914 :             if !job_desc.compaction_key_range.contains(&key) {
    2298           64 :                 if !desc.is_delta {
    2299           60 :                     continue;
    2300            4 :                 }
    2301            4 :                 let rewriter = delta_layer_rewriters.entry(desc.clone()).or_default();
    2302            4 :                 let rewriter = if key < job_desc.compaction_key_range.start {
    2303            0 :                     if rewriter.before.is_none() {
    2304            0 :                         rewriter.before = Some(
    2305            0 :                             DeltaLayerWriter::new(
    2306            0 :                                 self.conf,
    2307            0 :                                 self.timeline_id,
    2308            0 :                                 self.tenant_shard_id,
    2309            0 :                                 desc.key_range.start,
    2310            0 :                                 desc.lsn_range.clone(),
    2311            0 :                                 ctx,
    2312            0 :                             )
    2313            0 :                             .await?,
    2314              :                         );
    2315            0 :                     }
    2316            0 :                     rewriter.before.as_mut().unwrap()
    2317            4 :                 } else if key >= job_desc.compaction_key_range.end {
    2318            4 :                     if rewriter.after.is_none() {
    2319            2 :                         rewriter.after = Some(
    2320            2 :                             DeltaLayerWriter::new(
    2321            2 :                                 self.conf,
    2322            2 :                                 self.timeline_id,
    2323            2 :                                 self.tenant_shard_id,
    2324            2 :                                 job_desc.compaction_key_range.end,
    2325            2 :                                 desc.lsn_range.clone(),
    2326            2 :                                 ctx,
    2327            2 :                             )
    2328            2 :                             .await?,
    2329              :                         );
    2330            2 :                     }
    2331            4 :                     rewriter.after.as_mut().unwrap()
    2332              :                 } else {
    2333            0 :                     unreachable!()
    2334              :                 };
    2335            4 :                 rewriter.put_value(key, lsn, val, ctx).await?;
    2336            4 :                 continue;
    2337          850 :             }
    2338          850 :             match val {
    2339          610 :                 Value::Image(_) => stat.visit_image_key(&val),
    2340          240 :                 Value::WalRecord(_) => stat.visit_wal_key(&val),
    2341              :             }
    2342          850 :             if last_key.is_none() || last_key.as_ref() == Some(&key) {
    2343          280 :                 if last_key.is_none() {
    2344           52 :                     last_key = Some(key);
    2345          228 :                 }
    2346          280 :                 accumulated_values.push((key, lsn, val));
    2347              :             } else {
    2348          570 :                 let last_key: &mut Key = last_key.as_mut().unwrap();
    2349          570 :                 stat.on_unique_key_visited(); // TODO: adjust statistics for partial compaction
    2350          570 :                 let retention = self
    2351          570 :                     .generate_key_retention(
    2352          570 :                         *last_key,
    2353          570 :                         &accumulated_values,
    2354          570 :                         job_desc.gc_cutoff,
    2355          570 :                         &job_desc.retain_lsns_below_horizon,
    2356          570 :                         COMPACTION_DELTA_THRESHOLD,
    2357          570 :                         get_ancestor_image(self, *last_key, ctx, has_data_below, lowest_retain_lsn)
    2358          570 :                             .await?,
    2359              :                     )
    2360          570 :                     .await?;
    2361          570 :                 retention
    2362          570 :                     .pipe_to(
    2363          570 :                         *last_key,
    2364          570 :                         &mut delta_layer_writer,
    2365          570 :                         image_layer_writer.as_mut(),
    2366          570 :                         &mut stat,
    2367          570 :                         ctx,
    2368          570 :                     )
    2369          570 :                     .await?;
    2370          570 :                 accumulated_values.clear();
    2371          570 :                 *last_key = key;
    2372          570 :                 accumulated_values.push((key, lsn, val));
    2373              :             }
    2374              :         }
    2375              : 
    2376              :         // TODO: move the below part to the loop body
    2377           52 :         let last_key = last_key.expect("no keys produced during compaction");
    2378           52 :         stat.on_unique_key_visited();
    2379              : 
    2380           52 :         let retention = self
    2381           52 :             .generate_key_retention(
    2382           52 :                 last_key,
    2383           52 :                 &accumulated_values,
    2384           52 :                 job_desc.gc_cutoff,
    2385           52 :                 &job_desc.retain_lsns_below_horizon,
    2386           52 :                 COMPACTION_DELTA_THRESHOLD,
    2387           52 :                 get_ancestor_image(self, last_key, ctx, has_data_below, lowest_retain_lsn).await?,
    2388              :             )
    2389           52 :             .await?;
    2390           52 :         retention
    2391           52 :             .pipe_to(
    2392           52 :                 last_key,
    2393           52 :                 &mut delta_layer_writer,
    2394           52 :                 image_layer_writer.as_mut(),
    2395           52 :                 &mut stat,
    2396           52 :                 ctx,
    2397           52 :             )
    2398           52 :             .await?;
    2399              :         // end: move the above part to the loop body
    2400              : 
    2401           52 :         let mut rewrote_delta_layers = Vec::new();
    2402           54 :         for (key, writers) in delta_layer_rewriters {
    2403            2 :             if let Some(delta_writer_before) = writers.before {
    2404            0 :                 let (desc, path) = delta_writer_before
    2405            0 :                     .finish(job_desc.compaction_key_range.start, ctx)
    2406            0 :                     .await?;
    2407            0 :                 let layer = Layer::finish_creating(self.conf, self, desc, &path)?;
    2408            0 :                 rewrote_delta_layers.push(layer);
    2409            2 :             }
    2410            2 :             if let Some(delta_writer_after) = writers.after {
    2411            2 :                 let (desc, path) = delta_writer_after.finish(key.key_range.end, ctx).await?;
    2412            2 :                 let layer = Layer::finish_creating(self.conf, self, desc, &path)?;
    2413            2 :                 rewrote_delta_layers.push(layer);
    2414            0 :             }
    2415              :         }
    2416              : 
    2417           74 :         let discard = |key: &PersistentLayerKey| {
    2418           74 :             let key = key.clone();
    2419           74 :             async move { KeyHistoryRetention::discard_key(&key, self, dry_run).await }
    2420           74 :         };
    2421              : 
    2422           52 :         let produced_image_layers = if let Some(writer) = image_layer_writer {
    2423           42 :             if !dry_run {
    2424           38 :                 let end_key = job_desc.compaction_key_range.end;
    2425           38 :                 writer
    2426           38 :                     .finish_with_discard_fn(self, ctx, end_key, discard)
    2427           38 :                     .await?
    2428              :             } else {
    2429            4 :                 drop(writer);
    2430            4 :                 Vec::new()
    2431              :             }
    2432              :         } else {
    2433           10 :             Vec::new()
    2434              :         };
    2435              : 
    2436           52 :         let produced_delta_layers = if !dry_run {
    2437           48 :             delta_layer_writer
    2438           48 :                 .finish_with_discard_fn(self, ctx, discard)
    2439           48 :                 .await?
    2440              :         } else {
    2441            4 :             drop(delta_layer_writer);
    2442            4 :             Vec::new()
    2443              :         };
    2444              : 
    2445              :         // TODO: make image/delta/rewrote_delta layers generation atomic. At this point, we already generated resident layers, and if
    2446              :         // compaction is cancelled at this point, we might have some layers that are not cleaned up.
    2447           52 :         let mut compact_to = Vec::new();
    2448           52 :         let mut keep_layers = HashSet::new();
    2449           52 :         let produced_delta_layers_len = produced_delta_layers.len();
    2450           52 :         let produced_image_layers_len = produced_image_layers.len();
    2451           88 :         for action in produced_delta_layers {
    2452           36 :             match action {
    2453           22 :                 BatchWriterResult::Produced(layer) => {
    2454           22 :                     if cfg!(debug_assertions) {
    2455           22 :                         info!("produced delta layer: {}", layer.layer_desc().key());
    2456            0 :                     }
    2457           22 :                     stat.produce_delta_layer(layer.layer_desc().file_size());
    2458           22 :                     compact_to.push(layer);
    2459              :                 }
    2460           14 :                 BatchWriterResult::Discarded(l) => {
    2461           14 :                     if cfg!(debug_assertions) {
    2462           14 :                         info!("discarded delta layer: {}", l);
    2463            0 :                     }
    2464           14 :                     keep_layers.insert(l);
    2465           14 :                     stat.discard_delta_layer();
    2466              :                 }
    2467              :             }
    2468              :         }
    2469           54 :         for layer in &rewrote_delta_layers {
    2470            2 :             debug!(
    2471            0 :                 "produced rewritten delta layer: {}",
    2472            0 :                 layer.layer_desc().key()
    2473              :             );
    2474              :         }
    2475           52 :         compact_to.extend(rewrote_delta_layers);
    2476           90 :         for action in produced_image_layers {
    2477           38 :             match action {
    2478           30 :                 BatchWriterResult::Produced(layer) => {
    2479           30 :                     debug!("produced image layer: {}", layer.layer_desc().key());
    2480           30 :                     stat.produce_image_layer(layer.layer_desc().file_size());
    2481           30 :                     compact_to.push(layer);
    2482              :                 }
    2483            8 :                 BatchWriterResult::Discarded(l) => {
    2484            8 :                     debug!("discarded image layer: {}", l);
    2485            8 :                     keep_layers.insert(l);
    2486            8 :                     stat.discard_image_layer();
    2487              :                 }
    2488              :             }
    2489              :         }
    2490              : 
    2491           52 :         let mut layer_selection = job_desc.selected_layers;
    2492              : 
    2493              :         // Partial compaction might select more data than it processes, e.g., if
    2494              :         // the compaction_key_range only partially overlaps:
    2495              :         //
    2496              :         //         [---compaction_key_range---]
    2497              :         //   [---A----][----B----][----C----][----D----]
    2498              :         //
     2499              :         // For delta layers, we rewrite them so that they are cut exactly at the
     2500              :         // compaction key range, so we can always discard them. However, image
     2501              :         // layers are not rewritten for now, so we need to handle them differently.
     2502              :         // Assume image layers A, B, C, D are all in the `layer_selection`.
     2503              :         //
     2504              :         // The created image layers contain whatever is needed from B, from C, from the
     2505              :         // `----]` part of A, and from the `[---` part of D.
     2506              :         //
     2507              :         // In contrast, `[---A` and `D----]` have not been processed, so we must
     2508              :         // keep that data.
    2509              :         //
    2510              :         // The solution for now is to keep A and D completely if they are image layers.
    2511              :         // (layer_selection is what we'll remove from the layer map, so, retain what
    2512              :         // is _not_ fully covered by compaction_key_range).
    2513          204 :         for layer in &layer_selection {
    2514          152 :             if !layer.layer_desc().is_delta() {
    2515           66 :                 if !overlaps_with(
    2516           66 :                     &layer.layer_desc().key_range,
    2517           66 :                     &job_desc.compaction_key_range,
    2518           66 :                 ) {
    2519            0 :                     bail!("violated constraint: image layer outside of compaction key range");
    2520           66 :                 }
    2521           66 :                 if !fully_contains(
    2522           66 :                     &job_desc.compaction_key_range,
    2523           66 :                     &layer.layer_desc().key_range,
    2524           66 :                 ) {
    2525            8 :                     keep_layers.insert(layer.layer_desc().key());
    2526           58 :                 }
    2527           86 :             }
    2528              :         }
    2529              : 
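                       :         // Whatever remains in `layer_selection` after this filter is removed from the
                       :         // layer map below; layers recorded in `keep_layers` are excluded so they survive.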
    2530          152 :         layer_selection.retain(|x| !keep_layers.contains(&x.layer_desc().key()));
    2531           52 : 
    2532           52 :         info!(
    2533            0 :             "gc-compaction statistics: {}",
    2534            0 :             serde_json::to_string(&stat)?
    2535              :         );
    2536              : 
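                       :         // A dry run ends here: the writers were dropped without producing layers, and
                       :         // neither the layer map nor the remote index is touched.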
    2537           52 :         if dry_run {
    2538            4 :             return Ok(());
    2539           48 :         }
    2540           48 : 
    2541           48 :         info!(
    2542            0 :             "produced {} delta layers and {} image layers, {} layers are kept",
    2543            0 :             produced_delta_layers_len,
    2544            0 :             produced_image_layers_len,
    2545            0 :             layer_selection.len()
    2546              :         );
    2547              : 
     2548              :         // Step 3: Put the produced layers back into the layer map and remove the compacted ones.
    2549              :         {
    2550              :             // TODO: sanity check if the layer map is valid (i.e., should not have overlaps)
    2551           48 :             let mut guard = self.layers.write().await;
    2552           48 :             guard
    2553           48 :                 .open_mut()?
    2554           48 :                 .finish_gc_compaction(&layer_selection, &compact_to, &self.metrics)
    2555           48 :         };
    2556           48 :         self.remote_client
    2557           48 :             .schedule_compaction_update(&layer_selection, &compact_to)?;
    2558              : 
    2559           48 :         drop(gc_lock);
    2560           48 : 
    2561           48 :         Ok(())
    2562           54 :     }
    2563              : }
    2564              : 
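              : /// Adapter that lets the generic `CompactionJobExecutor` machinery drive this
              : /// [`Timeline`]: newly produced image/delta layers and pending deletions are
              : /// buffered here and applied to the timeline in `flush_updates`.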
    2565              : struct TimelineAdaptor {
    2566              :     timeline: Arc<Timeline>,
    2567              : 
    2568              :     keyspace: (Lsn, KeySpace),
    2569              : 
    2570              :     new_deltas: Vec<ResidentLayer>,
    2571              :     new_images: Vec<ResidentLayer>,
    2572              :     layers_to_delete: Vec<Arc<PersistentLayerDesc>>,
    2573              : }
    2574              : 
    2575              : impl TimelineAdaptor {
    2576            0 :     pub fn new(timeline: &Arc<Timeline>, keyspace: (Lsn, KeySpace)) -> Self {
    2577            0 :         Self {
    2578            0 :             timeline: timeline.clone(),
    2579            0 :             keyspace,
    2580            0 :             new_images: Vec::new(),
    2581            0 :             new_deltas: Vec::new(),
    2582            0 :             layers_to_delete: Vec::new(),
    2583            0 :         }
    2584            0 :     }
    2585              : 
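                       :     /// Applies the buffered changes to the timeline: installs the new delta and image
                       :     /// layers, removes the layers scheduled for deletion, and hands the new image
                       :     /// layers to the upload path.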
    2586            0 :     pub async fn flush_updates(&mut self) -> Result<(), CompactionError> {
    2587            0 :         let layers_to_delete = {
    2588            0 :             let guard = self.timeline.layers.read().await;
    2589            0 :             self.layers_to_delete
    2590            0 :                 .iter()
    2591            0 :                 .map(|x| guard.get_from_desc(x))
    2592            0 :                 .collect::<Vec<Layer>>()
    2593            0 :         };
    2594            0 :         self.timeline
    2595            0 :             .finish_compact_batch(&self.new_deltas, &self.new_images, &layers_to_delete)
    2596            0 :             .await?;
    2597              : 
    2598            0 :         self.timeline
    2599            0 :             .upload_new_image_layers(std::mem::take(&mut self.new_images))?;
    2600              : 
    2601            0 :         self.new_deltas.clear();
    2602            0 :         self.layers_to_delete.clear();
    2603            0 :         Ok(())
    2604            0 :     }
    2605              : }
    2606              : 
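              : // Wrappers around `ResidentLayer` for delta/image layers that have been downloaded
              : // and are kept resident while the compaction job uses them.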
    2607              : #[derive(Clone)]
    2608              : struct ResidentDeltaLayer(ResidentLayer);
    2609              : #[derive(Clone)]
    2610              : struct ResidentImageLayer(ResidentLayer);
    2611              : 
    2612              : impl CompactionJobExecutor for TimelineAdaptor {
    2613              :     type Key = pageserver_api::key::Key;
    2614              : 
    2615              :     type Layer = OwnArc<PersistentLayerDesc>;
    2616              :     type DeltaLayer = ResidentDeltaLayer;
    2617              :     type ImageLayer = ResidentImageLayer;
    2618              : 
    2619              :     type RequestContext = crate::context::RequestContext;
    2620              : 
    2621            0 :     fn get_shard_identity(&self) -> &ShardIdentity {
    2622            0 :         self.timeline.get_shard_identity()
    2623            0 :     }
    2624              : 
    2625            0 :     async fn get_layers(
    2626            0 :         &mut self,
    2627            0 :         key_range: &Range<Key>,
    2628            0 :         lsn_range: &Range<Lsn>,
    2629            0 :         _ctx: &RequestContext,
    2630            0 :     ) -> anyhow::Result<Vec<OwnArc<PersistentLayerDesc>>> {
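                       :         // Flush buffered changes first so that the layer map read below reflects all
                       :         // layers produced or deleted so far.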
    2631            0 :         self.flush_updates().await?;
    2632              : 
    2633            0 :         let guard = self.timeline.layers.read().await;
    2634            0 :         let layer_map = guard.layer_map()?;
    2635              : 
    2636            0 :         let result = layer_map
    2637            0 :             .iter_historic_layers()
    2638            0 :             .filter(|l| {
    2639            0 :                 overlaps_with(&l.lsn_range, lsn_range) && overlaps_with(&l.key_range, key_range)
    2640            0 :             })
    2641            0 :             .map(OwnArc)
    2642            0 :             .collect();
    2643            0 :         Ok(result)
    2644            0 :     }
    2645              : 
    2646            0 :     async fn get_keyspace(
    2647            0 :         &mut self,
    2648            0 :         key_range: &Range<Key>,
    2649            0 :         lsn: Lsn,
    2650            0 :         _ctx: &RequestContext,
    2651            0 :     ) -> anyhow::Result<Vec<Range<Key>>> {
    2652            0 :         if lsn == self.keyspace.0 {
    2653            0 :             Ok(pageserver_compaction::helpers::intersect_keyspace(
    2654            0 :                 &self.keyspace.1.ranges,
    2655            0 :                 key_range,
    2656            0 :             ))
    2657              :         } else {
    2658              :             // The current compaction implementation only ever requests the key space
    2659              :             // at the compaction end LSN.
    2660            0 :             anyhow::bail!("keyspace not available for requested lsn");
    2661              :         }
    2662            0 :     }
    2663              : 
    2664            0 :     async fn downcast_delta_layer(
    2665            0 :         &self,
    2666            0 :         layer: &OwnArc<PersistentLayerDesc>,
    2667            0 :     ) -> anyhow::Result<Option<ResidentDeltaLayer>> {
    2668            0 :         // this is a lot more complex than a simple downcast...
    2669            0 :         if layer.is_delta() {
    2670            0 :             let l = {
    2671            0 :                 let guard = self.timeline.layers.read().await;
    2672            0 :                 guard.get_from_desc(layer)
    2673              :             };
    2674            0 :             let result = l.download_and_keep_resident().await?;
    2675              : 
    2676            0 :             Ok(Some(ResidentDeltaLayer(result)))
    2677              :         } else {
    2678            0 :             Ok(None)
    2679              :         }
    2680            0 :     }
    2681              : 
    2682            0 :     async fn create_image(
    2683            0 :         &mut self,
    2684            0 :         lsn: Lsn,
    2685            0 :         key_range: &Range<Key>,
    2686            0 :         ctx: &RequestContext,
    2687            0 :     ) -> anyhow::Result<()> {
    2688            0 :         Ok(self.create_image_impl(lsn, key_range, ctx).await?)
    2689            0 :     }
    2690              : 
    2691            0 :     async fn create_delta(
    2692            0 :         &mut self,
    2693            0 :         lsn_range: &Range<Lsn>,
    2694            0 :         key_range: &Range<Key>,
    2695            0 :         input_layers: &[ResidentDeltaLayer],
    2696            0 :         ctx: &RequestContext,
    2697            0 :     ) -> anyhow::Result<()> {
    2698            0 :         debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end);
    2699              : 
    2700            0 :         let mut all_entries = Vec::new();
    2701            0 :         for dl in input_layers.iter() {
    2702            0 :             all_entries.extend(dl.load_keys(ctx).await?);
    2703              :         }
    2704              : 
     2705              :         // The current stdlib sorting implementation is particularly fast when the
     2706              :         // slice is made up of already-sorted sub-ranges.
    2707            0 :         all_entries.sort_by_key(|DeltaEntry { key, lsn, .. }| (*key, *lsn));
    2708              : 
    2709            0 :         let mut writer = DeltaLayerWriter::new(
    2710            0 :             self.timeline.conf,
    2711            0 :             self.timeline.timeline_id,
    2712            0 :             self.timeline.tenant_shard_id,
    2713            0 :             key_range.start,
    2714            0 :             lsn_range.clone(),
    2715            0 :             ctx,
    2716            0 :         )
    2717            0 :         .await?;
    2718              : 
    2719            0 :         let mut dup_values = 0;
    2720            0 : 
    2721            0 :         // This iterator walks through all key-value pairs from all the layers
    2722            0 :         // we're compacting, in key, LSN order.
    2723            0 :         let mut prev: Option<(Key, Lsn)> = None;
    2724              :         for &DeltaEntry {
    2725            0 :             key, lsn, ref val, ..
    2726            0 :         } in all_entries.iter()
    2727              :         {
    2728            0 :             if prev == Some((key, lsn)) {
    2729              :                 // This is a duplicate. Skip it.
    2730              :                 //
    2731              :                 // It can happen if compaction is interrupted after writing some
    2732              :                 // layers but not all, and we are compacting the range again.
    2733              :                 // The calculations in the algorithm assume that there are no
    2734              :                 // duplicates, so the math on targeted file size is likely off,
    2735              :                 // and we will create smaller files than expected.
    2736            0 :                 dup_values += 1;
    2737            0 :                 continue;
    2738            0 :             }
    2739              : 
    2740            0 :             let value = val.load(ctx).await?;
    2741              : 
    2742            0 :             writer.put_value(key, lsn, value, ctx).await?;
    2743              : 
    2744            0 :             prev = Some((key, lsn));
    2745              :         }
    2746              : 
    2747            0 :         if dup_values > 0 {
    2748            0 :             warn!("delta layer created with {} duplicate values", dup_values);
    2749            0 :         }
    2750              : 
    2751            0 :         fail_point!("delta-layer-writer-fail-before-finish", |_| {
    2752            0 :             Err(anyhow::anyhow!(
    2753            0 :                 "failpoint delta-layer-writer-fail-before-finish"
    2754            0 :             ))
    2755            0 :         });
    2756              : 
    2757            0 :         let (desc, path) = writer.finish(prev.unwrap().0.next(), ctx).await?;
    2758            0 :         let new_delta_layer =
    2759            0 :             Layer::finish_creating(self.timeline.conf, &self.timeline, desc, &path)?;
    2760              : 
    2761            0 :         self.new_deltas.push(new_delta_layer);
    2762            0 :         Ok(())
    2763            0 :     }
    2764              : 
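                       :     // Deletions are only buffered here; they are applied on the next `flush_updates` call.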
    2765            0 :     async fn delete_layer(
    2766            0 :         &mut self,
    2767            0 :         layer: &OwnArc<PersistentLayerDesc>,
    2768            0 :         _ctx: &RequestContext,
    2769            0 :     ) -> anyhow::Result<()> {
    2770            0 :         self.layers_to_delete.push(layer.clone().0);
    2771            0 :         Ok(())
    2772            0 :     }
    2773              : }
    2774              : 
    2775              : impl TimelineAdaptor {
    2776            0 :     async fn create_image_impl(
    2777            0 :         &mut self,
    2778            0 :         lsn: Lsn,
    2779            0 :         key_range: &Range<Key>,
    2780            0 :         ctx: &RequestContext,
    2781            0 :     ) -> Result<(), CreateImageLayersError> {
    2782            0 :         let timer = self.timeline.metrics.create_images_time_histo.start_timer();
    2783              : 
    2784            0 :         let image_layer_writer = ImageLayerWriter::new(
    2785            0 :             self.timeline.conf,
    2786            0 :             self.timeline.timeline_id,
    2787            0 :             self.timeline.tenant_shard_id,
    2788            0 :             key_range,
    2789            0 :             lsn,
    2790            0 :             ctx,
    2791            0 :         )
    2792            0 :         .await?;
    2793              : 
    2794            0 :         fail_point!("image-layer-writer-fail-before-finish", |_| {
    2795            0 :             Err(CreateImageLayersError::Other(anyhow::anyhow!(
    2796            0 :                 "failpoint image-layer-writer-fail-before-finish"
    2797            0 :             )))
    2798            0 :         });
    2799              : 
    2800            0 :         let keyspace = KeySpace {
    2801            0 :             ranges: self.get_keyspace(key_range, lsn, ctx).await?,
    2802              :         };
     2803              :         // TODO: set a proper (stateful) start key; for now we pass Key::MIN to create_image_layer_for_rel_blocks and ignore the returned next_start_key.
    2804            0 :         let start = Key::MIN;
    2805              :         let ImageLayerCreationOutcome {
    2806            0 :             image,
    2807              :             next_start_key: _,
    2808            0 :         } = self
    2809            0 :             .timeline
    2810            0 :             .create_image_layer_for_rel_blocks(
    2811            0 :                 &keyspace,
    2812            0 :                 image_layer_writer,
    2813            0 :                 lsn,
    2814            0 :                 ctx,
    2815            0 :                 key_range.clone(),
    2816            0 :                 start,
    2817            0 :             )
    2818            0 :             .await?;
    2819              : 
    2820            0 :         if let Some(image_layer) = image {
    2821            0 :             self.new_images.push(image_layer);
    2822            0 :         }
    2823              : 
    2824            0 :         timer.stop_and_record();
    2825            0 : 
    2826            0 :         Ok(())
    2827            0 :     }
    2828              : }
    2829              : 
    2830              : impl CompactionRequestContext for crate::context::RequestContext {}
    2831              : 
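              : /// Owned `Arc` newtype used as `TimelineAdaptor`'s layer handle; wrapping the `Arc`
              : /// lets this module implement the compaction traits (such as `CompactionLayer`) on it.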
    2832              : #[derive(Debug, Clone)]
    2833              : pub struct OwnArc<T>(pub Arc<T>);
    2834              : 
    2835              : impl<T> Deref for OwnArc<T> {
    2836              :     type Target = <Arc<T> as Deref>::Target;
    2837            0 :     fn deref(&self) -> &Self::Target {
    2838            0 :         &self.0
    2839            0 :     }
    2840              : }
    2841              : 
    2842              : impl<T> AsRef<T> for OwnArc<T> {
    2843            0 :     fn as_ref(&self) -> &T {
    2844            0 :         self.0.as_ref()
    2845            0 :     }
    2846              : }
    2847              : 
    2848              : impl CompactionLayer<Key> for OwnArc<PersistentLayerDesc> {
    2849            0 :     fn key_range(&self) -> &Range<Key> {
    2850            0 :         &self.key_range
    2851            0 :     }
    2852            0 :     fn lsn_range(&self) -> &Range<Lsn> {
    2853            0 :         &self.lsn_range
    2854            0 :     }
    2855            0 :     fn file_size(&self) -> u64 {
    2856            0 :         self.file_size
    2857            0 :     }
    2858            0 :     fn short_id(&self) -> std::string::String {
    2859            0 :         self.as_ref().short_id().to_string()
    2860            0 :     }
    2861            0 :     fn is_delta(&self) -> bool {
    2862            0 :         self.as_ref().is_delta()
    2863            0 :     }
    2864              : }
    2865              : 
    2866              : impl CompactionLayer<Key> for OwnArc<DeltaLayer> {
    2867            0 :     fn key_range(&self) -> &Range<Key> {
    2868            0 :         &self.layer_desc().key_range
    2869            0 :     }
    2870            0 :     fn lsn_range(&self) -> &Range<Lsn> {
    2871            0 :         &self.layer_desc().lsn_range
    2872            0 :     }
    2873            0 :     fn file_size(&self) -> u64 {
    2874            0 :         self.layer_desc().file_size
    2875            0 :     }
    2876            0 :     fn short_id(&self) -> std::string::String {
    2877            0 :         self.layer_desc().short_id().to_string()
    2878            0 :     }
    2879            0 :     fn is_delta(&self) -> bool {
    2880            0 :         true
    2881            0 :     }
    2882              : }
    2883              : 
    2884              : use crate::tenant::timeline::DeltaEntry;
    2885              : 
    2886              : impl CompactionLayer<Key> for ResidentDeltaLayer {
    2887            0 :     fn key_range(&self) -> &Range<Key> {
    2888            0 :         &self.0.layer_desc().key_range
    2889            0 :     }
    2890            0 :     fn lsn_range(&self) -> &Range<Lsn> {
    2891            0 :         &self.0.layer_desc().lsn_range
    2892            0 :     }
    2893            0 :     fn file_size(&self) -> u64 {
    2894            0 :         self.0.layer_desc().file_size
    2895            0 :     }
    2896            0 :     fn short_id(&self) -> std::string::String {
    2897            0 :         self.0.layer_desc().short_id().to_string()
    2898            0 :     }
    2899            0 :     fn is_delta(&self) -> bool {
    2900            0 :         true
    2901            0 :     }
    2902              : }
    2903              : 
    2904              : impl CompactionDeltaLayer<TimelineAdaptor> for ResidentDeltaLayer {
    2905              :     type DeltaEntry<'a> = DeltaEntry<'a>;
    2906              : 
    2907            0 :     async fn load_keys(&self, ctx: &RequestContext) -> anyhow::Result<Vec<DeltaEntry<'_>>> {
    2908            0 :         self.0.get_as_delta(ctx).await?.index_entries(ctx).await
    2909            0 :     }
    2910              : }
    2911              : 
    2912              : impl CompactionLayer<Key> for ResidentImageLayer {
    2913            0 :     fn key_range(&self) -> &Range<Key> {
    2914            0 :         &self.0.layer_desc().key_range
    2915            0 :     }
    2916            0 :     fn lsn_range(&self) -> &Range<Lsn> {
    2917            0 :         &self.0.layer_desc().lsn_range
    2918            0 :     }
    2919            0 :     fn file_size(&self) -> u64 {
    2920            0 :         self.0.layer_desc().file_size
    2921            0 :     }
    2922            0 :     fn short_id(&self) -> std::string::String {
    2923            0 :         self.0.layer_desc().short_id().to_string()
    2924            0 :     }
    2925            0 :     fn is_delta(&self) -> bool {
    2926            0 :         false
    2927            0 :     }
    2928              : }
    2929              : impl CompactionImageLayer<TimelineAdaptor> for ResidentImageLayer {}
        

Generated by: LCOV version 2.1-beta