LCOV - code coverage report
Current view: top level - pageserver/src/tenant/timeline - compaction.rs (source / functions) Coverage Total Hit
Test: 892dcde01f16175bbb7038896f6f080ec7094ee6.info Lines: 51.8 % 3040 1574
Test Date: 2025-05-22 14:16:19 Functions: 40.7 % 189 77

            Line data    Source code
       1              : //! New compaction implementation. The algorithm itself is implemented in the
       2              : //! compaction crate. This file implements the callbacks and structs that allow
       3              : //! the algorithm to drive the process.
       4              : //!
       5              : //! The old legacy algorithm is implemented directly in `timeline.rs`.
       6              : 
       7              : use std::collections::{BinaryHeap, HashMap, HashSet, VecDeque};
       8              : use std::ops::{Deref, Range};
       9              : use std::sync::Arc;
      10              : use std::time::{Duration, Instant};
      11              : 
      12              : use super::layer_manager::LayerManager;
      13              : use super::{
      14              :     CompactFlags, CompactOptions, CompactionError, CreateImageLayersError, DurationRecorder,
      15              :     GetVectoredError, ImageLayerCreationMode, LastImageLayerCreationStatus, RecordedDuration,
      16              :     Timeline,
      17              : };
      18              : 
      19              : use crate::tenant::timeline::DeltaEntry;
      20              : use crate::walredo::RedoAttemptType;
      21              : use anyhow::{Context, anyhow};
      22              : use bytes::Bytes;
      23              : use enumset::EnumSet;
      24              : use fail::fail_point;
      25              : use futures::FutureExt;
      26              : use itertools::Itertools;
      27              : use once_cell::sync::Lazy;
      28              : use pageserver_api::config::tenant_conf_defaults::DEFAULT_CHECKPOINT_DISTANCE;
      29              : use pageserver_api::key::{KEY_SIZE, Key};
      30              : use pageserver_api::keyspace::{KeySpace, ShardedRange};
      31              : use pageserver_api::models::{CompactInfoResponse, CompactKeyRange};
      32              : use pageserver_api::record::NeonWalRecord;
      33              : use pageserver_api::shard::{ShardCount, ShardIdentity, TenantShardId};
      34              : use pageserver_api::value::Value;
      35              : use pageserver_compaction::helpers::{fully_contains, overlaps_with};
      36              : use pageserver_compaction::interface::*;
      37              : use serde::Serialize;
      38              : use tokio::sync::{OwnedSemaphorePermit, Semaphore};
      39              : use tokio_util::sync::CancellationToken;
      40              : use tracing::{Instrument, debug, error, info, info_span, trace, warn};
      41              : use utils::critical;
      42              : use utils::id::TimelineId;
      43              : use utils::lsn::Lsn;
      44              : 
      45              : use crate::context::{AccessStatsBehavior, RequestContext, RequestContextBuilder};
      46              : use crate::page_cache;
      47              : use crate::statvfs::Statvfs;
      48              : use crate::tenant::checks::check_valid_layermap;
      49              : use crate::tenant::gc_block::GcBlock;
      50              : use crate::tenant::layer_map::LayerMap;
      51              : use crate::tenant::remote_timeline_client::WaitCompletionError;
      52              : use crate::tenant::remote_timeline_client::index::GcCompactionState;
      53              : use crate::tenant::storage_layer::batch_split_writer::{
      54              :     BatchWriterResult, SplitDeltaLayerWriter, SplitImageLayerWriter,
      55              : };
      56              : use crate::tenant::storage_layer::filter_iterator::FilterIterator;
      57              : use crate::tenant::storage_layer::merge_iterator::MergeIterator;
      58              : use crate::tenant::storage_layer::{
      59              :     AsLayerDesc, LayerVisibilityHint, PersistentLayerDesc, PersistentLayerKey,
      60              :     ValueReconstructState,
      61              : };
      62              : use crate::tenant::tasks::log_compaction_error;
      63              : use crate::tenant::timeline::{
      64              :     DeltaLayerWriter, ImageLayerCreationOutcome, ImageLayerWriter, IoConcurrency, Layer,
      65              :     ResidentLayer, drop_rlock,
      66              : };
      67              : use crate::tenant::{DeltaLayer, MaybeOffloaded};
      68              : use crate::virtual_file::{MaybeFatalIo, VirtualFile};
      69              : 
/// Maximum number of deltas before generating an image layer in bottom-most compaction.
// NOTE(review): the code that compares against this threshold is not in this part of the file;
// confirm at the use site whether the limit is inclusive.
const COMPACTION_DELTA_THRESHOLD: usize = 5;

/// Ratio of shard-local pages below which we trigger shard ancestor layer rewrites. 0.3 means that
/// <= 30% of layer pages must belong to the descendant shard to rewrite the layer.
///
/// We choose a value < 0.5 to avoid rewriting all visible layers every time we do a power-of-two
/// shard split, which gets expensive for large tenants.
const ANCESTOR_COMPACTION_REWRITE_THRESHOLD: f64 = 0.3;
      79              : 
/// Identifier of an item in a [`GcCompactionQueue`].
///
/// Ids are handed out sequentially by `GcCompactionQueueInner::next_id`, starting from 0 for a
/// freshly created queue. Every queued item (meta job, sub-compaction job, notify marker) gets
/// its own id.
#[derive(Default, Debug, Clone, Copy, Hash, PartialEq, Eq, Serialize)]
pub struct GcCompactionJobId(pub usize);
      82              : 
      83              : impl std::fmt::Display for GcCompactionJobId {
      84            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
      85            0 :         write!(f, "{}", self.0)
      86            0 :     }
      87              : }
      88              : 
/// The gc-compaction related settings of a timeline, as returned by
/// `Timeline::get_gc_compaction_settings`.
pub struct GcCompactionCombinedSettings {
    /// When `false`, `GcCompactionQueue::trigger_auto_compaction` returns without scheduling
    /// anything.
    pub gc_compaction_enabled: bool,
    // NOTE(review): not read in this part of the file; presumably toggles a verification pass
    // of the compaction result -- confirm at the use site.
    pub gc_compaction_verification: bool,
    /// Initial auto-trigger threshold in KiB: while nothing has been gc-compacted yet (the
    /// computed L2 size is 0), auto compaction triggers once the L1 size in bytes reaches this
    /// value times 1024.
    pub gc_compaction_initial_threshold_kb: u64,
    /// Size-ratio auto-trigger in percent: with a non-empty L2, auto compaction triggers once
    /// `l1_size / l2_size >= gc_compaction_ratio_percent / 100`.
    pub gc_compaction_ratio_percent: u64,
}
      95              : 
/// An entry of the gc-compaction queue.
#[derive(Debug, Clone)]
pub enum GcCompactionQueueItem {
    /// A top-level job, as enqueued by `schedule_manual_compaction` (`auto: false`) or
    /// `schedule_auto_compaction` (`auto: true`).
    MetaJob {
        /// Compaction options
        options: CompactOptions,
        /// Whether the compaction is triggered automatically (determines whether we need to update L2 LSN)
        auto: bool,
    },
    /// One of the jobs a meta job was split into by `handle_sub_compaction`. The options carry
    /// an explicit key range and LSN range and have `sub_compaction` disabled.
    SubCompactionJob(CompactOptions),
    /// Marker queued right after the sub-compaction jobs of a meta job. Carries the id of that
    /// meta job and, for auto-triggered meta jobs only, the expected L2 LSN (`None` for manual
    /// jobs).
    Notify(GcCompactionJobId, Option<Lsn>),
}
     107              : 
/// Statistics for gc-compaction meta jobs, which contains several sub compaction jobs.
///
/// `handle_sub_compaction` creates a value with `meta_job_id`, `start_time`,
/// `before_compaction_layer_size`, `below_lsn` and `total_sub_compaction_jobs` filled in and
/// every other field at its default; `finalize` fills in `end_time`, `duration_secs` and
/// `retention_ratio`.
#[derive(Debug, Clone, Serialize, Default)]
pub struct GcCompactionMetaStatistics {
    /// The total number of sub compaction jobs.
    pub total_sub_compaction_jobs: usize,
    /// The total number of sub compaction jobs that failed.
    pub failed_sub_compaction_jobs: usize,
    /// The total number of sub compaction jobs that succeeded.
    pub succeeded_sub_compaction_jobs: usize,
    /// The layer size before compaction.
    pub before_compaction_layer_size: u64,
    /// The layer size after compaction.
    pub after_compaction_layer_size: u64,
    /// The start time of the meta job.
    pub start_time: Option<chrono::DateTime<chrono::Utc>>,
    /// The end time of the meta job.
    pub end_time: Option<chrono::DateTime<chrono::Utc>>,
    /// The duration of the meta job in seconds. `finalize` only writes it when a start time was
    /// recorded and the end time is strictly later than it.
    pub duration_secs: f64,
    /// The id of the meta job.
    pub meta_job_id: GcCompactionJobId,
    /// The LSN below which the layers are compacted, used to compute the statistics.
    pub below_lsn: Lsn,
    /// The retention ratio of the meta job, computed by `finalize` as
    /// `after_compaction_layer_size / (before_compaction_layer_size + 1)`; the `+ 1` keeps the
    /// division well-defined when the size before compaction is 0.
    pub retention_ratio: f64,
}
     134              : 
     135              : impl GcCompactionMetaStatistics {
     136            0 :     fn finalize(&mut self) {
     137            0 :         let end_time = chrono::Utc::now();
     138            0 :         if let Some(start_time) = self.start_time {
     139            0 :             if end_time > start_time {
     140            0 :                 let delta = end_time - start_time;
     141            0 :                 if let Ok(std_dur) = delta.to_std() {
     142            0 :                     self.duration_secs = std_dur.as_secs_f64();
     143            0 :                 }
     144            0 :             }
     145            0 :         }
     146            0 :         self.retention_ratio = self.after_compaction_layer_size as f64
     147            0 :             / (self.before_compaction_layer_size as f64 + 1.0);
     148            0 :         self.end_time = Some(end_time);
     149            0 :     }
     150              : }
     151              : 
     152              : impl GcCompactionQueueItem {
     153            0 :     pub fn into_compact_info_resp(
     154            0 :         self,
     155            0 :         id: GcCompactionJobId,
     156            0 :         running: bool,
     157            0 :     ) -> Option<CompactInfoResponse> {
     158            0 :         match self {
     159            0 :             GcCompactionQueueItem::MetaJob { options, .. } => Some(CompactInfoResponse {
     160            0 :                 compact_key_range: options.compact_key_range,
     161            0 :                 compact_lsn_range: options.compact_lsn_range,
     162            0 :                 sub_compaction: options.sub_compaction,
     163            0 :                 running,
     164            0 :                 job_id: id.0,
     165            0 :             }),
     166            0 :             GcCompactionQueueItem::SubCompactionJob(options) => Some(CompactInfoResponse {
     167            0 :                 compact_key_range: options.compact_key_range,
     168            0 :                 compact_lsn_range: options.compact_lsn_range,
     169            0 :                 sub_compaction: options.sub_compaction,
     170            0 :                 running,
     171            0 :                 job_id: id.0,
     172            0 :             }),
     173            0 :             GcCompactionQueueItem::Notify(_, _) => None,
     174              :         }
     175            0 :     }
     176              : }
     177              : 
/// Resources held on behalf of a scheduled meta job until `notify_and_unblock` removes them
/// from `GcCompactionQueueInner::guards` (or `cancel_scheduled` drops all of them).
#[derive(Default)]
struct GcCompactionGuardItems {
    /// Completion channel supplied by the caller of `schedule_manual_compaction`; a `()` is
    /// sent on it by `notify_and_unblock`.
    notify: Option<tokio::sync::oneshot::Sender<()>>,
    /// Permit taken from `CONCURRENT_GC_COMPACTION_TASKS` for auto-triggered jobs; it is held
    /// for as long as this entry exists.
    permit: Option<OwnedSemaphorePermit>,
}
     183              : 
/// Mutable state of a [`GcCompactionQueue`], protected by its `inner` mutex.
struct GcCompactionQueueInner {
    /// The item currently being processed, if any.
    running: Option<(GcCompactionJobId, GcCompactionQueueItem)>,
    /// Items waiting to be processed. New meta jobs are appended at the back; the items a meta
    /// job is split into are inserted at the front.
    queued: VecDeque<(GcCompactionJobId, GcCompactionQueueItem)>,
    /// Per-job guard items (completion channel, concurrency permit), keyed by meta job id.
    guards: HashMap<GcCompactionJobId, GcCompactionGuardItems>,
    /// Despite the name, this is the id that the next call to `next_id` will return.
    last_id: GcCompactionJobId,
    /// Statistics of the most recently split meta job, if any.
    meta_statistics: Option<GcCompactionMetaStatistics>,
}
     191              : 
     192              : impl GcCompactionQueueInner {
     193            0 :     fn next_id(&mut self) -> GcCompactionJobId {
     194            0 :         let id = self.last_id;
     195            0 :         self.last_id = GcCompactionJobId(id.0 + 1);
     196            0 :         id
     197            0 :     }
     198              : }
     199              : 
/// A structure to store gc_compaction jobs.
///
/// Jobs are added with `schedule_manual_compaction` or, when the trigger conditions are met,
/// by `trigger_auto_compaction`.
pub struct GcCompactionQueue {
    /// All items in the queue, and the currently-running job.
    /// This is a synchronous mutex; the code in this file never holds it across an `.await`.
    inner: std::sync::Mutex<GcCompactionQueueInner>,
    /// Ensure only one thread is consuming the queue.
    consumer_lock: tokio::sync::Mutex<()>,
}
     207              : 
     208            0 : static CONCURRENT_GC_COMPACTION_TASKS: Lazy<Arc<Semaphore>> = Lazy::new(|| {
     209            0 :     // Only allow two timelines on one pageserver to run gc compaction at a time.
     210            0 :     Arc::new(Semaphore::new(2))
     211            0 : });
     212              : 
     213              : impl GcCompactionQueue {
     214            0 :     pub fn new() -> Self {
     215            0 :         GcCompactionQueue {
     216            0 :             inner: std::sync::Mutex::new(GcCompactionQueueInner {
     217            0 :                 running: None,
     218            0 :                 queued: VecDeque::new(),
     219            0 :                 guards: HashMap::new(),
     220            0 :                 last_id: GcCompactionJobId(0),
     221            0 :                 meta_statistics: None,
     222            0 :             }),
     223            0 :             consumer_lock: tokio::sync::Mutex::new(()),
     224            0 :         }
     225            0 :     }
     226              : 
     227            0 :     pub fn cancel_scheduled(&self) {
     228            0 :         let mut guard = self.inner.lock().unwrap();
     229            0 :         guard.queued.clear();
     230            0 :         // TODO: if there is a running job, we should keep the gc guard. However, currently, the cancel
     231            0 :         // API is only used for testing purposes, so we can drop everything here.
     232            0 :         guard.guards.clear();
     233            0 :     }
     234              : 
     235              :     /// Schedule a manual compaction job.
     236            0 :     pub fn schedule_manual_compaction(
     237            0 :         &self,
     238            0 :         options: CompactOptions,
     239            0 :         notify: Option<tokio::sync::oneshot::Sender<()>>,
     240            0 :     ) -> GcCompactionJobId {
     241            0 :         let mut guard = self.inner.lock().unwrap();
     242            0 :         let id = guard.next_id();
     243            0 :         guard.queued.push_back((
     244            0 :             id,
     245            0 :             GcCompactionQueueItem::MetaJob {
     246            0 :                 options,
     247            0 :                 auto: false,
     248            0 :             },
     249            0 :         ));
     250            0 :         guard.guards.entry(id).or_default().notify = notify;
     251            0 :         info!("scheduled compaction job id={}", id);
     252            0 :         id
     253            0 :     }
     254              : 
     255              :     /// Schedule an auto compaction job.
     256            0 :     fn schedule_auto_compaction(
     257            0 :         &self,
     258            0 :         options: CompactOptions,
     259            0 :         permit: OwnedSemaphorePermit,
     260            0 :     ) -> GcCompactionJobId {
     261            0 :         let mut guard = self.inner.lock().unwrap();
     262            0 :         let id = guard.next_id();
     263            0 :         guard.queued.push_back((
     264            0 :             id,
     265            0 :             GcCompactionQueueItem::MetaJob {
     266            0 :                 options,
     267            0 :                 auto: true,
     268            0 :             },
     269            0 :         ));
     270            0 :         guard.guards.entry(id).or_default().permit = Some(permit);
     271            0 :         id
     272            0 :     }
     273              : 
     274              :     /// Trigger an auto compaction.
     275            0 :     pub async fn trigger_auto_compaction(
     276            0 :         &self,
     277            0 :         timeline: &Arc<Timeline>,
     278            0 :     ) -> Result<(), CompactionError> {
     279            0 :         let GcCompactionCombinedSettings {
     280            0 :             gc_compaction_enabled,
     281            0 :             gc_compaction_initial_threshold_kb,
     282            0 :             gc_compaction_ratio_percent,
     283            0 :             ..
     284            0 :         } = timeline.get_gc_compaction_settings();
     285            0 :         if !gc_compaction_enabled {
     286            0 :             return Ok(());
     287            0 :         }
     288            0 :         if self.remaining_jobs_num() > 0 {
     289              :             // Only schedule auto compaction when the queue is empty
     290            0 :             return Ok(());
     291            0 :         }
     292            0 :         if timeline.ancestor_timeline().is_some() {
     293              :             // Do not trigger auto compaction for child timelines. We haven't tested
     294              :             // it enough in staging yet.
     295            0 :             return Ok(());
     296            0 :         }
     297            0 :         if timeline.get_gc_compaction_watermark() == Lsn::INVALID {
     298              :             // If the gc watermark is not set, we don't need to trigger auto compaction.
     299              :             // This check is the same as in `gc_compaction_split_jobs` but we don't log
     300              :             // here and we can also skip the computation of the trigger condition earlier.
     301            0 :             return Ok(());
     302            0 :         }
     303              : 
     304            0 :         let Ok(permit) = CONCURRENT_GC_COMPACTION_TASKS.clone().try_acquire_owned() else {
     305              :             // Only allow one compaction run at a time. TODO: As we do `try_acquire_owned`, we cannot ensure
     306              :             // the fairness of the lock across timelines. We should listen for both `acquire` and `l0_compaction_trigger`
     307              :             // to ensure the fairness while avoid starving other tasks.
     308            0 :             return Ok(());
     309              :         };
     310              : 
     311            0 :         let gc_compaction_state = timeline.get_gc_compaction_state();
     312            0 :         let l2_lsn = gc_compaction_state
     313            0 :             .map(|x| x.last_completed_lsn)
     314            0 :             .unwrap_or(Lsn::INVALID);
     315              : 
     316            0 :         let layers = {
     317            0 :             let guard = timeline.layers.read().await;
     318            0 :             let layer_map = guard.layer_map()?;
     319            0 :             layer_map.iter_historic_layers().collect_vec()
     320            0 :         };
     321            0 :         let mut l2_size: u64 = 0;
     322            0 :         let mut l1_size = 0;
     323            0 :         let gc_cutoff = *timeline.get_applied_gc_cutoff_lsn();
     324            0 :         for layer in layers {
     325            0 :             if layer.lsn_range.start <= l2_lsn {
     326            0 :                 l2_size += layer.file_size();
     327            0 :             } else if layer.lsn_range.start <= gc_cutoff {
     328            0 :                 l1_size += layer.file_size();
     329            0 :             }
     330              :         }
     331              : 
     332            0 :         fn trigger_compaction(
     333            0 :             l1_size: u64,
     334            0 :             l2_size: u64,
     335            0 :             gc_compaction_initial_threshold_kb: u64,
     336            0 :             gc_compaction_ratio_percent: u64,
     337            0 :         ) -> bool {
     338              :             const AUTO_TRIGGER_LIMIT: u64 = 150 * 1024 * 1024 * 1024; // 150GB
     339            0 :             if l1_size + l2_size >= AUTO_TRIGGER_LIMIT {
     340              :                 // Do not auto-trigger when physical size >= 150GB
     341            0 :                 return false;
     342            0 :             }
     343            0 :             // initial trigger
     344            0 :             if l2_size == 0 && l1_size >= gc_compaction_initial_threshold_kb * 1024 {
     345            0 :                 info!(
     346            0 :                     "trigger auto-compaction because l1_size={} >= gc_compaction_initial_threshold_kb={}",
     347              :                     l1_size, gc_compaction_initial_threshold_kb
     348              :                 );
     349            0 :                 return true;
     350            0 :             }
     351            0 :             // size ratio trigger
     352            0 :             if l2_size == 0 {
     353            0 :                 return false;
     354            0 :             }
     355            0 :             if l1_size as f64 / l2_size as f64 >= (gc_compaction_ratio_percent as f64 / 100.0) {
     356            0 :                 info!(
     357            0 :                     "trigger auto-compaction because l1_size={} / l2_size={} > gc_compaction_ratio_percent={}",
     358              :                     l1_size, l2_size, gc_compaction_ratio_percent
     359              :                 );
     360            0 :                 return true;
     361            0 :             }
     362            0 :             false
     363            0 :         }
     364              : 
     365            0 :         if trigger_compaction(
     366            0 :             l1_size,
     367            0 :             l2_size,
     368            0 :             gc_compaction_initial_threshold_kb,
     369            0 :             gc_compaction_ratio_percent,
     370            0 :         ) {
     371            0 :             self.schedule_auto_compaction(
     372            0 :                 CompactOptions {
     373            0 :                     flags: {
     374            0 :                         let mut flags = EnumSet::new();
     375            0 :                         flags |= CompactFlags::EnhancedGcBottomMostCompaction;
     376            0 :                         if timeline.get_compaction_l0_first() {
     377            0 :                             flags |= CompactFlags::YieldForL0;
     378            0 :                         }
     379            0 :                         flags
     380            0 :                     },
     381            0 :                     sub_compaction: true,
     382            0 :                     // Only auto-trigger gc-compaction over the data keyspace due to concerns in
     383            0 :                     // https://github.com/neondatabase/neon/issues/11318.
     384            0 :                     compact_key_range: Some(CompactKeyRange {
     385            0 :                         start: Key::MIN,
     386            0 :                         end: Key::metadata_key_range().start,
     387            0 :                     }),
     388            0 :                     compact_lsn_range: None,
     389            0 :                     sub_compaction_max_job_size_mb: None,
     390            0 :                 },
     391            0 :                 permit,
     392            0 :             );
     393            0 :             info!(
     394            0 :                 "scheduled auto gc-compaction: l1_size={}, l2_size={}, l2_lsn={}, gc_cutoff={}",
     395              :                 l1_size, l2_size, l2_lsn, gc_cutoff
     396              :             );
     397              :         } else {
     398            0 :             debug!(
     399            0 :                 "did not trigger auto gc-compaction: l1_size={}, l2_size={}, l2_lsn={}, gc_cutoff={}",
     400              :                 l1_size, l2_size, l2_lsn, gc_cutoff
     401              :             );
     402              :         }
     403            0 :         Ok(())
     404            0 :     }
     405              : 
     406            0 :     async fn collect_layer_below_lsn(
     407            0 :         &self,
     408            0 :         timeline: &Arc<Timeline>,
     409            0 :         lsn: Lsn,
     410            0 :     ) -> Result<u64, CompactionError> {
     411            0 :         let guard = timeline.layers.read().await;
     412            0 :         let layer_map = guard.layer_map()?;
     413            0 :         let layers = layer_map.iter_historic_layers().collect_vec();
     414            0 :         let mut size = 0;
     415            0 :         for layer in layers {
     416            0 :             if layer.lsn_range.start <= lsn {
     417            0 :                 size += layer.file_size();
     418            0 :             }
     419              :         }
     420            0 :         Ok(size)
     421            0 :     }
     422              : 
     423              :     /// Notify the caller the job has finished and unblock GC.
     424            0 :     fn notify_and_unblock(&self, id: GcCompactionJobId) {
     425            0 :         info!("compaction job id={} finished", id);
     426            0 :         let mut guard = self.inner.lock().unwrap();
     427            0 :         if let Some(items) = guard.guards.remove(&id) {
     428            0 :             if let Some(tx) = items.notify {
     429            0 :                 let _ = tx.send(());
     430            0 :             }
     431            0 :         }
     432            0 :         if let Some(ref meta_statistics) = guard.meta_statistics {
     433            0 :             if meta_statistics.meta_job_id == id {
     434            0 :                 if let Ok(stats) = serde_json::to_string(&meta_statistics) {
     435            0 :                     info!(
     436            0 :                         "gc-compaction meta statistics for job id = {}: {}",
     437              :                         id, stats
     438              :                     );
     439            0 :                 }
     440            0 :             }
     441            0 :         }
     442            0 :     }
     443              : 
     444            0 :     fn clear_running_job(&self) {
     445            0 :         let mut guard = self.inner.lock().unwrap();
     446            0 :         guard.running = None;
     447            0 :     }
     448              : 
     449            0 :     async fn handle_sub_compaction(
     450            0 :         &self,
     451            0 :         id: GcCompactionJobId,
     452            0 :         options: CompactOptions,
     453            0 :         timeline: &Arc<Timeline>,
     454            0 :         auto: bool,
     455            0 :     ) -> Result<(), CompactionError> {
     456            0 :         info!(
     457            0 :             "running scheduled enhanced gc bottom-most compaction with sub-compaction, splitting compaction jobs"
     458              :         );
     459            0 :         let res = timeline
     460            0 :             .gc_compaction_split_jobs(
     461            0 :                 GcCompactJob::from_compact_options(options.clone()),
     462            0 :                 options.sub_compaction_max_job_size_mb,
     463            0 :             )
     464            0 :             .await;
     465            0 :         let jobs = match res {
     466            0 :             Ok(jobs) => jobs,
     467            0 :             Err(err) => {
     468            0 :                 warn!("cannot split gc-compaction jobs: {}, unblocked gc", err);
     469            0 :                 self.notify_and_unblock(id);
     470            0 :                 return Err(err);
     471              :             }
     472              :         };
     473            0 :         if jobs.is_empty() {
     474            0 :             info!("no jobs to run, skipping scheduled compaction task");
     475            0 :             self.notify_and_unblock(id);
     476              :         } else {
     477            0 :             let jobs_len = jobs.len();
     478            0 :             let mut pending_tasks = Vec::new();
     479            0 :             // gc-compaction might pick more layers or fewer layers to compact. The L2 LSN does not need to be accurate.
     480            0 :             // And therefore, we simply assume the maximum LSN of all jobs is the expected L2 LSN.
     481            0 :             let expected_l2_lsn = jobs
     482            0 :                 .iter()
     483            0 :                 .map(|job| job.compact_lsn_range.end)
     484            0 :                 .max()
     485            0 :                 .unwrap();
     486            0 :             for job in jobs {
     487              :                 // Unfortunately we need to convert the `GcCompactJob` back to `CompactionOptions`
     488              :                 // until we do further refactors to allow directly call `compact_with_gc`.
     489            0 :                 let mut flags: EnumSet<CompactFlags> = EnumSet::default();
     490            0 :                 flags |= CompactFlags::EnhancedGcBottomMostCompaction;
     491            0 :                 if job.dry_run {
     492            0 :                     flags |= CompactFlags::DryRun;
     493            0 :                 }
     494            0 :                 if options.flags.contains(CompactFlags::YieldForL0) {
     495            0 :                     flags |= CompactFlags::YieldForL0;
     496            0 :                 }
     497            0 :                 let options = CompactOptions {
     498            0 :                     flags,
     499            0 :                     sub_compaction: false,
     500            0 :                     compact_key_range: Some(job.compact_key_range.into()),
     501            0 :                     compact_lsn_range: Some(job.compact_lsn_range.into()),
     502            0 :                     sub_compaction_max_job_size_mb: None,
     503            0 :                 };
     504            0 :                 pending_tasks.push(GcCompactionQueueItem::SubCompactionJob(options));
     505              :             }
     506              : 
     507            0 :             if !auto {
     508            0 :                 pending_tasks.push(GcCompactionQueueItem::Notify(id, None));
     509            0 :             } else {
     510            0 :                 pending_tasks.push(GcCompactionQueueItem::Notify(id, Some(expected_l2_lsn)));
     511            0 :             }
     512              : 
     513            0 :             let layer_size = self
     514            0 :                 .collect_layer_below_lsn(timeline, expected_l2_lsn)
     515            0 :                 .await?;
     516              : 
     517              :             {
     518            0 :                 let mut guard = self.inner.lock().unwrap();
     519            0 :                 let mut tasks = Vec::new();
     520            0 :                 for task in pending_tasks {
     521            0 :                     let id = guard.next_id();
     522            0 :                     tasks.push((id, task));
     523            0 :                 }
     524            0 :                 tasks.reverse();
     525            0 :                 for item in tasks {
     526            0 :                     guard.queued.push_front(item);
     527            0 :                 }
     528            0 :                 guard.meta_statistics = Some(GcCompactionMetaStatistics {
     529            0 :                     meta_job_id: id,
     530            0 :                     start_time: Some(chrono::Utc::now()),
     531            0 :                     before_compaction_layer_size: layer_size,
     532            0 :                     below_lsn: expected_l2_lsn,
     533            0 :                     total_sub_compaction_jobs: jobs_len,
     534            0 :                     ..Default::default()
     535            0 :                 });
     536            0 :             }
     537            0 : 
     538            0 :             info!(
     539            0 :                 "scheduled enhanced gc bottom-most compaction with sub-compaction, split into {} jobs",
     540              :                 jobs_len
     541              :             );
     542              :         }
     543            0 :         Ok(())
     544            0 :     }
     545              : 
     546              :     /// Take a job from the queue and process it. The returned outcome tells the caller whether there are still pending tasks.
                            ///
                            /// This is a thin wrapper around `iteration_inner`: every error is logged through
                            /// `log_compaction_error`, `CompactionError::ShuttingDown` is propagated unchanged, and
                            /// any other error is swallowed and reported as `CompactionOutcome::Skipped`.
     547            0 :     pub async fn iteration(
     548            0 :         &self,
     549            0 :         cancel: &CancellationToken,
     550            0 :         ctx: &RequestContext,
     551            0 :         gc_block: &GcBlock,
     552            0 :         timeline: &Arc<Timeline>,
     553            0 :     ) -> Result<CompactionOutcome, CompactionError> {
     554            0 :         let res = self.iteration_inner(cancel, ctx, gc_block, timeline).await;
                                // Log first, while the error is still available; the match below consumes `res`.
     555            0 :         if let Err(err) = &res {
     556            0 :             log_compaction_error(err, None, cancel.is_cancelled(), true);
     557            0 :         }
     558            0 :         match res {
     559            0 :             Ok(res) => Ok(res),
     560            0 :             Err(CompactionError::ShuttingDown) => Err(CompactionError::ShuttingDown),
     561              :             Err(_) => {
     562              :                 // There are some cases where traditional gc might collect some layer
     563              :                 // files, so that gc-compaction cannot read the full history of the key.
     564              :                 // This needs to be resolved in the long term by improving the compaction
     565              :                 // process. For now, let's simply avoid such errors triggering the
     566              :                 // circuit breaker.
     567            0 :                 Ok(CompactionOutcome::Skipped)
     568              :             }
     569              :         }
     570            0 :     }
     571              : 
     572            0 :     async fn iteration_inner(
     573            0 :         &self,
     574            0 :         cancel: &CancellationToken,
     575            0 :         ctx: &RequestContext,
     576            0 :         gc_block: &GcBlock,
     577            0 :         timeline: &Arc<Timeline>,
     578            0 :     ) -> Result<CompactionOutcome, CompactionError> {
                                // Pops at most one item off the queue and processes it according to its kind
                                // (`MetaJob`, `SubCompactionJob` or `Notify`). When the queue is empty, auto-compaction
                                // is triggered instead and `Done` is returned. On success the outcome is `YieldForL0`
                                // if the compaction yielded for L0, `Pending` if more items were queued when this one
                                // was popped, and `Done` otherwise.
                                //
                                // Only one consumer may run at a time: if `consumer_lock` cannot be taken, bail out
                                // with `AlreadyRunning`. The guard is held until this function returns.
     579            0 :         let Ok(_one_op_at_a_time_guard) = self.consumer_lock.try_lock() else {
     580            0 :             return Err(CompactionError::AlreadyRunning(
     581            0 :                 "cannot run gc-compaction because another gc-compaction is running. This should not happen because we only call this function from the gc-compaction queue.",
     582            0 :             ));
     583              :         };
                                // Assigned exactly once inside the block below, on both the `Some` and `None` paths.
                                // NOTE(review): the value is captured at pop time, so items enqueued while this item
                                // is being processed (e.g. by `handle_sub_compaction`) are not reflected in it --
                                // confirm this is intended.
     584              :         let has_pending_tasks;
     585            0 :         let mut yield_for_l0 = false;
                                // Pop the next item while holding the `inner` lock and record it as the running job.
                                // The lock guard is dropped at the end of the block, before any `.await`.
     586            0 :         let Some((id, item)) = ({
     587            0 :             let mut guard = self.inner.lock().unwrap();
     588            0 :             if let Some((id, item)) = guard.queued.pop_front() {
     589            0 :                 guard.running = Some((id, item.clone()));
     590            0 :                 has_pending_tasks = !guard.queued.is_empty();
     591            0 :                 Some((id, item))
     592              :             } else {
     593            0 :                 has_pending_tasks = false;
     594            0 :                 None
     595              :             }
     596              :         }) else {
                                    // Nothing queued: give auto-compaction a chance to schedule new work.
     597            0 :             self.trigger_auto_compaction(timeline).await?;
     598              :             // Always yield after triggering auto-compaction. Gc-compaction is a low-priority task and we
     599              :             // have not implemented a preemption mechanism yet. We always want to yield to more important
     600              :             // tasks if there is one.
     601            0 :             return Ok(CompactionOutcome::Done);
     602              :         };
     603            0 :         match item {
     604            0 :             GcCompactionQueueItem::MetaJob { options, auto } => {
                                        // A meta job is either rejected (not a gc-compaction), split into sub-compaction
                                        // jobs, or run directly as a single compaction.
     605            0 :                 if !options
     606            0 :                     .flags
     607            0 :                     .contains(CompactFlags::EnhancedGcBottomMostCompaction)
     608              :                 {
     609            0 :                     warn!(
     610            0 :                         "ignoring scheduled compaction task: scheduled task must be gc compaction: {:?}",
     611              :                         options
     612              :                     );
     613            0 :                 } else if options.sub_compaction {
     614            0 :                     info!(
     615            0 :                         "running scheduled enhanced gc bottom-most compaction with sub-compaction, splitting compaction jobs"
     616              :                     );
     617            0 :                     self.handle_sub_compaction(id, options, timeline, auto)
     618            0 :                         .await?;
     619              :                 } else {
     620              :                     // Auto compaction always enables sub-compaction so we don't need to handle update_l2_lsn
     621              :                     // in this branch.
                                            // On every failure path below, the job's waiter is notified and the running
                                            // slot is cleared before the error is returned.
     622            0 :                     let _gc_guard = match gc_block.start().await {
     623            0 :                         Ok(guard) => guard,
     624            0 :                         Err(e) => {
     625            0 :                             self.notify_and_unblock(id);
     626            0 :                             self.clear_running_job();
     627            0 :                             return Err(CompactionError::Other(anyhow!(
     628            0 :                                 "cannot run gc-compaction because gc is blocked: {}",
     629            0 :                                 e
     630            0 :                             )));
     631              :                         }
     632              :                     };
     633            0 :                     let res = timeline.compact_with_options(cancel, options, ctx).await;
     634            0 :                     let compaction_result = match res {
     635            0 :                         Ok(res) => res,
     636            0 :                         Err(err) => {
     637            0 :                             warn!(%err, "failed to run gc-compaction");
     638            0 :                             self.notify_and_unblock(id);
     639            0 :                             self.clear_running_job();
     640            0 :                             return Err(err);
     641              :                         }
     642              :                     };
     643            0 :                     if compaction_result == CompactionOutcome::YieldForL0 {
     644            0 :                         yield_for_l0 = true;
     645            0 :                     }
     646              :                 }
     647              :             }
     648            0 :             GcCompactionQueueItem::SubCompactionJob(options) => {
     649              :                 // TODO: error handling, clear the queue if any task fails?
                                        // Unlike the `MetaJob` branch, failures here do not call `notify_and_unblock`;
                                        // they only clear the running slot (and, for compaction errors, bump the failure
                                        // counter).
     650            0 :                 let _gc_guard = match gc_block.start().await {
     651            0 :                     Ok(guard) => guard,
     652            0 :                     Err(e) => {
     653            0 :                         self.clear_running_job();
     654            0 :                         return Err(CompactionError::Other(anyhow!(
     655            0 :                             "cannot run gc-compaction because gc is blocked: {}",
     656            0 :                             e
     657            0 :                         )));
     658              :                     }
     659              :                 };
     660            0 :                 let res = timeline.compact_with_options(cancel, options, ctx).await;
     661            0 :                 let compaction_result = match res {
     662            0 :                     Ok(res) => res,
     663            0 :                     Err(err) => {
     664            0 :                         warn!(%err, "failed to run gc-compaction subcompaction job");
     665            0 :                         self.clear_running_job();
     666            0 :                         let mut guard = self.inner.lock().unwrap();
     667            0 :                         if let Some(ref mut meta_statistics) = guard.meta_statistics {
     668            0 :                             meta_statistics.failed_sub_compaction_jobs += 1;
     669            0 :                         }
     670            0 :                         return Err(err);
     671              :                     }
     672              :                 };
     673            0 :                 if compaction_result == CompactionOutcome::YieldForL0 {
     674            0 :                     // We will permanently give up a task if we yield for L0 compaction: the preempted subcompaction job won't be running
     675            0 :                     // again. This ensures that we don't keep doing duplicated work within gc-compaction. Not directly returning here because
     676            0 :                     // we need to clean things up before returning from the function.
     677            0 :                     yield_for_l0 = true;
     678            0 :                 }
                                        // NOTE(review): this counter is also incremented when the job yielded for L0
                                        // above -- confirm that counting a preempted job as succeeded is intended.
     679              :                 {
     680            0 :                     let mut guard = self.inner.lock().unwrap();
     681            0 :                     if let Some(ref mut meta_statistics) = guard.meta_statistics {
     682            0 :                         meta_statistics.succeeded_sub_compaction_jobs += 1;
     683            0 :                     }
     684              :                 }
     685              :             }
                                    // Note: the `id` bound by this pattern shadows the queue id popped above.
     686            0 :             GcCompactionQueueItem::Notify(id, l2_lsn) => {
                                        // Read the LSN recorded in the meta statistics; `Lsn::INVALID` stands in for
                                        // "no statistics available". The lock is released before the `.await` below.
     687            0 :                 let below_lsn = {
     688            0 :                     let mut guard = self.inner.lock().unwrap();
     689            0 :                     if let Some(ref mut meta_statistics) = guard.meta_statistics {
     690            0 :                         meta_statistics.below_lsn
     691              :                     } else {
     692            0 :                         Lsn::INVALID
     693              :                     }
     694              :                 };
     695            0 :                 let layer_size = if below_lsn != Lsn::INVALID {
     696            0 :                     self.collect_layer_below_lsn(timeline, below_lsn).await?
     697              :                 } else {
     698            0 :                     0
     699              :                 };
                                        // Record the post-compaction layer size and finalize the statistics.
     700              :                 {
     701            0 :                     let mut guard = self.inner.lock().unwrap();
     702            0 :                     if let Some(ref mut meta_statistics) = guard.meta_statistics {
     703            0 :                         meta_statistics.after_compaction_layer_size = layer_size;
     704            0 :                         meta_statistics.finalize();
     705            0 :                     }
     706              :                 }
     707            0 :                 self.notify_and_unblock(id);
                                        // Persist the new l2_lsn only if one was supplied and it does not move backwards.
     708            0 :                 if let Some(l2_lsn) = l2_lsn {
     709            0 :                     let current_l2_lsn = timeline
     710            0 :                         .get_gc_compaction_state()
     711            0 :                         .map(|x| x.last_completed_lsn)
     712            0 :                         .unwrap_or(Lsn::INVALID);
     713            0 :                     if l2_lsn >= current_l2_lsn {
     714            0 :                         info!("l2_lsn updated to {}", l2_lsn);
     715            0 :                         timeline
     716            0 :                             .update_gc_compaction_state(GcCompactionState {
     717            0 :                                 last_completed_lsn: l2_lsn,
     718            0 :                             })
     719            0 :                             .map_err(CompactionError::Other)?;
     720              :                     } else {
                                                // The stored state is left untouched in this branch; only a warning is emitted.
     721            0 :                         warn!(
     722            0 :                             "l2_lsn updated to {} but it is less than the current l2_lsn {}",
     723              :                             l2_lsn, current_l2_lsn
     724              :                         );
     725              :                     }
     726            0 :                 }
     727              :             }
     728              :         }
                                // Success path: the item is done, so free the running slot before reporting the outcome.
     729            0 :         self.clear_running_job();
                                // `YieldForL0` takes precedence over `Pending`/`Done`.
     730            0 :         Ok(if yield_for_l0 {
     731            0 :             tracing::info!("give up gc-compaction: yield for L0 compaction");
     732            0 :             CompactionOutcome::YieldForL0
     733            0 :         } else if has_pending_tasks {
     734            0 :             CompactionOutcome::Pending
     735              :         } else {
     736            0 :             CompactionOutcome::Done
     737              :         })
     738            0 :     }
     739              : 
     740              :     #[allow(clippy::type_complexity)]
                            /// Returns a snapshot of the queue state: a clone of the currently running job (if any)
                            /// and a clone of the queued jobs, both taken while holding the `inner` lock.
     741            0 :     pub fn remaining_jobs(
     742            0 :         &self,
     743            0 :     ) -> (
     744            0 :         Option<(GcCompactionJobId, GcCompactionQueueItem)>,
     745            0 :         VecDeque<(GcCompactionJobId, GcCompactionQueueItem)>,
     746            0 :     ) {
     747            0 :         let guard = self.inner.lock().unwrap();
     748            0 :         (guard.running.clone(), guard.queued.clone())
     749            0 :     }
     750              : 
     751            0 :     pub fn remaining_jobs_num(&self) -> usize {
                                // Number of jobs not yet finished: everything still queued, plus one if a job is
                                // currently recorded as running.
     752            0 :         let guard = self.inner.lock().unwrap();
     753            0 :         guard.queued.len() + if guard.running.is_some() { 1 } else { 0 }
     754            0 :     }
     755              : }
     756              : 
     757              : /// A job description for the gc-compaction job. This structure describes the key-LSN rectangle that the job will
     758              : /// process. The exact layers that need to be compacted/rewritten will be generated when `compact_with_gc` gets
     759              : /// called.
     760              : #[derive(Debug, Clone)]
     761              : pub(crate) struct GcCompactJob {
                            /// Whether the job is a dry run; `from_compact_options` derives it from `CompactFlags::DryRun`.
     762              :     pub dry_run: bool,
     763              :     /// The key range to be compacted. The compaction algorithm will only regenerate key-value pairs within this range
     764              :     /// [left inclusive, right exclusive), and other pairs will be rewritten into new files if necessary.
     765              :     pub compact_key_range: Range<Key>,
     766              :     /// The LSN range to be compacted. The compaction algorithm will use this range to determine the layers to be
     767              :     /// selected for the compaction, and it does not guarantee the generated layers will have exactly the same LSN range
     768              :     /// as specified here. The true range being compacted is `min_lsn/max_lsn` in [`GcCompactionJobDescription`].
     769              :     /// `min_lsn` will always be <= the lower bound specified here, and `max_lsn` will always be >= the upper bound specified here.
     770              :     pub compact_lsn_range: Range<Lsn>,
     771              : }
     772              : 
     773              : impl GcCompactJob {
                            /// Builds a job description from `CompactOptions`.
                            ///
                            /// `dry_run` is set when the options carry `CompactFlags::DryRun`. A missing key range
                            /// defaults to `Key::MIN..Key::MAX`, and a missing LSN range defaults to
                            /// `Lsn::INVALID..Lsn::MAX`.
     774           28 :     pub fn from_compact_options(options: CompactOptions) -> Self {
     775           28 :         GcCompactJob {
     776           28 :             dry_run: options.flags.contains(CompactFlags::DryRun),
     777           28 :             compact_key_range: options
     778           28 :                 .compact_key_range
     779           28 :                 .map(|x| x.into())
     780           28 :                 .unwrap_or(Key::MIN..Key::MAX),
     781           28 :             compact_lsn_range: options
     782           28 :                 .compact_lsn_range
     783           28 :                 .map(|x| x.into())
     784           28 :                 .unwrap_or(Lsn::INVALID..Lsn::MAX),
     785           28 :         }
     786           28 :     }
     787              : }
     788              : 
     789              : /// A job description for the gc-compaction job. This structure is generated when `compact_with_gc` is called
     790              : /// and contains the exact layers we want to compact.
     791              : pub struct GcCompactionJobDescription {
     792              :     /// All layers to read in the compaction job.
     793              :     selected_layers: Vec<Layer>,
     794              :     /// GC cutoff of the job. This is the lowest LSN that will be accessed by the read/GC path and we need to
     795              :     /// keep all deltas <= this LSN or generate an image == this LSN.
     796              :     gc_cutoff: Lsn,
     797              :     /// LSNs to retain for the job. The read path will use these LSNs, so for each of them we need to keep
     798              :     /// deltas <= that LSN or generate an image == that LSN.
     799              :     retain_lsns_below_horizon: Vec<Lsn>,
     800              :     /// Maximum layer LSN processed in this compaction, that is max(end_lsn of layers). Exclusive. All data
     801              :     /// \>= this LSN will be kept and will not be rewritten.
     802              :     max_layer_lsn: Lsn,
     803              :     /// Minimum layer LSN processed in this compaction, that is min(start_lsn of layers). Inclusive.
     804              :     /// All accesses below (strictly lower than, `<`) this LSN will be routed through the normal read path instead of
     805              :     /// the k-merge within gc-compaction.
     806              :     min_layer_lsn: Lsn,
     807              :     /// Only compact layers overlapping with this range.
     808              :     compaction_key_range: Range<Key>,
     809              :     /// When partial compaction is enabled, these layers need to be rewritten to ensure no overlap.
     810              :     /// This field is here solely for debugging. The field will not be read once the compaction
     811              :     /// description is generated.
     812              :     rewrite_layers: Vec<Arc<PersistentLayerDesc>>,
     813              : }
     814              : 
     815              : /// The result of bottom-most compaction for a single key at each LSN.
                        ///
                        /// Stored as a list of `(Lsn, Value)` pairs.
     816              : #[derive(Debug)]
     817              : #[cfg_attr(test, derive(PartialEq))]
     818              : pub struct KeyLogAtLsn(pub Vec<(Lsn, Value)>);
     819              : 
     820              : /// The result of bottom-most compaction.
                        ///
                        /// The retained history is split into logs kept below the horizon, one batch per LSN,
                        /// and a single batch of logs above the horizon.
     821              : #[derive(Debug)]
     822              : #[cfg_attr(test, derive(PartialEq))]
     823              : pub(crate) struct KeyHistoryRetention {
     824              :     /// Stores logs to reconstruct the value at the given LSN, that is to say, logs <= LSN or image == LSN.
     825              :     pub(crate) below_horizon: Vec<(Lsn, KeyLogAtLsn)>,
     826              :     /// Stores logs to reconstruct the value at any LSN above the horizon, that is to say, log > LSN.
     827              :     pub(crate) above_horizon: KeyLogAtLsn,
     828              : }
     829              : 
     830              : impl KeyHistoryRetention {
     831              :     /// Hack: skip the delta layer if we would need to produce a layer with the same key-lsn range.
     832              :     ///
     833              :     /// This can happen if we have removed some deltas in "the middle" of some existing layer's key-lsn-range.
     834              :     /// For example, consider the case where a single delta with range [0x10,0x50) exists.
     835              :     /// And we have branches at LSN 0x10, 0x20, 0x30.
     836              :     /// Then we delete branch @ 0x20.
     837              :     /// Bottom-most compaction may now delete the delta [0x20,0x30).
     838              :     /// And that wouldn't change the shape of the layer.
     839              :     ///
     840              :     /// Note that bottom-most-gc-compaction never _adds_ new data in that case, only removes.
     841              :     ///
     842              :     /// `discard_key` will only be called when the writer reaches its target (instead of for every key), so it's fine to grab a lock inside.
                            ///
                            /// Returns `true` (discard) when `dry_run` is set, when the key describes an L0 layer, or
                            /// when a layer with the same key already exists in `tline.layers` with the timeline's
                            /// current generation. Returns `false` when no such layer exists or when the existing
                            /// layer has a different generation.
     843           37 :     async fn discard_key(key: &PersistentLayerKey, tline: &Arc<Timeline>, dry_run: bool) -> bool {
     844           37 :         if dry_run {
     845            0 :             return true;
     846           37 :         }
     847           37 :         if LayerMap::is_l0(&key.key_range, key.is_delta) {
     848              :             // gc-compaction should not produce L0 deltas, otherwise it will break the layer order.
     849              :             // We should ignore such layers.
     850            0 :             return true;
     851           37 :         }
                                // Look up the existing layer's generation inside a block so the read guard is
                                // dropped before the comparison and logging below.
     852              :         let layer_generation;
     853              :         {
     854           37 :             let guard = tline.layers.read().await;
     855           37 :             if !guard.contains_key(key) {
     856           26 :                 return false;
     857           11 :             }
     858           11 :             layer_generation = guard.get_from_key(key).metadata().generation;
     859           11 :         }
     860           11 :         if layer_generation == tline.generation {
     861           11 :             info!(
     862              :                 key=%key,
     863              :                 ?layer_generation,
     864            0 :                 "discard layer due to duplicated layer key in the same generation",
     865              :             );
     866           11 :             true
     867              :         } else {
     868            0 :             false
     869              :         }
     870           37 :     }
     871              : 
     872              :     /// Pipe the history of a single key to the writers.
     873              :     ///
     874              :     /// If `image_writer` is `None`, the images will be placed into the delta layers.
     875              :     /// The delta writer will contain all images and deltas (below and above the horizon) except the bottom-most images.
                            ///
                            /// Every value written is also recorded in `stat`. The first write error aborts the
                            /// function and is returned to the caller.
     876              :     #[allow(clippy::too_many_arguments)]
     877          319 :     async fn pipe_to(
     878          319 :         self,
     879          319 :         key: Key,
     880          319 :         delta_writer: &mut SplitDeltaLayerWriter<'_>,
     881          319 :         mut image_writer: Option<&mut SplitImageLayerWriter<'_>>,
     882          319 :         stat: &mut CompactionStatistics,
     883          319 :         ctx: &RequestContext,
     884          319 :     ) -> anyhow::Result<()> {
     885          319 :         let mut first_batch = true;
     886         1022 :         for (cutoff_lsn, KeyLogAtLsn(logs)) in self.below_horizon {
     887          703 :             if first_batch {
                                        // Only the first batch may go to the image writer, and only when it consists of
                                        // exactly one image; anything else is written to the delta writer.
     888          319 :                 if logs.len() == 1 && logs[0].1.is_image() {
                                            // The `is_image()` check above is expected to guarantee this pattern matches.
     889          300 :                     let Value::Image(img) = &logs[0].1 else {
     890            0 :                         unreachable!()
     891              :                     };
     892          300 :                     stat.produce_image_key(img);
     893          300 :                     if let Some(image_writer) = image_writer.as_mut() {
     894          300 :                         image_writer.put_image(key, img.clone(), ctx).await?;
     895              :                     } else {
                                                // No image writer: store the image in the delta layer at this batch's LSN.
     896            0 :                         delta_writer
     897            0 :                             .put_value(key, cutoff_lsn, Value::Image(img.clone()), ctx)
     898            0 :                             .await?;
     899              :                     }
     900              :                 } else {
     901           33 :                     for (lsn, val) in logs {
     902           14 :                         stat.produce_key(&val);
     903           14 :                         delta_writer.put_value(key, lsn, val, ctx).await?;
     904              :                     }
     905              :                 }
     906          319 :                 first_batch = false;
     907              :             } else {
                                        // Later batches always go to the delta writer, images included.
     908          442 :                 for (lsn, val) in logs {
     909           58 :                     stat.produce_key(&val);
     910           58 :                     delta_writer.put_value(key, lsn, val, ctx).await?;
     911              :                 }
     912              :             }
     913              :         }
                                // Everything above the horizon is written to the delta writer unchanged.
     914          319 :         let KeyLogAtLsn(above_horizon_logs) = self.above_horizon;
     915          348 :         for (lsn, val) in above_horizon_logs {
     916           29 :             stat.produce_key(&val);
     917           29 :             delta_writer.put_value(key, lsn, val, ctx).await?;
     918              :         }
     919          319 :         Ok(())
     920          319 :     }
     921              : 
     922              :     /// Verify if every key in the retention is readable by replaying the logs.
     923          323 :     async fn verify(
     924          323 :         &self,
     925          323 :         key: Key,
     926          323 :         base_img_from_ancestor: &Option<(Key, Lsn, Bytes)>,
     927          323 :         full_history: &[(Key, Lsn, Value)],
     928          323 :         tline: &Arc<Timeline>,
     929          323 :     ) -> anyhow::Result<()> {
     930              :         // Usually the min_lsn should be the first record but we do a full iteration to be safe.
     931          458 :         let Some(min_lsn) = full_history.iter().map(|(_, lsn, _)| *lsn).min() else {
     932              :             // This should never happen b/c if we don't have any history of a key, we won't even do `generate_key_retention`.
     933            0 :             return Ok(());
     934              :         };
     935          458 :         let Some(max_lsn) = full_history.iter().map(|(_, lsn, _)| *lsn).max() else {
     936              :             // This should never happen b/c if we don't have any history of a key, we won't even do `generate_key_retention`.
     937            0 :             return Ok(());
     938              :         };
     939          323 :         let mut base_img = base_img_from_ancestor
     940          323 :             .as_ref()
     941          323 :             .map(|(_, lsn, img)| (*lsn, img));
     942          323 :         let mut history = Vec::new();
     943              : 
     944         1027 :         async fn collect_and_verify(
     945         1027 :             key: Key,
     946         1027 :             lsn: Lsn,
     947         1027 :             base_img: &Option<(Lsn, &Bytes)>,
     948         1027 :             history: &[(Lsn, &NeonWalRecord)],
     949         1027 :             tline: &Arc<Timeline>,
     950         1027 :             skip_empty: bool,
     951         1027 :         ) -> anyhow::Result<()> {
     952         1027 :             if base_img.is_none() && history.is_empty() {
     953            0 :                 if skip_empty {
     954            0 :                     return Ok(());
     955            0 :                 }
     956            0 :                 anyhow::bail!("verification failed: key {} has no history at {}", key, lsn);
     957         1027 :             };
     958         1027 : 
     959         1027 :             let mut records = history
     960         1027 :                 .iter()
     961         1027 :                 .map(|(lsn, val)| (*lsn, (*val).clone()))
     962         1027 :                 .collect::<Vec<_>>();
     963         1027 : 
     964         1027 :             // WAL redo requires records in the reverse LSN order
     965         1027 :             records.reverse();
     966         1027 :             let data = ValueReconstructState {
     967         1027 :                 img: base_img.as_ref().map(|(lsn, img)| (*lsn, (*img).clone())),
     968         1027 :                 records,
     969         1027 :             };
     970         1027 : 
     971         1027 :             tline
     972         1027 :                 .reconstruct_value(key, lsn, data, RedoAttemptType::GcCompaction)
     973         1027 :                 .await
     974         1027 :                 .with_context(|| format!("verification failed for key {} at lsn {}", key, lsn))?;
     975              : 
     976         1027 :             Ok(())
     977         1027 :         }
     978              : 
     979         1036 :         for (retain_lsn, KeyLogAtLsn(logs)) in &self.below_horizon {
     980         1096 :             for (lsn, val) in logs {
     981           76 :                 match val {
     982          307 :                     Value::Image(img) => {
     983          307 :                         base_img = Some((*lsn, img));
     984          307 :                         history.clear();
     985          307 :                     }
     986           76 :                     Value::WalRecord(rec) if val.will_init() => {
     987            0 :                         base_img = None;
     988            0 :                         history.clear();
     989            0 :                         history.push((*lsn, rec));
     990            0 :                     }
     991           76 :                     Value::WalRecord(rec) => {
     992           76 :                         history.push((*lsn, rec));
     993           76 :                     }
     994              :                 }
     995              :             }
     996          713 :             if *retain_lsn >= min_lsn {
     997              :                 // Only verify after the key appears in the full history for the first time.
     998              : 
     999              :                 // We don't modify history: in theory, we could replace the history with a single
    1000              :                 // image as in `generate_key_retention` to make redos at later LSNs faster. But we
    1001              :                 // want to verify everything as if they are read from the real layer map.
    1002          699 :                 collect_and_verify(key, *retain_lsn, &base_img, &history, tline, false)
    1003          699 :                     .await
    1004          699 :                     .context("below horizon retain_lsn")?;
    1005           14 :             }
    1006              :         }
    1007              : 
    1008          360 :         for (lsn, val) in &self.above_horizon.0 {
    1009           32 :             match val {
    1010            5 :                 Value::Image(img) => {
    1011            5 :                     // Above the GC horizon, we verify every time we see an image.
    1012            5 :                     collect_and_verify(key, *lsn, &base_img, &history, tline, true)
    1013            5 :                         .await
    1014            5 :                         .context("above horizon full image")?;
    1015            5 :                     base_img = Some((*lsn, img));
    1016            5 :                     history.clear();
    1017              :                 }
    1018           32 :                 Value::WalRecord(rec) if val.will_init() => {
    1019            0 :                     // Above the GC horizon, we verify every time we see an init record.
    1020            0 :                     collect_and_verify(key, *lsn, &base_img, &history, tline, true)
    1021            0 :                         .await
    1022            0 :                         .context("above horizon init record")?;
    1023            0 :                     base_img = None;
    1024            0 :                     history.clear();
    1025            0 :                     history.push((*lsn, rec));
    1026              :                 }
    1027           32 :                 Value::WalRecord(rec) => {
    1028           32 :                     history.push((*lsn, rec));
    1029           32 :                 }
    1030              :             }
    1031              :         }
    1032              :         // Ensure the latest record is readable.
    1033          323 :         collect_and_verify(key, max_lsn, &base_img, &history, tline, false)
    1034          323 :             .await
    1035          323 :             .context("latest record")?;
    1036          323 :         Ok(())
    1037          323 :     }
    1038              : }
    1039              : 
/// A count of items paired with their accumulated size.
///
/// Every counter method on `CompactionStatistics` bumps `num` by one and adds the
/// size of the item to `size`.
#[derive(Debug, Serialize, Default)]
struct CompactionStatisticsNumSize {
    /// Number of items recorded.
    num: u64,
    /// Sum of the sizes of all recorded items.
    size: u64,
}
    1045              : 
/// Statistics about a compaction; the summary ratios refer to gc-compaction.
///
/// Layer-level counters record physical (possibly compressed) sizes, key-level
/// counters record estimated uncompressed sizes. `finalize` derives the two
/// retention ratios from these counters.
#[derive(Debug, Serialize, Default)]
pub struct CompactionStatistics {
    /// Delta layer visited (maybe compressed, physical size)
    delta_layer_visited: CompactionStatisticsNumSize,
    /// Image layer visited (maybe compressed, physical size)
    image_layer_visited: CompactionStatisticsNumSize,
    /// Delta layer produced (maybe compressed, physical size)
    delta_layer_produced: CompactionStatisticsNumSize,
    /// Image layer produced (maybe compressed, physical size)
    image_layer_produced: CompactionStatisticsNumSize,
    /// Delta layer discarded (maybe compressed, physical size of the layer being discarded instead of the original layer)
    delta_layer_discarded: CompactionStatisticsNumSize,
    /// Image layer discarded (maybe compressed, physical size of the layer being discarded instead of the original layer)
    image_layer_discarded: CompactionStatisticsNumSize,
    /// Number of unique keys visited; bumped once per `on_unique_key_visited` call.
    num_unique_keys_visited: usize,
    /// Delta visited (uncompressed, original size)
    wal_keys_visited: CompactionStatisticsNumSize,
    /// Image visited (uncompressed, original size)
    image_keys_visited: CompactionStatisticsNumSize,
    /// Delta produced (uncompressed, original size)
    wal_produced: CompactionStatisticsNumSize,
    /// Image produced (uncompressed, original size)
    image_produced: CompactionStatisticsNumSize,

    // Time spent in each phase (seconds, going by the `_secs` suffix).
    // NOTE(review): neither `finalize` nor the counter methods write these fields;
    // presumably the code driving the compaction fills them in -- confirm at the call site.
    time_acquire_lock_secs: f64,
    time_analyze_secs: f64,
    time_download_layer_secs: f64,
    time_to_first_kv_pair_secs: f64,
    time_main_loop_secs: f64,
    time_final_phase_secs: f64,
    time_total_secs: f64,

    // Summary
    /// Ratio of the key-value size after/before gc-compaction.
    ///
    /// Set by `finalize` as produced key-value size divided by
    /// (visited key-value size + 1); the `+ 1` avoids a division by zero.
    uncompressed_retention_ratio: f64,
    /// Ratio of the physical size after/before gc-compaction.
    ///
    /// Set by `finalize` as (produced + discarded layer size) divided by
    /// (visited layer size + 1); the `+ 1` avoids a division by zero.
    compressed_retention_ratio: f64,
}
    1085              : 
    1086              : impl CompactionStatistics {
    1087          534 :     fn estimated_size_of_value(val: &Value) -> usize {
    1088          219 :         match val {
    1089          315 :             Value::Image(img) => img.len(),
    1090            0 :             Value::WalRecord(NeonWalRecord::Postgres { rec, .. }) => rec.len(),
    1091          219 :             _ => std::mem::size_of::<NeonWalRecord>(),
    1092              :         }
    1093          534 :     }
    1094          839 :     fn estimated_size_of_key() -> usize {
    1095          839 :         KEY_SIZE // TODO: distinguish image layer and delta layer (count LSN in delta layer)
    1096          839 :     }
    1097           44 :     fn visit_delta_layer(&mut self, size: u64) {
    1098           44 :         self.delta_layer_visited.num += 1;
    1099           44 :         self.delta_layer_visited.size += size;
    1100           44 :     }
    1101           35 :     fn visit_image_layer(&mut self, size: u64) {
    1102           35 :         self.image_layer_visited.num += 1;
    1103           35 :         self.image_layer_visited.size += size;
    1104           35 :     }
    1105          320 :     fn on_unique_key_visited(&mut self) {
    1106          320 :         self.num_unique_keys_visited += 1;
    1107          320 :     }
    1108          123 :     fn visit_wal_key(&mut self, val: &Value) {
    1109          123 :         self.wal_keys_visited.num += 1;
    1110          123 :         self.wal_keys_visited.size +=
    1111          123 :             Self::estimated_size_of_value(val) as u64 + Self::estimated_size_of_key() as u64;
    1112          123 :     }
    1113          315 :     fn visit_image_key(&mut self, val: &Value) {
    1114          315 :         self.image_keys_visited.num += 1;
    1115          315 :         self.image_keys_visited.size +=
    1116          315 :             Self::estimated_size_of_value(val) as u64 + Self::estimated_size_of_key() as u64;
    1117          315 :     }
    1118          101 :     fn produce_key(&mut self, val: &Value) {
    1119          101 :         match val {
    1120            5 :             Value::Image(img) => self.produce_image_key(img),
    1121           96 :             Value::WalRecord(_) => self.produce_wal_key(val),
    1122              :         }
    1123          101 :     }
    1124           96 :     fn produce_wal_key(&mut self, val: &Value) {
    1125           96 :         self.wal_produced.num += 1;
    1126           96 :         self.wal_produced.size +=
    1127           96 :             Self::estimated_size_of_value(val) as u64 + Self::estimated_size_of_key() as u64;
    1128           96 :     }
    1129          305 :     fn produce_image_key(&mut self, val: &Bytes) {
    1130          305 :         self.image_produced.num += 1;
    1131          305 :         self.image_produced.size += val.len() as u64 + Self::estimated_size_of_key() as u64;
    1132          305 :     }
    1133            7 :     fn discard_delta_layer(&mut self, original_size: u64) {
    1134            7 :         self.delta_layer_discarded.num += 1;
    1135            7 :         self.delta_layer_discarded.size += original_size;
    1136            7 :     }
    1137            4 :     fn discard_image_layer(&mut self, original_size: u64) {
    1138            4 :         self.image_layer_discarded.num += 1;
    1139            4 :         self.image_layer_discarded.size += original_size;
    1140            4 :     }
    1141           12 :     fn produce_delta_layer(&mut self, size: u64) {
    1142           12 :         self.delta_layer_produced.num += 1;
    1143           12 :         self.delta_layer_produced.size += size;
    1144           12 :     }
    1145           15 :     fn produce_image_layer(&mut self, size: u64) {
    1146           15 :         self.image_layer_produced.num += 1;
    1147           15 :         self.image_layer_produced.size += size;
    1148           15 :     }
    1149           26 :     fn finalize(&mut self) {
    1150           26 :         let original_key_value_size = self.image_keys_visited.size + self.wal_keys_visited.size;
    1151           26 :         let produced_key_value_size = self.image_produced.size + self.wal_produced.size;
    1152           26 :         self.uncompressed_retention_ratio =
    1153           26 :             produced_key_value_size as f64 / (original_key_value_size as f64 + 1.0); // avoid div by 0
    1154           26 :         let original_physical_size = self.image_layer_visited.size + self.delta_layer_visited.size;
    1155           26 :         let produced_physical_size = self.image_layer_produced.size
    1156           26 :             + self.delta_layer_produced.size
    1157           26 :             + self.image_layer_discarded.size
    1158           26 :             + self.delta_layer_discarded.size; // Also include the discarded layers to make the ratio accurate
    1159           26 :         self.compressed_retention_ratio =
    1160           26 :             produced_physical_size as f64 / (original_physical_size as f64 + 1.0); // avoid div by 0
    1161           26 :     }
    1162              : }
    1163              : 
/// Result of a compaction pass, telling the caller whether (and how urgently)
/// another pass should follow. Defaults to [`CompactionOutcome::Done`].
#[derive(Default, Debug, Clone, Copy, PartialEq, Eq)]
pub enum CompactionOutcome {
    #[default]
    /// No layers need to be compacted after this round. Compaction doesn't need
    /// to be immediately scheduled.
    Done,
    /// Still has pending layers to be compacted after this round. Ideally, the scheduler
    /// should immediately schedule another compaction.
    Pending,
    /// A timeline needs L0 compaction. Yield and schedule an immediate L0 compaction pass (only
    /// guaranteed when `compaction_l0_first` is enabled).
    YieldForL0,
    /// Compaction was skipped, because the timeline is ineligible for compaction.
    Skipped,
}
    1179              : 
    1180              : impl Timeline {
    1181              :     /// TODO: cancellation
    1182              :     ///
    1183              :     /// Returns whether the compaction has pending tasks.
    1184          182 :     pub(crate) async fn compact_legacy(
    1185          182 :         self: &Arc<Self>,
    1186          182 :         cancel: &CancellationToken,
    1187          182 :         options: CompactOptions,
    1188          182 :         ctx: &RequestContext,
    1189          182 :     ) -> Result<CompactionOutcome, CompactionError> {
    1190          182 :         if options
    1191          182 :             .flags
    1192          182 :             .contains(CompactFlags::EnhancedGcBottomMostCompaction)
    1193              :         {
    1194            0 :             self.compact_with_gc(cancel, options, ctx).await?;
    1195            0 :             return Ok(CompactionOutcome::Done);
    1196          182 :         }
    1197          182 : 
    1198          182 :         if options.flags.contains(CompactFlags::DryRun) {
    1199            0 :             return Err(CompactionError::Other(anyhow!(
    1200            0 :                 "dry-run mode is not supported for legacy compaction for now"
    1201            0 :             )));
    1202          182 :         }
    1203          182 : 
    1204          182 :         if options.compact_key_range.is_some() || options.compact_lsn_range.is_some() {
    1205              :             // maybe useful in the future? could implement this at some point
    1206            0 :             return Err(CompactionError::Other(anyhow!(
    1207            0 :                 "compaction range is not supported for legacy compaction for now"
    1208            0 :             )));
    1209          182 :         }
    1210          182 : 
    1211          182 :         // High level strategy for compaction / image creation:
    1212          182 :         //
    1213          182 :         // 1. First, do a L0 compaction to ensure we move the L0
    1214          182 :         // layers into the historic layer map get flat levels of
    1215          182 :         // layers. If we did not compact all L0 layers, we will
    1216          182 :         // prioritize compacting the timeline again and not do
    1217          182 :         // any of the compactions below.
    1218          182 :         //
    1219          182 :         // 2. Then, calculate the desired "partitioning" of the
    1220          182 :         // currently in-use key space. The goal is to partition the
    1221          182 :         // key space into roughly fixed-size chunks, but also take into
    1222          182 :         // account any existing image layers, and try to align the
    1223          182 :         // chunk boundaries with the existing image layers to avoid
    1224          182 :         // too much churn. Also try to align chunk boundaries with
    1225          182 :         // relation boundaries.  In principle, we don't know about
    1226          182 :         // relation boundaries here, we just deal with key-value
    1227          182 :         // pairs, and the code in pgdatadir_mapping.rs knows how to
    1228          182 :         // map relations into key-value pairs. But in practice we know
    1229          182 :         // that 'field6' is the block number, and the fields 1-5
    1230          182 :         // identify a relation. This is just an optimization,
    1231          182 :         // though.
    1232          182 :         //
    1233          182 :         // 3. Once we know the partitioning, for each partition,
    1234          182 :         // decide if it's time to create a new image layer. The
    1235          182 :         // criteria is: there has been too much "churn" since the last
    1236          182 :         // image layer? The "churn" is fuzzy concept, it's a
    1237          182 :         // combination of too many delta files, or too much WAL in
    1238          182 :         // total in the delta file. Or perhaps: if creating an image
    1239          182 :         // file would allow to delete some older files.
    1240          182 :         //
    1241          182 :         // 4. In the end, if the tenant gets auto-sharded, we will run
    1242          182 :         // a shard-ancestor compaction.
    1243          182 : 
    1244          182 :         // Is the timeline being deleted?
    1245          182 :         if self.is_stopping() {
    1246            0 :             trace!("Dropping out of compaction on timeline shutdown");
    1247            0 :             return Err(CompactionError::ShuttingDown);
    1248          182 :         }
    1249          182 : 
    1250          182 :         let target_file_size = self.get_checkpoint_distance();
    1251              : 
    1252              :         // Define partitioning schema if needed
    1253              : 
    1254              :         // 1. L0 Compact
    1255          182 :         let l0_outcome = {
    1256          182 :             let timer = self.metrics.compact_time_histo.start_timer();
    1257          182 :             let l0_outcome = self
    1258          182 :                 .compact_level0(
    1259          182 :                     target_file_size,
    1260          182 :                     options.flags.contains(CompactFlags::ForceL0Compaction),
    1261          182 :                     ctx,
    1262          182 :                 )
    1263          182 :                 .await?;
    1264          182 :             timer.stop_and_record();
    1265          182 :             l0_outcome
    1266          182 :         };
    1267          182 : 
    1268          182 :         if options.flags.contains(CompactFlags::OnlyL0Compaction) {
    1269            0 :             return Ok(l0_outcome);
    1270          182 :         }
    1271          182 : 
    1272          182 :         // Yield if we have pending L0 compaction. The scheduler will do another pass.
    1273          182 :         if (l0_outcome == CompactionOutcome::Pending || l0_outcome == CompactionOutcome::YieldForL0)
    1274            0 :             && options.flags.contains(CompactFlags::YieldForL0)
    1275              :         {
    1276            0 :             info!("image/ancestor compaction yielding for L0 compaction");
    1277            0 :             return Ok(CompactionOutcome::YieldForL0);
    1278          182 :         }
    1279          182 : 
    1280          182 :         let gc_cutoff = *self.applied_gc_cutoff_lsn.read();
    1281          182 : 
    1282          182 :         // 2. Repartition and create image layers if necessary
    1283          182 :         match self
    1284          182 :             .repartition(
    1285          182 :                 self.get_last_record_lsn(),
    1286          182 :                 self.get_compaction_target_size(),
    1287          182 :                 options.flags,
    1288          182 :                 ctx,
    1289          182 :             )
    1290          182 :             .await
    1291              :         {
    1292          182 :             Ok(((dense_partitioning, sparse_partitioning), lsn)) if lsn >= gc_cutoff => {
    1293          182 :                 // Disables access_stats updates, so that the files we read remain candidates for eviction after we're done with them
    1294          182 :                 let image_ctx = RequestContextBuilder::from(ctx)
    1295          182 :                     .access_stats_behavior(AccessStatsBehavior::Skip)
    1296          182 :                     .attached_child();
    1297          182 : 
    1298          182 :                 let mut partitioning = dense_partitioning;
    1299          182 :                 partitioning
    1300          182 :                     .parts
    1301          182 :                     .extend(sparse_partitioning.into_dense().parts);
    1302              : 
    1303              :                 // 3. Create new image layers for partitions that have been modified "enough".
    1304          182 :                 let (image_layers, outcome) = self
    1305          182 :                     .create_image_layers(
    1306          182 :                         &partitioning,
    1307          182 :                         lsn,
    1308          182 :                         if options
    1309          182 :                             .flags
    1310          182 :                             .contains(CompactFlags::ForceImageLayerCreation)
    1311              :                         {
    1312            7 :                             ImageLayerCreationMode::Force
    1313              :                         } else {
    1314          175 :                             ImageLayerCreationMode::Try
    1315              :                         },
    1316          182 :                         &image_ctx,
    1317          182 :                         self.last_image_layer_creation_status
    1318          182 :                             .load()
    1319          182 :                             .as_ref()
    1320          182 :                             .clone(),
    1321          182 :                         options.flags.contains(CompactFlags::YieldForL0),
    1322          182 :                     )
    1323          182 :                     .await
    1324          182 :                     .inspect_err(|err| {
    1325              :                         if let CreateImageLayersError::GetVectoredError(
    1326              :                             GetVectoredError::MissingKey(_),
    1327            0 :                         ) = err
    1328              :                         {
    1329            0 :                             critical!("missing key during compaction: {err:?}");
    1330            0 :                         }
    1331          182 :                     })?;
    1332              : 
    1333          182 :                 self.last_image_layer_creation_status
    1334          182 :                     .store(Arc::new(outcome.clone()));
    1335          182 : 
    1336          182 :                 self.upload_new_image_layers(image_layers)?;
    1337          182 :                 if let LastImageLayerCreationStatus::Incomplete { .. } = outcome {
    1338              :                     // Yield and do not do any other kind of compaction.
    1339            0 :                     info!(
    1340            0 :                         "skipping shard ancestor compaction due to pending image layer generation tasks (preempted by L0 compaction)."
    1341              :                     );
    1342            0 :                     return Ok(CompactionOutcome::YieldForL0);
    1343          182 :                 }
    1344              :             }
    1345              : 
    1346              :             Ok(_) => {
    1347            0 :                 info!("skipping repartitioning due to image compaction LSN being below GC cutoff");
    1348              :             }
    1349              : 
    1350              :             // Suppress errors when cancelled.
    1351            0 :             Err(_) if self.cancel.is_cancelled() => {}
    1352            0 :             Err(err) if err.is_cancel() => {}
    1353              : 
    1354              :             // Alert on critical errors that indicate data corruption.
    1355            0 :             Err(err) if err.is_critical() => {
    1356            0 :                 critical!("could not compact, repartitioning keyspace failed: {err:?}");
    1357              :             }
    1358              : 
    1359              :             // Log other errors. No partitioning? This is normal, if the timeline was just created
    1360              :             // as an empty timeline. Also in unit tests, when we use the timeline as a simple
    1361              :             // key-value store, ignoring the datadir layout. Log the error but continue.
    1362            0 :             Err(err) => error!("could not compact, repartitioning keyspace failed: {err:?}"),
    1363              :         };
    1364              : 
    1365          182 :         let partition_count = self.partitioning.read().0.0.parts.len();
    1366          182 : 
    1367          182 :         // 4. Shard ancestor compaction
    1368          182 :         if self.get_compaction_shard_ancestor() && self.shard_identity.count >= ShardCount::new(2) {
    1369              :             // Limit the number of layer rewrites to the number of partitions: this means its
    1370              :             // runtime should be comparable to a full round of image layer creations, rather than
    1371              :             // being potentially much longer.
    1372            0 :             let rewrite_max = partition_count;
    1373              : 
    1374            0 :             let outcome = self
    1375            0 :                 .compact_shard_ancestors(
    1376            0 :                     rewrite_max,
    1377            0 :                     options.flags.contains(CompactFlags::YieldForL0),
    1378            0 :                     ctx,
    1379            0 :                 )
    1380            0 :                 .await?;
    1381            0 :             match outcome {
    1382            0 :                 CompactionOutcome::Pending | CompactionOutcome::YieldForL0 => return Ok(outcome),
    1383            0 :                 CompactionOutcome::Done | CompactionOutcome::Skipped => {}
    1384              :             }
    1385          182 :         }
    1386              : 
    1387          182 :         Ok(CompactionOutcome::Done)
    1388          182 :     }
    1389              : 
    1390              :     /// Check for layers that are eligible to be rewritten:
    1391              :     /// - Shard splitting: after a shard split, ancestor layers beyond pitr_interval are
    1392              :     ///   eligible, so that we don't indefinitely retain keys in this shard that aren't needed.
    1393              :     /// - For future use: layers beyond pitr_interval that are in formats we would
    1394              :     ///   rather not maintain compatibility with indefinitely.
    1395              :     ///
    1396              :     /// Note: this phase may read and write many gigabytes of data: use rewrite_max to bound
    1397              :     /// how much work it will try to do in each compaction pass.
    1398            0 :     async fn compact_shard_ancestors(
    1399            0 :         self: &Arc<Self>,
    1400            0 :         rewrite_max: usize,
    1401            0 :         yield_for_l0: bool,
    1402            0 :         ctx: &RequestContext,
    1403            0 :     ) -> Result<CompactionOutcome, CompactionError> {
    1404            0 :         let mut outcome = CompactionOutcome::Done;
    1405            0 :         let mut drop_layers = Vec::new();
    1406            0 :         let mut layers_to_rewrite: Vec<Layer> = Vec::new();
    1407            0 : 
    1408            0 :         // We will use the Lsn cutoff of the last GC as a threshold for rewriting layers: if a
    1409            0 :         // layer is behind this Lsn, it indicates that the layer is being retained beyond the
    1410            0 :         // pitr_interval, for example because a branchpoint references it.
    1411            0 :         //
    1412            0 :         // Holding this read guard also blocks [`Self::gc_timeline`] from entering while we
    1413            0 :         // are rewriting layers.
    1414            0 :         let latest_gc_cutoff = self.get_applied_gc_cutoff_lsn();
    1415            0 :         let pitr_cutoff = self.gc_info.read().unwrap().cutoffs.time;
    1416              : 
    1417            0 :         let layers = self.layers.read().await;
    1418            0 :         let layers_iter = layers.layer_map()?.iter_historic_layers();
    1419            0 :         let (layers_total, mut layers_checked) = (layers_iter.len(), 0);
    1420            0 :         for layer_desc in layers_iter {
    1421            0 :             layers_checked += 1;
    1422            0 :             let layer = layers.get_from_desc(&layer_desc);
    1423            0 :             if layer.metadata().shard.shard_count == self.shard_identity.count {
    1424              :                 // This layer does not belong to a historic ancestor, no need to re-image it.
    1425            0 :                 continue;
    1426            0 :             }
    1427            0 : 
    1428            0 :             // This layer was created on an ancestor shard: check if it contains any data for this shard.
    1429            0 :             let sharded_range = ShardedRange::new(layer_desc.get_key_range(), &self.shard_identity);
    1430            0 :             let layer_local_page_count = sharded_range.page_count();
    1431            0 :             let layer_raw_page_count = ShardedRange::raw_size(&layer_desc.get_key_range());
    1432            0 :             if layer_local_page_count == 0 {
    1433              :                 // This ancestral layer only covers keys that belong to other shards.
    1434              :                 // We include the full metadata in the log: if we had some critical bug that caused
    1435              :                 // us to incorrectly drop layers, this would simplify manually debugging + reinstating those layers.
    1436            0 :                 debug!(%layer, old_metadata=?layer.metadata(),
    1437            0 :                     "dropping layer after shard split, contains no keys for this shard",
    1438              :                 );
    1439              : 
    1440            0 :                 if cfg!(debug_assertions) {
    1441              :                     // Expensive, exhaustive check of keys in this layer: this guards against ShardedRange's calculations being
    1442              :                     // wrong.  If ShardedRange claims the local page count is zero, then no keys in this layer
    1443              :                     // should be !is_key_disposable()
    1444              :                     // TODO: exclude sparse keyspace from this check, otherwise it will infinitely loop.
    1445            0 :                     let range = layer_desc.get_key_range();
    1446            0 :                     let mut key = range.start;
    1447            0 :                     while key < range.end {
    1448            0 :                         debug_assert!(self.shard_identity.is_key_disposable(&key));
    1449            0 :                         key = key.next();
    1450              :                     }
    1451            0 :                 }
    1452              : 
    1453            0 :                 drop_layers.push(layer);
    1454            0 :                 continue;
    1455            0 :             } else if layer_local_page_count != u32::MAX
    1456            0 :                 && layer_local_page_count == layer_raw_page_count
    1457              :             {
    1458            0 :                 debug!(%layer,
    1459            0 :                     "layer is entirely shard local ({} keys), no need to filter it",
    1460              :                     layer_local_page_count
    1461              :                 );
    1462            0 :                 continue;
    1463            0 :             }
    1464            0 : 
    1465            0 :             // Only rewrite a layer if we can reclaim significant space.
    1466            0 :             if layer_local_page_count != u32::MAX
    1467            0 :                 && layer_local_page_count as f64 / layer_raw_page_count as f64
    1468            0 :                     <= ANCESTOR_COMPACTION_REWRITE_THRESHOLD
    1469              :             {
    1470            0 :                 debug!(%layer,
    1471            0 :                     "layer has a large share of local pages \
    1472            0 :                         ({layer_local_page_count}/{layer_raw_page_count} > \
    1473            0 :                         {ANCESTOR_COMPACTION_REWRITE_THRESHOLD}), not rewriting",
    1474              :                 );
    1475            0 :             }
    1476              : 
    1477              :             // Don't bother re-writing a layer if it is within the PITR window: it will age-out eventually
    1478              :             // without incurring the I/O cost of a rewrite.
    1479            0 :             if layer_desc.get_lsn_range().end >= *latest_gc_cutoff {
    1480            0 :                 debug!(%layer, "Skipping rewrite of layer still in GC window ({} >= {})",
    1481            0 :                     layer_desc.get_lsn_range().end, *latest_gc_cutoff);
    1482            0 :                 continue;
    1483            0 :             }
    1484            0 : 
    1485            0 :             // We do not yet implement rewrite of delta layers.
    1486            0 :             if layer_desc.is_delta() {
    1487            0 :                 debug!(%layer, "Skipping rewrite of delta layer");
    1488            0 :                 continue;
    1489            0 :             }
    1490            0 : 
    1491            0 :             // We don't bother rewriting layers that aren't visible, since these won't be needed by
    1492            0 :             // reads and will likely be garbage collected soon.
    1493            0 :             if layer.visibility() != LayerVisibilityHint::Visible {
    1494            0 :                 debug!(%layer, "Skipping rewrite of invisible layer");
    1495            0 :                 continue;
    1496            0 :             }
    1497            0 : 
    1498            0 :             // Only rewrite layers if their generations differ.  This guarantees:
    1499            0 :             //  - that local rewrite is safe, as local layer paths will differ between existing layer and rewritten one
    1500            0 :             //  - that the layer is persistent in remote storage, as we only see old-generation'd layer via loading from remote storage
    1501            0 :             if layer.metadata().generation == self.generation {
    1502            0 :                 debug!(%layer, "Skipping rewrite, is not from old generation");
    1503            0 :                 continue;
    1504            0 :             }
    1505            0 : 
    1506            0 :             if layers_to_rewrite.len() >= rewrite_max {
    1507            0 :                 debug!(%layer, "Will rewrite layer on a future compaction, already rewrote {}",
    1508            0 :                     layers_to_rewrite.len()
    1509              :                 );
    1510            0 :                 outcome = CompactionOutcome::Pending;
    1511            0 :                 break;
    1512            0 :             }
    1513            0 : 
    1514            0 :             // Fall through: all our conditions for doing a rewrite passed.
    1515            0 :             layers_to_rewrite.push(layer);
    1516              :         }
    1517              : 
    1518              :         // Drop read lock on layer map before we start doing time-consuming I/O.
    1519            0 :         drop(layers);
    1520            0 : 
    1521            0 :         // Drop out early if there's nothing to do.
    1522            0 :         if layers_to_rewrite.is_empty() && drop_layers.is_empty() {
    1523            0 :             return Ok(CompactionOutcome::Done);
    1524            0 :         }
    1525            0 : 
    1526            0 :         info!(
    1527            0 :             "starting shard ancestor compaction, rewriting {} layers and dropping {} layers, \
    1528            0 :                 checked {layers_checked}/{layers_total} layers \
    1529            0 :                 (latest_gc_cutoff={} pitr_cutoff={:?})",
    1530            0 :             layers_to_rewrite.len(),
    1531            0 :             drop_layers.len(),
    1532            0 :             *latest_gc_cutoff,
    1533              :             pitr_cutoff,
    1534              :         );
    1535            0 :         let started = Instant::now();
    1536            0 : 
    1537            0 :         let mut replace_image_layers = Vec::new();
    1538              : 
    1539            0 :         for layer in layers_to_rewrite {
    1540            0 :             if self.cancel.is_cancelled() {
    1541            0 :                 return Err(CompactionError::ShuttingDown);
    1542            0 :             }
    1543            0 : 
    1544            0 :             info!(layer=%layer, "rewriting layer after shard split");
    1545            0 :             let mut image_layer_writer = ImageLayerWriter::new(
    1546            0 :                 self.conf,
    1547            0 :                 self.timeline_id,
    1548            0 :                 self.tenant_shard_id,
    1549            0 :                 &layer.layer_desc().key_range,
    1550            0 :                 layer.layer_desc().image_layer_lsn(),
    1551            0 :                 &self.gate,
    1552            0 :                 self.cancel.clone(),
    1553            0 :                 ctx,
    1554            0 :             )
    1555            0 :             .await
    1556            0 :             .map_err(CompactionError::Other)?;
    1557              : 
    1558              :             // Safety of layer rewrites:
    1559              :             // - We are writing to a different local file path than we are reading from, so the old Layer
    1560              :             //   cannot interfere with the new one.
    1561              :             // - In the page cache, contents for a particular VirtualFile are stored with a file_id that
    1562              :             //   is different for two layers with the same name (in `ImageLayerInner::new` we always
    1563              :             //   acquire a fresh id from [`crate::page_cache::next_file_id`].  So readers do not risk
    1564              :             //   reading the index from one layer file, and then data blocks from the rewritten layer file.
    1565              :             // - Any readers that have a reference to the old layer will keep it alive until they are done
    1566              :             //   with it. If they are trying to promote from remote storage, that will fail, but this is the same
    1567              :             //   as for compaction generally: compaction is allowed to delete layers that readers might be trying to use.
    1568              :             // - We do not run concurrently with other kinds of compaction, so the only layer map writes we race with are:
    1569              :             //    - GC, which at worst witnesses us "undelete" a layer that they just deleted.
    1570              :             //    - ingestion, which only inserts layers, therefore cannot collide with us.
    1571            0 :             let resident = layer.download_and_keep_resident(ctx).await?;
    1572              : 
    1573            0 :             let keys_written = resident
    1574            0 :                 .filter(&self.shard_identity, &mut image_layer_writer, ctx)
    1575            0 :                 .await?;
    1576              : 
    1577            0 :             if keys_written > 0 {
    1578            0 :                 let (desc, path) = image_layer_writer
    1579            0 :                     .finish(ctx)
    1580            0 :                     .await
    1581            0 :                     .map_err(CompactionError::Other)?;
    1582            0 :                 let new_layer = Layer::finish_creating(self.conf, self, desc, &path)
    1583            0 :                     .map_err(CompactionError::Other)?;
    1584            0 :                 info!(layer=%new_layer, "rewrote layer, {} -> {} bytes",
    1585            0 :                     layer.metadata().file_size,
    1586            0 :                     new_layer.metadata().file_size);
    1587              : 
    1588            0 :                 replace_image_layers.push((layer, new_layer));
    1589            0 :             } else {
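    1590            0 :                 // Drop the old layer.  Usually for this case we would already have noticed that
    1591            0 :                 // the layer has no data for us with the ShardedRange check above, but that check is range-based and is bypassed when the page count is u32::MAX, so filtering can still yield zero keys.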
    1592            0 :                 drop_layers.push(layer);
    1593            0 :             }
    1594              : 
    1595              :             // Yield for L0 compaction if necessary, but make sure we update the layer map below
    1596              :             // with the work we've already done.
    1597            0 :             if yield_for_l0
    1598            0 :                 && self
    1599            0 :                     .l0_compaction_trigger
    1600            0 :                     .notified()
    1601            0 :                     .now_or_never()
    1602            0 :                     .is_some()
    1603              :             {
    1604            0 :                 info!("shard ancestor compaction yielding for L0 compaction");
    1605            0 :                 outcome = CompactionOutcome::YieldForL0;
    1606            0 :                 break;
    1607            0 :             }
    1608              :         }
    1609              : 
    1610            0 :         for layer in &drop_layers {
    1611            0 :             info!(%layer, old_metadata=?layer.metadata(),
    1612            0 :                 "dropping layer after shard split (no keys for this shard)",
    1613              :             );
    1614              :         }
    1615              : 
    1616              :         // At this point, we have replaced local layer files with their rewritten form, but not yet uploaded
    1617              :         // metadata to reflect that. If we restart here, the replaced layer files will look invalid (size mismatch
    1618              :         // to remote index) and be removed. This is inefficient but safe.
    1619            0 :         fail::fail_point!("compact-shard-ancestors-localonly");
    1620            0 : 
    1621            0 :         // Update the LayerMap so that readers will use the new layers, and enqueue it for writing to remote storage
    1622            0 :         self.rewrite_layers(replace_image_layers, drop_layers)
    1623            0 :             .await?;
    1624              : 
    1625            0 :         fail::fail_point!("compact-shard-ancestors-enqueued");
    1626            0 : 
    1627            0 :         // We wait for all uploads to complete before finishing this compaction stage.  This is not
    1628            0 :         // necessary for correctness, but it simplifies testing, and avoids proceeding with another
    1629            0 :         // Timeline's compaction while this timeline's uploads may be generating lots of disk I/O
    1630            0 :         // load.
    1631            0 :         if outcome != CompactionOutcome::YieldForL0 {
    1632            0 :             info!("shard ancestor compaction waiting for uploads");
    1633            0 :             tokio::select! {
    1634            0 :                 result = self.remote_client.wait_completion() => match result {
    1635            0 :                     Ok(()) => {},
    1636            0 :                     Err(WaitCompletionError::NotInitialized(ni)) => return Err(CompactionError::from(ni)),
    1637              :                     Err(WaitCompletionError::UploadQueueShutDownOrStopped) => {
    1638            0 :                         return Err(CompactionError::ShuttingDown);
    1639              :                     }
    1640              :                 },
    1641              :                 // Don't wait if there's L0 compaction to do. We don't need to update the outcome
    1642              :                 // here, because we've already done the actual work.
    1643            0 :                 _ = self.l0_compaction_trigger.notified(), if yield_for_l0 => {},
    1644              :             }
    1645            0 :         }
    1646              : 
    1647            0 :         info!(
    1648            0 :             "shard ancestor compaction done in {:.3}s{}",
    1649            0 :             started.elapsed().as_secs_f64(),
    1650            0 :             match outcome {
    1651              :                 CompactionOutcome::Pending =>
    1652            0 :                     format!(", with pending work (rewrite_max={rewrite_max})"),
    1653            0 :                 CompactionOutcome::YieldForL0 => String::from(", yielding for L0 compaction"),
    1654            0 :                 CompactionOutcome::Skipped | CompactionOutcome::Done => String::new(),
    1655              :             }
    1656              :         );
    1657              : 
    1658            0 :         fail::fail_point!("compact-shard-ancestors-persistent");
    1659            0 : 
    1660            0 :         Ok(outcome)
    1661            0 :     }
    1662              : 
    1663              :     /// Update the LayerVisibilityHint of layers covered by image layers, based on whether there is
    1664              :     /// an image layer between them and the most recent readable LSN (branch point or tip of timeline).  The
    1665              :     /// purpose of the visibility hint is to record which layers need to be available to service reads.
    1666              :     ///
    1667              :     /// The result may be used as an input to eviction and secondary downloads to de-prioritize layers
    1668              :     /// that we know won't be needed for reads.  Fails with `Shutdown` only if `layer_map()` reports it.
    1669          121 :     pub(crate) async fn update_layer_visibility(
    1670          121 :         &self,
    1671          121 :     ) -> Result<(), super::layer_manager::Shutdown> {
    1672          121 :         let head_lsn = self.get_last_record_lsn();
    1673              : 
    1674              :         // We will sweep through layers in reverse-LSN order.  We only do historic layers.  L0 deltas
    1675              :         // are implicitly left visible, because LayerVisibilityHint's default is Visible, and we never modify it here.
    1676              :         // Note that L0 deltas _can_ be covered by image layers, but we consider them 'visible' because we anticipate that
    1677              :         // they will be subject to L0->L1 compaction in the near future.
    1678          121 :         let layer_manager = self.layers.read().await;
    1679          121 :         let layer_map = layer_manager.layer_map()?;
    1680              : 
    1681          121 :         let readable_points = {
    1682          121 :             let children = self.gc_info.read().unwrap().retain_lsns.clone();
    1683          121 : 
    1684          121 :             let mut readable_points = Vec::with_capacity(children.len() + 1);
    1685          121 :             for (child_lsn, _child_timeline_id, is_offloaded) in &children {
    1686            0 :                 if *is_offloaded == MaybeOffloaded::Yes {
    1687            0 :                     continue;
    1688            0 :                 }
    1689            0 :                 readable_points.push(*child_lsn);
    1690              :             }
    1691          121 :             readable_points.push(head_lsn);
    1692          121 :             readable_points
    1693          121 :         };
    1694          121 : 
    1695          121 :         let (layer_visibility, covered) = layer_map.get_visibility(readable_points);
    1696          304 :         for (layer_desc, visibility) in layer_visibility {
    1697          183 :             // FIXME: a more efficient bulk zip() through the layers rather than NlogN getting each one
    1698          183 :             let layer = layer_manager.get_from_desc(&layer_desc);
    1699          183 :             layer.set_visibility(visibility);
    1700          183 :         }
    1701              : 
    1702              :         // TODO: publish our covered KeySpace to our parent, so that when they update their visibility, they can
    1703              :         // avoid assuming that everything at a branch point is visible.
    1704          121 :         drop(covered);
    1705          121 :         Ok(())
    1706          121 :     }
    1707              : 
    1708              :     /// Collect a bunch of Level 0 layer files, and compact and reshuffle them
    1709              :     /// as Level 1 files. Returns `CompactionOutcome::Done` if phase 1 found nothing to do, otherwise the outcome reported by phase 1.
    1710          182 :     async fn compact_level0(
    1711          182 :         self: &Arc<Self>,
    1712          182 :         target_file_size: u64,
    1713          182 :         force_compaction_ignore_threshold: bool,
    1714          182 :         ctx: &RequestContext,
    1715          182 :     ) -> Result<CompactionOutcome, CompactionError> {
    1716              :         let CompactLevel0Phase1Result {
    1717          182 :             new_layers,
    1718          182 :             deltas_to_compact,
    1719          182 :             outcome,
    1720              :         } = {
    1721          182 :             let phase1_span = info_span!("compact_level0_phase1");
    1722          182 :             let ctx = ctx.attached_child();
    1723          182 :             let mut stats = CompactLevel0Phase1StatsBuilder {
    1724          182 :                 version: Some(2),
    1725          182 :                 tenant_id: Some(self.tenant_shard_id),
    1726          182 :                 timeline_id: Some(self.timeline_id),
    1727          182 :                 ..Default::default()
    1728          182 :             };
    1729          182 : 
    1730          182 :             let begin = tokio::time::Instant::now();
    1731          182 :             let phase1_layers_locked = self.layers.read().await;
    1732          182 :             let now = tokio::time::Instant::now();
    1733          182 :             stats.read_lock_acquisition_micros =
    1734          182 :                 DurationRecorder::Recorded(RecordedDuration(now - begin), now);
    1735          182 :             self.compact_level0_phase1(
    1736          182 :                 phase1_layers_locked,
    1737          182 :                 stats,
    1738          182 :                 target_file_size,
    1739          182 :                 force_compaction_ignore_threshold,
    1740          182 :                 &ctx,
    1741          182 :             )
    1742          182 :             .instrument(phase1_span)
    1743          182 :             .await?
    1744              :         };
    1745              : 
    1746          182 :         if new_layers.is_empty() && deltas_to_compact.is_empty() {
    1747              :             // nothing to do: phase 1 neither created new layers nor selected deltas to compact
    1748          168 :             return Ok(CompactionOutcome::Done);
    1749           14 :         }
    1750           14 : 
    1751           14 :         self.finish_compact_batch(&new_layers, &Vec::new(), &deltas_to_compact)
    1752           14 :             .await?;
    1753           14 :         Ok(outcome)
    1754          182 :     }
    1755              : 
    1756              :     /// Level0 files first phase of compaction, explained in the [`Self::compact_legacy`] comment.
    1757          182 :     async fn compact_level0_phase1<'a>(
    1758          182 :         self: &'a Arc<Self>,
    1759          182 :         guard: tokio::sync::RwLockReadGuard<'a, LayerManager>,
    1760          182 :         mut stats: CompactLevel0Phase1StatsBuilder,
    1761          182 :         target_file_size: u64,
    1762          182 :         force_compaction_ignore_threshold: bool,
    1763          182 :         ctx: &RequestContext,
    1764          182 :     ) -> Result<CompactLevel0Phase1Result, CompactionError> {
    1765          182 :         stats.read_lock_held_spawn_blocking_startup_micros =
    1766          182 :             stats.read_lock_acquisition_micros.till_now(); // set by caller
    1767          182 :         let layers = guard.layer_map()?;
    1768          182 :         let level0_deltas = layers.level0_deltas();
    1769          182 :         stats.level0_deltas_count = Some(level0_deltas.len());
    1770          182 : 
    1771          182 :         // Only compact if enough layers have accumulated.
    1772          182 :         let threshold = self.get_compaction_threshold();
    1773          182 :         if level0_deltas.is_empty() || level0_deltas.len() < threshold {
    1774          168 :             if force_compaction_ignore_threshold {
    1775            0 :                 if !level0_deltas.is_empty() {
    1776            0 :                     info!(
    1777            0 :                         level0_deltas = level0_deltas.len(),
    1778            0 :                         threshold, "too few deltas to compact, but forcing compaction"
    1779              :                     );
    1780              :                 } else {
    1781            0 :                     info!(
    1782            0 :                         level0_deltas = level0_deltas.len(),
    1783            0 :                         threshold, "too few deltas to compact, cannot force compaction"
    1784              :                     );
    1785            0 :                     return Ok(CompactLevel0Phase1Result::default());
    1786              :                 }
    1787              :             } else {
    1788          168 :                 debug!(
    1789            0 :                     level0_deltas = level0_deltas.len(),
    1790            0 :                     threshold, "too few deltas to compact"
    1791              :                 );
    1792          168 :                 return Ok(CompactLevel0Phase1Result::default());
    1793              :             }
    1794           14 :         }
    1795              : 
    1796           14 :         let mut level0_deltas = level0_deltas
    1797           14 :             .iter()
    1798          201 :             .map(|x| guard.get_from_desc(x))
    1799           14 :             .collect::<Vec<_>>();
    1800           14 : 
    1801           14 :         // Gather the files to compact in this iteration.
    1802           14 :         //
    1803           14 :         // Start with the oldest Level 0 delta file, and collect any other
    1804           14 :         // level 0 files that form a contiguous sequence, such that the end
    1805           14 :         // LSN of previous file matches the start LSN of the next file.
    1806           14 :         //
    1807           14 :         // Note that if the files don't form such a sequence, we might
    1808           14 :         // "compact" just a single file. That's a bit pointless, but it allows
    1809           14 :         // us to get rid of the level 0 file, and compact the other files on
    1810           14 :         // the next iteration. This could probably be made smarter, but such
    1811           14 :         // "gaps" in the sequence of level 0 files should only happen in case
    1812           14 :         // of a crash, partial download from cloud storage, or something like
    1813           14 :         // that, so it's not a big deal in practice.
    1814          374 :         level0_deltas.sort_by_key(|l| l.layer_desc().lsn_range.start);
    1815           14 :         let mut level0_deltas_iter = level0_deltas.iter();
    1816           14 : 
    1817           14 :         let first_level0_delta = level0_deltas_iter.next().unwrap();
    1818           14 :         let mut prev_lsn_end = first_level0_delta.layer_desc().lsn_range.end;
    1819           14 :         let mut deltas_to_compact = Vec::with_capacity(level0_deltas.len());
    1820           14 : 
    1821           14 :         // Accumulate the size of layers in `deltas_to_compact`
    1822           14 :         let mut deltas_to_compact_bytes = 0;
    1823           14 : 
    1824           14 :         // Under normal circumstances, we will accumulate up to compaction_upper_limit L0s of size
    1825           14 :         // checkpoint_distance each.  To avoid edge cases using extra system resources, bound our
    1826           14 :         // work in this function to only operate on this much delta data at once.
    1827           14 :         //
    1828           14 :         // In general, compaction_threshold should be <= compaction_upper_limit, but in case that
    1829           14 :         // the constraint is not respected, we use the larger of the two.
    1830           14 :         let delta_size_limit = std::cmp::max(
    1831           14 :             self.get_compaction_upper_limit(),
    1832           14 :             self.get_compaction_threshold(),
    1833           14 :         ) as u64
    1834           14 :             * std::cmp::max(self.get_checkpoint_distance(), DEFAULT_CHECKPOINT_DISTANCE);
    1835           14 : 
    1836           14 :         let mut fully_compacted = true;
    1837           14 : 
    1838           14 :         deltas_to_compact.push(first_level0_delta.download_and_keep_resident(ctx).await?);
    1839          201 :         for l in level0_deltas_iter {
    1840          187 :             let lsn_range = &l.layer_desc().lsn_range;
    1841          187 : 
    1842          187 :             if lsn_range.start != prev_lsn_end {
    1843            0 :                 break;
    1844          187 :             }
    1845          187 :             deltas_to_compact.push(l.download_and_keep_resident(ctx).await?);
    1846          187 :             deltas_to_compact_bytes += l.metadata().file_size;
    1847          187 :             prev_lsn_end = lsn_range.end;
    1848          187 : 
    1849          187 :             if deltas_to_compact_bytes >= delta_size_limit {
    1850            0 :                 info!(
    1851            0 :                     l0_deltas_selected = deltas_to_compact.len(),
    1852            0 :                     l0_deltas_total = level0_deltas.len(),
    1853            0 :                     "L0 compaction picker hit max delta layer size limit: {}",
    1854              :                     delta_size_limit
    1855              :                 );
    1856            0 :                 fully_compacted = false;
    1857            0 : 
    1858            0 :                 // Proceed with compaction, but only a subset of L0s
    1859            0 :                 break;
    1860          187 :             }
    1861              :         }
    1862           14 :         let lsn_range = Range {
    1863           14 :             start: deltas_to_compact
    1864           14 :                 .first()
    1865           14 :                 .unwrap()
    1866           14 :                 .layer_desc()
    1867           14 :                 .lsn_range
    1868           14 :                 .start,
    1869           14 :             end: deltas_to_compact.last().unwrap().layer_desc().lsn_range.end,
    1870           14 :         };
    1871           14 : 
    1872           14 :         info!(
    1873            0 :             "Starting Level0 compaction in LSN range {}-{} for {} layers ({} deltas in total)",
    1874            0 :             lsn_range.start,
    1875            0 :             lsn_range.end,
    1876            0 :             deltas_to_compact.len(),
    1877            0 :             level0_deltas.len()
    1878              :         );
    1879              : 
    1880          201 :         for l in deltas_to_compact.iter() {
    1881          201 :             info!("compact includes {l}");
    1882              :         }
    1883              : 
    1884              :         // We don't need the original list of layers anymore. Drop it so that
    1885              :         // we don't accidentally use it later in the function.
    1886           14 :         drop(level0_deltas);
    1887           14 : 
    1888           14 :         stats.read_lock_held_prerequisites_micros = stats
    1889           14 :             .read_lock_held_spawn_blocking_startup_micros
    1890           14 :             .till_now();
    1891              : 
    1892              :         // TODO: replace with streaming k-merge
    1893           14 :         let all_keys = {
    1894           14 :             let mut all_keys = Vec::new();
    1895          201 :             for l in deltas_to_compact.iter() {
    1896          201 :                 if self.cancel.is_cancelled() {
    1897            0 :                     return Err(CompactionError::ShuttingDown);
    1898          201 :                 }
    1899          201 :                 let delta = l.get_as_delta(ctx).await.map_err(CompactionError::Other)?;
    1900          201 :                 let keys = delta
    1901          201 :                     .index_entries(ctx)
    1902          201 :                     .await
    1903          201 :                     .map_err(CompactionError::Other)?;
    1904          201 :                 all_keys.extend(keys);
    1905              :             }
    1906              :             // The current stdlib sorting implementation is designed in a way where it is
    1907              :             // particularly fast where the slice is made up of sorted sub-ranges.
    1908      2211908 :             all_keys.sort_by_key(|DeltaEntry { key, lsn, .. }| (*key, *lsn));
    1909           14 :             all_keys
    1910           14 :         };
    1911           14 : 
    1912           14 :         stats.read_lock_held_key_sort_micros = stats.read_lock_held_prerequisites_micros.till_now();
    1913              : 
    1914              :         // Determine N largest holes where N is number of compacted layers. The vec is sorted by key range start.
    1915              :         //
    1916              :         // A hole is a key range for which this compaction doesn't have any WAL records.
    1917              :         // Our goal in this compaction iteration is to avoid creating L1s that, in terms of their key range,
    1918              :         // cover the hole, but actually don't contain any WAL records for that key range.
    1919              :         // The reason is that the mere stack of L1s (`count_deltas`) triggers image layer creation (`create_image_layers`).
    1920              :         // That image layer creation would be useless for a hole range covered by L1s that don't contain any WAL records.
    1921              :         //
    1922              :         // The algorithm chooses holes as follows.
    1923              :         // - Slide a 2-window over the keys in key order to get the hole range (=distance between two keys).
    1924              :         // - Filter: min threshold on range length
    1925              :         // - Rank: by coverage size (=number of image layers required to reconstruct each key in the range for which we have any data)
    1926              :         //
    1927              :         // For more details, intuition, and some ASCII art see https://github.com/neondatabase/neon/pull/3597#discussion_r1112704451
    1928              :         #[derive(PartialEq, Eq)]
    1929              :         struct Hole {
    1930              :             key_range: Range<Key>,
    1931              :             coverage_size: usize,
    1932              :         }
    1933           14 :         let holes: Vec<Hole> = {
    1934              :             use std::cmp::Ordering;
    1935              :             impl Ord for Hole {
    1936            0 :                 fn cmp(&self, other: &Self) -> Ordering {
    1937            0 :                     self.coverage_size.cmp(&other.coverage_size).reverse()
    1938            0 :                 }
    1939              :             }
    1940              :             impl PartialOrd for Hole {
    1941            0 :                 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
    1942            0 :                     Some(self.cmp(other))
    1943            0 :                 }
    1944              :             }
    1945           14 :             let max_holes = deltas_to_compact.len();
    1946           14 :             let last_record_lsn = self.get_last_record_lsn();
    1947           14 :             let min_hole_range = (target_file_size / page_cache::PAGE_SZ as u64) as i128;
    1948           14 :             let min_hole_coverage_size = 3; // TODO: something more flexible?
    1949           14 :             // min-heap (reserve space for one more element added before eviction)
    1950           14 :             let mut heap: BinaryHeap<Hole> = BinaryHeap::with_capacity(max_holes + 1);
    1951           14 :             let mut prev: Option<Key> = None;
    1952              : 
    1953      1032019 :             for &DeltaEntry { key: next_key, .. } in all_keys.iter() {
    1954      1032019 :                 if let Some(prev_key) = prev {
    1955              :                     // just first fast filter, do not create hole entries for metadata keys. The last hole in the
    1956              :                     // compaction is the gap between data key and metadata keys.
    1957      1032005 :                     if next_key.to_i128() - prev_key.to_i128() >= min_hole_range
    1958            0 :                         && !Key::is_metadata_key(&prev_key)
    1959              :                     {
    1960            0 :                         let key_range = prev_key..next_key;
    1961            0 :                         // Measuring a hole by simply subtracting the i128 representations of the key range boundaries
    1962            0 :                         // does not make much sense, because the largest holes would correspond to field1/field2 changes.
    1963            0 :                         // But we are mostly interested in eliminating holes which cause generation of excessive image layers.
    1964            0 :                         // That is why it is better to measure the size of a hole as the number of covering image layers.
    1965            0 :                         let coverage_size =
    1966            0 :                             layers.image_coverage(&key_range, last_record_lsn).len();
    1967            0 :                         if coverage_size >= min_hole_coverage_size {
    1968            0 :                             heap.push(Hole {
    1969            0 :                                 key_range,
    1970            0 :                                 coverage_size,
    1971            0 :                             });
    1972            0 :                             if heap.len() > max_holes {
    1973            0 :                                 heap.pop(); // remove smallest hole
    1974            0 :                             }
    1975            0 :                         }
    1976      1032005 :                     }
    1977           14 :                 }
    1978      1032019 :                 prev = Some(next_key.next());
    1979              :             }
    1980           14 :             let mut holes = heap.into_vec();
    1981           14 :             holes.sort_unstable_by_key(|hole| hole.key_range.start);
    1982           14 :             holes
    1983           14 :         };
    1984           14 :         stats.read_lock_held_compute_holes_micros = stats.read_lock_held_key_sort_micros.till_now();
    1985           14 :         drop_rlock(guard);
    1986           14 : 
    1987           14 :         if self.cancel.is_cancelled() {
    1988            0 :             return Err(CompactionError::ShuttingDown);
    1989           14 :         }
    1990           14 : 
    1991           14 :         stats.read_lock_drop_micros = stats.read_lock_held_compute_holes_micros.till_now();
    1992              : 
    1993              :         // This iterator walks through all key-value pairs from all the layers
    1994              :         // we're compacting, in key, LSN order.
    1995              :         // If there's both a Value::Image and Value::WalRecord for the same (key,lsn),
    1996              :         // then the Value::Image is ordered before Value::WalRecord.
    1997           14 :         let mut all_values_iter = {
    1998           14 :             let mut deltas = Vec::with_capacity(deltas_to_compact.len());
    1999          201 :             for l in deltas_to_compact.iter() {
    2000          201 :                 let l = l.get_as_delta(ctx).await.map_err(CompactionError::Other)?;
    2001          201 :                 deltas.push(l);
    2002              :             }
    2003           14 :             MergeIterator::create_with_options(
    2004           14 :                 &deltas,
    2005           14 :                 &[],
    2006           14 :                 ctx,
    2007           14 :                 1024 * 8192, /* 8 MiB buffer per layer iterator */
    2008           14 :                 1024,
    2009           14 :             )
    2010           14 :         };
    2011           14 : 
    2012           14 :         // This iterator walks through all keys and is needed to calculate size used by each key
    2013           14 :         let mut all_keys_iter = all_keys
    2014           14 :             .iter()
    2015      1032019 :             .map(|DeltaEntry { key, lsn, size, .. }| (*key, *lsn, *size))
    2016      1032005 :             .coalesce(|mut prev, cur| {
    2017      1032005 :                 // Coalesce keys that belong to the same key pair.
    2018      1032005 :                 // This ensures that compaction doesn't put them
    2019      1032005 :                 // into different layer files.
    2020      1032005 :                 // Still limit this by the target file size,
    2021      1032005 :                 // so that we keep the size of the files in
    2022      1032005 :                 // check.
    2023      1032005 :                 if prev.0 == cur.0 && prev.2 < target_file_size {
    2024        20019 :                     prev.2 += cur.2;
    2025        20019 :                     Ok(prev)
    2026              :                 } else {
    2027      1011986 :                     Err((prev, cur))
    2028              :                 }
    2029      1032005 :             });
    2030           14 : 
    2031           14 :         // Merge the contents of all the input delta layers into a new set
    2032           14 :         // of delta layers, based on the current partitioning.
    2033           14 :         //
    2034           14 :         // We split the new delta layers on the key dimension. We iterate through the key space, and for each key, check if including the next key to the current output layer we're building would cause the layer to become too large. If so, dump the current output layer and start new one.
    2035           14 :         // It's possible that there is a single key with so many page versions that storing all of them in a single layer file
    2036           14 :         // would be too large. In that case, we also split on the LSN dimension.
    2037           14 :         //
    2038           14 :         // LSN
    2039           14 :         //  ^
    2040           14 :         //  |
    2041           14 :         //  | +-----------+            +--+--+--+--+
    2042           14 :         //  | |           |            |  |  |  |  |
    2043           14 :         //  | +-----------+            |  |  |  |  |
    2044           14 :         //  | |           |            |  |  |  |  |
    2045           14 :         //  | +-----------+     ==>    |  |  |  |  |
    2046           14 :         //  | |           |            |  |  |  |  |
    2047           14 :         //  | +-----------+            |  |  |  |  |
    2048           14 :         //  | |           |            |  |  |  |  |
    2049           14 :         //  | +-----------+            +--+--+--+--+
    2050           14 :         //  |
    2051           14 :         //  +--------------> key
    2052           14 :         //
    2053           14 :         //
    2054           14 :         // If one key (X) has a lot of page versions:
    2055           14 :         //
    2056           14 :         // LSN
    2057           14 :         //  ^
    2058           14 :         //  |                                 (X)
    2059           14 :         //  | +-----------+            +--+--+--+--+
    2060           14 :         //  | |           |            |  |  |  |  |
    2061           14 :         //  | +-----------+            |  |  +--+  |
    2062           14 :         //  | |           |            |  |  |  |  |
    2063           14 :         //  | +-----------+     ==>    |  |  |  |  |
    2064           14 :         //  | |           |            |  |  +--+  |
    2065           14 :         //  | +-----------+            |  |  |  |  |
    2066           14 :         //  | |           |            |  |  |  |  |
    2067           14 :         //  | +-----------+            +--+--+--+--+
    2068           14 :         //  |
    2069           14 :         //  +--------------> key
    2070           14 :         // TODO: this actually divides the layers into fixed-size chunks, not
    2071           14 :         // based on the partitioning.
    2072           14 :         //
    2073           14 :         // TODO: we should also opportunistically materialize and
    2074           14 :         // garbage collect what we can.
    2075           14 :         let mut new_layers = Vec::new();
    2076           14 :         let mut prev_key: Option<Key> = None;
    2077           14 :         let mut writer: Option<DeltaLayerWriter> = None;
    2078           14 :         let mut key_values_total_size = 0u64;
    2079           14 :         let mut dup_start_lsn: Lsn = Lsn::INVALID; // start LSN of layer containing values of the single key
    2080           14 :         let mut dup_end_lsn: Lsn = Lsn::INVALID; // end LSN of layer containing values of the single key
    2081           14 :         let mut next_hole = 0; // index of next hole in holes vector
    2082           14 : 
    2083           14 :         let mut keys = 0;
    2084              : 
    2085      1032033 :         while let Some((key, lsn, value)) = all_values_iter
    2086      1032033 :             .next()
    2087      1032033 :             .await
    2088      1032033 :             .map_err(CompactionError::Other)?
    2089              :         {
    2090      1032019 :             keys += 1;
    2091      1032019 : 
    2092      1032019 :             if keys % 32_768 == 0 && self.cancel.is_cancelled() {
    2093              :                 // avoid hitting the cancellation token on every key. in benches, we end up
    2094              :                 // shuffling an order of million keys per layer, this means we'll check it
    2095              :                 // around tens of times per layer.
    2096            0 :                 return Err(CompactionError::ShuttingDown);
    2097      1032019 :             }
    2098      1032019 : 
    2099      1032019 :             let same_key = prev_key == Some(key);
    2100      1032019 :             // We need to check key boundaries once we reach next key or end of layer with the same key
    2101      1032019 :             if !same_key || lsn == dup_end_lsn {
    2102      1012000 :                 let mut next_key_size = 0u64;
    2103      1012000 :                 let is_dup_layer = dup_end_lsn.is_valid();
    2104      1012000 :                 dup_start_lsn = Lsn::INVALID;
    2105      1012000 :                 if !same_key {
    2106      1012000 :                     dup_end_lsn = Lsn::INVALID;
    2107      1012000 :                 }
    2108              :                 // Determine size occupied by this key. We stop at next key or when size becomes larger than target_file_size
    2109      1012000 :                 for (next_key, next_lsn, next_size) in all_keys_iter.by_ref() {
    2110      1012000 :                     next_key_size = next_size;
    2111      1012000 :                     if key != next_key {
    2112      1011986 :                         if dup_end_lsn.is_valid() {
    2113            0 :                             // We are writing a segment with duplicates:
    2114            0 :                             // place all remaining values of this key in separate segment
    2115            0 :                             dup_start_lsn = dup_end_lsn; // new segments starts where old stops
    2116            0 :                             dup_end_lsn = lsn_range.end; // there are no more values of this key till end of LSN range
    2117      1011986 :                         }
    2118      1011986 :                         break;
    2119           14 :                     }
    2120           14 :                     key_values_total_size += next_size;
    2121           14 :                     // Check if it is time to split segment: if total keys size is larger than target file size.
    2122           14 :                     // We need to avoid generation of empty segments if next_size > target_file_size.
    2123           14 :                     if key_values_total_size > target_file_size && lsn != next_lsn {
    2124              :                         // Split key between multiple layers: such layer can contain only single key
    2125            0 :                         dup_start_lsn = if dup_end_lsn.is_valid() {
    2126            0 :                             dup_end_lsn // new segment with duplicates starts where old one stops
    2127              :                         } else {
    2128            0 :                             lsn // start with the first LSN for this key
    2129              :                         };
    2130            0 :                         dup_end_lsn = next_lsn; // upper LSN boundary is exclusive
    2131            0 :                         break;
    2132           14 :                     }
    2133              :                 }
    2134              :                 // handle case when loop reaches last key: in this case dup_end is non-zero but dup_start is not set.
    2135      1012000 :                 if dup_end_lsn.is_valid() && !dup_start_lsn.is_valid() {
    2136            0 :                     dup_start_lsn = dup_end_lsn;
    2137            0 :                     dup_end_lsn = lsn_range.end;
    2138      1012000 :                 }
    2139      1012000 :                 if writer.is_some() {
    2140      1011986 :                     let written_size = writer.as_mut().unwrap().size();
    2141      1011986 :                     let contains_hole =
    2142      1011986 :                         next_hole < holes.len() && key >= holes[next_hole].key_range.end;
    2143              :                     // check if key cause layer overflow or contains hole...
    2144      1011986 :                     if is_dup_layer
    2145      1011986 :                         || dup_end_lsn.is_valid()
    2146      1011986 :                         || written_size + key_values_total_size > target_file_size
    2147      1011846 :                         || contains_hole
    2148              :                     {
    2149              :                         // ... if so, flush previous layer and prepare to write new one
    2150          140 :                         let (desc, path) = writer
    2151          140 :                             .take()
    2152          140 :                             .unwrap()
    2153          140 :                             .finish(prev_key.unwrap().next(), ctx)
    2154          140 :                             .await
    2155          140 :                             .map_err(CompactionError::Other)?;
    2156          140 :                         let new_delta = Layer::finish_creating(self.conf, self, desc, &path)
    2157          140 :                             .map_err(CompactionError::Other)?;
    2158              : 
    2159          140 :                         new_layers.push(new_delta);
    2160          140 :                         writer = None;
    2161          140 : 
    2162          140 :                         if contains_hole {
    2163            0 :                             // skip hole
    2164            0 :                             next_hole += 1;
    2165          140 :                         }
    2166      1011846 :                     }
    2167           14 :                 }
    2168              :                 // Remember size of key value because at next iteration we will access next item
    2169      1012000 :                 key_values_total_size = next_key_size;
    2170        20019 :             }
    2171      1032019 :             fail_point!("delta-layer-writer-fail-before-finish", |_| {
    2172            0 :                 Err(CompactionError::Other(anyhow::anyhow!(
    2173            0 :                     "failpoint delta-layer-writer-fail-before-finish"
    2174            0 :                 )))
    2175      1032019 :             });
    2176              : 
    2177      1032019 :             if !self.shard_identity.is_key_disposable(&key) {
    2178      1032019 :                 if writer.is_none() {
    2179          154 :                     if self.cancel.is_cancelled() {
    2180              :                         // to be somewhat responsive to cancellation, check for each new layer
    2181            0 :                         return Err(CompactionError::ShuttingDown);
    2182          154 :                     }
    2183              :                     // Create writer if not initialized yet
    2184          154 :                     writer = Some(
    2185              :                         DeltaLayerWriter::new(
    2186          154 :                             self.conf,
    2187          154 :                             self.timeline_id,
    2188          154 :                             self.tenant_shard_id,
    2189          154 :                             key,
    2190          154 :                             if dup_end_lsn.is_valid() {
    2191              :                                 // this is a layer containing slice of values of the same key
    2192            0 :                                 debug!("Create new dup layer {}..{}", dup_start_lsn, dup_end_lsn);
    2193            0 :                                 dup_start_lsn..dup_end_lsn
    2194              :                             } else {
    2195          154 :                                 debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end);
    2196          154 :                                 lsn_range.clone()
    2197              :                             },
    2198          154 :                             &self.gate,
    2199          154 :                             self.cancel.clone(),
    2200          154 :                             ctx,
    2201          154 :                         )
    2202          154 :                         .await
    2203          154 :                         .map_err(CompactionError::Other)?,
    2204              :                     );
    2205              : 
    2206          154 :                     keys = 0;
    2207      1031865 :                 }
    2208              : 
    2209      1032019 :                 writer
    2210      1032019 :                     .as_mut()
    2211      1032019 :                     .unwrap()
    2212      1032019 :                     .put_value(key, lsn, value, ctx)
    2213      1032019 :                     .await?;
    2214              :             } else {
    2215            0 :                 let owner = self.shard_identity.get_shard_number(&key);
    2216            0 : 
    2217            0 :                 // This happens after a shard split, when we're compacting an L0 created by our parent shard
    2218            0 :                 debug!("dropping key {key} during compaction (it belongs on shard {owner})");
    2219              :             }
    2220              : 
    2221      1032019 :             if !new_layers.is_empty() {
    2222         9893 :                 fail_point!("after-timeline-compacted-first-L1");
    2223      1022126 :             }
    2224              : 
    2225      1032019 :             prev_key = Some(key);
    2226              :         }
    2227           14 :         if let Some(writer) = writer {
    2228           14 :             let (desc, path) = writer
    2229           14 :                 .finish(prev_key.unwrap().next(), ctx)
    2230           14 :                 .await
    2231           14 :                 .map_err(CompactionError::Other)?;
    2232           14 :             let new_delta = Layer::finish_creating(self.conf, self, desc, &path)
    2233           14 :                 .map_err(CompactionError::Other)?;
    2234           14 :             new_layers.push(new_delta);
    2235            0 :         }
    2236              : 
    2237              :         // Sync layers
    2238           14 :         if !new_layers.is_empty() {
    2239              :             // Print a warning if the created layer is larger than double the target size
    2240              :             // Add two pages for potential overhead. This should in theory be already
    2241              :             // accounted for in the target calculation, but for very small targets,
    2242              :             // we still might easily hit the limit otherwise.
    2243           14 :             let warn_limit = target_file_size * 2 + page_cache::PAGE_SZ as u64 * 2;
    2244          154 :             for layer in new_layers.iter() {
    2245          154 :                 if layer.layer_desc().file_size > warn_limit {
    2246            0 :                     warn!(
    2247              :                         %layer,
    2248            0 :                         "created delta file of size {} larger than double of target of {target_file_size}", layer.layer_desc().file_size
    2249              :                     );
    2250          154 :                 }
    2251              :             }
    2252              : 
    2253              :             // The writer.finish() above already did the fsync of the inodes.
    2254              :             // We just need to fsync the directory in which these inodes are linked,
    2255              :             // which we know to be the timeline directory.
    2256              :             //
    2257              :             // We use fatal_err() below because after writer.finish() returns with success,
    2258              :             // the in-memory state of the filesystem already has the layer file in its final place,
    2259              :             // and subsequent pageserver code could think it's durable while it really isn't.
    2260           14 :             let timeline_dir = VirtualFile::open(
    2261           14 :                 &self
    2262           14 :                     .conf
    2263           14 :                     .timeline_path(&self.tenant_shard_id, &self.timeline_id),
    2264           14 :                 ctx,
    2265           14 :             )
    2266           14 :             .await
    2267           14 :             .fatal_err("VirtualFile::open for timeline dir fsync");
    2268           14 :             timeline_dir
    2269           14 :                 .sync_all()
    2270           14 :                 .await
    2271           14 :                 .fatal_err("VirtualFile::sync_all timeline dir");
    2272            0 :         }
    2273              : 
    2274           14 :         stats.write_layer_files_micros = stats.read_lock_drop_micros.till_now();
    2275           14 :         stats.new_deltas_count = Some(new_layers.len());
    2276          154 :         stats.new_deltas_size = Some(new_layers.iter().map(|l| l.layer_desc().file_size).sum());
    2277           14 : 
    2278           14 :         match TryInto::<CompactLevel0Phase1Stats>::try_into(stats)
    2279           14 :             .and_then(|stats| serde_json::to_string(&stats).context("serde_json::to_string"))
    2280              :         {
    2281           14 :             Ok(stats_json) => {
    2282           14 :                 info!(
    2283            0 :                     stats_json = stats_json.as_str(),
    2284            0 :                     "compact_level0_phase1 stats available"
    2285              :                 )
    2286              :             }
    2287            0 :             Err(e) => {
    2288            0 :                 warn!("compact_level0_phase1 stats failed to serialize: {:#}", e);
    2289              :             }
    2290              :         }
    2291              : 
    2292              :         // Without this, rustc complains about deltas_to_compact still
    2293              :         // being borrowed when we `.into_iter()` below.
    2294           14 :         drop(all_values_iter);
    2295           14 : 
    2296           14 :         Ok(CompactLevel0Phase1Result {
    2297           14 :             new_layers,
    2298           14 :             deltas_to_compact: deltas_to_compact
    2299           14 :                 .into_iter()
    2300          201 :                 .map(|x| x.drop_eviction_guard())
    2301           14 :                 .collect::<Vec<_>>(),
    2302           14 :             outcome: if fully_compacted {
    2303           14 :                 CompactionOutcome::Done
    2304              :             } else {
    2305            0 :                 CompactionOutcome::Pending
    2306              :             },
    2307              :         })
    2308          182 :     }
    2309              : }
    2310              : 
    2311              : #[derive(Default)] // Success value returned by phase 1 of level-0 compaction.
    2312              : struct CompactLevel0Phase1Result {
    2313              :     new_layers: Vec<ResidentLayer>, // Delta layers written by this phase.
    2314              :     deltas_to_compact: Vec<Layer>, // Input layers that were compacted, eviction guards dropped.
    2315              :     // Whether we have included all L0 layers (`Done`), or selected only part of them
    2316              :     // (`Pending`) due to the L0 compaction size limit.
    2317              :     outcome: CompactionOutcome,
    2318              : }
    2319              : 
    2320              : #[derive(Default)] // Stats accumulated step by step during phase 1 of level-0 compaction.
    2321              : struct CompactLevel0Phase1StatsBuilder { // Turned into `CompactLevel0Phase1Stats` via `TryFrom`, which rejects unset fields.
    2322              :     version: Option<u64>,
    2323              :     tenant_id: Option<TenantShardId>,
    2324              :     timeline_id: Option<TimelineId>,
    2325              :     read_lock_acquisition_micros: DurationRecorder,
    2326              :     read_lock_held_spawn_blocking_startup_micros: DurationRecorder,
    2327              :     read_lock_held_key_sort_micros: DurationRecorder, // Set from `read_lock_held_prerequisites_micros.till_now()` once all keys are collected and sorted.
    2328              :     read_lock_held_prerequisites_micros: DurationRecorder, // Set from `read_lock_held_spawn_blocking_startup_micros.till_now()`; recorded before the key sort despite the field order.
    2329              :     read_lock_held_compute_holes_micros: DurationRecorder, // Set from `read_lock_held_key_sort_micros.till_now()` once the hole list is built.
    2330              :     read_lock_drop_micros: DurationRecorder, // Set from `read_lock_held_compute_holes_micros.till_now()` after the read lock guard is dropped.
    2331              :     write_layer_files_micros: DurationRecorder, // Set from `read_lock_drop_micros.till_now()` after the new layers are written and the timeline dir is fsynced.
    2332              :     level0_deltas_count: Option<usize>,
    2333              :     new_deltas_count: Option<usize>, // Number of newly created delta layers.
    2334              :     new_deltas_size: Option<u64>, // Sum of `file_size` over the newly created delta layers.
    2335              : }
    2336              : 
    2337              : #[derive(serde::Serialize)] // Serialized with `serde_json::to_string` and logged as `stats_json` at the end of phase 1.
    2338              : struct CompactLevel0Phase1Stats { // Finalized form of `CompactLevel0Phase1StatsBuilder`; fields share the builder's names.
    2339              :     version: u64,
    2340              :     tenant_id: TenantShardId,
    2341              :     timeline_id: TimelineId,
    2342              :     read_lock_acquisition_micros: RecordedDuration,
    2343              :     read_lock_held_spawn_blocking_startup_micros: RecordedDuration,
    2344              :     read_lock_held_key_sort_micros: RecordedDuration,
    2345              :     read_lock_held_prerequisites_micros: RecordedDuration,
    2346              :     read_lock_held_compute_holes_micros: RecordedDuration,
    2347              :     read_lock_drop_micros: RecordedDuration,
    2348              :     write_layer_files_micros: RecordedDuration,
    2349              :     level0_deltas_count: usize,
    2350              :     new_deltas_count: usize, // Number of newly created delta layers.
    2351              :     new_deltas_size: u64, // Total `file_size` of the newly created delta layers.
    2352              : }
    2353              : 
    2354              : impl TryFrom<CompactLevel0Phase1StatsBuilder> for CompactLevel0Phase1Stats { // Builder -> finalized stats; the conversion fails if any field was left unset.
    2355              :     type Error = anyhow::Error;
    2356              : 
    2357           14 :     fn try_from(value: CompactLevel0Phase1StatsBuilder) -> Result<Self, Self::Error> { // Fields are checked in the order written below; the first unset one aborts with a "<field> not set" error.
    2358           14 :         Ok(Self {
    2359           14 :             version: value.version.ok_or_else(|| anyhow!("version not set"))?,
    2360           14 :             tenant_id: value
    2361           14 :                 .tenant_id
    2362           14 :                 .ok_or_else(|| anyhow!("tenant_id not set"))?,
    2363           14 :             timeline_id: value
    2364           14 :                 .timeline_id
    2365           14 :                 .ok_or_else(|| anyhow!("timeline_id not set"))?,
    2366           14 :             read_lock_acquisition_micros: value
    2367           14 :                 .read_lock_acquisition_micros
    2368           14 :                 .into_recorded() // yields an `Option`; `None` is turned into the "not set" error on the next line
    2369           14 :                 .ok_or_else(|| anyhow!("read_lock_acquisition_micros not set"))?,
    2370           14 :             read_lock_held_spawn_blocking_startup_micros: value
    2371           14 :                 .read_lock_held_spawn_blocking_startup_micros
    2372           14 :                 .into_recorded()
    2373           14 :                 .ok_or_else(|| anyhow!("read_lock_held_spawn_blocking_startup_micros not set"))?,
    2374           14 :             read_lock_held_key_sort_micros: value
    2375           14 :                 .read_lock_held_key_sort_micros
    2376           14 :                 .into_recorded()
    2377           14 :                 .ok_or_else(|| anyhow!("read_lock_held_key_sort_micros not set"))?,
    2378           14 :             read_lock_held_prerequisites_micros: value
    2379           14 :                 .read_lock_held_prerequisites_micros
    2380           14 :                 .into_recorded()
    2381           14 :                 .ok_or_else(|| anyhow!("read_lock_held_prerequisites_micros not set"))?,
    2382           14 :             read_lock_held_compute_holes_micros: value
    2383           14 :                 .read_lock_held_compute_holes_micros
    2384           14 :                 .into_recorded()
    2385           14 :                 .ok_or_else(|| anyhow!("read_lock_held_compute_holes_micros not set"))?,
    2386           14 :             read_lock_drop_micros: value
    2387           14 :                 .read_lock_drop_micros
    2388           14 :                 .into_recorded()
    2389           14 :                 .ok_or_else(|| anyhow!("read_lock_drop_micros not set"))?,
    2390           14 :             write_layer_files_micros: value
    2391           14 :                 .write_layer_files_micros
    2392           14 :                 .into_recorded()
    2393           14 :                 .ok_or_else(|| anyhow!("write_layer_files_micros not set"))?,
    2394           14 :             level0_deltas_count: value
    2395           14 :                 .level0_deltas_count
    2396           14 :                 .ok_or_else(|| anyhow!("level0_deltas_count not set"))?,
    2397           14 :             new_deltas_count: value
    2398           14 :                 .new_deltas_count
    2399           14 :                 .ok_or_else(|| anyhow!("new_deltas_count not set"))?,
    2400           14 :             new_deltas_size: value
    2401           14 :                 .new_deltas_size
    2402           14 :                 .ok_or_else(|| anyhow!("new_deltas_size not set"))?,
    2403              :         })
    2404           14 :     }
    2405              : }
    2406              : 
    2407              : impl Timeline {
    2408              :     /// Entry point for the new tiered compaction algorithm.
    2409              :     ///
    2410              :     /// All the real work is done by the implementation in the pageserver_compaction
    2411              :     /// crate. The code here would apply to any algorithm implemented by the
    2412              :     /// same interface, but tiered is the only one at the moment.
    2413              :     ///
    2414              :     /// TODO: cancellation (the `_cancel` token is accepted but not used by this function yet)
    2415            0 :     pub(crate) async fn compact_tiered(
    2416            0 :         self: &Arc<Self>,
    2417            0 :         _cancel: &CancellationToken,
    2418            0 :         ctx: &RequestContext,
    2419            0 :     ) -> Result<(), CompactionError> {
    2420            0 :         let fanout = self.get_compaction_threshold() as u64;
    2421            0 :         let target_file_size = self.get_checkpoint_distance();
    2422              : 
    2423              :         // Find the top of the historical layers: the highest end LSN among the L0 deltas.
    2424            0 :         let end_lsn = {
    2425            0 :             let guard = self.layers.read().await;
    2426            0 :             let layers = guard.layer_map()?;
    2427              : 
    2428            0 :             let l0_deltas = layers.level0_deltas();
    2429            0 : 
    2430            0 :             // As an optimization, if we find that there are too few L0 layers,
    2431            0 :             // bail out early. We know that the compaction algorithm would do
    2432            0 :             // nothing in that case.
    2433            0 :             if l0_deltas.len() < fanout as usize {
    2434              :                 // doesn't need compacting
    2435            0 :                 return Ok(());
    2436            0 :             }
    2437            0 :             l0_deltas.iter().map(|l| l.lsn_range.end).max().unwrap() // NOTE(review): panics only if `l0_deltas` is empty, which passes the check above only when `fanout` is 0 -- confirm the threshold is always >= 1
    2438            0 :         };
    2439            0 : 
    2440            0 :         // Is the timeline being deleted?
    2441            0 :         if self.is_stopping() {
    2442            0 :             trace!("Dropping out of compaction on timeline shutdown");
    2443            0 :             return Err(CompactionError::ShuttingDown);
    2444            0 :         }
    2445              : 
    2446            0 :         let (dense_ks, _sparse_ks) = self.collect_keyspace(end_lsn, ctx).await?;
    2447              :         // TODO(chi): ignore sparse_keyspace for now, compact it in the future.
    2448            0 :         let mut adaptor = TimelineAdaptor::new(self, (end_lsn, dense_ks));
    2449            0 : 
    2450            0 :         pageserver_compaction::compact_tiered::compact_tiered(
    2451            0 :             &mut adaptor,
    2452            0 :             end_lsn,
    2453            0 :             target_file_size,
    2454            0 :             fanout,
    2455            0 :             ctx,
    2456            0 :         )
    2457            0 :         .await
    2458              :         // TODO: compact_tiered needs to return CompactionError
    2459            0 :         .map_err(CompactionError::Other)?;
    2460              : 
    2461            0 :         adaptor.flush_updates().await?;
    2462            0 :         Ok(())
    2463            0 :     }
    2464              : 
    2465              :     /// Take a list of images and deltas, produce images and deltas according to GC horizon and retain_lsns.
    2466              :     ///
    2467              :     /// It takes a key, the values of the key within the compaction process (`full_history`), a GC horizon, and all retain_lsns below the horizon.
    2468              :     /// For now, it requires that `full_history` contains the full history of the key (i.e., the key with the lowest LSN is
    2469              :     /// an image or a WAL record not requiring a base image). This restriction will be removed once we implement gc-compaction on branch.
    2470              :     ///
    2471              :     /// The function returns the deltas and the base image that need to be placed at each of the retain LSN. For example, we have:
    2472              :     ///
    2473              :     /// A@0x10, +B@0x20, +C@0x30, +D@0x40, +E@0x50, +F@0x60
    2474              :     /// horizon = 0x50, retain_lsn = 0x20, 0x40, delta_threshold=3
    2475              :     ///
    2476              :     /// The function will produce:
    2477              :     ///
    2478              :     /// ```plain
    2479              :     /// 0x20(retain_lsn) -> img=AB@0x20                  always produce a single image below the lowest retain LSN
    2480              :     /// 0x40(retain_lsn) -> deltas=[+C@0x30, +D@0x40]    two deltas since the last base image, keeping the deltas
    2481              :     /// 0x50(horizon)    -> deltas=[ABCDE@0x50]          three deltas since the last base image, generate an image but put it in the delta
    2482              :     /// above_horizon    -> deltas=[+F@0x60]             full history above the horizon
    2483              :     /// ```
    2484              :     ///
    2485              :     /// Note that `full_history` must be sorted by LSN and should belong to a single key. When `verification` is set, the result is checked with `KeyHistoryRetention::verify` before it is returned.
    2486              :     #[allow(clippy::too_many_arguments)]
    2487          324 :     pub(crate) async fn generate_key_retention(
    2488          324 :         self: &Arc<Timeline>,
    2489          324 :         key: Key,
    2490          324 :         full_history: &[(Key, Lsn, Value)],
    2491          324 :         horizon: Lsn,
    2492          324 :         retain_lsn_below_horizon: &[Lsn],
    2493          324 :         delta_threshold_cnt: usize,
    2494          324 :         base_img_from_ancestor: Option<(Key, Lsn, Bytes)>,
    2495          324 :         verification: bool,
    2496          324 :     ) -> anyhow::Result<KeyHistoryRetention> {
    2497              :         // Pre-checks for the invariants (only run in debug builds or with the `testing` feature)
    2498              : 
    2499          324 :         let debug_mode = cfg!(debug_assertions) || cfg!(feature = "testing");
    2500              : 
    2501          324 :         if debug_mode {
    2502          786 :             for (log_key, _, _) in full_history {
    2503          462 :                 assert_eq!(log_key, &key, "mismatched key");
    2504              :             }
    2505          324 :             for i in 1..full_history.len() {
    2506          138 :                 assert!(full_history[i - 1].1 <= full_history[i].1, "unordered LSN");
    2507          138 :                 if full_history[i - 1].1 == full_history[i].1 {
    2508            0 :                     assert!(
    2509            0 :                         matches!(full_history[i - 1].2, Value::Image(_)),
    2510            0 :                         "unordered delta/image, or duplicated delta"
    2511              :                     );
    2512          138 :                 }
    2513              :             }
    2514              :             // There was an assertion for no base image that checks if the first
    2515              :             // record in the history is `will_init` before, but it was removed.
    2516              :             // This is explained in the test cases for generate_key_retention.
    2517              :             // Search "incomplete history" for more information.
    2518          714 :             for lsn in retain_lsn_below_horizon {
    2519          390 :                 assert!(lsn < &horizon, "retain lsn must be below horizon")
    2520              :             }
    2521          324 :             for i in 1..retain_lsn_below_horizon.len() {
    2522          178 :                 assert!(
    2523          178 :                     retain_lsn_below_horizon[i - 1] <= retain_lsn_below_horizon[i],
    2524            0 :                     "unordered LSN"
    2525              :                 );
    2526              :             }
    2527            0 :         }
    2528          324 :         let has_ancestor = base_img_from_ancestor.is_some();
    2529              :         // Step 1: split history into len(retain_lsn_below_horizon) + 2 buckets, where the last bucket is for all deltas above the horizon,
    2530              :         // and the second-to-last bucket is for the horizon. Each bucket contains lsn_last_bucket < deltas <= lsn_this_bucket.
    2531          324 :         let (mut split_history, lsn_split_points) = {
    2532          324 :             let mut split_history = Vec::new();
    2533          324 :             split_history.resize_with(retain_lsn_below_horizon.len() + 2, Vec::new);
    2534          324 :             let mut lsn_split_points = Vec::with_capacity(retain_lsn_below_horizon.len() + 1);
    2535          714 :             for lsn in retain_lsn_below_horizon {
    2536          390 :                 lsn_split_points.push(*lsn);
    2537          390 :             }
    2538          324 :             lsn_split_points.push(horizon);
    2539          324 :             let mut current_idx = 0;
    2540          786 :             for item @ (_, lsn, _) in full_history {
    2541          584 :                 while current_idx < lsn_split_points.len() && *lsn > lsn_split_points[current_idx] {
    2542          122 :                     current_idx += 1;
    2543          122 :                 }
    2544          462 :                 split_history[current_idx].push(item);
    2545              :             }
    2546          324 :             (split_history, lsn_split_points)
    2547              :         };
    2548              :         // Step 2: filter out duplicated records due to the k-merge of image/delta layers
    2549         1362 :         for split_for_lsn in &mut split_history {
    2550         1038 :             let mut prev_lsn = None;
    2551         1038 :             let mut new_split_for_lsn = Vec::with_capacity(split_for_lsn.len());
    2552         1038 :             for record @ (_, lsn, _) in std::mem::take(split_for_lsn) {
    2553          462 :                 if let Some(prev_lsn) = &prev_lsn {
    2554           62 :                     if *prev_lsn == lsn {
    2555              :                         // The case that we have an LSN with both data from the delta layer and the image layer. As
    2556              :                         // `ValueWrapper` ensures that an image is ordered before a delta at the same LSN, we simply
    2557              :                         // drop this delta and keep the image.
    2558              :                         //
    2559              :                         // For example, we have delta layer key1@0x10, key1@0x20, and image layer key1@0x10, we will
    2560              :                         // keep the image for key1@0x10 and the delta for key1@0x20. key1@0x10 delta will be simply
    2561              :                         // dropped.
    2562              :                         //
    2563              :                         // TODO: in case we have both delta + images for a given LSN and it does not exceed the delta
    2564              :                         // threshold, we could have kept delta instead to save space. This is an optimization for the future.
    2565            0 :                         continue;
    2566           62 :                     }
    2567          400 :                 }
    2568          462 :                 prev_lsn = Some(lsn);
    2569          462 :                 new_split_for_lsn.push(record);
    2570              :             }
    2571         1038 :             *split_for_lsn = new_split_for_lsn;
    2572              :         }
    2573              :         // Step 3: generate images when necessary
    2574          324 :         let mut retention = Vec::with_capacity(split_history.len());
    2575          324 :         let mut records_since_last_image = 0;
    2576          324 :         let batch_cnt = split_history.len();
    2577          324 :         assert!(
    2578          324 :             batch_cnt >= 2,
    2579            0 :             "should have at least below + above horizon batches"
    2580              :         );
    2581          324 :         let mut replay_history: Vec<(Key, Lsn, Value)> = Vec::new();
    2582          324 :         if let Some((key, lsn, ref img)) = base_img_from_ancestor {
    2583           21 :             replay_history.push((key, lsn, Value::Image(img.clone())));
    2584          303 :         }
    2585              : 
    2586              :         /// Generate debug information for the replay history
    2587            0 :         fn generate_history_trace(replay_history: &[(Key, Lsn, Value)]) -> String {
    2588              :             use std::fmt::Write;
    2589            0 :             let mut output = String::new();
    2590            0 :             if let Some((key, _, _)) = replay_history.first() {
    2591            0 :                 write!(output, "key={} ", key).unwrap();
    2592            0 :                 let mut cnt = 0;
    2593            0 :                 for (_, lsn, val) in replay_history {
    2594            0 :                     if val.is_image() {
    2595            0 :                         write!(output, "i@{} ", lsn).unwrap();
    2596            0 :                     } else if val.will_init() {
    2597            0 :                         write!(output, "di@{} ", lsn).unwrap();
    2598            0 :                     } else {
    2599            0 :                         write!(output, "d@{} ", lsn).unwrap();
    2600            0 :                     }
    2601            0 :                     cnt += 1;
    2602            0 :                     if cnt >= 128 { // cap the trace at 128 entries
    2603            0 :                         write!(output, "... and more").unwrap();
    2604            0 :                         break;
    2605            0 :                     }
    2606              :                 }
    2607            0 :             } else {
    2608            0 :                 write!(output, "<no history>").unwrap();
    2609            0 :             }
    2610            0 :             output
    2611            0 :         }
    2612              :         /// Render the replay history (if available), the full history and the LSN split points into a multi-line string used as error context.
    2613            0 :         fn generate_debug_trace(
    2614            0 :             replay_history: Option<&[(Key, Lsn, Value)]>,
    2615            0 :             full_history: &[(Key, Lsn, Value)],
    2616            0 :             lsns: &[Lsn],
    2617            0 :             horizon: Lsn,
    2618            0 :         ) -> String {
    2619              :             use std::fmt::Write;
    2620            0 :             let mut output = String::new();
    2621            0 :             if let Some(replay_history) = replay_history {
    2622            0 :                 writeln!(
    2623            0 :                     output,
    2624            0 :                     "replay_history: {}",
    2625            0 :                     generate_history_trace(replay_history)
    2626            0 :                 )
    2627            0 :                 .unwrap();
    2628            0 :             } else {
    2629            0 :                 writeln!(output, "replay_history: <disabled>",).unwrap();
    2630            0 :             }
    2631            0 :             writeln!(
    2632            0 :                 output,
    2633            0 :                 "full_history: {}",
    2634            0 :                 generate_history_trace(full_history)
    2635            0 :             )
    2636            0 :             .unwrap();
    2637            0 :             writeln!(
    2638            0 :                 output,
    2639            0 :                 "when processing: [{}] horizon={}",
    2640            0 :                 lsns.iter().map(|l| format!("{l}")).join(","),
    2641            0 :                 horizon
    2642            0 :             )
    2643            0 :             .unwrap();
    2644            0 :             output
    2645            0 :         }
    2646              : 
    2647          324 :         let mut key_exists = false;
    2648         1037 :         for (i, split_for_lsn) in split_history.into_iter().enumerate() {
    2649              :             // TODO: there could be image keys inside the splits, and we can compute records_since_last_image accordingly.
    2650         1037 :             records_since_last_image += split_for_lsn.len();
    2651              :             // Whether to produce an image into the final layer files
    2652         1037 :             let produce_image = if i == 0 && !has_ancestor {
    2653              :                 // We always generate images for the first batch (below horizon / lowest retain_lsn), unless the base image comes from the ancestor
    2654          303 :                 true
    2655          734 :             } else if i == batch_cnt - 1 {
    2656              :                 // Do not generate images for the last batch (above horizon)
    2657          323 :                 false
    2658          411 :             } else if records_since_last_image == 0 { // nothing accumulated since the last image, so there is nothing to fold
    2659          322 :                 false
    2660           89 :             } else if records_since_last_image >= delta_threshold_cnt {
    2661              :                 // Generate images when there are too many records
    2662            3 :                 true
    2663              :             } else {
    2664           86 :                 false
    2665              :             };
    2666         1037 :             replay_history.extend(split_for_lsn.iter().map(|x| (*x).clone()));
    2667              :             // Only retain the items starting at the last record for which `will_init()` is true (that record is kept)
    2668         1277 :             for idx in (0..replay_history.len()).rev() {
    2669         1277 :                 if replay_history[idx].2.will_init() {
    2670         1037 :                     replay_history = replay_history[idx..].to_vec();
    2671         1037 :                     break;
    2672          240 :                 }
    2673              :             }
    2674         1037 :             if replay_history.is_empty() && !key_exists {
    2675              :                 // The key does not exist at earlier LSN, we can skip this iteration.
    2676            0 :                 retention.push(Vec::new());
    2677            0 :                 continue;
    2678         1037 :             } else {
    2679         1037 :                 key_exists = true;
    2680         1037 :             }
    2681         1037 :             let Some((_, _, val)) = replay_history.first() else {
    2682            0 :                 unreachable!("replay history should not be empty once it exists")
    2683              :             };
    2684         1037 :             if !val.will_init() {
    2685            0 :                 return Err(anyhow::anyhow!("invalid history, no base image")).with_context(|| {
    2686            0 :                     generate_debug_trace(
    2687            0 :                         Some(&replay_history),
    2688            0 :                         full_history,
    2689            0 :                         retain_lsn_below_horizon,
    2690            0 :                         horizon,
    2691            0 :                     )
    2692            0 :                 });
    2693         1037 :             }
    2694              :             // Reconstruct the image, which only happens when `produce_image` is set. In debug
    2695              :             // mode a copy of the replay history is kept so that the error traces below can
    2696              :             // include it; otherwise the trace reports the replay history as disabled.
    2697         1037 :             let img_and_lsn = if produce_image {
    2698          306 :                 records_since_last_image = 0;
    2699          306 :                 let replay_history_for_debug = if debug_mode {
    2700          306 :                     Some(replay_history.clone())
    2701              :                 } else {
    2702            0 :                     None
    2703              :                 };
    2704          306 :                 let replay_history_for_debug_ref = replay_history_for_debug.as_deref();
    2705          306 :                 let history = std::mem::take(&mut replay_history);
    2706          306 :                 let mut img = None;
    2707          306 :                 let mut records = Vec::with_capacity(history.len());
    2708          306 :                 if let (_, lsn, Value::Image(val)) = history.first().as_ref().unwrap() {
    2709          295 :                     img = Some((*lsn, val.clone()));
    2710          295 :                     for (_, lsn, val) in history.into_iter().skip(1) {
    2711           20 :                         let Value::WalRecord(rec) = val else {
    2712            0 :                             return Err(anyhow::anyhow!(
    2713            0 :                                 "invalid record, first record is image, expect walrecords"
    2714            0 :                             ))
    2715            0 :                             .with_context(|| {
    2716            0 :                                 generate_debug_trace(
    2717            0 :                                     replay_history_for_debug_ref,
    2718            0 :                                     full_history,
    2719            0 :                                     retain_lsn_below_horizon,
    2720            0 :                                     horizon,
    2721            0 :                                 )
    2722            0 :                             });
    2723              :                         };
    2724           20 :                         records.push((lsn, rec));
    2725              :                     }
    2726              :                 } else {
    2727           18 :                     for (_, lsn, val) in history.into_iter() {
    2728           18 :                         let Value::WalRecord(rec) = val else {
    2729            0 :                             return Err(anyhow::anyhow!("invalid record, first record is walrecord, expect rest are walrecord"))
    2730            0 :                                 .with_context(|| generate_debug_trace(
    2731            0 :                                     replay_history_for_debug_ref,
    2732            0 :                                     full_history,
    2733            0 :                                     retain_lsn_below_horizon,
    2734            0 :                                     horizon,
    2735            0 :                                 ));
    2736              :                         };
    2737           18 :                         records.push((lsn, rec));
    2738              :                     }
    2739              :                 }
    2740              :                 // WAL redo requires records in the reverse LSN order
    2741          306 :                 records.reverse();
    2742          306 :                 let state = ValueReconstructState { img, records };
    2743              :                 // last batch does not generate image so i is always in range, unless we force generate
    2744              :                 // an image during testing
    2745          306 :                 let request_lsn = if i >= lsn_split_points.len() {
    2746            0 :                     Lsn::MAX
    2747              :                 } else {
    2748          306 :                     lsn_split_points[i]
    2749              :                 };
    2750          306 :                 let img = self
    2751          306 :                     .reconstruct_value(key, request_lsn, state, RedoAttemptType::GcCompaction)
    2752          306 :                     .await?;
    2753          305 :                 Some((request_lsn, img))
    2754              :             } else {
    2755          731 :                 None
    2756              :             };
    2757         1036 :             if produce_image {
    2758          305 :                 let (request_lsn, img) = img_and_lsn.unwrap();
    2759          305 :                 replay_history.push((key, request_lsn, Value::Image(img.clone())));
    2760          305 :                 retention.push(vec![(request_lsn, Value::Image(img))]);
    2761          731 :             } else {
    2762          731 :                 let deltas = split_for_lsn
    2763          731 :                     .iter()
    2764          731 :                     .map(|(_, lsn, value)| (*lsn, value.clone()))
    2765          731 :                     .collect_vec();
    2766          731 :                 retention.push(deltas);
    2767          731 :             }
    2768              :         }
    2769          323 :         let mut result = Vec::with_capacity(retention.len());
    2770          323 :         assert_eq!(retention.len(), lsn_split_points.len() + 1);
    2771         1036 :         for (idx, logs) in retention.into_iter().enumerate() {
    2772         1036 :             if idx == lsn_split_points.len() {
    2773          323 :                 let retention = KeyHistoryRetention {
    2774          323 :                     below_horizon: result,
    2775          323 :                     above_horizon: KeyLogAtLsn(logs),
    2776          323 :                 };
    2777          323 :                 if verification {
    2778          323 :                     retention
    2779          323 :                         .verify(key, &base_img_from_ancestor, full_history, self)
    2780          323 :                         .await?;
    2781            0 :                 }
    2782          323 :                 return Ok(retention);
    2783          713 :             } else {
    2784          713 :                 result.push((lsn_split_points[idx], KeyLogAtLsn(logs)));
    2785          713 :             }
    2786              :         }
    2787            0 :         unreachable!("key retention is empty")
    2788          324 :     }
    2789              : 
    2790              :     /// Returns the available bytes reported by statvfs for the filesystem that holds the tenants directory; fails if the statvfs call fails.
    2791           27 :     async fn check_available_space(self: &Arc<Self>) -> anyhow::Result<u64> {
    2792           27 :         let tenants_dir = self.conf.tenants_path();
    2793              : 
    2794           27 :         let stat = Statvfs::get(&tenants_dir, None)
    2795           27 :             .context("statvfs failed, presumably directory got unlinked")?;
    2796              : 
    2797           27 :         let (avail_bytes, _) = stat.get_avail_total_bytes(); // the second tuple element is not needed here
    2798           27 : 
    2799           27 :         Ok(avail_bytes)
    2800           27 :     }
    2801              : 
    2802              :     /// Check if the compaction can proceed safely without running out of space. We assume the size
    2803              :     /// upper bound of the produced files of a compaction job is the same as all layers involved in
    2804              :     /// the compaction. Therefore, we need room for `all_layer_size` (the new files) plus the size of
    2805              :     /// every selected layer that still needs a download, and that sum must fit into 80% of the available space.
    2806           27 :     async fn check_compaction_space(
    2807           27 :         self: &Arc<Self>,
    2808           27 :         layer_selection: &[Layer],
    2809           27 :     ) -> Result<(), CompactionError> {
    2810           27 :         let available_space = self
    2811           27 :             .check_available_space()
    2812           27 :             .await
    2813           27 :             .map_err(CompactionError::Other)?;
    2814           27 :         let mut remote_layer_size = 0;
    2815           27 :         let mut all_layer_size = 0;
    2816          106 :         for layer in layer_selection {
    2817           79 :             let needs_download = layer
    2818           79 :                 .needs_download()
    2819           79 :                 .await
    2820           79 :                 .context("failed to check if layer needs download")
    2821           79 :                 .map_err(CompactionError::Other)?;
    2822           79 :             if needs_download.is_some() { // layers that need a download are counted a second time via `remote_layer_size`
    2823            0 :                 remote_layer_size += layer.layer_desc().file_size;
    2824           79 :             }
    2825           79 :             all_layer_size += layer.layer_desc().file_size;
    2826              :         }
    2827           27 :         let allocated_space = (available_space as f64 * 0.8) as u64; /* reserve 20% space for other tasks */
    2828           27 :         if all_layer_size /* space needed for newly-generated file */ + remote_layer_size /* space for downloading layers */ > allocated_space
    2829              :         {
    2830            0 :             return Err(CompactionError::Other(anyhow!(
    2831            0 :                 "not enough space for compaction: available_space={}, allocated_space={}, all_layer_size={}, remote_layer_size={}, required_space={}",
    2832            0 :                 available_space,
    2833            0 :                 allocated_space,
    2834            0 :                 all_layer_size,
    2835            0 :                 remote_layer_size,
    2836            0 :                 all_layer_size + remote_layer_size
    2837            0 :             )));
    2838           27 :         }
    2839           27 :         Ok(())
    2840           27 :     }
    2841              : 
    2842              :     /// Check to bail out of gc compaction early if it would use too much memory.
    2843           27 :     async fn check_memory_usage(
    2844           27 :         self: &Arc<Self>,
    2845           27 :         layer_selection: &[Layer],
    2846           27 :     ) -> Result<(), CompactionError> {
    2847           27 :         let mut estimated_memory_usage_mb = 0.0;
    2848           27 :         let mut num_image_layers = 0;
    2849           27 :         let mut num_delta_layers = 0;
    2850           27 :         let target_layer_size_bytes = 256 * 1024 * 1024;
    2851          106 :         for layer in layer_selection {
    2852           79 :             let layer_desc = layer.layer_desc();
    2853           79 :             if layer_desc.is_delta() {
    2854           44 :                 // Delta layers at most have 1MB buffer; 3x to make it safe (there're deltas as large as 16KB).
    2855           44 :                 // Scale it by target_layer_size_bytes so that tests can pass (some tests, e.g., `test_pageserver_gc_compaction_preempt
    2856           44 :                 // use 3MB layer size and we need to account for that).
    2857           44 :                 estimated_memory_usage_mb +=
    2858           44 :                     3.0 * (layer_desc.file_size / target_layer_size_bytes) as f64;
    2859           44 :                 num_delta_layers += 1;
    2860           44 :             } else {
    2861           35 :                 // Image layers at most have 1MB buffer but it might be compressed; assume 5x compression ratio.
    2862           35 :                 estimated_memory_usage_mb +=
    2863           35 :                     5.0 * (layer_desc.file_size / target_layer_size_bytes) as f64;
    2864           35 :                 num_image_layers += 1;
    2865           35 :             }
    2866              :         }
    2867           27 :         if estimated_memory_usage_mb > 1024.0 {
    2868            0 :             return Err(CompactionError::Other(anyhow!(
    2869            0 :                 "estimated memory usage is too high: {}MB, giving up compaction; num_image_layers={}, num_delta_layers={}",
    2870            0 :                 estimated_memory_usage_mb,
    2871            0 :                 num_image_layers,
    2872            0 :                 num_delta_layers
    2873            0 :             )));
    2874           27 :         }
    2875           27 :         Ok(())
    2876           27 :     }
    2877              : 
    2878              :     /// Get a watermark for gc-compaction, that is the lowest LSN that we can use as the `gc_horizon` for
    2879              :     /// the compaction algorithm. It is min(space_cutoff, time_cutoff, latest_gc_cutoff, standby_horizon).
    2880              :     /// Leases and retain_lsns are considered in the gc-compaction job itself so we don't need to account for them
    2881              :     /// here.
    2882           28 :     pub(crate) fn get_gc_compaction_watermark(self: &Arc<Self>) -> Lsn {
    2883           28 :         let gc_cutoff_lsn = {
    2884           28 :             let gc_info = self.gc_info.read().unwrap();
    2885           28 :             gc_info.min_cutoff()
    2886           28 :         };
    2887           28 : 
    2888           28 :         // TODO: standby horizon should use leases so we don't really need to consider it here.
    2889           28 :         // let watermark = watermark.min(self.standby_horizon.load());
    2890           28 : 
    2891           28 :         // TODO: ensure the child branches will not use anything below the watermark, or consider
    2892           28 :         // them when computing the watermark.
    2893           28 :         gc_cutoff_lsn.min(*self.get_applied_gc_cutoff_lsn())
    2894           28 :     }
    2895              : 
    2896              :     /// Split a gc-compaction job into multiple compaction jobs. The split is based on the key range and the estimated size of the compaction job.
    2897              :     /// The function returns a list of compaction jobs that can be executed separately. If the upper bound of the compact LSN
    2898              :     /// range is not specified, we will use the latest gc_cutoff as the upper bound, so that all jobs in the jobset acts
    2899              :     /// like a full compaction of the specified keyspace.
    2900            0 :     pub(crate) async fn gc_compaction_split_jobs(
    2901            0 :         self: &Arc<Self>,
    2902            0 :         job: GcCompactJob,
    2903            0 :         sub_compaction_max_job_size_mb: Option<u64>,
    2904            0 :     ) -> Result<Vec<GcCompactJob>, CompactionError> {
    2905            0 :         let compact_below_lsn = if job.compact_lsn_range.end != Lsn::MAX {
    2906            0 :             job.compact_lsn_range.end
    2907              :         } else {
    2908            0 :             self.get_gc_compaction_watermark()
    2909              :         };
    2910              : 
    2911            0 :         if compact_below_lsn == Lsn::INVALID {
    2912            0 :             tracing::warn!(
    2913            0 :                 "no layers to compact with gc: gc_cutoff not generated yet, skipping gc bottom-most compaction"
    2914              :             );
    2915            0 :             return Ok(vec![]);
    2916            0 :         }
    2917              : 
    2918              :         // Split compaction job to about 4GB each
    2919              :         const GC_COMPACT_MAX_SIZE_MB: u64 = 4 * 1024;
    2920            0 :         let sub_compaction_max_job_size_mb =
    2921            0 :             sub_compaction_max_job_size_mb.unwrap_or(GC_COMPACT_MAX_SIZE_MB);
    2922            0 : 
    2923            0 :         let mut compact_jobs = Vec::<GcCompactJob>::new();
    2924            0 :         // For now, we simply use the key partitioning information; we should do a more fine-grained partitioning
    2925            0 :         // by estimating the amount of files read for a compaction job. We should also partition on LSN.
    2926            0 :         let ((dense_ks, sparse_ks), _) = self.partitioning.read().as_ref().clone();
    2927              :         // Truncate the key range to be within user specified compaction range.
    2928            0 :         fn truncate_to(
    2929            0 :             source_start: &Key,
    2930            0 :             source_end: &Key,
    2931            0 :             target_start: &Key,
    2932            0 :             target_end: &Key,
    2933            0 :         ) -> Option<(Key, Key)> {
    2934            0 :             let start = source_start.max(target_start);
    2935            0 :             let end = source_end.min(target_end);
    2936            0 :             if start < end {
    2937            0 :                 Some((*start, *end))
    2938              :             } else {
    2939            0 :                 None
    2940              :             }
    2941            0 :         }
    2942            0 :         let mut split_key_ranges = Vec::new();
    2943            0 :         let ranges = dense_ks
    2944            0 :             .parts
    2945            0 :             .iter()
    2946            0 :             .map(|partition| partition.ranges.iter())
    2947            0 :             .chain(sparse_ks.parts.iter().map(|x| x.0.ranges.iter()))
    2948            0 :             .flatten()
    2949            0 :             .cloned()
    2950            0 :             .collect_vec();
    2951            0 :         for range in ranges.iter() {
    2952            0 :             let Some((start, end)) = truncate_to(
    2953            0 :                 &range.start,
    2954            0 :                 &range.end,
    2955            0 :                 &job.compact_key_range.start,
    2956            0 :                 &job.compact_key_range.end,
    2957            0 :             ) else {
    2958            0 :                 continue;
    2959              :             };
    2960            0 :             split_key_ranges.push((start, end));
    2961              :         }
    2962            0 :         split_key_ranges.sort();
    2963            0 :         let all_layers = {
    2964            0 :             let guard = self.layers.read().await;
    2965            0 :             let layer_map = guard.layer_map()?;
    2966            0 :             layer_map.iter_historic_layers().collect_vec()
    2967            0 :         };
    2968            0 :         let mut current_start = None;
    2969            0 :         let ranges_num = split_key_ranges.len();
    2970            0 :         for (idx, (start, end)) in split_key_ranges.into_iter().enumerate() {
    2971            0 :             if current_start.is_none() {
    2972            0 :                 current_start = Some(start);
    2973            0 :             }
    2974            0 :             let start = current_start.unwrap();
    2975            0 :             if start >= end {
    2976              :                 // We have already processed this partition.
    2977            0 :                 continue;
    2978            0 :             }
    2979            0 :             let overlapping_layers = {
    2980            0 :                 let mut desc = Vec::new();
    2981            0 :                 for layer in all_layers.iter() {
    2982            0 :                     if overlaps_with(&layer.get_key_range(), &(start..end))
    2983            0 :                         && layer.get_lsn_range().start <= compact_below_lsn
    2984            0 :                     {
    2985            0 :                         desc.push(layer.clone());
    2986            0 :                     }
    2987              :                 }
    2988            0 :                 desc
    2989            0 :             };
    2990            0 :             let total_size = overlapping_layers.iter().map(|x| x.file_size).sum::<u64>();
    2991            0 :             if total_size > sub_compaction_max_job_size_mb * 1024 * 1024 || ranges_num == idx + 1 {
    2992              :                 // Try to extend the compaction range so that we include at least one full layer file.
    2993            0 :                 let extended_end = overlapping_layers
    2994            0 :                     .iter()
    2995            0 :                     .map(|layer| layer.key_range.end)
    2996            0 :                     .min();
    2997              :                 // It is possible that the search range does not contain any layer files when we reach the end of the loop.
    2998              :                 // In this case, we simply use the specified key range end.
    2999            0 :                 let end = if let Some(extended_end) = extended_end {
    3000            0 :                     extended_end.max(end)
    3001              :                 } else {
    3002            0 :                     end
    3003              :                 };
    3004            0 :                 let end = if ranges_num == idx + 1 {
    3005              :                     // extend the compaction range to the end of the key range if it's the last partition
    3006            0 :                     end.max(job.compact_key_range.end)
    3007              :                 } else {
    3008            0 :                     end
    3009              :                 };
    3010            0 :                 if total_size == 0 && !compact_jobs.is_empty() {
    3011            0 :                     info!(
    3012            0 :                         "splitting compaction job: {}..{}, estimated_size={}, extending the previous job",
    3013              :                         start, end, total_size
    3014              :                     );
    3015            0 :                     compact_jobs.last_mut().unwrap().compact_key_range.end = end;
    3016            0 :                     current_start = Some(end);
    3017              :                 } else {
    3018            0 :                     info!(
    3019            0 :                         "splitting compaction job: {}..{}, estimated_size={}",
    3020              :                         start, end, total_size
    3021              :                     );
    3022            0 :                     compact_jobs.push(GcCompactJob {
    3023            0 :                         dry_run: job.dry_run,
    3024            0 :                         compact_key_range: start..end,
    3025            0 :                         compact_lsn_range: job.compact_lsn_range.start..compact_below_lsn,
    3026            0 :                     });
    3027            0 :                     current_start = Some(end);
    3028              :                 }
    3029            0 :             }
    3030              :         }
    3031            0 :         Ok(compact_jobs)
    3032            0 :     }
    3033              : 
    3034              :     /// An experimental compaction building block that combines compaction with garbage collection.
    3035              :     ///
    3036              :     /// The current implementation picks all delta + image layers that are below or intersecting with
    3037              :     /// the GC horizon without considering retain_lsns. Then, it does a full compaction over all these delta
    3038              :     /// layers and image layers, which generates image layers on the gc horizon, drop deltas below gc horizon,
    3039              :     /// and create delta layers with all deltas >= gc horizon.
    3040              :     ///
    3041              :     /// If `options.compact_range` is provided, it will only compact the keys within the range, aka partial compaction.
    3042              :     /// Partial compaction will read and process all layers overlapping with the key range, even if it might
    3043              :     /// contain extra keys. After the gc-compaction phase completes, delta layers that are not fully contained
    3044              :     /// within the key range will be rewritten to ensure they do not overlap with the delta layers. Providing
    3045              :     /// Key::MIN..Key..MAX to the function indicates a full compaction, though technically, `Key::MAX` is not
    3046              :     /// part of the range.
    3047              :     ///
    3048              :     /// If `options.compact_lsn_range.end` is provided, the compaction will only compact layers below or intersect with
    3049              :     /// the LSN. Otherwise, it will use the gc cutoff by default.
    3050           28 :     pub(crate) async fn compact_with_gc(
    3051           28 :         self: &Arc<Self>,
    3052           28 :         cancel: &CancellationToken,
    3053           28 :         options: CompactOptions,
    3054           28 :         ctx: &RequestContext,
    3055           28 :     ) -> Result<CompactionOutcome, CompactionError> {
    3056           28 :         let sub_compaction = options.sub_compaction;
    3057           28 :         let job = GcCompactJob::from_compact_options(options.clone());
    3058           28 :         let yield_for_l0 = options.flags.contains(CompactFlags::YieldForL0);
    3059           28 :         if sub_compaction {
    3060            0 :             info!(
    3061            0 :                 "running enhanced gc bottom-most compaction with sub-compaction, splitting compaction jobs"
    3062              :             );
    3063            0 :             let jobs = self
    3064            0 :                 .gc_compaction_split_jobs(job, options.sub_compaction_max_job_size_mb)
    3065            0 :                 .await?;
    3066            0 :             let jobs_len = jobs.len();
    3067            0 :             for (idx, job) in jobs.into_iter().enumerate() {
    3068            0 :                 info!(
    3069            0 :                     "running enhanced gc bottom-most compaction, sub-compaction {}/{}",
    3070            0 :                     idx + 1,
    3071              :                     jobs_len
    3072              :                 );
    3073            0 :                 self.compact_with_gc_inner(cancel, job, ctx, yield_for_l0)
    3074            0 :                     .await?;
    3075              :             }
    3076            0 :             if jobs_len == 0 {
    3077            0 :                 info!("no jobs to run, skipping gc bottom-most compaction");
    3078            0 :             }
    3079            0 :             return Ok(CompactionOutcome::Done);
    3080           28 :         }
    3081           28 :         self.compact_with_gc_inner(cancel, job, ctx, yield_for_l0)
    3082           28 :             .await
    3083           28 :     }
    3084              : 
    3085           28 :     async fn compact_with_gc_inner(
    3086           28 :         self: &Arc<Self>,
    3087           28 :         cancel: &CancellationToken,
    3088           28 :         job: GcCompactJob,
    3089           28 :         ctx: &RequestContext,
    3090           28 :         yield_for_l0: bool,
    3091           28 :     ) -> Result<CompactionOutcome, CompactionError> {
    3092           28 :         // Block other compaction/GC tasks from running for now. GC-compaction could run along
    3093           28 :         // with legacy compaction tasks in the future. Always ensure the lock order is compaction -> gc.
    3094           28 :         // Note that we already acquired the compaction lock when the outer `compact` function gets called.
    3095           28 : 
    3096           28 :         let timer = Instant::now();
    3097           28 :         let begin_timer = timer;
    3098           28 : 
    3099           28 :         let gc_lock = async {
    3100           28 :             tokio::select! {
    3101           28 :                 guard = self.gc_lock.lock() => Ok(guard),
    3102           28 :                 _ = cancel.cancelled() => Err(CompactionError::ShuttingDown),
    3103              :             }
    3104           28 :         };
    3105              : 
    3106           28 :         let time_acquire_lock = timer.elapsed();
    3107           28 :         let timer = Instant::now();
    3108              : 
    3109           28 :         let gc_lock = crate::timed(
    3110           28 :             gc_lock,
    3111           28 :             "acquires gc lock",
    3112           28 :             std::time::Duration::from_secs(5),
    3113           28 :         )
    3114           28 :         .await?;
    3115              : 
    3116           28 :         let dry_run = job.dry_run;
    3117           28 :         let compact_key_range = job.compact_key_range;
    3118           28 :         let compact_lsn_range = job.compact_lsn_range;
    3119              : 
    3120           28 :         let debug_mode = cfg!(debug_assertions) || cfg!(feature = "testing");
    3121              : 
    3122           28 :         info!(
    3123            0 :             "running enhanced gc bottom-most compaction, dry_run={dry_run}, compact_key_range={}..{}, compact_lsn_range={}..{}",
    3124              :             compact_key_range.start,
    3125              :             compact_key_range.end,
    3126              :             compact_lsn_range.start,
    3127              :             compact_lsn_range.end
    3128              :         );
    3129              : 
    3130           28 :         scopeguard::defer! {
    3131           28 :             info!("done enhanced gc bottom-most compaction");
    3132           28 :         };
    3133           28 : 
    3134           28 :         let mut stat = CompactionStatistics::default();
    3135              : 
    3136              :         // Step 0: pick all delta layers + image layers below/intersect with the GC horizon.
    3137              :         // The layer selection has the following properties:
    3138              :         // 1. If a layer is in the selection, all layers below it are in the selection.
    3139              :         // 2. Inferred from (1), for each key in the layer selection, the value can be reconstructed only with the layers in the layer selection.
    3140           27 :         let job_desc = {
    3141           28 :             let guard = self.layers.read().await;
    3142           28 :             let layers = guard.layer_map()?;
    3143           28 :             let gc_info = self.gc_info.read().unwrap();
    3144           28 :             let mut retain_lsns_below_horizon = Vec::new();
    3145           28 :             let gc_cutoff = {
    3146              :                 // Currently, gc-compaction only kicks in after the legacy gc has updated the gc_cutoff.
    3147              :                 // Therefore, it can only clean up data that cannot be cleaned up with legacy gc, instead of
    3148              :                 // cleaning everything that theoretically it could. In the future, it should use `self.gc_info`
    3149              :                 // to get the truth data.
    3150           28 :                 let real_gc_cutoff = self.get_gc_compaction_watermark();
    3151              :                 // The compaction algorithm will keep all keys above the gc_cutoff while keeping only necessary keys below the gc_cutoff for
    3152              :                 // each of the retain_lsn. Therefore, if the user-provided `compact_lsn_range.end` is larger than the real gc cutoff, we will use
    3153              :                 // the real cutoff.
    3154           28 :                 let mut gc_cutoff = if compact_lsn_range.end == Lsn::MAX {
    3155           25 :                     if real_gc_cutoff == Lsn::INVALID {
    3156              :                         // If the gc_cutoff is not generated yet, we should not compact anything.
    3157            0 :                         tracing::warn!(
    3158            0 :                             "no layers to compact with gc: gc_cutoff not generated yet, skipping gc bottom-most compaction"
    3159              :                         );
    3160            0 :                         return Ok(CompactionOutcome::Skipped);
    3161           25 :                     }
    3162           25 :                     real_gc_cutoff
    3163              :                 } else {
    3164            3 :                     compact_lsn_range.end
    3165              :                 };
    3166           28 :                 if gc_cutoff > real_gc_cutoff {
    3167            2 :                     warn!(
    3168            0 :                         "provided compact_lsn_range.end={} is larger than the real_gc_cutoff={}, using the real gc cutoff",
    3169              :                         gc_cutoff, real_gc_cutoff
    3170              :                     );
    3171            2 :                     gc_cutoff = real_gc_cutoff;
    3172           26 :                 }
    3173           28 :                 gc_cutoff
    3174              :             };
    3175           35 :             for (lsn, _timeline_id, _is_offloaded) in &gc_info.retain_lsns {
    3176           35 :                 if lsn < &gc_cutoff {
    3177           35 :                     retain_lsns_below_horizon.push(*lsn);
    3178           35 :                 }
    3179              :             }
    3180           28 :             for lsn in gc_info.leases.keys() {
    3181            0 :                 if lsn < &gc_cutoff {
    3182            0 :                     retain_lsns_below_horizon.push(*lsn);
    3183            0 :                 }
    3184              :             }
    3185           28 :             let mut selected_layers: Vec<Layer> = Vec::new();
    3186           28 :             drop(gc_info);
    3187              :             // Firstly, pick all the layers intersect or below the gc_cutoff, get the largest LSN in the selected layers.
    3188           28 :             let Some(max_layer_lsn) = layers
    3189           28 :                 .iter_historic_layers()
    3190          125 :                 .filter(|desc| desc.get_lsn_range().start <= gc_cutoff)
    3191          107 :                 .map(|desc| desc.get_lsn_range().end)
    3192           28 :                 .max()
    3193              :             else {
    3194            0 :                 info!(
    3195            0 :                     "no layers to compact with gc: no historic layers below gc_cutoff, gc_cutoff={}",
    3196              :                     gc_cutoff
    3197              :                 );
    3198            0 :                 return Ok(CompactionOutcome::Done);
    3199              :             };
    3200              :             // Next, if the user specifies compact_lsn_range.start, we need to filter some layers out. All the layers (strictly) below
    3201              :             // the min_layer_lsn computed as below will be filtered out and the data will be accessed using the normal read path, as if
    3202              :             // it is a branch.
    3203           28 :             let Some(min_layer_lsn) = layers
    3204           28 :                 .iter_historic_layers()
    3205          125 :                 .filter(|desc| {
    3206          125 :                     if compact_lsn_range.start == Lsn::INVALID {
    3207          102 :                         true // select all layers below if start == Lsn(0)
    3208              :                     } else {
    3209           23 :                         desc.get_lsn_range().end > compact_lsn_range.start // strictly larger than compact_above_lsn
    3210              :                     }
    3211          125 :                 })
    3212          116 :                 .map(|desc| desc.get_lsn_range().start)
    3213           28 :                 .min()
    3214              :             else {
    3215            0 :                 info!(
    3216            0 :                     "no layers to compact with gc: no historic layers above compact_above_lsn, compact_above_lsn={}",
    3217              :                     compact_lsn_range.end
    3218              :                 );
    3219            0 :                 return Ok(CompactionOutcome::Done);
    3220              :             };
    3221              :             // Then, pick all the layers that are below the max_layer_lsn. This is to ensure we can pick all single-key
    3222              :             // layers to compact.
    3223           28 :             let mut rewrite_layers = Vec::new();
    3224          125 :             for desc in layers.iter_historic_layers() {
    3225          125 :                 if desc.get_lsn_range().end <= max_layer_lsn
    3226          107 :                     && desc.get_lsn_range().start >= min_layer_lsn
    3227           98 :                     && overlaps_with(&desc.get_key_range(), &compact_key_range)
    3228              :                 {
    3229              :                     // If the layer overlaps with the compaction key range, we need to read it to obtain all keys within the range,
    3230              :                     // even if it might contain extra keys
    3231           79 :                     selected_layers.push(guard.get_from_desc(&desc));
    3232           79 :                     // If the layer is not fully contained within the key range, we need to rewrite it if it's a delta layer (it's fine
    3233           79 :                     // to overlap image layers)
    3234           79 :                     if desc.is_delta() && !fully_contains(&compact_key_range, &desc.get_key_range())
    3235            1 :                     {
    3236            1 :                         rewrite_layers.push(desc);
    3237           78 :                     }
    3238           46 :                 }
    3239              :             }
    3240           28 :             if selected_layers.is_empty() {
    3241            1 :                 info!(
    3242            0 :                     "no layers to compact with gc: no layers within the key range, gc_cutoff={}, key_range={}..{}",
    3243              :                     gc_cutoff, compact_key_range.start, compact_key_range.end
    3244              :                 );
    3245            1 :                 return Ok(CompactionOutcome::Done);
    3246           27 :             }
    3247           27 :             retain_lsns_below_horizon.sort();
    3248           27 :             GcCompactionJobDescription {
    3249           27 :                 selected_layers,
    3250           27 :                 gc_cutoff,
    3251           27 :                 retain_lsns_below_horizon,
    3252           27 :                 min_layer_lsn,
    3253           27 :                 max_layer_lsn,
    3254           27 :                 compaction_key_range: compact_key_range,
    3255           27 :                 rewrite_layers,
    3256           27 :             }
    3257              :         };
    3258           27 :         let (has_data_below, lowest_retain_lsn) = if compact_lsn_range.start != Lsn::INVALID {
    3259              :             // If we only compact above some LSN, we should get the history from the current branch below the specified LSN.
    3260              :             // We use job_desc.min_layer_lsn as if it's the lowest branch point.
    3261            4 :             (true, job_desc.min_layer_lsn)
    3262           23 :         } else if self.ancestor_timeline.is_some() {
    3263              :             // In theory, we can also use min_layer_lsn here, but using ancestor LSN makes sure the delta layers cover the
    3264              :             // LSN ranges all the way to the ancestor timeline.
    3265            1 :             (true, self.ancestor_lsn)
    3266              :         } else {
    3267           22 :             let res = job_desc
    3268           22 :                 .retain_lsns_below_horizon
    3269           22 :                 .first()
    3270           22 :                 .copied()
    3271           22 :                 .unwrap_or(job_desc.gc_cutoff);
    3272           22 :             if debug_mode {
    3273           22 :                 assert_eq!(
    3274           22 :                     res,
    3275           22 :                     job_desc
    3276           22 :                         .retain_lsns_below_horizon
    3277           22 :                         .iter()
    3278           22 :                         .min()
    3279           22 :                         .copied()
    3280           22 :                         .unwrap_or(job_desc.gc_cutoff)
    3281           22 :                 );
    3282            0 :             }
    3283           22 :             (false, res)
    3284              :         };
    3285              : 
    3286           27 :         let verification = self.get_gc_compaction_settings().gc_compaction_verification;
    3287           27 : 
    3288           27 :         info!(
    3289            0 :             "picked {} layers for compaction ({} layers need rewriting) with max_layer_lsn={} min_layer_lsn={} gc_cutoff={} lowest_retain_lsn={}, key_range={}..{}, has_data_below={}",
    3290            0 :             job_desc.selected_layers.len(),
    3291            0 :             job_desc.rewrite_layers.len(),
    3292              :             job_desc.max_layer_lsn,
    3293              :             job_desc.min_layer_lsn,
    3294              :             job_desc.gc_cutoff,
    3295              :             lowest_retain_lsn,
    3296              :             job_desc.compaction_key_range.start,
    3297              :             job_desc.compaction_key_range.end,
    3298              :             has_data_below,
    3299              :         );
    3300              : 
    3301           27 :         let time_analyze = timer.elapsed();
    3302           27 :         let timer = Instant::now();
    3303              : 
    3304          106 :         for layer in &job_desc.selected_layers {
    3305           79 :             debug!("read layer: {}", layer.layer_desc().key());
    3306              :         }
    3307           28 :         for layer in &job_desc.rewrite_layers {
    3308            1 :             debug!("rewrite layer: {}", layer.key());
    3309              :         }
    3310              : 
    3311           27 :         self.check_compaction_space(&job_desc.selected_layers)
    3312           27 :             .await?;
    3313              : 
    3314           27 :         self.check_memory_usage(&job_desc.selected_layers).await?;
    3315           27 :         if job_desc.selected_layers.len() > 100
    3316            0 :             && job_desc.rewrite_layers.len() as f64 >= job_desc.selected_layers.len() as f64 * 0.7
    3317              :         {
    3318            0 :             return Err(CompactionError::Other(anyhow!(
    3319            0 :                 "too many layers to rewrite: {} / {}, giving up compaction",
    3320            0 :                 job_desc.rewrite_layers.len(),
    3321            0 :                 job_desc.selected_layers.len()
    3322            0 :             )));
    3323           27 :         }
    3324              : 
    3325              :         // Generate statistics for the compaction
    3326          106 :         for layer in &job_desc.selected_layers {
    3327           79 :             let desc = layer.layer_desc();
    3328           79 :             if desc.is_delta() {
    3329           44 :                 stat.visit_delta_layer(desc.file_size());
    3330           44 :             } else {
    3331           35 :                 stat.visit_image_layer(desc.file_size());
    3332           35 :             }
    3333              :         }
    3334              : 
    3335              :         // Step 1: construct a k-merge iterator over all layers.
    3336              :         // Also, verify if the layer map can be split by drawing a horizontal line at every LSN start/end split point.
    3337           27 :         let layer_names = job_desc
    3338           27 :             .selected_layers
    3339           27 :             .iter()
    3340           79 :             .map(|layer| layer.layer_desc().layer_name())
    3341           27 :             .collect_vec();
    3342           27 :         if let Some(err) = check_valid_layermap(&layer_names) {
    3343            0 :             return Err(CompactionError::Other(anyhow!(
    3344            0 :                 "gc-compaction layer map check failed because {}, cannot proceed with compaction due to potential data loss",
    3345            0 :                 err
    3346            0 :             )));
    3347           27 :         }
    3348           27 :         // The maximum LSN we are processing in this compaction loop
    3349           27 :         let end_lsn = job_desc
    3350           27 :             .selected_layers
    3351           27 :             .iter()
    3352           79 :             .map(|l| l.layer_desc().lsn_range.end)
    3353           27 :             .max()
    3354           27 :             .unwrap();
    3355           27 :         let mut delta_layers = Vec::new();
    3356           27 :         let mut image_layers = Vec::new();
    3357           27 :         let mut downloaded_layers = Vec::new();
    3358           27 :         let mut total_downloaded_size = 0;
    3359           27 :         let mut total_layer_size = 0;
    3360          106 :         for layer in &job_desc.selected_layers {
    3361           79 :             if layer
    3362           79 :                 .needs_download()
    3363           79 :                 .await
    3364           79 :                 .context("failed to check if layer needs download")
    3365           79 :                 .map_err(CompactionError::Other)?
    3366           79 :                 .is_some()
    3367            0 :             {
    3368            0 :                 total_downloaded_size += layer.layer_desc().file_size;
    3369           79 :             }
    3370           79 :             total_layer_size += layer.layer_desc().file_size;
    3371           79 :             if cancel.is_cancelled() {
    3372            0 :                 return Err(CompactionError::ShuttingDown);
    3373           79 :             }
    3374           79 :             let should_yield = yield_for_l0
    3375            0 :                 && self
    3376            0 :                     .l0_compaction_trigger
    3377            0 :                     .notified()
    3378            0 :                     .now_or_never()
    3379            0 :                     .is_some();
    3380           79 :             if should_yield {
    3381            0 :                 tracing::info!("preempt gc-compaction when downloading layers: too many L0 layers");
    3382            0 :                 return Ok(CompactionOutcome::YieldForL0);
    3383           79 :             }
    3384           79 :             let resident_layer = layer
    3385           79 :                 .download_and_keep_resident(ctx)
    3386           79 :                 .await
    3387           79 :                 .context("failed to download and keep resident layer")
    3388           79 :                 .map_err(CompactionError::Other)?;
    3389           79 :             downloaded_layers.push(resident_layer);
    3390              :         }
    3391           27 :         info!(
    3392            0 :             "finish downloading layers, downloaded={}, total={}, ratio={:.2}",
    3393            0 :             total_downloaded_size,
    3394            0 :             total_layer_size,
    3395            0 :             total_downloaded_size as f64 / total_layer_size as f64
    3396              :         );
    3397          106 :         for resident_layer in &downloaded_layers {
    3398           79 :             if resident_layer.layer_desc().is_delta() {
    3399           44 :                 let layer = resident_layer
    3400           44 :                     .get_as_delta(ctx)
    3401           44 :                     .await
    3402           44 :                     .context("failed to get delta layer")
    3403           44 :                     .map_err(CompactionError::Other)?;
    3404           44 :                 delta_layers.push(layer);
    3405              :             } else {
    3406           35 :                 let layer = resident_layer
    3407           35 :                     .get_as_image(ctx)
    3408           35 :                     .await
    3409           35 :                     .context("failed to get image layer")
    3410           35 :                     .map_err(CompactionError::Other)?;
    3411           35 :                 image_layers.push(layer);
    3412              :             }
    3413              :         }
    3414           27 :         let (dense_ks, sparse_ks) = self
    3415           27 :             .collect_gc_compaction_keyspace()
    3416           27 :             .await
    3417           27 :             .context("failed to collect gc compaction keyspace")
    3418           27 :             .map_err(CompactionError::Other)?;
    3419           27 :         let mut merge_iter = FilterIterator::create(
    3420           27 :             MergeIterator::create_with_options(
    3421           27 :                 &delta_layers,
    3422           27 :                 &image_layers,
    3423           27 :                 ctx,
    3424           27 :                 128 * 8192, /* 1MB buffer for each of the inner iterators */
    3425           27 :                 128,
    3426           27 :             ),
    3427           27 :             dense_ks,
    3428           27 :             sparse_ks,
    3429           27 :         )
    3430           27 :         .context("failed to create filter iterator")
    3431           27 :         .map_err(CompactionError::Other)?;
    3432              : 
    3433           27 :         let time_download_layer = timer.elapsed();
    3434           27 :         let mut timer = Instant::now();
    3435           27 : 
    3436           27 :         // Step 2: Produce images+deltas.
    3437           27 :         let mut accumulated_values = Vec::new();
    3438           27 :         let mut accumulated_values_estimated_size = 0;
    3439           27 :         let mut last_key: Option<Key> = None;
    3440              : 
    3441              :         // Only create image layers when there are no ancestor branches. TODO: create covering image layers
    3442              :         // when some condition is met.
    3443           27 :         let mut image_layer_writer = if !has_data_below {
    3444              :             Some(
    3445           22 :                 SplitImageLayerWriter::new(
    3446           22 :                     self.conf,
    3447           22 :                     self.timeline_id,
    3448           22 :                     self.tenant_shard_id,
    3449           22 :                     job_desc.compaction_key_range.start,
    3450           22 :                     lowest_retain_lsn,
    3451           22 :                     self.get_compaction_target_size(),
    3452           22 :                     &self.gate,
    3453           22 :                     self.cancel.clone(),
    3454           22 :                     ctx,
    3455           22 :                 )
    3456           22 :                 .await
    3457           22 :                 .context("failed to create image layer writer")
    3458           22 :                 .map_err(CompactionError::Other)?,
    3459              :             )
    3460              :         } else {
    3461            5 :             None
    3462              :         };
    3463              : 
    3464           27 :         let mut delta_layer_writer = SplitDeltaLayerWriter::new(
    3465           27 :             self.conf,
    3466           27 :             self.timeline_id,
    3467           27 :             self.tenant_shard_id,
    3468           27 :             lowest_retain_lsn..end_lsn,
    3469           27 :             self.get_compaction_target_size(),
    3470           27 :             &self.gate,
    3471           27 :             self.cancel.clone(),
    3472           27 :         )
    3473           27 :         .await
    3474           27 :         .context("failed to create delta layer writer")
    3475           27 :         .map_err(CompactionError::Other)?;
    3476              : 
    3477              :         #[derive(Default)]
    3478              :         struct RewritingLayers { // Lazily created writers for the parts of one delta layer outside the compaction key range.
    3479              :             before: Option<DeltaLayerWriter>, // receives keys below `compaction_key_range.start`
    3480              :             after: Option<DeltaLayerWriter>, // receives keys at or above `compaction_key_range.end`
    3481              :         }
    3482           27 :         let mut delta_layer_rewriters = HashMap::<Arc<PersistentLayerKey>, RewritingLayers>::new();
    3483              : 
    3484              :         /// Fetches the image of `key` at `history_lsn_point` when the job "has data below" (`has_data_below=true`),
    3485              :         /// i.e. when we are not compacting a bottom range (=`[0,X)`) of the root branch. The two cases are compaction
    3486              :         /// of a timeline that has an ancestor and when `compact_lsn_range.start` is set.
    3487              :         /// In those cases, we need to pull up data from below the LSN range we're compacting.
    3488              :         /// This function unifies the cases so that later code doesn't have to think about it: it returns `Ok(None)`
    3489              :         /// when `has_data_below` is false, `Ok(Some((key, history_lsn_point, image)))` otherwise, and propagates read errors.
    3490              :         /// Currently, we always get the ancestor image for each key in the child branch, regardless of whether the image
    3491              :         /// is needed for reconstruction. This should be fixed in the future.
    3492              :         ///
    3493              :         /// Furthermore, we should do a vectored get instead of a single get, or better, use k-merge for ancestor
    3494              :         /// images.
    3495          320 :         async fn get_ancestor_image(
    3496          320 :             this_tline: &Arc<Timeline>,
    3497          320 :             key: Key,
    3498          320 :             ctx: &RequestContext,
    3499          320 :             has_data_below: bool,
    3500          320 :             history_lsn_point: Lsn,
    3501          320 :         ) -> anyhow::Result<Option<(Key, Lsn, Bytes)>> {
    3502          320 :             if !has_data_below {
    3503          301 :                 return Ok(None);
    3504           19 :             };
    3505              :             // Implemented as a `get` on the current timeline at `history_lsn_point` (callers pass the ancestor LSN or
    3506              :             // the job's minimum layer LSN), therefore reusing as much existing code as possible.
    3507           19 :             let img = this_tline.get(key, history_lsn_point, ctx).await?;
    3508           19 :             Ok(Some((key, history_lsn_point, img)))
    3509          320 :         }
    3510              : 
    3511              :         // Actually, we can decide not to write to the image layer at all at this point because
    3512              :         // the key and LSN range are determined. However, to keep things simple here, we still
    3513              :         // create this writer, and discard the writer in the end.
    3514           27 :         let mut time_to_first_kv_pair = None;
    3515              : 
    3516          496 :         while let Some(((key, lsn, val), desc)) = merge_iter
    3517          496 :             .next_with_trace()
    3518          496 :             .await
    3519          496 :             .context("failed to get next key-value pair")
    3520          496 :             .map_err(CompactionError::Other)?
    3521              :         {
    3522          470 :             if time_to_first_kv_pair.is_none() {
    3523           27 :                 time_to_first_kv_pair = Some(timer.elapsed());
    3524           27 :                 timer = Instant::now();
    3525          443 :             }
    3526              : 
    3527          470 :             if cancel.is_cancelled() {
    3528            0 :                 return Err(CompactionError::ShuttingDown);
    3529          470 :             }
    3530              : 
    3531          470 :             let should_yield = yield_for_l0
    3532            0 :                 && self
    3533            0 :                     .l0_compaction_trigger
    3534            0 :                     .notified()
    3535            0 :                     .now_or_never()
    3536            0 :                     .is_some();
    3537          470 :             if should_yield {
    3538            0 :                 tracing::info!("preempt gc-compaction in the main loop: too many L0 layers");
    3539            0 :                 return Ok(CompactionOutcome::YieldForL0);
    3540          470 :             }
    3541          470 :             if self.shard_identity.is_key_disposable(&key) {
    3542              :                 // If this shard does not need to store this key, simply skip it.
    3543              :                 //
    3544              :                 // This is not handled in the filter iterator because shard is determined by hash.
    3545              :                 // Therefore, it does not give us any performance benefit to do things like skip
    3546              :                 // a whole layer file as handling key spaces (ranges).
    3547            0 :                 if cfg!(debug_assertions) {
    3548            0 :                     let shard = self.shard_identity.shard_index();
    3549            0 :                     let owner = self.shard_identity.get_shard_number(&key);
    3550            0 :                     panic!("key {key} does not belong on shard {shard}, owned by {owner}");
    3551            0 :                 }
    3552            0 :                 continue;
    3553          470 :             }
    3554          470 :             if !job_desc.compaction_key_range.contains(&key) {
    3555           32 :                 if !desc.is_delta {
    3556           30 :                     continue;
    3557            2 :                 }
    3558            2 :                 let rewriter = delta_layer_rewriters.entry(desc.clone()).or_default();
    3559            2 :                 let rewriter = if key < job_desc.compaction_key_range.start {
    3560            0 :                     if rewriter.before.is_none() {
    3561            0 :                         rewriter.before = Some(
    3562            0 :                             DeltaLayerWriter::new(
    3563            0 :                                 self.conf,
    3564            0 :                                 self.timeline_id,
    3565            0 :                                 self.tenant_shard_id,
    3566            0 :                                 desc.key_range.start,
    3567            0 :                                 desc.lsn_range.clone(),
    3568            0 :                                 &self.gate,
    3569            0 :                                 self.cancel.clone(),
    3570            0 :                                 ctx,
    3571            0 :                             )
    3572            0 :                             .await
    3573            0 :                             .context("failed to create delta layer writer")
    3574            0 :                             .map_err(CompactionError::Other)?,
    3575              :                         );
    3576            0 :                     }
    3577            0 :                     rewriter.before.as_mut().unwrap()
    3578            2 :                 } else if key >= job_desc.compaction_key_range.end {
    3579            2 :                     if rewriter.after.is_none() {
    3580            1 :                         rewriter.after = Some(
    3581            1 :                             DeltaLayerWriter::new(
    3582            1 :                                 self.conf,
    3583            1 :                                 self.timeline_id,
    3584            1 :                                 self.tenant_shard_id,
    3585            1 :                                 job_desc.compaction_key_range.end,
    3586            1 :                                 desc.lsn_range.clone(),
    3587            1 :                                 &self.gate,
    3588            1 :                                 self.cancel.clone(),
    3589            1 :                                 ctx,
    3590            1 :                             )
    3591            1 :                             .await
    3592            1 :                             .context("failed to create delta layer writer")
    3593            1 :                             .map_err(CompactionError::Other)?,
    3594              :                         );
    3595            1 :                     }
    3596            2 :                     rewriter.after.as_mut().unwrap()
    3597              :                 } else {
    3598            0 :                     unreachable!()
    3599              :                 };
    3600            2 :                 rewriter
    3601            2 :                     .put_value(key, lsn, val, ctx)
    3602            2 :                     .await
    3603            2 :                     .context("failed to put value")
    3604            2 :                     .map_err(CompactionError::Other)?;
    3605            2 :                 continue;
    3606          438 :             }
    3607          438 :             match val {
    3608          315 :                 Value::Image(_) => stat.visit_image_key(&val),
    3609          123 :                 Value::WalRecord(_) => stat.visit_wal_key(&val),
    3610              :             }
    3611          438 :             if last_key.is_none() || last_key.as_ref() == Some(&key) {
    3612          144 :                 if last_key.is_none() {
    3613           27 :                     last_key = Some(key);
    3614          117 :                 }
    3615          144 :                 accumulated_values_estimated_size += val.estimated_size();
    3616          144 :                 accumulated_values.push((key, lsn, val));
    3617          144 : 
    3618          144 :                 // Accumulated values should never exceed 512MB.
    3619          144 :                 if accumulated_values_estimated_size >= 1024 * 1024 * 512 {
    3620            0 :                     return Err(CompactionError::Other(anyhow!(
    3621            0 :                         "too many values for a single key: {} for key {}, {} items",
    3622            0 :                         accumulated_values_estimated_size,
    3623            0 :                         key,
    3624            0 :                         accumulated_values.len()
    3625            0 :                     )));
    3626          144 :                 }
    3627              :             } else {
    3628          294 :                 let last_key: &mut Key = last_key.as_mut().unwrap();
    3629          294 :                 stat.on_unique_key_visited(); // TODO: adjust statistics for partial compaction
    3630          294 :                 let retention = self
    3631          294 :                     .generate_key_retention(
    3632          294 :                         *last_key,
    3633          294 :                         &accumulated_values,
    3634          294 :                         job_desc.gc_cutoff,
    3635          294 :                         &job_desc.retain_lsns_below_horizon,
    3636          294 :                         COMPACTION_DELTA_THRESHOLD,
    3637          294 :                         get_ancestor_image(self, *last_key, ctx, has_data_below, lowest_retain_lsn)
    3638          294 :                             .await
    3639          294 :                             .context("failed to get ancestor image")
    3640          294 :                             .map_err(CompactionError::Other)?,
    3641          294 :                         verification,
    3642          294 :                     )
    3643          294 :                     .await
    3644          294 :                     .context("failed to generate key retention")
    3645          294 :                     .map_err(CompactionError::Other)?;
    3646          293 :                 retention
    3647          293 :                     .pipe_to(
    3648          293 :                         *last_key,
    3649          293 :                         &mut delta_layer_writer,
    3650          293 :                         image_layer_writer.as_mut(),
    3651          293 :                         &mut stat,
    3652          293 :                         ctx,
    3653          293 :                     )
    3654          293 :                     .await
    3655          293 :                     .context("failed to pipe to delta layer writer")
    3656          293 :                     .map_err(CompactionError::Other)?;
    3657          293 :                 accumulated_values.clear();
    3658          293 :                 *last_key = key;
    3659          293 :                 accumulated_values_estimated_size = val.estimated_size();
    3660          293 :                 accumulated_values.push((key, lsn, val));
    3661              :             }
    3662              :         }
    3663              : 
    3664              :         // TODO: move the below part to the loop body
    3665           26 :         let Some(last_key) = last_key else {
    3666            0 :             return Err(CompactionError::Other(anyhow!(
    3667            0 :                 "no keys produced during compaction"
    3668            0 :             )));
    3669              :         };
    3670           26 :         stat.on_unique_key_visited();
    3671              : 
    3672           26 :         let retention = self
    3673           26 :             .generate_key_retention(
    3674           26 :                 last_key,
    3675           26 :                 &accumulated_values,
    3676           26 :                 job_desc.gc_cutoff,
    3677           26 :                 &job_desc.retain_lsns_below_horizon,
    3678           26 :                 COMPACTION_DELTA_THRESHOLD,
    3679           26 :                 get_ancestor_image(self, last_key, ctx, has_data_below, lowest_retain_lsn)
    3680           26 :                     .await
    3681           26 :                     .context("failed to get ancestor image")
    3682           26 :                     .map_err(CompactionError::Other)?,
    3683           26 :                 verification,
    3684           26 :             )
    3685           26 :             .await
    3686           26 :             .context("failed to generate key retention")
    3687           26 :             .map_err(CompactionError::Other)?;
    3688           26 :         retention
    3689           26 :             .pipe_to(
    3690           26 :                 last_key,
    3691           26 :                 &mut delta_layer_writer,
    3692           26 :                 image_layer_writer.as_mut(),
    3693           26 :                 &mut stat,
    3694           26 :                 ctx,
    3695           26 :             )
    3696           26 :             .await
    3697           26 :             .context("failed to pipe to delta layer writer")
    3698           26 :             .map_err(CompactionError::Other)?;
    3699              :         // end: move the above part to the loop body
    3700              : 
    3701           26 :         let time_main_loop = timer.elapsed();
    3702           26 :         let timer = Instant::now();
    3703           26 : 
    3704           26 :         let mut rewrote_delta_layers = Vec::new();
    3705           27 :         for (key, writers) in delta_layer_rewriters {
    3706            1 :             if let Some(delta_writer_before) = writers.before {
    3707            0 :                 let (desc, path) = delta_writer_before
    3708            0 :                     .finish(job_desc.compaction_key_range.start, ctx)
    3709            0 :                     .await
    3710            0 :                     .context("failed to finish delta layer writer")
    3711            0 :                     .map_err(CompactionError::Other)?;
    3712            0 :                 let layer = Layer::finish_creating(self.conf, self, desc, &path)
    3713            0 :                     .context("failed to finish creating delta layer")
    3714            0 :                     .map_err(CompactionError::Other)?;
    3715            0 :                 rewrote_delta_layers.push(layer);
    3716            1 :             }
    3717            1 :             if let Some(delta_writer_after) = writers.after {
    3718            1 :                 let (desc, path) = delta_writer_after
    3719            1 :                     .finish(key.key_range.end, ctx)
    3720            1 :                     .await
    3721            1 :                     .context("failed to finish delta layer writer")
    3722            1 :                     .map_err(CompactionError::Other)?;
    3723            1 :                 let layer = Layer::finish_creating(self.conf, self, desc, &path)
    3724            1 :                     .context("failed to finish creating delta layer")
    3725            1 :                     .map_err(CompactionError::Other)?;
    3726            1 :                 rewrote_delta_layers.push(layer);
    3727            0 :             }
    3728              :         }
    3729              : 
    3730           37 :         let discard = |key: &PersistentLayerKey| {
    3731           37 :             let key = key.clone();
    3732           37 :             async move { KeyHistoryRetention::discard_key(&key, self, dry_run).await }
    3733           37 :         };
    3734              : 
    3735           26 :         let produced_image_layers = if let Some(writer) = image_layer_writer {
    3736           21 :             if !dry_run {
    3737           19 :                 let end_key = job_desc.compaction_key_range.end;
    3738           19 :                 writer
    3739           19 :                     .finish_with_discard_fn(self, ctx, end_key, discard)
    3740           19 :                     .await
    3741           19 :                     .context("failed to finish image layer writer")
    3742           19 :                     .map_err(CompactionError::Other)?
    3743              :             } else {
    3744            2 :                 drop(writer);
    3745            2 :                 Vec::new()
    3746              :             }
    3747              :         } else {
    3748            5 :             Vec::new()
    3749              :         };
    3750              : 
    3751           26 :         let produced_delta_layers = if !dry_run {
    3752           24 :             delta_layer_writer
    3753           24 :                 .finish_with_discard_fn(self, ctx, discard)
    3754           24 :                 .await
    3755           24 :                 .context("failed to finish delta layer writer")
    3756           24 :                 .map_err(CompactionError::Other)?
    3757              :         } else {
    3758            2 :             drop(delta_layer_writer);
    3759            2 :             Vec::new()
    3760              :         };
    3761              : 
    3762              :         // TODO: make image/delta/rewrote_delta layers generation atomic. At this point, we already generated resident layers, and if
    3763              :         // compaction is cancelled at this point, we might have some layers that are not cleaned up.
    3764           26 :         let mut compact_to = Vec::new();
    3765           26 :         let mut keep_layers = HashSet::new();
    3766           26 :         let produced_delta_layers_len = produced_delta_layers.len();
    3767           26 :         let produced_image_layers_len = produced_image_layers.len();
    3768           26 : 
    3769           26 :         let layer_selection_by_key = job_desc
    3770           26 :             .selected_layers
    3771           26 :             .iter()
    3772           76 :             .map(|l| (l.layer_desc().key(), l.layer_desc().clone()))
    3773           26 :             .collect::<HashMap<_, _>>();
    3774              : 
    3775           44 :         for action in produced_delta_layers {
    3776           18 :             match action {
    3777           11 :                 BatchWriterResult::Produced(layer) => {
    3778           11 :                     if cfg!(debug_assertions) {
    3779           11 :                         info!("produced delta layer: {}", layer.layer_desc().key());
    3780            0 :                     }
    3781           11 :                     stat.produce_delta_layer(layer.layer_desc().file_size());
    3782           11 :                     compact_to.push(layer);
    3783              :                 }
    3784            7 :                 BatchWriterResult::Discarded(l) => {
    3785            7 :                     if cfg!(debug_assertions) {
    3786            7 :                         info!("discarded delta layer: {}", l);
    3787            0 :                     }
    3788            7 :                     if let Some(layer_desc) = layer_selection_by_key.get(&l) {
    3789            7 :                         stat.discard_delta_layer(layer_desc.file_size());
    3790            7 :                     } else {
    3791            0 :                         tracing::warn!(
    3792            0 :                             "discarded delta layer not in layer_selection: {}, produced a layer outside of the compaction key range?",
    3793              :                             l
    3794              :                         );
    3795            0 :                         stat.discard_delta_layer(0);
    3796              :                     }
    3797            7 :                     keep_layers.insert(l);
    3798              :                 }
    3799              :             }
    3800              :         }
    3801           27 :         for layer in &rewrote_delta_layers {
    3802            1 :             debug!(
    3803            0 :                 "produced rewritten delta layer: {}",
    3804            0 :                 layer.layer_desc().key()
    3805              :             );
    3806              :             // For now, we include rewritten delta layer size in the "produce_delta_layer". We could
    3807              :             // make it a separate statistic in the future.
    3808            1 :             stat.produce_delta_layer(layer.layer_desc().file_size());
    3809              :         }
    3810           26 :         compact_to.extend(rewrote_delta_layers);
    3811           45 :         for action in produced_image_layers {
    3812           19 :             match action {
    3813           15 :                 BatchWriterResult::Produced(layer) => {
    3814           15 :                     debug!("produced image layer: {}", layer.layer_desc().key());
    3815           15 :                     stat.produce_image_layer(layer.layer_desc().file_size());
    3816           15 :                     compact_to.push(layer);
    3817              :                 }
    3818            4 :                 BatchWriterResult::Discarded(l) => {
    3819            4 :                     debug!("discarded image layer: {}", l);
    3820            4 :                     if let Some(layer_desc) = layer_selection_by_key.get(&l) {
    3821            4 :                         stat.discard_image_layer(layer_desc.file_size());
    3822            4 :                     } else {
    3823            0 :                         tracing::warn!(
    3824            0 :                             "discarded image layer not in layer_selection: {}, produced a layer outside of the compaction key range?",
    3825              :                             l
    3826              :                         );
    3827            0 :                         stat.discard_image_layer(0);
    3828              :                     }
    3829            4 :                     keep_layers.insert(l);
    3830              :                 }
    3831              :             }
    3832              :         }
    3833              : 
    3834           26 :         let mut layer_selection = job_desc.selected_layers;
    3835              : 
    3836              :         // Partial compaction might select more data than it processes, e.g., if
    3837              :         // the compaction_key_range only partially overlaps:
    3838              :         //
    3839              :         //         [---compaction_key_range---]
    3840              :         //   [---A----][----B----][----C----][----D----]
    3841              :         //
    3842              :         // For delta layers, we will rewrite the layers so that they are cut exactly at
    3843              :         // the compaction key range, so we can always discard them. However, for image
    3844              :         // layers, as we do not rewrite them for now, we need to handle them differently.
    3845              :         // Assume image layers A, B, C, D are all in the `layer_selection`.
    3846              :         //
    3847              :         // The created image layers contain whatever is needed from B, C, and from
    3848              :         // `----]` of A, and from  `[---` of D.
    3849              :         //
    3850              :         // In contrast, `[---A` and `D----]` have not been processed, so, we must
    3851              :         // keep that data.
    3852              :         //
    3853              :         // The solution for now is to keep A and D completely if they are image layers.
    3854              :         // (layer_selection is what we'll remove from the layer map, so, retain what
    3855              :         // is _not_ fully covered by compaction_key_range).
    3856          102 :         for layer in &layer_selection {
    3857           76 :             if !layer.layer_desc().is_delta() {
    3858           33 :                 if !overlaps_with(
    3859           33 :                     &layer.layer_desc().key_range,
    3860           33 :                     &job_desc.compaction_key_range,
    3861           33 :                 ) {
    3862            0 :                     return Err(CompactionError::Other(anyhow!(
    3863            0 :                         "violated constraint: image layer outside of compaction key range"
    3864            0 :                     )));
    3865           33 :                 }
    3866           33 :                 if !fully_contains(
    3867           33 :                     &job_desc.compaction_key_range,
    3868           33 :                     &layer.layer_desc().key_range,
    3869           33 :                 ) {
    3870            4 :                     keep_layers.insert(layer.layer_desc().key());
    3871           29 :                 }
    3872           43 :             }
    3873              :         }
    3874              : 
    3875           76 :         layer_selection.retain(|x| !keep_layers.contains(&x.layer_desc().key()));
    3876           26 : 
    3877           26 :         let time_final_phase = timer.elapsed();
    3878           26 : 
    3879           26 :         stat.time_final_phase_secs = time_final_phase.as_secs_f64();
    3880           26 :         stat.time_to_first_kv_pair_secs = time_to_first_kv_pair
    3881           26 :             .unwrap_or(Duration::ZERO)
    3882           26 :             .as_secs_f64();
    3883           26 :         stat.time_main_loop_secs = time_main_loop.as_secs_f64();
    3884           26 :         stat.time_acquire_lock_secs = time_acquire_lock.as_secs_f64();
    3885           26 :         stat.time_download_layer_secs = time_download_layer.as_secs_f64();
    3886           26 :         stat.time_analyze_secs = time_analyze.as_secs_f64();
    3887           26 :         stat.time_total_secs = begin_timer.elapsed().as_secs_f64();
    3888           26 :         stat.finalize();
    3889           26 : 
    3890           26 :         info!(
    3891            0 :             "gc-compaction statistics: {}",
    3892            0 :             serde_json::to_string(&stat)
    3893            0 :                 .context("failed to serialize gc-compaction statistics")
    3894            0 :                 .map_err(CompactionError::Other)?
    3895              :         );
    3896              : 
    3897           26 :         if dry_run {
    3898            2 :             return Ok(CompactionOutcome::Done);
    3899           24 :         }
    3900           24 : 
    3901           24 :         info!(
    3902            0 :             "produced {} delta layers and {} image layers, {} layers are kept",
    3903            0 :             produced_delta_layers_len,
    3904            0 :             produced_image_layers_len,
    3905            0 :             keep_layers.len()
    3906              :         );
    3907              : 
    3908              :         // Step 3: Place back to the layer map.
    3909              : 
    3910              :         // First, do a sanity check to ensure the newly-created layer map does not contain overlaps.
    3911           24 :         let all_layers = {
    3912           24 :             let guard = self.layers.read().await;
    3913           24 :             let layer_map = guard.layer_map()?;
    3914           24 :             layer_map.iter_historic_layers().collect_vec()
    3915           24 :         };
    3916           24 : 
    3917           24 :         let mut final_layers = all_layers
    3918           24 :             .iter()
    3919          107 :             .map(|layer| layer.layer_name())
    3920           24 :             .collect::<HashSet<_>>();
    3921           76 :         for layer in &layer_selection {
    3922           52 :             final_layers.remove(&layer.layer_desc().layer_name());
    3923           52 :         }
    3924           51 :         for layer in &compact_to {
    3925           27 :             final_layers.insert(layer.layer_desc().layer_name());
    3926           27 :         }
    3927           24 :         let final_layers = final_layers.into_iter().collect_vec();
    3928              : 
    3929              :         // TODO: move this check before we call `finish` on image layer writers. However, this will require us to get the layer name before we finish
    3930              :         // the writer, so potentially, we will need a function like `ImageLayerBatchWriter::get_all_pending_layer_keys` to get all the keys that are
    3931              :         // in the writer before finalizing the persistent layers. Now we would leave some dangling layers on the disk if the check fails.
    3932           24 :         if let Some(err) = check_valid_layermap(&final_layers) {
    3933            0 :             return Err(CompactionError::Other(anyhow!(
    3934            0 :                 "gc-compaction layer map check failed after compaction because {}, compaction result not applied to the layer map due to potential data loss",
    3935            0 :                 err
    3936            0 :             )));
    3937           24 :         }
    3938              : 
    3939              :         // Between the sanity check and this compaction update, there could be new layers being flushed, but it should be fine because we only
    3940              :         // operate on L1 layers.
    3941              :         {
    3942              :             // Gc-compaction will rewrite the history of a key. This could happen in two ways:
    3943              :             //
    3944              :             // 1. We create an image layer to replace all the deltas below the compact LSN. In this case, assume
    3945              :             // we have 2 delta layers A and B, both below the compact LSN. We create an image layer I to replace
    3946              :             // A and B at the compact LSN. If the read path finishes reading A, yields, and now we update the layer
    3947              :             // map, the read path then cannot find any keys below A, reporting a missing key error, while the key
    3948              :             // now gets stored in I at the compact LSN.
    3949              :             //
    3950              :             // ---------------                                       ---------------
    3951              :             //   delta1@LSN20                                         image1@LSN20
    3952              :             // ---------------  (read path collects delta@LSN20,  => ---------------  (read path cannot find anything
    3953              :             //   delta1@LSN10    yields)                                               below LSN 20)
    3954              :             // ---------------
    3955              :             //
    3956              :             // 2. We create a delta layer to replace all the deltas below the compact LSN, and in the delta layers,
    3957              :             // we combine the history of a key into a single image. For example, we have deltas at LSN 1, 2, 3, 4.
    3958              :             // Assume one delta layer contains LSN 1, 2, 3 and the other contains LSN 4.
    3959              :             //
    3960              :             // We let gc-compaction combine delta 2, 3, 4 into an image at LSN 4, which produces a delta layer that
    3961              :             // contains the delta at LSN 1 and the image at LSN 4. If the read path finishes reading the original delta
    3962              :             // layer containing LSN 4, yields, and we then update the layer map, it collects LSN 1 next: an invalid history.
    3963              :             //
    3964              :             // ---------------                                      ---------------
    3965              :             //   delta1@LSN4                                          image1@LSN4
    3966              :             // ---------------  (read path collects delta@LSN4,  => ---------------  (read path collects LSN4 and LSN1,
    3967              :             //  delta1@LSN1-3    yields)                              delta1@LSN1     which is an invalid history)
    3968              :             // ---------------                                      ---------------
    3969              :             //
    3970              :             // Therefore, the gc-compaction layer update operation should wait for all ongoing reads, block all pending reads,
    3971              :             // and only allow reads to continue after the update is finished.
    3972              : 
    3973           24 :             let update_guard = self.gc_compaction_layer_update_lock.write().await;
    3974              :             // Acquiring the update guard ensures current read operations end and new read operations are blocked.
    3975              :             // TODO: can we use `latest_gc_cutoff` Rcu to achieve the same effect?
    3976           24 :             let mut guard = self.layers.write().await;
    3977           24 :             guard
    3978           24 :                 .open_mut()?
    3979           24 :                 .finish_gc_compaction(&layer_selection, &compact_to, &self.metrics);
    3980           24 :             drop(update_guard); // Allow new reads to start ONLY after we finished updating the layer map.
    3981           24 :         };
    3982           24 : 
    3983           24 :         // Schedule an index-only upload to update the `latest_gc_cutoff` in the index_part.json.
    3984           24 :         // Otherwise, after restart, the index_part only contains the old `latest_gc_cutoff` and
    3985           24 :         // find_gc_cutoffs will try accessing things below the cutoff. TODO: ideally, this should
    3986           24 :         // be batched into `schedule_compaction_update`.
    3987           24 :         let disk_consistent_lsn = self.disk_consistent_lsn.load();
    3988           24 :         self.schedule_uploads(disk_consistent_lsn, None)
    3989           24 :             .context("failed to schedule uploads")
    3990           24 :             .map_err(CompactionError::Other)?;
    3991              :         // If a layer gets rewritten during gc-compaction, we need to keep that layer only in `compact_to` instead
    3992              :         // of `compact_from`.
    3993           24 :         let compact_from = {
    3994           24 :             let mut compact_from = Vec::new();
    3995           24 :             let mut compact_to_set = HashMap::new();
    3996           51 :             for layer in &compact_to {
    3997           27 :                 compact_to_set.insert(layer.layer_desc().key(), layer);
    3998           27 :             }
    3999           76 :             for layer in &layer_selection {
    4000           52 :                 if let Some(to) = compact_to_set.get(&layer.layer_desc().key()) {
    4001            0 :                     tracing::info!(
    4002            0 :                         "skipping delete {} because found same layer key at different generation {}",
    4003              :                         layer,
    4004              :                         to
    4005              :                     );
    4006           52 :                 } else {
    4007           52 :                     compact_from.push(layer.clone());
    4008           52 :                 }
    4009              :             }
    4010           24 :             compact_from
    4011           24 :         };
    4012           24 :         self.remote_client
    4013           24 :             .schedule_compaction_update(&compact_from, &compact_to)?;
    4014              : 
    4015           24 :         drop(gc_lock);
    4016           24 : 
    4017           24 :         Ok(CompactionOutcome::Done)
    4018           28 :     }
    4019              : }
    4020              : 
    4021              : struct TimelineAdaptor {
    4022              :     timeline: Arc<Timeline>,
    4023              : 
    4024              :     keyspace: (Lsn, KeySpace),
    4025              : 
    4026              :     new_deltas: Vec<ResidentLayer>,
    4027              :     new_images: Vec<ResidentLayer>,
    4028              :     layers_to_delete: Vec<Arc<PersistentLayerDesc>>,
    4029              : }
    4030              : 
    4031              : impl TimelineAdaptor {
    4032            0 :     pub fn new(timeline: &Arc<Timeline>, keyspace: (Lsn, KeySpace)) -> Self {
    4033            0 :         Self {
    4034            0 :             timeline: timeline.clone(),
    4035            0 :             keyspace,
    4036            0 :             new_images: Vec::new(),
    4037            0 :             new_deltas: Vec::new(),
    4038            0 :             layers_to_delete: Vec::new(),
    4039            0 :         }
    4040            0 :     }
    4041              : 
    4042            0 :     pub async fn flush_updates(&mut self) -> Result<(), CompactionError> {
    4043            0 :         let layers_to_delete = {
    4044            0 :             let guard = self.timeline.layers.read().await;
    4045            0 :             self.layers_to_delete
    4046            0 :                 .iter()
    4047            0 :                 .map(|x| guard.get_from_desc(x))
    4048            0 :                 .collect::<Vec<Layer>>()
    4049            0 :         };
    4050            0 :         self.timeline
    4051            0 :             .finish_compact_batch(&self.new_deltas, &self.new_images, &layers_to_delete)
    4052            0 :             .await?;
    4053              : 
    4054            0 :         self.timeline
    4055            0 :             .upload_new_image_layers(std::mem::take(&mut self.new_images))?;
    4056              : 
    4057            0 :         self.new_deltas.clear();
    4058            0 :         self.layers_to_delete.clear();
    4059            0 :         Ok(())
    4060            0 :     }
    4061              : }
    4062              : 
    4063              : #[derive(Clone)]
    4064              : struct ResidentDeltaLayer(ResidentLayer);
    4065              : #[derive(Clone)]
    4066              : struct ResidentImageLayer(ResidentLayer);
    4067              : 
    4068              : impl CompactionJobExecutor for TimelineAdaptor {
    4069              :     type Key = pageserver_api::key::Key;
    4070              : 
    4071              :     type Layer = OwnArc<PersistentLayerDesc>;
    4072              :     type DeltaLayer = ResidentDeltaLayer;
    4073              :     type ImageLayer = ResidentImageLayer;
    4074              : 
    4075              :     type RequestContext = crate::context::RequestContext;
    4076              : 
    4077            0 :     fn get_shard_identity(&self) -> &ShardIdentity {
    4078            0 :         self.timeline.get_shard_identity()
    4079            0 :     }
    4080              : 
    4081            0 :     async fn get_layers(
    4082            0 :         &mut self,
    4083            0 :         key_range: &Range<Key>,
    4084            0 :         lsn_range: &Range<Lsn>,
    4085            0 :         _ctx: &RequestContext,
    4086            0 :     ) -> anyhow::Result<Vec<OwnArc<PersistentLayerDesc>>> {
    4087            0 :         self.flush_updates().await?;
    4088              : 
    4089            0 :         let guard = self.timeline.layers.read().await;
    4090            0 :         let layer_map = guard.layer_map()?;
    4091              : 
    4092            0 :         let result = layer_map
    4093            0 :             .iter_historic_layers()
    4094            0 :             .filter(|l| {
    4095            0 :                 overlaps_with(&l.lsn_range, lsn_range) && overlaps_with(&l.key_range, key_range)
    4096            0 :             })
    4097            0 :             .map(OwnArc)
    4098            0 :             .collect();
    4099            0 :         Ok(result)
    4100            0 :     }
    4101              : 
    4102            0 :     async fn get_keyspace(
    4103            0 :         &mut self,
    4104            0 :         key_range: &Range<Key>,
    4105            0 :         lsn: Lsn,
    4106            0 :         _ctx: &RequestContext,
    4107            0 :     ) -> anyhow::Result<Vec<Range<Key>>> {
    4108            0 :         if lsn == self.keyspace.0 {
    4109            0 :             Ok(pageserver_compaction::helpers::intersect_keyspace(
    4110            0 :                 &self.keyspace.1.ranges,
    4111            0 :                 key_range,
    4112            0 :             ))
    4113              :         } else {
    4114              :             // The current compaction implementation only ever requests the key space
    4115              :             // at the compaction end LSN.
    4116            0 :             anyhow::bail!("keyspace not available for requested lsn");
    4117              :         }
    4118            0 :     }
    4119              : 
    4120            0 :     async fn downcast_delta_layer(
    4121            0 :         &self,
    4122            0 :         layer: &OwnArc<PersistentLayerDesc>,
    4123            0 :         ctx: &RequestContext,
    4124            0 :     ) -> anyhow::Result<Option<ResidentDeltaLayer>> {
    4125            0 :         // this is a lot more complex than a simple downcast...
    4126            0 :         if layer.is_delta() {
    4127            0 :             let l = {
    4128            0 :                 let guard = self.timeline.layers.read().await;
    4129            0 :                 guard.get_from_desc(layer)
    4130              :             };
    4131            0 :             let result = l.download_and_keep_resident(ctx).await?;
    4132              : 
    4133            0 :             Ok(Some(ResidentDeltaLayer(result)))
    4134              :         } else {
    4135            0 :             Ok(None)
    4136              :         }
    4137            0 :     }
    4138              : 
    4139            0 :     async fn create_image(
    4140            0 :         &mut self,
    4141            0 :         lsn: Lsn,
    4142            0 :         key_range: &Range<Key>,
    4143            0 :         ctx: &RequestContext,
    4144            0 :     ) -> anyhow::Result<()> {
    4145            0 :         Ok(self.create_image_impl(lsn, key_range, ctx).await?)
    4146            0 :     }
    4147              : 
    4148            0 :     async fn create_delta(
    4149            0 :         &mut self,
    4150            0 :         lsn_range: &Range<Lsn>,
    4151            0 :         key_range: &Range<Key>,
    4152            0 :         input_layers: &[ResidentDeltaLayer],
    4153            0 :         ctx: &RequestContext,
    4154            0 :     ) -> anyhow::Result<()> {
    4155            0 :         debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end);
    4156              : 
    4157            0 :         let mut all_entries = Vec::new();
    4158            0 :         for dl in input_layers.iter() {
    4159            0 :             all_entries.extend(dl.load_keys(ctx).await?);
    4160              :         }
    4161              : 
    4162              :         // The current stdlib sorting implementation is designed in a way where it is
    4163              :         // particularly fast when the slice is made up of sorted sub-ranges.
    4164            0 :         all_entries.sort_by_key(|DeltaEntry { key, lsn, .. }| (*key, *lsn));
    4165              : 
    4166            0 :         let mut writer = DeltaLayerWriter::new(
    4167            0 :             self.timeline.conf,
    4168            0 :             self.timeline.timeline_id,
    4169            0 :             self.timeline.tenant_shard_id,
    4170            0 :             key_range.start,
    4171            0 :             lsn_range.clone(),
    4172            0 :             &self.timeline.gate,
    4173            0 :             self.timeline.cancel.clone(),
    4174            0 :             ctx,
    4175            0 :         )
    4176            0 :         .await?;
    4177              : 
    4178            0 :         let mut dup_values = 0;
    4179            0 : 
    4180            0 :         // This iterator walks through all key-value pairs from all the layers
    4181            0 :         // we're compacting, in key, LSN order.
    4182            0 :         let mut prev: Option<(Key, Lsn)> = None;
    4183              :         for &DeltaEntry {
    4184            0 :             key, lsn, ref val, ..
    4185            0 :         } in all_entries.iter()
    4186              :         {
    4187            0 :             if prev == Some((key, lsn)) {
    4188              :                 // This is a duplicate. Skip it.
    4189              :                 //
    4190              :                 // It can happen if compaction is interrupted after writing some
    4191              :                 // layers but not all, and we are compacting the range again.
    4192              :                 // The calculations in the algorithm assume that there are no
    4193              :                 // duplicates, so the math on targeted file size is likely off,
    4194              :                 // and we will create smaller files than expected.
    4195            0 :                 dup_values += 1;
    4196            0 :                 continue;
    4197            0 :             }
    4198              : 
    4199            0 :             let value = val.load(ctx).await?;
    4200              : 
    4201            0 :             writer.put_value(key, lsn, value, ctx).await?;
    4202              : 
    4203            0 :             prev = Some((key, lsn));
    4204              :         }
    4205              : 
    4206            0 :         if dup_values > 0 {
    4207            0 :             warn!("delta layer created with {} duplicate values", dup_values);
    4208            0 :         }
    4209              : 
    4210            0 :         fail_point!("delta-layer-writer-fail-before-finish", |_| {
    4211            0 :             Err(anyhow::anyhow!(
    4212            0 :                 "failpoint delta-layer-writer-fail-before-finish"
    4213            0 :             ))
    4214            0 :         });
    4215              : 
    4216            0 :         let (desc, path) = writer.finish(prev.unwrap().0.next(), ctx).await?;
    4217            0 :         let new_delta_layer =
    4218            0 :             Layer::finish_creating(self.timeline.conf, &self.timeline, desc, &path)?;
    4219              : 
    4220            0 :         self.new_deltas.push(new_delta_layer);
    4221            0 :         Ok(())
    4222            0 :     }
    4223              : 
    4224            0 :     async fn delete_layer(
    4225            0 :         &mut self,
    4226            0 :         layer: &OwnArc<PersistentLayerDesc>,
    4227            0 :         _ctx: &RequestContext,
    4228            0 :     ) -> anyhow::Result<()> {
    4229            0 :         self.layers_to_delete.push(layer.clone().0);
    4230            0 :         Ok(())
    4231            0 :     }
    4232              : }
    4233              : 
    4234              : impl TimelineAdaptor {
                            /// Builds one image layer covering `key_range` at `lsn` and, if a layer was
                            /// actually generated, appends it to `self.new_images`.
                            ///
                            /// Steps, in order:
                            /// 1. start a timer on the timeline's `create_images_time_histo` metric;
                            /// 2. open an `ImageLayerWriter` for `key_range` / `lsn`;
                            /// 3. honour the `image-layer-writer-fail-before-finish` fail point;
                            /// 4. resolve the key space for the range via `self.get_keyspace`;
                            /// 5. let the timeline's `create_image_layer_for_rel_blocks` fill the writer;
                            /// 6. on a `Generated` outcome, finish the layer and record it.
                            ///
                            /// # Errors
                            ///
                            /// Every fallible step is propagated with `?` as a `CreateImageLayersError`.
                            /// When the fail point fires, `CreateImageLayersError::Other` is returned.
                            ///
                            /// NOTE(review): the early-return paths skip the explicit
                            /// `timer.stop_and_record()` at the end; whether the timer still records when
                            /// dropped depends on the timer type, which is not visible here — confirm.
    4235            0 :     async fn create_image_impl(
    4236            0 :         &mut self,
    4237            0 :         lsn: Lsn,
    4238            0 :         key_range: &Range<Key>,
    4239            0 :         ctx: &RequestContext,
    4240            0 :     ) -> Result<(), CreateImageLayersError> {
    4241            0 :         let timer = self.timeline.metrics.create_images_time_histo.start_timer();
    4242              : 
    4243            0 :         let image_layer_writer = ImageLayerWriter::new(
    4244            0 :             self.timeline.conf,
    4245            0 :             self.timeline.timeline_id,
    4246            0 :             self.timeline.tenant_shard_id,
    4247            0 :             key_range,
    4248            0 :             lsn,
    4249            0 :             &self.timeline.gate,
    4250            0 :             self.timeline.cancel.clone(),
    4251            0 :             ctx,
    4252            0 :         )
    4253            0 :         .await?;
    4254              : 
                                // Fault-injection hook: when enabled, bail out after the writer has been
                                // created but before anything is written or finished.
    4255            0 :         fail_point!("image-layer-writer-fail-before-finish", |_| {
    4256            0 :             Err(CreateImageLayersError::Other(anyhow::anyhow!(
    4257            0 :                 "failpoint image-layer-writer-fail-before-finish"
    4258            0 :             )))
    4259            0 :         });
    4260              : 
                                // Key space for the requested range at `lsn`, as reported by `get_keyspace`.
    4261            0 :         let keyspace = KeySpace {
    4262            0 :             ranges: self.get_keyspace(key_range, lsn, ctx).await?,
    4263              :         };
    4264              :         // TODO: set a proper (stateful) start. NOTE(review): the original note went on with
                                // "The create_image_layer_for_rel_blocks function mostly" and was left unfinished.
    4265            0 :         let outcome = self
    4266            0 :             .timeline
    4267            0 :             .create_image_layer_for_rel_blocks(
    4268            0 :                 &keyspace,
    4269            0 :                 image_layer_writer,
    4270            0 :                 lsn,
    4271            0 :                 ctx,
    4272            0 :                 key_range.clone(),
    4273            0 :                 IoConcurrency::sequential(),
    4274            0 :             )
    4275            0 :             .await?;
    4276              : 
                                // Only a `Generated` outcome carries a layer to finish; any other outcome
                                // leaves `self.new_images` untouched.
    4277              :         if let ImageLayerCreationOutcome::Generated {
    4278            0 :             unfinished_image_layer,
    4279            0 :         } = outcome
    4280              :         {
    4281            0 :             let (desc, path) = unfinished_image_layer.finish(ctx).await?;
    4282            0 :             let image_layer =
    4283            0 :                 Layer::finish_creating(self.timeline.conf, &self.timeline, desc, &path)?;
    4284            0 :             self.new_images.push(image_layer);
    4285            0 :         }
    4286              : 
    4287            0 :         timer.stop_and_record();
    4288            0 : 
    4289            0 :         Ok(())
    4290            0 :     }
    4291              : }
    4292              : 
    4293              : impl CompactionRequestContext for crate::context::RequestContext {} // Empty impl body: opts `RequestContext` into `CompactionRequestContext` without defining any items here.
    4294              : 
    4295              : #[derive(Debug, Clone)]
                        /// Newtype wrapper around an `Arc<T>` with a public inner field.
                        ///
                        /// It is the type on which this file implements `CompactionLayer` for
                        /// `PersistentLayerDesc` and `DeltaLayer`.
                        /// NOTE(review): presumably the wrapper exists so those traits can be implemented
                        /// on a local type rather than on `Arc` directly — confirm.
    4296              : pub struct OwnArc<T>(pub Arc<T>);
    4297              : 
    4298              : impl<T> Deref for OwnArc<T> {
                            // Forwards to the wrapped `Arc<T>`, so an `OwnArc<T>` dereferences the same way
                            // the inner `Arc<T>` does.
    4299              :     type Target = <Arc<T> as Deref>::Target;
                            /// Borrows the value behind the inner `Arc`.
    4300            0 :     fn deref(&self) -> &Self::Target {
    4301            0 :         &self.0
    4302            0 :     }
    4303              : }
    4304              : 
    4305              : impl<T> AsRef<T> for OwnArc<T> {
                            /// Returns a shared reference to the wrapped value by delegating to the inner
                            /// `Arc`'s own `as_ref`.
    4306            0 :     fn as_ref(&self) -> &T {
    4307            0 :         self.0.as_ref()
    4308            0 :     }
    4309              : }
    4310              : 
    4311              : impl CompactionLayer<Key> for OwnArc<PersistentLayerDesc> {
                            // Every accessor reads straight from the wrapped `PersistentLayerDesc`,
                            // reached through `OwnArc`'s `Deref` / `AsRef` impls.

                            /// Key range stored in the descriptor.
    4312            0 :     fn key_range(&self) -> &Range<Key> {
    4313            0 :         &self.key_range
    4314            0 :     }
                            /// LSN range stored in the descriptor.
    4315            0 :     fn lsn_range(&self) -> &Range<Lsn> {
    4316            0 :         &self.lsn_range
    4317            0 :     }
                            /// File size stored in the descriptor.
    4318            0 :     fn file_size(&self) -> u64 {
    4319            0 :         self.file_size
    4320            0 :     }
                            /// The descriptor's `short_id()`, converted to an owned `String`.
    4321            0 :     fn short_id(&self) -> std::string::String {
    4322            0 :         self.as_ref().short_id().to_string()
    4323            0 :     }
                            /// Whatever the descriptor's own `is_delta()` reports.
    4324            0 :     fn is_delta(&self) -> bool {
    4325            0 :         self.as_ref().is_delta()
    4326            0 :     }
    4327              : }
    4328              : 
    4329              : impl CompactionLayer<Key> for OwnArc<DeltaLayer> {
                            // All metadata comes from the delta layer's `layer_desc()`.

                            /// Key range from the layer descriptor.
    4330            0 :     fn key_range(&self) -> &Range<Key> {
    4331            0 :         &self.layer_desc().key_range
    4332            0 :     }
                            /// LSN range from the layer descriptor.
    4333            0 :     fn lsn_range(&self) -> &Range<Lsn> {
    4334            0 :         &self.layer_desc().lsn_range
    4335            0 :     }
                            /// File size from the layer descriptor.
    4336            0 :     fn file_size(&self) -> u64 {
    4337            0 :         self.layer_desc().file_size
    4338            0 :     }
                            /// The descriptor's `short_id()`, converted to an owned `String`.
    4339            0 :     fn short_id(&self) -> std::string::String {
    4340            0 :         self.layer_desc().short_id().to_string()
    4341            0 :     }
                            /// Always `true` for this impl.
    4342            0 :     fn is_delta(&self) -> bool {
    4343            0 :         true
    4344            0 :     }
    4345              : }
    4346              : 
    4347              : impl CompactionLayer<Key> for ResidentDeltaLayer {
                            // All metadata comes from `layer_desc()` of the wrapped value in field `.0`.

                            /// Key range from the wrapped layer's descriptor.
    4348            0 :     fn key_range(&self) -> &Range<Key> {
    4349            0 :         &self.0.layer_desc().key_range
    4350            0 :     }
                            /// LSN range from the wrapped layer's descriptor.
    4351            0 :     fn lsn_range(&self) -> &Range<Lsn> {
    4352            0 :         &self.0.layer_desc().lsn_range
    4353            0 :     }
                            /// File size from the wrapped layer's descriptor.
    4354            0 :     fn file_size(&self) -> u64 {
    4355            0 :         self.0.layer_desc().file_size
    4356            0 :     }
                            /// The descriptor's `short_id()`, converted to an owned `String`.
    4357            0 :     fn short_id(&self) -> std::string::String {
    4358            0 :         self.0.layer_desc().short_id().to_string()
    4359            0 :     }
                            /// Always `true` for this impl.
    4360            0 :     fn is_delta(&self) -> bool {
    4361            0 :         true
    4362            0 :     }
    4363              : }
    4364              : 
    4365              : impl CompactionDeltaLayer<TimelineAdaptor> for ResidentDeltaLayer {
                            // The trait's associated entry type is the timeline's own `DeltaEntry`.
    4366              :     type DeltaEntry<'a> = DeltaEntry<'a>;
    4367              : 
                            /// Obtains the wrapped layer as a delta layer via `get_as_delta`, then returns
                            /// the result of its `index_entries` call.
                            ///
                            /// # Errors
                            ///
                            /// Fails if `get_as_delta` or `index_entries` fails.
    4368            0 :     async fn load_keys(&self, ctx: &RequestContext) -> anyhow::Result<Vec<DeltaEntry<'_>>> {
    4369            0 :         self.0.get_as_delta(ctx).await?.index_entries(ctx).await
    4370            0 :     }
    4371              : }
    4372              : 
    4373              : impl CompactionLayer<Key> for ResidentImageLayer {
                            // All metadata comes from `layer_desc()` of the wrapped value in field `.0`.

                            /// Key range from the wrapped layer's descriptor.
    4374            0 :     fn key_range(&self) -> &Range<Key> {
    4375            0 :         &self.0.layer_desc().key_range
    4376            0 :     }
                            /// LSN range from the wrapped layer's descriptor.
    4377            0 :     fn lsn_range(&self) -> &Range<Lsn> {
    4378            0 :         &self.0.layer_desc().lsn_range
    4379            0 :     }
                            /// File size from the wrapped layer's descriptor.
    4380            0 :     fn file_size(&self) -> u64 {
    4381            0 :         self.0.layer_desc().file_size
    4382            0 :     }
                            /// The descriptor's `short_id()`, converted to an owned `String`.
    4383            0 :     fn short_id(&self) -> std::string::String {
    4384            0 :         self.0.layer_desc().short_id().to_string()
    4385            0 :     }
                            /// Always `false` for this impl.
    4386            0 :     fn is_delta(&self) -> bool {
    4387            0 :         false
    4388            0 :     }
    4389              : }
    4390              : impl CompactionImageLayer<TimelineAdaptor> for ResidentImageLayer {} // Empty impl body: no items are defined here for `ResidentImageLayer`.
        

Generated by: LCOV version 2.1-beta