LCOV - code coverage report
Current view: top level - pageserver/src/tenant/timeline - compaction.rs (source / functions) Coverage Total Hit
Test: bb45db3982713bfd5bec075773079136e362195e.info Lines: 59.3 % 1952 1157
Test Date: 2024-12-11 15:53:32 Functions: 41.5 % 142 59

            Line data    Source code
       1              : //! New compaction implementation. The algorithm itself is implemented in the
       2              : //! compaction crate. This file implements the callbacks and structs that allow
       3              : //! the algorithm to drive the process.
       4              : //!
       5              : //! The old legacy algorithm is implemented directly in `timeline.rs`.
       6              : 
       7              : use std::collections::{BinaryHeap, HashMap, HashSet};
       8              : use std::ops::{Deref, Range};
       9              : use std::sync::Arc;
      10              : 
      11              : use super::layer_manager::LayerManager;
      12              : use super::{
      13              :     CompactFlags, CompactOptions, CompactRange, CreateImageLayersError, DurationRecorder,
      14              :     ImageLayerCreationMode, RecordedDuration, Timeline,
      15              : };
      16              : 
      17              : use anyhow::{anyhow, bail, Context};
      18              : use bytes::Bytes;
      19              : use fail::fail_point;
      20              : use itertools::Itertools;
      21              : use pageserver_api::key::KEY_SIZE;
      22              : use pageserver_api::keyspace::ShardedRange;
      23              : use pageserver_api::shard::{ShardCount, ShardIdentity, TenantShardId};
      24              : use serde::Serialize;
      25              : use tokio_util::sync::CancellationToken;
      26              : use tracing::{debug, info, info_span, trace, warn, Instrument};
      27              : use utils::id::TimelineId;
      28              : 
      29              : use crate::context::{AccessStatsBehavior, RequestContext, RequestContextBuilder};
      30              : use crate::page_cache;
      31              : use crate::statvfs::Statvfs;
      32              : use crate::tenant::remote_timeline_client::WaitCompletionError;
      33              : use crate::tenant::storage_layer::batch_split_writer::{
      34              :     BatchWriterResult, SplitDeltaLayerWriter, SplitImageLayerWriter,
      35              : };
      36              : use crate::tenant::storage_layer::filter_iterator::FilterIterator;
      37              : use crate::tenant::storage_layer::merge_iterator::MergeIterator;
      38              : use crate::tenant::storage_layer::{
      39              :     AsLayerDesc, PersistentLayerDesc, PersistentLayerKey, ValueReconstructState,
      40              : };
      41              : use crate::tenant::timeline::ImageLayerCreationOutcome;
      42              : use crate::tenant::timeline::{drop_rlock, DeltaLayerWriter, ImageLayerWriter};
      43              : use crate::tenant::timeline::{Layer, ResidentLayer};
      44              : use crate::tenant::{gc_block, DeltaLayer, MaybeOffloaded};
      45              : use crate::virtual_file::{MaybeFatalIo, VirtualFile};
      46              : use pageserver_api::config::tenant_conf_defaults::{
      47              :     DEFAULT_CHECKPOINT_DISTANCE, DEFAULT_COMPACTION_THRESHOLD,
      48              : };
      49              : 
      50              : use pageserver_api::key::Key;
      51              : use pageserver_api::keyspace::KeySpace;
      52              : use pageserver_api::record::NeonWalRecord;
      53              : use pageserver_api::value::Value;
      54              : 
      55              : use utils::lsn::Lsn;
      56              : 
      57              : use pageserver_compaction::helpers::{fully_contains, overlaps_with};
      58              : use pageserver_compaction::interface::*;
      59              : 
      60              : use super::CompactionError;
      61              : 
      62              : /// Maximum number of deltas before generating an image layer in bottom-most compaction.
      63              : const COMPACTION_DELTA_THRESHOLD: usize = 5;
      64              : 
      65              : /// A scheduled compaction task.
      66              : pub(crate) struct ScheduledCompactionTask {
      67              :     pub options: CompactOptions,
      68              :     /// The channel to send the compaction result. If this is a subcompaction, the last compaction job holds the sender.
      69              :     pub result_tx: Option<tokio::sync::oneshot::Sender<()>>,
      70              :     /// Hold the GC block. If this is a subcompaction, the last compaction job holds the gc block guard.
      71              :     pub gc_block: Option<gc_block::Guard>,
      72              : }
      73              : 
      74              : pub struct GcCompactionJobDescription {
      75              :     /// All layers to read in the compaction job
      76              :     selected_layers: Vec<Layer>,
      77              :     /// GC cutoff of the job
      78              :     gc_cutoff: Lsn,
      79              :     /// LSNs to retain for the job
      80              :     retain_lsns_below_horizon: Vec<Lsn>,
      81              :     /// Maximum layer LSN processed in this compaction
      82              :     max_layer_lsn: Lsn,
      83              :     /// Only compact layers overlapping with this range
      84              :     compaction_key_range: Range<Key>,
      85              :     /// When partial compaction is enabled, these layers need to be rewritten to ensure no overlap.
      86              :     /// This field is here solely for debugging. The field will not be read once the compaction
      87              :     /// description is generated.
      88              :     rewrite_layers: Vec<Arc<PersistentLayerDesc>>,
      89              : }
      90              : 
      91              : /// The result of bottom-most compaction for a single key at each LSN.
      92              : #[derive(Debug)]
      93              : #[cfg_attr(test, derive(PartialEq))]
      94              : pub struct KeyLogAtLsn(pub Vec<(Lsn, Value)>);
      95              : 
      96              : /// The result of bottom-most compaction.
      97              : #[derive(Debug)]
      98              : #[cfg_attr(test, derive(PartialEq))]
      99              : pub(crate) struct KeyHistoryRetention {
     100              :     /// Stores logs to reconstruct the value at the given LSN, that is to say, logs <= LSN or image == LSN.
     101              :     pub(crate) below_horizon: Vec<(Lsn, KeyLogAtLsn)>,
     102              :     /// Stores logs to reconstruct the value at any LSN above the horizon, that is to say, log > LSN.
     103              :     pub(crate) above_horizon: KeyLogAtLsn,
     104              : }
     105              : 
     106              : impl KeyHistoryRetention {
     107              :     /// Hack: skip delta layer if we need to produce a layer of a same key-lsn.
     108              :     ///
     109              :     /// This can happen if we have removed some deltas in "the middle" of some existing layer's key-lsn-range.
     110              :     /// For example, consider the case where a single delta with range [0x10,0x50) exists.
     111              :     /// And we have branches at LSN 0x10, 0x20, 0x30.
     112              :     /// Then we delete branch @ 0x20.
     113              :     /// Bottom-most compaction may now delete the delta [0x20,0x30).
     114              :     /// And that wouldnt' change the shape of the layer.
     115              :     ///
     116              :     /// Note that bottom-most-gc-compaction never _adds_ new data in that case, only removes.
     117              :     ///
     118              :     /// `discard_key` will only be called when the writer reaches its target (instead of for every key), so it's fine to grab a lock inside.
     119           58 :     async fn discard_key(key: &PersistentLayerKey, tline: &Arc<Timeline>, dry_run: bool) -> bool {
     120           58 :         if dry_run {
     121            0 :             return true;
     122           58 :         }
     123           58 :         let guard = tline.layers.read().await;
     124           58 :         if !guard.contains_key(key) {
     125           38 :             return false;
     126           20 :         }
     127           20 :         let layer_generation = guard.get_from_key(key).metadata().generation;
     128           20 :         drop(guard);
     129           20 :         if layer_generation == tline.generation {
     130           20 :             info!(
     131              :                 key=%key,
     132              :                 ?layer_generation,
     133            0 :                 "discard layer due to duplicated layer key in the same generation",
     134              :             );
     135           20 :             true
     136              :         } else {
     137            0 :             false
     138              :         }
     139           58 :     }
     140              : 
     141              :     /// Pipe a history of a single key to the writers.
     142              :     ///
     143              :     /// If `image_writer` is none, the images will be placed into the delta layers.
     144              :     /// The delta writer will contain all images and deltas (below and above the horizon) except the bottom-most images.
     145              :     #[allow(clippy::too_many_arguments)]
     146          530 :     async fn pipe_to(
     147          530 :         self,
     148          530 :         key: Key,
     149          530 :         delta_writer: &mut SplitDeltaLayerWriter,
     150          530 :         mut image_writer: Option<&mut SplitImageLayerWriter>,
     151          530 :         stat: &mut CompactionStatistics,
     152          530 :         ctx: &RequestContext,
     153          530 :     ) -> anyhow::Result<()> {
     154          530 :         let mut first_batch = true;
     155         1658 :         for (cutoff_lsn, KeyLogAtLsn(logs)) in self.below_horizon {
     156         1128 :             if first_batch {
     157          530 :                 if logs.len() == 1 && logs[0].1.is_image() {
     158          516 :                     let Value::Image(img) = &logs[0].1 else {
     159            0 :                         unreachable!()
     160              :                     };
     161          516 :                     stat.produce_image_key(img);
     162          516 :                     if let Some(image_writer) = image_writer.as_mut() {
     163          516 :                         image_writer.put_image(key, img.clone(), ctx).await?;
     164              :                     } else {
     165            0 :                         delta_writer
     166            0 :                             .put_value(key, cutoff_lsn, Value::Image(img.clone()), ctx)
     167            0 :                             .await?;
     168              :                     }
     169              :                 } else {
     170           28 :                     for (lsn, val) in logs {
     171           14 :                         stat.produce_key(&val);
     172           14 :                         delta_writer.put_value(key, lsn, val, ctx).await?;
     173              :                     }
     174              :                 }
     175          530 :                 first_batch = false;
     176              :             } else {
     177          684 :                 for (lsn, val) in logs {
     178           86 :                     stat.produce_key(&val);
     179           86 :                     delta_writer.put_value(key, lsn, val, ctx).await?;
     180              :                 }
     181              :             }
     182              :         }
     183          530 :         let KeyLogAtLsn(above_horizon_logs) = self.above_horizon;
     184          568 :         for (lsn, val) in above_horizon_logs {
     185           38 :             stat.produce_key(&val);
     186           38 :             delta_writer.put_value(key, lsn, val, ctx).await?;
     187              :         }
     188          530 :         Ok(())
     189          530 :     }
     190              : }
     191              : 
     192              : #[derive(Debug, Serialize, Default)]
     193              : struct CompactionStatisticsNumSize {
     194              :     num: u64,
     195              :     size: u64,
     196              : }
     197              : 
     198              : #[derive(Debug, Serialize, Default)]
     199              : pub struct CompactionStatistics {
     200              :     delta_layer_visited: CompactionStatisticsNumSize,
     201              :     image_layer_visited: CompactionStatisticsNumSize,
     202              :     delta_layer_produced: CompactionStatisticsNumSize,
     203              :     image_layer_produced: CompactionStatisticsNumSize,
     204              :     num_delta_layer_discarded: usize,
     205              :     num_image_layer_discarded: usize,
     206              :     num_unique_keys_visited: usize,
     207              :     wal_keys_visited: CompactionStatisticsNumSize,
     208              :     image_keys_visited: CompactionStatisticsNumSize,
     209              :     wal_produced: CompactionStatisticsNumSize,
     210              :     image_produced: CompactionStatisticsNumSize,
     211              : }
     212              : 
     213              : impl CompactionStatistics {
     214          846 :     fn estimated_size_of_value(val: &Value) -> usize {
     215          304 :         match val {
     216          542 :             Value::Image(img) => img.len(),
     217            0 :             Value::WalRecord(NeonWalRecord::Postgres { rec, .. }) => rec.len(),
     218          304 :             _ => std::mem::size_of::<NeonWalRecord>(),
     219              :         }
     220          846 :     }
     221         1372 :     fn estimated_size_of_key() -> usize {
     222         1372 :         KEY_SIZE // TODO: distinguish image layer and delta layer (count LSN in delta layer)
     223         1372 :     }
     224           62 :     fn visit_delta_layer(&mut self, size: u64) {
     225           62 :         self.delta_layer_visited.num += 1;
     226           62 :         self.delta_layer_visited.size += size;
     227           62 :     }
     228           58 :     fn visit_image_layer(&mut self, size: u64) {
     229           58 :         self.image_layer_visited.num += 1;
     230           58 :         self.image_layer_visited.size += size;
     231           58 :     }
     232          530 :     fn on_unique_key_visited(&mut self) {
     233          530 :         self.num_unique_keys_visited += 1;
     234          530 :     }
     235          176 :     fn visit_wal_key(&mut self, val: &Value) {
     236          176 :         self.wal_keys_visited.num += 1;
     237          176 :         self.wal_keys_visited.size +=
     238          176 :             Self::estimated_size_of_value(val) as u64 + Self::estimated_size_of_key() as u64;
     239          176 :     }
     240          542 :     fn visit_image_key(&mut self, val: &Value) {
     241          542 :         self.image_keys_visited.num += 1;
     242          542 :         self.image_keys_visited.size +=
     243          542 :             Self::estimated_size_of_value(val) as u64 + Self::estimated_size_of_key() as u64;
     244          542 :     }
     245          138 :     fn produce_key(&mut self, val: &Value) {
     246          138 :         match val {
     247           10 :             Value::Image(img) => self.produce_image_key(img),
     248          128 :             Value::WalRecord(_) => self.produce_wal_key(val),
     249              :         }
     250          138 :     }
     251          128 :     fn produce_wal_key(&mut self, val: &Value) {
     252          128 :         self.wal_produced.num += 1;
     253          128 :         self.wal_produced.size +=
     254          128 :             Self::estimated_size_of_value(val) as u64 + Self::estimated_size_of_key() as u64;
     255          128 :     }
     256          526 :     fn produce_image_key(&mut self, val: &Bytes) {
     257          526 :         self.image_produced.num += 1;
     258          526 :         self.image_produced.size += val.len() as u64 + Self::estimated_size_of_key() as u64;
     259          526 :     }
     260           12 :     fn discard_delta_layer(&mut self) {
     261           12 :         self.num_delta_layer_discarded += 1;
     262           12 :     }
     263            8 :     fn discard_image_layer(&mut self) {
     264            8 :         self.num_image_layer_discarded += 1;
     265            8 :     }
     266           12 :     fn produce_delta_layer(&mut self, size: u64) {
     267           12 :         self.delta_layer_produced.num += 1;
     268           12 :         self.delta_layer_produced.size += size;
     269           12 :     }
     270           26 :     fn produce_image_layer(&mut self, size: u64) {
     271           26 :         self.image_layer_produced.num += 1;
     272           26 :         self.image_layer_produced.size += size;
     273           26 :     }
     274              : }
     275              : 
     276              : impl Timeline {
     277              :     /// TODO: cancellation
     278              :     ///
     279              :     /// Returns whether the compaction has pending tasks.
     280          364 :     pub(crate) async fn compact_legacy(
     281          364 :         self: &Arc<Self>,
     282          364 :         cancel: &CancellationToken,
     283          364 :         options: CompactOptions,
     284          364 :         ctx: &RequestContext,
     285          364 :     ) -> Result<bool, CompactionError> {
     286          364 :         if options
     287          364 :             .flags
     288          364 :             .contains(CompactFlags::EnhancedGcBottomMostCompaction)
     289              :         {
     290            0 :             self.compact_with_gc(cancel, options, ctx)
     291            0 :                 .await
     292            0 :                 .map_err(CompactionError::Other)?;
     293            0 :             return Ok(false);
     294          364 :         }
     295          364 : 
     296          364 :         if options.flags.contains(CompactFlags::DryRun) {
     297            0 :             return Err(CompactionError::Other(anyhow!(
     298            0 :                 "dry-run mode is not supported for legacy compaction for now"
     299            0 :             )));
     300          364 :         }
     301          364 : 
     302          364 :         if options.compact_range.is_some() {
     303              :             // maybe useful in the future? could implement this at some point
     304            0 :             return Err(CompactionError::Other(anyhow!(
     305            0 :                 "compaction range is not supported for legacy compaction for now"
     306            0 :             )));
     307          364 :         }
     308          364 : 
     309          364 :         // High level strategy for compaction / image creation:
     310          364 :         //
     311          364 :         // 1. First, calculate the desired "partitioning" of the
     312          364 :         // currently in-use key space. The goal is to partition the
     313          364 :         // key space into roughly fixed-size chunks, but also take into
     314          364 :         // account any existing image layers, and try to align the
     315          364 :         // chunk boundaries with the existing image layers to avoid
     316          364 :         // too much churn. Also try to align chunk boundaries with
     317          364 :         // relation boundaries.  In principle, we don't know about
     318          364 :         // relation boundaries here, we just deal with key-value
     319          364 :         // pairs, and the code in pgdatadir_mapping.rs knows how to
     320          364 :         // map relations into key-value pairs. But in practice we know
     321          364 :         // that 'field6' is the block number, and the fields 1-5
     322          364 :         // identify a relation. This is just an optimization,
     323          364 :         // though.
     324          364 :         //
     325          364 :         // 2. Once we know the partitioning, for each partition,
     326          364 :         // decide if it's time to create a new image layer. The
     327          364 :         // criteria is: there has been too much "churn" since the last
     328          364 :         // image layer? The "churn" is fuzzy concept, it's a
     329          364 :         // combination of too many delta files, or too much WAL in
     330          364 :         // total in the delta file. Or perhaps: if creating an image
     331          364 :         // file would allow to delete some older files.
     332          364 :         //
     333          364 :         // 3. After that, we compact all level0 delta files if there
     334          364 :         // are too many of them.  While compacting, we also garbage
     335          364 :         // collect any page versions that are no longer needed because
     336          364 :         // of the new image layers we created in step 2.
     337          364 :         //
     338          364 :         // TODO: This high level strategy hasn't been implemented yet.
     339          364 :         // Below are functions compact_level0() and create_image_layers()
     340          364 :         // but they are a bit ad hoc and don't quite work like it's explained
     341          364 :         // above. Rewrite it.
     342          364 : 
     343          364 :         // Is the timeline being deleted?
     344          364 :         if self.is_stopping() {
     345            0 :             trace!("Dropping out of compaction on timeline shutdown");
     346            0 :             return Err(CompactionError::ShuttingDown);
     347          364 :         }
     348          364 : 
     349          364 :         let target_file_size = self.get_checkpoint_distance();
     350              : 
     351              :         // Define partitioning schema if needed
     352              : 
     353              :         // FIXME: the match should only cover repartitioning, not the next steps
     354          364 :         let (partition_count, has_pending_tasks) = match self
     355          364 :             .repartition(
     356          364 :                 self.get_last_record_lsn(),
     357          364 :                 self.get_compaction_target_size(),
     358          364 :                 options.flags,
     359          364 :                 ctx,
     360          364 :             )
     361          364 :             .await
     362              :         {
     363          364 :             Ok(((dense_partitioning, sparse_partitioning), lsn)) => {
     364          364 :                 // Disables access_stats updates, so that the files we read remain candidates for eviction after we're done with them
     365          364 :                 let image_ctx = RequestContextBuilder::extend(ctx)
     366          364 :                     .access_stats_behavior(AccessStatsBehavior::Skip)
     367          364 :                     .build();
     368          364 : 
     369          364 :                 // 2. Compact
     370          364 :                 let timer = self.metrics.compact_time_histo.start_timer();
     371          364 :                 let fully_compacted = self
     372          364 :                     .compact_level0(
     373          364 :                         target_file_size,
     374          364 :                         options.flags.contains(CompactFlags::ForceL0Compaction),
     375          364 :                         ctx,
     376          364 :                     )
     377          364 :                     .await?;
     378          364 :                 timer.stop_and_record();
     379          364 : 
     380          364 :                 let mut partitioning = dense_partitioning;
     381          364 :                 partitioning
     382          364 :                     .parts
     383          364 :                     .extend(sparse_partitioning.into_dense().parts);
     384          364 : 
     385          364 :                 // 3. Create new image layers for partitions that have been modified
     386          364 :                 // "enough". Skip image layer creation if L0 compaction cannot keep up.
     387          364 :                 if fully_compacted {
     388          364 :                     let image_layers = self
     389          364 :                         .create_image_layers(
     390          364 :                             &partitioning,
     391          364 :                             lsn,
     392          364 :                             if options
     393          364 :                                 .flags
     394          364 :                                 .contains(CompactFlags::ForceImageLayerCreation)
     395              :                             {
     396           14 :                                 ImageLayerCreationMode::Force
     397              :                             } else {
     398          350 :                                 ImageLayerCreationMode::Try
     399              :                             },
     400          364 :                             &image_ctx,
     401          364 :                         )
     402          364 :                         .await?;
     403              : 
     404          364 :                     self.upload_new_image_layers(image_layers)?;
     405              :                 } else {
     406            0 :                     info!("skipping image layer generation due to L0 compaction did not include all layers.");
     407              :                 }
     408          364 :                 (partitioning.parts.len(), !fully_compacted)
     409              :             }
     410            0 :             Err(err) => {
     411            0 :                 // no partitioning? This is normal, if the timeline was just created
     412            0 :                 // as an empty timeline. Also in unit tests, when we use the timeline
     413            0 :                 // as a simple key-value store, ignoring the datadir layout. Log the
     414            0 :                 // error but continue.
     415            0 :                 //
     416            0 :                 // Suppress error when it's due to cancellation
     417            0 :                 if !self.cancel.is_cancelled() && !err.is_cancelled() {
     418            0 :                     tracing::error!("could not compact, repartitioning keyspace failed: {err:?}");
     419            0 :                 }
     420            0 :                 (1, false)
     421              :             }
     422              :         };
     423              : 
     424          364 :         if self.shard_identity.count >= ShardCount::new(2) {
     425              :             // Limit the number of layer rewrites to the number of partitions: this means its
     426              :             // runtime should be comparable to a full round of image layer creations, rather than
     427              :             // being potentially much longer.
     428            0 :             let rewrite_max = partition_count;
     429            0 : 
     430            0 :             self.compact_shard_ancestors(rewrite_max, ctx).await?;
     431          364 :         }
     432              : 
     433          364 :         Ok(has_pending_tasks)
     434          364 :     }
     435              : 
     436              :     /// Check for layers that are elegible to be rewritten:
     437              :     /// - Shard splitting: After a shard split, ancestor layers beyond pitr_interval, so that
     438              :     ///   we don't indefinitely retain keys in this shard that aren't needed.
     439              :     /// - For future use: layers beyond pitr_interval that are in formats we would
     440              :     ///   rather not maintain compatibility with indefinitely.
     441              :     ///
     442              :     /// Note: this phase may read and write many gigabytes of data: use rewrite_max to bound
     443              :     /// how much work it will try to do in each compaction pass.
     444            0 :     async fn compact_shard_ancestors(
     445            0 :         self: &Arc<Self>,
     446            0 :         rewrite_max: usize,
     447            0 :         ctx: &RequestContext,
     448            0 :     ) -> Result<(), CompactionError> {
     449            0 :         let mut drop_layers = Vec::new();
     450            0 :         let mut layers_to_rewrite: Vec<Layer> = Vec::new();
     451            0 : 
     452            0 :         // We will use the Lsn cutoff of the last GC as a threshold for rewriting layers: if a
     453            0 :         // layer is behind this Lsn, it indicates that the layer is being retained beyond the
     454            0 :         // pitr_interval, for example because a branchpoint references it.
     455            0 :         //
     456            0 :         // Holding this read guard also blocks [`Self::gc_timeline`] from entering while we
     457            0 :         // are rewriting layers.
     458            0 :         let latest_gc_cutoff = self.get_latest_gc_cutoff_lsn();
     459            0 : 
     460            0 :         tracing::info!(
     461            0 :             "latest_gc_cutoff: {}, pitr cutoff {}",
     462            0 :             *latest_gc_cutoff,
     463            0 :             self.gc_info.read().unwrap().cutoffs.time
     464              :         );
     465              : 
     466            0 :         let layers = self.layers.read().await;
     467            0 :         for layer_desc in layers.layer_map()?.iter_historic_layers() {
     468            0 :             let layer = layers.get_from_desc(&layer_desc);
     469            0 :             if layer.metadata().shard.shard_count == self.shard_identity.count {
     470              :                 // This layer does not belong to a historic ancestor, no need to re-image it.
     471            0 :                 continue;
     472            0 :             }
     473            0 : 
     474            0 :             // This layer was created on an ancestor shard: check if it contains any data for this shard.
     475            0 :             let sharded_range = ShardedRange::new(layer_desc.get_key_range(), &self.shard_identity);
     476            0 :             let layer_local_page_count = sharded_range.page_count();
     477            0 :             let layer_raw_page_count = ShardedRange::raw_size(&layer_desc.get_key_range());
     478            0 :             if layer_local_page_count == 0 {
     479              :                 // This ancestral layer only covers keys that belong to other shards.
     480              :                 // We include the full metadata in the log: if we had some critical bug that caused
     481              :                 // us to incorrectly drop layers, this would simplify manually debugging + reinstating those layers.
     482            0 :                 info!(%layer, old_metadata=?layer.metadata(),
     483            0 :                     "dropping layer after shard split, contains no keys for this shard.",
     484              :                 );
     485              : 
     486            0 :                 if cfg!(debug_assertions) {
     487              :                     // Expensive, exhaustive check of keys in this layer: this guards against ShardedRange's calculations being
     488              :                     // wrong.  If ShardedRange claims the local page count is zero, then no keys in this layer
     489              :                     // should be !is_key_disposable()
     490            0 :                     let range = layer_desc.get_key_range();
     491            0 :                     let mut key = range.start;
     492            0 :                     while key < range.end {
     493            0 :                         debug_assert!(self.shard_identity.is_key_disposable(&key));
     494            0 :                         key = key.next();
     495              :                     }
     496            0 :                 }
     497              : 
     498            0 :                 drop_layers.push(layer);
     499            0 :                 continue;
     500            0 :             } else if layer_local_page_count != u32::MAX
     501            0 :                 && layer_local_page_count == layer_raw_page_count
     502              :             {
     503            0 :                 debug!(%layer,
     504            0 :                     "layer is entirely shard local ({} keys), no need to filter it",
     505              :                     layer_local_page_count
     506              :                 );
     507            0 :                 continue;
     508            0 :             }
     509            0 : 
     510            0 :             // Don't bother re-writing a layer unless it will at least halve its size
     511            0 :             if layer_local_page_count != u32::MAX
     512            0 :                 && layer_local_page_count > layer_raw_page_count / 2
     513              :             {
     514            0 :                 debug!(%layer,
     515            0 :                     "layer is already mostly local ({}/{}), not rewriting",
     516              :                     layer_local_page_count,
     517              :                     layer_raw_page_count
     518              :                 );
     519            0 :             }
     520              : 
     521              :             // Don't bother re-writing a layer if it is within the PITR window: it will age-out eventually
     522              :             // without incurring the I/O cost of a rewrite.
     523            0 :             if layer_desc.get_lsn_range().end >= *latest_gc_cutoff {
     524            0 :                 debug!(%layer, "Skipping rewrite of layer still in GC window ({} >= {})",
     525            0 :                     layer_desc.get_lsn_range().end, *latest_gc_cutoff);
     526            0 :                 continue;
     527            0 :             }
     528            0 : 
     529            0 :             if layer_desc.is_delta() {
     530              :                 // We do not yet implement rewrite of delta layers
     531            0 :                 debug!(%layer, "Skipping rewrite of delta layer");
     532            0 :                 continue;
     533            0 :             }
     534            0 : 
     535            0 :             // Only rewrite layers if their generations differ.  This guarantees:
     536            0 :             //  - that local rewrite is safe, as local layer paths will differ between existing layer and rewritten one
     537            0 :             //  - that the layer is persistent in remote storage, as we only see old-generation'd layer via loading from remote storage
     538            0 :             if layer.metadata().generation == self.generation {
     539            0 :                 debug!(%layer, "Skipping rewrite, is not from old generation");
     540            0 :                 continue;
     541            0 :             }
     542            0 : 
     543            0 :             if layers_to_rewrite.len() >= rewrite_max {
     544            0 :                 tracing::info!(%layer, "Will rewrite layer on a future compaction, already rewrote {}",
     545            0 :                     layers_to_rewrite.len()
     546              :                 );
     547            0 :                 continue;
     548            0 :             }
     549            0 : 
     550            0 :             // Fall through: all our conditions for doing a rewrite passed.
     551            0 :             layers_to_rewrite.push(layer);
     552              :         }
     553              : 
     554              :         // Drop read lock on layer map before we start doing time-consuming I/O
     555            0 :         drop(layers);
     556            0 : 
     557            0 :         let mut replace_image_layers = Vec::new();
     558              : 
     559            0 :         for layer in layers_to_rewrite {
     560            0 :             tracing::info!(layer=%layer, "Rewriting layer after shard split...");
     561            0 :             let mut image_layer_writer = ImageLayerWriter::new(
     562            0 :                 self.conf,
     563            0 :                 self.timeline_id,
     564            0 :                 self.tenant_shard_id,
     565            0 :                 &layer.layer_desc().key_range,
     566            0 :                 layer.layer_desc().image_layer_lsn(),
     567            0 :                 ctx,
     568            0 :             )
     569            0 :             .await
     570            0 :             .map_err(CompactionError::Other)?;
     571              : 
     572              :             // Safety of layer rewrites:
     573              :             // - We are writing to a different local file path than we are reading from, so the old Layer
     574              :             //   cannot interfere with the new one.
     575              :             // - In the page cache, contents for a particular VirtualFile are stored with a file_id that
     576              :             //   is different for two layers with the same name (in `ImageLayerInner::new` we always
     577              :             //   acquire a fresh id from [`crate::page_cache::next_file_id`].  So readers do not risk
     578              :             //   reading the index from one layer file, and then data blocks from the rewritten layer file.
     579              :             // - Any readers that have a reference to the old layer will keep it alive until they are done
     580              :             //   with it. If they are trying to promote from remote storage, that will fail, but this is the same
     581              :             //   as for compaction generally: compaction is allowed to delete layers that readers might be trying to use.
     582              :             // - We do not run concurrently with other kinds of compaction, so the only layer map writes we race with are:
     583              :             //    - GC, which at worst witnesses us "undelete" a layer that they just deleted.
     584              :             //    - ingestion, which only inserts layers, therefore cannot collide with us.
     585            0 :             let resident = layer.download_and_keep_resident().await?;
     586              : 
     587            0 :             let keys_written = resident
     588            0 :                 .filter(&self.shard_identity, &mut image_layer_writer, ctx)
     589            0 :                 .await?;
     590              : 
     591            0 :             if keys_written > 0 {
     592            0 :                 let (desc, path) = image_layer_writer
     593            0 :                     .finish(ctx)
     594            0 :                     .await
     595            0 :                     .map_err(CompactionError::Other)?;
     596            0 :                 let new_layer = Layer::finish_creating(self.conf, self, desc, &path)
     597            0 :                     .map_err(CompactionError::Other)?;
     598            0 :                 tracing::info!(layer=%new_layer, "Rewrote layer, {} -> {} bytes",
     599            0 :                     layer.metadata().file_size,
     600            0 :                     new_layer.metadata().file_size);
     601              : 
     602            0 :                 replace_image_layers.push((layer, new_layer));
     603            0 :             } else {
     604            0 :                 // Drop the old layer.  Usually for this case we would already have noticed that
     605            0 :                 // the layer has no data for us with the ShardedRange check above, but
     606            0 :                 drop_layers.push(layer);
     607            0 :             }
     608              :         }
     609              : 
     610              :         // At this point, we have replaced local layer files with their rewritten form, but not yet uploaded
     611              :         // metadata to reflect that. If we restart here, the replaced layer files will look invalid (size mismatch
     612              :         // to remote index) and be removed. This is inefficient but safe.
     613            0 :         fail::fail_point!("compact-shard-ancestors-localonly");
     614            0 : 
     615            0 :         // Update the LayerMap so that readers will use the new layers, and enqueue it for writing to remote storage
     616            0 :         self.rewrite_layers(replace_image_layers, drop_layers)
     617            0 :             .await?;
     618              : 
     619            0 :         fail::fail_point!("compact-shard-ancestors-enqueued");
     620            0 : 
     621            0 :         // We wait for all uploads to complete before finishing this compaction stage.  This is not
     622            0 :         // necessary for correctness, but it simplifies testing, and avoids proceeding with another
     623            0 :         // Timeline's compaction while this timeline's uploads may be generating lots of disk I/O
     624            0 :         // load.
     625            0 :         match self.remote_client.wait_completion().await {
     626            0 :             Ok(()) => (),
     627            0 :             Err(WaitCompletionError::NotInitialized(ni)) => return Err(CompactionError::from(ni)),
     628              :             Err(WaitCompletionError::UploadQueueShutDownOrStopped) => {
     629            0 :                 return Err(CompactionError::ShuttingDown)
     630              :             }
     631              :         }
     632              : 
     633            0 :         fail::fail_point!("compact-shard-ancestors-persistent");
     634            0 : 
     635            0 :         Ok(())
     636            0 :     }
     637              : 
     638              :     /// Update the LayerVisibilityHint of layers covered by image layers, based on whether there is
     639              :     /// an image layer between them and the most recent readable LSN (branch point or tip of timeline).  The
     640              :     /// purpose of the visibility hint is to record which layers need to be available to service reads.
     641              :     ///
     642              :     /// The result may be used as an input to eviction and secondary downloads to de-prioritize layers
     643              :     /// that we know won't be needed for reads.
     644          198 :     pub(super) async fn update_layer_visibility(
     645          198 :         &self,
     646          198 :     ) -> Result<(), super::layer_manager::Shutdown> {
     647          198 :         let head_lsn = self.get_last_record_lsn();
     648              : 
     649              :         // We will sweep through layers in reverse-LSN order.  We only do historic layers.  L0 deltas
     650              :         // are implicitly left visible, because LayerVisibilityHint's default is Visible, and we never modify it here.
     651              :         // Note that L0 deltas _can_ be covered by image layers, but we consider them 'visible' because we anticipate that
     652              :         // they will be subject to L0->L1 compaction in the near future.
     653          198 :         let layer_manager = self.layers.read().await;
     654          198 :         let layer_map = layer_manager.layer_map()?;
     655              : 
     656          198 :         let readable_points = {
     657          198 :             let children = self.gc_info.read().unwrap().retain_lsns.clone();
     658          198 : 
     659          198 :             let mut readable_points = Vec::with_capacity(children.len() + 1);
     660          198 :             for (child_lsn, _child_timeline_id, is_offloaded) in &children {
     661            0 :                 if *is_offloaded == MaybeOffloaded::Yes {
     662            0 :                     continue;
     663            0 :                 }
     664            0 :                 readable_points.push(*child_lsn);
     665              :             }
     666          198 :             readable_points.push(head_lsn);
     667          198 :             readable_points
     668          198 :         };
     669          198 : 
     670          198 :         let (layer_visibility, covered) = layer_map.get_visibility(readable_points);
     671          516 :         for (layer_desc, visibility) in layer_visibility {
     672          318 :             // FIXME: a more efficiency bulk zip() through the layers rather than NlogN getting each one
     673          318 :             let layer = layer_manager.get_from_desc(&layer_desc);
     674          318 :             layer.set_visibility(visibility);
     675          318 :         }
     676              : 
     677              :         // TODO: publish our covered KeySpace to our parent, so that when they update their visibility, they can
     678              :         // avoid assuming that everything at a branch point is visible.
     679          198 :         drop(covered);
     680          198 :         Ok(())
     681          198 :     }
     682              : 
     683              :     /// Collect a bunch of Level 0 layer files, and compact and reshuffle them as
     684              :     /// as Level 1 files. Returns whether the L0 layers are fully compacted.
     685          364 :     async fn compact_level0(
     686          364 :         self: &Arc<Self>,
     687          364 :         target_file_size: u64,
     688          364 :         force_compaction_ignore_threshold: bool,
     689          364 :         ctx: &RequestContext,
     690          364 :     ) -> Result<bool, CompactionError> {
     691              :         let CompactLevel0Phase1Result {
     692          364 :             new_layers,
     693          364 :             deltas_to_compact,
     694          364 :             fully_compacted,
     695              :         } = {
     696          364 :             let phase1_span = info_span!("compact_level0_phase1");
     697          364 :             let ctx = ctx.attached_child();
     698          364 :             let mut stats = CompactLevel0Phase1StatsBuilder {
     699          364 :                 version: Some(2),
     700          364 :                 tenant_id: Some(self.tenant_shard_id),
     701          364 :                 timeline_id: Some(self.timeline_id),
     702          364 :                 ..Default::default()
     703          364 :             };
     704          364 : 
     705          364 :             let begin = tokio::time::Instant::now();
     706          364 :             let phase1_layers_locked = self.layers.read().await;
     707          364 :             let now = tokio::time::Instant::now();
     708          364 :             stats.read_lock_acquisition_micros =
     709          364 :                 DurationRecorder::Recorded(RecordedDuration(now - begin), now);
     710          364 :             self.compact_level0_phase1(
     711          364 :                 phase1_layers_locked,
     712          364 :                 stats,
     713          364 :                 target_file_size,
     714          364 :                 force_compaction_ignore_threshold,
     715          364 :                 &ctx,
     716          364 :             )
     717          364 :             .instrument(phase1_span)
     718          364 :             .await?
     719              :         };
     720              : 
     721          364 :         if new_layers.is_empty() && deltas_to_compact.is_empty() {
     722              :             // nothing to do
     723          336 :             return Ok(true);
     724           28 :         }
     725           28 : 
     726           28 :         self.finish_compact_batch(&new_layers, &Vec::new(), &deltas_to_compact)
     727           28 :             .await?;
     728           28 :         Ok(fully_compacted)
     729          364 :     }
     730              : 
     731              :     /// Level0 files first phase of compaction, explained in the [`Self::compact_legacy`] comment.
     732          364 :     async fn compact_level0_phase1<'a>(
     733          364 :         self: &'a Arc<Self>,
     734          364 :         guard: tokio::sync::RwLockReadGuard<'a, LayerManager>,
     735          364 :         mut stats: CompactLevel0Phase1StatsBuilder,
     736          364 :         target_file_size: u64,
     737          364 :         force_compaction_ignore_threshold: bool,
     738          364 :         ctx: &RequestContext,
     739          364 :     ) -> Result<CompactLevel0Phase1Result, CompactionError> {
     740          364 :         stats.read_lock_held_spawn_blocking_startup_micros =
     741          364 :             stats.read_lock_acquisition_micros.till_now(); // set by caller
     742          364 :         let layers = guard.layer_map()?;
     743          364 :         let level0_deltas = layers.level0_deltas();
     744          364 :         stats.level0_deltas_count = Some(level0_deltas.len());
     745          364 : 
     746          364 :         // Only compact if enough layers have accumulated.
     747          364 :         let threshold = self.get_compaction_threshold();
     748          364 :         if level0_deltas.is_empty() || level0_deltas.len() < threshold {
     749          336 :             if force_compaction_ignore_threshold {
     750            0 :                 if !level0_deltas.is_empty() {
     751            0 :                     info!(
     752            0 :                         level0_deltas = level0_deltas.len(),
     753            0 :                         threshold, "too few deltas to compact, but forcing compaction"
     754              :                     );
     755              :                 } else {
     756            0 :                     info!(
     757            0 :                         level0_deltas = level0_deltas.len(),
     758            0 :                         threshold, "too few deltas to compact, cannot force compaction"
     759              :                     );
     760            0 :                     return Ok(CompactLevel0Phase1Result::default());
     761              :                 }
     762              :             } else {
     763          336 :                 debug!(
     764            0 :                     level0_deltas = level0_deltas.len(),
     765            0 :                     threshold, "too few deltas to compact"
     766              :                 );
     767          336 :                 return Ok(CompactLevel0Phase1Result::default());
     768              :             }
     769           28 :         }
     770              : 
     771           28 :         let mut level0_deltas = level0_deltas
     772           28 :             .iter()
     773          402 :             .map(|x| guard.get_from_desc(x))
     774           28 :             .collect::<Vec<_>>();
     775           28 : 
     776           28 :         // Gather the files to compact in this iteration.
     777           28 :         //
     778           28 :         // Start with the oldest Level 0 delta file, and collect any other
     779           28 :         // level 0 files that form a contiguous sequence, such that the end
     780           28 :         // LSN of previous file matches the start LSN of the next file.
     781           28 :         //
     782           28 :         // Note that if the files don't form such a sequence, we might
     783           28 :         // "compact" just a single file. That's a bit pointless, but it allows
     784           28 :         // us to get rid of the level 0 file, and compact the other files on
     785           28 :         // the next iteration. This could probably made smarter, but such
     786           28 :         // "gaps" in the sequence of level 0 files should only happen in case
     787           28 :         // of a crash, partial download from cloud storage, or something like
     788           28 :         // that, so it's not a big deal in practice.
     789          748 :         level0_deltas.sort_by_key(|l| l.layer_desc().lsn_range.start);
     790           28 :         let mut level0_deltas_iter = level0_deltas.iter();
     791           28 : 
     792           28 :         let first_level0_delta = level0_deltas_iter.next().unwrap();
     793           28 :         let mut prev_lsn_end = first_level0_delta.layer_desc().lsn_range.end;
     794           28 :         let mut deltas_to_compact = Vec::with_capacity(level0_deltas.len());
     795           28 : 
     796           28 :         // Accumulate the size of layers in `deltas_to_compact`
     797           28 :         let mut deltas_to_compact_bytes = 0;
     798           28 : 
     799           28 :         // Under normal circumstances, we will accumulate up to compaction_interval L0s of size
     800           28 :         // checkpoint_distance each.  To avoid edge cases using extra system resources, bound our
     801           28 :         // work in this function to only operate on this much delta data at once.
     802           28 :         //
     803           28 :         // Take the max of the configured value & the default, so that tests that configure tiny values
     804           28 :         // can still use a sensible amount of memory, but if a deployed system configures bigger values we
     805           28 :         // still let them compact a full stack of L0s in one go.
     806           28 :         let delta_size_limit = std::cmp::max(
     807           28 :             self.get_compaction_threshold(),
     808           28 :             DEFAULT_COMPACTION_THRESHOLD,
     809           28 :         ) as u64
     810           28 :             * std::cmp::max(self.get_checkpoint_distance(), DEFAULT_CHECKPOINT_DISTANCE);
     811           28 : 
     812           28 :         let mut fully_compacted = true;
     813           28 : 
     814           28 :         deltas_to_compact.push(first_level0_delta.download_and_keep_resident().await?);
     815          402 :         for l in level0_deltas_iter {
     816          374 :             let lsn_range = &l.layer_desc().lsn_range;
     817          374 : 
     818          374 :             if lsn_range.start != prev_lsn_end {
     819            0 :                 break;
     820          374 :             }
     821          374 :             deltas_to_compact.push(l.download_and_keep_resident().await?);
     822          374 :             deltas_to_compact_bytes += l.metadata().file_size;
     823          374 :             prev_lsn_end = lsn_range.end;
     824          374 : 
     825          374 :             if deltas_to_compact_bytes >= delta_size_limit {
     826            0 :                 info!(
     827            0 :                     l0_deltas_selected = deltas_to_compact.len(),
     828            0 :                     l0_deltas_total = level0_deltas.len(),
     829            0 :                     "L0 compaction picker hit max delta layer size limit: {}",
     830              :                     delta_size_limit
     831              :                 );
     832            0 :                 fully_compacted = false;
     833            0 : 
     834            0 :                 // Proceed with compaction, but only a subset of L0s
     835            0 :                 break;
     836          374 :             }
     837              :         }
     838           28 :         let lsn_range = Range {
     839           28 :             start: deltas_to_compact
     840           28 :                 .first()
     841           28 :                 .unwrap()
     842           28 :                 .layer_desc()
     843           28 :                 .lsn_range
     844           28 :                 .start,
     845           28 :             end: deltas_to_compact.last().unwrap().layer_desc().lsn_range.end,
     846           28 :         };
     847           28 : 
     848           28 :         info!(
     849            0 :             "Starting Level0 compaction in LSN range {}-{} for {} layers ({} deltas in total)",
     850            0 :             lsn_range.start,
     851            0 :             lsn_range.end,
     852            0 :             deltas_to_compact.len(),
     853            0 :             level0_deltas.len()
     854              :         );
     855              : 
     856          402 :         for l in deltas_to_compact.iter() {
     857          402 :             info!("compact includes {l}");
     858              :         }
     859              : 
     860              :         // We don't need the original list of layers anymore. Drop it so that
     861              :         // we don't accidentally use it later in the function.
     862           28 :         drop(level0_deltas);
     863           28 : 
     864           28 :         stats.read_lock_held_prerequisites_micros = stats
     865           28 :             .read_lock_held_spawn_blocking_startup_micros
     866           28 :             .till_now();
     867              : 
     868              :         // TODO: replace with streaming k-merge
     869           28 :         let all_keys = {
     870           28 :             let mut all_keys = Vec::new();
     871          402 :             for l in deltas_to_compact.iter() {
     872          402 :                 if self.cancel.is_cancelled() {
     873            0 :                     return Err(CompactionError::ShuttingDown);
     874          402 :                 }
     875          402 :                 let delta = l.get_as_delta(ctx).await.map_err(CompactionError::Other)?;
     876          402 :                 let keys = delta
     877          402 :                     .index_entries(ctx)
     878          402 :                     .await
     879          402 :                     .map_err(CompactionError::Other)?;
     880          402 :                 all_keys.extend(keys);
     881              :             }
     882              :             // The current stdlib sorting implementation is designed in a way where it is
     883              :             // particularly fast where the slice is made up of sorted sub-ranges.
     884      4423784 :             all_keys.sort_by_key(|DeltaEntry { key, lsn, .. }| (*key, *lsn));
     885           28 :             all_keys
     886           28 :         };
     887           28 : 
     888           28 :         stats.read_lock_held_key_sort_micros = stats.read_lock_held_prerequisites_micros.till_now();
     889              : 
     890              :         // Determine N largest holes where N is number of compacted layers. The vec is sorted by key range start.
     891              :         //
     892              :         // A hole is a key range for which this compaction doesn't have any WAL records.
     893              :         // Our goal in this compaction iteration is to avoid creating L1s that, in terms of their key range,
     894              :         // cover the hole, but actually don't contain any WAL records for that key range.
     895              :         // The reason is that the mere stack of L1s (`count_deltas`) triggers image layer creation (`create_image_layers`).
     896              :         // That image layer creation would be useless for a hole range covered by L1s that don't contain any WAL records.
     897              :         //
     898              :         // The algorithm chooses holes as follows.
     899              :         // - Slide a 2-window over the keys in key orde to get the hole range (=distance between two keys).
     900              :         // - Filter: min threshold on range length
     901              :         // - Rank: by coverage size (=number of image layers required to reconstruct each key in the range for which we have any data)
     902              :         //
     903              :         // For more details, intuition, and some ASCII art see https://github.com/neondatabase/neon/pull/3597#discussion_r1112704451
     904              :         #[derive(PartialEq, Eq)]
     905              :         struct Hole {
     906              :             key_range: Range<Key>,
     907              :             coverage_size: usize,
     908              :         }
     909           28 :         let holes: Vec<Hole> = {
     910              :             use std::cmp::Ordering;
     911              :             impl Ord for Hole {
     912            0 :                 fn cmp(&self, other: &Self) -> Ordering {
     913            0 :                     self.coverage_size.cmp(&other.coverage_size).reverse()
     914            0 :                 }
     915              :             }
     916              :             impl PartialOrd for Hole {
     917            0 :                 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
     918            0 :                     Some(self.cmp(other))
     919            0 :                 }
     920              :             }
     921           28 :             let max_holes = deltas_to_compact.len();
     922           28 :             let last_record_lsn = self.get_last_record_lsn();
     923           28 :             let min_hole_range = (target_file_size / page_cache::PAGE_SZ as u64) as i128;
     924           28 :             let min_hole_coverage_size = 3; // TODO: something more flexible?
     925           28 :                                             // min-heap (reserve space for one more element added before eviction)
     926           28 :             let mut heap: BinaryHeap<Hole> = BinaryHeap::with_capacity(max_holes + 1);
     927           28 :             let mut prev: Option<Key> = None;
     928              : 
     929      2064038 :             for &DeltaEntry { key: next_key, .. } in all_keys.iter() {
     930      2064038 :                 if let Some(prev_key) = prev {
     931              :                     // just first fast filter, do not create hole entries for metadata keys. The last hole in the
     932              :                     // compaction is the gap between data key and metadata keys.
     933      2064010 :                     if next_key.to_i128() - prev_key.to_i128() >= min_hole_range
     934            0 :                         && !Key::is_metadata_key(&prev_key)
     935              :                     {
     936            0 :                         let key_range = prev_key..next_key;
     937            0 :                         // Measuring hole by just subtraction of i128 representation of key range boundaries
     938            0 :                         // has not so much sense, because largest holes will corresponds field1/field2 changes.
     939            0 :                         // But we are mostly interested to eliminate holes which cause generation of excessive image layers.
     940            0 :                         // That is why it is better to measure size of hole as number of covering image layers.
     941            0 :                         let coverage_size =
     942            0 :                             layers.image_coverage(&key_range, last_record_lsn).len();
     943            0 :                         if coverage_size >= min_hole_coverage_size {
     944            0 :                             heap.push(Hole {
     945            0 :                                 key_range,
     946            0 :                                 coverage_size,
     947            0 :                             });
     948            0 :                             if heap.len() > max_holes {
     949            0 :                                 heap.pop(); // remove smallest hole
     950            0 :                             }
     951            0 :                         }
     952      2064010 :                     }
     953           28 :                 }
     954      2064038 :                 prev = Some(next_key.next());
     955              :             }
     956           28 :             let mut holes = heap.into_vec();
     957           28 :             holes.sort_unstable_by_key(|hole| hole.key_range.start);
     958           28 :             holes
     959           28 :         };
     960           28 :         stats.read_lock_held_compute_holes_micros = stats.read_lock_held_key_sort_micros.till_now();
     961           28 :         drop_rlock(guard);
     962           28 : 
     963           28 :         if self.cancel.is_cancelled() {
     964            0 :             return Err(CompactionError::ShuttingDown);
     965           28 :         }
     966           28 : 
     967           28 :         stats.read_lock_drop_micros = stats.read_lock_held_compute_holes_micros.till_now();
     968              : 
     969              :         // This iterator walks through all key-value pairs from all the layers
     970              :         // we're compacting, in key, LSN order.
     971              :         // If there's both a Value::Image and Value::WalRecord for the same (key,lsn),
     972              :         // then the Value::Image is ordered before Value::WalRecord.
     973           28 :         let mut all_values_iter = {
     974           28 :             let mut deltas = Vec::with_capacity(deltas_to_compact.len());
     975          402 :             for l in deltas_to_compact.iter() {
     976          402 :                 let l = l.get_as_delta(ctx).await.map_err(CompactionError::Other)?;
     977          402 :                 deltas.push(l);
     978              :             }
     979           28 :             MergeIterator::create(&deltas, &[], ctx)
     980           28 :         };
     981           28 : 
     982           28 :         // This iterator walks through all keys and is needed to calculate size used by each key
     983           28 :         let mut all_keys_iter = all_keys
     984           28 :             .iter()
     985      2064038 :             .map(|DeltaEntry { key, lsn, size, .. }| (*key, *lsn, *size))
     986      2064010 :             .coalesce(|mut prev, cur| {
     987      2064010 :                 // Coalesce keys that belong to the same key pair.
     988      2064010 :                 // This ensures that compaction doesn't put them
     989      2064010 :                 // into different layer files.
     990      2064010 :                 // Still limit this by the target file size,
     991      2064010 :                 // so that we keep the size of the files in
     992      2064010 :                 // check.
     993      2064010 :                 if prev.0 == cur.0 && prev.2 < target_file_size {
     994        40038 :                     prev.2 += cur.2;
     995        40038 :                     Ok(prev)
     996              :                 } else {
     997      2023972 :                     Err((prev, cur))
     998              :                 }
     999      2064010 :             });
    1000           28 : 
    1001           28 :         // Merge the contents of all the input delta layers into a new set
    1002           28 :         // of delta layers, based on the current partitioning.
    1003           28 :         //
    1004           28 :         // We split the new delta layers on the key dimension. We iterate through the key space, and for each key, check if including the next key to the current output layer we're building would cause the layer to become too large. If so, dump the current output layer and start new one.
    1005           28 :         // It's possible that there is a single key with so many page versions that storing all of them in a single layer file
    1006           28 :         // would be too large. In that case, we also split on the LSN dimension.
    1007           28 :         //
    1008           28 :         // LSN
    1009           28 :         //  ^
    1010           28 :         //  |
    1011           28 :         //  | +-----------+            +--+--+--+--+
    1012           28 :         //  | |           |            |  |  |  |  |
    1013           28 :         //  | +-----------+            |  |  |  |  |
    1014           28 :         //  | |           |            |  |  |  |  |
    1015           28 :         //  | +-----------+     ==>    |  |  |  |  |
    1016           28 :         //  | |           |            |  |  |  |  |
    1017           28 :         //  | +-----------+            |  |  |  |  |
    1018           28 :         //  | |           |            |  |  |  |  |
    1019           28 :         //  | +-----------+            +--+--+--+--+
    1020           28 :         //  |
    1021           28 :         //  +--------------> key
    1022           28 :         //
    1023           28 :         //
    1024           28 :         // If one key (X) has a lot of page versions:
    1025           28 :         //
    1026           28 :         // LSN
    1027           28 :         //  ^
    1028           28 :         //  |                                 (X)
    1029           28 :         //  | +-----------+            +--+--+--+--+
    1030           28 :         //  | |           |            |  |  |  |  |
    1031           28 :         //  | +-----------+            |  |  +--+  |
    1032           28 :         //  | |           |            |  |  |  |  |
    1033           28 :         //  | +-----------+     ==>    |  |  |  |  |
    1034           28 :         //  | |           |            |  |  +--+  |
    1035           28 :         //  | +-----------+            |  |  |  |  |
    1036           28 :         //  | |           |            |  |  |  |  |
    1037           28 :         //  | +-----------+            +--+--+--+--+
    1038           28 :         //  |
    1039           28 :         //  +--------------> key
    1040           28 :         // TODO: this actually divides the layers into fixed-size chunks, not
    1041           28 :         // based on the partitioning.
    1042           28 :         //
    1043           28 :         // TODO: we should also opportunistically materialize and
    1044           28 :         // garbage collect what we can.
    1045           28 :         let mut new_layers = Vec::new();
    1046           28 :         let mut prev_key: Option<Key> = None;
    1047           28 :         let mut writer: Option<DeltaLayerWriter> = None;
    1048           28 :         let mut key_values_total_size = 0u64;
    1049           28 :         let mut dup_start_lsn: Lsn = Lsn::INVALID; // start LSN of layer containing values of the single key
    1050           28 :         let mut dup_end_lsn: Lsn = Lsn::INVALID; // end LSN of layer containing values of the single key
    1051           28 :         let mut next_hole = 0; // index of next hole in holes vector
    1052           28 : 
    1053           28 :         let mut keys = 0;
    1054              : 
    1055      2064066 :         while let Some((key, lsn, value)) = all_values_iter
    1056      2064066 :             .next()
    1057      2064066 :             .await
    1058      2064066 :             .map_err(CompactionError::Other)?
    1059              :         {
    1060      2064038 :             keys += 1;
    1061      2064038 : 
    1062      2064038 :             if keys % 32_768 == 0 && self.cancel.is_cancelled() {
    1063              :                 // avoid hitting the cancellation token on every key. in benches, we end up
    1064              :                 // shuffling an order of million keys per layer, this means we'll check it
    1065              :                 // around tens of times per layer.
    1066            0 :                 return Err(CompactionError::ShuttingDown);
    1067      2064038 :             }
    1068      2064038 : 
    1069      2064038 :             let same_key = prev_key.map_or(false, |prev_key| prev_key == key);
    1070      2064038 :             // We need to check key boundaries once we reach next key or end of layer with the same key
    1071      2064038 :             if !same_key || lsn == dup_end_lsn {
    1072      2024000 :                 let mut next_key_size = 0u64;
    1073      2024000 :                 let is_dup_layer = dup_end_lsn.is_valid();
    1074      2024000 :                 dup_start_lsn = Lsn::INVALID;
    1075      2024000 :                 if !same_key {
    1076      2024000 :                     dup_end_lsn = Lsn::INVALID;
    1077      2024000 :                 }
    1078              :                 // Determine size occupied by this key. We stop at next key or when size becomes larger than target_file_size
    1079      2024000 :                 for (next_key, next_lsn, next_size) in all_keys_iter.by_ref() {
    1080      2024000 :                     next_key_size = next_size;
    1081      2024000 :                     if key != next_key {
    1082      2023972 :                         if dup_end_lsn.is_valid() {
    1083            0 :                             // We are writting segment with duplicates:
    1084            0 :                             // place all remaining values of this key in separate segment
    1085            0 :                             dup_start_lsn = dup_end_lsn; // new segments starts where old stops
    1086            0 :                             dup_end_lsn = lsn_range.end; // there are no more values of this key till end of LSN range
    1087      2023972 :                         }
    1088      2023972 :                         break;
    1089           28 :                     }
    1090           28 :                     key_values_total_size += next_size;
    1091           28 :                     // Check if it is time to split segment: if total keys size is larger than target file size.
    1092           28 :                     // We need to avoid generation of empty segments if next_size > target_file_size.
    1093           28 :                     if key_values_total_size > target_file_size && lsn != next_lsn {
    1094              :                         // Split key between multiple layers: such layer can contain only single key
    1095            0 :                         dup_start_lsn = if dup_end_lsn.is_valid() {
    1096            0 :                             dup_end_lsn // new segment with duplicates starts where old one stops
    1097              :                         } else {
    1098            0 :                             lsn // start with the first LSN for this key
    1099              :                         };
    1100            0 :                         dup_end_lsn = next_lsn; // upper LSN boundary is exclusive
    1101            0 :                         break;
    1102           28 :                     }
    1103              :                 }
    1104              :                 // handle case when loop reaches last key: in this case dup_end is non-zero but dup_start is not set.
    1105      2024000 :                 if dup_end_lsn.is_valid() && !dup_start_lsn.is_valid() {
    1106            0 :                     dup_start_lsn = dup_end_lsn;
    1107            0 :                     dup_end_lsn = lsn_range.end;
    1108      2024000 :                 }
    1109      2024000 :                 if writer.is_some() {
    1110      2023972 :                     let written_size = writer.as_mut().unwrap().size();
    1111      2023972 :                     let contains_hole =
    1112      2023972 :                         next_hole < holes.len() && key >= holes[next_hole].key_range.end;
    1113              :                     // check if key cause layer overflow or contains hole...
    1114      2023972 :                     if is_dup_layer
    1115      2023972 :                         || dup_end_lsn.is_valid()
    1116      2023972 :                         || written_size + key_values_total_size > target_file_size
    1117      2023692 :                         || contains_hole
    1118              :                     {
    1119              :                         // ... if so, flush previous layer and prepare to write new one
    1120          280 :                         let (desc, path) = writer
    1121          280 :                             .take()
    1122          280 :                             .unwrap()
    1123          280 :                             .finish(prev_key.unwrap().next(), ctx)
    1124          280 :                             .await
    1125          280 :                             .map_err(CompactionError::Other)?;
    1126          280 :                         let new_delta = Layer::finish_creating(self.conf, self, desc, &path)
    1127          280 :                             .map_err(CompactionError::Other)?;
    1128              : 
    1129          280 :                         new_layers.push(new_delta);
    1130          280 :                         writer = None;
    1131          280 : 
    1132          280 :                         if contains_hole {
    1133            0 :                             // skip hole
    1134            0 :                             next_hole += 1;
    1135          280 :                         }
    1136      2023692 :                     }
    1137           28 :                 }
    1138              :                 // Remember size of key value because at next iteration we will access next item
    1139      2024000 :                 key_values_total_size = next_key_size;
    1140        40038 :             }
    1141      2064038 :             fail_point!("delta-layer-writer-fail-before-finish", |_| {
    1142            0 :                 Err(CompactionError::Other(anyhow::anyhow!(
    1143            0 :                     "failpoint delta-layer-writer-fail-before-finish"
    1144            0 :                 )))
    1145      2064038 :             });
    1146              : 
    1147      2064038 :             if !self.shard_identity.is_key_disposable(&key) {
    1148      2064038 :                 if writer.is_none() {
    1149          308 :                     if self.cancel.is_cancelled() {
    1150              :                         // to be somewhat responsive to cancellation, check for each new layer
    1151            0 :                         return Err(CompactionError::ShuttingDown);
    1152          308 :                     }
    1153              :                     // Create writer if not initiaized yet
    1154          308 :                     writer = Some(
    1155              :                         DeltaLayerWriter::new(
    1156          308 :                             self.conf,
    1157          308 :                             self.timeline_id,
    1158          308 :                             self.tenant_shard_id,
    1159          308 :                             key,
    1160          308 :                             if dup_end_lsn.is_valid() {
    1161              :                                 // this is a layer containing slice of values of the same key
    1162            0 :                                 debug!("Create new dup layer {}..{}", dup_start_lsn, dup_end_lsn);
    1163            0 :                                 dup_start_lsn..dup_end_lsn
    1164              :                             } else {
    1165          308 :                                 debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end);
    1166          308 :                                 lsn_range.clone()
    1167              :                             },
    1168          308 :                             ctx,
    1169          308 :                         )
    1170          308 :                         .await
    1171          308 :                         .map_err(CompactionError::Other)?,
    1172              :                     );
    1173              : 
    1174          308 :                     keys = 0;
    1175      2063730 :                 }
    1176              : 
    1177      2064038 :                 writer
    1178      2064038 :                     .as_mut()
    1179      2064038 :                     .unwrap()
    1180      2064038 :                     .put_value(key, lsn, value, ctx)
    1181      2064038 :                     .await
    1182      2064038 :                     .map_err(CompactionError::Other)?;
    1183              :             } else {
    1184            0 :                 let shard = self.shard_identity.shard_index();
    1185            0 :                 let owner = self.shard_identity.get_shard_number(&key);
    1186            0 :                 if cfg!(debug_assertions) {
    1187            0 :                     panic!("key {key} does not belong on shard {shard}, owned by {owner}");
    1188            0 :                 }
    1189            0 :                 debug!("dropping key {key} during compaction (it belongs on shard {owner})");
    1190              :             }
    1191              : 
    1192      2064038 :             if !new_layers.is_empty() {
    1193        19786 :                 fail_point!("after-timeline-compacted-first-L1");
    1194      2044252 :             }
    1195              : 
    1196      2064038 :             prev_key = Some(key);
    1197              :         }
    1198           28 :         if let Some(writer) = writer {
    1199           28 :             let (desc, path) = writer
    1200           28 :                 .finish(prev_key.unwrap().next(), ctx)
    1201           28 :                 .await
    1202           28 :                 .map_err(CompactionError::Other)?;
    1203           28 :             let new_delta = Layer::finish_creating(self.conf, self, desc, &path)
    1204           28 :                 .map_err(CompactionError::Other)?;
    1205           28 :             new_layers.push(new_delta);
    1206            0 :         }
    1207              : 
    1208              :         // Sync layers
    1209           28 :         if !new_layers.is_empty() {
    1210              :             // Print a warning if the created layer is larger than double the target size
    1211              :             // Add two pages for potential overhead. This should in theory be already
    1212              :             // accounted for in the target calculation, but for very small targets,
    1213              :             // we still might easily hit the limit otherwise.
    1214           28 :             let warn_limit = target_file_size * 2 + page_cache::PAGE_SZ as u64 * 2;
    1215          308 :             for layer in new_layers.iter() {
    1216          308 :                 if layer.layer_desc().file_size > warn_limit {
    1217            0 :                     warn!(
    1218              :                         %layer,
    1219            0 :                         "created delta file of size {} larger than double of target of {target_file_size}", layer.layer_desc().file_size
    1220              :                     );
    1221          308 :                 }
    1222              :             }
    1223              : 
    1224              :             // The writer.finish() above already did the fsync of the inodes.
    1225              :             // We just need to fsync the directory in which these inodes are linked,
    1226              :             // which we know to be the timeline directory.
    1227              :             //
    1228              :             // We use fatal_err() below because the after writer.finish() returns with success,
    1229              :             // the in-memory state of the filesystem already has the layer file in its final place,
    1230              :             // and subsequent pageserver code could think it's durable while it really isn't.
    1231           28 :             let timeline_dir = VirtualFile::open(
    1232           28 :                 &self
    1233           28 :                     .conf
    1234           28 :                     .timeline_path(&self.tenant_shard_id, &self.timeline_id),
    1235           28 :                 ctx,
    1236           28 :             )
    1237           28 :             .await
    1238           28 :             .fatal_err("VirtualFile::open for timeline dir fsync");
    1239           28 :             timeline_dir
    1240           28 :                 .sync_all()
    1241           28 :                 .await
    1242           28 :                 .fatal_err("VirtualFile::sync_all timeline dir");
    1243            0 :         }
    1244              : 
    1245           28 :         stats.write_layer_files_micros = stats.read_lock_drop_micros.till_now();
    1246           28 :         stats.new_deltas_count = Some(new_layers.len());
    1247          308 :         stats.new_deltas_size = Some(new_layers.iter().map(|l| l.layer_desc().file_size).sum());
    1248           28 : 
    1249           28 :         match TryInto::<CompactLevel0Phase1Stats>::try_into(stats)
    1250           28 :             .and_then(|stats| serde_json::to_string(&stats).context("serde_json::to_string"))
    1251              :         {
    1252           28 :             Ok(stats_json) => {
    1253           28 :                 info!(
    1254            0 :                     stats_json = stats_json.as_str(),
    1255            0 :                     "compact_level0_phase1 stats available"
    1256              :                 )
    1257              :             }
    1258            0 :             Err(e) => {
    1259            0 :                 warn!("compact_level0_phase1 stats failed to serialize: {:#}", e);
    1260              :             }
    1261              :         }
    1262              : 
    1263              :         // Without this, rustc complains about deltas_to_compact still
    1264              :         // being borrowed when we `.into_iter()` below.
    1265           28 :         drop(all_values_iter);
    1266           28 : 
    1267           28 :         Ok(CompactLevel0Phase1Result {
    1268           28 :             new_layers,
    1269           28 :             deltas_to_compact: deltas_to_compact
    1270           28 :                 .into_iter()
    1271          402 :                 .map(|x| x.drop_eviction_guard())
    1272           28 :                 .collect::<Vec<_>>(),
    1273           28 :             fully_compacted,
    1274           28 :         })
    1275          364 :     }
    1276              : }
    1277              : 
    1278              : #[derive(Default)]
    1279              : struct CompactLevel0Phase1Result {
    1280              :     new_layers: Vec<ResidentLayer>,
    1281              :     deltas_to_compact: Vec<Layer>,
    1282              :     // Whether we have included all L0 layers, or selected only part of them due to the
    1283              :     // L0 compaction size limit.
    1284              :     fully_compacted: bool,
    1285              : }
    1286              : 
    1287              : #[derive(Default)]
    1288              : struct CompactLevel0Phase1StatsBuilder {
    1289              :     version: Option<u64>,
    1290              :     tenant_id: Option<TenantShardId>,
    1291              :     timeline_id: Option<TimelineId>,
    1292              :     read_lock_acquisition_micros: DurationRecorder,
    1293              :     read_lock_held_spawn_blocking_startup_micros: DurationRecorder,
    1294              :     read_lock_held_key_sort_micros: DurationRecorder,
    1295              :     read_lock_held_prerequisites_micros: DurationRecorder,
    1296              :     read_lock_held_compute_holes_micros: DurationRecorder,
    1297              :     read_lock_drop_micros: DurationRecorder,
    1298              :     write_layer_files_micros: DurationRecorder,
    1299              :     level0_deltas_count: Option<usize>,
    1300              :     new_deltas_count: Option<usize>,
    1301              :     new_deltas_size: Option<u64>,
    1302              : }
    1303              : 
    1304              : #[derive(serde::Serialize)]
    1305              : struct CompactLevel0Phase1Stats {
    1306              :     version: u64,
    1307              :     tenant_id: TenantShardId,
    1308              :     timeline_id: TimelineId,
    1309              :     read_lock_acquisition_micros: RecordedDuration,
    1310              :     read_lock_held_spawn_blocking_startup_micros: RecordedDuration,
    1311              :     read_lock_held_key_sort_micros: RecordedDuration,
    1312              :     read_lock_held_prerequisites_micros: RecordedDuration,
    1313              :     read_lock_held_compute_holes_micros: RecordedDuration,
    1314              :     read_lock_drop_micros: RecordedDuration,
    1315              :     write_layer_files_micros: RecordedDuration,
    1316              :     level0_deltas_count: usize,
    1317              :     new_deltas_count: usize,
    1318              :     new_deltas_size: u64,
    1319              : }
    1320              : 
    1321              : impl TryFrom<CompactLevel0Phase1StatsBuilder> for CompactLevel0Phase1Stats {
    1322              :     type Error = anyhow::Error;
    1323              : 
    1324           28 :     fn try_from(value: CompactLevel0Phase1StatsBuilder) -> Result<Self, Self::Error> {
    1325           28 :         Ok(Self {
    1326           28 :             version: value.version.ok_or_else(|| anyhow!("version not set"))?,
    1327           28 :             tenant_id: value
    1328           28 :                 .tenant_id
    1329           28 :                 .ok_or_else(|| anyhow!("tenant_id not set"))?,
    1330           28 :             timeline_id: value
    1331           28 :                 .timeline_id
    1332           28 :                 .ok_or_else(|| anyhow!("timeline_id not set"))?,
    1333           28 :             read_lock_acquisition_micros: value
    1334           28 :                 .read_lock_acquisition_micros
    1335           28 :                 .into_recorded()
    1336           28 :                 .ok_or_else(|| anyhow!("read_lock_acquisition_micros not set"))?,
    1337           28 :             read_lock_held_spawn_blocking_startup_micros: value
    1338           28 :                 .read_lock_held_spawn_blocking_startup_micros
    1339           28 :                 .into_recorded()
    1340           28 :                 .ok_or_else(|| anyhow!("read_lock_held_spawn_blocking_startup_micros not set"))?,
    1341           28 :             read_lock_held_key_sort_micros: value
    1342           28 :                 .read_lock_held_key_sort_micros
    1343           28 :                 .into_recorded()
    1344           28 :                 .ok_or_else(|| anyhow!("read_lock_held_key_sort_micros not set"))?,
    1345           28 :             read_lock_held_prerequisites_micros: value
    1346           28 :                 .read_lock_held_prerequisites_micros
    1347           28 :                 .into_recorded()
    1348           28 :                 .ok_or_else(|| anyhow!("read_lock_held_prerequisites_micros not set"))?,
    1349           28 :             read_lock_held_compute_holes_micros: value
    1350           28 :                 .read_lock_held_compute_holes_micros
    1351           28 :                 .into_recorded()
    1352           28 :                 .ok_or_else(|| anyhow!("read_lock_held_compute_holes_micros not set"))?,
    1353           28 :             read_lock_drop_micros: value
    1354           28 :                 .read_lock_drop_micros
    1355           28 :                 .into_recorded()
    1356           28 :                 .ok_or_else(|| anyhow!("read_lock_drop_micros not set"))?,
    1357           28 :             write_layer_files_micros: value
    1358           28 :                 .write_layer_files_micros
    1359           28 :                 .into_recorded()
    1360           28 :                 .ok_or_else(|| anyhow!("write_layer_files_micros not set"))?,
    1361           28 :             level0_deltas_count: value
    1362           28 :                 .level0_deltas_count
    1363           28 :                 .ok_or_else(|| anyhow!("level0_deltas_count not set"))?,
    1364           28 :             new_deltas_count: value
    1365           28 :                 .new_deltas_count
    1366           28 :                 .ok_or_else(|| anyhow!("new_deltas_count not set"))?,
    1367           28 :             new_deltas_size: value
    1368           28 :                 .new_deltas_size
    1369           28 :                 .ok_or_else(|| anyhow!("new_deltas_size not set"))?,
    1370              :         })
    1371           28 :     }
    1372              : }
    1373              : 
    1374              : impl Timeline {
    1375              :     /// Entry point for new tiered compaction algorithm.
    1376              :     ///
    1377              :     /// All the real work is in the implementation in the pageserver_compaction
    1378              :     /// crate. The code here would apply to any algorithm implemented by the
    1379              :     /// same interface, but tiered is the only one at the moment.
    1380              :     ///
    1381              :     /// TODO: cancellation
    1382            0 :     pub(crate) async fn compact_tiered(
    1383            0 :         self: &Arc<Self>,
    1384            0 :         _cancel: &CancellationToken,
    1385            0 :         ctx: &RequestContext,
    1386            0 :     ) -> Result<(), CompactionError> {
    1387            0 :         let fanout = self.get_compaction_threshold() as u64;
    1388            0 :         let target_file_size = self.get_checkpoint_distance();
    1389              : 
    1390              :         // Find the top of the historical layers
    1391            0 :         let end_lsn = {
    1392            0 :             let guard = self.layers.read().await;
    1393            0 :             let layers = guard.layer_map()?;
    1394              : 
    1395            0 :             let l0_deltas = layers.level0_deltas();
    1396            0 : 
    1397            0 :             // As an optimization, if we find that there are too few L0 layers,
    1398            0 :             // bail out early. We know that the compaction algorithm would do
    1399            0 :             // nothing in that case.
    1400            0 :             if l0_deltas.len() < fanout as usize {
    1401              :                 // doesn't need compacting
    1402            0 :                 return Ok(());
    1403            0 :             }
    1404            0 :             l0_deltas.iter().map(|l| l.lsn_range.end).max().unwrap()
    1405            0 :         };
    1406            0 : 
    1407            0 :         // Is the timeline being deleted?
    1408            0 :         if self.is_stopping() {
    1409            0 :             trace!("Dropping out of compaction on timeline shutdown");
    1410            0 :             return Err(CompactionError::ShuttingDown);
    1411            0 :         }
    1412              : 
    1413            0 :         let (dense_ks, _sparse_ks) = self.collect_keyspace(end_lsn, ctx).await?;
    1414              :         // TODO(chi): ignore sparse_keyspace for now, compact it in the future.
    1415            0 :         let mut adaptor = TimelineAdaptor::new(self, (end_lsn, dense_ks));
    1416            0 : 
    1417            0 :         pageserver_compaction::compact_tiered::compact_tiered(
    1418            0 :             &mut adaptor,
    1419            0 :             end_lsn,
    1420            0 :             target_file_size,
    1421            0 :             fanout,
    1422            0 :             ctx,
    1423            0 :         )
    1424            0 :         .await
    1425              :         // TODO: compact_tiered needs to return CompactionError
    1426            0 :         .map_err(CompactionError::Other)?;
    1427              : 
    1428            0 :         adaptor.flush_updates().await?;
    1429            0 :         Ok(())
    1430            0 :     }
    1431              : 
    1432              :     /// Take a list of images and deltas, produce images and deltas according to GC horizon and retain_lsns.
    1433              :     ///
    1434              :     /// It takes a key, the values of the key within the compaction process, a GC horizon, and all retain_lsns below the horizon.
    1435              :     /// For now, it requires the `accumulated_values` contains the full history of the key (i.e., the key with the lowest LSN is
    1436              :     /// an image or a WAL not requiring a base image). This restriction will be removed once we implement gc-compaction on branch.
    1437              :     ///
    1438              :     /// The function returns the deltas and the base image that need to be placed at each of the retain LSN. For example, we have:
    1439              :     ///
    1440              :     /// A@0x10, +B@0x20, +C@0x30, +D@0x40, +E@0x50, +F@0x60
    1441              :     /// horizon = 0x50, retain_lsn = 0x20, 0x40, delta_threshold=3
    1442              :     ///
    1443              :     /// The function will produce:
    1444              :     ///
    1445              :     /// ```plain
    1446              :     /// 0x20(retain_lsn) -> img=AB@0x20                  always produce a single image below the lowest retain LSN
    1447              :     /// 0x40(retain_lsn) -> deltas=[+C@0x30, +D@0x40]    two deltas since the last base image, keeping the deltas
    1448              :     /// 0x50(horizon)    -> deltas=[ABCDE@0x50]          three deltas since the last base image, generate an image but put it in the delta
    1449              :     /// above_horizon    -> deltas=[+F@0x60]             full history above the horizon
    1450              :     /// ```
    1451              :     ///
    1452              :     /// Note that `accumulated_values` must be sorted by LSN and should belong to a single key.
    1453          538 :     pub(crate) async fn generate_key_retention(
    1454          538 :         self: &Arc<Timeline>,
    1455          538 :         key: Key,
    1456          538 :         full_history: &[(Key, Lsn, Value)],
    1457          538 :         horizon: Lsn,
    1458          538 :         retain_lsn_below_horizon: &[Lsn],
    1459          538 :         delta_threshold_cnt: usize,
    1460          538 :         base_img_from_ancestor: Option<(Key, Lsn, Bytes)>,
    1461          538 :     ) -> anyhow::Result<KeyHistoryRetention> {
    1462          538 :         // Pre-checks for the invariants
    1463          538 :         if cfg!(debug_assertions) {
    1464         1306 :             for (log_key, _, _) in full_history {
    1465          768 :                 assert_eq!(log_key, &key, "mismatched key");
    1466              :             }
    1467          538 :             for i in 1..full_history.len() {
    1468          230 :                 assert!(full_history[i - 1].1 <= full_history[i].1, "unordered LSN");
    1469          230 :                 if full_history[i - 1].1 == full_history[i].1 {
    1470            0 :                     assert!(
    1471            0 :                         matches!(full_history[i - 1].2, Value::Image(_)),
    1472            0 :                         "unordered delta/image, or duplicated delta"
    1473              :                     );
    1474          230 :                 }
    1475              :             }
    1476              :             // There was an assertion for no base image that checks if the first
    1477              :             // record in the history is `will_init` before, but it was removed.
    1478              :             // This is explained in the test cases for generate_key_retention.
    1479              :             // Search "incomplete history" for more information.
    1480         1148 :             for lsn in retain_lsn_below_horizon {
    1481          610 :                 assert!(lsn < &horizon, "retain lsn must be below horizon")
    1482              :             }
    1483          538 :             for i in 1..retain_lsn_below_horizon.len() {
    1484          278 :                 assert!(
    1485          278 :                     retain_lsn_below_horizon[i - 1] <= retain_lsn_below_horizon[i],
    1486            0 :                     "unordered LSN"
    1487              :                 );
    1488              :             }
    1489            0 :         }
    1490          538 :         let has_ancestor = base_img_from_ancestor.is_some();
    1491              :         // Step 1: split history into len(retain_lsn_below_horizon) + 2 buckets, where the last bucket is for all deltas above the horizon,
    1492              :         // and the second-to-last bucket is for the horizon. Each bucket contains lsn_last_bucket < deltas <= lsn_this_bucket.
    1493          538 :         let (mut split_history, lsn_split_points) = {
    1494          538 :             let mut split_history = Vec::new();
    1495          538 :             split_history.resize_with(retain_lsn_below_horizon.len() + 2, Vec::new);
    1496          538 :             let mut lsn_split_points = Vec::with_capacity(retain_lsn_below_horizon.len() + 1);
    1497         1148 :             for lsn in retain_lsn_below_horizon {
    1498          610 :                 lsn_split_points.push(*lsn);
    1499          610 :             }
    1500          538 :             lsn_split_points.push(horizon);
    1501          538 :             let mut current_idx = 0;
    1502         1306 :             for item @ (_, lsn, _) in full_history {
    1503          944 :                 while current_idx < lsn_split_points.len() && *lsn > lsn_split_points[current_idx] {
    1504          176 :                     current_idx += 1;
    1505          176 :                 }
    1506          768 :                 split_history[current_idx].push(item);
    1507              :             }
    1508          538 :             (split_history, lsn_split_points)
    1509              :         };
    1510              :         // Step 2: filter out duplicated records due to the k-merge of image/delta layers
    1511         2224 :         for split_for_lsn in &mut split_history {
    1512         1686 :             let mut prev_lsn = None;
    1513         1686 :             let mut new_split_for_lsn = Vec::with_capacity(split_for_lsn.len());
    1514         1686 :             for record @ (_, lsn, _) in std::mem::take(split_for_lsn) {
    1515          768 :                 if let Some(prev_lsn) = &prev_lsn {
    1516          106 :                     if *prev_lsn == lsn {
    1517              :                         // The case that we have an LSN with both data from the delta layer and the image layer. As
    1518              :                         // `ValueWrapper` ensures that an image is ordered before a delta at the same LSN, we simply
    1519              :                         // drop this delta and keep the image.
    1520              :                         //
    1521              :                         // For example, we have delta layer key1@0x10, key1@0x20, and image layer key1@0x10, we will
    1522              :                         // keep the image for key1@0x10 and the delta for key1@0x20. key1@0x10 delta will be simply
    1523              :                         // dropped.
    1524              :                         //
    1525              :                         // TODO: in case we have both delta + images for a given LSN and it does not exceed the delta
    1526              :                         // threshold, we could have kept delta instead to save space. This is an optimization for the future.
    1527            0 :                         continue;
    1528          106 :                     }
    1529          662 :                 }
    1530          768 :                 prev_lsn = Some(lsn);
    1531          768 :                 new_split_for_lsn.push(record);
    1532              :             }
    1533         1686 :             *split_for_lsn = new_split_for_lsn;
    1534              :         }
    1535              :         // Step 3: generate images when necessary
    1536          538 :         let mut retention = Vec::with_capacity(split_history.len());
    1537          538 :         let mut records_since_last_image = 0;
    1538          538 :         let batch_cnt = split_history.len();
    1539          538 :         assert!(
    1540          538 :             batch_cnt >= 2,
    1541            0 :             "should have at least below + above horizon batches"
    1542              :         );
    1543          538 :         let mut replay_history: Vec<(Key, Lsn, Value)> = Vec::new();
    1544          538 :         if let Some((key, lsn, img)) = base_img_from_ancestor {
    1545           18 :             replay_history.push((key, lsn, Value::Image(img)));
    1546          520 :         }
    1547              : 
    1548              :         /// Generate debug information for the replay history
    1549            0 :         fn generate_history_trace(replay_history: &[(Key, Lsn, Value)]) -> String {
    1550              :             use std::fmt::Write;
    1551            0 :             let mut output = String::new();
    1552            0 :             if let Some((key, _, _)) = replay_history.first() {
    1553            0 :                 write!(output, "key={} ", key).unwrap();
    1554            0 :                 let mut cnt = 0;
    1555            0 :                 for (_, lsn, val) in replay_history {
    1556            0 :                     if val.is_image() {
    1557            0 :                         write!(output, "i@{} ", lsn).unwrap();
    1558            0 :                     } else if val.will_init() {
    1559            0 :                         write!(output, "di@{} ", lsn).unwrap();
    1560            0 :                     } else {
    1561            0 :                         write!(output, "d@{} ", lsn).unwrap();
    1562            0 :                     }
    1563            0 :                     cnt += 1;
    1564            0 :                     if cnt >= 128 {
    1565            0 :                         write!(output, "... and more").unwrap();
    1566            0 :                         break;
    1567            0 :                     }
    1568              :                 }
    1569            0 :             } else {
    1570            0 :                 write!(output, "<no history>").unwrap();
    1571            0 :             }
    1572            0 :             output
    1573            0 :         }
    1574              : 
    1575            0 :         fn generate_debug_trace(
    1576            0 :             replay_history: Option<&[(Key, Lsn, Value)]>,
    1577            0 :             full_history: &[(Key, Lsn, Value)],
    1578            0 :             lsns: &[Lsn],
    1579            0 :             horizon: Lsn,
    1580            0 :         ) -> String {
    1581              :             use std::fmt::Write;
    1582            0 :             let mut output = String::new();
    1583            0 :             if let Some(replay_history) = replay_history {
    1584            0 :                 writeln!(
    1585            0 :                     output,
    1586            0 :                     "replay_history: {}",
    1587            0 :                     generate_history_trace(replay_history)
    1588            0 :                 )
    1589            0 :                 .unwrap();
    1590            0 :             } else {
    1591            0 :                 writeln!(output, "replay_history: <disabled>",).unwrap();
    1592            0 :             }
    1593            0 :             writeln!(
    1594            0 :                 output,
    1595            0 :                 "full_history: {}",
    1596            0 :                 generate_history_trace(full_history)
    1597            0 :             )
    1598            0 :             .unwrap();
    1599            0 :             writeln!(
    1600            0 :                 output,
    1601            0 :                 "when processing: [{}] horizon={}",
    1602            0 :                 lsns.iter().map(|l| format!("{l}")).join(","),
    1603            0 :                 horizon
    1604            0 :             )
    1605            0 :             .unwrap();
    1606            0 :             output
    1607            0 :         }
    1608              : 
    1609         1686 :         for (i, split_for_lsn) in split_history.into_iter().enumerate() {
    1610              :             // TODO: there could be image keys inside the splits, and we can compute records_since_last_image accordingly.
    1611         1686 :             records_since_last_image += split_for_lsn.len();
    1612         1686 :             let generate_image = if i == 0 && !has_ancestor {
    1613              :                 // We always generate images for the first batch (below horizon / lowest retain_lsn)
    1614          520 :                 true
    1615         1166 :             } else if i == batch_cnt - 1 {
    1616              :                 // Do not generate images for the last batch (above horizon)
    1617          538 :                 false
    1618          628 :             } else if records_since_last_image >= delta_threshold_cnt {
    1619              :                 // Generate images when there are too many records
    1620            6 :                 true
    1621              :             } else {
    1622          622 :                 false
    1623              :             };
    1624         1686 :             replay_history.extend(split_for_lsn.iter().map(|x| (*x).clone()));
    1625              :             // Only retain the items after the last image record
    1626         2030 :             for idx in (0..replay_history.len()).rev() {
    1627         2030 :                 if replay_history[idx].2.will_init() {
    1628         1686 :                     replay_history = replay_history[idx..].to_vec();
    1629         1686 :                     break;
    1630          344 :                 }
    1631              :             }
    1632         1686 :             if let Some((_, _, val)) = replay_history.first() {
    1633         1686 :                 if !val.will_init() {
    1634            0 :                     return Err(anyhow::anyhow!("invalid history, no base image")).with_context(
    1635            0 :                         || {
    1636            0 :                             generate_debug_trace(
    1637            0 :                                 Some(&replay_history),
    1638            0 :                                 full_history,
    1639            0 :                                 retain_lsn_below_horizon,
    1640            0 :                                 horizon,
    1641            0 :                             )
    1642            0 :                         },
    1643            0 :                     );
    1644         1686 :                 }
    1645            0 :             }
    1646         1686 :             if generate_image && records_since_last_image > 0 {
    1647          526 :                 records_since_last_image = 0;
    1648          526 :                 let replay_history_for_debug = if cfg!(debug_assertions) {
    1649          526 :                     Some(replay_history.clone())
    1650              :                 } else {
    1651            0 :                     None
    1652              :                 };
    1653          526 :                 let replay_history_for_debug_ref = replay_history_for_debug.as_deref();
    1654          526 :                 let history = std::mem::take(&mut replay_history);
    1655          526 :                 let mut img = None;
    1656          526 :                 let mut records = Vec::with_capacity(history.len());
    1657          526 :                 if let (_, lsn, Value::Image(val)) = history.first().as_ref().unwrap() {
    1658          504 :                     img = Some((*lsn, val.clone()));
    1659          504 :                     for (_, lsn, val) in history.into_iter().skip(1) {
    1660           34 :                         let Value::WalRecord(rec) = val else {
    1661            0 :                             return Err(anyhow::anyhow!(
    1662            0 :                                 "invalid record, first record is image, expect walrecords"
    1663            0 :                             ))
    1664            0 :                             .with_context(|| {
    1665            0 :                                 generate_debug_trace(
    1666            0 :                                     replay_history_for_debug_ref,
    1667            0 :                                     full_history,
    1668            0 :                                     retain_lsn_below_horizon,
    1669            0 :                                     horizon,
    1670            0 :                                 )
    1671            0 :                             });
    1672              :                         };
    1673           34 :                         records.push((lsn, rec));
    1674              :                     }
    1675              :                 } else {
    1676           36 :                     for (_, lsn, val) in history.into_iter() {
    1677           36 :                         let Value::WalRecord(rec) = val else {
    1678            0 :                             return Err(anyhow::anyhow!("invalid record, first record is walrecord, expect rest are walrecord"))
    1679            0 :                                 .with_context(|| generate_debug_trace(
    1680            0 :                                     replay_history_for_debug_ref,
    1681            0 :                                     full_history,
    1682            0 :                                     retain_lsn_below_horizon,
    1683            0 :                                     horizon,
    1684            0 :                                 ));
    1685              :                         };
    1686           36 :                         records.push((lsn, rec));
    1687              :                     }
    1688              :                 }
    1689          526 :                 records.reverse();
    1690          526 :                 let state = ValueReconstructState { img, records };
    1691          526 :                 let request_lsn = lsn_split_points[i]; // last batch does not generate image so i is always in range
    1692          526 :                 let img = self.reconstruct_value(key, request_lsn, state).await?;
    1693          526 :                 replay_history.push((key, request_lsn, Value::Image(img.clone())));
    1694          526 :                 retention.push(vec![(request_lsn, Value::Image(img))]);
    1695         1160 :             } else {
    1696         1160 :                 let deltas = split_for_lsn
    1697         1160 :                     .iter()
    1698         1160 :                     .map(|(_, lsn, value)| (*lsn, value.clone()))
    1699         1160 :                     .collect_vec();
    1700         1160 :                 retention.push(deltas);
    1701         1160 :             }
    1702              :         }
    1703          538 :         let mut result = Vec::with_capacity(retention.len());
    1704          538 :         assert_eq!(retention.len(), lsn_split_points.len() + 1);
    1705         1686 :         for (idx, logs) in retention.into_iter().enumerate() {
    1706         1686 :             if idx == lsn_split_points.len() {
    1707          538 :                 return Ok(KeyHistoryRetention {
    1708          538 :                     below_horizon: result,
    1709          538 :                     above_horizon: KeyLogAtLsn(logs),
    1710          538 :                 });
    1711         1148 :             } else {
    1712         1148 :                 result.push((lsn_split_points[idx], KeyLogAtLsn(logs)));
    1713         1148 :             }
    1714              :         }
    1715            0 :         unreachable!("key retention is empty")
    1716          538 :     }
    1717              : 
    1718              :     /// Check how much space is left on the disk
    1719           40 :     async fn check_available_space(self: &Arc<Self>) -> anyhow::Result<u64> {
    1720           40 :         let tenants_dir = self.conf.tenants_path();
    1721              : 
    1722           40 :         let stat = Statvfs::get(&tenants_dir, None)
    1723           40 :             .context("statvfs failed, presumably directory got unlinked")?;
    1724              : 
    1725           40 :         let (avail_bytes, _) = stat.get_avail_total_bytes();
    1726           40 : 
    1727           40 :         Ok(avail_bytes)
    1728           40 :     }
    1729              : 
    1730              :     /// Check if the compaction can proceed safely without running out of space. We assume the size
    1731              :     /// upper bound of the produced files of a compaction job is the same as all layers involved in
    1732              :     /// the compaction. Therefore, we need `2 * layers_to_be_compacted_size` at least to do a
    1733              :     /// compaction.
    1734           40 :     async fn check_compaction_space(
    1735           40 :         self: &Arc<Self>,
    1736           40 :         layer_selection: &[Layer],
    1737           40 :     ) -> anyhow::Result<()> {
    1738           40 :         let available_space = self.check_available_space().await?;
    1739           40 :         let mut remote_layer_size = 0;
    1740           40 :         let mut all_layer_size = 0;
    1741          160 :         for layer in layer_selection {
    1742          120 :             let needs_download = layer.needs_download().await?;
    1743          120 :             if needs_download.is_some() {
    1744            0 :                 remote_layer_size += layer.layer_desc().file_size;
    1745          120 :             }
    1746          120 :             all_layer_size += layer.layer_desc().file_size;
    1747              :         }
    1748           40 :         let allocated_space = (available_space as f64 * 0.8) as u64; /* reserve 20% space for other tasks */
    1749           40 :         if all_layer_size /* space needed for newly-generated file */ + remote_layer_size /* space for downloading layers */ > allocated_space
    1750              :         {
    1751            0 :             return Err(anyhow!("not enough space for compaction: available_space={}, allocated_space={}, all_layer_size={}, remote_layer_size={}, required_space={}",
    1752            0 :                 available_space, allocated_space, all_layer_size, remote_layer_size, all_layer_size + remote_layer_size));
    1753           40 :         }
    1754           40 :         Ok(())
    1755           40 :     }
    1756              : 
    1757              :     /// Split a gc-compaction job into multiple compaction jobs. Optimally, this function should return a vector of
    1758              :     /// `GcCompactionJobDesc`. But we want to keep it simple on the tenant scheduling side without exposing too much
    1759              :     /// ad-hoc information about gc compaction itself.
    1760            0 :     pub(crate) async fn gc_compaction_split_jobs(
    1761            0 :         self: &Arc<Self>,
    1762            0 :         options: CompactOptions,
    1763            0 :     ) -> anyhow::Result<Vec<CompactOptions>> {
    1764            0 :         if !options.sub_compaction {
    1765            0 :             return Ok(vec![options]);
    1766            0 :         }
    1767            0 :         let compact_range = options.compact_range.clone().unwrap_or(CompactRange {
    1768            0 :             start: Key::MIN,
    1769            0 :             end: Key::MAX,
    1770            0 :         });
    1771            0 :         let compact_below_lsn = if let Some(compact_below_lsn) = options.compact_below_lsn {
    1772            0 :             compact_below_lsn
    1773              :         } else {
    1774            0 :             *self.get_latest_gc_cutoff_lsn() // use the real gc cutoff
    1775              :         };
    1776            0 :         let mut compact_jobs = Vec::new();
    1777              :         // For now, we simply use the key partitioning information; we should do a more fine-grained partitioning
    1778              :         // by estimating the amount of files read for a compaction job. We should also partition on LSN.
    1779            0 :         let Ok(partition) = self.partitioning.try_lock() else {
    1780            0 :             bail!("failed to acquire partition lock");
    1781              :         };
    1782            0 :         let ((dense_ks, sparse_ks), _) = &*partition;
    1783              :         // Truncate the key range to be within user specified compaction range.
    1784            0 :         fn truncate_to(
    1785            0 :             source_start: &Key,
    1786            0 :             source_end: &Key,
    1787            0 :             target_start: &Key,
    1788            0 :             target_end: &Key,
    1789            0 :         ) -> Option<(Key, Key)> {
    1790            0 :             let start = source_start.max(target_start);
    1791            0 :             let end = source_end.min(target_end);
    1792            0 :             if start < end {
    1793            0 :                 Some((*start, *end))
    1794              :             } else {
    1795            0 :                 None
    1796              :             }
    1797            0 :         }
    1798            0 :         let mut split_key_ranges = Vec::new();
    1799            0 :         let ranges = dense_ks
    1800            0 :             .parts
    1801            0 :             .iter()
    1802            0 :             .map(|partition| partition.ranges.iter())
    1803            0 :             .chain(sparse_ks.parts.iter().map(|x| x.0.ranges.iter()))
    1804            0 :             .flatten()
    1805            0 :             .cloned()
    1806            0 :             .collect_vec();
    1807            0 :         for range in ranges.iter() {
    1808            0 :             let Some((start, end)) = truncate_to(
    1809            0 :                 &range.start,
    1810            0 :                 &range.end,
    1811            0 :                 &compact_range.start,
    1812            0 :                 &compact_range.end,
    1813            0 :             ) else {
    1814            0 :                 continue;
    1815              :             };
    1816            0 :             split_key_ranges.push((start, end));
    1817              :         }
    1818            0 :         split_key_ranges.sort();
    1819            0 :         let guard = self.layers.read().await;
    1820            0 :         let layer_map = guard.layer_map()?;
    1821            0 :         let mut current_start = None;
    1822              :         // Split compaction job to about 2GB each
    1823              :         const GC_COMPACT_MAX_SIZE_MB: u64 = 4 * 1024; // 4GB, TODO: should be configuration in the future
    1824            0 :         let ranges_num = split_key_ranges.len();
    1825            0 :         for (idx, (start, end)) in split_key_ranges.into_iter().enumerate() {
    1826            0 :             if current_start.is_none() {
    1827            0 :                 current_start = Some(start);
    1828            0 :             }
    1829            0 :             let start = current_start.unwrap();
    1830            0 :             if start >= end {
    1831              :                 // We have already processed this partition.
    1832            0 :                 continue;
    1833            0 :             }
    1834            0 :             let res = layer_map.range_search(start..end, compact_below_lsn);
    1835            0 :             let total_size = res.found.keys().map(|x| x.layer.file_size()).sum::<u64>();
    1836            0 :             if total_size > GC_COMPACT_MAX_SIZE_MB * 1024 * 1024 || ranges_num == idx + 1 {
    1837            0 :                 let mut compact_options = options.clone();
    1838            0 :                 // Try to extend the compaction range so that we include at least one full layer file.
    1839            0 :                 let extended_end = res
    1840            0 :                     .found
    1841            0 :                     .keys()
    1842            0 :                     .map(|layer| layer.layer.key_range.end)
    1843            0 :                     .min();
    1844              :                 // It is possible that the search range does not contain any layer files when we reach the end of the loop.
    1845              :                 // In this case, we simply use the specified key range end.
    1846            0 :                 let end = if let Some(extended_end) = extended_end {
    1847            0 :                     extended_end.max(end)
    1848              :                 } else {
    1849            0 :                     end
    1850              :                 };
    1851            0 :                 info!(
    1852            0 :                     "splitting compaction job: {}..{}, estimated_size={}",
    1853              :                     start, end, total_size
    1854              :                 );
    1855            0 :                 compact_options.compact_range = Some(CompactRange { start, end });
    1856            0 :                 compact_options.compact_below_lsn = Some(compact_below_lsn);
    1857            0 :                 compact_options.sub_compaction = false;
    1858            0 :                 compact_jobs.push(compact_options);
    1859            0 :                 current_start = Some(end);
    1860            0 :             }
    1861              :         }
    1862            0 :         drop(guard);
    1863            0 :         Ok(compact_jobs)
    1864            0 :     }
    1865              : 
    1866              :     /// An experimental compaction building block that combines compaction with garbage collection.
    1867              :     ///
    1868              :     /// The current implementation picks all delta + image layers that are below or intersecting with
    1869              :     /// the GC horizon without considering retain_lsns. Then, it does a full compaction over all these delta
    1870              :     /// layers and image layers, which generates image layers on the gc horizon, drop deltas below gc horizon,
    1871              :     /// and create delta layers with all deltas >= gc horizon.
    1872              :     ///
    1873              :     /// If `options.compact_range` is provided, it will only compact the keys within the range, aka partial compaction.
    1874              :     /// Partial compaction will read and process all layers overlapping with the key range, even if it might
    1875              :     /// contain extra keys. After the gc-compaction phase completes, delta layers that are not fully contained
    1876              :     /// within the key range will be rewritten to ensure they do not overlap with the delta layers. Providing
    1877              :     /// Key::MIN..Key..MAX to the function indicates a full compaction, though technically, `Key::MAX` is not
    1878              :     /// part of the range.
    1879              :     ///
    1880              :     /// If `options.compact_below_lsn` is provided, the compaction will only compact layers below or intersect with
    1881              :     /// the LSN. Otherwise, it will use the gc cutoff by default.
    1882           40 :     pub(crate) async fn compact_with_gc(
    1883           40 :         self: &Arc<Self>,
    1884           40 :         cancel: &CancellationToken,
    1885           40 :         options: CompactOptions,
    1886           40 :         ctx: &RequestContext,
    1887           40 :     ) -> anyhow::Result<()> {
    1888           40 :         if options.sub_compaction {
    1889            0 :             info!("running enhanced gc bottom-most compaction with sub-compaction, splitting compaction jobs");
    1890            0 :             let jobs = self.gc_compaction_split_jobs(options).await?;
    1891            0 :             let jobs_len = jobs.len();
    1892            0 :             for (idx, job) in jobs.into_iter().enumerate() {
    1893            0 :                 info!(
    1894            0 :                     "running enhanced gc bottom-most compaction, sub-compaction {}/{}",
    1895            0 :                     idx + 1,
    1896              :                     jobs_len
    1897              :                 );
    1898            0 :                 self.compact_with_gc_inner(cancel, job, ctx).await?;
    1899              :             }
    1900            0 :             if jobs_len == 0 {
    1901            0 :                 info!("no jobs to run, skipping gc bottom-most compaction");
    1902            0 :             }
    1903            0 :             return Ok(());
    1904           40 :         }
    1905           40 :         self.compact_with_gc_inner(cancel, options, ctx).await
    1906           40 :     }
    1907              : 
    1908           40 :     async fn compact_with_gc_inner(
    1909           40 :         self: &Arc<Self>,
    1910           40 :         cancel: &CancellationToken,
    1911           40 :         options: CompactOptions,
    1912           40 :         ctx: &RequestContext,
    1913           40 :     ) -> anyhow::Result<()> {
    1914           40 :         assert!(
    1915           40 :             !options.sub_compaction,
    1916            0 :             "sub-compaction should be handled by the outer function"
    1917              :         );
    1918              :         // Block other compaction/GC tasks from running for now. GC-compaction could run along
    1919              :         // with legacy compaction tasks in the future. Always ensure the lock order is compaction -> gc.
    1920              :         // Note that we already acquired the compaction lock when the outer `compact` function gets called.
    1921              : 
    1922           40 :         let gc_lock = async {
    1923           40 :             tokio::select! {
    1924           40 :                 guard = self.gc_lock.lock() => Ok(guard),
    1925              :                 // TODO: refactor to CompactionError to correctly pass cancelled error
    1926           40 :                 _ = cancel.cancelled() => Err(anyhow!("cancelled")),
    1927              :             }
    1928           40 :         };
    1929              : 
    1930           40 :         let gc_lock = crate::timed(
    1931           40 :             gc_lock,
    1932           40 :             "acquires gc lock",
    1933           40 :             std::time::Duration::from_secs(5),
    1934           40 :         )
    1935           40 :         .await?;
    1936              : 
    1937           40 :         let flags = options.flags;
    1938           40 :         let compaction_key_range = options
    1939           40 :             .compact_range
    1940           40 :             .map(|range| range.start..range.end)
    1941           40 :             .unwrap_or_else(|| Key::MIN..Key::MAX);
    1942           40 : 
    1943           40 :         let dry_run = flags.contains(CompactFlags::DryRun);
    1944           40 : 
    1945           40 :         if compaction_key_range == (Key::MIN..Key::MAX) {
    1946           30 :             info!("running enhanced gc bottom-most compaction, dry_run={dry_run}, compaction_key_range={}..{}", compaction_key_range.start, compaction_key_range.end);
    1947              :         } else {
    1948           10 :             info!("running enhanced gc bottom-most compaction, dry_run={dry_run}");
    1949              :         }
    1950              : 
    1951           40 :         scopeguard::defer! {
    1952           40 :             info!("done enhanced gc bottom-most compaction");
    1953           40 :         };
    1954           40 : 
    1955           40 :         let mut stat = CompactionStatistics::default();
    1956              : 
    1957              :         // Step 0: pick all delta layers + image layers below/intersect with the GC horizon.
    1958              :         // The layer selection has the following properties:
    1959              :         // 1. If a layer is in the selection, all layers below it are in the selection.
    1960              :         // 2. Inferred from (1), for each key in the layer selection, the value can be reconstructed only with the layers in the layer selection.
    1961           40 :         let job_desc = {
    1962           40 :             let guard = self.layers.read().await;
    1963           40 :             let layers = guard.layer_map()?;
    1964           40 :             let gc_info = self.gc_info.read().unwrap();
    1965           40 :             let mut retain_lsns_below_horizon = Vec::new();
    1966           40 :             let gc_cutoff = {
    1967              :                 // Currently, gc-compaction only kicks in after the legacy gc has updated the gc_cutoff.
    1968              :                 // Therefore, it can only clean up data that cannot be cleaned up with legacy gc, instead of
    1969              :                 // cleaning everything that theoritically it could. In the future, it should use `self.gc_info`
    1970              :                 // to get the truth data.
    1971           40 :                 let real_gc_cutoff = *self.get_latest_gc_cutoff_lsn();
    1972           40 :                 // The compaction algorithm will keep all keys above the gc_cutoff while keeping only necessary keys below the gc_cutoff for
    1973           40 :                 // each of the retain_lsn. Therefore, if the user-provided `compact_below_lsn` is larger than the real gc cutoff, we will use
    1974           40 :                 // the real cutoff.
    1975           40 :                 let mut gc_cutoff = options.compact_below_lsn.unwrap_or(real_gc_cutoff);
    1976           40 :                 if gc_cutoff > real_gc_cutoff {
    1977            0 :                     warn!("provided compact_below_lsn={} is larger than the real_gc_cutoff={}, using the real gc cutoff", gc_cutoff, real_gc_cutoff);
    1978            0 :                     gc_cutoff = real_gc_cutoff;
    1979           40 :                 }
    1980           40 :                 gc_cutoff
    1981              :             };
    1982           44 :             for (lsn, _timeline_id, _is_offloaded) in &gc_info.retain_lsns {
    1983           44 :                 if lsn < &gc_cutoff {
    1984           44 :                     retain_lsns_below_horizon.push(*lsn);
    1985           44 :                 }
    1986              :             }
    1987           40 :             for lsn in gc_info.leases.keys() {
    1988            0 :                 if lsn < &gc_cutoff {
    1989            0 :                     retain_lsns_below_horizon.push(*lsn);
    1990            0 :                 }
    1991              :             }
    1992           40 :             let mut selected_layers: Vec<Layer> = Vec::new();
    1993           40 :             drop(gc_info);
    1994              :             // Pick all the layers intersect or below the gc_cutoff, get the largest LSN in the selected layers.
    1995           40 :             let Some(max_layer_lsn) = layers
    1996           40 :                 .iter_historic_layers()
    1997          182 :                 .filter(|desc| desc.get_lsn_range().start <= gc_cutoff)
    1998          150 :                 .map(|desc| desc.get_lsn_range().end)
    1999           40 :                 .max()
    2000              :             else {
    2001            0 :                 info!("no layers to compact with gc: no historic layers below gc_cutoff, gc_cutoff={}", gc_cutoff);
    2002            0 :                 return Ok(());
    2003              :             };
    2004              :             // Then, pick all the layers that are below the max_layer_lsn. This is to ensure we can pick all single-key
    2005              :             // layers to compact.
    2006           40 :             let mut rewrite_layers = Vec::new();
    2007          182 :             for desc in layers.iter_historic_layers() {
    2008          182 :                 if desc.get_lsn_range().end <= max_layer_lsn
    2009          150 :                     && overlaps_with(&desc.get_key_range(), &compaction_key_range)
    2010              :                 {
    2011              :                     // If the layer overlaps with the compaction key range, we need to read it to obtain all keys within the range,
    2012              :                     // even if it might contain extra keys
    2013          120 :                     selected_layers.push(guard.get_from_desc(&desc));
    2014          120 :                     // If the layer is not fully contained within the key range, we need to rewrite it if it's a delta layer (it's fine
    2015          120 :                     // to overlap image layers)
    2016          120 :                     if desc.is_delta()
    2017           62 :                         && !fully_contains(&compaction_key_range, &desc.get_key_range())
    2018            2 :                     {
    2019            2 :                         rewrite_layers.push(desc);
    2020          118 :                     }
    2021           62 :                 }
    2022              :             }
    2023           40 :             if selected_layers.is_empty() {
    2024            0 :                 info!("no layers to compact with gc: no layers within the key range, gc_cutoff={}, key_range={}..{}", gc_cutoff, compaction_key_range.start, compaction_key_range.end);
    2025            0 :                 return Ok(());
    2026           40 :             }
    2027           40 :             retain_lsns_below_horizon.sort();
    2028           40 :             GcCompactionJobDescription {
    2029           40 :                 selected_layers,
    2030           40 :                 gc_cutoff,
    2031           40 :                 retain_lsns_below_horizon,
    2032           40 :                 max_layer_lsn,
    2033           40 :                 compaction_key_range,
    2034           40 :                 rewrite_layers,
    2035           40 :             }
    2036              :         };
    2037           40 :         let lowest_retain_lsn = if self.ancestor_timeline.is_some() {
    2038            2 :             Lsn(self.ancestor_lsn.0 + 1)
    2039              :         } else {
    2040           38 :             let res = job_desc
    2041           38 :                 .retain_lsns_below_horizon
    2042           38 :                 .first()
    2043           38 :                 .copied()
    2044           38 :                 .unwrap_or(job_desc.gc_cutoff);
    2045           38 :             if cfg!(debug_assertions) {
    2046           38 :                 assert_eq!(
    2047           38 :                     res,
    2048           38 :                     job_desc
    2049           38 :                         .retain_lsns_below_horizon
    2050           38 :                         .iter()
    2051           38 :                         .min()
    2052           38 :                         .copied()
    2053           38 :                         .unwrap_or(job_desc.gc_cutoff)
    2054           38 :                 );
    2055            0 :             }
    2056           38 :             res
    2057              :         };
    2058           40 :         info!(
    2059            0 :             "picked {} layers for compaction ({} layers need rewriting) with max_layer_lsn={} gc_cutoff={} lowest_retain_lsn={}, key_range={}..{}",
    2060            0 :             job_desc.selected_layers.len(),
    2061            0 :             job_desc.rewrite_layers.len(),
    2062              :             job_desc.max_layer_lsn,
    2063              :             job_desc.gc_cutoff,
    2064              :             lowest_retain_lsn,
    2065              :             job_desc.compaction_key_range.start,
    2066              :             job_desc.compaction_key_range.end
    2067              :         );
    2068              : 
    2069          160 :         for layer in &job_desc.selected_layers {
    2070          120 :             debug!("read layer: {}", layer.layer_desc().key());
    2071              :         }
    2072           42 :         for layer in &job_desc.rewrite_layers {
    2073            2 :             debug!("rewrite layer: {}", layer.key());
    2074              :         }
    2075              : 
    2076           40 :         self.check_compaction_space(&job_desc.selected_layers)
    2077           40 :             .await?;
    2078              : 
    2079              :         // Generate statistics for the compaction
    2080          160 :         for layer in &job_desc.selected_layers {
    2081          120 :             let desc = layer.layer_desc();
    2082          120 :             if desc.is_delta() {
    2083           62 :                 stat.visit_delta_layer(desc.file_size());
    2084           62 :             } else {
    2085           58 :                 stat.visit_image_layer(desc.file_size());
    2086           58 :             }
    2087              :         }
    2088              : 
    2089              :         // Step 1: construct a k-merge iterator over all layers.
    2090              :         // Also, verify if the layer map can be split by drawing a horizontal line at every LSN start/end split point.
    2091              :         // disable the check for now because we need to adjust the check for partial compactions, will enable later.
    2092              :         // let layer_names = job_desc
    2093              :         //     .selected_layers
    2094              :         //     .iter()
    2095              :         //     .map(|layer| layer.layer_desc().layer_name())
    2096              :         //     .collect_vec();
    2097              :         // if let Some(err) = check_valid_layermap(&layer_names) {
    2098              :         //     warn!("gc-compaction layer map check failed because {}, this is normal if partial compaction is not finished yet", err);
    2099              :         // }
    2100              :         // The maximum LSN we are processing in this compaction loop
    2101           40 :         let end_lsn = job_desc
    2102           40 :             .selected_layers
    2103           40 :             .iter()
    2104          120 :             .map(|l| l.layer_desc().lsn_range.end)
    2105           40 :             .max()
    2106           40 :             .unwrap();
    2107           40 :         let mut delta_layers = Vec::new();
    2108           40 :         let mut image_layers = Vec::new();
    2109           40 :         let mut downloaded_layers = Vec::new();
    2110          160 :         for layer in &job_desc.selected_layers {
    2111          120 :             let resident_layer = layer.download_and_keep_resident().await?;
    2112          120 :             downloaded_layers.push(resident_layer);
    2113              :         }
    2114          160 :         for resident_layer in &downloaded_layers {
    2115          120 :             if resident_layer.layer_desc().is_delta() {
    2116           62 :                 let layer = resident_layer.get_as_delta(ctx).await?;
    2117           62 :                 delta_layers.push(layer);
    2118              :             } else {
    2119           58 :                 let layer = resident_layer.get_as_image(ctx).await?;
    2120           58 :                 image_layers.push(layer);
    2121              :             }
    2122              :         }
    2123           40 :         let (dense_ks, sparse_ks) = self.collect_gc_compaction_keyspace().await?;
    2124           40 :         let mut merge_iter = FilterIterator::create(
    2125           40 :             MergeIterator::create(&delta_layers, &image_layers, ctx),
    2126           40 :             dense_ks,
    2127           40 :             sparse_ks,
    2128           40 :         )?;
    2129              : 
    2130              :         // Step 2: Produce images+deltas.
    2131           40 :         let mut accumulated_values = Vec::new();
    2132           40 :         let mut last_key: Option<Key> = None;
    2133              : 
    2134              :         // Only create image layers when there is no ancestor branches. TODO: create covering image layer
    2135              :         // when some condition meet.
    2136           40 :         let mut image_layer_writer = if self.ancestor_timeline.is_none() {
    2137              :             Some(
    2138           38 :                 SplitImageLayerWriter::new(
    2139           38 :                     self.conf,
    2140           38 :                     self.timeline_id,
    2141           38 :                     self.tenant_shard_id,
    2142           38 :                     job_desc.compaction_key_range.start,
    2143           38 :                     lowest_retain_lsn,
    2144           38 :                     self.get_compaction_target_size(),
    2145           38 :                     ctx,
    2146           38 :                 )
    2147           38 :                 .await?,
    2148              :             )
    2149              :         } else {
    2150            2 :             None
    2151              :         };
    2152              : 
    2153           40 :         let mut delta_layer_writer = SplitDeltaLayerWriter::new(
    2154           40 :             self.conf,
    2155           40 :             self.timeline_id,
    2156           40 :             self.tenant_shard_id,
    2157           40 :             lowest_retain_lsn..end_lsn,
    2158           40 :             self.get_compaction_target_size(),
    2159           40 :         )
    2160           40 :         .await?;
    2161              : 
    2162              :         #[derive(Default)]
    2163              :         struct RewritingLayers {
    2164              :             before: Option<DeltaLayerWriter>,
    2165              :             after: Option<DeltaLayerWriter>,
    2166              :         }
    2167           40 :         let mut delta_layer_rewriters = HashMap::<Arc<PersistentLayerKey>, RewritingLayers>::new();
    2168              : 
    2169              :         /// Returns None if there is no ancestor branch. Throw an error when the key is not found.
    2170              :         ///
    2171              :         /// Currently, we always get the ancestor image for each key in the child branch no matter whether the image
    2172              :         /// is needed for reconstruction. This should be fixed in the future.
    2173              :         ///
    2174              :         /// Furthermore, we should do vectored get instead of a single get, or better, use k-merge for ancestor
    2175              :         /// images.
    2176          530 :         async fn get_ancestor_image(
    2177          530 :             tline: &Arc<Timeline>,
    2178          530 :             key: Key,
    2179          530 :             ctx: &RequestContext,
    2180          530 :         ) -> anyhow::Result<Option<(Key, Lsn, Bytes)>> {
    2181          530 :             if tline.ancestor_timeline.is_none() {
    2182          516 :                 return Ok(None);
    2183           14 :             };
    2184              :             // This function is implemented as a get of the current timeline at ancestor LSN, therefore reusing
    2185              :             // as much existing code as possible.
    2186           14 :             let img = tline.get(key, tline.ancestor_lsn, ctx).await?;
    2187           14 :             Ok(Some((key, tline.ancestor_lsn, img)))
    2188          530 :         }
    2189              : 
    2190              :         // Actually, we can decide not to write to the image layer at all at this point because
    2191              :         // the key and LSN range are determined. However, to keep things simple here, we still
    2192              :         // create this writer, and discard the writer in the end.
    2193              : 
    2194          822 :         while let Some(((key, lsn, val), desc)) = merge_iter.next_with_trace().await? {
    2195          782 :             if cancel.is_cancelled() {
    2196            0 :                 return Err(anyhow!("cancelled")); // TODO: refactor to CompactionError and pass cancel error
    2197          782 :             }
    2198          782 :             if self.shard_identity.is_key_disposable(&key) {
    2199              :                 // If this shard does not need to store this key, simply skip it.
    2200              :                 //
    2201              :                 // This is not handled in the filter iterator because shard is determined by hash.
    2202              :                 // Therefore, it does not give us any performance benefit to do things like skip
    2203              :                 // a whole layer file as handling key spaces (ranges).
    2204            0 :                 if cfg!(debug_assertions) {
    2205            0 :                     let shard = self.shard_identity.shard_index();
    2206            0 :                     let owner = self.shard_identity.get_shard_number(&key);
    2207            0 :                     panic!("key {key} does not belong on shard {shard}, owned by {owner}");
    2208            0 :                 }
    2209            0 :                 continue;
    2210          782 :             }
    2211          782 :             if !job_desc.compaction_key_range.contains(&key) {
    2212           64 :                 if !desc.is_delta {
    2213           60 :                     continue;
    2214            4 :                 }
    2215            4 :                 let rewriter = delta_layer_rewriters.entry(desc.clone()).or_default();
    2216            4 :                 let rewriter = if key < job_desc.compaction_key_range.start {
    2217            0 :                     if rewriter.before.is_none() {
    2218            0 :                         rewriter.before = Some(
    2219            0 :                             DeltaLayerWriter::new(
    2220            0 :                                 self.conf,
    2221            0 :                                 self.timeline_id,
    2222            0 :                                 self.tenant_shard_id,
    2223            0 :                                 desc.key_range.start,
    2224            0 :                                 desc.lsn_range.clone(),
    2225            0 :                                 ctx,
    2226            0 :                             )
    2227            0 :                             .await?,
    2228              :                         );
    2229            0 :                     }
    2230            0 :                     rewriter.before.as_mut().unwrap()
    2231            4 :                 } else if key >= job_desc.compaction_key_range.end {
    2232            4 :                     if rewriter.after.is_none() {
    2233            2 :                         rewriter.after = Some(
    2234            2 :                             DeltaLayerWriter::new(
    2235            2 :                                 self.conf,
    2236            2 :                                 self.timeline_id,
    2237            2 :                                 self.tenant_shard_id,
    2238            2 :                                 job_desc.compaction_key_range.end,
    2239            2 :                                 desc.lsn_range.clone(),
    2240            2 :                                 ctx,
    2241            2 :                             )
    2242            2 :                             .await?,
    2243              :                         );
    2244            2 :                     }
    2245            4 :                     rewriter.after.as_mut().unwrap()
    2246              :                 } else {
    2247            0 :                     unreachable!()
    2248              :                 };
    2249            4 :                 rewriter.put_value(key, lsn, val, ctx).await?;
    2250            4 :                 continue;
    2251          718 :             }
    2252          718 :             match val {
    2253          542 :                 Value::Image(_) => stat.visit_image_key(&val),
    2254          176 :                 Value::WalRecord(_) => stat.visit_wal_key(&val),
    2255              :             }
    2256          718 :             if last_key.is_none() || last_key.as_ref() == Some(&key) {
    2257          228 :                 if last_key.is_none() {
    2258           40 :                     last_key = Some(key);
    2259          188 :                 }
    2260          228 :                 accumulated_values.push((key, lsn, val));
    2261              :             } else {
    2262          490 :                 let last_key: &mut Key = last_key.as_mut().unwrap();
    2263          490 :                 stat.on_unique_key_visited(); // TODO: adjust statistics for partial compaction
    2264          490 :                 let retention = self
    2265          490 :                     .generate_key_retention(
    2266          490 :                         *last_key,
    2267          490 :                         &accumulated_values,
    2268          490 :                         job_desc.gc_cutoff,
    2269          490 :                         &job_desc.retain_lsns_below_horizon,
    2270          490 :                         COMPACTION_DELTA_THRESHOLD,
    2271          490 :                         get_ancestor_image(self, *last_key, ctx).await?,
    2272              :                     )
    2273          490 :                     .await?;
    2274          490 :                 retention
    2275          490 :                     .pipe_to(
    2276          490 :                         *last_key,
    2277          490 :                         &mut delta_layer_writer,
    2278          490 :                         image_layer_writer.as_mut(),
    2279          490 :                         &mut stat,
    2280          490 :                         ctx,
    2281          490 :                     )
    2282          490 :                     .await?;
    2283          490 :                 accumulated_values.clear();
    2284          490 :                 *last_key = key;
    2285          490 :                 accumulated_values.push((key, lsn, val));
    2286              :             }
    2287              :         }
    2288              : 
    2289              :         // TODO: move the below part to the loop body
    2290           40 :         let last_key = last_key.expect("no keys produced during compaction");
    2291           40 :         stat.on_unique_key_visited();
    2292              : 
    2293           40 :         let retention = self
    2294           40 :             .generate_key_retention(
    2295           40 :                 last_key,
    2296           40 :                 &accumulated_values,
    2297           40 :                 job_desc.gc_cutoff,
    2298           40 :                 &job_desc.retain_lsns_below_horizon,
    2299           40 :                 COMPACTION_DELTA_THRESHOLD,
    2300           40 :                 get_ancestor_image(self, last_key, ctx).await?,
    2301              :             )
    2302           40 :             .await?;
    2303           40 :         retention
    2304           40 :             .pipe_to(
    2305           40 :                 last_key,
    2306           40 :                 &mut delta_layer_writer,
    2307           40 :                 image_layer_writer.as_mut(),
    2308           40 :                 &mut stat,
    2309           40 :                 ctx,
    2310           40 :             )
    2311           40 :             .await?;
    2312              :         // end: move the above part to the loop body
    2313              : 
    2314           40 :         let mut rewrote_delta_layers = Vec::new();
    2315           42 :         for (key, writers) in delta_layer_rewriters {
    2316            2 :             if let Some(delta_writer_before) = writers.before {
    2317            0 :                 let (desc, path) = delta_writer_before
    2318            0 :                     .finish(job_desc.compaction_key_range.start, ctx)
    2319            0 :                     .await?;
    2320            0 :                 let layer = Layer::finish_creating(self.conf, self, desc, &path)?;
    2321            0 :                 rewrote_delta_layers.push(layer);
    2322            2 :             }
    2323            2 :             if let Some(delta_writer_after) = writers.after {
    2324            2 :                 let (desc, path) = delta_writer_after.finish(key.key_range.end, ctx).await?;
    2325            2 :                 let layer = Layer::finish_creating(self.conf, self, desc, &path)?;
    2326            2 :                 rewrote_delta_layers.push(layer);
    2327            0 :             }
    2328              :         }
    2329              : 
    2330           58 :         let discard = |key: &PersistentLayerKey| {
    2331           58 :             let key = key.clone();
    2332           58 :             async move { KeyHistoryRetention::discard_key(&key, self, dry_run).await }
    2333           58 :         };
    2334              : 
    2335           40 :         let produced_image_layers = if let Some(writer) = image_layer_writer {
    2336           38 :             if !dry_run {
    2337           34 :                 let end_key = job_desc.compaction_key_range.end;
    2338           34 :                 writer
    2339           34 :                     .finish_with_discard_fn(self, ctx, end_key, discard)
    2340           34 :                     .await?
    2341              :             } else {
    2342            4 :                 drop(writer);
    2343            4 :                 Vec::new()
    2344              :             }
    2345              :         } else {
    2346            2 :             Vec::new()
    2347              :         };
    2348              : 
    2349           40 :         let produced_delta_layers = if !dry_run {
    2350           36 :             delta_layer_writer
    2351           36 :                 .finish_with_discard_fn(self, ctx, discard)
    2352           36 :                 .await?
    2353              :         } else {
    2354            4 :             drop(delta_layer_writer);
    2355            4 :             Vec::new()
    2356              :         };
    2357              : 
    2358              :         // TODO: make image/delta/rewrote_delta layers generation atomic. At this point, we already generated resident layers, and if
    2359              :         // compaction is cancelled at this point, we might have some layers that are not cleaned up.
    2360           40 :         let mut compact_to = Vec::new();
    2361           40 :         let mut keep_layers = HashSet::new();
    2362           40 :         let produced_delta_layers_len = produced_delta_layers.len();
    2363           40 :         let produced_image_layers_len = produced_image_layers.len();
    2364           64 :         for action in produced_delta_layers {
    2365           24 :             match action {
    2366           12 :                 BatchWriterResult::Produced(layer) => {
    2367           12 :                     if cfg!(debug_assertions) {
    2368           12 :                         info!("produced delta layer: {}", layer.layer_desc().key());
    2369            0 :                     }
    2370           12 :                     stat.produce_delta_layer(layer.layer_desc().file_size());
    2371           12 :                     compact_to.push(layer);
    2372              :                 }
    2373           12 :                 BatchWriterResult::Discarded(l) => {
    2374           12 :                     if cfg!(debug_assertions) {
    2375           12 :                         info!("discarded delta layer: {}", l);
    2376            0 :                     }
    2377           12 :                     keep_layers.insert(l);
    2378           12 :                     stat.discard_delta_layer();
    2379              :                 }
    2380              :             }
    2381              :         }
    2382           42 :         for layer in &rewrote_delta_layers {
    2383            2 :             debug!(
    2384            0 :                 "produced rewritten delta layer: {}",
    2385            0 :                 layer.layer_desc().key()
    2386              :             );
    2387              :         }
    2388           40 :         compact_to.extend(rewrote_delta_layers);
    2389           74 :         for action in produced_image_layers {
    2390           34 :             match action {
    2391           26 :                 BatchWriterResult::Produced(layer) => {
    2392           26 :                     debug!("produced image layer: {}", layer.layer_desc().key());
    2393           26 :                     stat.produce_image_layer(layer.layer_desc().file_size());
    2394           26 :                     compact_to.push(layer);
    2395              :                 }
    2396            8 :                 BatchWriterResult::Discarded(l) => {
    2397            8 :                     debug!("discarded image layer: {}", l);
    2398            8 :                     keep_layers.insert(l);
    2399            8 :                     stat.discard_image_layer();
    2400              :                 }
    2401              :             }
    2402              :         }
    2403              : 
    2404           40 :         let mut layer_selection = job_desc.selected_layers;
    2405              : 
    2406              :         // Partial compaction might select more data than it processes, e.g., if
    2407              :         // the compaction_key_range only partially overlaps:
    2408              :         //
    2409              :         //         [---compaction_key_range---]
    2410              :         //   [---A----][----B----][----C----][----D----]
    2411              :         //
    2412              :         // For delta layers, we will rewrite the layers so that it is cut exactly at
    2413              :         // the compaction key range, so we can always discard them. However, for image
    2414              :         // layers, as we do not rewrite them for now, we need to handle them differently.
    2415              :         // Assume image layers  A, B, C, D are all in the `layer_selection`.
    2416              :         //
    2417              :         // The created image layers contain whatever is needed from B, C, and from
    2418              :         // `----]` of A, and from  `[---` of D.
    2419              :         //
    2420              :         // In contrast, `[---A` and `D----]` have not been processed, so, we must
    2421              :         // keep that data.
    2422              :         //
    2423              :         // The solution for now is to keep A and D completely if they are image layers.
    2424              :         // (layer_selection is what we'll remove from the layer map, so, retain what
    2425              :         // is _not_ fully covered by compaction_key_range).
    2426          160 :         for layer in &layer_selection {
    2427          120 :             if !layer.layer_desc().is_delta() {
    2428           58 :                 if !overlaps_with(
    2429           58 :                     &layer.layer_desc().key_range,
    2430           58 :                     &job_desc.compaction_key_range,
    2431           58 :                 ) {
    2432            0 :                     bail!("violated constraint: image layer outside of compaction key range");
    2433           58 :                 }
    2434           58 :                 if !fully_contains(
    2435           58 :                     &job_desc.compaction_key_range,
    2436           58 :                     &layer.layer_desc().key_range,
    2437           58 :                 ) {
    2438            8 :                     keep_layers.insert(layer.layer_desc().key());
    2439           50 :                 }
    2440           62 :             }
    2441              :         }
    2442              : 
    2443          120 :         layer_selection.retain(|x| !keep_layers.contains(&x.layer_desc().key()));
    2444           40 : 
    2445           40 :         info!(
    2446            0 :             "gc-compaction statistics: {}",
    2447            0 :             serde_json::to_string(&stat)?
    2448              :         );
    2449              : 
    2450           40 :         if dry_run {
    2451            4 :             return Ok(());
    2452           36 :         }
    2453           36 : 
    2454           36 :         info!(
    2455            0 :             "produced {} delta layers and {} image layers, {} layers are kept",
    2456            0 :             produced_delta_layers_len,
    2457            0 :             produced_image_layers_len,
    2458            0 :             layer_selection.len()
    2459              :         );
    2460              : 
    2461              :         // Step 3: Place back to the layer map.
    2462              :         {
    2463              :             // TODO: sanity check if the layer map is valid (i.e., should not have overlaps)
    2464           36 :             let mut guard = self.layers.write().await;
    2465           36 :             guard
    2466           36 :                 .open_mut()?
    2467           36 :                 .finish_gc_compaction(&layer_selection, &compact_to, &self.metrics)
    2468           36 :         };
    2469           36 :         self.remote_client
    2470           36 :             .schedule_compaction_update(&layer_selection, &compact_to)?;
    2471              : 
    2472           36 :         drop(gc_lock);
    2473           36 : 
    2474           36 :         Ok(())
    2475           40 :     }
    2476              : }
    2477              : 
    2478              : struct TimelineAdaptor {
    2479              :     timeline: Arc<Timeline>,
    2480              : 
    2481              :     keyspace: (Lsn, KeySpace),
    2482              : 
    2483              :     new_deltas: Vec<ResidentLayer>,
    2484              :     new_images: Vec<ResidentLayer>,
    2485              :     layers_to_delete: Vec<Arc<PersistentLayerDesc>>,
    2486              : }
    2487              : 
    2488              : impl TimelineAdaptor {
    2489            0 :     pub fn new(timeline: &Arc<Timeline>, keyspace: (Lsn, KeySpace)) -> Self {
    2490            0 :         Self {
    2491            0 :             timeline: timeline.clone(),
    2492            0 :             keyspace,
    2493            0 :             new_images: Vec::new(),
    2494            0 :             new_deltas: Vec::new(),
    2495            0 :             layers_to_delete: Vec::new(),
    2496            0 :         }
    2497            0 :     }
    2498              : 
    2499            0 :     pub async fn flush_updates(&mut self) -> Result<(), CompactionError> {
    2500            0 :         let layers_to_delete = {
    2501            0 :             let guard = self.timeline.layers.read().await;
    2502            0 :             self.layers_to_delete
    2503            0 :                 .iter()
    2504            0 :                 .map(|x| guard.get_from_desc(x))
    2505            0 :                 .collect::<Vec<Layer>>()
    2506            0 :         };
    2507            0 :         self.timeline
    2508            0 :             .finish_compact_batch(&self.new_deltas, &self.new_images, &layers_to_delete)
    2509            0 :             .await?;
    2510              : 
    2511            0 :         self.timeline
    2512            0 :             .upload_new_image_layers(std::mem::take(&mut self.new_images))?;
    2513              : 
    2514            0 :         self.new_deltas.clear();
    2515            0 :         self.layers_to_delete.clear();
    2516            0 :         Ok(())
    2517            0 :     }
    2518              : }
    2519              : 
    2520              : #[derive(Clone)]
    2521              : struct ResidentDeltaLayer(ResidentLayer);
    2522              : #[derive(Clone)]
    2523              : struct ResidentImageLayer(ResidentLayer);
    2524              : 
    2525              : impl CompactionJobExecutor for TimelineAdaptor {
    2526              :     type Key = pageserver_api::key::Key;
    2527              : 
    2528              :     type Layer = OwnArc<PersistentLayerDesc>;
    2529              :     type DeltaLayer = ResidentDeltaLayer;
    2530              :     type ImageLayer = ResidentImageLayer;
    2531              : 
    2532              :     type RequestContext = crate::context::RequestContext;
    2533              : 
    2534            0 :     fn get_shard_identity(&self) -> &ShardIdentity {
    2535            0 :         self.timeline.get_shard_identity()
    2536            0 :     }
    2537              : 
    2538            0 :     async fn get_layers(
    2539            0 :         &mut self,
    2540            0 :         key_range: &Range<Key>,
    2541            0 :         lsn_range: &Range<Lsn>,
    2542            0 :         _ctx: &RequestContext,
    2543            0 :     ) -> anyhow::Result<Vec<OwnArc<PersistentLayerDesc>>> {
    2544            0 :         self.flush_updates().await?;
    2545              : 
    2546            0 :         let guard = self.timeline.layers.read().await;
    2547            0 :         let layer_map = guard.layer_map()?;
    2548              : 
    2549            0 :         let result = layer_map
    2550            0 :             .iter_historic_layers()
    2551            0 :             .filter(|l| {
    2552            0 :                 overlaps_with(&l.lsn_range, lsn_range) && overlaps_with(&l.key_range, key_range)
    2553            0 :             })
    2554            0 :             .map(OwnArc)
    2555            0 :             .collect();
    2556            0 :         Ok(result)
    2557            0 :     }
    2558              : 
    2559            0 :     async fn get_keyspace(
    2560            0 :         &mut self,
    2561            0 :         key_range: &Range<Key>,
    2562            0 :         lsn: Lsn,
    2563            0 :         _ctx: &RequestContext,
    2564            0 :     ) -> anyhow::Result<Vec<Range<Key>>> {
    2565            0 :         if lsn == self.keyspace.0 {
    2566            0 :             Ok(pageserver_compaction::helpers::intersect_keyspace(
    2567            0 :                 &self.keyspace.1.ranges,
    2568            0 :                 key_range,
    2569            0 :             ))
    2570              :         } else {
    2571              :             // The current compaction implementation only ever requests the key space
    2572              :             // at the compaction end LSN.
    2573            0 :             anyhow::bail!("keyspace not available for requested lsn");
    2574              :         }
    2575            0 :     }
    2576              : 
    2577            0 :     async fn downcast_delta_layer(
    2578            0 :         &self,
    2579            0 :         layer: &OwnArc<PersistentLayerDesc>,
    2580            0 :     ) -> anyhow::Result<Option<ResidentDeltaLayer>> {
    2581            0 :         // this is a lot more complex than a simple downcast...
    2582            0 :         if layer.is_delta() {
    2583            0 :             let l = {
    2584            0 :                 let guard = self.timeline.layers.read().await;
    2585            0 :                 guard.get_from_desc(layer)
    2586              :             };
    2587            0 :             let result = l.download_and_keep_resident().await?;
    2588              : 
    2589            0 :             Ok(Some(ResidentDeltaLayer(result)))
    2590              :         } else {
    2591            0 :             Ok(None)
    2592              :         }
    2593            0 :     }
    2594              : 
    2595            0 :     async fn create_image(
    2596            0 :         &mut self,
    2597            0 :         lsn: Lsn,
    2598            0 :         key_range: &Range<Key>,
    2599            0 :         ctx: &RequestContext,
    2600            0 :     ) -> anyhow::Result<()> {
    2601            0 :         Ok(self.create_image_impl(lsn, key_range, ctx).await?)
    2602            0 :     }
    2603              : 
    2604            0 :     async fn create_delta(
    2605            0 :         &mut self,
    2606            0 :         lsn_range: &Range<Lsn>,
    2607            0 :         key_range: &Range<Key>,
    2608            0 :         input_layers: &[ResidentDeltaLayer],
    2609            0 :         ctx: &RequestContext,
    2610            0 :     ) -> anyhow::Result<()> {
    2611            0 :         debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end);
    2612              : 
    2613            0 :         let mut all_entries = Vec::new();
    2614            0 :         for dl in input_layers.iter() {
    2615            0 :             all_entries.extend(dl.load_keys(ctx).await?);
    2616              :         }
    2617              : 
    2618              :         // The current stdlib sorting implementation is designed in a way where it is
    2619              :         // particularly fast where the slice is made up of sorted sub-ranges.
    2620            0 :         all_entries.sort_by_key(|DeltaEntry { key, lsn, .. }| (*key, *lsn));
    2621              : 
    2622            0 :         let mut writer = DeltaLayerWriter::new(
    2623            0 :             self.timeline.conf,
    2624            0 :             self.timeline.timeline_id,
    2625            0 :             self.timeline.tenant_shard_id,
    2626            0 :             key_range.start,
    2627            0 :             lsn_range.clone(),
    2628            0 :             ctx,
    2629            0 :         )
    2630            0 :         .await?;
    2631              : 
    2632            0 :         let mut dup_values = 0;
    2633            0 : 
    2634            0 :         // This iterator walks through all key-value pairs from all the layers
    2635            0 :         // we're compacting, in key, LSN order.
    2636            0 :         let mut prev: Option<(Key, Lsn)> = None;
    2637              :         for &DeltaEntry {
    2638            0 :             key, lsn, ref val, ..
    2639            0 :         } in all_entries.iter()
    2640              :         {
    2641            0 :             if prev == Some((key, lsn)) {
    2642              :                 // This is a duplicate. Skip it.
    2643              :                 //
    2644              :                 // It can happen if compaction is interrupted after writing some
    2645              :                 // layers but not all, and we are compacting the range again.
    2646              :                 // The calculations in the algorithm assume that there are no
    2647              :                 // duplicates, so the math on targeted file size is likely off,
    2648              :                 // and we will create smaller files than expected.
    2649            0 :                 dup_values += 1;
    2650            0 :                 continue;
    2651            0 :             }
    2652              : 
    2653            0 :             let value = val.load(ctx).await?;
    2654              : 
    2655            0 :             writer.put_value(key, lsn, value, ctx).await?;
    2656              : 
    2657            0 :             prev = Some((key, lsn));
    2658              :         }
    2659              : 
    2660            0 :         if dup_values > 0 {
    2661            0 :             warn!("delta layer created with {} duplicate values", dup_values);
    2662            0 :         }
    2663              : 
    2664            0 :         fail_point!("delta-layer-writer-fail-before-finish", |_| {
    2665            0 :             Err(anyhow::anyhow!(
    2666            0 :                 "failpoint delta-layer-writer-fail-before-finish"
    2667            0 :             ))
    2668            0 :         });
    2669              : 
    2670            0 :         let (desc, path) = writer.finish(prev.unwrap().0.next(), ctx).await?;
    2671            0 :         let new_delta_layer =
    2672            0 :             Layer::finish_creating(self.timeline.conf, &self.timeline, desc, &path)?;
    2673              : 
    2674            0 :         self.new_deltas.push(new_delta_layer);
    2675            0 :         Ok(())
    2676            0 :     }
    2677              : 
    2678            0 :     async fn delete_layer(
    2679            0 :         &mut self,
    2680            0 :         layer: &OwnArc<PersistentLayerDesc>,
    2681            0 :         _ctx: &RequestContext,
    2682            0 :     ) -> anyhow::Result<()> {
    2683            0 :         self.layers_to_delete.push(layer.clone().0);
    2684            0 :         Ok(())
    2685            0 :     }
    2686              : }
    2687              : 
    2688              : impl TimelineAdaptor {
    2689            0 :     async fn create_image_impl(
    2690            0 :         &mut self,
    2691            0 :         lsn: Lsn,
    2692            0 :         key_range: &Range<Key>,
    2693            0 :         ctx: &RequestContext,
    2694            0 :     ) -> Result<(), CreateImageLayersError> {
    2695            0 :         let timer = self.timeline.metrics.create_images_time_histo.start_timer();
    2696              : 
    2697            0 :         let image_layer_writer = ImageLayerWriter::new(
    2698            0 :             self.timeline.conf,
    2699            0 :             self.timeline.timeline_id,
    2700            0 :             self.timeline.tenant_shard_id,
    2701            0 :             key_range,
    2702            0 :             lsn,
    2703            0 :             ctx,
    2704            0 :         )
    2705            0 :         .await?;
    2706              : 
    2707            0 :         fail_point!("image-layer-writer-fail-before-finish", |_| {
    2708            0 :             Err(CreateImageLayersError::Other(anyhow::anyhow!(
    2709            0 :                 "failpoint image-layer-writer-fail-before-finish"
    2710            0 :             )))
    2711            0 :         });
    2712              : 
    2713            0 :         let keyspace = KeySpace {
    2714            0 :             ranges: self.get_keyspace(key_range, lsn, ctx).await?,
    2715              :         };
    2716              :         // TODO set proper (stateful) start. The create_image_layer_for_rel_blocks function mostly
    2717            0 :         let start = Key::MIN;
    2718              :         let ImageLayerCreationOutcome {
    2719            0 :             image,
    2720              :             next_start_key: _,
    2721            0 :         } = self
    2722            0 :             .timeline
    2723            0 :             .create_image_layer_for_rel_blocks(
    2724            0 :                 &keyspace,
    2725            0 :                 image_layer_writer,
    2726            0 :                 lsn,
    2727            0 :                 ctx,
    2728            0 :                 key_range.clone(),
    2729            0 :                 start,
    2730            0 :             )
    2731            0 :             .await?;
    2732              : 
    2733            0 :         if let Some(image_layer) = image {
    2734            0 :             self.new_images.push(image_layer);
    2735            0 :         }
    2736              : 
    2737            0 :         timer.stop_and_record();
    2738            0 : 
    2739            0 :         Ok(())
    2740            0 :     }
    2741              : }
    2742              : 
    2743              : impl CompactionRequestContext for crate::context::RequestContext {}
    2744              : 
    2745              : #[derive(Debug, Clone)]
    2746              : pub struct OwnArc<T>(pub Arc<T>);
    2747              : 
    2748              : impl<T> Deref for OwnArc<T> {
    2749              :     type Target = <Arc<T> as Deref>::Target;
    2750            0 :     fn deref(&self) -> &Self::Target {
    2751            0 :         &self.0
    2752            0 :     }
    2753              : }
    2754              : 
    2755              : impl<T> AsRef<T> for OwnArc<T> {
    2756            0 :     fn as_ref(&self) -> &T {
    2757            0 :         self.0.as_ref()
    2758            0 :     }
    2759              : }
    2760              : 
    2761              : impl CompactionLayer<Key> for OwnArc<PersistentLayerDesc> {
    2762            0 :     fn key_range(&self) -> &Range<Key> {
    2763            0 :         &self.key_range
    2764            0 :     }
    2765            0 :     fn lsn_range(&self) -> &Range<Lsn> {
    2766            0 :         &self.lsn_range
    2767            0 :     }
    2768            0 :     fn file_size(&self) -> u64 {
    2769            0 :         self.file_size
    2770            0 :     }
    2771            0 :     fn short_id(&self) -> std::string::String {
    2772            0 :         self.as_ref().short_id().to_string()
    2773            0 :     }
    2774            0 :     fn is_delta(&self) -> bool {
    2775            0 :         self.as_ref().is_delta()
    2776            0 :     }
    2777              : }
    2778              : 
    2779              : impl CompactionLayer<Key> for OwnArc<DeltaLayer> {
    2780            0 :     fn key_range(&self) -> &Range<Key> {
    2781            0 :         &self.layer_desc().key_range
    2782            0 :     }
    2783            0 :     fn lsn_range(&self) -> &Range<Lsn> {
    2784            0 :         &self.layer_desc().lsn_range
    2785            0 :     }
    2786            0 :     fn file_size(&self) -> u64 {
    2787            0 :         self.layer_desc().file_size
    2788            0 :     }
    2789            0 :     fn short_id(&self) -> std::string::String {
    2790            0 :         self.layer_desc().short_id().to_string()
    2791            0 :     }
    2792            0 :     fn is_delta(&self) -> bool {
    2793            0 :         true
    2794            0 :     }
    2795              : }
    2796              : 
    2797              : use crate::tenant::timeline::DeltaEntry;
    2798              : 
    2799              : impl CompactionLayer<Key> for ResidentDeltaLayer {
    2800            0 :     fn key_range(&self) -> &Range<Key> {
    2801            0 :         &self.0.layer_desc().key_range
    2802            0 :     }
    2803            0 :     fn lsn_range(&self) -> &Range<Lsn> {
    2804            0 :         &self.0.layer_desc().lsn_range
    2805            0 :     }
    2806            0 :     fn file_size(&self) -> u64 {
    2807            0 :         self.0.layer_desc().file_size
    2808            0 :     }
    2809            0 :     fn short_id(&self) -> std::string::String {
    2810            0 :         self.0.layer_desc().short_id().to_string()
    2811            0 :     }
    2812            0 :     fn is_delta(&self) -> bool {
    2813            0 :         true
    2814            0 :     }
    2815              : }
    2816              : 
    2817              : impl CompactionDeltaLayer<TimelineAdaptor> for ResidentDeltaLayer {
    2818              :     type DeltaEntry<'a> = DeltaEntry<'a>;
    2819              : 
    2820            0 :     async fn load_keys<'a>(&self, ctx: &RequestContext) -> anyhow::Result<Vec<DeltaEntry<'_>>> {
    2821            0 :         self.0.get_as_delta(ctx).await?.index_entries(ctx).await
    2822            0 :     }
    2823              : }
    2824              : 
    2825              : impl CompactionLayer<Key> for ResidentImageLayer {
    2826            0 :     fn key_range(&self) -> &Range<Key> {
    2827            0 :         &self.0.layer_desc().key_range
    2828            0 :     }
    2829            0 :     fn lsn_range(&self) -> &Range<Lsn> {
    2830            0 :         &self.0.layer_desc().lsn_range
    2831            0 :     }
    2832            0 :     fn file_size(&self) -> u64 {
    2833            0 :         self.0.layer_desc().file_size
    2834            0 :     }
    2835            0 :     fn short_id(&self) -> std::string::String {
    2836            0 :         self.0.layer_desc().short_id().to_string()
    2837            0 :     }
    2838            0 :     fn is_delta(&self) -> bool {
    2839            0 :         false
    2840            0 :     }
    2841              : }
    2842              : impl CompactionImageLayer<TimelineAdaptor> for ResidentImageLayer {}
        

Generated by: LCOV version 2.1-beta