LCOV - code coverage report
Current view: top level - pageserver/src/tenant/timeline - compaction.rs (source / functions)
Test:      5e392a02abbad1ab595f4dba672e219a49f7f539.info
Test Date: 2025-04-11 22:43:24
Coverage:  Lines: 52.9 % (1512 of 2859)    Functions: 40.8 % (75 of 184)

            Line data    Source code
       1              : //! New compaction implementation. The algorithm itself is implemented in the
       2              : //! compaction crate. This file implements the callbacks and structs that allow
       3              : //! the algorithm to drive the process.
       4              : //!
       5              : //! The old legacy algorithm is implemented directly in `timeline.rs`.
       6              : 
       7              : use std::collections::{BinaryHeap, HashMap, HashSet, VecDeque};
       8              : use std::ops::{Deref, Range};
       9              : use std::sync::Arc;
      10              : use std::time::{Duration, Instant};
      11              : 
      12              : use super::layer_manager::LayerManager;
      13              : use super::{
      14              :     CompactFlags, CompactOptions, CompactionError, CreateImageLayersError, DurationRecorder,
      15              :     GetVectoredError, ImageLayerCreationMode, LastImageLayerCreationStatus, RecordedDuration,
      16              :     Timeline,
      17              : };
      18              : 
      19              : use crate::tenant::timeline::DeltaEntry;
      20              : use crate::walredo::RedoAttemptType;
      21              : use anyhow::{Context, anyhow};
      22              : use bytes::Bytes;
      23              : use enumset::EnumSet;
      24              : use fail::fail_point;
      25              : use futures::FutureExt;
      26              : use itertools::Itertools;
      27              : use once_cell::sync::Lazy;
      28              : use pageserver_api::config::tenant_conf_defaults::DEFAULT_CHECKPOINT_DISTANCE;
      29              : use pageserver_api::key::{KEY_SIZE, Key};
      30              : use pageserver_api::keyspace::{KeySpace, ShardedRange};
      31              : use pageserver_api::models::{CompactInfoResponse, CompactKeyRange};
      32              : use pageserver_api::record::NeonWalRecord;
      33              : use pageserver_api::shard::{ShardCount, ShardIdentity, TenantShardId};
      34              : use pageserver_api::value::Value;
      35              : use pageserver_compaction::helpers::{fully_contains, overlaps_with};
      36              : use pageserver_compaction::interface::*;
      37              : use serde::Serialize;
      38              : use tokio::sync::{OwnedSemaphorePermit, Semaphore};
      39              : use tokio_util::sync::CancellationToken;
      40              : use tracing::{Instrument, debug, error, info, info_span, trace, warn};
      41              : use utils::critical;
      42              : use utils::id::TimelineId;
      43              : use utils::lsn::Lsn;
      44              : 
      45              : use crate::context::{AccessStatsBehavior, RequestContext, RequestContextBuilder};
      46              : use crate::page_cache;
      47              : use crate::statvfs::Statvfs;
      48              : use crate::tenant::checks::check_valid_layermap;
      49              : use crate::tenant::gc_block::GcBlock;
      50              : use crate::tenant::layer_map::LayerMap;
      51              : use crate::tenant::remote_timeline_client::WaitCompletionError;
      52              : use crate::tenant::remote_timeline_client::index::GcCompactionState;
      53              : use crate::tenant::storage_layer::batch_split_writer::{
      54              :     BatchWriterResult, SplitDeltaLayerWriter, SplitImageLayerWriter,
      55              : };
      56              : use crate::tenant::storage_layer::filter_iterator::FilterIterator;
      57              : use crate::tenant::storage_layer::merge_iterator::MergeIterator;
      58              : use crate::tenant::storage_layer::{
      59              :     AsLayerDesc, PersistentLayerDesc, PersistentLayerKey, ValueReconstructState,
      60              : };
      61              : use crate::tenant::tasks::log_compaction_error;
      62              : use crate::tenant::timeline::{
      63              :     DeltaLayerWriter, ImageLayerCreationOutcome, ImageLayerWriter, IoConcurrency, Layer,
      64              :     ResidentLayer, drop_rlock,
      65              : };
      66              : use crate::tenant::{DeltaLayer, MaybeOffloaded};
      67              : use crate::virtual_file::{MaybeFatalIo, VirtualFile};
      68              : 
      69              : /// Maximum number of deltas before generating an image layer in bottom-most compaction.
      70              : const COMPACTION_DELTA_THRESHOLD: usize = 5;
      71              : 
      72              : #[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
      73              : pub struct GcCompactionJobId(pub usize);
      74              : 
      75              : impl std::fmt::Display for GcCompactionJobId {
      76            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
      77            0 :         write!(f, "{}", self.0)
      78            0 :     }
      79              : }
      80              : 
      81              : pub struct GcCompactionCombinedSettings {
      82              :     pub gc_compaction_enabled: bool,
      83              :     pub gc_compaction_verification: bool,
      84              :     pub gc_compaction_initial_threshold_kb: u64,
      85              :     pub gc_compaction_ratio_percent: u64,
      86              : }
      87              : 
      88              : #[derive(Debug, Clone)]
      89              : pub enum GcCompactionQueueItem {
      90              :     MetaJob {
      91              :         /// Compaction options
      92              :         options: CompactOptions,
      93              :         /// Whether the compaction is triggered automatically (determines whether we need to update L2 LSN)
      94              :         auto: bool,
      95              :     },
      96              :     SubCompactionJob(CompactOptions),
      97              :     Notify(GcCompactionJobId, Option<Lsn>),
      98              : }
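                       : // How each variant is processed (see `iteration_inner` and `handle_sub_compaction` below):
                       : // - `MetaJob`: a full gc-compaction request. With `sub_compaction` enabled it is split into
                       : //   several `SubCompactionJob`s followed by a final `Notify`.
                       : // - `SubCompactionJob`: one split piece, executed via `Timeline::compact_with_options`.
                       : // - `Notify`: fires the completion channel for the job id; for auto-triggered jobs the attached
                       : //   LSN (if any) is persisted as the new L2 LSN when it advances the current one.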
      99              : 
     100              : impl GcCompactionQueueItem {
     101            0 :     pub fn into_compact_info_resp(
     102            0 :         self,
     103            0 :         id: GcCompactionJobId,
     104            0 :         running: bool,
     105            0 :     ) -> Option<CompactInfoResponse> {
     106            0 :         match self {
     107            0 :             GcCompactionQueueItem::MetaJob { options, .. } => Some(CompactInfoResponse {
     108            0 :                 compact_key_range: options.compact_key_range,
     109            0 :                 compact_lsn_range: options.compact_lsn_range,
     110            0 :                 sub_compaction: options.sub_compaction,
     111            0 :                 running,
     112            0 :                 job_id: id.0,
     113            0 :             }),
     114            0 :             GcCompactionQueueItem::SubCompactionJob(options) => Some(CompactInfoResponse {
     115            0 :                 compact_key_range: options.compact_key_range,
     116            0 :                 compact_lsn_range: options.compact_lsn_range,
     117            0 :                 sub_compaction: options.sub_compaction,
     118            0 :                 running,
     119            0 :                 job_id: id.0,
     120            0 :             }),
     121            0 :             GcCompactionQueueItem::Notify(_, _) => None,
     122              :         }
     123            0 :     }
     124              : }
     125              : 
     126              : #[derive(Default)]
     127              : struct GcCompactionGuardItems {
     128              :     notify: Option<tokio::sync::oneshot::Sender<()>>,
     129              :     permit: Option<OwnedSemaphorePermit>,
     130              : }
     131              : 
     132              : struct GcCompactionQueueInner {
     133              :     running: Option<(GcCompactionJobId, GcCompactionQueueItem)>,
     134              :     queued: VecDeque<(GcCompactionJobId, GcCompactionQueueItem)>,
     135              :     guards: HashMap<GcCompactionJobId, GcCompactionGuardItems>,
     136              :     last_id: GcCompactionJobId,
     137              : }
     138              : 
     139              : impl GcCompactionQueueInner {
     140            0 :     fn next_id(&mut self) -> GcCompactionJobId {
     141            0 :         let id = self.last_id;
     142            0 :         self.last_id = GcCompactionJobId(id.0 + 1);
     143            0 :         id
     144            0 :     }
     145              : }
     146              : 
     147              : /// A structure to store gc_compaction jobs.
     148              : pub struct GcCompactionQueue {
     149              :     /// All items in the queue, and the currently-running job.
     150              :     inner: std::sync::Mutex<GcCompactionQueueInner>,
     151              :     /// Ensure only one thread is consuming the queue.
     152              :     consumer_lock: tokio::sync::Mutex<()>,
     153              : }
     154              : 
     155            0 : static CONCURRENT_GC_COMPACTION_TASKS: Lazy<Arc<Semaphore>> = Lazy::new(|| {
     156            0 :     // Only allow two timelines on one pageserver to run gc compaction at a time.
     157            0 :     Arc::new(Semaphore::new(2))
     158            0 : });
     159              : 
     160              : impl GcCompactionQueue {
     161            0 :     pub fn new() -> Self {
     162            0 :         GcCompactionQueue {
     163            0 :             inner: std::sync::Mutex::new(GcCompactionQueueInner {
     164            0 :                 running: None,
     165            0 :                 queued: VecDeque::new(),
     166            0 :                 guards: HashMap::new(),
     167            0 :                 last_id: GcCompactionJobId(0),
     168            0 :             }),
     169            0 :             consumer_lock: tokio::sync::Mutex::new(()),
     170            0 :         }
     171            0 :     }
     172              : 
     173            0 :     pub fn cancel_scheduled(&self) {
     174            0 :         let mut guard = self.inner.lock().unwrap();
     175            0 :         guard.queued.clear();
     176            0 :         // TODO: if there is a running job, we should keep the gc guard. However, currently, the cancel
     177            0 :         // API is only used for testing purposes, so we can drop everything here.
     178            0 :         guard.guards.clear();
     179            0 :     }
     180              : 
     181              :     /// Schedule a manual compaction job.
     182            0 :     pub fn schedule_manual_compaction(
     183            0 :         &self,
     184            0 :         options: CompactOptions,
     185            0 :         notify: Option<tokio::sync::oneshot::Sender<()>>,
     186            0 :     ) -> GcCompactionJobId {
     187            0 :         let mut guard = self.inner.lock().unwrap();
     188            0 :         let id = guard.next_id();
     189            0 :         guard.queued.push_back((
     190            0 :             id,
     191            0 :             GcCompactionQueueItem::MetaJob {
     192            0 :                 options,
     193            0 :                 auto: false,
     194            0 :             },
     195            0 :         ));
     196            0 :         guard.guards.entry(id).or_default().notify = notify;
     197            0 :         info!("scheduled compaction job id={}", id);
     198            0 :         id
     199            0 :     }
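                       :     // A minimal usage sketch (illustrative only; `queue` and `options` are hypothetical
                       :     // bindings, not part of this module):
                       :     //
                       :     //     let (tx, rx) = tokio::sync::oneshot::channel();
                       :     //     let job_id = queue.schedule_manual_compaction(options, Some(tx));
                       :     //     // For jobs with `sub_compaction`, the queue fires `tx` via `notify_and_unblock`
                       :     //     // once the trailing `Notify` item is processed.
                       :     //     let _ = rx.await;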
     200              : 
     201              :     /// Schedule an auto compaction job.
     202            0 :     fn schedule_auto_compaction(
     203            0 :         &self,
     204            0 :         options: CompactOptions,
     205            0 :         permit: OwnedSemaphorePermit,
     206            0 :     ) -> GcCompactionJobId {
     207            0 :         let mut guard = self.inner.lock().unwrap();
     208            0 :         let id = guard.next_id();
     209            0 :         guard.queued.push_back((
     210            0 :             id,
     211            0 :             GcCompactionQueueItem::MetaJob {
     212            0 :                 options,
     213            0 :                 auto: true,
     214            0 :             },
     215            0 :         ));
     216            0 :         guard.guards.entry(id).or_default().permit = Some(permit);
     217            0 :         id
     218            0 :     }
     219              : 
     220              :     /// Trigger an auto compaction.
     221            0 :     pub async fn trigger_auto_compaction(
     222            0 :         &self,
     223            0 :         timeline: &Arc<Timeline>,
     224            0 :     ) -> Result<(), CompactionError> {
     225            0 :         let GcCompactionCombinedSettings {
     226            0 :             gc_compaction_enabled,
     227            0 :             gc_compaction_initial_threshold_kb,
     228            0 :             gc_compaction_ratio_percent,
     229            0 :             ..
     230            0 :         } = timeline.get_gc_compaction_settings();
     231            0 :         if !gc_compaction_enabled {
     232            0 :             return Ok(());
     233            0 :         }
     234            0 :         if self.remaining_jobs_num() > 0 {
     235              :             // Only schedule auto compaction when the queue is empty
     236            0 :             return Ok(());
     237            0 :         }
     238            0 :         if timeline.ancestor_timeline().is_some() {
     239              :             // Do not trigger auto compaction for child timelines. We haven't tested
     240              :             // it enough in staging yet.
     241            0 :             return Ok(());
     242            0 :         }
     243            0 :         if timeline.get_gc_compaction_watermark() == Lsn::INVALID {
     244              :             // If the gc watermark is not set, we don't need to trigger auto compaction.
     245              :             // This check is the same as in `gc_compaction_split_jobs` but we don't log
     246              :             // here and we can also skip the computation of the trigger condition earlier.
     247            0 :             return Ok(());
     248            0 :         }
     249              : 
     250            0 :         let Ok(permit) = CONCURRENT_GC_COMPACTION_TASKS.clone().try_acquire_owned() else {
     251              :             // Only allow one compaction run at a time. TODO: As we do `try_acquire_owned`, we cannot ensure
     252              :             // the fairness of the lock across timelines. We should listen for both `acquire` and `l0_compaction_trigger`
      253              :             // to ensure fairness while avoiding starvation of other tasks.
     254            0 :             return Ok(());
     255              :         };
     256              : 
     257            0 :         let gc_compaction_state = timeline.get_gc_compaction_state();
     258            0 :         let l2_lsn = gc_compaction_state
     259            0 :             .map(|x| x.last_completed_lsn)
     260            0 :             .unwrap_or(Lsn::INVALID);
     261              : 
     262            0 :         let layers = {
     263            0 :             let guard = timeline.layers.read().await;
     264            0 :             let layer_map = guard.layer_map()?;
     265            0 :             layer_map.iter_historic_layers().collect_vec()
     266            0 :         };
     267            0 :         let mut l2_size: u64 = 0;
     268            0 :         let mut l1_size = 0;
     269            0 :         let gc_cutoff = *timeline.get_applied_gc_cutoff_lsn();
     270            0 :         for layer in layers {
     271            0 :             if layer.lsn_range.start <= l2_lsn {
     272            0 :                 l2_size += layer.file_size();
     273            0 :             } else if layer.lsn_range.start <= gc_cutoff {
     274            0 :                 l1_size += layer.file_size();
     275            0 :             }
     276              :         }
     277              : 
     278            0 :         fn trigger_compaction(
     279            0 :             l1_size: u64,
     280            0 :             l2_size: u64,
     281            0 :             gc_compaction_initial_threshold_kb: u64,
     282            0 :             gc_compaction_ratio_percent: u64,
     283            0 :         ) -> bool {
     284              :             const AUTO_TRIGGER_LIMIT: u64 = 150 * 1024 * 1024 * 1024; // 150GB
     285            0 :             if l1_size + l2_size >= AUTO_TRIGGER_LIMIT {
     286              :                 // Do not auto-trigger when physical size >= 150GB
     287            0 :                 return false;
     288            0 :             }
     289            0 :             // initial trigger
     290            0 :             if l2_size == 0 && l1_size >= gc_compaction_initial_threshold_kb * 1024 {
     291            0 :                 info!(
     292            0 :                     "trigger auto-compaction because l1_size={} >= gc_compaction_initial_threshold_kb={}",
     293              :                     l1_size, gc_compaction_initial_threshold_kb
     294              :                 );
     295            0 :                 return true;
     296            0 :             }
     297            0 :             // size ratio trigger
     298            0 :             if l2_size == 0 {
     299            0 :                 return false;
     300            0 :             }
     301            0 :             if l1_size as f64 / l2_size as f64 >= (gc_compaction_ratio_percent as f64 / 100.0) {
     302            0 :                 info!(
     303            0 :                     "trigger auto-compaction because l1_size={} / l2_size={} > gc_compaction_ratio_percent={}",
     304              :                     l1_size, l2_size, gc_compaction_ratio_percent
     305              :                 );
     306            0 :                 return true;
     307            0 :             }
     308            0 :             false
     309            0 :         }
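                       :         // Worked example (numbers are illustrative, not defaults): with
                       :         // gc_compaction_initial_threshold_kb = 10 * 1024 * 1024 (i.e. 10 GiB) and
                       :         // gc_compaction_ratio_percent = 100:
                       :         // - l2_size = 0,      l1_size = 12 GiB -> initial trigger fires (12 GiB >= 10 GiB).
                       :         // - l2_size = 20 GiB, l1_size = 15 GiB -> no trigger (15/20 = 0.75 < 1.0).
                       :         // - l2_size = 20 GiB, l1_size = 25 GiB -> ratio trigger fires (25/20 = 1.25 >= 1.0).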
     310              : 
     311            0 :         if trigger_compaction(
     312            0 :             l1_size,
     313            0 :             l2_size,
     314            0 :             gc_compaction_initial_threshold_kb,
     315            0 :             gc_compaction_ratio_percent,
     316            0 :         ) {
     317            0 :             self.schedule_auto_compaction(
     318            0 :                 CompactOptions {
     319            0 :                     flags: {
     320            0 :                         let mut flags = EnumSet::new();
     321            0 :                         flags |= CompactFlags::EnhancedGcBottomMostCompaction;
     322            0 :                         if timeline.get_compaction_l0_first() {
     323            0 :                             flags |= CompactFlags::YieldForL0;
     324            0 :                         }
     325            0 :                         flags
     326            0 :                     },
     327            0 :                     sub_compaction: true,
     328            0 :                     // Only auto-trigger gc-compaction over the data keyspace due to concerns in
     329            0 :                     // https://github.com/neondatabase/neon/issues/11318.
     330            0 :                     compact_key_range: Some(CompactKeyRange {
     331            0 :                         start: Key::MIN,
     332            0 :                         end: Key::metadata_key_range().start,
     333            0 :                     }),
     334            0 :                     compact_lsn_range: None,
     335            0 :                     sub_compaction_max_job_size_mb: None,
     336            0 :                 },
     337            0 :                 permit,
     338            0 :             );
     339            0 :             info!(
     340            0 :                 "scheduled auto gc-compaction: l1_size={}, l2_size={}, l2_lsn={}, gc_cutoff={}",
     341              :                 l1_size, l2_size, l2_lsn, gc_cutoff
     342              :             );
     343              :         } else {
     344            0 :             debug!(
     345            0 :                 "did not trigger auto gc-compaction: l1_size={}, l2_size={}, l2_lsn={}, gc_cutoff={}",
     346              :                 l1_size, l2_size, l2_lsn, gc_cutoff
     347              :             );
     348              :         }
     349            0 :         Ok(())
     350            0 :     }
     351              : 
     352              :     /// Notify the caller the job has finished and unblock GC.
     353            0 :     fn notify_and_unblock(&self, id: GcCompactionJobId) {
     354            0 :         info!("compaction job id={} finished", id);
     355            0 :         let mut guard = self.inner.lock().unwrap();
     356            0 :         if let Some(items) = guard.guards.remove(&id) {
     357            0 :             if let Some(tx) = items.notify {
     358            0 :                 let _ = tx.send(());
     359            0 :             }
     360            0 :         }
     361            0 :     }
     362              : 
     363            0 :     fn clear_running_job(&self) {
     364            0 :         let mut guard = self.inner.lock().unwrap();
     365            0 :         guard.running = None;
     366            0 :     }
     367              : 
     368            0 :     async fn handle_sub_compaction(
     369            0 :         &self,
     370            0 :         id: GcCompactionJobId,
     371            0 :         options: CompactOptions,
     372            0 :         timeline: &Arc<Timeline>,
     373            0 :         auto: bool,
     374            0 :     ) -> Result<(), CompactionError> {
     375            0 :         info!(
     376            0 :             "running scheduled enhanced gc bottom-most compaction with sub-compaction, splitting compaction jobs"
     377              :         );
     378            0 :         let res = timeline
     379            0 :             .gc_compaction_split_jobs(
     380            0 :                 GcCompactJob::from_compact_options(options.clone()),
     381            0 :                 options.sub_compaction_max_job_size_mb,
     382            0 :             )
     383            0 :             .await;
     384            0 :         let jobs = match res {
     385            0 :             Ok(jobs) => jobs,
     386            0 :             Err(err) => {
     387            0 :                 warn!("cannot split gc-compaction jobs: {}, unblocked gc", err);
     388            0 :                 self.notify_and_unblock(id);
     389            0 :                 return Err(err);
     390              :             }
     391              :         };
     392            0 :         if jobs.is_empty() {
     393            0 :             info!("no jobs to run, skipping scheduled compaction task");
     394            0 :             self.notify_and_unblock(id);
     395              :         } else {
     396            0 :             let jobs_len = jobs.len();
     397            0 :             let mut pending_tasks = Vec::new();
     398            0 :             // gc-compaction might pick more layers or fewer layers to compact. The L2 LSN does not need to be accurate.
      399            0 :             // Therefore, we simply assume that the maximum LSN across all jobs is the expected L2 LSN.
     400            0 :             let expected_l2_lsn = jobs.iter().map(|job| job.compact_lsn_range.end).max();
     401            0 :             for job in jobs {
      402              :                 // Unfortunately we need to convert the `GcCompactJob` back to `CompactOptions`
      403              :                 // until further refactoring allows calling `compact_with_gc` directly.
     404            0 :                 let mut flags: EnumSet<CompactFlags> = EnumSet::default();
     405            0 :                 flags |= CompactFlags::EnhancedGcBottomMostCompaction;
     406            0 :                 if job.dry_run {
     407            0 :                     flags |= CompactFlags::DryRun;
     408            0 :                 }
     409            0 :                 if options.flags.contains(CompactFlags::YieldForL0) {
     410            0 :                     flags |= CompactFlags::YieldForL0;
     411            0 :                 }
     412            0 :                 let options = CompactOptions {
     413            0 :                     flags,
     414            0 :                     sub_compaction: false,
     415            0 :                     compact_key_range: Some(job.compact_key_range.into()),
     416            0 :                     compact_lsn_range: Some(job.compact_lsn_range.into()),
     417            0 :                     sub_compaction_max_job_size_mb: None,
     418            0 :                 };
     419            0 :                 pending_tasks.push(GcCompactionQueueItem::SubCompactionJob(options));
     420              :             }
     421              : 
     422            0 :             if !auto {
     423            0 :                 pending_tasks.push(GcCompactionQueueItem::Notify(id, None));
     424            0 :             } else {
     425            0 :                 pending_tasks.push(GcCompactionQueueItem::Notify(id, expected_l2_lsn));
     426            0 :             }
     427              : 
     428              :             {
     429            0 :                 let mut guard = self.inner.lock().unwrap();
     430            0 :                 let mut tasks = Vec::new();
     431            0 :                 for task in pending_tasks {
     432            0 :                     let id = guard.next_id();
     433            0 :                     tasks.push((id, task));
     434            0 :                 }
     435            0 :                 tasks.reverse();
     436            0 :                 for item in tasks {
     437            0 :                     guard.queued.push_front(item);
     438            0 :                 }
     439              :             }
     440            0 :             info!(
     441            0 :                 "scheduled enhanced gc bottom-most compaction with sub-compaction, split into {} jobs",
     442              :                 jobs_len
     443              :             );
     444              :         }
     445            0 :         Ok(())
     446            0 :     }
     447              : 
     448              :     /// Take a job from the queue and process it. Returns if there are still pending tasks.
     449            0 :     pub async fn iteration(
     450            0 :         &self,
     451            0 :         cancel: &CancellationToken,
     452            0 :         ctx: &RequestContext,
     453            0 :         gc_block: &GcBlock,
     454            0 :         timeline: &Arc<Timeline>,
     455            0 :     ) -> Result<CompactionOutcome, CompactionError> {
     456            0 :         let res = self.iteration_inner(cancel, ctx, gc_block, timeline).await;
     457            0 :         if let Err(err) = &res {
     458            0 :             log_compaction_error(err, None, cancel.is_cancelled(), true);
     459            0 :         }
     460            0 :         match res {
     461            0 :             Ok(res) => Ok(res),
     462            0 :             Err(CompactionError::ShuttingDown) => Err(CompactionError::ShuttingDown),
     463              :             Err(_) => {
      464              :                 // There are some cases where traditional gc might collect some layer
      465              :                 // files, leaving gc-compaction unable to read the full history of a key.
      466              :                 // This needs to be resolved in the long term by improving the compaction
      467              :                 // process. For now, simply prevent such errors from tripping the
      468              :                 // circuit breaker.
     469            0 :                 Ok(CompactionOutcome::Skipped)
     470              :             }
     471              :         }
     472            0 :     }
     473              : 
     474            0 :     async fn iteration_inner(
     475            0 :         &self,
     476            0 :         cancel: &CancellationToken,
     477            0 :         ctx: &RequestContext,
     478            0 :         gc_block: &GcBlock,
     479            0 :         timeline: &Arc<Timeline>,
     480            0 :     ) -> Result<CompactionOutcome, CompactionError> {
     481            0 :         let Ok(_one_op_at_a_time_guard) = self.consumer_lock.try_lock() else {
     482            0 :             return Err(CompactionError::AlreadyRunning(
     483            0 :                 "cannot run gc-compaction because another gc-compaction is running. This should not happen because we only call this function from the gc-compaction queue.",
     484            0 :             ));
     485              :         };
     486              :         let has_pending_tasks;
     487            0 :         let mut yield_for_l0 = false;
     488            0 :         let Some((id, item)) = ({
     489            0 :             let mut guard = self.inner.lock().unwrap();
     490            0 :             if let Some((id, item)) = guard.queued.pop_front() {
     491            0 :                 guard.running = Some((id, item.clone()));
     492            0 :                 has_pending_tasks = !guard.queued.is_empty();
     493            0 :                 Some((id, item))
     494              :             } else {
     495            0 :                 has_pending_tasks = false;
     496            0 :                 None
     497              :             }
     498              :         }) else {
     499            0 :             self.trigger_auto_compaction(timeline).await?;
     500              :             // Always yield after triggering auto-compaction. Gc-compaction is a low-priority task and we
      501              :             // have not implemented a preemption mechanism yet. We always want to yield to more important
      502              :             // tasks if there are any.
     503            0 :             return Ok(CompactionOutcome::Done);
     504              :         };
     505            0 :         match item {
     506            0 :             GcCompactionQueueItem::MetaJob { options, auto } => {
     507            0 :                 if !options
     508            0 :                     .flags
     509            0 :                     .contains(CompactFlags::EnhancedGcBottomMostCompaction)
     510              :                 {
     511            0 :                     warn!(
     512            0 :                         "ignoring scheduled compaction task: scheduled task must be gc compaction: {:?}",
     513              :                         options
     514              :                     );
     515            0 :                 } else if options.sub_compaction {
     516            0 :                     info!(
     517            0 :                         "running scheduled enhanced gc bottom-most compaction with sub-compaction, splitting compaction jobs"
     518              :                     );
     519            0 :                     self.handle_sub_compaction(id, options, timeline, auto)
     520            0 :                         .await?;
     521              :                 } else {
     522              :                     // Auto compaction always enables sub-compaction so we don't need to handle update_l2_lsn
     523              :                     // in this branch.
     524            0 :                     let _gc_guard = match gc_block.start().await {
     525            0 :                         Ok(guard) => guard,
     526            0 :                         Err(e) => {
     527            0 :                             self.notify_and_unblock(id);
     528            0 :                             self.clear_running_job();
     529            0 :                             return Err(CompactionError::Other(anyhow!(
     530            0 :                                 "cannot run gc-compaction because gc is blocked: {}",
     531            0 :                                 e
     532            0 :                             )));
     533              :                         }
     534              :                     };
     535            0 :                     let res = timeline.compact_with_options(cancel, options, ctx).await;
     536            0 :                     let compaction_result = match res {
     537            0 :                         Ok(res) => res,
     538            0 :                         Err(err) => {
     539            0 :                             warn!(%err, "failed to run gc-compaction");
     540            0 :                             self.notify_and_unblock(id);
     541            0 :                             self.clear_running_job();
     542            0 :                             return Err(err);
     543              :                         }
     544              :                     };
     545            0 :                     if compaction_result == CompactionOutcome::YieldForL0 {
     546            0 :                         yield_for_l0 = true;
     547            0 :                     }
     548              :                 }
     549              :             }
     550            0 :             GcCompactionQueueItem::SubCompactionJob(options) => {
     551              :                 // TODO: error handling, clear the queue if any task fails?
     552            0 :                 let _gc_guard = match gc_block.start().await {
     553            0 :                     Ok(guard) => guard,
     554            0 :                     Err(e) => {
     555            0 :                         self.clear_running_job();
     556            0 :                         return Err(CompactionError::Other(anyhow!(
     557            0 :                             "cannot run gc-compaction because gc is blocked: {}",
     558            0 :                             e
     559            0 :                         )));
     560              :                     }
     561              :                 };
     562            0 :                 let res = timeline.compact_with_options(cancel, options, ctx).await;
     563            0 :                 let compaction_result = match res {
     564            0 :                     Ok(res) => res,
     565            0 :                     Err(err) => {
     566            0 :                         warn!(%err, "failed to run gc-compaction subcompaction job");
     567            0 :                         self.clear_running_job();
     568            0 :                         return Err(err);
     569              :                     }
     570              :                 };
     571            0 :                 if compaction_result == CompactionOutcome::YieldForL0 {
      572            0 :                     // We permanently give up a task if we yield for L0 compaction: the preempted subcompaction job won't be run
      573            0 :                     // again. This ensures that we don't keep doing duplicate work within gc-compaction. We don't return directly here because
     574            0 :                     // we need to clean things up before returning from the function.
     575            0 :                     yield_for_l0 = true;
     576            0 :                 }
     577              :             }
     578            0 :             GcCompactionQueueItem::Notify(id, l2_lsn) => {
     579            0 :                 self.notify_and_unblock(id);
     580            0 :                 if let Some(l2_lsn) = l2_lsn {
     581            0 :                     let current_l2_lsn = timeline
     582            0 :                         .get_gc_compaction_state()
     583            0 :                         .map(|x| x.last_completed_lsn)
     584            0 :                         .unwrap_or(Lsn::INVALID);
     585            0 :                     if l2_lsn >= current_l2_lsn {
     586            0 :                         info!("l2_lsn updated to {}", l2_lsn);
     587            0 :                         timeline
     588            0 :                             .update_gc_compaction_state(GcCompactionState {
     589            0 :                                 last_completed_lsn: l2_lsn,
     590            0 :                             })
     591            0 :                             .map_err(CompactionError::Other)?;
     592              :                     } else {
     593            0 :                         warn!(
      594            0 :                             "not updating l2_lsn to {}: it is less than the current l2_lsn {}",
     595              :                             l2_lsn, current_l2_lsn
     596              :                         );
     597              :                     }
     598            0 :                 }
     599              :             }
     600              :         }
     601            0 :         self.clear_running_job();
     602            0 :         Ok(if yield_for_l0 {
     603            0 :             tracing::info!("give up gc-compaction: yield for L0 compaction");
     604            0 :             CompactionOutcome::YieldForL0
     605            0 :         } else if has_pending_tasks {
     606            0 :             CompactionOutcome::Pending
     607              :         } else {
     608            0 :             CompactionOutcome::Done
     609              :         })
     610            0 :     }
     611              : 
     612              :     #[allow(clippy::type_complexity)]
     613            0 :     pub fn remaining_jobs(
     614            0 :         &self,
     615            0 :     ) -> (
     616            0 :         Option<(GcCompactionJobId, GcCompactionQueueItem)>,
     617            0 :         VecDeque<(GcCompactionJobId, GcCompactionQueueItem)>,
     618            0 :     ) {
     619            0 :         let guard = self.inner.lock().unwrap();
     620            0 :         (guard.running.clone(), guard.queued.clone())
     621            0 :     }
     622              : 
     623            0 :     pub fn remaining_jobs_num(&self) -> usize {
     624            0 :         let guard = self.inner.lock().unwrap();
     625            0 :         guard.queued.len() + if guard.running.is_some() { 1 } else { 0 }
     626            0 :     }
     627              : }
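                       : // A simplified sketch of how a caller might drive the queue (the actual call site elsewhere in
                       : // the pageserver is not shown here; `queue`, `cancel`, `ctx`, `gc_block`, and `timeline` are
                       : // assumed bindings):
                       : //
                       : //     loop {
                       : //         match queue.iteration(&cancel, &ctx, &gc_block, &timeline).await? {
                       : //             CompactionOutcome::Pending => continue, // more queued work to process
                       : //             _ => break, // Done / Skipped / YieldForL0: stop and let the caller reschedule
                       : //         }
                       : //     }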
     628              : 
      629              : /// A job description for the gc-compaction job. This structure describes the rectangular key/LSN range that the job will
     630              : /// process. The exact layers that need to be compacted/rewritten will be generated when `compact_with_gc` gets
     631              : /// called.
     632              : #[derive(Debug, Clone)]
     633              : pub(crate) struct GcCompactJob {
     634              :     pub dry_run: bool,
     635              :     /// The key range to be compacted. The compaction algorithm will only regenerate key-value pairs within this range
     636              :     /// [left inclusive, right exclusive), and other pairs will be rewritten into new files if necessary.
     637              :     pub compact_key_range: Range<Key>,
     638              :     /// The LSN range to be compacted. The compaction algorithm will use this range to determine the layers to be
     639              :     /// selected for the compaction, and it does not guarantee the generated layers will have exactly the same LSN range
     640              :     /// as specified here. The true range being compacted is `min_lsn/max_lsn` in [`GcCompactionJobDescription`].
      641              :     /// min_lsn will always be <= the lower bound specified here, and max_lsn will always be >= the upper bound specified here.
     642              :     pub compact_lsn_range: Range<Lsn>,
     643              : }
     644              : 
     645              : impl GcCompactJob {
     646          112 :     pub fn from_compact_options(options: CompactOptions) -> Self {
     647          112 :         GcCompactJob {
     648          112 :             dry_run: options.flags.contains(CompactFlags::DryRun),
     649          112 :             compact_key_range: options
     650          112 :                 .compact_key_range
     651          112 :                 .map(|x| x.into())
     652          112 :                 .unwrap_or(Key::MIN..Key::MAX),
     653          112 :             compact_lsn_range: options
     654          112 :                 .compact_lsn_range
     655          112 :                 .map(|x| x.into())
     656          112 :                 .unwrap_or(Lsn::INVALID..Lsn::MAX),
     657          112 :         }
     658          112 :     }
     659              : }
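                       : // Illustrative only (field list mirrors the `CompactOptions` constructions used elsewhere in
                       : // this file): unset ranges expand to the full key and LSN space.
                       : //
                       : //     let job = GcCompactJob::from_compact_options(CompactOptions {
                       : //         flags: CompactFlags::DryRun.into(),
                       : //         sub_compaction: false,
                       : //         compact_key_range: None,
                       : //         compact_lsn_range: None,
                       : //         sub_compaction_max_job_size_mb: None,
                       : //     });
                       : //     // job.dry_run == true
                       : //     // job.compact_key_range == Key::MIN..Key::MAX
                       : //     // job.compact_lsn_range == Lsn::INVALID..Lsn::MAX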
     660              : 
     661              : /// A job description for the gc-compaction job. This structure is generated when `compact_with_gc` is called
     662              : /// and contains the exact layers we want to compact.
     663              : pub struct GcCompactionJobDescription {
     664              :     /// All layers to read in the compaction job
     665              :     selected_layers: Vec<Layer>,
     666              :     /// GC cutoff of the job. This is the lowest LSN that will be accessed by the read/GC path and we need to
     667              :     /// keep all deltas <= this LSN or generate an image == this LSN.
     668              :     gc_cutoff: Lsn,
     669              :     /// LSNs to retain for the job. Read path will use this LSN so we need to keep deltas <= this LSN or
     670              :     /// generate an image == this LSN.
     671              :     retain_lsns_below_horizon: Vec<Lsn>,
     672              :     /// Maximum layer LSN processed in this compaction, that is max(end_lsn of layers). Exclusive. All data
     673              :     /// \>= this LSN will be kept and will not be rewritten.
     674              :     max_layer_lsn: Lsn,
     675              :     /// Minimum layer LSN processed in this compaction, that is min(start_lsn of layers). Inclusive.
     676              :     /// All access below (strict lower than `<`) this LSN will be routed through the normal read path instead of
     677              :     /// k-merge within gc-compaction.
     678              :     min_layer_lsn: Lsn,
     679              :     /// Only compact layers overlapping with this range.
     680              :     compaction_key_range: Range<Key>,
     681              :     /// When partial compaction is enabled, these layers need to be rewritten to ensure no overlap.
     682              :     /// This field is here solely for debugging. The field will not be read once the compaction
     683              :     /// description is generated.
     684              :     rewrite_layers: Vec<Arc<PersistentLayerDesc>>,
     685              : }
     686              : 
     687              : /// The result of bottom-most compaction for a single key at each LSN.
     688              : #[derive(Debug)]
     689              : #[cfg_attr(test, derive(PartialEq))]
     690              : pub struct KeyLogAtLsn(pub Vec<(Lsn, Value)>);
     691              : 
     692              : /// The result of bottom-most compaction.
     693              : #[derive(Debug)]
     694              : #[cfg_attr(test, derive(PartialEq))]
     695              : pub(crate) struct KeyHistoryRetention {
     696              :     /// Stores logs to reconstruct the value at the given LSN, that is to say, logs <= LSN or image == LSN.
     697              :     pub(crate) below_horizon: Vec<(Lsn, KeyLogAtLsn)>,
     698              :     /// Stores logs to reconstruct the value at any LSN above the horizon, that is to say, log > LSN.
     699              :     pub(crate) above_horizon: KeyLogAtLsn,
     700              : }
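                       : // Informally, with gc horizon 0x40 and retained branch LSNs 0x20 and 0x30, `below_horizon`
                       : // might look like `[(0x20, ...), (0x30, ...), (0x40, ...)]`, where each entry carries only the
                       : // records needed on top of the previous entries to reconstruct the value at that LSN, and
                       : // `above_horizon` keeps the remaining records above 0x40. (Illustrative shape, not taken from a
                       : // real layer.)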
     701              : 
     702              : impl KeyHistoryRetention {
      703              :     /// Hack: skip the delta layer if we need to produce a layer with the same key-LSN range.
     704              :     ///
     705              :     /// This can happen if we have removed some deltas in "the middle" of some existing layer's key-lsn-range.
     706              :     /// For example, consider the case where a single delta with range [0x10,0x50) exists.
     707              :     /// And we have branches at LSN 0x10, 0x20, 0x30.
     708              :     /// Then we delete branch @ 0x20.
     709              :     /// Bottom-most compaction may now delete the delta [0x20,0x30).
      710              :     /// And that wouldn't change the shape of the layer.
     711              :     ///
     712              :     /// Note that bottom-most-gc-compaction never _adds_ new data in that case, only removes.
     713              :     ///
     714              :     /// `discard_key` will only be called when the writer reaches its target (instead of for every key), so it's fine to grab a lock inside.
     715          148 :     async fn discard_key(key: &PersistentLayerKey, tline: &Arc<Timeline>, dry_run: bool) -> bool {
     716          148 :         if dry_run {
     717            0 :             return true;
     718          148 :         }
     719          148 :         if LayerMap::is_l0(&key.key_range, key.is_delta) {
     720              :             // gc-compaction should not produce L0 deltas, otherwise it will break the layer order.
     721              :             // We should ignore such layers.
     722            0 :             return true;
     723          148 :         }
     724              :         let layer_generation;
     725              :         {
     726          148 :             let guard = tline.layers.read().await;
     727          148 :             if !guard.contains_key(key) {
     728          104 :                 return false;
     729           44 :             }
     730           44 :             layer_generation = guard.get_from_key(key).metadata().generation;
     731           44 :         }
     732           44 :         if layer_generation == tline.generation {
     733           44 :             info!(
     734              :                 key=%key,
     735              :                 ?layer_generation,
     736            0 :                 "discard layer due to duplicated layer key in the same generation",
     737              :             );
     738           44 :             true
     739              :         } else {
     740            0 :             false
     741              :         }
     742          148 :     }
     743              : 
     744              :     /// Pipe a history of a single key to the writers.
     745              :     ///
      746              :     /// If `image_writer` is `None`, the images will be placed into the delta layers.
     747              :     /// The delta writer will contain all images and deltas (below and above the horizon) except the bottom-most images.
     748              :     #[allow(clippy::too_many_arguments)]
     749         1276 :     async fn pipe_to(
     750         1276 :         self,
     751         1276 :         key: Key,
     752         1276 :         delta_writer: &mut SplitDeltaLayerWriter,
     753         1276 :         mut image_writer: Option<&mut SplitImageLayerWriter>,
     754         1276 :         stat: &mut CompactionStatistics,
     755         1276 :         ctx: &RequestContext,
     756         1276 :     ) -> anyhow::Result<()> {
     757         1276 :         let mut first_batch = true;
     758         4088 :         for (cutoff_lsn, KeyLogAtLsn(logs)) in self.below_horizon {
     759         2812 :             if first_batch {
     760         1276 :                 if logs.len() == 1 && logs[0].1.is_image() {
     761         1200 :                     let Value::Image(img) = &logs[0].1 else {
     762            0 :                         unreachable!()
     763              :                     };
     764         1200 :                     stat.produce_image_key(img);
     765         1200 :                     if let Some(image_writer) = image_writer.as_mut() {
     766         1200 :                         image_writer.put_image(key, img.clone(), ctx).await?;
     767              :                     } else {
     768            0 :                         delta_writer
     769            0 :                             .put_value(key, cutoff_lsn, Value::Image(img.clone()), ctx)
     770            0 :                             .await?;
     771              :                     }
     772              :                 } else {
     773          132 :                     for (lsn, val) in logs {
     774           56 :                         stat.produce_key(&val);
     775           56 :                         delta_writer.put_value(key, lsn, val, ctx).await?;
     776              :                     }
     777              :                 }
     778         1276 :                 first_batch = false;
     779              :             } else {
     780         1768 :                 for (lsn, val) in logs {
     781          232 :                     stat.produce_key(&val);
     782          232 :                     delta_writer.put_value(key, lsn, val, ctx).await?;
     783              :                 }
     784              :             }
     785              :         }
     786         1276 :         let KeyLogAtLsn(above_horizon_logs) = self.above_horizon;
     787         1392 :         for (lsn, val) in above_horizon_logs {
     788          116 :             stat.produce_key(&val);
     789          116 :             delta_writer.put_value(key, lsn, val, ctx).await?;
     790              :         }
     791         1276 :         Ok(())
     792         1276 :     }
     793              : 
      794              :     /// Verify that every key in the retention is readable by replaying the logs.
     795         1292 :     async fn verify(
     796         1292 :         &self,
     797         1292 :         key: Key,
     798         1292 :         base_img_from_ancestor: &Option<(Key, Lsn, Bytes)>,
     799         1292 :         full_history: &[(Key, Lsn, Value)],
     800         1292 :         tline: &Arc<Timeline>,
     801         1292 :     ) -> anyhow::Result<()> {
     802              :         // Usually the min_lsn should be the first record but we do a full iteration to be safe.
     803         1832 :         let Some(min_lsn) = full_history.iter().map(|(_, lsn, _)| *lsn).min() else {
     804              :             // This should never happen b/c if we don't have any history of a key, we won't even do `generate_key_retention`.
     805            0 :             return Ok(());
     806              :         };
     807         1832 :         let Some(max_lsn) = full_history.iter().map(|(_, lsn, _)| *lsn).max() else {
     808              :             // This should never happen b/c if we don't have any history of a key, we won't even do `generate_key_retention`.
     809            0 :             return Ok(());
     810              :         };
     811         1292 :         let mut base_img = base_img_from_ancestor
     812         1292 :             .as_ref()
     813         1292 :             .map(|(_, lsn, img)| (*lsn, img));
     814         1292 :         let mut history = Vec::new();
     815              : 
     816         4108 :         async fn collect_and_verify(
     817         4108 :             key: Key,
     818         4108 :             lsn: Lsn,
     819         4108 :             base_img: &Option<(Lsn, &Bytes)>,
     820         4108 :             history: &[(Lsn, &NeonWalRecord)],
     821         4108 :             tline: &Arc<Timeline>,
     822         4108 :         ) -> anyhow::Result<()> {
     823         4108 :             let mut records = history
     824         4108 :                 .iter()
     825         4108 :                 .map(|(lsn, val)| (*lsn, (*val).clone()))
     826         4108 :                 .collect::<Vec<_>>();
     827         4108 : 
     828         4108 :             // WAL redo requires records in the reverse LSN order
     829         4108 :             records.reverse();
     830         4108 :             let data = ValueReconstructState {
     831         4108 :                 img: base_img.as_ref().map(|(lsn, img)| (*lsn, (*img).clone())),
     832         4108 :                 records,
     833         4108 :             };
     834         4108 : 
     835         4108 :             tline
     836         4108 :                 .reconstruct_value(key, lsn, data, RedoAttemptType::GcCompaction)
     837         4108 :                 .await
     838         4108 :                 .with_context(|| format!("verification failed for key {} at lsn {}", key, lsn))?;
     839              : 
     840         4108 :             Ok(())
     841         4108 :         }
     842              : 
     843         4144 :         for (retain_lsn, KeyLogAtLsn(logs)) in &self.below_horizon {
     844         4384 :             for (lsn, val) in logs {
     845          304 :                 match val {
     846         1228 :                     Value::Image(img) => {
     847         1228 :                         base_img = Some((*lsn, img));
     848         1228 :                         history.clear();
     849         1228 :                     }
     850          304 :                     Value::WalRecord(rec) if val.will_init() => {
     851            0 :                         base_img = None;
     852            0 :                         history.clear();
     853            0 :                         history.push((*lsn, rec));
     854            0 :                     }
     855          304 :                     Value::WalRecord(rec) => {
     856          304 :                         history.push((*lsn, rec));
     857          304 :                     }
     858              :                 }
     859              :             }
     860         2852 :             if *retain_lsn >= min_lsn {
     861              :                 // Only verify after the key appears in the full history for the first time.
     862              : 
     863         2796 :                 if base_img.is_none() && history.is_empty() {
     864            0 :                     anyhow::bail!(
      865            0 :                         "verification failed: key {} has no history at {}",
     866            0 :                         key,
     867            0 :                         retain_lsn
     868            0 :                     );
     869         2796 :                 };
     870         2796 :                 // We don't modify history: in theory, we could replace the history with a single
     871         2796 :                 // image as in `generate_key_retention` to make redos at later LSNs faster. But we
     872         2796 :                 // want to verify everything as if they are read from the real layer map.
     873         2796 :                 collect_and_verify(key, *retain_lsn, &base_img, &history, tline).await?;
     874           56 :             }
     875              :         }
     876              : 
     877         1440 :         for (lsn, val) in &self.above_horizon.0 {
     878          128 :             match val {
     879           20 :                 Value::Image(img) => {
     880           20 :                     // Above the GC horizon, we verify every time we see an image.
     881           20 :                     collect_and_verify(key, *lsn, &base_img, &history, tline).await?;
     882           20 :                     base_img = Some((*lsn, img));
     883           20 :                     history.clear();
     884              :                 }
     885          128 :                 Value::WalRecord(rec) if val.will_init() => {
     886            0 :                     // Above the GC horizon, we verify every time we see an init record.
     887            0 :                     collect_and_verify(key, *lsn, &base_img, &history, tline).await?;
     888            0 :                     base_img = None;
     889            0 :                     history.clear();
     890            0 :                     history.push((*lsn, rec));
     891              :                 }
     892          128 :                 Value::WalRecord(rec) => {
     893          128 :                     history.push((*lsn, rec));
     894          128 :                 }
     895              :             }
     896              :         }
     897              :         // Ensure the latest record is readable.
     898         1292 :         collect_and_verify(key, max_lsn, &base_img, &history, tline).await?;
     899         1292 :         Ok(())
     900         1292 :     }
     901              : }
     902              : 
     903              : #[derive(Debug, Serialize, Default)]
     904              : struct CompactionStatisticsNumSize {
     905              :     num: u64,
     906              :     size: u64,
     907              : }
     908              : 
     909              : #[derive(Debug, Serialize, Default)]
     910              : pub struct CompactionStatistics {
     911              :     /// Delta layer visited (maybe compressed, physical size)
     912              :     delta_layer_visited: CompactionStatisticsNumSize,
     913              :     /// Image layer visited (maybe compressed, physical size)
     914              :     image_layer_visited: CompactionStatisticsNumSize,
     915              :     /// Delta layer produced (maybe compressed, physical size)
     916              :     delta_layer_produced: CompactionStatisticsNumSize,
     917              :     /// Image layer produced (maybe compressed, physical size)
     918              :     image_layer_produced: CompactionStatisticsNumSize,
     919              :     /// Delta layer discarded (maybe compressed, physical size of the layer being discarded instead of the original layer)
     920              :     delta_layer_discarded: CompactionStatisticsNumSize,
     921              :     /// Image layer discarded (maybe compressed, physical size of the layer being discarded instead of the original layer)
     922              :     image_layer_discarded: CompactionStatisticsNumSize,
     923              :     num_unique_keys_visited: usize,
     924              :     /// Delta visited (uncompressed, original size)
     925              :     wal_keys_visited: CompactionStatisticsNumSize,
     926              :     /// Image visited (uncompressed, original size)
     927              :     image_keys_visited: CompactionStatisticsNumSize,
     928              :     /// Delta produced (uncompressed, original size)
     929              :     wal_produced: CompactionStatisticsNumSize,
     930              :     /// Image produced (uncompressed, original size)
     931              :     image_produced: CompactionStatisticsNumSize,
     932              : 
     933              :     // Time spent in each phase
     934              :     time_acquire_lock_secs: f64,
     935              :     time_analyze_secs: f64,
     936              :     time_download_layer_secs: f64,
     937              :     time_to_first_kv_pair_secs: f64,
     938              :     time_main_loop_secs: f64,
     939              :     time_final_phase_secs: f64,
     940              :     time_total_secs: f64,
     941              : 
     942              :     // Summary
     943              :     /// Ratio of the key-value size after/before gc-compaction.
     944              :     uncompressed_retention_ratio: f64,
     945              :     /// Ratio of the physical size after/before gc-compaction.
     946              :     compressed_retention_ratio: f64,
     947              : }
     948              : 
     949              : impl CompactionStatistics {
     950         2136 :     fn estimated_size_of_value(val: &Value) -> usize {
     951          876 :         match val {
     952         1260 :             Value::Image(img) => img.len(),
     953            0 :             Value::WalRecord(NeonWalRecord::Postgres { rec, .. }) => rec.len(),
     954          876 :             _ => std::mem::size_of::<NeonWalRecord>(),
     955              :         }
     956         2136 :     }
     957         3356 :     fn estimated_size_of_key() -> usize {
     958         3356 :         KEY_SIZE // TODO: distinguish image layer and delta layer (count LSN in delta layer)
     959         3356 :     }
     960          176 :     fn visit_delta_layer(&mut self, size: u64) {
     961          176 :         self.delta_layer_visited.num += 1;
     962          176 :         self.delta_layer_visited.size += size;
     963          176 :     }
     964          140 :     fn visit_image_layer(&mut self, size: u64) {
     965          140 :         self.image_layer_visited.num += 1;
     966          140 :         self.image_layer_visited.size += size;
     967          140 :     }
     968         1280 :     fn on_unique_key_visited(&mut self) {
     969         1280 :         self.num_unique_keys_visited += 1;
     970         1280 :     }
     971          492 :     fn visit_wal_key(&mut self, val: &Value) {
     972          492 :         self.wal_keys_visited.num += 1;
     973          492 :         self.wal_keys_visited.size +=
     974          492 :             Self::estimated_size_of_value(val) as u64 + Self::estimated_size_of_key() as u64;
     975          492 :     }
     976         1260 :     fn visit_image_key(&mut self, val: &Value) {
     977         1260 :         self.image_keys_visited.num += 1;
     978         1260 :         self.image_keys_visited.size +=
     979         1260 :             Self::estimated_size_of_value(val) as u64 + Self::estimated_size_of_key() as u64;
     980         1260 :     }
     981          404 :     fn produce_key(&mut self, val: &Value) {
     982          404 :         match val {
     983           20 :             Value::Image(img) => self.produce_image_key(img),
     984          384 :             Value::WalRecord(_) => self.produce_wal_key(val),
     985              :         }
     986          404 :     }
     987          384 :     fn produce_wal_key(&mut self, val: &Value) {
     988          384 :         self.wal_produced.num += 1;
     989          384 :         self.wal_produced.size +=
     990          384 :             Self::estimated_size_of_value(val) as u64 + Self::estimated_size_of_key() as u64;
     991          384 :     }
     992         1220 :     fn produce_image_key(&mut self, val: &Bytes) {
     993         1220 :         self.image_produced.num += 1;
     994         1220 :         self.image_produced.size += val.len() as u64 + Self::estimated_size_of_key() as u64;
     995         1220 :     }
     996           28 :     fn discard_delta_layer(&mut self, original_size: u64) {
     997           28 :         self.delta_layer_discarded.num += 1;
     998           28 :         self.delta_layer_discarded.size += original_size;
     999           28 :     }
    1000           16 :     fn discard_image_layer(&mut self, original_size: u64) {
    1001           16 :         self.image_layer_discarded.num += 1;
    1002           16 :         self.image_layer_discarded.size += original_size;
    1003           16 :     }
    1004           48 :     fn produce_delta_layer(&mut self, size: u64) {
    1005           48 :         self.delta_layer_produced.num += 1;
    1006           48 :         self.delta_layer_produced.size += size;
    1007           48 :     }
    1008           60 :     fn produce_image_layer(&mut self, size: u64) {
    1009           60 :         self.image_layer_produced.num += 1;
    1010           60 :         self.image_layer_produced.size += size;
    1011           60 :     }
    1012          104 :     fn finalize(&mut self) {
    1013          104 :         let original_key_value_size = self.image_keys_visited.size + self.wal_keys_visited.size;
    1014          104 :         let produced_key_value_size = self.image_produced.size + self.wal_produced.size;
    1015          104 :         self.uncompressed_retention_ratio =
    1016          104 :             produced_key_value_size as f64 / (original_key_value_size as f64 + 1.0); // avoid div by 0
    1017          104 :         let original_physical_size = self.image_layer_visited.size + self.delta_layer_visited.size;
    1018          104 :         let produced_physical_size = self.image_layer_produced.size
    1019          104 :             + self.delta_layer_produced.size
    1020          104 :             + self.image_layer_discarded.size
    1021          104 :             + self.delta_layer_discarded.size; // Also include the discarded layers to make the ratio accurate
    1022          104 :         self.compressed_retention_ratio =
    1023          104 :             produced_physical_size as f64 / (original_physical_size as f64 + 1.0); // avoid div by 0
    1024          104 :     }
    1025              : }
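                       : 
                       : // A worked example (with assumed, purely illustrative byte counts) of how `finalize()`
                       : // derives the summary ratios above:
                       : //
                       : //     let mut stats = CompactionStatistics::default();
                       : //     stats.image_keys_visited.size = 8_000_000; // 8 MB of image values visited
                       : //     stats.wal_keys_visited.size = 2_000_000;   // 2 MB of WAL values visited
                       : //     stats.image_produced.size = 1_500_000;     // 1.5 MB of image values produced
                       : //     stats.wal_produced.size = 500_000;         // 0.5 MB of WAL values produced
                       : //     stats.finalize();
                       : //     // uncompressed_retention_ratio == 2_000_000 / (10_000_000 + 1.0) ~= 0.2, i.e.
                       : //     // gc-compaction kept roughly 20% of the key-value payload it visited.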
    1026              : 
    1027              : #[derive(Default, Debug, Clone, Copy, PartialEq, Eq)]
    1028              : pub enum CompactionOutcome {
    1029              :     #[default]
    1030              :     /// No layers need to be compacted after this round. Compaction doesn't need
    1031              :     /// to be immediately scheduled.
    1032              :     Done,
    1033              :     /// Still has pending layers to be compacted after this round. Ideally, the scheduler
    1034              :     /// should immediately schedule another compaction.
    1035              :     Pending,
    1036              :     /// A timeline needs L0 compaction. Yield and schedule an immediate L0 compaction pass (only
    1037              :     /// guaranteed when `compaction_l0_first` is enabled).
    1038              :     YieldForL0,
    1039              :     /// Compaction was skipped, because the timeline is ineligible for compaction.
    1040              :     Skipped,
    1041              : }
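                       : 
                       : // A hedged sketch of how a caller might act on these outcomes; the scheduling helpers
                       : // named here are hypothetical and not part of this file:
                       : //
                       : //     match timeline.compact_legacy(&cancel, options, &ctx).await? {
                       : //         CompactionOutcome::Done | CompactionOutcome::Skipped => {}  // nothing urgent
                       : //         CompactionOutcome::Pending => schedule_another_pass(),      // hypothetical
                       : //         CompactionOutcome::YieldForL0 => schedule_l0_pass_soon(),   // hypothetical
                       : //     }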
    1042              : 
    1043              : impl Timeline {
    1044              :     /// TODO: cancellation
    1045              :     ///
    1046              :     /// Returns whether the compaction has pending tasks.
    1047          728 :     pub(crate) async fn compact_legacy(
    1048          728 :         self: &Arc<Self>,
    1049          728 :         cancel: &CancellationToken,
    1050          728 :         options: CompactOptions,
    1051          728 :         ctx: &RequestContext,
    1052          728 :     ) -> Result<CompactionOutcome, CompactionError> {
    1053          728 :         if options
    1054          728 :             .flags
    1055          728 :             .contains(CompactFlags::EnhancedGcBottomMostCompaction)
    1056              :         {
    1057            0 :             self.compact_with_gc(cancel, options, ctx).await?;
    1058            0 :             return Ok(CompactionOutcome::Done);
    1059          728 :         }
    1060          728 : 
    1061          728 :         if options.flags.contains(CompactFlags::DryRun) {
    1062            0 :             return Err(CompactionError::Other(anyhow!(
    1063            0 :                 "dry-run mode is not supported for legacy compaction for now"
    1064            0 :             )));
    1065          728 :         }
    1066          728 : 
    1067          728 :         if options.compact_key_range.is_some() || options.compact_lsn_range.is_some() {
    1068              :             // maybe useful in the future? could implement this at some point
    1069            0 :             return Err(CompactionError::Other(anyhow!(
    1070            0 :                 "compaction range is not supported for legacy compaction for now"
    1071            0 :             )));
    1072          728 :         }
    1073          728 : 
    1074          728 :         // High level strategy for compaction / image creation:
    1075          728 :         //
     1076          728 :         // 1. First, do an L0 compaction to ensure we move the L0
     1077          728 :         // layers into the historic layer map and get flat levels of
    1078          728 :         // layers. If we did not compact all L0 layers, we will
    1079          728 :         // prioritize compacting the timeline again and not do
    1080          728 :         // any of the compactions below.
    1081          728 :         //
    1082          728 :         // 2. Then, calculate the desired "partitioning" of the
    1083          728 :         // currently in-use key space. The goal is to partition the
    1084          728 :         // key space into roughly fixed-size chunks, but also take into
    1085          728 :         // account any existing image layers, and try to align the
    1086          728 :         // chunk boundaries with the existing image layers to avoid
    1087          728 :         // too much churn. Also try to align chunk boundaries with
    1088          728 :         // relation boundaries.  In principle, we don't know about
    1089          728 :         // relation boundaries here, we just deal with key-value
    1090          728 :         // pairs, and the code in pgdatadir_mapping.rs knows how to
    1091          728 :         // map relations into key-value pairs. But in practice we know
    1092          728 :         // that 'field6' is the block number, and the fields 1-5
    1093          728 :         // identify a relation. This is just an optimization,
    1094          728 :         // though.
    1095          728 :         //
    1096          728 :         // 3. Once we know the partitioning, for each partition,
    1097          728 :         // decide if it's time to create a new image layer. The
     1098          728 :         // criterion is: has there been too much "churn" since the last
     1099          728 :         // image layer? "Churn" is a fuzzy concept; it's a
    1100          728 :         // combination of too many delta files, or too much WAL in
    1101          728 :         // total in the delta file. Or perhaps: if creating an image
     1102          728 :         // file would allow us to delete some older files.
    1103          728 :         //
    1104          728 :         // 4. In the end, if the tenant gets auto-sharded, we will run
    1105          728 :         // a shard-ancestor compaction.
    1106          728 : 
    1107          728 :         // Is the timeline being deleted?
    1108          728 :         if self.is_stopping() {
    1109            0 :             trace!("Dropping out of compaction on timeline shutdown");
    1110            0 :             return Err(CompactionError::ShuttingDown);
    1111          728 :         }
    1112          728 : 
    1113          728 :         let target_file_size = self.get_checkpoint_distance();
    1114              : 
    1115              :         // Define partitioning schema if needed
    1116              : 
    1117              :         // 1. L0 Compact
    1118          728 :         let l0_outcome = {
    1119          728 :             let timer = self.metrics.compact_time_histo.start_timer();
    1120          728 :             let l0_outcome = self
    1121          728 :                 .compact_level0(
    1122          728 :                     target_file_size,
    1123          728 :                     options.flags.contains(CompactFlags::ForceL0Compaction),
    1124          728 :                     ctx,
    1125          728 :                 )
    1126          728 :                 .await?;
    1127          728 :             timer.stop_and_record();
    1128          728 :             l0_outcome
    1129          728 :         };
    1130          728 : 
    1131          728 :         if options.flags.contains(CompactFlags::OnlyL0Compaction) {
    1132            0 :             return Ok(l0_outcome);
    1133          728 :         }
    1134          728 : 
    1135          728 :         // Yield if we have pending L0 compaction. The scheduler will do another pass.
    1136          728 :         if (l0_outcome == CompactionOutcome::Pending || l0_outcome == CompactionOutcome::YieldForL0)
    1137            0 :             && options.flags.contains(CompactFlags::YieldForL0)
    1138              :         {
    1139            0 :             info!("image/ancestor compaction yielding for L0 compaction");
    1140            0 :             return Ok(CompactionOutcome::YieldForL0);
    1141          728 :         }
    1142          728 : 
    1143          728 :         // 2. Repartition and create image layers if necessary
    1144          728 :         match self
    1145          728 :             .repartition(
    1146          728 :                 self.get_last_record_lsn(),
    1147          728 :                 self.get_compaction_target_size(),
    1148          728 :                 options.flags,
    1149          728 :                 ctx,
    1150          728 :             )
    1151          728 :             .await
    1152              :         {
    1153          728 :             Ok(((dense_partitioning, sparse_partitioning), lsn)) => {
    1154          728 :                 // Disables access_stats updates, so that the files we read remain candidates for eviction after we're done with them
    1155          728 :                 let image_ctx = RequestContextBuilder::from(ctx)
    1156          728 :                     .access_stats_behavior(AccessStatsBehavior::Skip)
    1157          728 :                     .attached_child();
    1158          728 : 
    1159          728 :                 let mut partitioning = dense_partitioning;
    1160          728 :                 partitioning
    1161          728 :                     .parts
    1162          728 :                     .extend(sparse_partitioning.into_dense().parts);
    1163              : 
    1164              :                 // 3. Create new image layers for partitions that have been modified "enough".
    1165          728 :                 let (image_layers, outcome) = self
    1166          728 :                     .create_image_layers(
    1167          728 :                         &partitioning,
    1168          728 :                         lsn,
    1169          728 :                         if options
    1170          728 :                             .flags
    1171          728 :                             .contains(CompactFlags::ForceImageLayerCreation)
    1172              :                         {
    1173           28 :                             ImageLayerCreationMode::Force
    1174              :                         } else {
    1175          700 :                             ImageLayerCreationMode::Try
    1176              :                         },
    1177          728 :                         &image_ctx,
    1178          728 :                         self.last_image_layer_creation_status
    1179          728 :                             .load()
    1180          728 :                             .as_ref()
    1181          728 :                             .clone(),
    1182          728 :                         options.flags.contains(CompactFlags::YieldForL0),
    1183          728 :                     )
    1184          728 :                     .await
    1185          728 :                     .inspect_err(|err| {
    1186              :                         if let CreateImageLayersError::GetVectoredError(
    1187              :                             GetVectoredError::MissingKey(_),
    1188            0 :                         ) = err
    1189              :                         {
    1190            0 :                             critical!("missing key during compaction: {err:?}");
    1191            0 :                         }
    1192          728 :                     })?;
    1193              : 
    1194          728 :                 self.last_image_layer_creation_status
    1195          728 :                     .store(Arc::new(outcome.clone()));
    1196          728 : 
    1197          728 :                 self.upload_new_image_layers(image_layers)?;
    1198          728 :                 if let LastImageLayerCreationStatus::Incomplete { .. } = outcome {
    1199              :                     // Yield and do not do any other kind of compaction.
    1200            0 :                     info!(
    1201            0 :                         "skipping shard ancestor compaction due to pending image layer generation tasks (preempted by L0 compaction)."
    1202              :                     );
    1203            0 :                     return Ok(CompactionOutcome::YieldForL0);
    1204          728 :                 }
    1205              :             }
    1206              : 
    1207              :             // Suppress errors when cancelled.
    1208            0 :             Err(_) if self.cancel.is_cancelled() => {}
    1209            0 :             Err(err) if err.is_cancel() => {}
    1210              : 
    1211              :             // Alert on critical errors that indicate data corruption.
    1212            0 :             Err(err) if err.is_critical() => {
    1213            0 :                 critical!("could not compact, repartitioning keyspace failed: {err:?}");
    1214              :             }
    1215              : 
    1216              :             // Log other errors. No partitioning? This is normal, if the timeline was just created
    1217              :             // as an empty timeline. Also in unit tests, when we use the timeline as a simple
    1218              :             // key-value store, ignoring the datadir layout. Log the error but continue.
    1219            0 :             Err(err) => error!("could not compact, repartitioning keyspace failed: {err:?}"),
    1220              :         };
    1221              : 
    1222          728 :         let partition_count = self.partitioning.read().0.0.parts.len();
    1223          728 : 
    1224          728 :         // 4. Shard ancestor compaction
    1225          728 : 
    1226          728 :         if self.shard_identity.count >= ShardCount::new(2) {
    1227              :             // Limit the number of layer rewrites to the number of partitions: this means its
    1228              :             // runtime should be comparable to a full round of image layer creations, rather than
    1229              :             // being potentially much longer.
    1230            0 :             let rewrite_max = partition_count;
    1231              : 
    1232            0 :             let outcome = self
    1233            0 :                 .compact_shard_ancestors(
    1234            0 :                     rewrite_max,
    1235            0 :                     options.flags.contains(CompactFlags::YieldForL0),
    1236            0 :                     ctx,
    1237            0 :                 )
    1238            0 :                 .await?;
    1239            0 :             match outcome {
    1240            0 :                 CompactionOutcome::Pending | CompactionOutcome::YieldForL0 => return Ok(outcome),
    1241            0 :                 CompactionOutcome::Done | CompactionOutcome::Skipped => {}
    1242              :             }
    1243          728 :         }
    1244              : 
    1245          728 :         Ok(CompactionOutcome::Done)
    1246          728 :     }
    1247              : 
     1248              :     /// Check for layers that are eligible to be rewritten:
     1249              :     /// - Shard splitting: After a shard split, rewrite ancestor layers beyond pitr_interval, so that
    1250              :     ///   we don't indefinitely retain keys in this shard that aren't needed.
    1251              :     /// - For future use: layers beyond pitr_interval that are in formats we would
    1252              :     ///   rather not maintain compatibility with indefinitely.
    1253              :     ///
    1254              :     /// Note: this phase may read and write many gigabytes of data: use rewrite_max to bound
    1255              :     /// how much work it will try to do in each compaction pass.
    1256            0 :     async fn compact_shard_ancestors(
    1257            0 :         self: &Arc<Self>,
    1258            0 :         rewrite_max: usize,
    1259            0 :         yield_for_l0: bool,
    1260            0 :         ctx: &RequestContext,
    1261            0 :     ) -> Result<CompactionOutcome, CompactionError> {
    1262            0 :         let mut outcome = CompactionOutcome::Done;
    1263            0 :         let mut drop_layers = Vec::new();
    1264            0 :         let mut layers_to_rewrite: Vec<Layer> = Vec::new();
    1265            0 : 
    1266            0 :         // We will use the Lsn cutoff of the last GC as a threshold for rewriting layers: if a
    1267            0 :         // layer is behind this Lsn, it indicates that the layer is being retained beyond the
    1268            0 :         // pitr_interval, for example because a branchpoint references it.
    1269            0 :         //
    1270            0 :         // Holding this read guard also blocks [`Self::gc_timeline`] from entering while we
    1271            0 :         // are rewriting layers.
    1272            0 :         let latest_gc_cutoff = self.get_applied_gc_cutoff_lsn();
    1273            0 :         let pitr_cutoff = self.gc_info.read().unwrap().cutoffs.time;
    1274              : 
    1275            0 :         let layers = self.layers.read().await;
    1276            0 :         for layer_desc in layers.layer_map()?.iter_historic_layers() {
    1277            0 :             let layer = layers.get_from_desc(&layer_desc);
    1278            0 :             if layer.metadata().shard.shard_count == self.shard_identity.count {
    1279              :                 // This layer does not belong to a historic ancestor, no need to re-image it.
    1280            0 :                 continue;
    1281            0 :             }
    1282            0 : 
    1283            0 :             // This layer was created on an ancestor shard: check if it contains any data for this shard.
    1284            0 :             let sharded_range = ShardedRange::new(layer_desc.get_key_range(), &self.shard_identity);
    1285            0 :             let layer_local_page_count = sharded_range.page_count();
    1286            0 :             let layer_raw_page_count = ShardedRange::raw_size(&layer_desc.get_key_range());
    1287            0 :             if layer_local_page_count == 0 {
    1288              :                 // This ancestral layer only covers keys that belong to other shards.
    1289              :                 // We include the full metadata in the log: if we had some critical bug that caused
     1290              :                 // us to incorrectly drop layers, this would simplify manual debugging + reinstating those layers.
    1291            0 :                 debug!(%layer, old_metadata=?layer.metadata(),
    1292            0 :                     "dropping layer after shard split, contains no keys for this shard",
    1293              :                 );
    1294              : 
    1295            0 :                 if cfg!(debug_assertions) {
    1296              :                     // Expensive, exhaustive check of keys in this layer: this guards against ShardedRange's calculations being
    1297              :                     // wrong.  If ShardedRange claims the local page count is zero, then no keys in this layer
    1298              :                     // should be !is_key_disposable()
    1299              :                     // TODO: exclude sparse keyspace from this check, otherwise it will infinitely loop.
    1300            0 :                     let range = layer_desc.get_key_range();
    1301            0 :                     let mut key = range.start;
    1302            0 :                     while key < range.end {
    1303            0 :                         debug_assert!(self.shard_identity.is_key_disposable(&key));
    1304            0 :                         key = key.next();
    1305              :                     }
    1306            0 :                 }
    1307              : 
    1308            0 :                 drop_layers.push(layer);
    1309            0 :                 continue;
    1310            0 :             } else if layer_local_page_count != u32::MAX
    1311            0 :                 && layer_local_page_count == layer_raw_page_count
    1312              :             {
    1313            0 :                 debug!(%layer,
    1314            0 :                     "layer is entirely shard local ({} keys), no need to filter it",
    1315              :                     layer_local_page_count
    1316              :                 );
    1317            0 :                 continue;
    1318            0 :             }
    1319            0 : 
    1320            0 :             // Don't bother re-writing a layer unless it will at least halve its size
    1321            0 :             if layer_local_page_count != u32::MAX
    1322            0 :                 && layer_local_page_count > layer_raw_page_count / 2
    1323              :             {
    1324            0 :                 debug!(%layer,
    1325            0 :                     "layer is already mostly local ({}/{}), not rewriting",
    1326              :                     layer_local_page_count,
    1327              :                     layer_raw_page_count
     1328              :                 );
                       :                 continue;
     1329            0 :             }
    1330              : 
    1331              :             // Don't bother re-writing a layer if it is within the PITR window: it will age-out eventually
    1332              :             // without incurring the I/O cost of a rewrite.
    1333            0 :             if layer_desc.get_lsn_range().end >= *latest_gc_cutoff {
    1334            0 :                 debug!(%layer, "Skipping rewrite of layer still in GC window ({} >= {})",
    1335            0 :                     layer_desc.get_lsn_range().end, *latest_gc_cutoff);
    1336            0 :                 continue;
    1337            0 :             }
    1338            0 : 
    1339            0 :             if layer_desc.is_delta() {
    1340              :                 // We do not yet implement rewrite of delta layers
    1341            0 :                 debug!(%layer, "Skipping rewrite of delta layer");
    1342            0 :                 continue;
    1343            0 :             }
    1344            0 : 
    1345            0 :             // Only rewrite layers if their generations differ.  This guarantees:
    1346            0 :             //  - that local rewrite is safe, as local layer paths will differ between existing layer and rewritten one
    1347            0 :             //  - that the layer is persistent in remote storage, as we only see old-generation'd layer via loading from remote storage
    1348            0 :             if layer.metadata().generation == self.generation {
    1349            0 :                 debug!(%layer, "Skipping rewrite, is not from old generation");
    1350            0 :                 continue;
    1351            0 :             }
    1352            0 : 
    1353            0 :             if layers_to_rewrite.len() >= rewrite_max {
    1354            0 :                 debug!(%layer, "Will rewrite layer on a future compaction, already rewrote {}",
    1355            0 :                     layers_to_rewrite.len()
    1356              :                 );
    1357            0 :                 outcome = CompactionOutcome::Pending;
    1358            0 :                 break;
    1359            0 :             }
    1360            0 : 
    1361            0 :             // Fall through: all our conditions for doing a rewrite passed.
    1362            0 :             layers_to_rewrite.push(layer);
    1363              :         }
    1364              : 
    1365              :         // Drop read lock on layer map before we start doing time-consuming I/O.
    1366            0 :         drop(layers);
    1367            0 : 
    1368            0 :         // Drop out early if there's nothing to do.
    1369            0 :         if layers_to_rewrite.is_empty() && drop_layers.is_empty() {
    1370            0 :             return Ok(CompactionOutcome::Done);
    1371            0 :         }
    1372            0 : 
    1373            0 :         info!(
    1374            0 :             "starting shard ancestor compaction, rewriting {} layers and dropping {} layers \
    1375            0 :                 (latest_gc_cutoff={} pitr_cutoff={})",
    1376            0 :             layers_to_rewrite.len(),
    1377            0 :             drop_layers.len(),
    1378            0 :             *latest_gc_cutoff,
    1379              :             pitr_cutoff,
    1380              :         );
    1381            0 :         let started = Instant::now();
    1382            0 : 
    1383            0 :         let mut replace_image_layers = Vec::new();
    1384              : 
    1385            0 :         for layer in layers_to_rewrite {
    1386            0 :             if self.cancel.is_cancelled() {
    1387            0 :                 return Err(CompactionError::ShuttingDown);
    1388            0 :             }
    1389            0 : 
    1390            0 :             info!(layer=%layer, "rewriting layer after shard split");
    1391            0 :             let mut image_layer_writer = ImageLayerWriter::new(
    1392            0 :                 self.conf,
    1393            0 :                 self.timeline_id,
    1394            0 :                 self.tenant_shard_id,
    1395            0 :                 &layer.layer_desc().key_range,
    1396            0 :                 layer.layer_desc().image_layer_lsn(),
    1397            0 :                 ctx,
    1398            0 :             )
    1399            0 :             .await
    1400            0 :             .map_err(CompactionError::Other)?;
    1401              : 
    1402              :             // Safety of layer rewrites:
    1403              :             // - We are writing to a different local file path than we are reading from, so the old Layer
    1404              :             //   cannot interfere with the new one.
    1405              :             // - In the page cache, contents for a particular VirtualFile are stored with a file_id that
    1406              :             //   is different for two layers with the same name (in `ImageLayerInner::new` we always
    1407              :             //   acquire a fresh id from [`crate::page_cache::next_file_id`].  So readers do not risk
    1408              :             //   reading the index from one layer file, and then data blocks from the rewritten layer file.
    1409              :             // - Any readers that have a reference to the old layer will keep it alive until they are done
    1410              :             //   with it. If they are trying to promote from remote storage, that will fail, but this is the same
    1411              :             //   as for compaction generally: compaction is allowed to delete layers that readers might be trying to use.
    1412              :             // - We do not run concurrently with other kinds of compaction, so the only layer map writes we race with are:
    1413              :             //    - GC, which at worst witnesses us "undelete" a layer that they just deleted.
    1414              :             //    - ingestion, which only inserts layers, therefore cannot collide with us.
    1415            0 :             let resident = layer.download_and_keep_resident(ctx).await?;
    1416              : 
    1417            0 :             let keys_written = resident
    1418            0 :                 .filter(&self.shard_identity, &mut image_layer_writer, ctx)
    1419            0 :                 .await?;
    1420              : 
    1421            0 :             if keys_written > 0 {
    1422            0 :                 let (desc, path) = image_layer_writer
    1423            0 :                     .finish(ctx)
    1424            0 :                     .await
    1425            0 :                     .map_err(CompactionError::Other)?;
    1426            0 :                 let new_layer = Layer::finish_creating(self.conf, self, desc, &path)
    1427            0 :                     .map_err(CompactionError::Other)?;
    1428            0 :                 info!(layer=%new_layer, "rewrote layer, {} -> {} bytes",
    1429            0 :                     layer.metadata().file_size,
    1430            0 :                     new_layer.metadata().file_size);
    1431              : 
    1432            0 :                 replace_image_layers.push((layer, new_layer));
    1433            0 :             } else {
    1434            0 :                 // Drop the old layer.  Usually for this case we would already have noticed that
    1435            0 :                 // the layer has no data for us with the ShardedRange check above, but
    1436            0 :                 drop_layers.push(layer);
    1437            0 :             }
    1438              : 
    1439              :             // Yield for L0 compaction if necessary, but make sure we update the layer map below
    1440              :             // with the work we've already done.
    1441            0 :             if yield_for_l0
    1442            0 :                 && self
    1443            0 :                     .l0_compaction_trigger
    1444            0 :                     .notified()
    1445            0 :                     .now_or_never()
    1446            0 :                     .is_some()
    1447              :             {
    1448            0 :                 info!("shard ancestor compaction yielding for L0 compaction");
    1449            0 :                 outcome = CompactionOutcome::YieldForL0;
    1450            0 :                 break;
    1451            0 :             }
    1452              :         }
    1453              : 
    1454            0 :         for layer in &drop_layers {
    1455            0 :             info!(%layer, old_metadata=?layer.metadata(),
    1456            0 :                 "dropping layer after shard split (no keys for this shard)",
    1457              :             );
    1458              :         }
    1459              : 
    1460              :         // At this point, we have replaced local layer files with their rewritten form, but not yet uploaded
    1461              :         // metadata to reflect that. If we restart here, the replaced layer files will look invalid (size mismatch
    1462              :         // to remote index) and be removed. This is inefficient but safe.
    1463            0 :         fail::fail_point!("compact-shard-ancestors-localonly");
    1464            0 : 
    1465            0 :         // Update the LayerMap so that readers will use the new layers, and enqueue it for writing to remote storage
    1466            0 :         self.rewrite_layers(replace_image_layers, drop_layers)
    1467            0 :             .await?;
    1468              : 
    1469            0 :         fail::fail_point!("compact-shard-ancestors-enqueued");
    1470            0 : 
    1471            0 :         // We wait for all uploads to complete before finishing this compaction stage.  This is not
    1472            0 :         // necessary for correctness, but it simplifies testing, and avoids proceeding with another
    1473            0 :         // Timeline's compaction while this timeline's uploads may be generating lots of disk I/O
    1474            0 :         // load.
    1475            0 :         if outcome != CompactionOutcome::YieldForL0 {
    1476            0 :             info!("shard ancestor compaction waiting for uploads");
    1477            0 :             tokio::select! {
    1478            0 :                 result = self.remote_client.wait_completion() => match result {
    1479            0 :                     Ok(()) => {},
    1480            0 :                     Err(WaitCompletionError::NotInitialized(ni)) => return Err(CompactionError::from(ni)),
    1481              :                     Err(WaitCompletionError::UploadQueueShutDownOrStopped) => {
    1482            0 :                         return Err(CompactionError::ShuttingDown);
    1483              :                     }
    1484              :                 },
    1485              :                 // Don't wait if there's L0 compaction to do. We don't need to update the outcome
    1486              :                 // here, because we've already done the actual work.
    1487            0 :                 _ = self.l0_compaction_trigger.notified(), if yield_for_l0 => {},
    1488              :             }
    1489            0 :         }
    1490              : 
    1491            0 :         info!(
    1492            0 :             "shard ancestor compaction done in {:.3}s{}",
    1493            0 :             started.elapsed().as_secs_f64(),
    1494            0 :             match outcome {
    1495              :                 CompactionOutcome::Pending =>
    1496            0 :                     format!(", with pending work (rewrite_max={rewrite_max})"),
    1497            0 :                 CompactionOutcome::YieldForL0 => String::from(", yielding for L0 compaction"),
    1498            0 :                 CompactionOutcome::Skipped | CompactionOutcome::Done => String::new(),
    1499              :             }
    1500              :         );
    1501              : 
    1502            0 :         fail::fail_point!("compact-shard-ancestors-persistent");
    1503            0 : 
    1504            0 :         Ok(outcome)
    1505            0 :     }
    1506              : 
    1507              :     /// Update the LayerVisibilityHint of layers covered by image layers, based on whether there is
    1508              :     /// an image layer between them and the most recent readable LSN (branch point or tip of timeline).  The
    1509              :     /// purpose of the visibility hint is to record which layers need to be available to service reads.
    1510              :     ///
    1511              :     /// The result may be used as an input to eviction and secondary downloads to de-prioritize layers
    1512              :     /// that we know won't be needed for reads.
    1513          476 :     pub(crate) async fn update_layer_visibility(
    1514          476 :         &self,
    1515          476 :     ) -> Result<(), super::layer_manager::Shutdown> {
    1516          476 :         let head_lsn = self.get_last_record_lsn();
    1517              : 
    1518              :         // We will sweep through layers in reverse-LSN order.  We only do historic layers.  L0 deltas
    1519              :         // are implicitly left visible, because LayerVisibilityHint's default is Visible, and we never modify it here.
    1520              :         // Note that L0 deltas _can_ be covered by image layers, but we consider them 'visible' because we anticipate that
    1521              :         // they will be subject to L0->L1 compaction in the near future.
    1522          476 :         let layer_manager = self.layers.read().await;
    1523          476 :         let layer_map = layer_manager.layer_map()?;
    1524              : 
    1525          476 :         let readable_points = {
    1526          476 :             let children = self.gc_info.read().unwrap().retain_lsns.clone();
    1527          476 : 
    1528          476 :             let mut readable_points = Vec::with_capacity(children.len() + 1);
    1529          476 :             for (child_lsn, _child_timeline_id, is_offloaded) in &children {
    1530            0 :                 if *is_offloaded == MaybeOffloaded::Yes {
    1531            0 :                     continue;
    1532            0 :                 }
    1533            0 :                 readable_points.push(*child_lsn);
    1534              :             }
    1535          476 :             readable_points.push(head_lsn);
    1536          476 :             readable_points
    1537          476 :         };
    1538          476 : 
    1539          476 :         let (layer_visibility, covered) = layer_map.get_visibility(readable_points);
    1540         1200 :         for (layer_desc, visibility) in layer_visibility {
    1541          724 :             // FIXME: a more efficiency bulk zip() through the layers rather than NlogN getting each one
    1542          724 :             let layer = layer_manager.get_from_desc(&layer_desc);
    1543          724 :             layer.set_visibility(visibility);
    1544          724 :         }
    1545              : 
    1546              :         // TODO: publish our covered KeySpace to our parent, so that when they update their visibility, they can
    1547              :         // avoid assuming that everything at a branch point is visible.
    1548          476 :         drop(covered);
    1549          476 :         Ok(())
    1550          476 :     }
    1551              : 
     1552              :     /// Collect a bunch of Level 0 layer files, and compact and reshuffle them
    1553              :     /// as Level 1 files. Returns whether the L0 layers are fully compacted.
    1554          728 :     async fn compact_level0(
    1555          728 :         self: &Arc<Self>,
    1556          728 :         target_file_size: u64,
    1557          728 :         force_compaction_ignore_threshold: bool,
    1558          728 :         ctx: &RequestContext,
    1559          728 :     ) -> Result<CompactionOutcome, CompactionError> {
    1560              :         let CompactLevel0Phase1Result {
    1561          728 :             new_layers,
    1562          728 :             deltas_to_compact,
    1563          728 :             outcome,
    1564              :         } = {
    1565          728 :             let phase1_span = info_span!("compact_level0_phase1");
    1566          728 :             let ctx = ctx.attached_child();
    1567          728 :             let mut stats = CompactLevel0Phase1StatsBuilder {
    1568          728 :                 version: Some(2),
    1569          728 :                 tenant_id: Some(self.tenant_shard_id),
    1570          728 :                 timeline_id: Some(self.timeline_id),
    1571          728 :                 ..Default::default()
    1572          728 :             };
    1573          728 : 
    1574          728 :             let begin = tokio::time::Instant::now();
    1575          728 :             let phase1_layers_locked = self.layers.read().await;
    1576          728 :             let now = tokio::time::Instant::now();
    1577          728 :             stats.read_lock_acquisition_micros =
    1578          728 :                 DurationRecorder::Recorded(RecordedDuration(now - begin), now);
    1579          728 :             self.compact_level0_phase1(
    1580          728 :                 phase1_layers_locked,
    1581          728 :                 stats,
    1582          728 :                 target_file_size,
    1583          728 :                 force_compaction_ignore_threshold,
    1584          728 :                 &ctx,
    1585          728 :             )
    1586          728 :             .instrument(phase1_span)
    1587          728 :             .await?
    1588              :         };
    1589              : 
    1590          728 :         if new_layers.is_empty() && deltas_to_compact.is_empty() {
    1591              :             // nothing to do
    1592          672 :             return Ok(CompactionOutcome::Done);
    1593           56 :         }
    1594           56 : 
    1595           56 :         self.finish_compact_batch(&new_layers, &Vec::new(), &deltas_to_compact)
    1596           56 :             .await?;
    1597           56 :         Ok(outcome)
    1598          728 :     }
    1599              : 
    1600              :     /// Level0 files first phase of compaction, explained in the [`Self::compact_legacy`] comment.
    1601          728 :     async fn compact_level0_phase1<'a>(
    1602          728 :         self: &'a Arc<Self>,
    1603          728 :         guard: tokio::sync::RwLockReadGuard<'a, LayerManager>,
    1604          728 :         mut stats: CompactLevel0Phase1StatsBuilder,
    1605          728 :         target_file_size: u64,
    1606          728 :         force_compaction_ignore_threshold: bool,
    1607          728 :         ctx: &RequestContext,
    1608          728 :     ) -> Result<CompactLevel0Phase1Result, CompactionError> {
    1609          728 :         stats.read_lock_held_spawn_blocking_startup_micros =
    1610          728 :             stats.read_lock_acquisition_micros.till_now(); // set by caller
    1611          728 :         let layers = guard.layer_map()?;
    1612          728 :         let level0_deltas = layers.level0_deltas();
    1613          728 :         stats.level0_deltas_count = Some(level0_deltas.len());
    1614          728 : 
    1615          728 :         // Only compact if enough layers have accumulated.
    1616          728 :         let threshold = self.get_compaction_threshold();
    1617          728 :         if level0_deltas.is_empty() || level0_deltas.len() < threshold {
    1618          672 :             if force_compaction_ignore_threshold {
    1619            0 :                 if !level0_deltas.is_empty() {
    1620            0 :                     info!(
    1621            0 :                         level0_deltas = level0_deltas.len(),
    1622            0 :                         threshold, "too few deltas to compact, but forcing compaction"
    1623              :                     );
    1624              :                 } else {
    1625            0 :                     info!(
    1626            0 :                         level0_deltas = level0_deltas.len(),
    1627            0 :                         threshold, "too few deltas to compact, cannot force compaction"
    1628              :                     );
    1629            0 :                     return Ok(CompactLevel0Phase1Result::default());
    1630              :                 }
    1631              :             } else {
    1632          672 :                 debug!(
    1633            0 :                     level0_deltas = level0_deltas.len(),
    1634            0 :                     threshold, "too few deltas to compact"
    1635              :                 );
    1636          672 :                 return Ok(CompactLevel0Phase1Result::default());
    1637              :             }
    1638           56 :         }
    1639              : 
    1640           56 :         let mut level0_deltas = level0_deltas
    1641           56 :             .iter()
    1642          804 :             .map(|x| guard.get_from_desc(x))
    1643           56 :             .collect::<Vec<_>>();
    1644           56 : 
    1645           56 :         // Gather the files to compact in this iteration.
    1646           56 :         //
    1647           56 :         // Start with the oldest Level 0 delta file, and collect any other
    1648           56 :         // level 0 files that form a contiguous sequence, such that the end
    1649           56 :         // LSN of previous file matches the start LSN of the next file.
    1650           56 :         //
    1651           56 :         // Note that if the files don't form such a sequence, we might
    1652           56 :         // "compact" just a single file. That's a bit pointless, but it allows
    1653           56 :         // us to get rid of the level 0 file, and compact the other files on
     1654           56 :         // the next iteration. This could probably be made smarter, but such
    1655           56 :         // "gaps" in the sequence of level 0 files should only happen in case
    1656           56 :         // of a crash, partial download from cloud storage, or something like
    1657           56 :         // that, so it's not a big deal in practice.
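                       :         //
                       :         // For illustration, the contiguity rule above over some made-up LSN ranges:
                       :         // 0x10..0x20 and 0x20..0x30 chain together, while 0x40..0x50 leaves a gap and
                       :         // would be left for a later compaction round:
                       :         //
                       :         //     let ranges = [(0x10, 0x20), (0x20, 0x30), (0x40, 0x50)];
                       :         //     let mut picked = vec![ranges[0]];
                       :         //     for r in &ranges[1..] {
                       :         //         if r.0 != picked.last().unwrap().1 { break; }
                       :         //         picked.push(*r);
                       :         //     }
                       :         //     assert_eq!(picked.len(), 2);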
    1658         1496 :         level0_deltas.sort_by_key(|l| l.layer_desc().lsn_range.start);
    1659           56 :         let mut level0_deltas_iter = level0_deltas.iter();
    1660           56 : 
    1661           56 :         let first_level0_delta = level0_deltas_iter.next().unwrap();
    1662           56 :         let mut prev_lsn_end = first_level0_delta.layer_desc().lsn_range.end;
    1663           56 :         let mut deltas_to_compact = Vec::with_capacity(level0_deltas.len());
    1664           56 : 
    1665           56 :         // Accumulate the size of layers in `deltas_to_compact`
    1666           56 :         let mut deltas_to_compact_bytes = 0;
    1667           56 : 
    1668           56 :         // Under normal circumstances, we will accumulate up to compaction_upper_limit L0s of size
    1669           56 :         // checkpoint_distance each.  To avoid edge cases using extra system resources, bound our
    1670           56 :         // work in this function to only operate on this much delta data at once.
    1671           56 :         //
    1672           56 :         // In general, compaction_threshold should be <= compaction_upper_limit, but in case that
    1673           56 :         // the constraint is not respected, we use the larger of the two.
    1674           56 :         let delta_size_limit = std::cmp::max(
    1675           56 :             self.get_compaction_upper_limit(),
    1676           56 :             self.get_compaction_threshold(),
    1677           56 :         ) as u64
    1678           56 :             * std::cmp::max(self.get_checkpoint_distance(), DEFAULT_CHECKPOINT_DISTANCE);
    1679           56 : 
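                       : 
                       :         // For a sense of scale (assumed example values, not read from the config here):
                       :         // with an upper limit of 20 L0 layers and a checkpoint distance of 256 MiB, the
                       :         // cap works out to roughly 20 * 256 MiB = 5 GiB of delta data per pass:
                       :         //
                       :         //     let illustrative_limit: u64 = 20 * 256 * 1024 * 1024; // ~5 GiB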
    1680           56 :         let mut fully_compacted = true;
    1681           56 : 
    1682           56 :         deltas_to_compact.push(first_level0_delta.download_and_keep_resident(ctx).await?);
    1683          804 :         for l in level0_deltas_iter {
    1684          748 :             let lsn_range = &l.layer_desc().lsn_range;
    1685          748 : 
    1686          748 :             if lsn_range.start != prev_lsn_end {
    1687            0 :                 break;
    1688          748 :             }
    1689          748 :             deltas_to_compact.push(l.download_and_keep_resident(ctx).await?);
    1690          748 :             deltas_to_compact_bytes += l.metadata().file_size;
    1691          748 :             prev_lsn_end = lsn_range.end;
    1692          748 : 
    1693          748 :             if deltas_to_compact_bytes >= delta_size_limit {
    1694            0 :                 info!(
    1695            0 :                     l0_deltas_selected = deltas_to_compact.len(),
    1696            0 :                     l0_deltas_total = level0_deltas.len(),
    1697            0 :                     "L0 compaction picker hit max delta layer size limit: {}",
    1698              :                     delta_size_limit
    1699              :                 );
    1700            0 :                 fully_compacted = false;
    1701            0 : 
    1702            0 :                 // Proceed with compaction, but only a subset of L0s
    1703            0 :                 break;
    1704          748 :             }
    1705              :         }
    1706           56 :         let lsn_range = Range {
    1707           56 :             start: deltas_to_compact
    1708           56 :                 .first()
    1709           56 :                 .unwrap()
    1710           56 :                 .layer_desc()
    1711           56 :                 .lsn_range
    1712           56 :                 .start,
    1713           56 :             end: deltas_to_compact.last().unwrap().layer_desc().lsn_range.end,
    1714           56 :         };
    1715           56 : 
    1716           56 :         info!(
    1717            0 :             "Starting Level0 compaction in LSN range {}-{} for {} layers ({} deltas in total)",
    1718            0 :             lsn_range.start,
    1719            0 :             lsn_range.end,
    1720            0 :             deltas_to_compact.len(),
    1721            0 :             level0_deltas.len()
    1722              :         );
    1723              : 
    1724          804 :         for l in deltas_to_compact.iter() {
    1725          804 :             info!("compact includes {l}");
    1726              :         }
    1727              : 
    1728              :         // We don't need the original list of layers anymore. Drop it so that
    1729              :         // we don't accidentally use it later in the function.
    1730           56 :         drop(level0_deltas);
    1731           56 : 
    1732           56 :         stats.read_lock_held_prerequisites_micros = stats
    1733           56 :             .read_lock_held_spawn_blocking_startup_micros
    1734           56 :             .till_now();
    1735              : 
    1736              :         // TODO: replace with streaming k-merge
    1737           56 :         let all_keys = {
    1738           56 :             let mut all_keys = Vec::new();
    1739          804 :             for l in deltas_to_compact.iter() {
    1740          804 :                 if self.cancel.is_cancelled() {
    1741            0 :                     return Err(CompactionError::ShuttingDown);
    1742          804 :                 }
    1743          804 :                 let delta = l.get_as_delta(ctx).await.map_err(CompactionError::Other)?;
    1744          804 :                 let keys = delta
    1745          804 :                     .index_entries(ctx)
    1746          804 :                     .await
    1747          804 :                     .map_err(CompactionError::Other)?;
    1748          804 :                 all_keys.extend(keys);
    1749              :             }
    1750              :             // The current stdlib sorting implementation is designed so that it is
    1751              :             // particularly fast when the slice is made up of sorted sub-ranges.
    1752      8847580 :             all_keys.sort_by_key(|DeltaEntry { key, lsn, .. }| (*key, *lsn));
    1753           56 :             all_keys
    1754           56 :         };
    1755           56 : 
    1756           56 :         stats.read_lock_held_key_sort_micros = stats.read_lock_held_prerequisites_micros.till_now();
    1757              : 
    1758              :         // Determine N largest holes where N is number of compacted layers. The vec is sorted by key range start.
    1759              :         //
    1760              :         // A hole is a key range for which this compaction doesn't have any WAL records.
    1761              :         // Our goal in this compaction iteration is to avoid creating L1s that, in terms of their key range,
    1762              :         // cover the hole, but actually don't contain any WAL records for that key range.
    1763              :         // The reason is that the mere stack of L1s (`count_deltas`) triggers image layer creation (`create_image_layers`).
    1764              :         // That image layer creation would be useless for a hole range covered by L1s that don't contain any WAL records.
    1765              :         //
    1766              :         // The algorithm chooses holes as follows.
    1767              :         // - Slide a two-key window over the keys in key order to get the hole range (= the distance between two keys).
    1768              :         // - Filter: min threshold on range length
    1769              :         // - Rank: by coverage size (=number of image layers required to reconstruct each key in the range for which we have any data)
    1770              :         //
    1771              :         // For more details, intuition, and some ASCII art see https://github.com/neondatabase/neon/pull/3597#discussion_r1112704451
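                      :         //
                      :         // For example (hypothetical, simplified keys): for keys [10, 11, 500, 501, 9000, 9001]
                      :         // with min_hole_range = 100 and max_holes = 2, the sliding window yields the candidate
                      :         // holes 12..500 and 502..9000; each candidate is then ranked by its image-layer coverage
                      :         // (and dropped if below min_hole_coverage_size), and only the max_holes largest survive
                      :         // the min-heap. A stand-alone sketch of this keep-N-largest pattern follows the end of
                      :         // this impl block.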
    1772              :         #[derive(PartialEq, Eq)]
    1773              :         struct Hole {
    1774              :             key_range: Range<Key>,
    1775              :             coverage_size: usize,
    1776              :         }
    1777           56 :         let holes: Vec<Hole> = {
    1778              :             use std::cmp::Ordering;
    1779              :             impl Ord for Hole {
    1780            0 :                 fn cmp(&self, other: &Self) -> Ordering {
    1781            0 :                     self.coverage_size.cmp(&other.coverage_size).reverse()
    1782            0 :                 }
    1783              :             }
    1784              :             impl PartialOrd for Hole {
    1785            0 :                 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
    1786            0 :                     Some(self.cmp(other))
    1787            0 :                 }
    1788              :             }
    1789           56 :             let max_holes = deltas_to_compact.len();
    1790           56 :             let last_record_lsn = self.get_last_record_lsn();
    1791           56 :             let min_hole_range = (target_file_size / page_cache::PAGE_SZ as u64) as i128;
    1792           56 :             let min_hole_coverage_size = 3; // TODO: something more flexible?
    1793           56 :             // min-heap (reserve space for one more element added before eviction)
    1794           56 :             let mut heap: BinaryHeap<Hole> = BinaryHeap::with_capacity(max_holes + 1);
    1795           56 :             let mut prev: Option<Key> = None;
    1796              : 
    1797      4128076 :             for &DeltaEntry { key: next_key, .. } in all_keys.iter() {
    1798      4128076 :                 if let Some(prev_key) = prev {
    1799              :                     // Just a first, fast filter: do not create hole entries for metadata keys. The last hole in the
    1800              :                     // compaction is the gap between the data keys and the metadata keys.
    1801      4128020 :                     if next_key.to_i128() - prev_key.to_i128() >= min_hole_range
    1802            0 :                         && !Key::is_metadata_key(&prev_key)
    1803              :                     {
    1804            0 :                         let key_range = prev_key..next_key;
    1805            0 :                         // Measuring a hole by simply subtracting the i128 representations of the key range boundaries
    1806            0 :                         // does not make much sense, because the largest holes would correspond to field1/field2 changes.
    1807            0 :                         // We are mostly interested in eliminating holes that cause generation of excessive image layers.
    1808            0 :                         // That is why it is better to measure the size of a hole as the number of covering image layers.
    1809            0 :                         let coverage_size =
    1810            0 :                             layers.image_coverage(&key_range, last_record_lsn).len();
    1811            0 :                         if coverage_size >= min_hole_coverage_size {
    1812            0 :                             heap.push(Hole {
    1813            0 :                                 key_range,
    1814            0 :                                 coverage_size,
    1815            0 :                             });
    1816            0 :                             if heap.len() > max_holes {
    1817            0 :                                 heap.pop(); // remove smallest hole
    1818            0 :                             }
    1819            0 :                         }
    1820      4128020 :                     }
    1821           56 :                 }
    1822      4128076 :                 prev = Some(next_key.next());
    1823              :             }
    1824           56 :             let mut holes = heap.into_vec();
    1825           56 :             holes.sort_unstable_by_key(|hole| hole.key_range.start);
    1826           56 :             holes
    1827           56 :         };
    1828           56 :         stats.read_lock_held_compute_holes_micros = stats.read_lock_held_key_sort_micros.till_now();
    1829           56 :         drop_rlock(guard);
    1830           56 : 
    1831           56 :         if self.cancel.is_cancelled() {
    1832            0 :             return Err(CompactionError::ShuttingDown);
    1833           56 :         }
    1834           56 : 
    1835           56 :         stats.read_lock_drop_micros = stats.read_lock_held_compute_holes_micros.till_now();
    1836              : 
    1837              :         // This iterator walks through all key-value pairs from all the layers
    1838              :         // we're compacting, in key, LSN order.
    1839              :         // If there's both a Value::Image and Value::WalRecord for the same (key,lsn),
    1840              :         // then the Value::Image is ordered before Value::WalRecord.
    1841           56 :         let mut all_values_iter = {
    1842           56 :             let mut deltas = Vec::with_capacity(deltas_to_compact.len());
    1843          804 :             for l in deltas_to_compact.iter() {
    1844          804 :                 let l = l.get_as_delta(ctx).await.map_err(CompactionError::Other)?;
    1845          804 :                 deltas.push(l);
    1846              :             }
    1847           56 :             MergeIterator::create(&deltas, &[], ctx)
    1848           56 :         };
    1849           56 : 
    1850           56 :         // This iterator walks through all keys and is needed to calculate size used by each key
    1851           56 :         let mut all_keys_iter = all_keys
    1852           56 :             .iter()
    1853      4128076 :             .map(|DeltaEntry { key, lsn, size, .. }| (*key, *lsn, *size))
    1854      4128020 :             .coalesce(|mut prev, cur| {
    1855      4128020 :                 // Coalesce keys that belong to the same key pair.
    1856      4128020 :                 // This ensures that compaction doesn't put them
    1857      4128020 :                 // into different layer files.
    1858      4128020 :                 // Still limit this by the target file size,
    1859      4128020 :                 // so that we keep the size of the files in
    1860      4128020 :                 // check.
    1861      4128020 :                 if prev.0 == cur.0 && prev.2 < target_file_size {
    1862        80076 :                     prev.2 += cur.2;
    1863        80076 :                     Ok(prev)
    1864              :                 } else {
    1865      4047944 :                     Err((prev, cur))
    1866              :                 }
    1867      4128020 :             });
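                      :         // For example (hypothetical entries): with a target_file_size of 8 KiB, the entries
                      :         // (K1, 0x10, 4 KiB), (K1, 0x20, 4 KiB), (K2, 0x30, 8 KiB) coalesce into
                      :         // (K1, 0x10, 8 KiB), (K2, 0x30, 8 KiB): same-key entries are merged until the
                      :         // accumulated size reaches target_file_size, and a key change always starts a new entry.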
    1868           56 : 
    1869           56 :         // Merge the contents of all the input delta layers into a new set
    1870           56 :         // of delta layers, based on the current partitioning.
    1871           56 :         //
    1872           56 :         // We split the new delta layers on the key dimension. We iterate through the key space, and for each key, check whether including the next key in the current output layer we're building would cause the layer to become too large. If so, write out the current output layer and start a new one.
    1873           56 :         // It's possible that there is a single key with so many page versions that storing all of them in a single layer file
    1874           56 :         // would be too large. In that case, we also split on the LSN dimension.
    1875           56 :         //
    1876           56 :         // LSN
    1877           56 :         //  ^
    1878           56 :         //  |
    1879           56 :         //  | +-----------+            +--+--+--+--+
    1880           56 :         //  | |           |            |  |  |  |  |
    1881           56 :         //  | +-----------+            |  |  |  |  |
    1882           56 :         //  | |           |            |  |  |  |  |
    1883           56 :         //  | +-----------+     ==>    |  |  |  |  |
    1884           56 :         //  | |           |            |  |  |  |  |
    1885           56 :         //  | +-----------+            |  |  |  |  |
    1886           56 :         //  | |           |            |  |  |  |  |
    1887           56 :         //  | +-----------+            +--+--+--+--+
    1888           56 :         //  |
    1889           56 :         //  +--------------> key
    1890           56 :         //
    1891           56 :         //
    1892           56 :         // If one key (X) has a lot of page versions:
    1893           56 :         //
    1894           56 :         // LSN
    1895           56 :         //  ^
    1896           56 :         //  |                                 (X)
    1897           56 :         //  | +-----------+            +--+--+--+--+
    1898           56 :         //  | |           |            |  |  |  |  |
    1899           56 :         //  | +-----------+            |  |  +--+  |
    1900           56 :         //  | |           |            |  |  |  |  |
    1901           56 :         //  | +-----------+     ==>    |  |  |  |  |
    1902           56 :         //  | |           |            |  |  +--+  |
    1903           56 :         //  | +-----------+            |  |  |  |  |
    1904           56 :         //  | |           |            |  |  |  |  |
    1905           56 :         //  | +-----------+            +--+--+--+--+
    1906           56 :         //  |
    1907           56 :         //  +--------------> key
    1908           56 :         // TODO: this actually divides the layers into fixed-size chunks, not
    1909           56 :         // based on the partitioning.
    1910           56 :         //
    1911           56 :         // TODO: we should also opportunistically materialize and
    1912           56 :         // garbage collect what we can.
    1913           56 :         let mut new_layers = Vec::new();
    1914           56 :         let mut prev_key: Option<Key> = None;
    1915           56 :         let mut writer: Option<DeltaLayerWriter> = None;
    1916           56 :         let mut key_values_total_size = 0u64;
    1917           56 :         let mut dup_start_lsn: Lsn = Lsn::INVALID; // start LSN of layer containing values of the single key
    1918           56 :         let mut dup_end_lsn: Lsn = Lsn::INVALID; // end LSN of layer containing values of the single key
    1919           56 :         let mut next_hole = 0; // index of next hole in holes vector
    1920           56 : 
    1921           56 :         let mut keys = 0;
    1922              : 
    1923      4128132 :         while let Some((key, lsn, value)) = all_values_iter
    1924      4128132 :             .next()
    1925      4128132 :             .await
    1926      4128132 :             .map_err(CompactionError::Other)?
    1927              :         {
    1928      4128076 :             keys += 1;
    1929      4128076 : 
    1930      4128076 :             if keys % 32_768 == 0 && self.cancel.is_cancelled() {
    1931              :                 // Avoid hitting the cancellation token on every key. In benches, we end up
    1932              :                 // shuffling on the order of a million keys per layer, so we'll check it
    1933              :                 // around tens of times per layer.
    1934            0 :                 return Err(CompactionError::ShuttingDown);
    1935      4128076 :             }
    1936      4128076 : 
    1937      4128076 :             let same_key = prev_key == Some(key);
    1938      4128076 :             // We need to check key boundaries once we reach the next key or the end of a layer with the same key
    1939      4128076 :             if !same_key || lsn == dup_end_lsn {
    1940      4048000 :                 let mut next_key_size = 0u64;
    1941      4048000 :                 let is_dup_layer = dup_end_lsn.is_valid();
    1942      4048000 :                 dup_start_lsn = Lsn::INVALID;
    1943      4048000 :                 if !same_key {
    1944      4048000 :                     dup_end_lsn = Lsn::INVALID;
    1945      4048000 :                 }
    1946              :                 // Determine size occupied by this key. We stop at next key or when size becomes larger than target_file_size
    1947      4048000 :                 for (next_key, next_lsn, next_size) in all_keys_iter.by_ref() {
    1948      4048000 :                     next_key_size = next_size;
    1949      4048000 :                     if key != next_key {
    1950      4047944 :                         if dup_end_lsn.is_valid() {
    1951            0 :                             // We are writing a segment with duplicates:
    1952            0 :                             // place all remaining values of this key in a separate segment
    1953            0 :                             dup_start_lsn = dup_end_lsn; // the new segment starts where the old one stops
    1954            0 :                             dup_end_lsn = lsn_range.end; // there are no more values of this key until the end of the LSN range
    1955      4047944 :                         }
    1956      4047944 :                         break;
    1957           56 :                     }
    1958           56 :                     key_values_total_size += next_size;
    1959           56 :                     // Check if it is time to split the segment: i.e. if the total size of the key's values is larger than the target file size.
    1960           56 :                     // We need to avoid generating empty segments if next_size > target_file_size.
    1961           56 :                     if key_values_total_size > target_file_size && lsn != next_lsn {
    1962              :                         // Split the key between multiple layers: such a layer can contain only a single key
    1963            0 :                         dup_start_lsn = if dup_end_lsn.is_valid() {
    1964            0 :                             dup_end_lsn // new segment with duplicates starts where old one stops
    1965              :                         } else {
    1966            0 :                             lsn // start with the first LSN for this key
    1967              :                         };
    1968            0 :                         dup_end_lsn = next_lsn; // upper LSN boundary is exclusive
    1969            0 :                         break;
    1970           56 :                     }
    1971              :                 }
    1972              :                 // Handle the case when the loop reaches the last key: in this case dup_end is non-zero but dup_start is not set.
    1973      4048000 :                 if dup_end_lsn.is_valid() && !dup_start_lsn.is_valid() {
    1974            0 :                     dup_start_lsn = dup_end_lsn;
    1975            0 :                     dup_end_lsn = lsn_range.end;
    1976      4048000 :                 }
    1977      4048000 :                 if writer.is_some() {
    1978      4047944 :                     let written_size = writer.as_mut().unwrap().size();
    1979      4047944 :                     let contains_hole =
    1980      4047944 :                         next_hole < holes.len() && key >= holes[next_hole].key_range.end;
    1981              :                     // Check whether this key causes the layer to overflow, or whether the layer would span a hole...
    1982      4047944 :                     if is_dup_layer
    1983      4047944 :                         || dup_end_lsn.is_valid()
    1984      4047944 :                         || written_size + key_values_total_size > target_file_size
    1985      4047384 :                         || contains_hole
    1986              :                     {
    1987              :                         // ... if so, flush previous layer and prepare to write new one
    1988          560 :                         let (desc, path) = writer
    1989          560 :                             .take()
    1990          560 :                             .unwrap()
    1991          560 :                             .finish(prev_key.unwrap().next(), ctx)
    1992          560 :                             .await
    1993          560 :                             .map_err(CompactionError::Other)?;
    1994          560 :                         let new_delta = Layer::finish_creating(self.conf, self, desc, &path)
    1995          560 :                             .map_err(CompactionError::Other)?;
    1996              : 
    1997          560 :                         new_layers.push(new_delta);
    1998          560 :                         writer = None;
    1999          560 : 
    2000          560 :                         if contains_hole {
    2001            0 :                             // skip hole
    2002            0 :                             next_hole += 1;
    2003          560 :                         }
    2004      4047384 :                     }
    2005           56 :                 }
    2006              :                 // Remember the size of this key's values, because at the next iteration we will access the next item
    2007      4048000 :                 key_values_total_size = next_key_size;
    2008        80076 :             }
    2009      4128076 :             fail_point!("delta-layer-writer-fail-before-finish", |_| {
    2010            0 :                 Err(CompactionError::Other(anyhow::anyhow!(
    2011            0 :                     "failpoint delta-layer-writer-fail-before-finish"
    2012            0 :                 )))
    2013      4128076 :             });
    2014              : 
    2015      4128076 :             if !self.shard_identity.is_key_disposable(&key) {
    2016      4128076 :                 if writer.is_none() {
    2017          616 :                     if self.cancel.is_cancelled() {
    2018              :                         // to be somewhat responsive to cancellation, check for each new layer
    2019            0 :                         return Err(CompactionError::ShuttingDown);
    2020          616 :                     }
    2021              :                     // Create the writer if it is not initialized yet
    2022          616 :                     writer = Some(
    2023              :                         DeltaLayerWriter::new(
    2024          616 :                             self.conf,
    2025          616 :                             self.timeline_id,
    2026          616 :                             self.tenant_shard_id,
    2027          616 :                             key,
    2028          616 :                             if dup_end_lsn.is_valid() {
    2029              :                                 // this is a layer containing a slice of values of the same key
    2030            0 :                                 debug!("Create new dup layer {}..{}", dup_start_lsn, dup_end_lsn);
    2031            0 :                                 dup_start_lsn..dup_end_lsn
    2032              :                             } else {
    2033          616 :                                 debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end);
    2034          616 :                                 lsn_range.clone()
    2035              :                             },
    2036          616 :                             ctx,
    2037          616 :                         )
    2038          616 :                         .await
    2039          616 :                         .map_err(CompactionError::Other)?,
    2040              :                     );
    2041              : 
    2042          616 :                     keys = 0;
    2043      4127460 :                 }
    2044              : 
    2045      4128076 :                 writer
    2046      4128076 :                     .as_mut()
    2047      4128076 :                     .unwrap()
    2048      4128076 :                     .put_value(key, lsn, value, ctx)
    2049      4128076 :                     .await
    2050      4128076 :                     .map_err(CompactionError::Other)?;
    2051              :             } else {
    2052            0 :                 let owner = self.shard_identity.get_shard_number(&key);
    2053            0 : 
    2054            0 :                 // This happens after a shard split, when we're compacting an L0 created by our parent shard
    2055            0 :                 debug!("dropping key {key} during compaction (it belongs on shard {owner})");
    2056              :             }
    2057              : 
    2058      4128076 :             if !new_layers.is_empty() {
    2059        39572 :                 fail_point!("after-timeline-compacted-first-L1");
    2060      4088504 :             }
    2061              : 
    2062      4128076 :             prev_key = Some(key);
    2063              :         }
    2064           56 :         if let Some(writer) = writer {
    2065           56 :             let (desc, path) = writer
    2066           56 :                 .finish(prev_key.unwrap().next(), ctx)
    2067           56 :                 .await
    2068           56 :                 .map_err(CompactionError::Other)?;
    2069           56 :             let new_delta = Layer::finish_creating(self.conf, self, desc, &path)
    2070           56 :                 .map_err(CompactionError::Other)?;
    2071           56 :             new_layers.push(new_delta);
    2072            0 :         }
    2073              : 
    2074              :         // Sync layers
    2075           56 :         if !new_layers.is_empty() {
    2076              :             // Print a warning if the created layer is larger than double the target size
    2077              :             // Add two pages for potential overhead. This should in theory be already
    2078              :             // accounted for in the target calculation, but for very small targets,
    2079              :             // we still might easily hit the limit otherwise.
    2080           56 :             let warn_limit = target_file_size * 2 + page_cache::PAGE_SZ as u64 * 2;
    2081          616 :             for layer in new_layers.iter() {
    2082          616 :                 if layer.layer_desc().file_size > warn_limit {
    2083            0 :                     warn!(
    2084              :                         %layer,
    2085            0 :                         "created delta file of size {} larger than double of target of {target_file_size}", layer.layer_desc().file_size
    2086              :                     );
    2087          616 :                 }
    2088              :             }
    2089              : 
    2090              :             // The writer.finish() above already did the fsync of the inodes.
    2091              :             // We just need to fsync the directory in which these inodes are linked,
    2092              :             // which we know to be the timeline directory.
    2093              :             //
    2094              :             // We use fatal_err() below because, after writer.finish() returns with success,
    2095              :             // the in-memory state of the filesystem already has the layer file in its final place,
    2096              :             // and subsequent pageserver code could think it's durable while it really isn't.
    2097           56 :             let timeline_dir = VirtualFile::open(
    2098           56 :                 &self
    2099           56 :                     .conf
    2100           56 :                     .timeline_path(&self.tenant_shard_id, &self.timeline_id),
    2101           56 :                 ctx,
    2102           56 :             )
    2103           56 :             .await
    2104           56 :             .fatal_err("VirtualFile::open for timeline dir fsync");
    2105           56 :             timeline_dir
    2106           56 :                 .sync_all()
    2107           56 :                 .await
    2108           56 :                 .fatal_err("VirtualFile::sync_all timeline dir");
    2109            0 :         }
    2110              : 
    2111           56 :         stats.write_layer_files_micros = stats.read_lock_drop_micros.till_now();
    2112           56 :         stats.new_deltas_count = Some(new_layers.len());
    2113          616 :         stats.new_deltas_size = Some(new_layers.iter().map(|l| l.layer_desc().file_size).sum());
    2114           56 : 
    2115           56 :         match TryInto::<CompactLevel0Phase1Stats>::try_into(stats)
    2116           56 :             .and_then(|stats| serde_json::to_string(&stats).context("serde_json::to_string"))
    2117              :         {
    2118           56 :             Ok(stats_json) => {
    2119           56 :                 info!(
    2120            0 :                     stats_json = stats_json.as_str(),
    2121            0 :                     "compact_level0_phase1 stats available"
    2122              :                 )
    2123              :             }
    2124            0 :             Err(e) => {
    2125            0 :                 warn!("compact_level0_phase1 stats failed to serialize: {:#}", e);
    2126              :             }
    2127              :         }
    2128              : 
    2129              :         // Without this, rustc complains about deltas_to_compact still
    2130              :         // being borrowed when we `.into_iter()` below.
    2131           56 :         drop(all_values_iter);
    2132           56 : 
    2133           56 :         Ok(CompactLevel0Phase1Result {
    2134           56 :             new_layers,
    2135           56 :             deltas_to_compact: deltas_to_compact
    2136           56 :                 .into_iter()
    2137          804 :                 .map(|x| x.drop_eviction_guard())
    2138           56 :                 .collect::<Vec<_>>(),
    2139           56 :             outcome: if fully_compacted {
    2140           56 :                 CompactionOutcome::Done
    2141              :             } else {
    2142            0 :                 CompactionOutcome::Pending
    2143              :             },
    2144              :         })
    2145          728 :     }
    2146              : }
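                      : 
                      : // A minimal, self-contained sketch of the "keep the N largest holes" pattern used in
                      : // compact_level0_phase1 above. It is not part of the pageserver code: plain u64 gaps stand
                      : // in for real key ranges and coverage sizes, and the min-heap is obtained via `Reverse` on
                      : // the ranking field rather than a reversed `Ord` impl; all names here are hypothetical.
                      : #[cfg(test)]
                      : mod largest_holes_sketch {
                      :     use std::cmp::Reverse;
                      :     use std::collections::BinaryHeap;
                      : 
                      :     /// Keep the `max_holes` largest gaps (at least `min_gap` wide) seen in a sorted key
                      :     /// stream, returned sorted by range start.
                      :     fn n_largest_gaps(sorted_keys: &[u64], max_holes: usize, min_gap: u64) -> Vec<(u64, u64)> {
                      :         // BinaryHeap is a max-heap; wrapping the gap size in `Reverse` makes `pop()` remove
                      :         // the smallest gap first, i.e. it behaves as a min-heap on the gap size.
                      :         let mut heap: BinaryHeap<(Reverse<u64>, u64, u64)> =
                      :             BinaryHeap::with_capacity(max_holes + 1);
                      :         for w in sorted_keys.windows(2) {
                      :             let (prev, next) = (w[0], w[1]);
                      :             let gap = next.saturating_sub(prev + 1);
                      :             if gap >= min_gap {
                      :                 heap.push((Reverse(gap), prev + 1, next));
                      :                 if heap.len() > max_holes {
                      :                     heap.pop(); // evict the smallest gap, keeping the N largest
                      :                 }
                      :             }
                      :         }
                      :         let mut holes: Vec<(u64, u64)> = heap.into_iter().map(|(_, s, e)| (s, e)).collect();
                      :         holes.sort_unstable_by_key(|(start, _)| *start); // sorted by start, like `holes` above
                      :         holes
                      :     }
                      : 
                      :     #[test]
                      :     fn picks_the_two_largest_gaps() {
                      :         let keys = [10, 11, 500, 501, 9000, 9001];
                      :         assert_eq!(n_largest_gaps(&keys, 2, 100), vec![(12, 500), (502, 9000)]);
                      :     }
                      : }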
    2147              : 
    2148              : #[derive(Default)]
    2149              : struct CompactLevel0Phase1Result {
    2150              :     new_layers: Vec<ResidentLayer>,
    2151              :     deltas_to_compact: Vec<Layer>,
    2152              :     // Whether we have included all L0 layers, or selected only part of them due to the
    2153              :     // L0 compaction size limit.
    2154              :     outcome: CompactionOutcome,
    2155              : }
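                      : 
                      : // A minimal, Unix-oriented sketch (std-only, hypothetical names) of the durability pattern
                      : // described at the end of compact_level0_phase1 above: fsync the newly written file, then
                      : // fsync the parent directory so that the new directory entry is durable as well.
                      : #[cfg(test)]
                      : mod dir_sync_sketch {
                      :     use std::fs::File;
                      :     use std::io::Write;
                      :     use std::path::Path;
                      : 
                      :     /// Write `contents` to `path` durably: sync the file data and inode, then sync the
                      :     /// directory that links the new inode.
                      :     fn write_durably(path: &Path, contents: &[u8]) -> std::io::Result<()> {
                      :         let mut f = File::create(path)?;
                      :         f.write_all(contents)?;
                      :         f.sync_all()?; // analogous to what writer.finish() guarantees above
                      :         let dir = path.parent().expect("path has a parent directory");
                      :         File::open(dir)?.sync_all()?; // fsync the directory entry (the timeline dir above)
                      :         Ok(())
                      :     }
                      : 
                      :     #[test]
                      :     fn writes_and_syncs() {
                      :         let path = std::env::temp_dir().join("dir_sync_sketch.tmp");
                      :         write_durably(&path, b"hello").unwrap();
                      :         std::fs::remove_file(&path).unwrap();
                      :     }
                      : }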
    2156              : 
    2157              : #[derive(Default)]
    2158              : struct CompactLevel0Phase1StatsBuilder {
    2159              :     version: Option<u64>,
    2160              :     tenant_id: Option<TenantShardId>,
    2161              :     timeline_id: Option<TimelineId>,
    2162              :     read_lock_acquisition_micros: DurationRecorder,
    2163              :     read_lock_held_spawn_blocking_startup_micros: DurationRecorder,
    2164              :     read_lock_held_key_sort_micros: DurationRecorder,
    2165              :     read_lock_held_prerequisites_micros: DurationRecorder,
    2166              :     read_lock_held_compute_holes_micros: DurationRecorder,
    2167              :     read_lock_drop_micros: DurationRecorder,
    2168              :     write_layer_files_micros: DurationRecorder,
    2169              :     level0_deltas_count: Option<usize>,
    2170              :     new_deltas_count: Option<usize>,
    2171              :     new_deltas_size: Option<u64>,
    2172              : }
    2173              : 
    2174              : #[derive(serde::Serialize)]
    2175              : struct CompactLevel0Phase1Stats {
    2176              :     version: u64,
    2177              :     tenant_id: TenantShardId,
    2178              :     timeline_id: TimelineId,
    2179              :     read_lock_acquisition_micros: RecordedDuration,
    2180              :     read_lock_held_spawn_blocking_startup_micros: RecordedDuration,
    2181              :     read_lock_held_key_sort_micros: RecordedDuration,
    2182              :     read_lock_held_prerequisites_micros: RecordedDuration,
    2183              :     read_lock_held_compute_holes_micros: RecordedDuration,
    2184              :     read_lock_drop_micros: RecordedDuration,
    2185              :     write_layer_files_micros: RecordedDuration,
    2186              :     level0_deltas_count: usize,
    2187              :     new_deltas_count: usize,
    2188              :     new_deltas_size: u64,
    2189              : }
    2190              : 
    2191              : impl TryFrom<CompactLevel0Phase1StatsBuilder> for CompactLevel0Phase1Stats {
    2192              :     type Error = anyhow::Error;
    2193              : 
    2194           56 :     fn try_from(value: CompactLevel0Phase1StatsBuilder) -> Result<Self, Self::Error> {
    2195           56 :         Ok(Self {
    2196           56 :             version: value.version.ok_or_else(|| anyhow!("version not set"))?,
    2197           56 :             tenant_id: value
    2198           56 :                 .tenant_id
    2199           56 :                 .ok_or_else(|| anyhow!("tenant_id not set"))?,
    2200           56 :             timeline_id: value
    2201           56 :                 .timeline_id
    2202           56 :                 .ok_or_else(|| anyhow!("timeline_id not set"))?,
    2203           56 :             read_lock_acquisition_micros: value
    2204           56 :                 .read_lock_acquisition_micros
    2205           56 :                 .into_recorded()
    2206           56 :                 .ok_or_else(|| anyhow!("read_lock_acquisition_micros not set"))?,
    2207           56 :             read_lock_held_spawn_blocking_startup_micros: value
    2208           56 :                 .read_lock_held_spawn_blocking_startup_micros
    2209           56 :                 .into_recorded()
    2210           56 :                 .ok_or_else(|| anyhow!("read_lock_held_spawn_blocking_startup_micros not set"))?,
    2211           56 :             read_lock_held_key_sort_micros: value
    2212           56 :                 .read_lock_held_key_sort_micros
    2213           56 :                 .into_recorded()
    2214           56 :                 .ok_or_else(|| anyhow!("read_lock_held_key_sort_micros not set"))?,
    2215           56 :             read_lock_held_prerequisites_micros: value
    2216           56 :                 .read_lock_held_prerequisites_micros
    2217           56 :                 .into_recorded()
    2218           56 :                 .ok_or_else(|| anyhow!("read_lock_held_prerequisites_micros not set"))?,
    2219           56 :             read_lock_held_compute_holes_micros: value
    2220           56 :                 .read_lock_held_compute_holes_micros
    2221           56 :                 .into_recorded()
    2222           56 :                 .ok_or_else(|| anyhow!("read_lock_held_compute_holes_micros not set"))?,
    2223           56 :             read_lock_drop_micros: value
    2224           56 :                 .read_lock_drop_micros
    2225           56 :                 .into_recorded()
    2226           56 :                 .ok_or_else(|| anyhow!("read_lock_drop_micros not set"))?,
    2227           56 :             write_layer_files_micros: value
    2228           56 :                 .write_layer_files_micros
    2229           56 :                 .into_recorded()
    2230           56 :                 .ok_or_else(|| anyhow!("write_layer_files_micros not set"))?,
    2231           56 :             level0_deltas_count: value
    2232           56 :                 .level0_deltas_count
    2233           56 :                 .ok_or_else(|| anyhow!("level0_deltas_count not set"))?,
    2234           56 :             new_deltas_count: value
    2235           56 :                 .new_deltas_count
    2236           56 :                 .ok_or_else(|| anyhow!("new_deltas_count not set"))?,
    2237           56 :             new_deltas_size: value
    2238           56 :                 .new_deltas_size
    2239           56 :                 .ok_or_else(|| anyhow!("new_deltas_size not set"))?,
    2240              :         })
    2241           56 :     }
    2242              : }
    2243              : 
    2244              : impl Timeline {
    2245              :     /// Entry point for new tiered compaction algorithm.
    2246              :     ///
    2247              :     /// All the real work is in the implementation in the pageserver_compaction
    2248              :     /// crate. The code here would apply to any algorithm implemented by the
    2249              :     /// same interface, but tiered is the only one at the moment.
    2250              :     ///
    2251              :     /// TODO: cancellation
    2252            0 :     pub(crate) async fn compact_tiered(
    2253            0 :         self: &Arc<Self>,
    2254            0 :         _cancel: &CancellationToken,
    2255            0 :         ctx: &RequestContext,
    2256            0 :     ) -> Result<(), CompactionError> {
    2257            0 :         let fanout = self.get_compaction_threshold() as u64;
    2258            0 :         let target_file_size = self.get_checkpoint_distance();
    2259              : 
    2260              :         // Find the top of the historical layers
    2261            0 :         let end_lsn = {
    2262            0 :             let guard = self.layers.read().await;
    2263            0 :             let layers = guard.layer_map()?;
    2264              : 
    2265            0 :             let l0_deltas = layers.level0_deltas();
    2266            0 : 
    2267            0 :             // As an optimization, if we find that there are too few L0 layers,
    2268            0 :             // bail out early. We know that the compaction algorithm would do
    2269            0 :             // nothing in that case.
    2270            0 :             if l0_deltas.len() < fanout as usize {
    2271              :                 // doesn't need compacting
    2272            0 :                 return Ok(());
    2273            0 :             }
    2274            0 :             l0_deltas.iter().map(|l| l.lsn_range.end).max().unwrap()
    2275            0 :         };
    2276            0 : 
    2277            0 :         // Is the timeline being deleted?
    2278            0 :         if self.is_stopping() {
    2279            0 :             trace!("Dropping out of compaction on timeline shutdown");
    2280            0 :             return Err(CompactionError::ShuttingDown);
    2281            0 :         }
    2282              : 
    2283            0 :         let (dense_ks, _sparse_ks) = self.collect_keyspace(end_lsn, ctx).await?;
    2284              :         // TODO(chi): ignore sparse_keyspace for now, compact it in the future.
    2285            0 :         let mut adaptor = TimelineAdaptor::new(self, (end_lsn, dense_ks));
    2286            0 : 
    2287            0 :         pageserver_compaction::compact_tiered::compact_tiered(
    2288            0 :             &mut adaptor,
    2289            0 :             end_lsn,
    2290            0 :             target_file_size,
    2291            0 :             fanout,
    2292            0 :             ctx,
    2293            0 :         )
    2294            0 :         .await
    2295              :         // TODO: compact_tiered needs to return CompactionError
    2296            0 :         .map_err(CompactionError::Other)?;
    2297              : 
    2298            0 :         adaptor.flush_updates().await?;
    2299            0 :         Ok(())
    2300            0 :     }
    2301              : 
    2302              :     /// Take a list of images and deltas, produce images and deltas according to GC horizon and retain_lsns.
    2303              :     ///
    2304              :     /// It takes a key, the values of the key within the compaction process, a GC horizon, and all retain_lsns below the horizon.
    2305              :     /// For now, it requires the `accumulated_values` contains the full history of the key (i.e., the key with the lowest LSN is
    2306              :     /// For now, it requires that `accumulated_values` contain the full history of the key (i.e., the key with the lowest LSN is
    2307              :     /// an image or a WAL record not requiring a base image). This restriction will be removed once we implement gc-compaction on branches.
    2308              :     /// The function returns the deltas and the base image that need to be placed at each of the retain LSNs. For example, given:
    2309              :     ///
    2310              :     /// A@0x10, +B@0x20, +C@0x30, +D@0x40, +E@0x50, +F@0x60
    2311              :     /// horizon = 0x50, retain_lsn = 0x20, 0x40, delta_threshold=3
    2312              :     ///
    2313              :     /// The function will produce:
    2314              :     ///
    2315              :     /// ```plain
    2316              :     /// 0x20(retain_lsn) -> img=AB@0x20                  always produce a single image below the lowest retain LSN
    2317              :     /// 0x40(retain_lsn) -> deltas=[+C@0x30, +D@0x40]    two deltas since the last base image, keeping the deltas
    2318              :     /// 0x50(horizon)    -> deltas=[ABCDE@0x50]          three deltas since the last base image, generate an image but put it in the delta
    2319              :     /// above_horizon    -> deltas=[+F@0x60]             full history above the horizon
    2320              :     /// ```
    2321              :     ///
    2322              :     /// Note that `accumulated_values` must be sorted by LSN and should belong to a single key.
    2323              :     #[allow(clippy::too_many_arguments)]
    2324         1296 :     pub(crate) async fn generate_key_retention(
    2325         1296 :         self: &Arc<Timeline>,
    2326         1296 :         key: Key,
    2327         1296 :         full_history: &[(Key, Lsn, Value)],
    2328         1296 :         horizon: Lsn,
    2329         1296 :         retain_lsn_below_horizon: &[Lsn],
    2330         1296 :         delta_threshold_cnt: usize,
    2331         1296 :         base_img_from_ancestor: Option<(Key, Lsn, Bytes)>,
    2332         1296 :         verification: bool,
    2333         1296 :     ) -> anyhow::Result<KeyHistoryRetention> {
    2334              :         // Pre-checks for the invariants
    2335              : 
    2336         1296 :         let debug_mode = cfg!(debug_assertions) || cfg!(feature = "testing");
    2337              : 
    2338         1296 :         if debug_mode {
    2339         3144 :             for (log_key, _, _) in full_history {
    2340         1848 :                 assert_eq!(log_key, &key, "mismatched key");
    2341              :             }
    2342         1296 :             for i in 1..full_history.len() {
    2343          552 :                 assert!(full_history[i - 1].1 <= full_history[i].1, "unordered LSN");
    2344          552 :                 if full_history[i - 1].1 == full_history[i].1 {
    2345            0 :                     assert!(
    2346            0 :                         matches!(full_history[i - 1].2, Value::Image(_)),
    2347            0 :                         "unordered delta/image, or duplicated delta"
    2348              :                     );
    2349          552 :                 }
    2350              :             }
    2351              :             // There used to be an assertion for the no-base-image case that checked whether the first
    2352              :             // record in the history is `will_init`, but it was removed.
    2353              :             // This is explained in the test cases for generate_key_retention.
    2354              :             // Search "incomplete history" for more information.
    2355         2856 :             for lsn in retain_lsn_below_horizon {
    2356         1560 :                 assert!(lsn < &horizon, "retain lsn must be below horizon")
    2357              :             }
    2358         1296 :             for i in 1..retain_lsn_below_horizon.len() {
    2359          712 :                 assert!(
    2360          712 :                     retain_lsn_below_horizon[i - 1] <= retain_lsn_below_horizon[i],
    2361            0 :                     "unordered LSN"
    2362              :                 );
    2363              :             }
    2364            0 :         }
    2365         1296 :         let has_ancestor = base_img_from_ancestor.is_some();
    2366              :         // Step 1: split history into len(retain_lsn_below_horizon) + 2 buckets, where the last bucket is for all deltas above the horizon,
    2367              :         // and the second-to-last bucket is for the horizon. Each bucket contains lsn_last_bucket < deltas <= lsn_this_bucket.
    2368         1296 :         let (mut split_history, lsn_split_points) = {
    2369         1296 :             let mut split_history = Vec::new();
    2370         1296 :             split_history.resize_with(retain_lsn_below_horizon.len() + 2, Vec::new);
    2371         1296 :             let mut lsn_split_points = Vec::with_capacity(retain_lsn_below_horizon.len() + 1);
    2372         2856 :             for lsn in retain_lsn_below_horizon {
    2373         1560 :                 lsn_split_points.push(*lsn);
    2374         1560 :             }
    2375         1296 :             lsn_split_points.push(horizon);
    2376         1296 :             let mut current_idx = 0;
    2377         3144 :             for item @ (_, lsn, _) in full_history {
    2378         2336 :                 while current_idx < lsn_split_points.len() && *lsn > lsn_split_points[current_idx] {
    2379          488 :                     current_idx += 1;
    2380          488 :                 }
    2381         1848 :                 split_history[current_idx].push(item);
    2382              :             }
    2383         1296 :             (split_history, lsn_split_points)
    2384              :         };
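                      :         // For example, reusing the history from the doc comment above: with
                      :         // retain_lsn_below_horizon = [0x20, 0x40] and horizon = 0x50, lsn_split_points
                      :         // is [0x20, 0x40, 0x50] and the records land in the buckets
                      :         //     (..,   0x20] -> [A@0x10, +B@0x20]
                      :         //     (0x20, 0x40] -> [+C@0x30, +D@0x40]
                      :         //     (0x40, 0x50] -> [+E@0x50]
                      :         //     (0x50, ..  ) -> [+F@0x60]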
    2385              :         // Step 2: filter out duplicated records due to the k-merge of image/delta layers
    2386         5448 :         for split_for_lsn in &mut split_history {
    2387         4152 :             let mut prev_lsn = None;
    2388         4152 :             let mut new_split_for_lsn = Vec::with_capacity(split_for_lsn.len());
    2389         4152 :             for record @ (_, lsn, _) in std::mem::take(split_for_lsn) {
    2390         1848 :                 if let Some(prev_lsn) = &prev_lsn {
    2391          248 :                     if *prev_lsn == lsn {
    2392              :                         // This is the case where we have an LSN with both data from the delta layer and the image layer. As
    2393              :                         // `ValueWrapper` ensures that an image is ordered before a delta at the same LSN, we simply
    2394              :                         // drop this delta and keep the image.
    2395              :                         //
    2396              :                         // For example, we have delta layer key1@0x10, key1@0x20, and image layer key1@0x10, we will
    2397              :                         // keep the image for key1@0x10 and the delta for key1@0x20. key1@0x10 delta will be simply
    2398              :                         // dropped.
    2399              :                         //
    2400              :                         // TODO: in case we have both delta + images for a given LSN and it does not exceed the delta
    2401              :                         // threshold, we could have kept delta instead to save space. This is an optimization for the future.
    2402            0 :                         continue;
    2403          248 :                     }
    2404         1600 :                 }
    2405         1848 :                 prev_lsn = Some(lsn);
    2406         1848 :                 new_split_for_lsn.push(record);
    2407              :             }
    2408         4152 :             *split_for_lsn = new_split_for_lsn;
    2409              :         }
    2410              :         // Step 3: generate images when necessary
    2411         1296 :         let mut retention = Vec::with_capacity(split_history.len());
    2412         1296 :         let mut records_since_last_image = 0;
    2413         1296 :         let batch_cnt = split_history.len();
    2414         1296 :         assert!(
    2415         1296 :             batch_cnt >= 2,
    2416            0 :             "should have at least below + above horizon batches"
    2417              :         );
    2418         1296 :         let mut replay_history: Vec<(Key, Lsn, Value)> = Vec::new();
    2419         1296 :         if let Some((key, lsn, ref img)) = base_img_from_ancestor {
    2420           84 :             replay_history.push((key, lsn, Value::Image(img.clone())));
    2421         1212 :         }
    2422              : 
    2423              :         /// Generate debug information for the replay history
    2424            0 :         fn generate_history_trace(replay_history: &[(Key, Lsn, Value)]) -> String {
    2425              :             use std::fmt::Write;
    2426            0 :             let mut output = String::new();
    2427            0 :             if let Some((key, _, _)) = replay_history.first() {
    2428            0 :                 write!(output, "key={} ", key).unwrap();
    2429            0 :                 let mut cnt = 0;
    2430            0 :                 for (_, lsn, val) in replay_history {
    2431            0 :                     if val.is_image() {
    2432            0 :                         write!(output, "i@{} ", lsn).unwrap();
    2433            0 :                     } else if val.will_init() {
    2434            0 :                         write!(output, "di@{} ", lsn).unwrap();
    2435            0 :                     } else {
    2436            0 :                         write!(output, "d@{} ", lsn).unwrap();
    2437            0 :                     }
    2438            0 :                     cnt += 1;
    2439            0 :                     if cnt >= 128 {
    2440            0 :                         write!(output, "... and more").unwrap();
    2441            0 :                         break;
    2442            0 :                     }
    2443              :                 }
    2444            0 :             } else {
    2445            0 :                 write!(output, "<no history>").unwrap();
    2446            0 :             }
    2447            0 :             output
    2448            0 :         }
    2449              : 
    2450            0 :         fn generate_debug_trace(
    2451            0 :             replay_history: Option<&[(Key, Lsn, Value)]>,
    2452            0 :             full_history: &[(Key, Lsn, Value)],
    2453            0 :             lsns: &[Lsn],
    2454            0 :             horizon: Lsn,
    2455            0 :         ) -> String {
    2456              :             use std::fmt::Write;
    2457            0 :             let mut output = String::new();
    2458            0 :             if let Some(replay_history) = replay_history {
    2459            0 :                 writeln!(
    2460            0 :                     output,
    2461            0 :                     "replay_history: {}",
    2462            0 :                     generate_history_trace(replay_history)
    2463            0 :                 )
    2464            0 :                 .unwrap();
    2465            0 :             } else {
    2466            0 :                 writeln!(output, "replay_history: <disabled>",).unwrap();
    2467            0 :             }
    2468            0 :             writeln!(
    2469            0 :                 output,
    2470            0 :                 "full_history: {}",
    2471            0 :                 generate_history_trace(full_history)
    2472            0 :             )
    2473            0 :             .unwrap();
    2474            0 :             writeln!(
    2475            0 :                 output,
    2476            0 :                 "when processing: [{}] horizon={}",
    2477            0 :                 lsns.iter().map(|l| format!("{l}")).join(","),
    2478            0 :                 horizon
    2479            0 :             )
    2480            0 :             .unwrap();
    2481            0 :             output
    2482            0 :         }
    2483              : 
    2484         1296 :         let mut key_exists = false;
    2485         4148 :         for (i, split_for_lsn) in split_history.into_iter().enumerate() {
    2486              :             // TODO: there could be image keys inside the splits, and we can compute records_since_last_image accordingly.
    2487         4148 :             records_since_last_image += split_for_lsn.len();
    2488              :             // Whether to produce an image into the final layer files
    2489         4148 :             let produce_image = if i == 0 && !has_ancestor {
    2490              :                 // We always generate images for the first batch (below horizon / lowest retain_lsn)
    2491         1212 :                 true
    2492         2936 :             } else if i == batch_cnt - 1 {
    2493              :                 // Do not generate images for the last batch (above horizon)
    2494         1292 :                 false
    2495         1644 :             } else if records_since_last_image == 0 {
    2496         1288 :                 false
    2497          356 :             } else if records_since_last_image >= delta_threshold_cnt {
    2498              :                 // Generate images when there are too many records
    2499           12 :                 true
    2500              :             } else {
    2501          344 :                 false
    2502              :             };
    2503         4148 :             replay_history.extend(split_for_lsn.iter().map(|x| (*x).clone()));
    2504              :             // Only retain the items after the last image record
    2505         5108 :             for idx in (0..replay_history.len()).rev() {
    2506         5108 :                 if replay_history[idx].2.will_init() {
    2507         4148 :                     replay_history = replay_history[idx..].to_vec();
    2508         4148 :                     break;
    2509          960 :                 }
    2510              :             }
    2511         4148 :             if replay_history.is_empty() && !key_exists {
    2512              :                 // The key does not exist at earlier LSN, we can skip this iteration.
    2513            0 :                 retention.push(Vec::new());
    2514            0 :                 continue;
    2515         4148 :             } else {
    2516         4148 :                 key_exists = true;
    2517         4148 :             }
    2518         4148 :             let Some((_, _, val)) = replay_history.first() else {
    2519            0 :                 unreachable!("replay history should not be empty once it exists")
    2520              :             };
    2521         4148 :             if !val.will_init() {
    2522            0 :                 return Err(anyhow::anyhow!("invalid history, no base image")).with_context(|| {
    2523            0 :                     generate_debug_trace(
    2524            0 :                         Some(&replay_history),
    2525            0 :                         full_history,
    2526            0 :                         retain_lsn_below_horizon,
    2527            0 :                         horizon,
    2528            0 :                     )
    2529            0 :                 });
    2530         4148 :             }
    2531              :             // Whether to reconstruct the image. In debug mode, we will generate an image
    2532              :             // at every retain_lsn to ensure data is not corrupted, but we won't put the
    2533              :             // image into the final layer.
    2534         4148 :             let img_and_lsn = if produce_image {
    2535         1224 :                 records_since_last_image = 0;
    2536         1224 :                 let replay_history_for_debug = if debug_mode {
    2537         1224 :                     Some(replay_history.clone())
    2538              :                 } else {
    2539            0 :                     None
    2540              :                 };
    2541         1224 :                 let replay_history_for_debug_ref = replay_history_for_debug.as_deref();
    2542         1224 :                 let history = std::mem::take(&mut replay_history);
    2543         1224 :                 let mut img = None;
    2544         1224 :                 let mut records = Vec::with_capacity(history.len());
    2545         1224 :                 if let (_, lsn, Value::Image(val)) = history.first().as_ref().unwrap() {
    2546         1180 :                     img = Some((*lsn, val.clone()));
    2547         1180 :                     for (_, lsn, val) in history.into_iter().skip(1) {
    2548           80 :                         let Value::WalRecord(rec) = val else {
    2549            0 :                             return Err(anyhow::anyhow!(
    2550            0 :                                 "invalid record: first record is an image, expected the rest to be WAL records"
    2551            0 :                             ))
    2552            0 :                             .with_context(|| {
    2553            0 :                                 generate_debug_trace(
    2554            0 :                                     replay_history_for_debug_ref,
    2555            0 :                                     full_history,
    2556            0 :                                     retain_lsn_below_horizon,
    2557            0 :                                     horizon,
    2558            0 :                                 )
    2559            0 :                             });
    2560              :                         };
    2561           80 :                         records.push((lsn, rec));
    2562              :                     }
    2563              :                 } else {
    2564           72 :                     for (_, lsn, val) in history.into_iter() {
    2565           72 :                         let Value::WalRecord(rec) = val else {
    2566            0 :                             return Err(anyhow::anyhow!("invalid record: first record is a WAL record, expected the rest to be WAL records as well"))
    2567            0 :                                 .with_context(|| generate_debug_trace(
    2568            0 :                                     replay_history_for_debug_ref,
    2569            0 :                                     full_history,
    2570            0 :                                     retain_lsn_below_horizon,
    2571              :                 // WAL redo requires the records in reverse LSN order
    2572            0 :                                 ));
    2573              :                         };
    2574           72 :                         records.push((lsn, rec));
    2575              :                     }
    2576              :                 }
    2577              :                 // WAL redo requires records in the reverse LSN order
    2578         1224 :                 records.reverse();
    2579         1224 :                 let state = ValueReconstructState { img, records };
    2580              :                 // The last batch does not generate an image, so `i` is always in range, unless we force-generate
    2581              :                 // an image during testing
    2582         1224 :                 let request_lsn = if i >= lsn_split_points.len() {
    2583            0 :                     Lsn::MAX
    2584              :                 } else {
    2585         1224 :                     lsn_split_points[i]
    2586              :                 };
    2587         1224 :                 let img = self
    2588         1224 :                     .reconstruct_value(key, request_lsn, state, RedoAttemptType::GcCompaction)
    2589         1224 :                     .await?;
    2590         1220 :                 Some((request_lsn, img))
    2591              :             } else {
    2592         2924 :                 None
    2593              :             };
    2594         4144 :             if produce_image {
    2595         1220 :                 let (request_lsn, img) = img_and_lsn.unwrap();
    2596         1220 :                 replay_history.push((key, request_lsn, Value::Image(img.clone())));
    2597         1220 :                 retention.push(vec![(request_lsn, Value::Image(img))]);
    2598         2924 :             } else {
    2599         2924 :                 let deltas = split_for_lsn
    2600         2924 :                     .iter()
    2601         2924 :                     .map(|(_, lsn, value)| (*lsn, value.clone()))
    2602         2924 :                     .collect_vec();
    2603         2924 :                 retention.push(deltas);
    2604         2924 :             }
    2605              :         }
    2606         1292 :         let mut result = Vec::with_capacity(retention.len());
    2607         1292 :         assert_eq!(retention.len(), lsn_split_points.len() + 1);
    2608         4144 :         for (idx, logs) in retention.into_iter().enumerate() {
    2609         4144 :             if idx == lsn_split_points.len() {
    2610         1292 :                 let retention = KeyHistoryRetention {
    2611         1292 :                     below_horizon: result,
    2612         1292 :                     above_horizon: KeyLogAtLsn(logs),
    2613         1292 :                 };
    2614         1292 :                 if verification {
    2615         1292 :                     retention
    2616         1292 :                         .verify(key, &base_img_from_ancestor, full_history, self)
    2617         1292 :                         .await?;
    2618            0 :                 }
    2619         1292 :                 return Ok(retention);
    2620         2852 :             } else {
    2621         2852 :                 result.push((lsn_split_points[idx], KeyLogAtLsn(logs)));
    2622         2852 :             }
    2623              :         }
    2624            0 :         unreachable!("key retention is empty")
    2625         1296 :     }
    2626              : 
    2627              :     /// Check how much space is left on the disk
    2628          108 :     async fn check_available_space(self: &Arc<Self>) -> anyhow::Result<u64> {
    2629          108 :         let tenants_dir = self.conf.tenants_path();
    2630              : 
    2631          108 :         let stat = Statvfs::get(&tenants_dir, None)
    2632          108 :             .context("statvfs failed, presumably directory got unlinked")?;
    2633              : 
    2634          108 :         let (avail_bytes, _) = stat.get_avail_total_bytes();
    2635          108 : 
    2636          108 :         Ok(avail_bytes)
    2637          108 :     }
    2638              : 
    2639              :     /// Check if the compaction can proceed safely without running out of space. We assume the upper
    2640              :     /// bound on the total size of the files produced by a compaction job equals the total size of all
    2641              :     /// layers involved in the compaction. Therefore, we need at least `2 * layers_to_be_compacted_size`
    2642              :     /// of free space to run a compaction.
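                      :     ///
                      :     /// A rough worked example (illustrative numbers, not taken from any run): if the selected
                      :     /// layers total 10 GiB and none of them need downloading, the check below requires
                      :     /// 10 GiB <= 0.8 * available_space, i.e. at least 12.5 GiB of free disk, keeping 20% of the
                      :     /// free space as headroom for other tasks.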
    2643          108 :     async fn check_compaction_space(
    2644          108 :         self: &Arc<Self>,
    2645          108 :         layer_selection: &[Layer],
    2646          108 :     ) -> Result<(), CompactionError> {
    2647          108 :         let available_space = self
    2648          108 :             .check_available_space()
    2649          108 :             .await
    2650          108 :             .map_err(CompactionError::Other)?;
    2651          108 :         let mut remote_layer_size = 0;
    2652          108 :         let mut all_layer_size = 0;
    2653          424 :         for layer in layer_selection {
    2654          316 :             let needs_download = layer
    2655          316 :                 .needs_download()
    2656          316 :                 .await
    2657          316 :                 .context("failed to check if layer needs download")
    2658          316 :                 .map_err(CompactionError::Other)?;
    2659          316 :             if needs_download.is_some() {
    2660            0 :                 remote_layer_size += layer.layer_desc().file_size;
    2661          316 :             }
    2662          316 :             all_layer_size += layer.layer_desc().file_size;
    2663              :         }
    2664          108 :         let allocated_space = (available_space as f64 * 0.8) as u64; /* reserve 20% space for other tasks */
    2665          108 :         if all_layer_size /* space needed for newly-generated file */ + remote_layer_size /* space for downloading layers */ > allocated_space
    2666              :         {
    2667            0 :             return Err(CompactionError::Other(anyhow!(
    2668            0 :                 "not enough space for compaction: available_space={}, allocated_space={}, all_layer_size={}, remote_layer_size={}, required_space={}",
    2669            0 :                 available_space,
    2670            0 :                 allocated_space,
    2671            0 :                 all_layer_size,
    2672            0 :                 remote_layer_size,
    2673            0 :                 all_layer_size + remote_layer_size
    2674            0 :             )));
    2675          108 :         }
    2676          108 :         Ok(())
    2677          108 :     }
    2678              : 
    2679              :     /// Get a watermark for gc-compaction, that is the lowest LSN that we can use as the `gc_horizon` for
    2680              :     /// the compaction algorithm. It is min(space_cutoff, time_cutoff, latest_gc_cutoff, standby_horizon).
    2681              :     /// Leases and retain_lsns are considered in the gc-compaction job itself so we don't need to account for them
    2682              :     /// here.
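                      :     ///
                      :     /// Illustrative example (made-up LSNs): with `gc_info.min_cutoff()` at 0/300 and the applied
                      :     /// gc cutoff at 0/200, the returned watermark is 0/200, the smaller of the two.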
    2683          112 :     pub(crate) fn get_gc_compaction_watermark(self: &Arc<Self>) -> Lsn {
    2684          112 :         let gc_cutoff_lsn = {
    2685          112 :             let gc_info = self.gc_info.read().unwrap();
    2686          112 :             gc_info.min_cutoff()
    2687          112 :         };
    2688          112 : 
    2689          112 :         // TODO: standby horizon should use leases so we don't really need to consider it here.
    2690          112 :         // let watermark = watermark.min(self.standby_horizon.load());
    2691          112 : 
    2692          112 :         // TODO: ensure the child branches will not use anything below the watermark, or consider
    2693          112 :         // them when computing the watermark.
    2694          112 :         gc_cutoff_lsn.min(*self.get_applied_gc_cutoff_lsn())
    2695          112 :     }
    2696              : 
    2697              :     /// Split a gc-compaction job into multiple compaction jobs. The split is based on the key range and the estimated size of the compaction job.
    2698              :     /// The function returns a list of compaction jobs that can be executed separately. If the upper bound of the compact LSN
    2699              :     /// range is not specified, we will use the latest gc_cutoff as the upper bound, so that all jobs in the jobset act
    2700              :     /// like a full compaction of the specified keyspace.
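                      :     ///
                      :     /// A rough sketch of the outcome (illustrative sizes): with the default 4 GB sub-job limit, a
                      :     /// keyspace whose overlapping layers add up to ~10 GB is split into roughly three jobs with
                      :     /// disjoint, sorted key ranges; each job keeps the caller's `compact_lsn_range.start` and uses
                      :     /// the computed `compact_below_lsn` as its upper bound.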
    2701            0 :     pub(crate) async fn gc_compaction_split_jobs(
    2702            0 :         self: &Arc<Self>,
    2703            0 :         job: GcCompactJob,
    2704            0 :         sub_compaction_max_job_size_mb: Option<u64>,
    2705            0 :     ) -> Result<Vec<GcCompactJob>, CompactionError> {
    2706            0 :         let compact_below_lsn = if job.compact_lsn_range.end != Lsn::MAX {
    2707            0 :             job.compact_lsn_range.end
    2708              :         } else {
    2709            0 :             self.get_gc_compaction_watermark()
    2710              :         };
    2711              : 
    2712            0 :         if compact_below_lsn == Lsn::INVALID {
    2713            0 :             tracing::warn!(
    2714            0 :                 "no layers to compact with gc: gc_cutoff not generated yet, skipping gc bottom-most compaction"
    2715              :             );
    2716            0 :             return Ok(vec![]);
    2717            0 :         }
    2718              : 
    2719              :         // Split the compaction job into sub-jobs of about 4 GB each
    2720              :         const GC_COMPACT_MAX_SIZE_MB: u64 = 4 * 1024;
    2721            0 :         let sub_compaction_max_job_size_mb =
    2722            0 :             sub_compaction_max_job_size_mb.unwrap_or(GC_COMPACT_MAX_SIZE_MB);
    2723            0 : 
    2724            0 :         let mut compact_jobs = Vec::<GcCompactJob>::new();
    2725            0 :         // For now, we simply use the key partitioning information; we should do a more fine-grained partitioning
    2726            0 :         // by estimating the number of files read for a compaction job. We should also partition on LSN.
    2727            0 :         let ((dense_ks, sparse_ks), _) = self.partitioning.read().as_ref().clone();
    2728              :         // Truncate the key range to be within the user-specified compaction range.
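                      :         // e.g. (with integers standing in for `Key`s): truncating source 10..100 to target 50..150
                      :         // yields Some(50..100), while disjoint source/target ranges yield None.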
    2729            0 :         fn truncate_to(
    2730            0 :             source_start: &Key,
    2731            0 :             source_end: &Key,
    2732            0 :             target_start: &Key,
    2733            0 :             target_end: &Key,
    2734            0 :         ) -> Option<(Key, Key)> {
    2735            0 :             let start = source_start.max(target_start);
    2736            0 :             let end = source_end.min(target_end);
    2737            0 :             if start < end {
    2738            0 :                 Some((*start, *end))
    2739              :             } else {
    2740            0 :                 None
    2741              :             }
    2742            0 :         }
    2743            0 :         let mut split_key_ranges = Vec::new();
    2744            0 :         let ranges = dense_ks
    2745            0 :             .parts
    2746            0 :             .iter()
    2747            0 :             .map(|partition| partition.ranges.iter())
    2748            0 :             .chain(sparse_ks.parts.iter().map(|x| x.0.ranges.iter()))
    2749            0 :             .flatten()
    2750            0 :             .cloned()
    2751            0 :             .collect_vec();
    2752            0 :         for range in ranges.iter() {
    2753            0 :             let Some((start, end)) = truncate_to(
    2754            0 :                 &range.start,
    2755            0 :                 &range.end,
    2756            0 :                 &job.compact_key_range.start,
    2757            0 :                 &job.compact_key_range.end,
    2758            0 :             ) else {
    2759            0 :                 continue;
    2760              :             };
    2761            0 :             split_key_ranges.push((start, end));
    2762              :         }
    2763            0 :         split_key_ranges.sort();
    2764            0 :         let all_layers = {
    2765            0 :             let guard = self.layers.read().await;
    2766            0 :             let layer_map = guard.layer_map()?;
    2767            0 :             layer_map.iter_historic_layers().collect_vec()
    2768            0 :         };
    2769            0 :         let mut current_start = None;
    2770            0 :         let ranges_num = split_key_ranges.len();
    2771            0 :         for (idx, (start, end)) in split_key_ranges.into_iter().enumerate() {
    2772            0 :             if current_start.is_none() {
    2773            0 :                 current_start = Some(start);
    2774            0 :             }
    2775            0 :             let start = current_start.unwrap();
    2776            0 :             if start >= end {
    2777              :                 // We have already processed this partition.
    2778            0 :                 continue;
    2779            0 :             }
    2780            0 :             let overlapping_layers = {
    2781            0 :                 let mut desc = Vec::new();
    2782            0 :                 for layer in all_layers.iter() {
    2783            0 :                     if overlaps_with(&layer.get_key_range(), &(start..end))
    2784            0 :                         && layer.get_lsn_range().start <= compact_below_lsn
    2785            0 :                     {
    2786            0 :                         desc.push(layer.clone());
    2787            0 :                     }
    2788              :                 }
    2789            0 :                 desc
    2790            0 :             };
    2791            0 :             let total_size = overlapping_layers.iter().map(|x| x.file_size).sum::<u64>();
    2792            0 :             if total_size > sub_compaction_max_job_size_mb * 1024 * 1024 || ranges_num == idx + 1 {
    2793              :                 // Try to extend the compaction range so that we include at least one full layer file.
    2794            0 :                 let extended_end = overlapping_layers
    2795            0 :                     .iter()
    2796            0 :                     .map(|layer| layer.key_range.end)
    2797            0 :                     .min();
    2798              :                 // It is possible that the search range does not contain any layer files when we reach the end of the loop.
    2799              :                 // In this case, we simply use the specified key range end.
    2800            0 :                 let end = if let Some(extended_end) = extended_end {
    2801            0 :                     extended_end.max(end)
    2802              :                 } else {
    2803            0 :                     end
    2804              :                 };
    2805            0 :                 let end = if ranges_num == idx + 1 {
    2806              :                     // extend the compaction range to the end of the key range if it's the last partition
    2807            0 :                     end.max(job.compact_key_range.end)
    2808              :                 } else {
    2809            0 :                     end
    2810              :                 };
    2811            0 :                 if total_size == 0 && !compact_jobs.is_empty() {
    2812            0 :                     info!(
    2813            0 :                         "splitting compaction job: {}..{}, estimated_size={}, extending the previous job",
    2814              :                         start, end, total_size
    2815              :                     );
    2816            0 :                     compact_jobs.last_mut().unwrap().compact_key_range.end = end;
    2817            0 :                     current_start = Some(end);
    2818              :                 } else {
    2819            0 :                     info!(
    2820            0 :                         "splitting compaction job: {}..{}, estimated_size={}",
    2821              :                         start, end, total_size
    2822              :                     );
    2823            0 :                     compact_jobs.push(GcCompactJob {
    2824            0 :                         dry_run: job.dry_run,
    2825            0 :                         compact_key_range: start..end,
    2826            0 :                         compact_lsn_range: job.compact_lsn_range.start..compact_below_lsn,
    2827            0 :                     });
    2828            0 :                     current_start = Some(end);
    2829              :                 }
    2830            0 :             }
    2831              :         }
    2832            0 :         Ok(compact_jobs)
    2833            0 :     }
    2834              : 
    2835              :     /// An experimental compaction building block that combines compaction with garbage collection.
    2836              :     ///
    2837              :     /// The current implementation picks all delta + image layers that are below or intersecting with
    2838              :     /// the GC horizon without considering retain_lsns. Then, it does a full compaction over all these delta
    2839              :     /// layers and image layers, which generates image layers on the gc horizon, drops deltas below the gc
    2840              :     /// horizon, and creates delta layers with all deltas >= the gc horizon.
    2841              :     ///
    2842              :     /// If `options.compact_range` is provided, it will only compact the keys within the range, aka partial compaction.
    2843              :     /// Partial compaction will read and process all layers overlapping with the key range, even if they might
    2844              :     /// contain extra keys. After the gc-compaction phase completes, delta layers that are not fully contained
    2845              :     /// within the key range will be rewritten to ensure they do not overlap with the delta layers. Providing
    2846              :     /// `Key::MIN..Key::MAX` to the function indicates a full compaction, though technically, `Key::MAX` is not
    2847              :     /// part of the range.
    2848              :     ///
    2849              :     /// If `options.compact_lsn_range.end` is provided, the compaction will only compact layers below or intersecting with
    2850              :     /// the LSN. Otherwise, it will use the gc cutoff by default.
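                      :     ///
                      :     /// Sketch of the overall effect (illustrative LSNs, not from a real run): with the gc horizon
                      :     /// at 0/180 and retain_lsns 0/100 and 0/150, the result keeps a full image at 0/100, only the
                      :     /// deltas needed to reconstruct pages at 0/150 and 0/180, and all deltas above 0/180 unchanged;
                      :     /// history below 0/100 that no reader can request anymore is dropped.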
    2851          112 :     pub(crate) async fn compact_with_gc(
    2852          112 :         self: &Arc<Self>,
    2853          112 :         cancel: &CancellationToken,
    2854          112 :         options: CompactOptions,
    2855          112 :         ctx: &RequestContext,
    2856          112 :     ) -> Result<CompactionOutcome, CompactionError> {
    2857          112 :         let sub_compaction = options.sub_compaction;
    2858          112 :         let job = GcCompactJob::from_compact_options(options.clone());
    2859          112 :         let yield_for_l0 = options.flags.contains(CompactFlags::YieldForL0);
    2860          112 :         if sub_compaction {
    2861            0 :             info!(
    2862            0 :                 "running enhanced gc bottom-most compaction with sub-compaction, splitting compaction jobs"
    2863              :             );
    2864            0 :             let jobs = self
    2865            0 :                 .gc_compaction_split_jobs(job, options.sub_compaction_max_job_size_mb)
    2866            0 :                 .await?;
    2867            0 :             let jobs_len = jobs.len();
    2868            0 :             for (idx, job) in jobs.into_iter().enumerate() {
    2869            0 :                 info!(
    2870            0 :                     "running enhanced gc bottom-most compaction, sub-compaction {}/{}",
    2871            0 :                     idx + 1,
    2872              :                     jobs_len
    2873              :                 );
    2874            0 :                 self.compact_with_gc_inner(cancel, job, ctx, yield_for_l0)
    2875            0 :                     .await?;
    2876              :             }
    2877            0 :             if jobs_len == 0 {
    2878            0 :                 info!("no jobs to run, skipping gc bottom-most compaction");
    2879            0 :             }
    2880            0 :             return Ok(CompactionOutcome::Done);
    2881          112 :         }
    2882          112 :         self.compact_with_gc_inner(cancel, job, ctx, yield_for_l0)
    2883          112 :             .await
    2884          112 :     }
    2885              : 
    2886          112 :     async fn compact_with_gc_inner(
    2887          112 :         self: &Arc<Self>,
    2888          112 :         cancel: &CancellationToken,
    2889          112 :         job: GcCompactJob,
    2890          112 :         ctx: &RequestContext,
    2891          112 :         yield_for_l0: bool,
    2892          112 :     ) -> Result<CompactionOutcome, CompactionError> {
    2893          112 :         // Block other compaction/GC tasks from running for now. GC-compaction could run along
    2894          112 :         // with legacy compaction tasks in the future. Always ensure the lock order is compaction -> gc.
    2895          112 :         // Note that we already acquired the compaction lock when the outer `compact` function gets called.
    2896          112 : 
    2897          112 :         let timer = Instant::now();
    2898          112 :         let begin_timer = timer;
    2899          112 : 
    2900          112 :         let gc_lock = async {
    2901          112 :             tokio::select! {
    2902          112 :                 guard = self.gc_lock.lock() => Ok(guard),
    2903          112 :                 _ = cancel.cancelled() => Err(CompactionError::ShuttingDown),
    2904              :             }
    2905          112 :         };
    2906              : 
    2907          112 :         let time_acquire_lock = timer.elapsed();
    2908          112 :         let timer = Instant::now();
    2909              : 
    2910          112 :         let gc_lock = crate::timed(
    2911          112 :             gc_lock,
    2912          112 :             "acquires gc lock",
    2913          112 :             std::time::Duration::from_secs(5),
    2914          112 :         )
    2915          112 :         .await?;
    2916              : 
    2917          112 :         let dry_run = job.dry_run;
    2918          112 :         let compact_key_range = job.compact_key_range;
    2919          112 :         let compact_lsn_range = job.compact_lsn_range;
    2920              : 
    2921          112 :         let debug_mode = cfg!(debug_assertions) || cfg!(feature = "testing");
    2922              : 
    2923          112 :         info!(
    2924            0 :             "running enhanced gc bottom-most compaction, dry_run={dry_run}, compact_key_range={}..{}, compact_lsn_range={}..{}",
    2925              :             compact_key_range.start,
    2926              :             compact_key_range.end,
    2927              :             compact_lsn_range.start,
    2928              :             compact_lsn_range.end
    2929              :         );
    2930              : 
    2931          112 :         scopeguard::defer! {
    2932          112 :             info!("done enhanced gc bottom-most compaction");
    2933          112 :         };
    2934          112 : 
    2935          112 :         let mut stat = CompactionStatistics::default();
    2936              : 
    2937              :         // Step 0: pick all delta layers + image layers below/intersect with the GC horizon.
    2938              :         // The layer selection has the following properties:
    2939              :         // 1. If a layer is in the selection, all layers below it are in the selection.
    2940              :         // 2. Inferred from (1), for each key in the layer selection, the value can be reconstructed only with the layers in the layer selection.
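                      :         // Illustrative example (made-up LSNs): with gc_cutoff = 0/80, a delta layer 0/40..0/90
                      :         // starts below the cutoff, so max_layer_lsn becomes at least 0/90; a layer 0/85..0/88 that
                      :         // lies entirely above the cutoff is then still selected, preserving property (1).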
    2941          108 :         let job_desc = {
    2942          112 :             let guard = self.layers.read().await;
    2943          112 :             let layers = guard.layer_map()?;
    2944          112 :             let gc_info = self.gc_info.read().unwrap();
    2945          112 :             let mut retain_lsns_below_horizon = Vec::new();
    2946          112 :             let gc_cutoff = {
    2947              :                 // Currently, gc-compaction only kicks in after the legacy gc has updated the gc_cutoff.
    2948              :                 // Therefore, it can only clean up data that cannot be cleaned up with legacy gc, instead of
    2949              :                 // cleaning everything that it theoretically could. In the future, it should use `self.gc_info`
    2950              :                 // as the source of truth.
    2951          112 :                 let real_gc_cutoff = self.get_gc_compaction_watermark();
    2952              :                 // The compaction algorithm will keep all keys above the gc_cutoff while keeping only necessary keys below the gc_cutoff for
    2953              :                 // each of the retain_lsn. Therefore, if the user-provided `compact_lsn_range.end` is larger than the real gc cutoff, we will use
    2954              :                 // the real cutoff.
    2955          112 :                 let mut gc_cutoff = if compact_lsn_range.end == Lsn::MAX {
    2956          100 :                     if real_gc_cutoff == Lsn::INVALID {
    2957              :                         // If the gc_cutoff is not generated yet, we should not compact anything.
    2958            0 :                         tracing::warn!(
    2959            0 :                             "no layers to compact with gc: gc_cutoff not generated yet, skipping gc bottom-most compaction"
    2960              :                         );
    2961            0 :                         return Ok(CompactionOutcome::Skipped);
    2962          100 :                     }
    2963          100 :                     real_gc_cutoff
    2964              :                 } else {
    2965           12 :                     compact_lsn_range.end
    2966              :                 };
    2967          112 :                 if gc_cutoff > real_gc_cutoff {
    2968            8 :                     warn!(
    2969            0 :                         "provided compact_lsn_range.end={} is larger than the real_gc_cutoff={}, using the real gc cutoff",
    2970              :                         gc_cutoff, real_gc_cutoff
    2971              :                     );
    2972            8 :                     gc_cutoff = real_gc_cutoff;
    2973          104 :                 }
    2974          112 :                 gc_cutoff
    2975              :             };
    2976          140 :             for (lsn, _timeline_id, _is_offloaded) in &gc_info.retain_lsns {
    2977          140 :                 if lsn < &gc_cutoff {
    2978          140 :                     retain_lsns_below_horizon.push(*lsn);
    2979          140 :                 }
    2980              :             }
    2981          112 :             for lsn in gc_info.leases.keys() {
    2982            0 :                 if lsn < &gc_cutoff {
    2983            0 :                     retain_lsns_below_horizon.push(*lsn);
    2984            0 :                 }
    2985              :             }
    2986          112 :             let mut selected_layers: Vec<Layer> = Vec::new();
    2987          112 :             drop(gc_info);
    2988              :             // First, pick all the layers intersecting or below the gc_cutoff, and get the largest LSN among the selected layers.
    2989          112 :             let Some(max_layer_lsn) = layers
    2990          112 :                 .iter_historic_layers()
    2991          500 :                 .filter(|desc| desc.get_lsn_range().start <= gc_cutoff)
    2992          428 :                 .map(|desc| desc.get_lsn_range().end)
    2993          112 :                 .max()
    2994              :             else {
    2995            0 :                 info!(
    2996            0 :                     "no layers to compact with gc: no historic layers below gc_cutoff, gc_cutoff={}",
    2997              :                     gc_cutoff
    2998              :                 );
    2999            0 :                 return Ok(CompactionOutcome::Done);
    3000              :             };
    3001              :             // Next, if the user specifies compact_lsn_range.start, we need to filter some layers out. All the layers (strictly) below
    3002              :             // the min_layer_lsn computed below will be filtered out, and the data will be accessed using the normal read path, as if
    3003              :             // it were a branch.
    3004          112 :             let Some(min_layer_lsn) = layers
    3005          112 :                 .iter_historic_layers()
    3006          500 :                 .filter(|desc| {
    3007          500 :                     if compact_lsn_range.start == Lsn::INVALID {
    3008          408 :                         true // select all layers below if start == Lsn(0)
    3009              :                     } else {
    3010           92 :                         desc.get_lsn_range().end > compact_lsn_range.start // strictly larger than compact_above_lsn
    3011              :                     }
    3012          500 :                 })
    3013          464 :                 .map(|desc| desc.get_lsn_range().start)
    3014          112 :                 .min()
    3015              :             else {
    3016            0 :                 info!(
    3017            0 :                     "no layers to compact with gc: no historic layers above compact_above_lsn, compact_above_lsn={}",
    3018              :                     compact_lsn_range.end
    3019              :                 );
    3020            0 :                 return Ok(CompactionOutcome::Done);
    3021              :             };
    3022              :             // Then, pick all the layers that are below the max_layer_lsn. This is to ensure we can pick all single-key
    3023              :             // layers to compact.
    3024          112 :             let mut rewrite_layers = Vec::new();
    3025          500 :             for desc in layers.iter_historic_layers() {
    3026          500 :                 if desc.get_lsn_range().end <= max_layer_lsn
    3027          428 :                     && desc.get_lsn_range().start >= min_layer_lsn
    3028          392 :                     && overlaps_with(&desc.get_key_range(), &compact_key_range)
    3029              :                 {
    3030              :                     // If the layer overlaps with the compaction key range, we need to read it to obtain all keys within the range,
    3031              :                     // even if it might contain extra keys
    3032          316 :                     selected_layers.push(guard.get_from_desc(&desc));
    3033          316 :                     // If the layer is not fully contained within the key range, we need to rewrite it if it's a delta layer (it's fine
    3034          316 :                     // to overlap image layers)
    3035          316 :                     if desc.is_delta() && !fully_contains(&compact_key_range, &desc.get_key_range())
    3036            4 :                     {
    3037            4 :                         rewrite_layers.push(desc);
    3038          312 :                     }
    3039          184 :                 }
    3040              :             }
    3041          112 :             if selected_layers.is_empty() {
    3042            4 :                 info!(
    3043            0 :                     "no layers to compact with gc: no layers within the key range, gc_cutoff={}, key_range={}..{}",
    3044              :                     gc_cutoff, compact_key_range.start, compact_key_range.end
    3045              :                 );
    3046            4 :                 return Ok(CompactionOutcome::Done);
    3047          108 :             }
    3048          108 :             retain_lsns_below_horizon.sort();
    3049          108 :             GcCompactionJobDescription {
    3050          108 :                 selected_layers,
    3051          108 :                 gc_cutoff,
    3052          108 :                 retain_lsns_below_horizon,
    3053          108 :                 min_layer_lsn,
    3054          108 :                 max_layer_lsn,
    3055          108 :                 compaction_key_range: compact_key_range,
    3056          108 :                 rewrite_layers,
    3057          108 :             }
    3058              :         };
    3059          108 :         let (has_data_below, lowest_retain_lsn) = if compact_lsn_range.start != Lsn::INVALID {
    3060              :             // If we only compact above some LSN, we should get the history from the current branch below the specified LSN.
    3061              :             // We use job_desc.min_layer_lsn as if it's the lowest branch point.
    3062           16 :             (true, job_desc.min_layer_lsn)
    3063           92 :         } else if self.ancestor_timeline.is_some() {
    3064              :             // In theory, we can also use min_layer_lsn here, but using ancestor LSN makes sure the delta layers cover the
    3065              :             // LSN ranges all the way to the ancestor timeline.
    3066            4 :             (true, self.ancestor_lsn)
    3067              :         } else {
    3068           88 :             let res = job_desc
    3069           88 :                 .retain_lsns_below_horizon
    3070           88 :                 .first()
    3071           88 :                 .copied()
    3072           88 :                 .unwrap_or(job_desc.gc_cutoff);
    3073           88 :             if debug_mode {
    3074           88 :                 assert_eq!(
    3075           88 :                     res,
    3076           88 :                     job_desc
    3077           88 :                         .retain_lsns_below_horizon
    3078           88 :                         .iter()
    3079           88 :                         .min()
    3080           88 :                         .copied()
    3081           88 :                         .unwrap_or(job_desc.gc_cutoff)
    3082           88 :                 );
    3083            0 :             }
    3084           88 :             (false, res)
    3085              :         };
    3086              : 
    3087          108 :         let verification = self.get_gc_compaction_settings().gc_compaction_verification;
    3088          108 : 
    3089          108 :         info!(
    3090            0 :             "picked {} layers for compaction ({} layers need rewriting) with max_layer_lsn={} min_layer_lsn={} gc_cutoff={} lowest_retain_lsn={}, key_range={}..{}, has_data_below={}",
    3091            0 :             job_desc.selected_layers.len(),
    3092            0 :             job_desc.rewrite_layers.len(),
    3093              :             job_desc.max_layer_lsn,
    3094              :             job_desc.min_layer_lsn,
    3095              :             job_desc.gc_cutoff,
    3096              :             lowest_retain_lsn,
    3097              :             job_desc.compaction_key_range.start,
    3098              :             job_desc.compaction_key_range.end,
    3099              :             has_data_below,
    3100              :         );
    3101              : 
    3102          108 :         let time_analyze = timer.elapsed();
    3103          108 :         let timer = Instant::now();
    3104              : 
    3105          424 :         for layer in &job_desc.selected_layers {
    3106          316 :             debug!("read layer: {}", layer.layer_desc().key());
    3107              :         }
    3108          112 :         for layer in &job_desc.rewrite_layers {
    3109            4 :             debug!("rewrite layer: {}", layer.key());
    3110              :         }
    3111              : 
    3112          108 :         self.check_compaction_space(&job_desc.selected_layers)
    3113          108 :             .await?;
    3114              : 
    3115              :         // Generate statistics for the compaction
    3116          424 :         for layer in &job_desc.selected_layers {
    3117          316 :             let desc = layer.layer_desc();
    3118          316 :             if desc.is_delta() {
    3119          176 :                 stat.visit_delta_layer(desc.file_size());
    3120          176 :             } else {
    3121          140 :                 stat.visit_image_layer(desc.file_size());
    3122          140 :             }
    3123              :         }
    3124              : 
    3125              :         // Step 1: construct a k-merge iterator over all layers.
    3126              :         // Also, verify if the layer map can be split by drawing a horizontal line at every LSN start/end split point.
    3127          108 :         let layer_names = job_desc
    3128          108 :             .selected_layers
    3129          108 :             .iter()
    3130          316 :             .map(|layer| layer.layer_desc().layer_name())
    3131          108 :             .collect_vec();
    3132          108 :         if let Some(err) = check_valid_layermap(&layer_names) {
    3133            0 :             return Err(CompactionError::Other(anyhow!(
    3134            0 :                 "gc-compaction layer map check failed because {}, cannot proceed with compaction due to potential data loss",
    3135            0 :                 err
    3136            0 :             )));
    3137          108 :         }
    3138          108 :         // The maximum LSN we are processing in this compaction loop
    3139          108 :         let end_lsn = job_desc
    3140          108 :             .selected_layers
    3141          108 :             .iter()
    3142          316 :             .map(|l| l.layer_desc().lsn_range.end)
    3143          108 :             .max()
    3144          108 :             .unwrap();
    3145          108 :         let mut delta_layers = Vec::new();
    3146          108 :         let mut image_layers = Vec::new();
    3147          108 :         let mut downloaded_layers = Vec::new();
    3148          108 :         let mut total_downloaded_size = 0;
    3149          108 :         let mut total_layer_size = 0;
    3150          424 :         for layer in &job_desc.selected_layers {
    3151          316 :             if layer
    3152          316 :                 .needs_download()
    3153          316 :                 .await
    3154          316 :                 .context("failed to check if layer needs download")
    3155          316 :                 .map_err(CompactionError::Other)?
    3156          316 :                 .is_some()
    3157            0 :             {
    3158            0 :                 total_downloaded_size += layer.layer_desc().file_size;
    3159          316 :             }
    3160          316 :             total_layer_size += layer.layer_desc().file_size;
    3161          316 :             if cancel.is_cancelled() {
    3162            0 :                 return Err(CompactionError::ShuttingDown);
    3163          316 :             }
    3164          316 :             let should_yield = yield_for_l0
    3165            0 :                 && self
    3166            0 :                     .l0_compaction_trigger
    3167            0 :                     .notified()
    3168            0 :                     .now_or_never()
    3169            0 :                     .is_some();
    3170          316 :             if should_yield {
    3171            0 :                 tracing::info!("preempt gc-compaction when downloading layers: too many L0 layers");
    3172            0 :                 return Ok(CompactionOutcome::YieldForL0);
    3173          316 :             }
    3174          316 :             let resident_layer = layer
    3175          316 :                 .download_and_keep_resident(ctx)
    3176          316 :                 .await
    3177          316 :                 .context("failed to download and keep resident layer")
    3178          316 :                 .map_err(CompactionError::Other)?;
    3179          316 :             downloaded_layers.push(resident_layer);
    3180              :         }
    3181          108 :         info!(
    3182            0 :             "finish downloading layers, downloaded={}, total={}, ratio={:.2}",
    3183            0 :             total_downloaded_size,
    3184            0 :             total_layer_size,
    3185            0 :             total_downloaded_size as f64 / total_layer_size as f64
    3186              :         );
    3187          424 :         for resident_layer in &downloaded_layers {
    3188          316 :             if resident_layer.layer_desc().is_delta() {
    3189          176 :                 let layer = resident_layer
    3190          176 :                     .get_as_delta(ctx)
    3191          176 :                     .await
    3192          176 :                     .context("failed to get delta layer")
    3193          176 :                     .map_err(CompactionError::Other)?;
    3194          176 :                 delta_layers.push(layer);
    3195              :             } else {
    3196          140 :                 let layer = resident_layer
    3197          140 :                     .get_as_image(ctx)
    3198          140 :                     .await
    3199          140 :                     .context("failed to get image layer")
    3200          140 :                     .map_err(CompactionError::Other)?;
    3201          140 :                 image_layers.push(layer);
    3202              :             }
    3203              :         }
    3204          108 :         let (dense_ks, sparse_ks) = self
    3205          108 :             .collect_gc_compaction_keyspace()
    3206          108 :             .await
    3207          108 :             .context("failed to collect gc compaction keyspace")
    3208          108 :             .map_err(CompactionError::Other)?;
    3209          108 :         let mut merge_iter = FilterIterator::create(
    3210          108 :             MergeIterator::create(&delta_layers, &image_layers, ctx),
    3211          108 :             dense_ks,
    3212          108 :             sparse_ks,
    3213          108 :         )
    3214          108 :         .context("failed to create filter iterator")
    3215          108 :         .map_err(CompactionError::Other)?;
    3216              : 
    3217          108 :         let time_download_layer = timer.elapsed();
    3218          108 :         let mut timer = Instant::now();
    3219          108 : 
    3220          108 :         // Step 2: Produce images+deltas.
    3221          108 :         let mut accumulated_values = Vec::new();
    3222          108 :         let mut last_key: Option<Key> = None;
    3223              : 
    3224              :         // Only create image layers when there are no ancestor branches. TODO: create a covering image layer
    3225              :         // when certain conditions are met.
    3226          108 :         let mut image_layer_writer = if !has_data_below {
    3227              :             Some(
    3228           88 :                 SplitImageLayerWriter::new(
    3229           88 :                     self.conf,
    3230           88 :                     self.timeline_id,
    3231           88 :                     self.tenant_shard_id,
    3232           88 :                     job_desc.compaction_key_range.start,
    3233           88 :                     lowest_retain_lsn,
    3234           88 :                     self.get_compaction_target_size(),
    3235           88 :                     ctx,
    3236           88 :                 )
    3237           88 :                 .await
    3238           88 :                 .context("failed to create image layer writer")
    3239           88 :                 .map_err(CompactionError::Other)?,
    3240              :             )
    3241              :         } else {
    3242           20 :             None
    3243              :         };
    3244              : 
    3245          108 :         let mut delta_layer_writer = SplitDeltaLayerWriter::new(
    3246          108 :             self.conf,
    3247          108 :             self.timeline_id,
    3248          108 :             self.tenant_shard_id,
    3249          108 :             lowest_retain_lsn..end_lsn,
    3250          108 :             self.get_compaction_target_size(),
    3251          108 :         )
    3252          108 :         .await
    3253          108 :         .context("failed to create delta layer writer")
    3254          108 :         .map_err(CompactionError::Other)?;
    3255              : 
    3256              :         #[derive(Default)]
    3257              :         struct RewritingLayers {
    3258              :             before: Option<DeltaLayerWriter>,
    3259              :             after: Option<DeltaLayerWriter>,
    3260              :         }
    3261          108 :         let mut delta_layer_rewriters = HashMap::<Arc<PersistentLayerKey>, RewritingLayers>::new();
    3262              : 
    3263              :         /// When we are not compacting the bottom LSN range (=`[0,X)`) of the root branch, we "have data below" (`has_data_below=true`).
    3264              :         /// The two cases are compaction in ancestor branches and when `compact_lsn_range.start` is set.
    3265              :         /// In those cases, we need to pull up data from below the LSN range we're compacting.
    3266              :         ///
    3267              :         /// This function unifies the cases so that later code doesn't have to think about it.
    3268              :         ///
    3269              :         /// Currently, we always get the ancestor image for each key in the child branch no matter whether the image
    3270              :         /// is needed for reconstruction. This should be fixed in the future.
    3271              :         ///
    3272              :         /// Furthermore, we should do vectored get instead of a single get, or better, use k-merge for ancestor
    3273              :         /// images.
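                      :         ///
                      :         /// For example (illustrative LSNs): when compacting a child branch whose ancestor_lsn is
                      :         /// 0/100, a key that was last written on the parent is materialized here via a plain `get`
                      :         /// at 0/100, so the code below can treat it like a base image at the bottom of its range.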
    3274         1280 :         async fn get_ancestor_image(
    3275         1280 :             this_tline: &Arc<Timeline>,
    3276         1280 :             key: Key,
    3277         1280 :             ctx: &RequestContext,
    3278         1280 :             has_data_below: bool,
    3279         1280 :             history_lsn_point: Lsn,
    3280         1280 :         ) -> anyhow::Result<Option<(Key, Lsn, Bytes)>> {
    3281         1280 :             if !has_data_below {
    3282         1204 :                 return Ok(None);
    3283           76 :             };
    3284              :             // This function is implemented as a get of the current timeline at ancestor LSN, therefore reusing
    3285              :             // as much existing code as possible.
    3286           76 :             let img = this_tline.get(key, history_lsn_point, ctx).await?;
    3287           76 :             Ok(Some((key, history_lsn_point, img)))
    3288         1280 :         }
    3289              : 
    3290              :         // Actually, we can decide not to write to the image layer at all at this point because
    3291              :         // the key and LSN range are determined. However, to keep things simple here, we still
    3292              :         // create this writer and discard it at the end.
    3293          108 :         let mut time_to_first_kv_pair = None;
    3294              : 
    3295         1984 :         while let Some(((key, lsn, val), desc)) = merge_iter
    3296         1984 :             .next_with_trace()
    3297         1984 :             .await
    3298         1984 :             .context("failed to get next key-value pair")
    3299         1984 :             .map_err(CompactionError::Other)?
    3300              :         {
    3301         1880 :             if time_to_first_kv_pair.is_none() {
    3302          108 :                 time_to_first_kv_pair = Some(timer.elapsed());
    3303          108 :                 timer = Instant::now();
    3304         1772 :             }
    3305              : 
    3306         1880 :             if cancel.is_cancelled() {
    3307            0 :                 return Err(CompactionError::ShuttingDown);
    3308         1880 :             }
    3309              : 
    3310         1880 :             let should_yield = yield_for_l0
    3311            0 :                 && self
    3312            0 :                     .l0_compaction_trigger
    3313            0 :                     .notified()
    3314            0 :                     .now_or_never()
    3315            0 :                     .is_some();
    3316         1880 :             if should_yield {
    3317            0 :                 tracing::info!("preempt gc-compaction in the main loop: too many L0 layers");
    3318            0 :                 return Ok(CompactionOutcome::YieldForL0);
    3319         1880 :             }
    3320         1880 :             if self.shard_identity.is_key_disposable(&key) {
    3321              :                 // If this shard does not need to store this key, simply skip it.
    3322              :                 //
    3323              :                 // This is not handled in the filter iterator because the owning shard is determined by key hash.
    3324              :                 // Therefore, unlike with key spaces (ranges), there is no performance benefit in trying to skip
    3325              :                 // a whole layer file here.
    3326            0 :                 if cfg!(debug_assertions) {
    3327            0 :                     let shard = self.shard_identity.shard_index();
    3328            0 :                     let owner = self.shard_identity.get_shard_number(&key);
    3329            0 :                     panic!("key {key} does not belong on shard {shard}, owned by {owner}");
    3330            0 :                 }
    3331            0 :                 continue;
    3332         1880 :             }
    3333         1880 :             if !job_desc.compaction_key_range.contains(&key) {
    3334          128 :                 if !desc.is_delta {
    3335          120 :                     continue;
    3336            8 :                 }
    3337            8 :                 let rewriter = delta_layer_rewriters.entry(desc.clone()).or_default();
    3338            8 :                 let rewriter = if key < job_desc.compaction_key_range.start {
    3339            0 :                     if rewriter.before.is_none() {
    3340            0 :                         rewriter.before = Some(
    3341            0 :                             DeltaLayerWriter::new(
    3342            0 :                                 self.conf,
    3343            0 :                                 self.timeline_id,
    3344            0 :                                 self.tenant_shard_id,
    3345            0 :                                 desc.key_range.start,
    3346            0 :                                 desc.lsn_range.clone(),
    3347            0 :                                 ctx,
    3348            0 :                             )
    3349            0 :                             .await
    3350            0 :                             .context("failed to create delta layer writer")
    3351            0 :                             .map_err(CompactionError::Other)?,
    3352              :                         );
    3353            0 :                     }
    3354            0 :                     rewriter.before.as_mut().unwrap()
    3355            8 :                 } else if key >= job_desc.compaction_key_range.end {
    3356            8 :                     if rewriter.after.is_none() {
    3357            4 :                         rewriter.after = Some(
    3358            4 :                             DeltaLayerWriter::new(
    3359            4 :                                 self.conf,
    3360            4 :                                 self.timeline_id,
    3361            4 :                                 self.tenant_shard_id,
    3362            4 :                                 job_desc.compaction_key_range.end,
    3363            4 :                                 desc.lsn_range.clone(),
    3364            4 :                                 ctx,
    3365            4 :                             )
    3366            4 :                             .await
    3367            4 :                             .context("failed to create delta layer writer")
    3368            4 :                             .map_err(CompactionError::Other)?,
    3369              :                         );
    3370            4 :                     }
    3371            8 :                     rewriter.after.as_mut().unwrap()
    3372              :                 } else {
    3373            0 :                     unreachable!()
    3374              :                 };
    3375            8 :                 rewriter
    3376            8 :                     .put_value(key, lsn, val, ctx)
    3377            8 :                     .await
    3378            8 :                     .context("failed to put value")
    3379            8 :                     .map_err(CompactionError::Other)?;
    3380            8 :                 continue;
    3381         1752 :             }
    3382         1752 :             match val {
    3383         1260 :                 Value::Image(_) => stat.visit_image_key(&val),
    3384          492 :                 Value::WalRecord(_) => stat.visit_wal_key(&val),
    3385              :             }
    3386         1752 :             if last_key.is_none() || last_key.as_ref() == Some(&key) {
    3387          576 :                 if last_key.is_none() {
    3388          108 :                     last_key = Some(key);
    3389          468 :                 }
    3390          576 :                 accumulated_values.push((key, lsn, val));
    3391              :             } else {
    3392         1176 :                 let last_key: &mut Key = last_key.as_mut().unwrap();
    3393         1176 :                 stat.on_unique_key_visited(); // TODO: adjust statistics for partial compaction
    3394         1176 :                 let retention = self
    3395         1176 :                     .generate_key_retention(
    3396         1176 :                         *last_key,
    3397         1176 :                         &accumulated_values,
    3398         1176 :                         job_desc.gc_cutoff,
    3399         1176 :                         &job_desc.retain_lsns_below_horizon,
    3400         1176 :                         COMPACTION_DELTA_THRESHOLD,
    3401         1176 :                         get_ancestor_image(self, *last_key, ctx, has_data_below, lowest_retain_lsn)
    3402         1176 :                             .await
    3403         1176 :                             .context("failed to get ancestor image")
    3404         1176 :                             .map_err(CompactionError::Other)?,
    3405         1176 :                         verification,
    3406         1176 :                     )
    3407         1176 :                     .await
    3408         1176 :                     .context("failed to generate key retention")
    3409         1176 :                     .map_err(CompactionError::Other)?;
    3410         1172 :                 retention
    3411         1172 :                     .pipe_to(
    3412         1172 :                         *last_key,
    3413         1172 :                         &mut delta_layer_writer,
    3414         1172 :                         image_layer_writer.as_mut(),
    3415         1172 :                         &mut stat,
    3416         1172 :                         ctx,
    3417         1172 :                     )
    3418         1172 :                     .await
    3419         1172 :                     .context("failed to pipe to delta layer writer")
    3420         1172 :                     .map_err(CompactionError::Other)?;
    3421         1172 :                 accumulated_values.clear();
    3422         1172 :                 *last_key = key;
    3423         1172 :                 accumulated_values.push((key, lsn, val));
    3424              :             }
    3425              :         }
    3426              : 
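
The loop above batches every (key, lsn, value) tuple belonging to the same key before computing that key's retention and piping it to the writers. As an illustration of that accumulate-then-flush shape only (not the actual compaction code), here is a minimal sketch that uses plain integer stand-ins for Key, Lsn and Value, and a caller-supplied flush callback in place of generate_key_retention/pipe_to:

    // Illustrative sketch: group a (key, lsn)-sorted stream by key and flush each
    // group once the key changes, mirroring the shape of the loop above.
    fn group_and_flush(entries: &[(u32, u64, u64)], mut flush: impl FnMut(u32, &[(u64, u64)])) {
        let mut last_key: Option<u32> = None;
        let mut accumulated: Vec<(u64, u64)> = Vec::new();
        for &(key, lsn, val) in entries {
            match last_key {
                Some(k) if k == key => accumulated.push((lsn, val)),
                Some(k) => {
                    // Key changed: flush everything collected for the previous key
                    // (the real loop calls generate_key_retention + pipe_to here).
                    flush(k, &accumulated);
                    accumulated.clear();
                    last_key = Some(key);
                    accumulated.push((lsn, val));
                }
                None => {
                    last_key = Some(key);
                    accumulated.push((lsn, val));
                }
            }
        }
        // The final key is flushed after the loop, just like the post-loop
        // generate_key_retention/pipe_to calls in the code that follows.
        if let Some(k) = last_key {
            flush(k, &accumulated);
        }
    }

This also makes the "TODO: move the below part to the loop body" remark just below concrete: the trailing flush exists only because the loop flushes on key change rather than at end of group.
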
    3427              :         // TODO: move the below part to the loop body
    3428          104 :         let Some(last_key) = last_key else {
    3429            0 :             return Err(CompactionError::Other(anyhow!(
    3430            0 :                 "no keys produced during compaction"
    3431            0 :             )));
    3432              :         };
    3433          104 :         stat.on_unique_key_visited();
    3434              : 
    3435          104 :         let retention = self
    3436          104 :             .generate_key_retention(
    3437          104 :                 last_key,
    3438          104 :                 &accumulated_values,
    3439          104 :                 job_desc.gc_cutoff,
    3440          104 :                 &job_desc.retain_lsns_below_horizon,
    3441          104 :                 COMPACTION_DELTA_THRESHOLD,
    3442          104 :                 get_ancestor_image(self, last_key, ctx, has_data_below, lowest_retain_lsn)
    3443          104 :                     .await
    3444          104 :                     .context("failed to get ancestor image")
    3445          104 :                     .map_err(CompactionError::Other)?,
    3446          104 :                 verification,
    3447          104 :             )
    3448          104 :             .await
    3449          104 :             .context("failed to generate key retention")
    3450          104 :             .map_err(CompactionError::Other)?;
    3451          104 :         retention
    3452          104 :             .pipe_to(
    3453          104 :                 last_key,
    3454          104 :                 &mut delta_layer_writer,
    3455          104 :                 image_layer_writer.as_mut(),
    3456          104 :                 &mut stat,
    3457          104 :                 ctx,
    3458          104 :             )
    3459          104 :             .await
    3460          104 :             .context("failed to pipe to delta layer writer")
    3461          104 :             .map_err(CompactionError::Other)?;
    3462              :         // end: move the above part to the loop body
    3463              : 
    3464          104 :         let time_main_loop = timer.elapsed();
    3465          104 :         let timer = Instant::now();
    3466          104 : 
    3467          104 :         let mut rewrote_delta_layers = Vec::new();
    3468          108 :         for (key, writers) in delta_layer_rewriters {
    3469            4 :             if let Some(delta_writer_before) = writers.before {
    3470            0 :                 let (desc, path) = delta_writer_before
    3471            0 :                     .finish(job_desc.compaction_key_range.start, ctx)
    3472            0 :                     .await
    3473            0 :                     .context("failed to finish delta layer writer")
    3474            0 :                     .map_err(CompactionError::Other)?;
    3475            0 :                 let layer = Layer::finish_creating(self.conf, self, desc, &path)
    3476            0 :                     .context("failed to finish creating delta layer")
    3477            0 :                     .map_err(CompactionError::Other)?;
    3478            0 :                 rewrote_delta_layers.push(layer);
    3479            4 :             }
    3480            4 :             if let Some(delta_writer_after) = writers.after {
    3481            4 :                 let (desc, path) = delta_writer_after
    3482            4 :                     .finish(key.key_range.end, ctx)
    3483            4 :                     .await
    3484            4 :                     .context("failed to finish delta layer writer")
    3485            4 :                     .map_err(CompactionError::Other)?;
    3486            4 :                 let layer = Layer::finish_creating(self.conf, self, desc, &path)
    3487            4 :                     .context("failed to finish creating delta layer")
    3488            4 :                     .map_err(CompactionError::Other)?;
    3489            4 :                 rewrote_delta_layers.push(layer);
    3490            0 :             }
    3491              :         }
    3492              : 
    3493          148 :         let discard = |key: &PersistentLayerKey| {
    3494          148 :             let key = key.clone();
    3495          148 :             async move { KeyHistoryRetention::discard_key(&key, self, dry_run).await }
    3496          148 :         };
    3497              : 
    3498          104 :         let produced_image_layers = if let Some(writer) = image_layer_writer {
    3499           84 :             if !dry_run {
    3500           76 :                 let end_key = job_desc.compaction_key_range.end;
    3501           76 :                 writer
    3502           76 :                     .finish_with_discard_fn(self, ctx, end_key, discard)
    3503           76 :                     .await
    3504           76 :                     .context("failed to finish image layer writer")
    3505           76 :                     .map_err(CompactionError::Other)?
    3506              :             } else {
    3507            8 :                 drop(writer);
    3508            8 :                 Vec::new()
    3509              :             }
    3510              :         } else {
    3511           20 :             Vec::new()
    3512              :         };
    3513              : 
    3514          104 :         let produced_delta_layers = if !dry_run {
    3515           96 :             delta_layer_writer
    3516           96 :                 .finish_with_discard_fn(self, ctx, discard)
    3517           96 :                 .await
    3518           96 :                 .context("failed to finish delta layer writer")
    3519           96 :                 .map_err(CompactionError::Other)?
    3520              :         } else {
    3521            8 :             drop(delta_layer_writer);
    3522            8 :             Vec::new()
    3523              :         };
    3524              : 
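
Both `finish_with_discard_fn` calls above hand the writers a per-layer `discard` callback so that a layer whose key already exists (or a dry run) is not re-uploaded, and the caller later distinguishes produced from discarded results. The sketch below shows only that callback-driven shape; `SketchWriterResult` and `finish_with_discard` are simplified stand-ins invented here, not the pageserver's `BatchWriterResult` or writer API:

    // Simplified stand-in result type; the real type is the pageserver's BatchWriterResult.
    #[derive(Debug)]
    enum SketchWriterResult {
        Produced(String),  // a freshly written layer, identified here by name only
        Discarded(String), // an identical layer already exists, so keep the old one
    }

    // Finish a batch of pending layers, consulting `discard` once per layer key.
    fn finish_with_discard(
        pending_layers: Vec<String>,
        mut discard: impl FnMut(&str) -> bool,
    ) -> Vec<SketchWriterResult> {
        pending_layers
            .into_iter()
            .map(|key| {
                if discard(key.as_str()) {
                    // Skip the upload and record that the existing layer is kept.
                    SketchWriterResult::Discarded(key)
                } else {
                    SketchWriterResult::Produced(key)
                }
            })
            .collect()
    }
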
    3525              :         // TODO: make image/delta/rewrote_delta layers generation atomic. At this point, we already generated resident layers, and if
    3526              :         // compaction is cancelled at this point, we might have some layers that are not cleaned up.
    3527          104 :         let mut compact_to = Vec::new();
    3528          104 :         let mut keep_layers = HashSet::new();
    3529          104 :         let produced_delta_layers_len = produced_delta_layers.len();
    3530          104 :         let produced_image_layers_len = produced_image_layers.len();
    3531          104 : 
    3532          104 :         let layer_selection_by_key = job_desc
    3533          104 :             .selected_layers
    3534          104 :             .iter()
    3535          304 :             .map(|l| (l.layer_desc().key(), l.layer_desc().clone()))
    3536          104 :             .collect::<HashMap<_, _>>();
    3537              : 
    3538          176 :         for action in produced_delta_layers {
    3539           72 :             match action {
    3540           44 :                 BatchWriterResult::Produced(layer) => {
    3541           44 :                     if cfg!(debug_assertions) {
    3542           44 :                         info!("produced delta layer: {}", layer.layer_desc().key());
    3543            0 :                     }
    3544           44 :                     stat.produce_delta_layer(layer.layer_desc().file_size());
    3545           44 :                     compact_to.push(layer);
    3546              :                 }
    3547           28 :                 BatchWriterResult::Discarded(l) => {
    3548           28 :                     if cfg!(debug_assertions) {
    3549           28 :                         info!("discarded delta layer: {}", l);
    3550            0 :                     }
    3551           28 :                     if let Some(layer_desc) = layer_selection_by_key.get(&l) {
    3552           28 :                         stat.discard_delta_layer(layer_desc.file_size());
    3553           28 :                     } else {
    3554            0 :                         tracing::warn!(
    3555            0 :                             "discarded delta layer not in layer_selection: {}, produced a layer outside of the compaction key range?",
    3556              :                             l
    3557              :                         );
    3558            0 :                         stat.discard_delta_layer(0);
    3559              :                     }
    3560           28 :                     keep_layers.insert(l);
    3561              :                 }
    3562              :             }
    3563              :         }
    3564          108 :         for layer in &rewrote_delta_layers {
    3565            4 :             debug!(
    3566            0 :                 "produced rewritten delta layer: {}",
    3567            0 :                 layer.layer_desc().key()
    3568              :             );
    3569              :             // For now, we include the rewritten delta layer size in "produce_delta_layer". We could
    3570              :             // make it a separate statistic in the future.
    3571            4 :             stat.produce_delta_layer(layer.layer_desc().file_size());
    3572              :         }
    3573          104 :         compact_to.extend(rewrote_delta_layers);
    3574          180 :         for action in produced_image_layers {
    3575           76 :             match action {
    3576           60 :                 BatchWriterResult::Produced(layer) => {
    3577           60 :                     debug!("produced image layer: {}", layer.layer_desc().key());
    3578           60 :                     stat.produce_image_layer(layer.layer_desc().file_size());
    3579           60 :                     compact_to.push(layer);
    3580              :                 }
    3581           16 :                 BatchWriterResult::Discarded(l) => {
    3582           16 :                     debug!("discarded image layer: {}", l);
    3583           16 :                     if let Some(layer_desc) = layer_selection_by_key.get(&l) {
    3584           16 :                         stat.discard_image_layer(layer_desc.file_size());
    3585           16 :                     } else {
    3586            0 :                         tracing::warn!(
    3587            0 :                             "discarded image layer not in layer_selection: {}, produced a layer outside of the compaction key range?",
    3588              :                             l
    3589              :                         );
    3590            0 :                         stat.discard_image_layer(0);
    3591              :                     }
    3592           16 :                     keep_layers.insert(l);
    3593              :                 }
    3594              :             }
    3595              :         }
    3596              : 
    3597          104 :         let mut layer_selection = job_desc.selected_layers;
    3598              : 
    3599              :         // Partial compaction might select more data than it processes, e.g., if
    3600              :         // the compaction_key_range only partially overlaps the selected layers:
    3601              :         //
    3602              :         //         [---compaction_key_range---]
    3603              :         //   [---A----][----B----][----C----][----D----]
    3604              :         //
    3605              :         // For delta layers, we rewrite the layers so that they are cut exactly at
    3606              :         // the compaction key range, so we can always discard them. However, since we
    3607              :         // do not rewrite image layers for now, we need to handle them differently.
    3608              :         // Assume image layers A, B, C, D are all in the `layer_selection`.
    3609              :         //
    3610              :         // The created image layers contain whatever is needed from B, from C, from
    3611              :         // `----]` of A, and from `[---` of D.
    3612              :         //
    3613              :         // In contrast, `[---A` and `D----]` have not been processed, so we must
    3614              :         // keep that data.
    3615              :         //
    3616              :         // The solution for now is to keep A and D completely if they are image layers.
    3617              :         // (layer_selection is what we'll remove from the layer map, so retain what
    3618              :         // is _not_ fully covered by compaction_key_range; see the sketch after the retain call below.)
    3619          408 :         for layer in &layer_selection {
    3620          304 :             if !layer.layer_desc().is_delta() {
    3621          132 :                 if !overlaps_with(
    3622          132 :                     &layer.layer_desc().key_range,
    3623          132 :                     &job_desc.compaction_key_range,
    3624          132 :                 ) {
    3625            0 :                     return Err(CompactionError::Other(anyhow!(
    3626            0 :                         "violated constraint: image layer outside of compaction key range"
    3627            0 :                     )));
    3628          132 :                 }
    3629          132 :                 if !fully_contains(
    3630          132 :                     &job_desc.compaction_key_range,
    3631          132 :                     &layer.layer_desc().key_range,
    3632          132 :                 ) {
    3633           16 :                     keep_layers.insert(layer.layer_desc().key());
    3634          116 :                 }
    3635          172 :             }
    3636              :         }
    3637              : 
    3638          304 :         layer_selection.retain(|x| !keep_layers.contains(&x.layer_desc().key()));
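
To make the keep/discard rule from the comment above concrete, the sketch below re-implements the decision with plain integer key ranges and locally defined helpers standing in for `overlaps_with` and `fully_contains`: an image layer that intersects the compaction key range without being fully contained in it must be kept.

    use std::ops::Range;

    // Local stand-ins for pageserver_compaction::helpers::{overlaps_with, fully_contains}.
    fn ranges_overlap(a: &Range<u32>, b: &Range<u32>) -> bool {
        a.start < b.end && b.start < a.end
    }
    fn range_contains(outer: &Range<u32>, inner: &Range<u32>) -> bool {
        outer.start <= inner.start && inner.end <= outer.end
    }

    /// True if a (non-rewritten) image layer must be kept because the compaction
    /// key range only partially covers it, mirroring the loop above.
    fn must_keep_image_layer(image: &Range<u32>, compaction: &Range<u32>) -> bool {
        assert!(
            ranges_overlap(image, compaction),
            "selected image layer should intersect the compaction key range"
        );
        !range_contains(compaction, image)
    }

    fn main() {
        // Layers A..D from the diagram above; only fully covered layers may be dropped.
        let compaction = 10..30;
        assert!(must_keep_image_layer(&(5..15), &compaction));   // A: partially covered, keep
        assert!(!must_keep_image_layer(&(15..20), &compaction)); // B: fully covered, droppable
        assert!(must_keep_image_layer(&(25..40), &compaction));  // D: partially covered, keep
    }
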
    3639          104 : 
    3640          104 :         let time_final_phase = timer.elapsed();
    3641          104 : 
    3642          104 :         stat.time_final_phase_secs = time_final_phase.as_secs_f64();
    3643          104 :         stat.time_to_first_kv_pair_secs = time_to_first_kv_pair
    3644          104 :             .unwrap_or(Duration::ZERO)
    3645          104 :             .as_secs_f64();
    3646          104 :         stat.time_main_loop_secs = time_main_loop.as_secs_f64();
    3647          104 :         stat.time_acquire_lock_secs = time_acquire_lock.as_secs_f64();
    3648          104 :         stat.time_download_layer_secs = time_download_layer.as_secs_f64();
    3649          104 :         stat.time_analyze_secs = time_analyze.as_secs_f64();
    3650          104 :         stat.time_total_secs = begin_timer.elapsed().as_secs_f64();
    3651          104 :         stat.finalize();
    3652          104 : 
    3653          104 :         info!(
    3654            0 :             "gc-compaction statistics: {}",
    3655            0 :             serde_json::to_string(&stat)
    3656            0 :                 .context("failed to serialize gc-compaction statistics")
    3657            0 :                 .map_err(CompactionError::Other)?
    3658              :         );
    3659              : 
    3660          104 :         if dry_run {
    3661            8 :             return Ok(CompactionOutcome::Done);
    3662           96 :         }
    3663           96 : 
    3664           96 :         info!(
    3665            0 :             "produced {} delta layers and {} image layers, {} layers are kept",
    3666            0 :             produced_delta_layers_len,
    3667            0 :             produced_image_layers_len,
    3668            0 :             keep_layers.len()
    3669              :         );
    3670              : 
    3671              :         // Step 3: Place back to the layer map.
    3672              : 
    3673              :         // First, do a sanity check to ensure the newly-created layer map does not contain overlaps.
    3674           96 :         let all_layers = {
    3675           96 :             let guard = self.layers.read().await;
    3676           96 :             let layer_map = guard.layer_map()?;
    3677           96 :             layer_map.iter_historic_layers().collect_vec()
    3678           96 :         };
    3679           96 : 
    3680           96 :         let mut final_layers = all_layers
    3681           96 :             .iter()
    3682          428 :             .map(|layer| layer.layer_name())
    3683           96 :             .collect::<HashSet<_>>();
    3684          304 :         for layer in &layer_selection {
    3685          208 :             final_layers.remove(&layer.layer_desc().layer_name());
    3686          208 :         }
    3687          204 :         for layer in &compact_to {
    3688          108 :             final_layers.insert(layer.layer_desc().layer_name());
    3689          108 :         }
    3690           96 :         let final_layers = final_layers.into_iter().collect_vec();
    3691              : 
    3692              :         // TODO: move this check before we call `finish` on image layer writers. However, this will require us to get the layer name before we finish
    3693              :         // the writer, so potentially, we will need a function like `ImageLayerBatchWriter::get_all_pending_layer_keys` to get all the keys that are
    3694              :         // in the writer before finalizing the persistent layers. As things stand, we would leave some dangling layers on disk if the check fails.
    3695           96 :         if let Some(err) = check_valid_layermap(&final_layers) {
    3696            0 :             return Err(CompactionError::Other(anyhow!(
    3697            0 :                 "gc-compaction layer map check failed after compaction because {}, compaction result not applied to the layer map due to potential data loss",
    3698            0 :                 err
    3699            0 :             )));
    3700           96 :         }
    3701              : 
    3702              :         // Between the sanity check and this compaction update, there could be new layers being flushed, but it should be fine because we only
    3703              :         // operate on L1 layers.
    3704              :         {
    3705              :             // Gc-compaction will rewrite the history of a key. This could happen in two ways:
    3706              :             //
    3707              :             // 1. We create an image layer to replace all the deltas below the compact LSN. In this case, assume
    3708              :             // we have 2 delta layers A and B, both below the compact LSN. We create an image layer I to replace
    3709              :             // A and B at the compact LSN. If the read path finishes reading A, yields, and now we update the layer
    3710              :             // map, the read path then cannot find any keys below A, reporting a missing key error, while the key
    3711              :             // now gets stored in I at the compact LSN.
    3712              :             //
    3713              :             // ---------------                                       ---------------
    3714              :             //   delta1@LSN20                                         image1@LSN20
    3715              :             // ---------------  (read path collects delta@LSN20,  => ---------------  (read path cannot find anything
    3716              :             //   delta1@LSN10    yields)                                               below LSN 20)
    3717              :             // ---------------
    3718              :             //
    3719              :             // 2. We create a delta layer to replace all the deltas below the compact LSN, and in that delta layer,
    3720              :             // we combine the history of a key into a single image. For example, suppose we have deltas at LSN 1, 2, 3, 4,
    3721              :             // where one delta layer contains LSN 1, 2, 3 and the other contains LSN 4.
    3722              :             //
    3723              :             // We let gc-compaction combine deltas 2, 3, 4 into an image at LSN 4, which produces a delta layer that
    3724              :             // contains the delta at LSN 1 and the image at LSN 4. If the read path finishes reading the original delta
    3725              :             // layer containing LSN 4, yields, and we then swap in the new delta layer, the read path gets an invalid history:
    3726              :             //
    3727              :             // ---------------                                      ---------------
    3728              :             //   delta1@LSN4                                          image1@LSN4
    3729              :             // ---------------  (read path collects delta@LSN4,  => ---------------  (read path collects LSN4 and LSN1,
    3730              :             //  delta1@LSN1-3    yields)                              delta1@LSN1     which is an invalid history)
    3731              :             // ---------------                                      ---------------
    3732              :             //
    3733              :             // Therefore, the gc-compaction layer update operation should wait for all ongoing reads, block all pending reads,
    3734              :             // and only allow reads to continue after the update is finished.
    3735              : 
    3736           96 :             let update_guard = self.gc_compaction_layer_update_lock.write().await;
    3737              :             // Acquiring the update guard ensures current read operations end and new read operations are blocked.
    3738              :             // TODO: can we use `latest_gc_cutoff` Rcu to achieve the same effect?
    3739           96 :             let mut guard = self.layers.write().await;
    3740           96 :             guard
    3741           96 :                 .open_mut()?
    3742           96 :                 .finish_gc_compaction(&layer_selection, &compact_to, &self.metrics);
    3743           96 :             drop(update_guard); // Allow new reads to start ONLY after we finished updating the layer map.
    3744           96 :         };
    3745           96 : 
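
The scope above orders the swap so that it waits for in-flight reads to drain and blocks new reads until the layer map has been updated. A minimal tokio-based sketch of that ordering is shown below; `LayerMapSketch` and `swap_layer_map` are invented placeholders, while the real code uses `gc_compaction_layer_update_lock` together with the layer manager:

    use std::sync::Arc;
    use tokio::sync::RwLock;

    // Placeholder for whatever state the swap replaces.
    #[derive(Default)]
    struct LayerMapSketch {
        generation: u64,
    }

    async fn swap_layer_map(update_lock: &RwLock<()>, layers: Arc<RwLock<LayerMapSketch>>) {
        // Taking the write half waits for all current readers (in-flight reads)
        // and blocks new readers until the guard is dropped.
        let update_guard = update_lock.write().await;
        {
            let mut guard = layers.write().await;
            guard.generation += 1; // stand-in for finish_gc_compaction(...)
        }
        drop(update_guard); // Only now may new reads observe the updated map.
    }

    async fn read_path(update_lock: &RwLock<()>, layers: Arc<RwLock<LayerMapSketch>>) -> u64 {
        // Readers hold the shared half for the whole read, so the swap cannot
        // happen in the middle of collecting data from the old layers.
        let _read_guard = update_lock.read().await;
        layers.read().await.generation
    }
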
    3746           96 :         // Schedule an index-only upload to update the `latest_gc_cutoff` in the index_part.json.
    3747           96 :         // Otherwise, after restart, the index_part only contains the old `latest_gc_cutoff` and
    3748           96 :         // find_gc_cutoffs will try accessing things below the cutoff. TODO: ideally, this should
    3749           96 :         // be batched into `schedule_compaction_update`.
    3750           96 :         let disk_consistent_lsn = self.disk_consistent_lsn.load();
    3751           96 :         self.schedule_uploads(disk_consistent_lsn, None)
    3752           96 :             .context("failed to schedule uploads")
    3753           96 :             .map_err(CompactionError::Other)?;
    3754              :         // If a layer gets rewritten during gc-compaction, we need to keep that layer only in `compact_to` instead
    3755              :         // of `compact_from`.
    3756           96 :         let compact_from = {
    3757           96 :             let mut compact_from = Vec::new();
    3758           96 :             let mut compact_to_set = HashMap::new();
    3759          204 :             for layer in &compact_to {
    3760          108 :                 compact_to_set.insert(layer.layer_desc().key(), layer);
    3761          108 :             }
    3762          304 :             for layer in &layer_selection {
    3763          208 :                 if let Some(to) = compact_to_set.get(&layer.layer_desc().key()) {
    3764            0 :                     tracing::info!(
    3765            0 :                         "skipping delete {} because found same layer key at different generation {}",
    3766              :                         layer,
    3767              :                         to
    3768              :                     );
    3769          208 :                 } else {
    3770          208 :                     compact_from.push(layer.clone());
    3771          208 :                 }
    3772              :             }
    3773           96 :             compact_from
    3774           96 :         };
    3775           96 :         self.remote_client
    3776           96 :             .schedule_compaction_update(&compact_from, &compact_to)?;
    3777              : 
    3778           96 :         drop(gc_lock);
    3779           96 : 
    3780           96 :         Ok(CompactionOutcome::Done)
    3781          112 :     }
    3782              : }
    3783              : 
    3784              : struct TimelineAdaptor {
    3785              :     timeline: Arc<Timeline>,
    3786              : 
    3787              :     keyspace: (Lsn, KeySpace),
    3788              : 
    3789              :     new_deltas: Vec<ResidentLayer>,
    3790              :     new_images: Vec<ResidentLayer>,
    3791              :     layers_to_delete: Vec<Arc<PersistentLayerDesc>>,
    3792              : }
    3793              : 
    3794              : impl TimelineAdaptor {
    3795            0 :     pub fn new(timeline: &Arc<Timeline>, keyspace: (Lsn, KeySpace)) -> Self {
    3796            0 :         Self {
    3797            0 :             timeline: timeline.clone(),
    3798            0 :             keyspace,
    3799            0 :             new_images: Vec::new(),
    3800            0 :             new_deltas: Vec::new(),
    3801            0 :             layers_to_delete: Vec::new(),
    3802            0 :         }
    3803            0 :     }
    3804              : 
    3805            0 :     pub async fn flush_updates(&mut self) -> Result<(), CompactionError> {
    3806            0 :         let layers_to_delete = {
    3807            0 :             let guard = self.timeline.layers.read().await;
    3808            0 :             self.layers_to_delete
    3809            0 :                 .iter()
    3810            0 :                 .map(|x| guard.get_from_desc(x))
    3811            0 :                 .collect::<Vec<Layer>>()
    3812            0 :         };
    3813            0 :         self.timeline
    3814            0 :             .finish_compact_batch(&self.new_deltas, &self.new_images, &layers_to_delete)
    3815            0 :             .await?;
    3816              : 
    3817            0 :         self.timeline
    3818            0 :             .upload_new_image_layers(std::mem::take(&mut self.new_images))?;
    3819              : 
    3820            0 :         self.new_deltas.clear();
    3821            0 :         self.layers_to_delete.clear();
    3822            0 :         Ok(())
    3823            0 :     }
    3824              : }
    3825              : 
    3826              : #[derive(Clone)]
    3827              : struct ResidentDeltaLayer(ResidentLayer);
    3828              : #[derive(Clone)]
    3829              : struct ResidentImageLayer(ResidentLayer);
    3830              : 
    3831              : impl CompactionJobExecutor for TimelineAdaptor {
    3832              :     type Key = pageserver_api::key::Key;
    3833              : 
    3834              :     type Layer = OwnArc<PersistentLayerDesc>;
    3835              :     type DeltaLayer = ResidentDeltaLayer;
    3836              :     type ImageLayer = ResidentImageLayer;
    3837              : 
    3838              :     type RequestContext = crate::context::RequestContext;
    3839              : 
    3840            0 :     fn get_shard_identity(&self) -> &ShardIdentity {
    3841            0 :         self.timeline.get_shard_identity()
    3842            0 :     }
    3843              : 
    3844            0 :     async fn get_layers(
    3845            0 :         &mut self,
    3846            0 :         key_range: &Range<Key>,
    3847            0 :         lsn_range: &Range<Lsn>,
    3848            0 :         _ctx: &RequestContext,
    3849            0 :     ) -> anyhow::Result<Vec<OwnArc<PersistentLayerDesc>>> {
    3850            0 :         self.flush_updates().await?;
    3851              : 
    3852            0 :         let guard = self.timeline.layers.read().await;
    3853            0 :         let layer_map = guard.layer_map()?;
    3854              : 
    3855            0 :         let result = layer_map
    3856            0 :             .iter_historic_layers()
    3857            0 :             .filter(|l| {
    3858            0 :                 overlaps_with(&l.lsn_range, lsn_range) && overlaps_with(&l.key_range, key_range)
    3859            0 :             })
    3860            0 :             .map(OwnArc)
    3861            0 :             .collect();
    3862            0 :         Ok(result)
    3863            0 :     }
    3864              : 
    3865            0 :     async fn get_keyspace(
    3866            0 :         &mut self,
    3867            0 :         key_range: &Range<Key>,
    3868            0 :         lsn: Lsn,
    3869            0 :         _ctx: &RequestContext,
    3870            0 :     ) -> anyhow::Result<Vec<Range<Key>>> {
    3871            0 :         if lsn == self.keyspace.0 {
    3872            0 :             Ok(pageserver_compaction::helpers::intersect_keyspace(
    3873            0 :                 &self.keyspace.1.ranges,
    3874            0 :                 key_range,
    3875            0 :             ))
    3876              :         } else {
    3877              :             // The current compaction implementation only ever requests the key space
    3878              :             // at the compaction end LSN.
    3879            0 :             anyhow::bail!("keyspace not available for requested lsn");
    3880              :         }
    3881            0 :     }
    3882              : 
    3883            0 :     async fn downcast_delta_layer(
    3884            0 :         &self,
    3885            0 :         layer: &OwnArc<PersistentLayerDesc>,
    3886            0 :         ctx: &RequestContext,
    3887            0 :     ) -> anyhow::Result<Option<ResidentDeltaLayer>> {
    3888            0 :         // this is a lot more complex than a simple downcast...
    3889            0 :         if layer.is_delta() {
    3890            0 :             let l = {
    3891            0 :                 let guard = self.timeline.layers.read().await;
    3892            0 :                 guard.get_from_desc(layer)
    3893              :             };
    3894            0 :             let result = l.download_and_keep_resident(ctx).await?;
    3895              : 
    3896            0 :             Ok(Some(ResidentDeltaLayer(result)))
    3897              :         } else {
    3898            0 :             Ok(None)
    3899              :         }
    3900            0 :     }
    3901              : 
    3902            0 :     async fn create_image(
    3903            0 :         &mut self,
    3904            0 :         lsn: Lsn,
    3905            0 :         key_range: &Range<Key>,
    3906            0 :         ctx: &RequestContext,
    3907            0 :     ) -> anyhow::Result<()> {
    3908            0 :         Ok(self.create_image_impl(lsn, key_range, ctx).await?)
    3909            0 :     }
    3910              : 
    3911            0 :     async fn create_delta(
    3912            0 :         &mut self,
    3913            0 :         lsn_range: &Range<Lsn>,
    3914            0 :         key_range: &Range<Key>,
    3915            0 :         input_layers: &[ResidentDeltaLayer],
    3916            0 :         ctx: &RequestContext,
    3917            0 :     ) -> anyhow::Result<()> {
    3918            0 :         debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end);
    3919              : 
    3920            0 :         let mut all_entries = Vec::new();
    3921            0 :         for dl in input_layers.iter() {
    3922            0 :             all_entries.extend(dl.load_keys(ctx).await?);
    3923              :         }
    3924              : 
    3925              :         // The current stdlib sorting implementation is designed such that it is
    3926              :         // particularly fast when the slice is made up of already-sorted sub-ranges.
    3927            0 :         all_entries.sort_by_key(|DeltaEntry { key, lsn, .. }| (*key, *lsn));
    3928              : 
    3929            0 :         let mut writer = DeltaLayerWriter::new(
    3930            0 :             self.timeline.conf,
    3931            0 :             self.timeline.timeline_id,
    3932            0 :             self.timeline.tenant_shard_id,
    3933            0 :             key_range.start,
    3934            0 :             lsn_range.clone(),
    3935            0 :             ctx,
    3936            0 :         )
    3937            0 :         .await?;
    3938              : 
    3939            0 :         let mut dup_values = 0;
    3940            0 : 
    3941            0 :         // This iterator walks through all key-value pairs from all the layers
    3942            0 :         // we're compacting, in key, LSN order.
    3943            0 :         let mut prev: Option<(Key, Lsn)> = None;
    3944              :         for &DeltaEntry {
    3945            0 :             key, lsn, ref val, ..
    3946            0 :         } in all_entries.iter()
    3947              :         {
    3948            0 :             if prev == Some((key, lsn)) {
    3949              :                 // This is a duplicate. Skip it.
    3950              :                 //
    3951              :                 // It can happen if compaction is interrupted after writing some
    3952              :                 // layers but not all, and we are compacting the range again.
    3953              :                 // The calculations in the algorithm assume that there are no
    3954              :                 // duplicates, so the math on targeted file size is likely off,
    3955              :                 // and we will create smaller files than expected.
    3956            0 :                 dup_values += 1;
    3957            0 :                 continue;
    3958            0 :             }
    3959              : 
    3960            0 :             let value = val.load(ctx).await?;
    3961              : 
    3962            0 :             writer.put_value(key, lsn, value, ctx).await?;
    3963              : 
    3964            0 :             prev = Some((key, lsn));
    3965              :         }
    3966              : 
    3967            0 :         if dup_values > 0 {
    3968            0 :             warn!("delta layer created with {} duplicate values", dup_values);
    3969            0 :         }
    3970              : 
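
Because each input delta layer is already sorted by (key, lsn), concatenating their entries and calling `sort_by_key` yields a fully ordered slice cheaply, and the duplicates left behind by an interrupted earlier compaction then appear only as adjacent entries, which the loop above skips. A small self-contained sketch of that merge-then-dedup step, using plain tuples instead of `DeltaEntry`, is:

    /// Merge per-layer entry lists that are each sorted by (key, lsn), then drop
    /// exact (key, lsn) duplicates, keeping the first occurrence.
    fn merge_and_dedup(
        layers: Vec<Vec<(u32, u64, &'static str)>>,
    ) -> (Vec<(u32, u64, &'static str)>, usize) {
        let mut all: Vec<_> = layers.into_iter().flatten().collect();
        // The stdlib sort is especially fast here because the input is a
        // concatenation of already-sorted runs.
        all.sort_by_key(|&(key, lsn, _)| (key, lsn));

        let mut out = Vec::with_capacity(all.len());
        let mut dup_values = 0;
        let mut prev: Option<(u32, u64)> = None;
        for (key, lsn, val) in all {
            if prev == Some((key, lsn)) {
                dup_values += 1; // duplicate from an interrupted earlier compaction
                continue;
            }
            out.push((key, lsn, val));
            prev = Some((key, lsn));
        }
        (out, dup_values)
    }
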
    3971            0 :         fail_point!("delta-layer-writer-fail-before-finish", |_| {
    3972            0 :             Err(anyhow::anyhow!(
    3973            0 :                 "failpoint delta-layer-writer-fail-before-finish"
    3974            0 :             ))
    3975            0 :         });
    3976              : 
    3977            0 :         let (desc, path) = writer.finish(prev.unwrap().0.next(), ctx).await?;
    3978            0 :         let new_delta_layer =
    3979            0 :             Layer::finish_creating(self.timeline.conf, &self.timeline, desc, &path)?;
    3980              : 
    3981            0 :         self.new_deltas.push(new_delta_layer);
    3982            0 :         Ok(())
    3983            0 :     }
    3984              : 
    3985            0 :     async fn delete_layer(
    3986            0 :         &mut self,
    3987            0 :         layer: &OwnArc<PersistentLayerDesc>,
    3988            0 :         _ctx: &RequestContext,
    3989            0 :     ) -> anyhow::Result<()> {
    3990            0 :         self.layers_to_delete.push(layer.clone().0);
    3991            0 :         Ok(())
    3992            0 :     }
    3993              : }
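
The impl above only supplies callbacks; the compaction crate owns the control flow. To illustrate how such an executor interface can be driven, here is a deliberately simplified, synchronous analogue with an invented `SketchExecutor` trait and `drive_compaction` loop; it is not the real `pageserver_compaction` algorithm, which is async, works with the associated types above, and makes far more nuanced decisions:

    use std::ops::Range;

    // Simplified analogue of the executor interface implemented above.
    trait SketchExecutor {
        fn get_layers(&mut self, key_range: &Range<u32>, lsn_range: &Range<u64>) -> Vec<u64>;
        fn is_delta(&self, layer: u64) -> bool;
        fn create_delta(&mut self, lsn_range: &Range<u64>, key_range: &Range<u32>, inputs: &[u64]);
        fn delete_layer(&mut self, layer: u64);
    }

    // Hypothetical driver: collect the overlapping layers, compact the delta layers
    // into a new one, then ask the executor to delete the inputs.
    fn drive_compaction<E: SketchExecutor>(exec: &mut E, key_range: Range<u32>, lsn_range: Range<u64>) {
        let layers = exec.get_layers(&key_range, &lsn_range);
        let deltas: Vec<u64> = layers.iter().copied().filter(|&l| exec.is_delta(l)).collect();
        if deltas.len() > 1 {
            exec.create_delta(&lsn_range, &key_range, &deltas);
            for layer in deltas {
                exec.delete_layer(layer);
            }
        }
    }
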
    3994              : 
    3995              : impl TimelineAdaptor {
    3996            0 :     async fn create_image_impl(
    3997            0 :         &mut self,
    3998            0 :         lsn: Lsn,
    3999            0 :         key_range: &Range<Key>,
    4000            0 :         ctx: &RequestContext,
    4001            0 :     ) -> Result<(), CreateImageLayersError> {
    4002            0 :         let timer = self.timeline.metrics.create_images_time_histo.start_timer();
    4003              : 
    4004            0 :         let image_layer_writer = ImageLayerWriter::new(
    4005            0 :             self.timeline.conf,
    4006            0 :             self.timeline.timeline_id,
    4007            0 :             self.timeline.tenant_shard_id,
    4008            0 :             key_range,
    4009            0 :             lsn,
    4010            0 :             ctx,
    4011            0 :         )
    4012            0 :         .await?;
    4013              : 
    4014            0 :         fail_point!("image-layer-writer-fail-before-finish", |_| {
    4015            0 :             Err(CreateImageLayersError::Other(anyhow::anyhow!(
    4016            0 :                 "failpoint image-layer-writer-fail-before-finish"
    4017            0 :             )))
    4018            0 :         });
    4019              : 
    4020            0 :         let keyspace = KeySpace {
    4021            0 :             ranges: self.get_keyspace(key_range, lsn, ctx).await?,
    4022              :         };
    4023              :         // TODO set proper (stateful) start. The create_image_layer_for_rel_blocks function mostly
    4024            0 :         let outcome = self
    4025            0 :             .timeline
    4026            0 :             .create_image_layer_for_rel_blocks(
    4027            0 :                 &keyspace,
    4028            0 :                 image_layer_writer,
    4029            0 :                 lsn,
    4030            0 :                 ctx,
    4031            0 :                 key_range.clone(),
    4032            0 :                 IoConcurrency::sequential(),
    4033            0 :             )
    4034            0 :             .await?;
    4035              : 
    4036              :         if let ImageLayerCreationOutcome::Generated {
    4037            0 :             unfinished_image_layer,
    4038            0 :         } = outcome
    4039              :         {
    4040            0 :             let (desc, path) = unfinished_image_layer.finish(ctx).await?;
    4041            0 :             let image_layer =
    4042            0 :                 Layer::finish_creating(self.timeline.conf, &self.timeline, desc, &path)?;
    4043            0 :             self.new_images.push(image_layer);
    4044            0 :         }
    4045              : 
    4046            0 :         timer.stop_and_record();
    4047            0 : 
    4048            0 :         Ok(())
    4049            0 :     }
    4050              : }
    4051              : 
    4052              : impl CompactionRequestContext for crate::context::RequestContext {}
    4053              : 
    4054              : #[derive(Debug, Clone)]
    4055              : pub struct OwnArc<T>(pub Arc<T>);
    4056              : 
    4057              : impl<T> Deref for OwnArc<T> {
    4058              :     type Target = <Arc<T> as Deref>::Target;
    4059            0 :     fn deref(&self) -> &Self::Target {
    4060            0 :         &self.0
    4061            0 :     }
    4062              : }
    4063              : 
    4064              : impl<T> AsRef<T> for OwnArc<T> {
    4065            0 :     fn as_ref(&self) -> &T {
    4066            0 :         self.0.as_ref()
    4067            0 :     }
    4068              : }
    4069              : 
    4070              : impl CompactionLayer<Key> for OwnArc<PersistentLayerDesc> {
    4071            0 :     fn key_range(&self) -> &Range<Key> {
    4072            0 :         &self.key_range
    4073            0 :     }
    4074            0 :     fn lsn_range(&self) -> &Range<Lsn> {
    4075            0 :         &self.lsn_range
    4076            0 :     }
    4077            0 :     fn file_size(&self) -> u64 {
    4078            0 :         self.file_size
    4079            0 :     }
    4080            0 :     fn short_id(&self) -> std::string::String {
    4081            0 :         self.as_ref().short_id().to_string()
    4082            0 :     }
    4083            0 :     fn is_delta(&self) -> bool {
    4084            0 :         self.as_ref().is_delta()
    4085            0 :     }
    4086              : }
    4087              : 
    4088              : impl CompactionLayer<Key> for OwnArc<DeltaLayer> {
    4089            0 :     fn key_range(&self) -> &Range<Key> {
    4090            0 :         &self.layer_desc().key_range
    4091            0 :     }
    4092            0 :     fn lsn_range(&self) -> &Range<Lsn> {
    4093            0 :         &self.layer_desc().lsn_range
    4094            0 :     }
    4095            0 :     fn file_size(&self) -> u64 {
    4096            0 :         self.layer_desc().file_size
    4097            0 :     }
    4098            0 :     fn short_id(&self) -> std::string::String {
    4099            0 :         self.layer_desc().short_id().to_string()
    4100            0 :     }
    4101            0 :     fn is_delta(&self) -> bool {
    4102            0 :         true
    4103            0 :     }
    4104              : }
    4105              : 
    4106              : impl CompactionLayer<Key> for ResidentDeltaLayer {
    4107            0 :     fn key_range(&self) -> &Range<Key> {
    4108            0 :         &self.0.layer_desc().key_range
    4109            0 :     }
    4110            0 :     fn lsn_range(&self) -> &Range<Lsn> {
    4111            0 :         &self.0.layer_desc().lsn_range
    4112            0 :     }
    4113            0 :     fn file_size(&self) -> u64 {
    4114            0 :         self.0.layer_desc().file_size
    4115            0 :     }
    4116            0 :     fn short_id(&self) -> std::string::String {
    4117            0 :         self.0.layer_desc().short_id().to_string()
    4118            0 :     }
    4119            0 :     fn is_delta(&self) -> bool {
    4120            0 :         true
    4121            0 :     }
    4122              : }
    4123              : 
    4124              : impl CompactionDeltaLayer<TimelineAdaptor> for ResidentDeltaLayer {
    4125              :     type DeltaEntry<'a> = DeltaEntry<'a>;
    4126              : 
    4127            0 :     async fn load_keys(&self, ctx: &RequestContext) -> anyhow::Result<Vec<DeltaEntry<'_>>> {
    4128            0 :         self.0.get_as_delta(ctx).await?.index_entries(ctx).await
    4129            0 :     }
    4130              : }
    4131              : 
    4132              : impl CompactionLayer<Key> for ResidentImageLayer {
    4133            0 :     fn key_range(&self) -> &Range<Key> {
    4134            0 :         &self.0.layer_desc().key_range
    4135            0 :     }
    4136            0 :     fn lsn_range(&self) -> &Range<Lsn> {
    4137            0 :         &self.0.layer_desc().lsn_range
    4138            0 :     }
    4139            0 :     fn file_size(&self) -> u64 {
    4140            0 :         self.0.layer_desc().file_size
    4141            0 :     }
    4142            0 :     fn short_id(&self) -> std::string::String {
    4143            0 :         self.0.layer_desc().short_id().to_string()
    4144            0 :     }
    4145            0 :     fn is_delta(&self) -> bool {
    4146            0 :         false
    4147            0 :     }
    4148              : }
    4149              : impl CompactionImageLayer<TimelineAdaptor> for ResidentImageLayer {}
        

Generated by: LCOV version 2.1-beta