LCOV - code coverage report
Current view: top level - pageserver/src/tenant/timeline - compaction.rs (source / functions)
Test: 91bf6c8f32e5e69adde6241313e732fdd6d6e277.info
Test Date: 2025-03-04 12:19:20
Coverage: Lines 52.1 % (1294 of 2484 hit), Functions 37.9 % (64 of 169 hit)

            Line data    Source code
       1              : //! New compaction implementation. The algorithm itself is implemented in the
       2              : //! compaction crate. This file implements the callbacks and structs that allow
       3              : //! the algorithm to drive the process.
       4              : //!
       5              : //! The old legacy algorithm is implemented directly in `timeline.rs`.
       6              : 
       7              : use std::collections::{BinaryHeap, HashMap, HashSet, VecDeque};
       8              : use std::ops::{Deref, Range};
       9              : use std::sync::Arc;
      10              : 
      11              : use super::layer_manager::LayerManager;
      12              : use super::{
      13              :     CompactFlags, CompactOptions, CompactionError, CreateImageLayersError, DurationRecorder,
      14              :     GetVectoredError, ImageLayerCreationMode, LastImageLayerCreationStatus, RecordedDuration,
      15              :     Timeline,
      16              : };
      17              : 
      18              : use anyhow::{Context, anyhow, bail};
      19              : use bytes::Bytes;
      20              : use enumset::EnumSet;
      21              : use fail::fail_point;
      22              : use itertools::Itertools;
      23              : use once_cell::sync::Lazy;
      24              : use pageserver_api::config::tenant_conf_defaults::DEFAULT_CHECKPOINT_DISTANCE;
      25              : use pageserver_api::key::{KEY_SIZE, Key};
      26              : use pageserver_api::keyspace::{KeySpace, ShardedRange};
      27              : use pageserver_api::models::CompactInfoResponse;
      28              : use pageserver_api::record::NeonWalRecord;
      29              : use pageserver_api::shard::{ShardCount, ShardIdentity, TenantShardId};
      30              : use pageserver_api::value::Value;
      31              : use pageserver_compaction::helpers::{fully_contains, overlaps_with};
      32              : use pageserver_compaction::interface::*;
      33              : use serde::Serialize;
      34              : use tokio::sync::{OwnedSemaphorePermit, Semaphore};
      35              : use tokio_util::sync::CancellationToken;
      36              : use tracing::{Instrument, debug, error, info, info_span, trace, warn};
      37              : use utils::critical;
      38              : use utils::id::TimelineId;
      39              : use utils::lsn::Lsn;
      40              : 
      41              : use crate::context::{AccessStatsBehavior, RequestContext, RequestContextBuilder};
      42              : use crate::page_cache;
      43              : use crate::statvfs::Statvfs;
      44              : use crate::tenant::checks::check_valid_layermap;
      45              : use crate::tenant::gc_block::GcBlock;
      46              : use crate::tenant::layer_map::LayerMap;
      47              : use crate::tenant::remote_timeline_client::WaitCompletionError;
      48              : use crate::tenant::remote_timeline_client::index::GcCompactionState;
      49              : use crate::tenant::storage_layer::batch_split_writer::{
      50              :     BatchWriterResult, SplitDeltaLayerWriter, SplitImageLayerWriter,
      51              : };
      52              : use crate::tenant::storage_layer::filter_iterator::FilterIterator;
      53              : use crate::tenant::storage_layer::merge_iterator::MergeIterator;
      54              : use crate::tenant::storage_layer::{
      55              :     AsLayerDesc, PersistentLayerDesc, PersistentLayerKey, ValueReconstructState,
      56              : };
      57              : use crate::tenant::timeline::{
      58              :     DeltaLayerWriter, ImageLayerCreationOutcome, ImageLayerWriter, IoConcurrency, Layer,
      59              :     ResidentLayer, drop_rlock,
      60              : };
      61              : use crate::tenant::{DeltaLayer, MaybeOffloaded, gc_block};
      62              : use crate::virtual_file::{MaybeFatalIo, VirtualFile};
      63              : 
      64              : /// Maximum number of deltas before generating an image layer in bottom-most compaction.
      65              : const COMPACTION_DELTA_THRESHOLD: usize = 5;
      66              : 
      67              : #[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
      68              : pub struct GcCompactionJobId(pub usize);
      69              : 
      70              : impl std::fmt::Display for GcCompactionJobId {
      71            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
      72            0 :         write!(f, "{}", self.0)
      73            0 :     }
      74              : }
      75              : 
      76              : pub struct GcCompactionCombinedSettings {
      77              :     pub gc_compaction_enabled: bool,
      78              :     pub gc_compaction_initial_threshold_kb: u64,
      79              :     pub gc_compaction_ratio_percent: u64,
      80              : }
      81              : 
      82              : #[derive(Debug, Clone)]
      83              : pub enum GcCompactionQueueItem {
      84              :     MetaJob {
      85              :         /// Compaction options
      86              :         options: CompactOptions,
      87              :         /// Whether the compaction is triggered automatically (determines whether we need to update L2 LSN)
      88              :         auto: bool,
      89              :     },
      90              :     SubCompactionJob(CompactOptions),
      91              :     Notify(GcCompactionJobId, Option<Lsn>),
      92              : }
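// Illustrative note on how these items flow through the queue: a `MetaJob` with
// `sub_compaction: true` is expanded by `handle_sub_compaction` into several
// `SubCompactionJob` items followed by a final `Notify` item, e.g. (hypothetical contents):
//
//     [ SubCompactionJob(opts_1), SubCompactionJob(opts_2), ...,
//       Notify(job_id, /* Some(expected_l2_lsn) when auto-triggered, None otherwise */) ]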
      93              : 
      94              : impl GcCompactionQueueItem {
      95            0 :     pub fn into_compact_info_resp(
      96            0 :         self,
      97            0 :         id: GcCompactionJobId,
      98            0 :         running: bool,
      99            0 :     ) -> Option<CompactInfoResponse> {
     100            0 :         match self {
     101            0 :             GcCompactionQueueItem::MetaJob { options, .. } => Some(CompactInfoResponse {
     102            0 :                 compact_key_range: options.compact_key_range,
     103            0 :                 compact_lsn_range: options.compact_lsn_range,
     104            0 :                 sub_compaction: options.sub_compaction,
     105            0 :                 running,
     106            0 :                 job_id: id.0,
     107            0 :             }),
     108            0 :             GcCompactionQueueItem::SubCompactionJob(options) => Some(CompactInfoResponse {
     109            0 :                 compact_key_range: options.compact_key_range,
     110            0 :                 compact_lsn_range: options.compact_lsn_range,
     111            0 :                 sub_compaction: options.sub_compaction,
     112            0 :                 running,
     113            0 :                 job_id: id.0,
     114            0 :             }),
     115            0 :             GcCompactionQueueItem::Notify(_, _) => None,
     116              :         }
     117            0 :     }
     118              : }
     119              : 
     120              : #[derive(Default)]
     121              : struct GcCompactionGuardItems {
     122              :     notify: Option<tokio::sync::oneshot::Sender<()>>,
     123              :     gc_guard: Option<gc_block::Guard>,
     124              :     permit: Option<OwnedSemaphorePermit>,
     125              : }
     126              : 
     127              : struct GcCompactionQueueInner {
     128              :     running: Option<(GcCompactionJobId, GcCompactionQueueItem)>,
     129              :     queued: VecDeque<(GcCompactionJobId, GcCompactionQueueItem)>,
     130              :     guards: HashMap<GcCompactionJobId, GcCompactionGuardItems>,
     131              :     last_id: GcCompactionJobId,
     132              : }
     133              : 
     134              : impl GcCompactionQueueInner {
     135            0 :     fn next_id(&mut self) -> GcCompactionJobId {
     136            0 :         let id = self.last_id;
     137            0 :         self.last_id = GcCompactionJobId(id.0 + 1);
     138            0 :         id
     139            0 :     }
     140              : }
     141              : 
     142              : /// A structure to store gc_compaction jobs.
     143              : pub struct GcCompactionQueue {
     144              :     /// All items in the queue, and the currently-running job.
     145              :     inner: std::sync::Mutex<GcCompactionQueueInner>,
     146              :     /// Ensure only one thread is consuming the queue.
     147              :     consumer_lock: tokio::sync::Mutex<()>,
     148              : }
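// Illustrative usage sketch of the queue above (crate-internal values such as `options`,
// `cancel`, `ctx`, `gc_block` and `timeline` are assumed to be in scope): a caller schedules
// a manual gc-compaction and waits on the notify channel, while the compaction task drains
// the queue one item per `iteration` call.
//
//     let queue = GcCompactionQueue::new();
//     let (tx, rx) = tokio::sync::oneshot::channel();
//     let _job_id = queue.schedule_manual_compaction(options, Some(tx));
//     while queue.iteration(&cancel, &ctx, &gc_block, &timeline).await?
//         == CompactionOutcome::Pending
//     {}
//     rx.await.ok(); // resolved by `notify_and_unblock` once the job finishes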
     149              : 
     150            0 : static CONCURRENT_GC_COMPACTION_TASKS: Lazy<Arc<Semaphore>> = Lazy::new(|| {
     151            0 :     // Only allow two timelines on one pageserver to run gc compaction at a time.
     152            0 :     Arc::new(Semaphore::new(2))
     153            0 : });
     154              : 
     155              : impl GcCompactionQueue {
     156            0 :     pub fn new() -> Self {
     157            0 :         GcCompactionQueue {
     158            0 :             inner: std::sync::Mutex::new(GcCompactionQueueInner {
     159            0 :                 running: None,
     160            0 :                 queued: VecDeque::new(),
     161            0 :                 guards: HashMap::new(),
     162            0 :                 last_id: GcCompactionJobId(0),
     163            0 :             }),
     164            0 :             consumer_lock: tokio::sync::Mutex::new(()),
     165            0 :         }
     166            0 :     }
     167              : 
     168            0 :     pub fn cancel_scheduled(&self) {
     169            0 :         let mut guard = self.inner.lock().unwrap();
     170            0 :         guard.queued.clear();
     171            0 :         // TODO: if there is a running job, we should keep the gc guard. However, currently, the cancel
     172            0 :         // API is only used for testing purposes, so we can drop everything here.
     173            0 :         guard.guards.clear();
     174            0 :     }
     175              : 
     176              :     /// Schedule a manual compaction job.
     177            0 :     pub fn schedule_manual_compaction(
     178            0 :         &self,
     179            0 :         options: CompactOptions,
     180            0 :         notify: Option<tokio::sync::oneshot::Sender<()>>,
     181            0 :     ) -> GcCompactionJobId {
     182            0 :         let mut guard = self.inner.lock().unwrap();
     183            0 :         let id = guard.next_id();
     184            0 :         guard.queued.push_back((
     185            0 :             id,
     186            0 :             GcCompactionQueueItem::MetaJob {
     187            0 :                 options,
     188            0 :                 auto: false,
     189            0 :             },
     190            0 :         ));
     191            0 :         guard.guards.entry(id).or_default().notify = notify;
     192            0 :         info!("scheduled compaction job id={}", id);
     193            0 :         id
     194            0 :     }
     195              : 
     196              :     /// Schedule an auto compaction job.
     197            0 :     fn schedule_auto_compaction(
     198            0 :         &self,
     199            0 :         options: CompactOptions,
     200            0 :         permit: OwnedSemaphorePermit,
     201            0 :     ) -> GcCompactionJobId {
     202            0 :         let mut guard = self.inner.lock().unwrap();
     203            0 :         let id = guard.next_id();
     204            0 :         guard.queued.push_back((
     205            0 :             id,
     206            0 :             GcCompactionQueueItem::MetaJob {
     207            0 :                 options,
     208            0 :                 auto: true,
     209            0 :             },
     210            0 :         ));
     211            0 :         guard.guards.entry(id).or_default().permit = Some(permit);
     212            0 :         id
     213            0 :     }
     214              : 
     215              :     /// Trigger an auto compaction.
     216            0 :     pub async fn trigger_auto_compaction(
     217            0 :         &self,
     218            0 :         timeline: &Arc<Timeline>,
     219            0 :     ) -> Result<(), CompactionError> {
     220            0 :         let GcCompactionCombinedSettings {
     221            0 :             gc_compaction_enabled,
     222            0 :             gc_compaction_initial_threshold_kb,
     223            0 :             gc_compaction_ratio_percent,
     224            0 :         } = timeline.get_gc_compaction_settings();
     225            0 :         if !gc_compaction_enabled {
     226            0 :             return Ok(());
     227            0 :         }
     228            0 :         if self.remaining_jobs_num() > 0 {
     229              :             // Only schedule auto compaction when the queue is empty
     230            0 :             return Ok(());
     231            0 :         }
     232            0 :         if timeline.ancestor_timeline().is_some() {
     233              :             // Do not trigger auto compaction for child timelines. We haven't tested
     234              :             // it enough in staging yet.
     235            0 :             return Ok(());
     236            0 :         }
     237              : 
     238            0 :         let Ok(permit) = CONCURRENT_GC_COMPACTION_TASKS.clone().try_acquire_owned() else {
     239              :             // Only allow one compaction run at a time. TODO: As we do `try_acquire_owned`, we cannot ensure
     240              :             // the fairness of the lock across timelines. We should listen for both `acquire` and `l0_compaction_trigger`
      241              :             // to ensure fairness while avoiding starvation of other tasks.
     242            0 :             return Ok(());
     243              :         };
     244              : 
     245            0 :         let gc_compaction_state = timeline.get_gc_compaction_state();
     246            0 :         let l2_lsn = gc_compaction_state
     247            0 :             .map(|x| x.last_completed_lsn)
     248            0 :             .unwrap_or(Lsn::INVALID);
     249              : 
     250            0 :         let layers = {
     251            0 :             let guard = timeline.layers.read().await;
     252            0 :             let layer_map = guard.layer_map()?;
     253            0 :             layer_map.iter_historic_layers().collect_vec()
     254            0 :         };
     255            0 :         let mut l2_size: u64 = 0;
     256            0 :         let mut l1_size = 0;
     257            0 :         let gc_cutoff = *timeline.get_applied_gc_cutoff_lsn();
     258            0 :         for layer in layers {
     259            0 :             if layer.lsn_range.start <= l2_lsn {
     260            0 :                 l2_size += layer.file_size();
     261            0 :             } else if layer.lsn_range.start <= gc_cutoff {
     262            0 :                 l1_size += layer.file_size();
     263            0 :             }
     264              :         }
     265              : 
     266            0 :         fn trigger_compaction(
     267            0 :             l1_size: u64,
     268            0 :             l2_size: u64,
     269            0 :             gc_compaction_initial_threshold_kb: u64,
     270            0 :             gc_compaction_ratio_percent: u64,
     271            0 :         ) -> bool {
     272              :             const AUTO_TRIGGER_LIMIT: u64 = 150 * 1024 * 1024 * 1024; // 150GB
     273            0 :             if l1_size >= AUTO_TRIGGER_LIMIT || l2_size >= AUTO_TRIGGER_LIMIT {
     274              :                 // Do not auto-trigger when physical size >= 150GB
     275            0 :                 return false;
     276            0 :             }
     277            0 :             // initial trigger
     278            0 :             if l2_size == 0 && l1_size >= gc_compaction_initial_threshold_kb * 1024 {
     279            0 :                 info!(
     280            0 :                     "trigger auto-compaction because l1_size={} >= gc_compaction_initial_threshold_kb={}",
     281              :                     l1_size, gc_compaction_initial_threshold_kb
     282              :                 );
     283            0 :                 return true;
     284            0 :             }
     285            0 :             // size ratio trigger
     286            0 :             if l2_size == 0 {
     287            0 :                 return false;
     288            0 :             }
     289            0 :             if l1_size as f64 / l2_size as f64 >= (gc_compaction_ratio_percent as f64 / 100.0) {
     290            0 :                 info!(
     291            0 :                     "trigger auto-compaction because l1_size={} / l2_size={} > gc_compaction_ratio_percent={}",
     292              :                     l1_size, l2_size, gc_compaction_ratio_percent
     293              :                 );
     294            0 :                 return true;
     295            0 :             }
     296            0 :             false
     297            0 :         }
     298              : 
     299            0 :         if trigger_compaction(
     300            0 :             l1_size,
     301            0 :             l2_size,
     302            0 :             gc_compaction_initial_threshold_kb,
     303            0 :             gc_compaction_ratio_percent,
     304            0 :         ) {
     305            0 :             self.schedule_auto_compaction(
     306            0 :                 CompactOptions {
     307            0 :                     flags: {
     308            0 :                         let mut flags = EnumSet::new();
     309            0 :                         flags |= CompactFlags::EnhancedGcBottomMostCompaction;
     310            0 :                         flags
     311            0 :                     },
     312            0 :                     sub_compaction: true,
     313            0 :                     compact_key_range: None,
     314            0 :                     compact_lsn_range: None,
     315            0 :                     sub_compaction_max_job_size_mb: None,
     316            0 :                 },
     317            0 :                 permit,
     318            0 :             );
     319            0 :             info!(
     320            0 :                 "scheduled auto gc-compaction: l1_size={}, l2_size={}, l2_lsn={}, gc_cutoff={}",
     321              :                 l1_size, l2_size, l2_lsn, gc_cutoff
     322              :             );
     323              :         } else {
     324            0 :             debug!(
     325            0 :                 "did not trigger auto gc-compaction: l1_size={}, l2_size={}, l2_lsn={}, gc_cutoff={}",
     326              :                 l1_size, l2_size, l2_lsn, gc_cutoff
     327              :             );
     328              :         }
     329            0 :         Ok(())
     330            0 :     }
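    // Illustrative numbers for the trigger logic above, assuming (hypothetically)
    // gc_compaction_initial_threshold_kb = 10 * 1024 * 1024 (10 GiB) and
    // gc_compaction_ratio_percent = 100:
    //
    //   * l2_size == 0,      l1_size == 12 GiB   -> initial trigger fires (12 GiB >= 10 GiB)
    //   * l2_size == 20 GiB, l1_size == 25 GiB   -> ratio trigger fires (25 / 20 = 125% >= 100%)
    //   * l1_size == 151 GiB (any l2_size)       -> never auto-triggered (150 GiB hard cap)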
     331              : 
     332              :     /// Notify the caller the job has finished and unblock GC.
     333            0 :     fn notify_and_unblock(&self, id: GcCompactionJobId) {
     334            0 :         info!("compaction job id={} finished", id);
     335            0 :         let mut guard = self.inner.lock().unwrap();
     336            0 :         if let Some(items) = guard.guards.remove(&id) {
     337            0 :             drop(items.gc_guard);
     338            0 :             if let Some(tx) = items.notify {
     339            0 :                 let _ = tx.send(());
     340            0 :             }
     341            0 :         }
     342            0 :     }
     343              : 
     344            0 :     async fn handle_sub_compaction(
     345            0 :         &self,
     346            0 :         id: GcCompactionJobId,
     347            0 :         options: CompactOptions,
     348            0 :         timeline: &Arc<Timeline>,
     349            0 :         gc_block: &GcBlock,
     350            0 :         auto: bool,
     351            0 :     ) -> Result<(), CompactionError> {
     352            0 :         info!(
     353            0 :             "running scheduled enhanced gc bottom-most compaction with sub-compaction, splitting compaction jobs"
     354              :         );
     355            0 :         let jobs = timeline
     356            0 :             .gc_compaction_split_jobs(
     357            0 :                 GcCompactJob::from_compact_options(options.clone()),
     358            0 :                 options.sub_compaction_max_job_size_mb,
     359            0 :             )
     360            0 :             .await
     361            0 :             .map_err(CompactionError::Other)?;
     362            0 :         if jobs.is_empty() {
     363            0 :             info!("no jobs to run, skipping scheduled compaction task");
     364            0 :             self.notify_and_unblock(id);
     365              :         } else {
     366            0 :             let gc_guard = match gc_block.start().await {
     367            0 :                 Ok(guard) => guard,
     368            0 :                 Err(e) => {
     369            0 :                     return Err(CompactionError::Other(anyhow!(
     370            0 :                         "cannot run gc-compaction because gc is blocked: {}",
     371            0 :                         e
     372            0 :                     )));
     373              :                 }
     374              :             };
     375              : 
     376            0 :             let jobs_len = jobs.len();
     377            0 :             let mut pending_tasks = Vec::new();
     378            0 :             // gc-compaction might pick more layers or fewer layers to compact. The L2 LSN does not need to be accurate.
     379            0 :             // And therefore, we simply assume the maximum LSN of all jobs is the expected L2 LSN.
     380            0 :             let expected_l2_lsn = jobs.iter().map(|job| job.compact_lsn_range.end).max();
     381            0 :             for job in jobs {
     382              :                 // Unfortunately we need to convert the `GcCompactJob` back to `CompactionOptions`
      383              :                 // until we do further refactors to allow calling `compact_with_gc` directly.
     384            0 :                 let mut flags: EnumSet<CompactFlags> = EnumSet::default();
     385            0 :                 flags |= CompactFlags::EnhancedGcBottomMostCompaction;
     386            0 :                 if job.dry_run {
     387            0 :                     flags |= CompactFlags::DryRun;
     388            0 :                 }
     389            0 :                 let options = CompactOptions {
     390            0 :                     flags,
     391            0 :                     sub_compaction: false,
     392            0 :                     compact_key_range: Some(job.compact_key_range.into()),
     393            0 :                     compact_lsn_range: Some(job.compact_lsn_range.into()),
     394            0 :                     sub_compaction_max_job_size_mb: None,
     395            0 :                 };
     396            0 :                 pending_tasks.push(GcCompactionQueueItem::SubCompactionJob(options));
     397              :             }
     398              : 
     399            0 :             if !auto {
     400            0 :                 pending_tasks.push(GcCompactionQueueItem::Notify(id, None));
     401            0 :             } else {
     402            0 :                 pending_tasks.push(GcCompactionQueueItem::Notify(id, expected_l2_lsn));
     403            0 :             }
     404              : 
     405              :             {
     406            0 :                 let mut guard = self.inner.lock().unwrap();
     407            0 :                 guard.guards.entry(id).or_default().gc_guard = Some(gc_guard);
     408            0 :                 let mut tasks = Vec::new();
     409            0 :                 for task in pending_tasks {
     410            0 :                     let id = guard.next_id();
     411            0 :                     tasks.push((id, task));
     412            0 :                 }
     413            0 :                 tasks.reverse();
     414            0 :                 for item in tasks {
     415            0 :                     guard.queued.push_front(item);
     416            0 :                 }
     417              :             }
     418            0 :             info!(
     419            0 :                 "scheduled enhanced gc bottom-most compaction with sub-compaction, split into {} jobs",
     420              :                 jobs_len
     421              :             );
     422              :         }
     423            0 :         Ok(())
     424            0 :     }
     425              : 
      426              :     /// Take a job from the queue and process it. Returns whether there are still pending tasks.
     427            0 :     pub async fn iteration(
     428            0 :         &self,
     429            0 :         cancel: &CancellationToken,
     430            0 :         ctx: &RequestContext,
     431            0 :         gc_block: &GcBlock,
     432            0 :         timeline: &Arc<Timeline>,
     433            0 :     ) -> Result<CompactionOutcome, CompactionError> {
     434            0 :         let Ok(_one_op_at_a_time_guard) = self.consumer_lock.try_lock() else {
     435            0 :             return Err(CompactionError::AlreadyRunning(
     436            0 :                 "cannot run gc-compaction because another gc-compaction is running. This should not happen because we only call this function from the gc-compaction queue.",
     437            0 :             ));
     438              :         };
     439              :         let has_pending_tasks;
     440            0 :         let Some((id, item)) = ({
     441            0 :             let mut guard = self.inner.lock().unwrap();
     442            0 :             if let Some((id, item)) = guard.queued.pop_front() {
     443            0 :                 guard.running = Some((id, item.clone()));
     444            0 :                 has_pending_tasks = !guard.queued.is_empty();
     445            0 :                 Some((id, item))
     446              :             } else {
     447            0 :                 has_pending_tasks = false;
     448            0 :                 None
     449              :             }
     450              :         }) else {
     451            0 :             self.trigger_auto_compaction(timeline).await?;
     452              :             // Always yield after triggering auto-compaction. Gc-compaction is a low-priority task and we
      453              :             // have not implemented a preemption mechanism yet. We always want to yield to more important
      454              :             // tasks if there are any.
     455            0 :             return Ok(CompactionOutcome::Done);
     456              :         };
     457            0 :         match item {
     458            0 :             GcCompactionQueueItem::MetaJob { options, auto } => {
     459            0 :                 if !options
     460            0 :                     .flags
     461            0 :                     .contains(CompactFlags::EnhancedGcBottomMostCompaction)
     462              :                 {
     463            0 :                     warn!(
     464            0 :                         "ignoring scheduled compaction task: scheduled task must be gc compaction: {:?}",
     465              :                         options
     466              :                     );
     467            0 :                 } else if options.sub_compaction {
     468            0 :                     info!(
     469            0 :                         "running scheduled enhanced gc bottom-most compaction with sub-compaction, splitting compaction jobs"
     470              :                     );
     471            0 :                     self.handle_sub_compaction(id, options, timeline, gc_block, auto)
     472            0 :                         .await?;
     473              :                 } else {
     474              :                     // Auto compaction always enables sub-compaction so we don't need to handle update_l2_lsn
     475              :                     // in this branch.
     476            0 :                     let gc_guard = match gc_block.start().await {
     477            0 :                         Ok(guard) => guard,
     478            0 :                         Err(e) => {
     479            0 :                             return Err(CompactionError::Other(anyhow!(
     480            0 :                                 "cannot run gc-compaction because gc is blocked: {}",
     481            0 :                                 e
     482            0 :                             )));
     483              :                         }
     484              :                     };
     485            0 :                     {
     486            0 :                         let mut guard = self.inner.lock().unwrap();
     487            0 :                         guard.guards.entry(id).or_default().gc_guard = Some(gc_guard);
     488            0 :                     }
     489            0 :                     let _ = timeline.compact_with_options(cancel, options, ctx).await?;
     490            0 :                     self.notify_and_unblock(id);
     491              :                 }
     492              :             }
     493            0 :             GcCompactionQueueItem::SubCompactionJob(options) => {
     494            0 :                 // TODO: error handling, clear the queue if any task fails?
     495            0 :                 let _ = timeline.compact_with_options(cancel, options, ctx).await?;
     496              :             }
     497            0 :             GcCompactionQueueItem::Notify(id, l2_lsn) => {
     498            0 :                 self.notify_and_unblock(id);
     499            0 :                 if let Some(l2_lsn) = l2_lsn {
     500            0 :                     let current_l2_lsn = timeline
     501            0 :                         .get_gc_compaction_state()
     502            0 :                         .map(|x| x.last_completed_lsn)
     503            0 :                         .unwrap_or(Lsn::INVALID);
     504            0 :                     if l2_lsn >= current_l2_lsn {
     505            0 :                         info!("l2_lsn updated to {}", l2_lsn);
     506            0 :                         timeline
     507            0 :                             .update_gc_compaction_state(GcCompactionState {
     508            0 :                                 last_completed_lsn: l2_lsn,
     509            0 :                             })
     510            0 :                             .map_err(CompactionError::Other)?;
     511              :                     } else {
     512            0 :                         warn!(
     513            0 :                             "l2_lsn updated to {} but it is less than the current l2_lsn {}",
     514              :                             l2_lsn, current_l2_lsn
     515              :                         );
     516              :                     }
     517            0 :                 }
     518              :             }
     519              :         }
     520            0 :         {
     521            0 :             let mut guard = self.inner.lock().unwrap();
     522            0 :             guard.running = None;
     523            0 :         }
     524            0 :         Ok(if has_pending_tasks {
     525            0 :             CompactionOutcome::Pending
     526              :         } else {
     527            0 :             CompactionOutcome::Done
     528              :         })
     529            0 :     }
     530              : 
     531              :     #[allow(clippy::type_complexity)]
     532            0 :     pub fn remaining_jobs(
     533            0 :         &self,
     534            0 :     ) -> (
     535            0 :         Option<(GcCompactionJobId, GcCompactionQueueItem)>,
     536            0 :         VecDeque<(GcCompactionJobId, GcCompactionQueueItem)>,
     537            0 :     ) {
     538            0 :         let guard = self.inner.lock().unwrap();
     539            0 :         (guard.running.clone(), guard.queued.clone())
     540            0 :     }
     541              : 
     542            0 :     pub fn remaining_jobs_num(&self) -> usize {
     543            0 :         let guard = self.inner.lock().unwrap();
     544            0 :         guard.queued.len() + if guard.running.is_some() { 1 } else { 0 }
     545            0 :     }
     546              : }
     547              : 
      548              : /// A job description for the gc-compaction job. This structure describes the rectangular key/LSN range that the job will
     549              : /// process. The exact layers that need to be compacted/rewritten will be generated when `compact_with_gc` gets
     550              : /// called.
     551              : #[derive(Debug, Clone)]
     552              : pub(crate) struct GcCompactJob {
     553              :     pub dry_run: bool,
     554              :     /// The key range to be compacted. The compaction algorithm will only regenerate key-value pairs within this range
     555              :     /// [left inclusive, right exclusive), and other pairs will be rewritten into new files if necessary.
     556              :     pub compact_key_range: Range<Key>,
     557              :     /// The LSN range to be compacted. The compaction algorithm will use this range to determine the layers to be
     558              :     /// selected for the compaction, and it does not guarantee the generated layers will have exactly the same LSN range
     559              :     /// as specified here. The true range being compacted is `min_lsn/max_lsn` in [`GcCompactionJobDescription`].
      560              :     /// min_lsn will always be <= the lower bound specified here, and max_lsn will always be >= the upper bound specified here.
     561              :     pub compact_lsn_range: Range<Lsn>,
     562              : }
     563              : 
     564              : impl GcCompactJob {
     565          108 :     pub fn from_compact_options(options: CompactOptions) -> Self {
     566          108 :         GcCompactJob {
     567          108 :             dry_run: options.flags.contains(CompactFlags::DryRun),
     568          108 :             compact_key_range: options
     569          108 :                 .compact_key_range
     570          108 :                 .map(|x| x.into())
     571          108 :                 .unwrap_or(Key::MIN..Key::MAX),
     572          108 :             compact_lsn_range: options
     573          108 :                 .compact_lsn_range
     574          108 :                 .map(|x| x.into())
     575          108 :                 .unwrap_or(Lsn::INVALID..Lsn::MAX),
     576          108 :         }
     577          108 :     }
     578              : }
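// Illustrative sketch: with no explicit ranges, the conversion above produces a job that
// covers the full key space and the full LSN range, i.e. a whole-timeline gc-compaction
// (all field names used here are the ones visible in this file).
//
//     let job = GcCompactJob::from_compact_options(CompactOptions {
//         flags: EnumSet::new(),
//         sub_compaction: false,
//         compact_key_range: None,
//         compact_lsn_range: None,
//         sub_compaction_max_job_size_mb: None,
//     });
//     assert_eq!(job.compact_key_range, Key::MIN..Key::MAX);
//     assert_eq!(job.compact_lsn_range, Lsn::INVALID..Lsn::MAX);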
     579              : 
     580              : /// A job description for the gc-compaction job. This structure is generated when `compact_with_gc` is called
     581              : /// and contains the exact layers we want to compact.
     582              : pub struct GcCompactionJobDescription {
     583              :     /// All layers to read in the compaction job
     584              :     selected_layers: Vec<Layer>,
     585              :     /// GC cutoff of the job. This is the lowest LSN that will be accessed by the read/GC path and we need to
     586              :     /// keep all deltas <= this LSN or generate an image == this LSN.
     587              :     gc_cutoff: Lsn,
     588              :     /// LSNs to retain for the job. Read path will use this LSN so we need to keep deltas <= this LSN or
     589              :     /// generate an image == this LSN.
     590              :     retain_lsns_below_horizon: Vec<Lsn>,
     591              :     /// Maximum layer LSN processed in this compaction, that is max(end_lsn of layers). Exclusive. All data
     592              :     /// \>= this LSN will be kept and will not be rewritten.
     593              :     max_layer_lsn: Lsn,
     594              :     /// Minimum layer LSN processed in this compaction, that is min(start_lsn of layers). Inclusive.
      595              :     /// All access below (strictly lower than, i.e. `<`) this LSN will be routed through the normal read path instead of
     596              :     /// k-merge within gc-compaction.
     597              :     min_layer_lsn: Lsn,
     598              :     /// Only compact layers overlapping with this range.
     599              :     compaction_key_range: Range<Key>,
     600              :     /// When partial compaction is enabled, these layers need to be rewritten to ensure no overlap.
     601              :     /// This field is here solely for debugging. The field will not be read once the compaction
     602              :     /// description is generated.
     603              :     rewrite_layers: Vec<Arc<PersistentLayerDesc>>,
     604              : }
     605              : 
     606              : /// The result of bottom-most compaction for a single key at each LSN.
     607              : #[derive(Debug)]
     608              : #[cfg_attr(test, derive(PartialEq))]
     609              : pub struct KeyLogAtLsn(pub Vec<(Lsn, Value)>);
     610              : 
     611              : /// The result of bottom-most compaction.
     612              : #[derive(Debug)]
     613              : #[cfg_attr(test, derive(PartialEq))]
     614              : pub(crate) struct KeyHistoryRetention {
     615              :     /// Stores logs to reconstruct the value at the given LSN, that is to say, logs <= LSN or image == LSN.
     616              :     pub(crate) below_horizon: Vec<(Lsn, KeyLogAtLsn)>,
     617              :     /// Stores logs to reconstruct the value at any LSN above the horizon, that is to say, log > LSN.
     618              :     pub(crate) above_horizon: KeyLogAtLsn,
     619              : }
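// Illustrative shape of a retention result for one key, with hypothetical retain LSNs
// 0x20 and 0x30 and a GC horizon of 0x40 (the LSNs and values are made up for illustration):
//
//     below_horizon: [
//         (0x20, KeyLogAtLsn(vec![(0x20, Value::Image(..))])),     // image == retain LSN 0x20
//         (0x30, KeyLogAtLsn(vec![(0x28, Value::WalRecord(..))])), // deltas needed to reach 0x30
//         (0x40, KeyLogAtLsn(vec![(0x38, Value::WalRecord(..))])), // deltas needed to reach the horizon
//     ],
//     above_horizon: KeyLogAtLsn(vec![(0x48, Value::WalRecord(..))]), // newer data is kept as-is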
     620              : 
     621              : impl KeyHistoryRetention {
      622              :     /// Hack: skip the delta layer if we need to produce a layer with the same key-lsn range.
     623              :     ///
     624              :     /// This can happen if we have removed some deltas in "the middle" of some existing layer's key-lsn-range.
     625              :     /// For example, consider the case where a single delta with range [0x10,0x50) exists.
     626              :     /// And we have branches at LSN 0x10, 0x20, 0x30.
     627              :     /// Then we delete branch @ 0x20.
     628              :     /// Bottom-most compaction may now delete the delta [0x20,0x30).
      629              :     /// And that wouldn't change the shape of the layer.
     630              :     ///
     631              :     /// Note that bottom-most-gc-compaction never _adds_ new data in that case, only removes.
     632              :     ///
     633              :     /// `discard_key` will only be called when the writer reaches its target (instead of for every key), so it's fine to grab a lock inside.
     634          148 :     async fn discard_key(key: &PersistentLayerKey, tline: &Arc<Timeline>, dry_run: bool) -> bool {
     635          148 :         if dry_run {
     636            0 :             return true;
     637          148 :         }
     638          148 :         if LayerMap::is_l0(&key.key_range, key.is_delta) {
     639              :             // gc-compaction should not produce L0 deltas, otherwise it will break the layer order.
     640              :             // We should ignore such layers.
     641            0 :             return true;
     642          148 :         }
     643              :         let layer_generation;
     644              :         {
     645          148 :             let guard = tline.layers.read().await;
     646          148 :             if !guard.contains_key(key) {
     647          104 :                 return false;
     648           44 :             }
     649           44 :             layer_generation = guard.get_from_key(key).metadata().generation;
     650           44 :         }
     651           44 :         if layer_generation == tline.generation {
     652           44 :             info!(
     653              :                 key=%key,
     654              :                 ?layer_generation,
     655            0 :                 "discard layer due to duplicated layer key in the same generation",
     656              :             );
     657           44 :             true
     658              :         } else {
     659            0 :             false
     660              :         }
     661          148 :     }
     662              : 
     663              :     /// Pipe a history of a single key to the writers.
     664              :     ///
      665              :     /// If `image_writer` is `None`, the images will be placed into the delta layers.
     666              :     /// The delta writer will contain all images and deltas (below and above the horizon) except the bottom-most images.
     667              :     #[allow(clippy::too_many_arguments)]
     668         1244 :     async fn pipe_to(
     669         1244 :         self,
     670         1244 :         key: Key,
     671         1244 :         delta_writer: &mut SplitDeltaLayerWriter,
     672         1244 :         mut image_writer: Option<&mut SplitImageLayerWriter>,
     673         1244 :         stat: &mut CompactionStatistics,
     674         1244 :         ctx: &RequestContext,
     675         1244 :     ) -> anyhow::Result<()> {
     676         1244 :         let mut first_batch = true;
     677         4024 :         for (cutoff_lsn, KeyLogAtLsn(logs)) in self.below_horizon {
     678         2780 :             if first_batch {
     679         1244 :                 if logs.len() == 1 && logs[0].1.is_image() {
     680         1168 :                     let Value::Image(img) = &logs[0].1 else {
     681            0 :                         unreachable!()
     682              :                     };
     683         1168 :                     stat.produce_image_key(img);
     684         1168 :                     if let Some(image_writer) = image_writer.as_mut() {
     685         1168 :                         image_writer.put_image(key, img.clone(), ctx).await?;
     686              :                     } else {
     687            0 :                         delta_writer
     688            0 :                             .put_value(key, cutoff_lsn, Value::Image(img.clone()), ctx)
     689            0 :                             .await?;
     690              :                     }
     691              :                 } else {
     692          132 :                     for (lsn, val) in logs {
     693           56 :                         stat.produce_key(&val);
     694           56 :                         delta_writer.put_value(key, lsn, val, ctx).await?;
     695              :                     }
     696              :                 }
     697         1244 :                 first_batch = false;
     698              :             } else {
     699         1768 :                 for (lsn, val) in logs {
     700          232 :                     stat.produce_key(&val);
     701          232 :                     delta_writer.put_value(key, lsn, val, ctx).await?;
     702              :                 }
     703              :             }
     704              :         }
     705         1244 :         let KeyLogAtLsn(above_horizon_logs) = self.above_horizon;
     706         1360 :         for (lsn, val) in above_horizon_logs {
     707          116 :             stat.produce_key(&val);
     708          116 :             delta_writer.put_value(key, lsn, val, ctx).await?;
     709              :         }
     710         1244 :         Ok(())
     711         1244 :     }
     712              : }
     713              : 
     714              : #[derive(Debug, Serialize, Default)]
     715              : struct CompactionStatisticsNumSize {
     716              :     num: u64,
     717              :     size: u64,
     718              : }
     719              : 
     720              : #[derive(Debug, Serialize, Default)]
     721              : pub struct CompactionStatistics {
     722              :     delta_layer_visited: CompactionStatisticsNumSize,
     723              :     image_layer_visited: CompactionStatisticsNumSize,
     724              :     delta_layer_produced: CompactionStatisticsNumSize,
     725              :     image_layer_produced: CompactionStatisticsNumSize,
     726              :     num_delta_layer_discarded: usize,
     727              :     num_image_layer_discarded: usize,
     728              :     num_unique_keys_visited: usize,
     729              :     wal_keys_visited: CompactionStatisticsNumSize,
     730              :     image_keys_visited: CompactionStatisticsNumSize,
     731              :     wal_produced: CompactionStatisticsNumSize,
     732              :     image_produced: CompactionStatisticsNumSize,
     733              : }
     734              : 
     735              : impl CompactionStatistics {
     736         2084 :     fn estimated_size_of_value(val: &Value) -> usize {
     737          864 :         match val {
     738         1220 :             Value::Image(img) => img.len(),
     739            0 :             Value::WalRecord(NeonWalRecord::Postgres { rec, .. }) => rec.len(),
     740          864 :             _ => std::mem::size_of::<NeonWalRecord>(),
     741              :         }
     742         2084 :     }
     743         3272 :     fn estimated_size_of_key() -> usize {
     744         3272 :         KEY_SIZE // TODO: distinguish image layer and delta layer (count LSN in delta layer)
     745         3272 :     }
     746          172 :     fn visit_delta_layer(&mut self, size: u64) {
     747          172 :         self.delta_layer_visited.num += 1;
     748          172 :         self.delta_layer_visited.size += size;
     749          172 :     }
     750          132 :     fn visit_image_layer(&mut self, size: u64) {
     751          132 :         self.image_layer_visited.num += 1;
     752          132 :         self.image_layer_visited.size += size;
     753          132 :     }
     754         1244 :     fn on_unique_key_visited(&mut self) {
     755         1244 :         self.num_unique_keys_visited += 1;
     756         1244 :     }
     757          480 :     fn visit_wal_key(&mut self, val: &Value) {
     758          480 :         self.wal_keys_visited.num += 1;
     759          480 :         self.wal_keys_visited.size +=
     760          480 :             Self::estimated_size_of_value(val) as u64 + Self::estimated_size_of_key() as u64;
     761          480 :     }
     762         1220 :     fn visit_image_key(&mut self, val: &Value) {
     763         1220 :         self.image_keys_visited.num += 1;
     764         1220 :         self.image_keys_visited.size +=
     765         1220 :             Self::estimated_size_of_value(val) as u64 + Self::estimated_size_of_key() as u64;
     766         1220 :     }
     767          404 :     fn produce_key(&mut self, val: &Value) {
     768          404 :         match val {
     769           20 :             Value::Image(img) => self.produce_image_key(img),
     770          384 :             Value::WalRecord(_) => self.produce_wal_key(val),
     771              :         }
     772          404 :     }
     773          384 :     fn produce_wal_key(&mut self, val: &Value) {
     774          384 :         self.wal_produced.num += 1;
     775          384 :         self.wal_produced.size +=
     776          384 :             Self::estimated_size_of_value(val) as u64 + Self::estimated_size_of_key() as u64;
     777          384 :     }
     778         1188 :     fn produce_image_key(&mut self, val: &Bytes) {
     779         1188 :         self.image_produced.num += 1;
     780         1188 :         self.image_produced.size += val.len() as u64 + Self::estimated_size_of_key() as u64;
     781         1188 :     }
     782           28 :     fn discard_delta_layer(&mut self) {
     783           28 :         self.num_delta_layer_discarded += 1;
     784           28 :     }
     785           16 :     fn discard_image_layer(&mut self) {
     786           16 :         self.num_image_layer_discarded += 1;
     787           16 :     }
     788           44 :     fn produce_delta_layer(&mut self, size: u64) {
     789           44 :         self.delta_layer_produced.num += 1;
     790           44 :         self.delta_layer_produced.size += size;
     791           44 :     }
     792           60 :     fn produce_image_layer(&mut self, size: u64) {
     793           60 :         self.image_layer_produced.num += 1;
     794           60 :         self.image_layer_produced.size += size;
     795           60 :     }
     796              : }
     797              : 
     798              : #[derive(Default, Debug, Clone, Copy, PartialEq, Eq)]
     799              : pub enum CompactionOutcome {
     800              :     #[default]
     801              :     /// No layers need to be compacted after this round. Compaction doesn't need
     802              :     /// to be immediately scheduled.
     803              :     Done,
     804              :     /// Still has pending layers to be compacted after this round. Ideally, the scheduler
     805              :     /// should immediately schedule another compaction.
     806              :     Pending,
     807              :     /// A timeline needs L0 compaction. Yield and schedule an immediate L0 compaction pass (only
     808              :     /// guaranteed when `compaction_l0_first` is enabled).
     809              :     YieldForL0,
     810              :     /// Compaction was skipped, because the timeline is ineligible for compaction.
     811              :     Skipped,
     812              : }
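// Illustrative sketch of how a compaction scheduler might react to each outcome
// (the real scheduler lives elsewhere in the crate):
//
//     match outcome {
//         CompactionOutcome::Done => { /* wait for the next compaction period */ }
//         CompactionOutcome::Pending => { /* schedule another pass immediately */ }
//         CompactionOutcome::YieldForL0 => { /* run an L0-only pass first, then retry */ }
//         CompactionOutcome::Skipped => { /* timeline not eligible; nothing to do */ }
//     }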
     813              : 
     814              : impl Timeline {
     815              :     /// TODO: cancellation
     816              :     ///
     817              :     /// Returns whether the compaction has pending tasks.
     818          728 :     pub(crate) async fn compact_legacy(
     819          728 :         self: &Arc<Self>,
     820          728 :         cancel: &CancellationToken,
     821          728 :         options: CompactOptions,
     822          728 :         ctx: &RequestContext,
     823          728 :     ) -> Result<CompactionOutcome, CompactionError> {
     824          728 :         if options
     825          728 :             .flags
     826          728 :             .contains(CompactFlags::EnhancedGcBottomMostCompaction)
     827              :         {
     828            0 :             self.compact_with_gc(cancel, options, ctx)
     829            0 :                 .await
     830            0 :                 .map_err(CompactionError::Other)?;
     831            0 :             return Ok(CompactionOutcome::Done);
     832          728 :         }
     833          728 : 
     834          728 :         if options.flags.contains(CompactFlags::DryRun) {
     835            0 :             return Err(CompactionError::Other(anyhow!(
     836            0 :                 "dry-run mode is not supported for legacy compaction for now"
     837            0 :             )));
     838          728 :         }
     839          728 : 
     840          728 :         if options.compact_key_range.is_some() || options.compact_lsn_range.is_some() {
     841              :             // maybe useful in the future? could implement this at some point
     842            0 :             return Err(CompactionError::Other(anyhow!(
     843            0 :                 "compaction range is not supported for legacy compaction for now"
     844            0 :             )));
     845          728 :         }
     846          728 : 
     847          728 :         // High level strategy for compaction / image creation:
     848          728 :         //
      849          728 :         // 1. First, do an L0 compaction to ensure we move the L0
      850          728 :         // layers into the historic layer map and get flat levels of
     851          728 :         // layers. If we did not compact all L0 layers, we will
     852          728 :         // prioritize compacting the timeline again and not do
     853          728 :         // any of the compactions below.
     854          728 :         //
     855          728 :         // 2. Then, calculate the desired "partitioning" of the
     856          728 :         // currently in-use key space. The goal is to partition the
     857          728 :         // key space into roughly fixed-size chunks, but also take into
     858          728 :         // account any existing image layers, and try to align the
     859          728 :         // chunk boundaries with the existing image layers to avoid
     860          728 :         // too much churn. Also try to align chunk boundaries with
     861          728 :         // relation boundaries.  In principle, we don't know about
     862          728 :         // relation boundaries here, we just deal with key-value
     863          728 :         // pairs, and the code in pgdatadir_mapping.rs knows how to
     864          728 :         // map relations into key-value pairs. But in practice we know
     865          728 :         // that 'field6' is the block number, and the fields 1-5
     866          728 :         // identify a relation. This is just an optimization,
     867          728 :         // though.
     868          728 :         //
     869          728 :         // 3. Once we know the partitioning, for each partition,
     870          728 :         // decide if it's time to create a new image layer. The
      871          728 :         // criterion is: has there been too much "churn" since the last
      872          728 :         // image layer? The "churn" is a fuzzy concept; it's a
      873          728 :         // combination of too many delta files, or too much WAL in
      874          728 :         // total in the delta files. Or perhaps: would creating an image
      875          728 :         // file allow us to delete some older files?
     876          728 :         //
     877          728 :         // 4. In the end, if the tenant gets auto-sharded, we will run
     878          728 :         // a shard-ancestor compaction.
     879          728 : 
     880          728 :         // Is the timeline being deleted?
     881          728 :         if self.is_stopping() {
     882            0 :             trace!("Dropping out of compaction on timeline shutdown");
     883            0 :             return Err(CompactionError::ShuttingDown);
     884          728 :         }
     885          728 : 
     886          728 :         let target_file_size = self.get_checkpoint_distance();
     887              : 
     888              :         // Define partitioning schema if needed
     889              : 
     890              :         // 1. L0 Compact
     891          728 :         let l0_outcome = {
     892          728 :             let timer = self.metrics.compact_time_histo.start_timer();
     893          728 :             let l0_outcome = self
     894          728 :                 .compact_level0(
     895          728 :                     target_file_size,
     896          728 :                     options.flags.contains(CompactFlags::ForceL0Compaction),
     897          728 :                     ctx,
     898          728 :                 )
     899          728 :                 .await?;
     900          728 :             timer.stop_and_record();
     901          728 :             l0_outcome
     902          728 :         };
     903          728 : 
     904          728 :         if options.flags.contains(CompactFlags::OnlyL0Compaction) {
     905            0 :             return Ok(l0_outcome);
     906          728 :         }
     907          728 : 
     908          728 :         // Yield if we have pending L0 compaction. The scheduler will do another pass.
     909          728 :         if (l0_outcome == CompactionOutcome::Pending || l0_outcome == CompactionOutcome::YieldForL0)
     910            0 :             && !options.flags.contains(CompactFlags::NoYield)
     911              :         {
     912            0 :             info!("image/ancestor compaction yielding for L0 compaction");
     913            0 :             return Ok(CompactionOutcome::YieldForL0);
     914          728 :         }
     915          728 : 
     916          728 :         // 2. Repartition and create image layers if necessary
     917          728 :         match self
     918          728 :             .repartition(
     919          728 :                 self.get_last_record_lsn(),
     920          728 :                 self.get_compaction_target_size(),
     921          728 :                 options.flags,
     922          728 :                 ctx,
     923          728 :             )
     924          728 :             .await
     925              :         {
     926          728 :             Ok(((dense_partitioning, sparse_partitioning), lsn)) => {
     927          728 :                 // Disables access_stats updates, so that the files we read remain candidates for eviction after we're done with them
     928          728 :                 let image_ctx = RequestContextBuilder::extend(ctx)
     929          728 :                     .access_stats_behavior(AccessStatsBehavior::Skip)
     930          728 :                     .build();
     931          728 : 
     932          728 :                 let mut partitioning = dense_partitioning;
     933          728 :                 partitioning
     934          728 :                     .parts
     935          728 :                     .extend(sparse_partitioning.into_dense().parts);
     936              : 
     937              :                 // 3. Create new image layers for partitions that have been modified "enough".
     938          728 :                 let (image_layers, outcome) = self
     939          728 :                     .create_image_layers(
     940          728 :                         &partitioning,
     941          728 :                         lsn,
     942          728 :                         if options
     943          728 :                             .flags
     944          728 :                             .contains(CompactFlags::ForceImageLayerCreation)
     945              :                         {
     946           28 :                             ImageLayerCreationMode::Force
     947              :                         } else {
     948          700 :                             ImageLayerCreationMode::Try
     949              :                         },
     950          728 :                         &image_ctx,
     951          728 :                         self.last_image_layer_creation_status
     952          728 :                             .load()
     953          728 :                             .as_ref()
     954          728 :                             .clone(),
     955          728 :                         !options.flags.contains(CompactFlags::NoYield),
     956          728 :                     )
     957          728 :                     .await
     958          728 :                     .inspect_err(|err| {
     959              :                         if let CreateImageLayersError::GetVectoredError(
     960              :                             GetVectoredError::MissingKey(_),
     961            0 :                         ) = err
     962              :                         {
     963            0 :                             critical!("missing key during compaction: {err:?}");
     964            0 :                         }
     965          728 :                     })?;
     966              : 
     967          728 :                 self.last_image_layer_creation_status
     968          728 :                     .store(Arc::new(outcome.clone()));
     969          728 : 
     970          728 :                 self.upload_new_image_layers(image_layers)?;
     971          728 :                 if let LastImageLayerCreationStatus::Incomplete { .. } = outcome {
     972              :                     // Yield and do not do any other kind of compaction.
     973            0 :                     info!(
     974            0 :                         "skipping shard ancestor compaction due to pending image layer generation tasks (preempted by L0 compaction)."
     975              :                     );
     976            0 :                     return Ok(CompactionOutcome::YieldForL0);
     977          728 :                 }
     978              :             }
     979              : 
     980              :             // Suppress errors when cancelled.
     981            0 :             Err(_) if self.cancel.is_cancelled() => {}
     982            0 :             Err(err) if err.is_cancel() => {}
     983              : 
     984              :             // Alert on critical errors that indicate data corruption.
     985            0 :             Err(err) if err.is_critical() => {
     986            0 :                 critical!("could not compact, repartitioning keyspace failed: {err:?}");
     987              :             }
     988              : 
     989              :             // Log other errors. Having no partitioning is normal if the timeline was just created
     990              :             // as an empty timeline, and also in unit tests where we use the timeline as a simple
     991              :             // key-value store, ignoring the datadir layout. Log the error but continue.
     992            0 :             Err(err) => error!("could not compact, repartitioning keyspace failed: {err:?}"),
     993              :         };
     994              : 
     995          728 :         let partition_count = self.partitioning.read().0.0.parts.len();
     996          728 : 
     997          728 :         // 4. Shard ancestor compaction
     998          728 : 
     999          728 :         if self.shard_identity.count >= ShardCount::new(2) {
    1000              :             // Limit the number of layer rewrites to the number of partitions: this means its
    1001              :             // runtime should be comparable to a full round of image layer creations, rather than
    1002              :             // being potentially much longer.
    1003            0 :             let rewrite_max = partition_count;
    1004            0 : 
    1005            0 :             self.compact_shard_ancestors(rewrite_max, ctx).await?;
    1006          728 :         }
    1007              : 
    1008          728 :         Ok(CompactionOutcome::Done)
    1009          728 :     }
    1010              : 
    1011              :     /// Check for layers that are eligible to be rewritten:
    1012              :     /// - Shard splitting: after a shard split, rewrite ancestor layers beyond pitr_interval, so
    1013              :     ///   that we don't indefinitely retain keys in this shard that aren't needed.
    1014              :     /// - For future use: layers beyond pitr_interval that are in formats we would
    1015              :     ///   rather not maintain compatibility with indefinitely.
    1016              :     ///
    1017              :     /// Note: this phase may read and write many gigabytes of data: use rewrite_max to bound
    1018              :     /// how much work it will try to do in each compaction pass.
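                      :     ///
                      :     /// For example (hypothetical numbers): with rewrite_max = 64, at most 64 ancestor
                      :     /// image layers are rewritten in one pass; remaining candidates are picked up by
                      :     /// subsequent compaction passes.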
    1019            0 :     async fn compact_shard_ancestors(
    1020            0 :         self: &Arc<Self>,
    1021            0 :         rewrite_max: usize,
    1022            0 :         ctx: &RequestContext,
    1023            0 :     ) -> Result<(), CompactionError> {
    1024            0 :         let mut drop_layers = Vec::new();
    1025            0 :         let mut layers_to_rewrite: Vec<Layer> = Vec::new();
    1026            0 : 
    1027            0 :         // We will use the Lsn cutoff of the last GC as a threshold for rewriting layers: if a
    1028            0 :         // layer is behind this Lsn, it indicates that the layer is being retained beyond the
    1029            0 :         // pitr_interval, for example because a branchpoint references it.
    1030            0 :         //
    1031            0 :         // Holding this read guard also blocks [`Self::gc_timeline`] from entering while we
    1032            0 :         // are rewriting layers.
    1033            0 :         let latest_gc_cutoff = self.get_applied_gc_cutoff_lsn();
    1034            0 : 
    1035            0 :         tracing::info!(
    1036            0 :             "latest_gc_cutoff: {}, pitr cutoff {}",
    1037            0 :             *latest_gc_cutoff,
    1038            0 :             self.gc_info.read().unwrap().cutoffs.time
    1039              :         );
    1040              : 
    1041            0 :         let layers = self.layers.read().await;
    1042            0 :         for layer_desc in layers.layer_map()?.iter_historic_layers() {
    1043            0 :             let layer = layers.get_from_desc(&layer_desc);
    1044            0 :             if layer.metadata().shard.shard_count == self.shard_identity.count {
    1045              :                 // This layer does not belong to a historic ancestor, no need to re-image it.
    1046            0 :                 continue;
    1047            0 :             }
    1048            0 : 
    1049            0 :             // This layer was created on an ancestor shard: check if it contains any data for this shard.
    1050            0 :             let sharded_range = ShardedRange::new(layer_desc.get_key_range(), &self.shard_identity);
    1051            0 :             let layer_local_page_count = sharded_range.page_count();
    1052            0 :             let layer_raw_page_count = ShardedRange::raw_size(&layer_desc.get_key_range());
    1053            0 :             if layer_local_page_count == 0 {
    1054              :                 // This ancestral layer only covers keys that belong to other shards.
    1055              :                 // We include the full metadata in the log: if we had some critical bug that caused
    1056              :                 // us to incorrectly drop layers, this would simplify manually debugging + reinstating those layers.
    1057            0 :                 info!(%layer, old_metadata=?layer.metadata(),
    1058            0 :                     "dropping layer after shard split, contains no keys for this shard.",
    1059              :                 );
    1060              : 
    1061            0 :                 if cfg!(debug_assertions) {
    1062              :                     // Expensive, exhaustive check of keys in this layer: this guards against ShardedRange's calculations being
    1063            0 :                     // wrong. If ShardedRange claims the local page count is zero, then every key
    1064            0 :                     // in this layer should be is_key_disposable().
    1065            0 :                     let range = layer_desc.get_key_range();
    1066            0 :                     let mut key = range.start;
    1067            0 :                     while key < range.end {
    1068            0 :                         debug_assert!(self.shard_identity.is_key_disposable(&key));
    1069            0 :                         key = key.next();
    1070              :                     }
    1071            0 :                 }
    1072              : 
    1073            0 :                 drop_layers.push(layer);
    1074            0 :                 continue;
    1075            0 :             } else if layer_local_page_count != u32::MAX
    1076            0 :                 && layer_local_page_count == layer_raw_page_count
    1077              :             {
    1078            0 :                 debug!(%layer,
    1079            0 :                     "layer is entirely shard local ({} keys), no need to filter it",
    1080              :                     layer_local_page_count
    1081              :                 );
    1082            0 :                 continue;
    1083            0 :             }
    1084            0 : 
    1085            0 :             // Don't bother re-writing a layer unless it will at least halve its size
    1086            0 :             if layer_local_page_count != u32::MAX
    1087            0 :                 && layer_local_page_count > layer_raw_page_count / 2
    1088              :             {
    1089            0 :                 debug!(%layer,
    1090            0 :                     "layer is already mostly local ({}/{}), not rewriting",
    1091              :                     layer_local_page_count,
    1092              :                     layer_raw_page_count
    1093              :                 );
                      :                 // The log message above says we are not rewriting, so skip this layer explicitly.
                      :                 continue;
    1094            0 :             }
    1095              : 
    1096              :             // Don't bother re-writing a layer if it is within the PITR window: it will age-out eventually
    1097              :             // without incurring the I/O cost of a rewrite.
    1098            0 :             if layer_desc.get_lsn_range().end >= *latest_gc_cutoff {
    1099            0 :                 debug!(%layer, "Skipping rewrite of layer still in GC window ({} >= {})",
    1100            0 :                     layer_desc.get_lsn_range().end, *latest_gc_cutoff);
    1101            0 :                 continue;
    1102            0 :             }
    1103            0 : 
    1104            0 :             if layer_desc.is_delta() {
    1105              :                 // We do not yet implement rewrite of delta layers
    1106            0 :                 debug!(%layer, "Skipping rewrite of delta layer");
    1107            0 :                 continue;
    1108            0 :             }
    1109            0 : 
    1110            0 :             // Only rewrite layers if their generations differ.  This guarantees:
    1111            0 :             //  - that local rewrite is safe, as local layer paths will differ between existing layer and rewritten one
    1112            0 :             //  - that the layer is persistent in remote storage, as we only see old-generation'd layer via loading from remote storage
    1113            0 :             if layer.metadata().generation == self.generation {
    1114            0 :                 debug!(%layer, "Skipping rewrite, is not from old generation");
    1115            0 :                 continue;
    1116            0 :             }
    1117            0 : 
    1118            0 :             if layers_to_rewrite.len() >= rewrite_max {
    1119            0 :                 tracing::info!(%layer, "Will rewrite layer on a future compaction, already rewrote {}",
    1120            0 :                     layers_to_rewrite.len()
    1121              :                 );
    1122            0 :                 continue;
    1123            0 :             }
    1124            0 : 
    1125            0 :             // Fall through: all our conditions for doing a rewrite passed.
    1126            0 :             layers_to_rewrite.push(layer);
    1127              :         }
    1128              : 
    1129              :         // Drop read lock on layer map before we start doing time-consuming I/O
    1130            0 :         drop(layers);
    1131            0 : 
    1132            0 :         let mut replace_image_layers = Vec::new();
    1133              : 
    1134            0 :         for layer in layers_to_rewrite {
    1135            0 :             tracing::info!(layer=%layer, "Rewriting layer after shard split...");
    1136            0 :             let mut image_layer_writer = ImageLayerWriter::new(
    1137            0 :                 self.conf,
    1138            0 :                 self.timeline_id,
    1139            0 :                 self.tenant_shard_id,
    1140            0 :                 &layer.layer_desc().key_range,
    1141            0 :                 layer.layer_desc().image_layer_lsn(),
    1142            0 :                 ctx,
    1143            0 :             )
    1144            0 :             .await
    1145            0 :             .map_err(CompactionError::Other)?;
    1146              : 
    1147              :             // Safety of layer rewrites:
    1148              :             // - We are writing to a different local file path than we are reading from, so the old Layer
    1149              :             //   cannot interfere with the new one.
    1150              :             // - In the page cache, contents for a particular VirtualFile are stored with a file_id that
    1151              :             //   is different for two layers with the same name (in `ImageLayerInner::new` we always
    1152              :             //   acquire a fresh id from [`crate::page_cache::next_file_id`]). So readers do not risk
    1153              :             //   reading the index from one layer file, and then data blocks from the rewritten layer file.
    1154              :             // - Any readers that have a reference to the old layer will keep it alive until they are done
    1155              :             //   with it. If they are trying to promote from remote storage, that will fail, but this is the same
    1156              :             //   as for compaction generally: compaction is allowed to delete layers that readers might be trying to use.
    1157              :             // - We do not run concurrently with other kinds of compaction, so the only layer map writes we race with are:
    1158              :             //    - GC, which at worst witnesses us "undelete" a layer that they just deleted.
    1159              :             //    - ingestion, which only inserts layers, therefore cannot collide with us.
    1160            0 :             let resident = layer.download_and_keep_resident(ctx).await?;
    1161              : 
    1162            0 :             let keys_written = resident
    1163            0 :                 .filter(&self.shard_identity, &mut image_layer_writer, ctx)
    1164            0 :                 .await?;
    1165              : 
    1166            0 :             if keys_written > 0 {
    1167            0 :                 let (desc, path) = image_layer_writer
    1168            0 :                     .finish(ctx)
    1169            0 :                     .await
    1170            0 :                     .map_err(CompactionError::Other)?;
    1171            0 :                 let new_layer = Layer::finish_creating(self.conf, self, desc, &path)
    1172            0 :                     .map_err(CompactionError::Other)?;
    1173            0 :                 tracing::info!(layer=%new_layer, "Rewrote layer, {} -> {} bytes",
    1174            0 :                     layer.metadata().file_size,
    1175            0 :                     new_layer.metadata().file_size);
    1176              : 
    1177            0 :                 replace_image_layers.push((layer, new_layer));
    1178            0 :             } else {
    1179            0 :                 // Drop the old layer.  Usually for this case we would already have noticed that
    1180            0 :                 // the layer has no data for us with the ShardedRange check above, but if not, we drop it here.
    1181            0 :                 drop_layers.push(layer);
    1182            0 :             }
    1183              :         }
    1184              : 
    1185              :         // At this point, we have replaced local layer files with their rewritten form, but not yet uploaded
    1186              :         // metadata to reflect that. If we restart here, the replaced layer files will look invalid (size mismatch
    1187              :         // to remote index) and be removed. This is inefficient but safe.
    1188            0 :         fail::fail_point!("compact-shard-ancestors-localonly");
    1189            0 : 
    1190            0 :         // Update the LayerMap so that readers will use the new layers, and enqueue it for writing to remote storage
    1191            0 :         self.rewrite_layers(replace_image_layers, drop_layers)
    1192            0 :             .await?;
    1193              : 
    1194            0 :         fail::fail_point!("compact-shard-ancestors-enqueued");
    1195            0 : 
    1196            0 :         // We wait for all uploads to complete before finishing this compaction stage.  This is not
    1197            0 :         // necessary for correctness, but it simplifies testing, and avoids proceeding with another
    1198            0 :         // Timeline's compaction while this timeline's uploads may be generating lots of disk I/O
    1199            0 :         // load.
    1200            0 :         match self.remote_client.wait_completion().await {
    1201            0 :             Ok(()) => (),
    1202            0 :             Err(WaitCompletionError::NotInitialized(ni)) => return Err(CompactionError::from(ni)),
    1203              :             Err(WaitCompletionError::UploadQueueShutDownOrStopped) => {
    1204            0 :                 return Err(CompactionError::ShuttingDown);
    1205              :             }
    1206              :         }
    1207              : 
    1208            0 :         fail::fail_point!("compact-shard-ancestors-persistent");
    1209            0 : 
    1210            0 :         Ok(())
    1211            0 :     }
    1212              : 
    1213              :     /// Update the LayerVisibilityHint of layers covered by image layers, based on whether there is
    1214              :     /// an image layer between them and the most recent readable LSN (branch point or tip of timeline).  The
    1215              :     /// purpose of the visibility hint is to record which layers need to be available to service reads.
    1216              :     ///
    1217              :     /// The result may be used as an input to eviction and secondary downloads to de-prioritize layers
    1218              :     /// that we know won't be needed for reads.
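                      :     ///
                      :     /// For example (illustrative): if an image layer exists at LSN 100 for some key range,
                      :     /// and no branch point or timeline tip needs to read below LSN 100 in that range, the
                      :     /// older layers underneath it can be hinted as covered and need not stay resident for reads.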
    1219          468 :     pub(crate) async fn update_layer_visibility(
    1220          468 :         &self,
    1221          468 :     ) -> Result<(), super::layer_manager::Shutdown> {
    1222          468 :         let head_lsn = self.get_last_record_lsn();
    1223              : 
    1224              :         // We will sweep through layers in reverse-LSN order.  We only do historic layers.  L0 deltas
    1225              :         // are implicitly left visible, because LayerVisibilityHint's default is Visible, and we never modify it here.
    1226              :         // Note that L0 deltas _can_ be covered by image layers, but we consider them 'visible' because we anticipate that
    1227              :         // they will be subject to L0->L1 compaction in the near future.
    1228          468 :         let layer_manager = self.layers.read().await;
    1229          468 :         let layer_map = layer_manager.layer_map()?;
    1230              : 
    1231          468 :         let readable_points = {
    1232          468 :             let children = self.gc_info.read().unwrap().retain_lsns.clone();
    1233          468 : 
    1234          468 :             let mut readable_points = Vec::with_capacity(children.len() + 1);
    1235          468 :             for (child_lsn, _child_timeline_id, is_offloaded) in &children {
    1236            0 :                 if *is_offloaded == MaybeOffloaded::Yes {
    1237            0 :                     continue;
    1238            0 :                 }
    1239            0 :                 readable_points.push(*child_lsn);
    1240              :             }
    1241          716 :             // FIXME: a more efficient bulk zip() through the layers rather than N log N lookups getting each one
    1242          468 :             readable_points
    1243          468 :         };
    1244          468 : 
    1245          468 :         let (layer_visibility, covered) = layer_map.get_visibility(readable_points);
    1246         1184 :         for (layer_desc, visibility) in layer_visibility {
    1247          716 :             // FIXME: a more efficiency bulk zip() through the layers rather than NlogN getting each one
    1248          716 :             let layer = layer_manager.get_from_desc(&layer_desc);
    1249          716 :             layer.set_visibility(visibility);
    1250          716 :         }
    1251              : 
    1252              :         // TODO: publish our covered KeySpace to our parent, so that when they update their visibility, they can
    1253              :         // avoid assuming that everything at a branch point is visible.
    1254          468 :         drop(covered);
    1255          468 :         Ok(())
    1256          468 :     }
    1257              : 
    1258              :     /// Collect a bunch of Level 0 layer files, and compact and reshuffle them
    1259              :     /// as Level 1 files. Returns whether the L0 layers are fully compacted.
    1260          728 :     async fn compact_level0(
    1261          728 :         self: &Arc<Self>,
    1262          728 :         target_file_size: u64,
    1263          728 :         force_compaction_ignore_threshold: bool,
    1264          728 :         ctx: &RequestContext,
    1265          728 :     ) -> Result<CompactionOutcome, CompactionError> {
    1266              :         let CompactLevel0Phase1Result {
    1267          728 :             new_layers,
    1268          728 :             deltas_to_compact,
    1269          728 :             outcome,
    1270              :         } = {
    1271          728 :             let phase1_span = info_span!("compact_level0_phase1");
    1272          728 :             let ctx = ctx.attached_child();
    1273          728 :             let mut stats = CompactLevel0Phase1StatsBuilder {
    1274          728 :                 version: Some(2),
    1275          728 :                 tenant_id: Some(self.tenant_shard_id),
    1276          728 :                 timeline_id: Some(self.timeline_id),
    1277          728 :                 ..Default::default()
    1278          728 :             };
    1279          728 : 
    1280          728 :             let begin = tokio::time::Instant::now();
    1281          728 :             let phase1_layers_locked = self.layers.read().await;
    1282          728 :             let now = tokio::time::Instant::now();
    1283          728 :             stats.read_lock_acquisition_micros =
    1284          728 :                 DurationRecorder::Recorded(RecordedDuration(now - begin), now);
    1285          728 :             self.compact_level0_phase1(
    1286          728 :                 phase1_layers_locked,
    1287          728 :                 stats,
    1288          728 :                 target_file_size,
    1289          728 :                 force_compaction_ignore_threshold,
    1290          728 :                 &ctx,
    1291          728 :             )
    1292          728 :             .instrument(phase1_span)
    1293          728 :             .await?
    1294              :         };
    1295              : 
    1296          728 :         if new_layers.is_empty() && deltas_to_compact.is_empty() {
    1297              :             // nothing to do
    1298          672 :             return Ok(CompactionOutcome::Done);
    1299           56 :         }
    1300           56 : 
    1301           56 :         self.finish_compact_batch(&new_layers, &Vec::new(), &deltas_to_compact)
    1302           56 :             .await?;
    1303           56 :         Ok(outcome)
    1304          728 :     }
    1305              : 
    1306              :     /// First phase of Level 0 compaction, explained in the [`Self::compact_legacy`] comment.
    1307          728 :     async fn compact_level0_phase1<'a>(
    1308          728 :         self: &'a Arc<Self>,
    1309          728 :         guard: tokio::sync::RwLockReadGuard<'a, LayerManager>,
    1310          728 :         mut stats: CompactLevel0Phase1StatsBuilder,
    1311          728 :         target_file_size: u64,
    1312          728 :         force_compaction_ignore_threshold: bool,
    1313          728 :         ctx: &RequestContext,
    1314          728 :     ) -> Result<CompactLevel0Phase1Result, CompactionError> {
    1315          728 :         stats.read_lock_held_spawn_blocking_startup_micros =
    1316          728 :             stats.read_lock_acquisition_micros.till_now(); // set by caller
    1317          728 :         let layers = guard.layer_map()?;
    1318          728 :         let level0_deltas = layers.level0_deltas();
    1319          728 :         stats.level0_deltas_count = Some(level0_deltas.len());
    1320          728 : 
    1321          728 :         // Only compact if enough layers have accumulated.
    1322          728 :         let threshold = self.get_compaction_threshold();
    1323          728 :         if level0_deltas.is_empty() || level0_deltas.len() < threshold {
    1324          672 :             if force_compaction_ignore_threshold {
    1325            0 :                 if !level0_deltas.is_empty() {
    1326            0 :                     info!(
    1327            0 :                         level0_deltas = level0_deltas.len(),
    1328            0 :                         threshold, "too few deltas to compact, but forcing compaction"
    1329              :                     );
    1330              :                 } else {
    1331            0 :                     info!(
    1332            0 :                         level0_deltas = level0_deltas.len(),
    1333            0 :                         threshold, "too few deltas to compact, cannot force compaction"
    1334              :                     );
    1335            0 :                     return Ok(CompactLevel0Phase1Result::default());
    1336              :                 }
    1337              :             } else {
    1338          672 :                 debug!(
    1339            0 :                     level0_deltas = level0_deltas.len(),
    1340            0 :                     threshold, "too few deltas to compact"
    1341              :                 );
    1342          672 :                 return Ok(CompactLevel0Phase1Result::default());
    1343              :             }
    1344           56 :         }
    1345              : 
    1346           56 :         let mut level0_deltas = level0_deltas
    1347           56 :             .iter()
    1348          804 :             .map(|x| guard.get_from_desc(x))
    1349           56 :             .collect::<Vec<_>>();
    1350           56 : 
    1351           56 :         // Gather the files to compact in this iteration.
    1352           56 :         //
    1353           56 :         // Start with the oldest Level 0 delta file, and collect any other
    1354           56 :         // level 0 files that form a contiguous sequence, such that the end
    1355           56 :         // LSN of previous file matches the start LSN of the next file.
    1356           56 :         //
    1357           56 :         // Note that if the files don't form such a sequence, we might
    1358           56 :         // "compact" just a single file. That's a bit pointless, but it allows
    1359           56 :         // us to get rid of the level 0 file, and compact the other files on
    1360           56 :         // the next iteration. This could probably made smarter, but such
    1361           56 :         // the next iteration. This could probably be made smarter, but such
    1362           56 :         // of a crash, partial download from cloud storage, or something like
    1363           56 :         // that, so it's not a big deal in practice.
    1364         1496 :         level0_deltas.sort_by_key(|l| l.layer_desc().lsn_range.start);
    1365           56 :         let mut level0_deltas_iter = level0_deltas.iter();
    1366           56 : 
    1367           56 :         let first_level0_delta = level0_deltas_iter.next().unwrap();
    1368           56 :         let mut prev_lsn_end = first_level0_delta.layer_desc().lsn_range.end;
    1369           56 :         let mut deltas_to_compact = Vec::with_capacity(level0_deltas.len());
    1370           56 : 
    1371           56 :         // Accumulate the size of layers in `deltas_to_compact`
    1372           56 :         let mut deltas_to_compact_bytes = 0;
    1373           56 : 
    1374           56 :         // Under normal circumstances, we will accumulate up to compaction_upper_limit L0s of size
    1375           56 :         // checkpoint_distance each.  To avoid edge cases using extra system resources, bound our
    1376           56 :         // work in this function to only operate on this much delta data at once.
    1377           56 :         //
    1378           56 :         // In general, compaction_threshold should be <= compaction_upper_limit, but in case that
    1379           56 :         // the constraint is not respected, we use the larger of the two.
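                      :         //
                      :         // Worked example (hypothetical values, assuming compaction_upper_limit >=
                      :         // compaction_threshold and checkpoint_distance >= DEFAULT_CHECKPOINT_DISTANCE):
                      :         // with compaction_upper_limit = 10 and checkpoint_distance = 256 MiB,
                      :         // delta_size_limit = 10 * 256 MiB = 2.5 GiB of L0 delta data per call.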
    1380           56 :         let delta_size_limit = std::cmp::max(
    1381           56 :             self.get_compaction_upper_limit(),
    1382           56 :             self.get_compaction_threshold(),
    1383           56 :         ) as u64
    1384           56 :             * std::cmp::max(self.get_checkpoint_distance(), DEFAULT_CHECKPOINT_DISTANCE);
    1385           56 : 
    1386           56 :         let mut fully_compacted = true;
    1387           56 : 
    1388           56 :         deltas_to_compact.push(first_level0_delta.download_and_keep_resident(ctx).await?);
    1389          804 :         for l in level0_deltas_iter {
    1390          748 :             let lsn_range = &l.layer_desc().lsn_range;
    1391          748 : 
    1392          748 :             if lsn_range.start != prev_lsn_end {
    1393            0 :                 break;
    1394          748 :             }
    1395          748 :             deltas_to_compact.push(l.download_and_keep_resident(ctx).await?);
    1396          748 :             deltas_to_compact_bytes += l.metadata().file_size;
    1397          748 :             prev_lsn_end = lsn_range.end;
    1398          748 : 
    1399          748 :             if deltas_to_compact_bytes >= delta_size_limit {
    1400            0 :                 info!(
    1401            0 :                     l0_deltas_selected = deltas_to_compact.len(),
    1402            0 :                     l0_deltas_total = level0_deltas.len(),
    1403            0 :                     "L0 compaction picker hit max delta layer size limit: {}",
    1404              :                     delta_size_limit
    1405              :                 );
    1406            0 :                 fully_compacted = false;
    1407            0 : 
    1408            0 :                 // Proceed with compaction, but only a subset of L0s
    1409            0 :                 break;
    1410          748 :             }
    1411              :         }
    1412           56 :         let lsn_range = Range {
    1413           56 :             start: deltas_to_compact
    1414           56 :                 .first()
    1415           56 :                 .unwrap()
    1416           56 :                 .layer_desc()
    1417           56 :                 .lsn_range
    1418           56 :                 .start,
    1419           56 :             end: deltas_to_compact.last().unwrap().layer_desc().lsn_range.end,
    1420           56 :         };
    1421           56 : 
    1422           56 :         info!(
    1423            0 :             "Starting Level0 compaction in LSN range {}-{} for {} layers ({} deltas in total)",
    1424            0 :             lsn_range.start,
    1425            0 :             lsn_range.end,
    1426            0 :             deltas_to_compact.len(),
    1427            0 :             level0_deltas.len()
    1428              :         );
    1429              : 
    1430          804 :         for l in deltas_to_compact.iter() {
    1431          804 :             info!("compact includes {l}");
    1432              :         }
    1433              : 
    1434              :         // We don't need the original list of layers anymore. Drop it so that
    1435              :         // we don't accidentally use it later in the function.
    1436           56 :         drop(level0_deltas);
    1437           56 : 
    1438           56 :         stats.read_lock_held_prerequisites_micros = stats
    1439           56 :             .read_lock_held_spawn_blocking_startup_micros
    1440           56 :             .till_now();
    1441              : 
    1442              :         // TODO: replace with streaming k-merge
    1443           56 :         let all_keys = {
    1444           56 :             let mut all_keys = Vec::new();
    1445          804 :             for l in deltas_to_compact.iter() {
    1446          804 :                 if self.cancel.is_cancelled() {
    1447            0 :                     return Err(CompactionError::ShuttingDown);
    1448          804 :                 }
    1449          804 :                 let delta = l.get_as_delta(ctx).await.map_err(CompactionError::Other)?;
    1450          804 :                 let keys = delta
    1451          804 :                     .index_entries(ctx)
    1452          804 :                     .await
    1453          804 :                     .map_err(CompactionError::Other)?;
    1454          804 :                 all_keys.extend(keys);
    1455              :             }
    1456              :             // The current stdlib sorting implementation is designed in a way where it is
    1457              :             // particularly fast when the slice is made up of sorted sub-ranges.
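                      :             // (Each delta layer's index entries are already ordered by (key, lsn), so
                      :             // `all_keys` is a concatenation of sorted runs.)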
    1458      8847544 :             all_keys.sort_by_key(|DeltaEntry { key, lsn, .. }| (*key, *lsn));
    1459           56 :             all_keys
    1460           56 :         };
    1461           56 : 
    1462           56 :         stats.read_lock_held_key_sort_micros = stats.read_lock_held_prerequisites_micros.till_now();
    1463              : 
    1464              :         // Determine N largest holes where N is number of compacted layers. The vec is sorted by key range start.
    1465              :         //
    1466              :         // A hole is a key range for which this compaction doesn't have any WAL records.
    1467              :         // Our goal in this compaction iteration is to avoid creating L1s that, in terms of their key range,
    1468              :         // cover the hole, but actually don't contain any WAL records for that key range.
    1469              :         // The reason is that the mere stack of L1s (`count_deltas`) triggers image layer creation (`create_image_layers`).
    1470              :         // That image layer creation would be useless for a hole range covered by L1s that don't contain any WAL records.
    1471              :         //
    1472              :         // The algorithm chooses holes as follows.
    1473              :         // - Slide a 2-element window over the keys in key order to get the hole range (= the distance between two keys).
    1474              :         // - Filter: min threshold on range length
    1475              :         // - Rank: by coverage size (=number of image layers required to reconstruct each key in the range for which we have any data)
    1476              :         //
    1477              :         // For more details, intuition, and some ASCII art see https://github.com/neondatabase/neon/pull/3597#discussion_r1112704451
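                      :         //
                      :         // Illustrative example (hypothetical numbers): with min_hole_range = 16384, two
                      :         // consecutive keys whose i128 representations are 1_000 and 40_000 produce the
                      :         // candidate hole 1_001..40_000; it is kept only if at least min_hole_coverage_size
                      :         // image layers currently cover that range, and the heap retains the max_holes
                      :         // candidates with the largest coverage.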
    1478              :         #[derive(PartialEq, Eq)]
    1479              :         struct Hole {
    1480              :             key_range: Range<Key>,
    1481              :             coverage_size: usize,
    1482              :         }
    1483           56 :         let holes: Vec<Hole> = {
    1484              :             use std::cmp::Ordering;
    1485              :             impl Ord for Hole {
    1486            0 :                 fn cmp(&self, other: &Self) -> Ordering {
    1487            0 :                     self.coverage_size.cmp(&other.coverage_size).reverse()
    1488            0 :                 }
    1489              :             }
    1490              :             impl PartialOrd for Hole {
    1491            0 :                 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
    1492            0 :                     Some(self.cmp(other))
    1493            0 :                 }
    1494              :             }
    1495           56 :             let max_holes = deltas_to_compact.len();
    1496           56 :             let last_record_lsn = self.get_last_record_lsn();
    1497           56 :             let min_hole_range = (target_file_size / page_cache::PAGE_SZ as u64) as i128;
    1498           56 :             let min_hole_coverage_size = 3; // TODO: something more flexible?
    1499           56 :             // min-heap (reserve space for one more element added before eviction)
    1500           56 :             let mut heap: BinaryHeap<Hole> = BinaryHeap::with_capacity(max_holes + 1);
    1501           56 :             let mut prev: Option<Key> = None;
    1502              : 
    1503      4128076 :             for &DeltaEntry { key: next_key, .. } in all_keys.iter() {
    1504      4128076 :                 if let Some(prev_key) = prev {
    1505              :                     // Just a first, fast filter: do not create hole entries for metadata keys. The last hole in the
    1506              :                     // compaction is the gap between the data keys and the metadata keys.
    1507      4128020 :                     if next_key.to_i128() - prev_key.to_i128() >= min_hole_range
    1508            0 :                         && !Key::is_metadata_key(&prev_key)
    1509              :                     {
    1510            0 :                         let key_range = prev_key..next_key;
    1511            0 :                         // Measuring a hole by just subtracting the i128 representations of the key range boundaries
    1512            0 :                         // doesn't make much sense, because the largest holes would correspond to field1/field2 changes.
    1513            0 :                         // But we are mostly interested in eliminating holes which cause generation of excessive image layers.
    1514            0 :                         // That is why it is better to measure the size of a hole as the number of covering image layers.
    1515            0 :                         let coverage_size =
    1516            0 :                             layers.image_coverage(&key_range, last_record_lsn).len();
    1517            0 :                         if coverage_size >= min_hole_coverage_size {
    1518            0 :                             heap.push(Hole {
    1519            0 :                                 key_range,
    1520            0 :                                 coverage_size,
    1521            0 :                             });
    1522            0 :                             if heap.len() > max_holes {
    1523            0 :                                 heap.pop(); // remove smallest hole
    1524            0 :                             }
    1525            0 :                         }
    1526      4128020 :                     }
    1527           56 :                 }
    1528      4128076 :                 prev = Some(next_key.next());
    1529              :             }
    1530           56 :             let mut holes = heap.into_vec();
    1531           56 :             holes.sort_unstable_by_key(|hole| hole.key_range.start);
    1532           56 :             holes
    1533           56 :         };
    1534           56 :         stats.read_lock_held_compute_holes_micros = stats.read_lock_held_key_sort_micros.till_now();
    1535           56 :         drop_rlock(guard);
    1536           56 : 
    1537           56 :         if self.cancel.is_cancelled() {
    1538            0 :             return Err(CompactionError::ShuttingDown);
    1539           56 :         }
    1540           56 : 
    1541           56 :         stats.read_lock_drop_micros = stats.read_lock_held_compute_holes_micros.till_now();
    1542              : 
    1543              :         // This iterator walks through all key-value pairs from all the layers
    1544              :         // we're compacting, in key, LSN order.
    1545              :         // If there's both a Value::Image and Value::WalRecord for the same (key,lsn),
    1546              :         // then the Value::Image is ordered before Value::WalRecord.
    1547           56 :         let mut all_values_iter = {
    1548           56 :             let mut deltas = Vec::with_capacity(deltas_to_compact.len());
    1549          804 :             for l in deltas_to_compact.iter() {
    1550          804 :                 let l = l.get_as_delta(ctx).await.map_err(CompactionError::Other)?;
    1551          804 :                 deltas.push(l);
    1552              :             }
    1553           56 :             MergeIterator::create(&deltas, &[], ctx)
    1554           56 :         };
    1555           56 : 
    1556           56 :         // This iterator walks through all keys and is needed to calculate the size used by each key
    1557           56 :         let mut all_keys_iter = all_keys
    1558           56 :             .iter()
    1559      4128076 :             .map(|DeltaEntry { key, lsn, size, .. }| (*key, *lsn, *size))
    1560      4128020 :             .coalesce(|mut prev, cur| {
    1561      4128020 :                 // Coalesce keys that belong to the same key pair.
    1562      4128020 :                 // Coalesce entries that belong to the same key.
    1563      4128020 :                 // into different layer files.
    1564      4128020 :                 // Still limit this by the target file size,
    1565      4128020 :                 // so that we keep the size of the files in
    1566      4128020 :                 // check.
    1567      4128020 :                 if prev.0 == cur.0 && prev.2 < target_file_size {
    1568        80076 :                     prev.2 += cur.2;
    1569        80076 :                     Ok(prev)
    1570              :                 } else {
    1571      4047944 :                     Err((prev, cur))
    1572              :                 }
    1573      4128020 :             });
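                      :         // For example (hypothetical sizes): entries (K, lsn1, 100), (K, lsn2, 200) and
                      :         // (K', lsn3, 50) coalesce into (K, lsn1, 300) and (K', lsn3, 50), as long as the
                      :         // size accumulated so far is still below target_file_size.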
    1574           56 : 
    1575           56 :         // Merge the contents of all the input delta layers into a new set
    1576           56 :         // of delta layers, based on the current partitioning.
    1577           56 :         //
    1578           56 :         // We split the new delta layers on the key dimension. We iterate through the key space, and for each key, check whether including the next key in the current output layer we're building would cause the layer to become too large. If so, flush the current output layer and start a new one.
    1579           56 :         // It's possible that there is a single key with so many page versions that storing all of them in a single layer file
    1580           56 :         // would be too large. In that case, we also split on the LSN dimension.
    1581           56 :         //
    1582           56 :         // LSN
    1583           56 :         //  ^
    1584           56 :         //  |
    1585           56 :         //  | +-----------+            +--+--+--+--+
    1586           56 :         //  | |           |            |  |  |  |  |
    1587           56 :         //  | +-----------+            |  |  |  |  |
    1588           56 :         //  | |           |            |  |  |  |  |
    1589           56 :         //  | +-----------+     ==>    |  |  |  |  |
    1590           56 :         //  | |           |            |  |  |  |  |
    1591           56 :         //  | +-----------+            |  |  |  |  |
    1592           56 :         //  | |           |            |  |  |  |  |
    1593           56 :         //  | +-----------+            +--+--+--+--+
    1594           56 :         //  |
    1595           56 :         //  +--------------> key
    1596           56 :         //
    1597           56 :         //
    1598           56 :         // If one key (X) has a lot of page versions:
    1599           56 :         //
    1600           56 :         // LSN
    1601           56 :         //  ^
    1602           56 :         //  |                                 (X)
    1603           56 :         //  | +-----------+            +--+--+--+--+
    1604           56 :         //  | |           |            |  |  |  |  |
    1605           56 :         //  | +-----------+            |  |  +--+  |
    1606           56 :         //  | |           |            |  |  |  |  |
    1607           56 :         //  | +-----------+     ==>    |  |  |  |  |
    1608           56 :         //  | |           |            |  |  +--+  |
    1609           56 :         //  | +-----------+            |  |  |  |  |
    1610           56 :         //  | |           |            |  |  |  |  |
    1611           56 :         //  | +-----------+            +--+--+--+--+
    1612           56 :         //  |
    1613           56 :         //  +--------------> key
    1614           56 :         // TODO: this actually divides the layers into fixed-size chunks, not
    1615           56 :         // based on the partitioning.
    1616           56 :         //
    1617           56 :         // TODO: we should also opportunistically materialize and
    1618           56 :         // garbage collect what we can.
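                      :         //
                      :         // For instance (hypothetical sizes): with target_file_size = 128 MiB, a single key
                      :         // whose accumulated values exceed 128 MiB is split on the LSN dimension, and each
                      :         // resulting "dup" layer covers [dup_start_lsn, dup_end_lsn) for just that key.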
    1619           56 :         let mut new_layers = Vec::new();
    1620           56 :         let mut prev_key: Option<Key> = None;
    1621           56 :         let mut writer: Option<DeltaLayerWriter> = None;
    1622           56 :         let mut key_values_total_size = 0u64;
    1623           56 :         let mut dup_start_lsn: Lsn = Lsn::INVALID; // start LSN of layer containing values of the single key
    1624           56 :         let mut dup_end_lsn: Lsn = Lsn::INVALID; // end LSN of layer containing values of the single key
    1625           56 :         let mut next_hole = 0; // index of next hole in holes vector
    1626           56 : 
    1627           56 :         let mut keys = 0;
    1628              : 
    1629      4128132 :         while let Some((key, lsn, value)) = all_values_iter
    1630      4128132 :             .next()
    1631      4128132 :             .await
    1632      4128132 :             .map_err(CompactionError::Other)?
    1633              :         {
    1634      4128076 :             keys += 1;
    1635      4128076 : 
    1636      4128076 :             if keys % 32_768 == 0 && self.cancel.is_cancelled() {
    1637              :                 // Avoid hitting the cancellation token on every key. In benches, we end up
    1638              :                 // shuffling on the order of a million keys per layer; this means we'll check it
    1639              :                 // around tens of times per layer.
    1640            0 :                 return Err(CompactionError::ShuttingDown);
    1641      4128076 :             }
    1642      4128076 : 
    1643      4128076 :             let same_key = prev_key == Some(key);
    1644      4128076 :             // We need to check key boundaries once we reach the next key, or the end of a layer holding a single key
    1645      4128076 :             if !same_key || lsn == dup_end_lsn {
    1646      4048000 :                 let mut next_key_size = 0u64;
    1647      4048000 :                 let is_dup_layer = dup_end_lsn.is_valid();
    1648      4048000 :                 dup_start_lsn = Lsn::INVALID;
    1649      4048000 :                 if !same_key {
    1650      4048000 :                     dup_end_lsn = Lsn::INVALID;
    1651      4048000 :                 }
    1652              :                 // Determine size occupied by this key. We stop at next key or when size becomes larger than target_file_size
    1653      4048000 :                 for (next_key, next_lsn, next_size) in all_keys_iter.by_ref() {
    1654      4048000 :                     next_key_size = next_size;
    1655      4048000 :                     if key != next_key {
    1656      4047944 :                         if dup_end_lsn.is_valid() {
    1657            0 :                             // We are writing a segment with duplicates:
    1658            0 :                             // place all remaining values of this key in a separate segment
    1659            0 :                             dup_start_lsn = dup_end_lsn; // new segments starts where old stops
    1660            0 :                             dup_end_lsn = lsn_range.end; // there are no more values of this key till end of LSN range
    1661      4047944 :                         }
    1662      4047944 :                         break;
    1663           56 :                     }
    1664           56 :                     key_values_total_size += next_size;
    1665           56 :                     // Check if it is time to split the segment: when the total size of this key's values is larger than the target file size.
    1666           56 :                     // We need to avoid generating empty segments if next_size > target_file_size.
    1667           56 :                     if key_values_total_size > target_file_size && lsn != next_lsn {
    1668              :                         // Split key between multiple layers: such layer can contain only single key
    1669            0 :                         dup_start_lsn = if dup_end_lsn.is_valid() {
    1670            0 :                             dup_end_lsn // new segment with duplicates starts where old one stops
    1671              :                         } else {
    1672            0 :                             lsn // start with the first LSN for this key
    1673              :                         };
    1674            0 :                         dup_end_lsn = next_lsn; // upper LSN boundary is exclusive
    1675            0 :                         break;
    1676           56 :                     }
    1677              :                 }
    1678      4048000 :                 // Handle the case when the loop reaches the last key: in this case dup_end_lsn is valid but dup_start_lsn is not set.
    1679      4048000 :                 if dup_end_lsn.is_valid() && !dup_start_lsn.is_valid() {
    1680            0 :                     dup_start_lsn = dup_end_lsn;
    1681            0 :                     dup_end_lsn = lsn_range.end;
    1682      4048000 :                 }
    1683      4048000 :                 if writer.is_some() {
    1684      4047944 :                     let written_size = writer.as_mut().unwrap().size();
    1685      4047944 :                     let contains_hole =
    1686      4047944 :                         next_hole < holes.len() && key >= holes[next_hole].key_range.end;
    1687              :                     // check if the key causes layer overflow or contains a hole...
    1688      4047944 :                     if is_dup_layer
    1689      4047944 :                         || dup_end_lsn.is_valid()
    1690      4047944 :                         || written_size + key_values_total_size > target_file_size
    1691      4047384 :                         || contains_hole
    1692              :                     {
    1693              :                         // ... if so, flush previous layer and prepare to write new one
    1694          560 :                         let (desc, path) = writer
    1695          560 :                             .take()
    1696          560 :                             .unwrap()
    1697          560 :                             .finish(prev_key.unwrap().next(), ctx)
    1698          560 :                             .await
    1699          560 :                             .map_err(CompactionError::Other)?;
    1700          560 :                         let new_delta = Layer::finish_creating(self.conf, self, desc, &path)
    1701          560 :                             .map_err(CompactionError::Other)?;
    1702              : 
    1703          560 :                         new_layers.push(new_delta);
    1704          560 :                         writer = None;
    1705          560 : 
    1706          560 :                         if contains_hole {
    1707            0 :                             // skip hole
    1708            0 :                             next_hole += 1;
    1709          560 :                         }
    1710      4047384 :                     }
    1711           56 :                 }
    1712              :                 // Remember size of key value because at next iteration we will access next item
    1713      4048000 :                 key_values_total_size = next_key_size;
    1714        80076 :             }
    1715      4128076 :             fail_point!("delta-layer-writer-fail-before-finish", |_| {
    1716            0 :                 Err(CompactionError::Other(anyhow::anyhow!(
    1717            0 :                     "failpoint delta-layer-writer-fail-before-finish"
    1718            0 :                 )))
    1719      4128076 :             });
    1720              : 
    1721      4128076 :             if !self.shard_identity.is_key_disposable(&key) {
    1722      4128076 :                 if writer.is_none() {
    1723          616 :                     if self.cancel.is_cancelled() {
    1724              :                         // to be somewhat responsive to cancellation, check for each new layer
    1725            0 :                         return Err(CompactionError::ShuttingDown);
    1726          616 :                     }
    1727              :                     // Create writer if not initialized yet
    1728          616 :                     writer = Some(
    1729              :                         DeltaLayerWriter::new(
    1730          616 :                             self.conf,
    1731          616 :                             self.timeline_id,
    1732          616 :                             self.tenant_shard_id,
    1733          616 :                             key,
    1734          616 :                             if dup_end_lsn.is_valid() {
    1735              :                                 // this is a layer containing slice of values of the same key
    1736            0 :                                 debug!("Create new dup layer {}..{}", dup_start_lsn, dup_end_lsn);
    1737            0 :                                 dup_start_lsn..dup_end_lsn
    1738              :                             } else {
    1739          616 :                                 debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end);
    1740          616 :                                 lsn_range.clone()
    1741              :                             },
    1742          616 :                             ctx,
    1743          616 :                         )
    1744          616 :                         .await
    1745          616 :                         .map_err(CompactionError::Other)?,
    1746              :                     );
    1747              : 
    1748          616 :                     keys = 0;
    1749      4127460 :                 }
    1750              : 
    1751      4128076 :                 writer
    1752      4128076 :                     .as_mut()
    1753      4128076 :                     .unwrap()
    1754      4128076 :                     .put_value(key, lsn, value, ctx)
    1755      4128076 :                     .await
    1756      4128076 :                     .map_err(CompactionError::Other)?;
    1757              :             } else {
    1758            0 :                 let owner = self.shard_identity.get_shard_number(&key);
    1759            0 : 
    1760            0 :                 // This happens after a shard split, when we're compacting an L0 created by our parent shard
    1761            0 :                 debug!("dropping key {key} during compaction (it belongs on shard {owner})");
    1762              :             }
    1763              : 
    1764      4128076 :             if !new_layers.is_empty() {
    1765        39572 :                 fail_point!("after-timeline-compacted-first-L1");
    1766      4088504 :             }
    1767              : 
    1768      4128076 :             prev_key = Some(key);
    1769              :         }
    1770           56 :         if let Some(writer) = writer {
    1771           56 :             let (desc, path) = writer
    1772           56 :                 .finish(prev_key.unwrap().next(), ctx)
    1773           56 :                 .await
    1774           56 :                 .map_err(CompactionError::Other)?;
    1775           56 :             let new_delta = Layer::finish_creating(self.conf, self, desc, &path)
    1776           56 :                 .map_err(CompactionError::Other)?;
    1777           56 :             new_layers.push(new_delta);
    1778            0 :         }
    1779              : 
    1780              :         // Sync layers
    1781           56 :         if !new_layers.is_empty() {
    1782              :             // Print a warning if a created layer is larger than double the target size.
    1783              :             // Add two pages for potential overhead. This should in theory already be
    1784              :             // accounted for in the target calculation, but for very small targets,
    1785              :             // we might still easily hit the limit otherwise.
    1786           56 :             let warn_limit = target_file_size * 2 + page_cache::PAGE_SZ as u64 * 2;
    1787          616 :             for layer in new_layers.iter() {
    1788          616 :                 if layer.layer_desc().file_size > warn_limit {
    1789            0 :                     warn!(
    1790              :                         %layer,
    1791            0 :                         "created delta file of size {} larger than double of target of {target_file_size}", layer.layer_desc().file_size
    1792              :                     );
    1793          616 :                 }
    1794              :             }
    1795              : 
    1796              :             // The writer.finish() above already did the fsync of the inodes.
    1797              :             // We just need to fsync the directory in which these inodes are linked,
    1798              :             // which we know to be the timeline directory.
    1799              :             //
    1800              :             // We use fatal_err() below because after writer.finish() returns with success,
    1801              :             // the in-memory state of the filesystem already has the layer file in its final place,
    1802              :             // and subsequent pageserver code could think it's durable while it really isn't.
    1803           56 :             let timeline_dir = VirtualFile::open(
    1804           56 :                 &self
    1805           56 :                     .conf
    1806           56 :                     .timeline_path(&self.tenant_shard_id, &self.timeline_id),
    1807           56 :                 ctx,
    1808           56 :             )
    1809           56 :             .await
    1810           56 :             .fatal_err("VirtualFile::open for timeline dir fsync");
    1811           56 :             timeline_dir
    1812           56 :                 .sync_all()
    1813           56 :                 .await
    1814           56 :                 .fatal_err("VirtualFile::sync_all timeline dir");
    1815            0 :         }
    1816              : 
    1817           56 :         stats.write_layer_files_micros = stats.read_lock_drop_micros.till_now();
    1818           56 :         stats.new_deltas_count = Some(new_layers.len());
    1819          616 :         stats.new_deltas_size = Some(new_layers.iter().map(|l| l.layer_desc().file_size).sum());
    1820           56 : 
    1821           56 :         match TryInto::<CompactLevel0Phase1Stats>::try_into(stats)
    1822           56 :             .and_then(|stats| serde_json::to_string(&stats).context("serde_json::to_string"))
    1823              :         {
    1824           56 :             Ok(stats_json) => {
    1825           56 :                 info!(
    1826            0 :                     stats_json = stats_json.as_str(),
    1827            0 :                     "compact_level0_phase1 stats available"
    1828              :                 )
    1829              :             }
    1830            0 :             Err(e) => {
    1831            0 :                 warn!("compact_level0_phase1 stats failed to serialize: {:#}", e);
    1832              :             }
    1833              :         }
    1834              : 
    1835              :         // Without this, rustc complains about deltas_to_compact still
    1836              :         // being borrowed when we `.into_iter()` below.
    1837           56 :         drop(all_values_iter);
    1838           56 : 
    1839           56 :         Ok(CompactLevel0Phase1Result {
    1840           56 :             new_layers,
    1841           56 :             deltas_to_compact: deltas_to_compact
    1842           56 :                 .into_iter()
    1843          804 :                 .map(|x| x.drop_eviction_guard())
    1844           56 :                 .collect::<Vec<_>>(),
    1845           56 :             outcome: if fully_compacted {
    1846           56 :                 CompactionOutcome::Done
    1847              :             } else {
    1848            0 :                 CompactionOutcome::Pending
    1849              :             },
    1850              :         })
    1851          728 :     }
    1852              : }
    1853              : 
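// Editor's note: a minimal, hypothetical sketch (not part of the original source) of the
// "flush the current delta writer?" decision in compact_level0_phase1 above. A new layer is
// started when the current key belongs in a dup (single-key) layer, when another dup layer is
// pending, when the layer would overflow the target file size, or when the key range crosses a
// detected hole. The parameter names are illustrative stand-ins for the local variables above.
fn should_flush_layer(
    is_dup_layer: bool,
    next_dup_layer_pending: bool, // dup_end_lsn.is_valid() in the code above
    written_size: u64,
    key_values_total_size: u64,
    target_file_size: u64,
    contains_hole: bool,
) -> bool {
    is_dup_layer
        || next_dup_layer_pending
        || written_size + key_values_total_size > target_file_size
        || contains_hole
}
// e.g. should_flush_layer(false, false, 200 << 20, 70 << 20, 256 << 20, false) == true,
// because 200 MiB + 70 MiB would exceed the 256 MiB target.
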
    1854              : #[derive(Default)]
    1855              : struct CompactLevel0Phase1Result {
    1856              :     new_layers: Vec<ResidentLayer>,
    1857              :     deltas_to_compact: Vec<Layer>,
    1858              :     // Whether we have included all L0 layers, or selected only part of them due to the
    1859              :     // L0 compaction size limit.
    1860              :     outcome: CompactionOutcome,
    1861              : }
    1862              : 
    1863              : #[derive(Default)]
    1864              : struct CompactLevel0Phase1StatsBuilder {
    1865              :     version: Option<u64>,
    1866              :     tenant_id: Option<TenantShardId>,
    1867              :     timeline_id: Option<TimelineId>,
    1868              :     read_lock_acquisition_micros: DurationRecorder,
    1869              :     read_lock_held_spawn_blocking_startup_micros: DurationRecorder,
    1870              :     read_lock_held_key_sort_micros: DurationRecorder,
    1871              :     read_lock_held_prerequisites_micros: DurationRecorder,
    1872              :     read_lock_held_compute_holes_micros: DurationRecorder,
    1873              :     read_lock_drop_micros: DurationRecorder,
    1874              :     write_layer_files_micros: DurationRecorder,
    1875              :     level0_deltas_count: Option<usize>,
    1876              :     new_deltas_count: Option<usize>,
    1877              :     new_deltas_size: Option<u64>,
    1878              : }
    1879              : 
    1880              : #[derive(serde::Serialize)]
    1881              : struct CompactLevel0Phase1Stats {
    1882              :     version: u64,
    1883              :     tenant_id: TenantShardId,
    1884              :     timeline_id: TimelineId,
    1885              :     read_lock_acquisition_micros: RecordedDuration,
    1886              :     read_lock_held_spawn_blocking_startup_micros: RecordedDuration,
    1887              :     read_lock_held_key_sort_micros: RecordedDuration,
    1888              :     read_lock_held_prerequisites_micros: RecordedDuration,
    1889              :     read_lock_held_compute_holes_micros: RecordedDuration,
    1890              :     read_lock_drop_micros: RecordedDuration,
    1891              :     write_layer_files_micros: RecordedDuration,
    1892              :     level0_deltas_count: usize,
    1893              :     new_deltas_count: usize,
    1894              :     new_deltas_size: u64,
    1895              : }
    1896              : 
    1897              : impl TryFrom<CompactLevel0Phase1StatsBuilder> for CompactLevel0Phase1Stats {
    1898              :     type Error = anyhow::Error;
    1899              : 
    1900           56 :     fn try_from(value: CompactLevel0Phase1StatsBuilder) -> Result<Self, Self::Error> {
    1901           56 :         Ok(Self {
    1902           56 :             version: value.version.ok_or_else(|| anyhow!("version not set"))?,
    1903           56 :             tenant_id: value
    1904           56 :                 .tenant_id
    1905           56 :                 .ok_or_else(|| anyhow!("tenant_id not set"))?,
    1906           56 :             timeline_id: value
    1907           56 :                 .timeline_id
    1908           56 :                 .ok_or_else(|| anyhow!("timeline_id not set"))?,
    1909           56 :             read_lock_acquisition_micros: value
    1910           56 :                 .read_lock_acquisition_micros
    1911           56 :                 .into_recorded()
    1912           56 :                 .ok_or_else(|| anyhow!("read_lock_acquisition_micros not set"))?,
    1913           56 :             read_lock_held_spawn_blocking_startup_micros: value
    1914           56 :                 .read_lock_held_spawn_blocking_startup_micros
    1915           56 :                 .into_recorded()
    1916           56 :                 .ok_or_else(|| anyhow!("read_lock_held_spawn_blocking_startup_micros not set"))?,
    1917           56 :             read_lock_held_key_sort_micros: value
    1918           56 :                 .read_lock_held_key_sort_micros
    1919           56 :                 .into_recorded()
    1920           56 :                 .ok_or_else(|| anyhow!("read_lock_held_key_sort_micros not set"))?,
    1921           56 :             read_lock_held_prerequisites_micros: value
    1922           56 :                 .read_lock_held_prerequisites_micros
    1923           56 :                 .into_recorded()
    1924           56 :                 .ok_or_else(|| anyhow!("read_lock_held_prerequisites_micros not set"))?,
    1925           56 :             read_lock_held_compute_holes_micros: value
    1926           56 :                 .read_lock_held_compute_holes_micros
    1927           56 :                 .into_recorded()
    1928           56 :                 .ok_or_else(|| anyhow!("read_lock_held_compute_holes_micros not set"))?,
    1929           56 :             read_lock_drop_micros: value
    1930           56 :                 .read_lock_drop_micros
    1931           56 :                 .into_recorded()
    1932           56 :                 .ok_or_else(|| anyhow!("read_lock_drop_micros not set"))?,
    1933           56 :             write_layer_files_micros: value
    1934           56 :                 .write_layer_files_micros
    1935           56 :                 .into_recorded()
    1936           56 :                 .ok_or_else(|| anyhow!("write_layer_files_micros not set"))?,
    1937           56 :             level0_deltas_count: value
    1938           56 :                 .level0_deltas_count
    1939           56 :                 .ok_or_else(|| anyhow!("level0_deltas_count not set"))?,
    1940           56 :             new_deltas_count: value
    1941           56 :                 .new_deltas_count
    1942           56 :                 .ok_or_else(|| anyhow!("new_deltas_count not set"))?,
    1943           56 :             new_deltas_size: value
    1944           56 :                 .new_deltas_size
    1945           56 :                 .ok_or_else(|| anyhow!("new_deltas_size not set"))?,
    1946              :         })
    1947           56 :     }
    1948              : }
    1949              : 
    1950              : impl Timeline {
    1951              :     /// Entry point for new tiered compaction algorithm.
    1952              :     ///
    1953              :     /// All the real work is in the implementation in the pageserver_compaction
    1954              :     /// crate. The code here would apply to any algorithm implemented by the
    1955              :     /// same interface, but tiered is the only one at the moment.
    1956              :     ///
    1957              :     /// TODO: cancellation
    1958            0 :     pub(crate) async fn compact_tiered(
    1959            0 :         self: &Arc<Self>,
    1960            0 :         _cancel: &CancellationToken,
    1961            0 :         ctx: &RequestContext,
    1962            0 :     ) -> Result<(), CompactionError> {
    1963            0 :         let fanout = self.get_compaction_threshold() as u64;
    1964            0 :         let target_file_size = self.get_checkpoint_distance();
    1965              : 
    1966              :         // Find the top of the historical layers
    1967            0 :         let end_lsn = {
    1968            0 :             let guard = self.layers.read().await;
    1969            0 :             let layers = guard.layer_map()?;
    1970              : 
    1971            0 :             let l0_deltas = layers.level0_deltas();
    1972            0 : 
    1973            0 :             // As an optimization, if we find that there are too few L0 layers,
    1974            0 :             // bail out early. We know that the compaction algorithm would do
    1975            0 :             // nothing in that case.
    1976            0 :             if l0_deltas.len() < fanout as usize {
    1977              :                 // doesn't need compacting
    1978            0 :                 return Ok(());
    1979            0 :             }
    1980            0 :             l0_deltas.iter().map(|l| l.lsn_range.end).max().unwrap()
    1981            0 :         };
    1982            0 : 
    1983            0 :         // Is the timeline being deleted?
    1984            0 :         if self.is_stopping() {
    1985            0 :             trace!("Dropping out of compaction on timeline shutdown");
    1986            0 :             return Err(CompactionError::ShuttingDown);
    1987            0 :         }
    1988              : 
    1989            0 :         let (dense_ks, _sparse_ks) = self.collect_keyspace(end_lsn, ctx).await?;
    1990              :         // TODO(chi): ignore sparse_keyspace for now, compact it in the future.
    1991            0 :         let mut adaptor = TimelineAdaptor::new(self, (end_lsn, dense_ks));
    1992            0 : 
    1993            0 :         pageserver_compaction::compact_tiered::compact_tiered(
    1994            0 :             &mut adaptor,
    1995            0 :             end_lsn,
    1996            0 :             target_file_size,
    1997            0 :             fanout,
    1998            0 :             ctx,
    1999            0 :         )
    2000            0 :         .await
    2001              :         // TODO: compact_tiered needs to return CompactionError
    2002            0 :         .map_err(CompactionError::Other)?;
    2003              : 
    2004            0 :         adaptor.flush_updates().await?;
    2005            0 :         Ok(())
    2006            0 :     }
    2007              : 
    2008              :     /// Take a list of images and deltas, produce images and deltas according to GC horizon and retain_lsns.
    2009              :     ///
    2010              :     /// It takes a key, the values of the key within the compaction process, a GC horizon, and all retain_lsns below the horizon.
    2011              :     /// For now, it requires that `accumulated_values` contain the full history of the key (i.e., the record with the lowest LSN is
    2012              :     /// an image or a WAL record not requiring a base image). This restriction will be removed once we implement gc-compaction on branches.
    2013              :     ///
    2014              :     /// The function returns the deltas and the base image that need to be placed at each of the retain LSNs. For example, suppose we have:
    2015              :     ///
    2016              :     /// A@0x10, +B@0x20, +C@0x30, +D@0x40, +E@0x50, +F@0x60
    2017              :     /// horizon = 0x50, retain_lsn = 0x20, 0x40, delta_threshold=3
    2018              :     ///
    2019              :     /// The function will produce:
    2020              :     ///
    2021              :     /// ```plain
    2022              :     /// 0x20(retain_lsn) -> img=AB@0x20                  always produce a single image below the lowest retain LSN
    2023              :     /// 0x40(retain_lsn) -> deltas=[+C@0x30, +D@0x40]    two deltas since the last base image, keeping the deltas
    2024              :     /// 0x50(horizon)    -> deltas=[ABCDE@0x50]          three deltas since the last base image, generate an image but put it in the delta
    2025              :     /// above_horizon    -> deltas=[+F@0x60]             full history above the horizon
    2026              :     /// ```
    2027              :     ///
    2028              :     /// Note that `accumulated_values` must be sorted by LSN and should belong to a single key.
    2029         1260 :     pub(crate) async fn generate_key_retention(
    2030         1260 :         self: &Arc<Timeline>,
    2031         1260 :         key: Key,
    2032         1260 :         full_history: &[(Key, Lsn, Value)],
    2033         1260 :         horizon: Lsn,
    2034         1260 :         retain_lsn_below_horizon: &[Lsn],
    2035         1260 :         delta_threshold_cnt: usize,
    2036         1260 :         base_img_from_ancestor: Option<(Key, Lsn, Bytes)>,
    2037         1260 :     ) -> anyhow::Result<KeyHistoryRetention> {
    2038              :         // Pre-checks for the invariants
    2039              : 
    2040         1260 :         let debug_mode = cfg!(debug_assertions) || cfg!(feature = "testing");
    2041              : 
    2042         1260 :         if debug_mode {
    2043         3060 :             for (log_key, _, _) in full_history {
    2044         1800 :                 assert_eq!(log_key, &key, "mismatched key");
    2045              :             }
    2046         1260 :             for i in 1..full_history.len() {
    2047          540 :                 assert!(full_history[i - 1].1 <= full_history[i].1, "unordered LSN");
    2048          540 :                 if full_history[i - 1].1 == full_history[i].1 {
    2049            0 :                     assert!(
    2050            0 :                         matches!(full_history[i - 1].2, Value::Image(_)),
    2051            0 :                         "unordered delta/image, or duplicated delta"
    2052              :                     );
    2053          540 :                 }
    2054              :             }
    2055              :             // There used to be an assertion here that checked whether the first record
    2056              :             // in the history is `will_init` when there is no base image, but it was removed.
    2057              :             // This is explained in the test cases for generate_key_retention.
    2058              :             // Search "incomplete history" for more information.
    2059         2820 :             for lsn in retain_lsn_below_horizon {
    2060         1560 :                 assert!(lsn < &horizon, "retain lsn must be below horizon")
    2061              :             }
    2062         1260 :             for i in 1..retain_lsn_below_horizon.len() {
    2063          712 :                 assert!(
    2064          712 :                     retain_lsn_below_horizon[i - 1] <= retain_lsn_below_horizon[i],
    2065            0 :                     "unordered LSN"
    2066              :                 );
    2067              :             }
    2068            0 :         }
    2069         1260 :         let has_ancestor = base_img_from_ancestor.is_some();
    2070              :         // Step 1: split history into len(retain_lsn_below_horizon) + 2 buckets, where the last bucket is for all deltas above the horizon,
    2071              :         // and the second-to-last bucket is for the horizon. Each bucket contains lsn_last_bucket < deltas <= lsn_this_bucket.
    2072         1260 :         let (mut split_history, lsn_split_points) = {
    2073         1260 :             let mut split_history = Vec::new();
    2074         1260 :             split_history.resize_with(retain_lsn_below_horizon.len() + 2, Vec::new);
    2075         1260 :             let mut lsn_split_points = Vec::with_capacity(retain_lsn_below_horizon.len() + 1);
    2076         2820 :             for lsn in retain_lsn_below_horizon {
    2077         1560 :                 lsn_split_points.push(*lsn);
    2078         1560 :             }
    2079         1260 :             lsn_split_points.push(horizon);
    2080         1260 :             let mut current_idx = 0;
    2081         3060 :             for item @ (_, lsn, _) in full_history {
    2082         2288 :                 while current_idx < lsn_split_points.len() && *lsn > lsn_split_points[current_idx] {
    2083          488 :                     current_idx += 1;
    2084          488 :                 }
    2085         1800 :                 split_history[current_idx].push(item);
    2086              :             }
    2087         1260 :             (split_history, lsn_split_points)
    2088              :         };
    2089              :         // Step 2: filter out duplicated records due to the k-merge of image/delta layers
    2090         5340 :         for split_for_lsn in &mut split_history {
    2091         4080 :             let mut prev_lsn = None;
    2092         4080 :             let mut new_split_for_lsn = Vec::with_capacity(split_for_lsn.len());
    2093         4080 :             for record @ (_, lsn, _) in std::mem::take(split_for_lsn) {
    2094         1800 :                 if let Some(prev_lsn) = &prev_lsn {
    2095          236 :                     if *prev_lsn == lsn {
    2096              :                         // The case that we have an LSN with both data from the delta layer and the image layer. As
    2097              :                         // `ValueWrapper` ensures that an image is ordered before a delta at the same LSN, we simply
    2098              :                         // drop this delta and keep the image.
    2099              :                         //
    2100              :                         // For example, we have delta layer key1@0x10, key1@0x20, and image layer key1@0x10, we will
    2101              :                         // keep the image for key1@0x10 and the delta for key1@0x20. key1@0x10 delta will be simply
    2102              :                         // dropped.
    2103              :                         //
    2104              :                         // TODO: in case we have both delta + images for a given LSN and it does not exceed the delta
    2105              :                         // threshold, we could have kept delta instead to save space. This is an optimization for the future.
    2106            0 :                         continue;
    2107          236 :                     }
    2108         1564 :                 }
    2109         1800 :                 prev_lsn = Some(lsn);
    2110         1800 :                 new_split_for_lsn.push(record);
    2111              :             }
    2112         4080 :             *split_for_lsn = new_split_for_lsn;
    2113              :         }
    2114              :         // Step 3: generate images when necessary
    2115         1260 :         let mut retention = Vec::with_capacity(split_history.len());
    2116         1260 :         let mut records_since_last_image = 0;
    2117         1260 :         let batch_cnt = split_history.len();
    2118         1260 :         assert!(
    2119         1260 :             batch_cnt >= 2,
    2120            0 :             "should have at least below + above horizon batches"
    2121              :         );
    2122         1260 :         let mut replay_history: Vec<(Key, Lsn, Value)> = Vec::new();
    2123         1260 :         if let Some((key, lsn, img)) = base_img_from_ancestor {
    2124           84 :             replay_history.push((key, lsn, Value::Image(img)));
    2125         1176 :         }
    2126              : 
    2127              :         /// Generate debug information for the replay history
    2128            0 :         fn generate_history_trace(replay_history: &[(Key, Lsn, Value)]) -> String {
    2129              :             use std::fmt::Write;
    2130            0 :             let mut output = String::new();
    2131            0 :             if let Some((key, _, _)) = replay_history.first() {
    2132            0 :                 write!(output, "key={} ", key).unwrap();
    2133            0 :                 let mut cnt = 0;
    2134            0 :                 for (_, lsn, val) in replay_history {
    2135            0 :                     if val.is_image() {
    2136            0 :                         write!(output, "i@{} ", lsn).unwrap();
    2137            0 :                     } else if val.will_init() {
    2138            0 :                         write!(output, "di@{} ", lsn).unwrap();
    2139            0 :                     } else {
    2140            0 :                         write!(output, "d@{} ", lsn).unwrap();
    2141            0 :                     }
    2142            0 :                     cnt += 1;
    2143            0 :                     if cnt >= 128 {
    2144            0 :                         write!(output, "... and more").unwrap();
    2145            0 :                         break;
    2146            0 :                     }
    2147              :                 }
    2148            0 :             } else {
    2149            0 :                 write!(output, "<no history>").unwrap();
    2150            0 :             }
    2151            0 :             output
    2152            0 :         }
    2153              : 
    2154            0 :         fn generate_debug_trace(
    2155            0 :             replay_history: Option<&[(Key, Lsn, Value)]>,
    2156            0 :             full_history: &[(Key, Lsn, Value)],
    2157            0 :             lsns: &[Lsn],
    2158            0 :             horizon: Lsn,
    2159            0 :         ) -> String {
    2160              :             use std::fmt::Write;
    2161            0 :             let mut output = String::new();
    2162            0 :             if let Some(replay_history) = replay_history {
    2163            0 :                 writeln!(
    2164            0 :                     output,
    2165            0 :                     "replay_history: {}",
    2166            0 :                     generate_history_trace(replay_history)
    2167            0 :                 )
    2168            0 :                 .unwrap();
    2169            0 :             } else {
    2170            0 :                 writeln!(output, "replay_history: <disabled>",).unwrap();
    2171            0 :             }
    2172            0 :             writeln!(
    2173            0 :                 output,
    2174            0 :                 "full_history: {}",
    2175            0 :                 generate_history_trace(full_history)
    2176            0 :             )
    2177            0 :             .unwrap();
    2178            0 :             writeln!(
    2179            0 :                 output,
    2180            0 :                 "when processing: [{}] horizon={}",
    2181            0 :                 lsns.iter().map(|l| format!("{l}")).join(","),
    2182            0 :                 horizon
    2183            0 :             )
    2184            0 :             .unwrap();
    2185            0 :             output
    2186            0 :         }
    2187              : 
    2188         1260 :         let mut key_exists = false;
    2189         4080 :         for (i, split_for_lsn) in split_history.into_iter().enumerate() {
    2190              :             // TODO: there could be image keys inside the splits, and we can compute records_since_last_image accordingly.
    2191         4080 :             records_since_last_image += split_for_lsn.len();
    2192              :             // Whether to produce an image into the final layer files
    2193         4080 :             let produce_image = if i == 0 && !has_ancestor {
    2194              :                 // We always generate images for the first batch (below horizon / lowest retain_lsn)
    2195         1176 :                 true
    2196         2904 :             } else if i == batch_cnt - 1 {
    2197              :                 // Do not generate images for the last batch (above horizon)
    2198         1260 :                 false
    2199         1644 :             } else if records_since_last_image == 0 {
    2200         1288 :                 false
    2201          356 :             } else if records_since_last_image >= delta_threshold_cnt {
    2202              :                 // Generate images when there are too many records
    2203           12 :                 true
    2204              :             } else {
    2205          344 :                 false
    2206              :             };
    2207         4080 :             replay_history.extend(split_for_lsn.iter().map(|x| (*x).clone()));
    2208              :             // Only retain the items after the last image record
    2209         5028 :             for idx in (0..replay_history.len()).rev() {
    2210         5028 :                 if replay_history[idx].2.will_init() {
    2211         4080 :                     replay_history = replay_history[idx..].to_vec();
    2212         4080 :                     break;
    2213          948 :                 }
    2214              :             }
    2215         4080 :             if replay_history.is_empty() && !key_exists {
    2216              :                 // The key does not exist at an earlier LSN; we can skip this iteration.
    2217            0 :                 retention.push(Vec::new());
    2218            0 :                 continue;
    2219         4080 :             } else {
    2220         4080 :                 key_exists = true;
    2221         4080 :             }
    2222         4080 :             let Some((_, _, val)) = replay_history.first() else {
    2223            0 :                 unreachable!("replay history should not be empty once it exists")
    2224              :             };
    2225         4080 :             if !val.will_init() {
    2226            0 :                 return Err(anyhow::anyhow!("invalid history, no base image")).with_context(|| {
    2227            0 :                     generate_debug_trace(
    2228            0 :                         Some(&replay_history),
    2229            0 :                         full_history,
    2230            0 :                         retain_lsn_below_horizon,
    2231            0 :                         horizon,
    2232            0 :                     )
    2233            0 :                 });
    2234         4080 :             }
    2235              :             // Whether to reconstruct the image. In debug mode, we will generate an image
    2236              :             // at every retain_lsn to ensure data is not corrupted, but we won't put the
    2237              :             // image into the final layer.
    2238         4080 :             let generate_image = produce_image || debug_mode;
    2239         4080 :             if produce_image {
    2240         1188 :                 records_since_last_image = 0;
    2241         2892 :             }
    2242         4080 :             let img_and_lsn = if generate_image {
    2243         4080 :                 let replay_history_for_debug = if debug_mode {
    2244         4080 :                     Some(replay_history.clone())
    2245              :                 } else {
    2246            0 :                     None
    2247              :                 };
    2248         4080 :                 let replay_history_for_debug_ref = replay_history_for_debug.as_deref();
    2249         4080 :                 let history = if produce_image {
    2250         1188 :                     std::mem::take(&mut replay_history)
    2251              :                 } else {
    2252         2892 :                     replay_history.clone()
    2253              :                 };
    2254         4080 :                 let mut img = None;
    2255         4080 :                 let mut records = Vec::with_capacity(history.len());
    2256         4080 :                 if let (_, lsn, Value::Image(val)) = history.first().as_ref().unwrap() {
    2257         4036 :                     img = Some((*lsn, val.clone()));
    2258         4036 :                     for (_, lsn, val) in history.into_iter().skip(1) {
    2259          920 :                         let Value::WalRecord(rec) = val else {
    2260            0 :                             return Err(anyhow::anyhow!(
    2261            0 :                                 "invalid record, first record is image, expect walrecords"
    2262            0 :                             ))
    2263            0 :                             .with_context(|| {
    2264            0 :                                 generate_debug_trace(
    2265            0 :                                     replay_history_for_debug_ref,
    2266            0 :                                     full_history,
    2267            0 :                                     retain_lsn_below_horizon,
    2268            0 :                                     horizon,
    2269            0 :                                 )
    2270            0 :                             });
    2271              :                         };
    2272          920 :                         records.push((lsn, rec));
    2273              :                     }
    2274              :                 } else {
    2275           72 :                     for (_, lsn, val) in history.into_iter() {
    2276           72 :                         let Value::WalRecord(rec) = val else {
    2277            0 :                             return Err(anyhow::anyhow!("invalid record, first record is walrecord, expect rest are walrecord"))
    2278            0 :                                 .with_context(|| generate_debug_trace(
    2279            0 :                                     replay_history_for_debug_ref,
    2280            0 :                                     full_history,
    2281            0 :                                     retain_lsn_below_horizon,
    2282            0 :                                     horizon,
    2283            0 :                                 ));
    2284              :                         };
    2285           72 :                         records.push((lsn, rec));
    2286              :                     }
    2287              :                 }
    2288         4080 :                 records.reverse();
    2289         4080 :                 let state = ValueReconstructState { img, records };
    2290              :                 // The last batch does not generate an image, so i is always in range unless we
    2291              :                 // force-generate an image during testing.
    2292         4080 :                 let request_lsn = if i >= lsn_split_points.len() {
    2293         1260 :                     Lsn::MAX
    2294              :                 } else {
    2295         2820 :                     lsn_split_points[i]
    2296              :                 };
    2297         4080 :                 let img = self.reconstruct_value(key, request_lsn, state).await?;
    2298         4080 :                 Some((request_lsn, img))
    2299              :             } else {
    2300            0 :                 None
    2301              :             };
    2302         4080 :             if produce_image {
    2303         1188 :                 let (request_lsn, img) = img_and_lsn.unwrap();
    2304         1188 :                 replay_history.push((key, request_lsn, Value::Image(img.clone())));
    2305         1188 :                 retention.push(vec![(request_lsn, Value::Image(img))]);
    2306         2892 :             } else {
    2307         2892 :                 let deltas = split_for_lsn
    2308         2892 :                     .iter()
    2309         2892 :                     .map(|(_, lsn, value)| (*lsn, value.clone()))
    2310         2892 :                     .collect_vec();
    2311         2892 :                 retention.push(deltas);
    2312         2892 :             }
    2313              :         }
    2314         1260 :         let mut result = Vec::with_capacity(retention.len());
    2315         1260 :         assert_eq!(retention.len(), lsn_split_points.len() + 1);
    2316         4080 :         for (idx, logs) in retention.into_iter().enumerate() {
    2317         4080 :             if idx == lsn_split_points.len() {
    2318         1260 :                 return Ok(KeyHistoryRetention {
    2319         1260 :                     below_horizon: result,
    2320         1260 :                     above_horizon: KeyLogAtLsn(logs),
    2321         1260 :                 });
    2322         2820 :             } else {
    2323         2820 :                 result.push((lsn_split_points[idx], KeyLogAtLsn(logs)));
    2324         2820 :             }
    2325              :         }
    2326            0 :         unreachable!("key retention is empty")
    2327         1260 :     }
    2328              : 
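// Editor's note: a self-contained, hypothetical sketch (not part of the original source) of
// Step 1 of generate_key_retention above: a key's history is split into
// len(retain_lsn_below_horizon) + 2 buckets, where bucket i holds records with
// lsn_split_points[i - 1] < lsn <= lsn_split_points[i] and the final bucket holds everything
// above the horizon. LSNs are plain u64s here for brevity.
fn example_split_history<'a>(
    history: &[(u64, &'a str)], // (lsn, record) pairs, sorted by LSN
    retain_lsns: &[u64],        // sorted, all strictly below `horizon`
    horizon: u64,
) -> Vec<Vec<(u64, &'a str)>> {
    let mut split_points: Vec<u64> = retain_lsns.to_vec();
    split_points.push(horizon);
    let mut buckets: Vec<Vec<(u64, &'a str)>> = vec![Vec::new(); split_points.len() + 1];
    let mut idx = 0;
    for &(lsn, rec) in history {
        while idx < split_points.len() && lsn > split_points[idx] {
            idx += 1;
        }
        buckets[idx].push((lsn, rec));
    }
    buckets
}
// With the doc-comment example (retain_lsns = [0x20, 0x40], horizon = 0x50) this yields
// [[A@0x10, +B@0x20], [+C@0x30, +D@0x40], [+E@0x50], [+F@0x60]].
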
    2329              :     /// Check how much space is left on the disk
    2330          104 :     async fn check_available_space(self: &Arc<Self>) -> anyhow::Result<u64> {
    2331          104 :         let tenants_dir = self.conf.tenants_path();
    2332              : 
    2333          104 :         let stat = Statvfs::get(&tenants_dir, None)
    2334          104 :             .context("statvfs failed, presumably directory got unlinked")?;
    2335              : 
    2336          104 :         let (avail_bytes, _) = stat.get_avail_total_bytes();
    2337          104 : 
    2338          104 :         Ok(avail_bytes)
    2339          104 :     }
    2340              : 
    2341              :     /// Check if the compaction can proceed safely without running out of space. We assume the
    2342              :     /// upper bound on the size of the files produced by a compaction job is the same as the total
    2343              :     /// size of all layers involved in the compaction. Therefore, we need at least
    2344              :     /// `2 * layers_to_be_compacted_size` of free space to do a compaction.
    2345          104 :     async fn check_compaction_space(
    2346          104 :         self: &Arc<Self>,
    2347          104 :         layer_selection: &[Layer],
    2348          104 :     ) -> anyhow::Result<()> {
    2349          104 :         let available_space = self.check_available_space().await?;
    2350          104 :         let mut remote_layer_size = 0;
    2351          104 :         let mut all_layer_size = 0;
    2352          408 :         for layer in layer_selection {
    2353          304 :             let needs_download = layer.needs_download().await?;
    2354          304 :             if needs_download.is_some() {
    2355            0 :                 remote_layer_size += layer.layer_desc().file_size;
    2356          304 :             }
    2357          304 :             all_layer_size += layer.layer_desc().file_size;
    2358              :         }
    2359          104 :         let allocated_space = (available_space as f64 * 0.8) as u64; /* reserve 20% space for other tasks */
    2360          104 :         if all_layer_size /* space needed for newly-generated file */ + remote_layer_size /* space for downloading layers */ > allocated_space
    2361              :         {
    2362            0 :             return Err(anyhow!(
    2363            0 :                 "not enough space for compaction: available_space={}, allocated_space={}, all_layer_size={}, remote_layer_size={}, required_space={}",
    2364            0 :                 available_space,
    2365            0 :                 allocated_space,
    2366            0 :                 all_layer_size,
    2367            0 :                 remote_layer_size,
    2368            0 :                 all_layer_size + remote_layer_size
    2369            0 :             ));
    2370          104 :         }
    2371          104 :         Ok(())
    2372          104 :     }
    2373              : 
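// Editor's note: a hypothetical worked example (not part of the original source) of the space
// check above: 20% of the available disk space is reserved for other tasks, and compaction is
// rejected if the selected layers plus any layers still to be downloaded exceed the remainder.
// All sizes below are made-up numbers for illustration.
fn example_space_check() -> bool {
    let available_space: u64 = 100 * 1024 * 1024 * 1024; // 100 GiB reported by statvfs
    let all_layer_size: u64 = 30 * 1024 * 1024 * 1024; // total size of layers selected for compaction
    let remote_layer_size: u64 = 5 * 1024 * 1024 * 1024; // portion that still needs downloading
    let allocated_space = (available_space as f64 * 0.8) as u64; // 80 GiB usable after the reserve
    // 30 GiB + 5 GiB = 35 GiB <= 80 GiB, so this compaction would be allowed to proceed.
    all_layer_size + remote_layer_size <= allocated_space
}
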
    2374              :     /// Get a watermark for gc-compaction, that is the lowest LSN that we can use as the `gc_horizon` for
    2375              :     /// the compaction algorithm. It is min(space_cutoff, time_cutoff, latest_gc_cutoff, standby_horizon).
    2376              :     /// Leases and retain_lsns are considered in the gc-compaction job itself so we don't need to account for them
    2377              :     /// here.
    2378          108 :     pub(crate) fn get_gc_compaction_watermark(self: &Arc<Self>) -> Lsn {
    2379          108 :         let gc_cutoff_lsn = {
    2380          108 :             let gc_info = self.gc_info.read().unwrap();
    2381          108 :             gc_info.min_cutoff()
    2382          108 :         };
    2383          108 : 
    2384          108 :         // TODO: standby horizon should use leases so we don't really need to consider it here.
    2385          108 :         // let watermark = watermark.min(self.standby_horizon.load());
    2386          108 : 
    2387          108 :         // TODO: ensure the child branches will not use anything below the watermark, or consider
    2388          108 :         // them when computing the watermark.
    2389          108 :         gc_cutoff_lsn.min(*self.get_applied_gc_cutoff_lsn())
    2390          108 :     }
    2391              : 
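// Editor's note: a minimal sketch (not part of the original source) of the watermark
// computation above, with plain u64 LSNs. The real code takes the minimum of
// gc_info.min_cutoff() and the applied GC cutoff; the space/time cutoff names below come
// from the doc comment and are used here only for illustration.
fn example_gc_compaction_watermark(space_cutoff: u64, time_cutoff: u64, applied_gc_cutoff: u64) -> u64 {
    let min_cutoff = space_cutoff.min(time_cutoff); // analogous to gc_info.min_cutoff()
    min_cutoff.min(applied_gc_cutoff)
}
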
    2392              :     /// Split a gc-compaction job into multiple compaction jobs. The split is based on the key range and the estimated size of the compaction job.
    2393              :     /// The function returns a list of compaction jobs that can be executed separately. If the upper bound of the compact LSN
    2394              :     /// range is not specified, we will use the latest gc_cutoff as the upper bound, so that all jobs in the jobset acts
    2395              :     /// like a full compaction of the specified keyspace.
    2396            0 :     pub(crate) async fn gc_compaction_split_jobs(
    2397            0 :         self: &Arc<Self>,
    2398            0 :         job: GcCompactJob,
    2399            0 :         sub_compaction_max_job_size_mb: Option<u64>,
    2400            0 :     ) -> anyhow::Result<Vec<GcCompactJob>> {
    2401            0 :         let compact_below_lsn = if job.compact_lsn_range.end != Lsn::MAX {
    2402            0 :             job.compact_lsn_range.end
    2403              :         } else {
    2404            0 :             self.get_gc_compaction_watermark()
    2405              :         };
    2406              : 
    2407            0 :         if compact_below_lsn == Lsn::INVALID {
    2408            0 :             tracing::warn!(
    2409            0 :                 "no layers to compact with gc: gc_cutoff not generated yet, skipping gc bottom-most compaction"
    2410              :             );
    2411            0 :             return Ok(vec![]);
    2412            0 :         }
    2413              : 
    2414              :         // Split the compaction job into sub-jobs of about 4GB each
    2415              :         const GC_COMPACT_MAX_SIZE_MB: u64 = 4 * 1024;
    2416            0 :         let sub_compaction_max_job_size_mb =
    2417            0 :             sub_compaction_max_job_size_mb.unwrap_or(GC_COMPACT_MAX_SIZE_MB);
    2418            0 : 
    2419            0 :         let mut compact_jobs = Vec::<GcCompactJob>::new();
    2420            0 :         // For now, we simply use the key partitioning information; we should do a more fine-grained partitioning
    2421            0 :         // by estimating the number of files read for a compaction job. We should also partition on LSN.
    2422            0 :         let ((dense_ks, sparse_ks), _) = self.partitioning.read().as_ref().clone();
    2423              :         // Truncate the key range to be within the user-specified compaction range.
    2424            0 :         fn truncate_to(
    2425            0 :             source_start: &Key,
    2426            0 :             source_end: &Key,
    2427            0 :             target_start: &Key,
    2428            0 :             target_end: &Key,
    2429            0 :         ) -> Option<(Key, Key)> {
    2430            0 :             let start = source_start.max(target_start);
    2431            0 :             let end = source_end.min(target_end);
    2432            0 :             if start < end {
    2433            0 :                 Some((*start, *end))
    2434              :             } else {
    2435            0 :                 None
    2436              :             }
    2437            0 :         }
    2438            0 :         let mut split_key_ranges = Vec::new();
    2439            0 :         let ranges = dense_ks
    2440            0 :             .parts
    2441            0 :             .iter()
    2442            0 :             .map(|partition| partition.ranges.iter())
    2443            0 :             .chain(sparse_ks.parts.iter().map(|x| x.0.ranges.iter()))
    2444            0 :             .flatten()
    2445            0 :             .cloned()
    2446            0 :             .collect_vec();
    2447            0 :         for range in ranges.iter() {
    2448            0 :             let Some((start, end)) = truncate_to(
    2449            0 :                 &range.start,
    2450            0 :                 &range.end,
    2451            0 :                 &job.compact_key_range.start,
    2452            0 :                 &job.compact_key_range.end,
    2453            0 :             ) else {
    2454            0 :                 continue;
    2455              :             };
    2456            0 :             split_key_ranges.push((start, end));
    2457              :         }
    2458            0 :         split_key_ranges.sort();
    2459            0 :         let all_layers = {
    2460            0 :             let guard = self.layers.read().await;
    2461            0 :             let layer_map = guard.layer_map()?;
    2462            0 :             layer_map.iter_historic_layers().collect_vec()
    2463            0 :         };
    2464            0 :         let mut current_start = None;
    2465            0 :         let ranges_num = split_key_ranges.len();
    2466            0 :         for (idx, (start, end)) in split_key_ranges.into_iter().enumerate() {
    2467            0 :             if current_start.is_none() {
    2468            0 :                 current_start = Some(start);
    2469            0 :             }
    2470            0 :             let start = current_start.unwrap();
    2471            0 :             if start >= end {
    2472              :                 // We have already processed this partition.
    2473            0 :                 continue;
    2474            0 :             }
    2475            0 :             let overlapping_layers = {
    2476            0 :                 let mut desc = Vec::new();
    2477            0 :                 for layer in all_layers.iter() {
    2478            0 :                     if overlaps_with(&layer.get_key_range(), &(start..end))
    2479            0 :                         && layer.get_lsn_range().start <= compact_below_lsn
    2480            0 :                     {
    2481            0 :                         desc.push(layer.clone());
    2482            0 :                     }
    2483              :                 }
    2484            0 :                 desc
    2485            0 :             };
    2486            0 :             let total_size = overlapping_layers.iter().map(|x| x.file_size).sum::<u64>();
    2487            0 :             if total_size > sub_compaction_max_job_size_mb * 1024 * 1024 || ranges_num == idx + 1 {
    2488              :                 // Try to extend the compaction range so that we include at least one full layer file.
    2489            0 :                 let extended_end = overlapping_layers
    2490            0 :                     .iter()
    2491            0 :                     .map(|layer| layer.key_range.end)
    2492            0 :                     .min();
    2493              :                 // It is possible that the search range does not contain any layer files when we reach the end of the loop.
    2494              :                 // In this case, we simply use the specified key range end.
    2495            0 :                 let end = if let Some(extended_end) = extended_end {
    2496            0 :                     extended_end.max(end)
    2497              :                 } else {
    2498            0 :                     end
    2499              :                 };
    2500            0 :                 let end = if ranges_num == idx + 1 {
    2501              :                     // extend the compaction range to the end of the key range if it's the last partition
    2502            0 :                     end.max(job.compact_key_range.end)
    2503              :                 } else {
    2504            0 :                     end
    2505              :                 };
    2506            0 :                 if total_size == 0 && !compact_jobs.is_empty() {
    2507            0 :                     info!(
    2508            0 :                         "splitting compaction job: {}..{}, estimated_size={}, extending the previous job",
    2509              :                         start, end, total_size
    2510              :                     );
    2511            0 :                     compact_jobs.last_mut().unwrap().compact_key_range.end = end;
    2512            0 :                     current_start = Some(end);
    2513              :                 } else {
    2514            0 :                     info!(
    2515            0 :                         "splitting compaction job: {}..{}, estimated_size={}",
    2516              :                         start, end, total_size
    2517              :                     );
    2518            0 :                     compact_jobs.push(GcCompactJob {
    2519            0 :                         dry_run: job.dry_run,
    2520            0 :                         compact_key_range: start..end,
    2521            0 :                         compact_lsn_range: job.compact_lsn_range.start..compact_below_lsn,
    2522            0 :                     });
    2523            0 :                     current_start = Some(end);
    2524              :                 }
    2525            0 :             }
    2526              :         }
    2527            0 :         Ok(compact_jobs)
    2528            0 :     }
    2529              : 
    2530              :     /// An experimental compaction building block that combines compaction with garbage collection.
    2531              :     ///
    2532              :     /// The current implementation picks all delta + image layers that are below or intersecting with
    2533              :     /// the GC horizon without considering retain_lsns. Then, it does a full compaction over all these delta
    2534              :     /// layers and image layers, which generates image layers at the gc horizon, drops deltas below the gc horizon,
    2535              :     /// and creates delta layers with all deltas >= the gc horizon.
    2536              :     ///
    2537              :     /// If `options.compact_range` is provided, it will only compact the keys within the range, aka partial compaction.
    2538              :     /// Partial compaction will read and process all layers overlapping with the key range, even if they might
    2539              :     /// contain extra keys. After the gc-compaction phase completes, delta layers that are not fully contained
    2540              :     /// within the key range will be rewritten so that they do not overlap with the newly generated delta layers. Providing
    2541              :     /// Key::MIN..Key::MAX to the function indicates a full compaction, though technically, `Key::MAX` is not
    2542              :     /// part of the range.
    2543              :     ///
    2544              :     /// If `options.compact_lsn_range.end` is provided, the compaction will only compact layers below or intersecting with
    2545              :     /// the LSN. Otherwise, it will use the gc cutoff by default.
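                      :     ///
                      :     /// A minimal usage sketch (not part of the original source; it assumes `CompactOptions`
                      :     /// implements `Default` and only sets fields this function actually reads):
                      :     ///
                      :     /// ```ignore
                      :     /// let options = CompactOptions {
                      :     ///     sub_compaction: true, // split the work into per-key-range sub-jobs
                      :     ///     ..Default::default()  // key/LSN ranges default to the full range / gc cutoff
                      :     /// };
                      :     /// timeline.compact_with_gc(&cancel, options, &ctx).await?;
                      :     /// ```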
    2546          108 :     pub(crate) async fn compact_with_gc(
    2547          108 :         self: &Arc<Self>,
    2548          108 :         cancel: &CancellationToken,
    2549          108 :         options: CompactOptions,
    2550          108 :         ctx: &RequestContext,
    2551          108 :     ) -> anyhow::Result<()> {
    2552          108 :         let sub_compaction = options.sub_compaction;
    2553          108 :         let job = GcCompactJob::from_compact_options(options.clone());
    2554          108 :         if sub_compaction {
    2555            0 :             info!(
    2556            0 :                 "running enhanced gc bottom-most compaction with sub-compaction, splitting compaction jobs"
    2557              :             );
    2558            0 :             let jobs = self
    2559            0 :                 .gc_compaction_split_jobs(job, options.sub_compaction_max_job_size_mb)
    2560            0 :                 .await?;
    2561            0 :             let jobs_len = jobs.len();
    2562            0 :             for (idx, job) in jobs.into_iter().enumerate() {
    2563            0 :                 info!(
    2564            0 :                     "running enhanced gc bottom-most compaction, sub-compaction {}/{}",
    2565            0 :                     idx + 1,
    2566              :                     jobs_len
    2567              :                 );
    2568            0 :                 self.compact_with_gc_inner(cancel, job, ctx).await?;
    2569              :             }
    2570            0 :             if jobs_len == 0 {
    2571            0 :                 info!("no jobs to run, skipping gc bottom-most compaction");
    2572            0 :             }
    2573            0 :             return Ok(());
    2574          108 :         }
    2575          108 :         self.compact_with_gc_inner(cancel, job, ctx).await
    2576          108 :     }
    2577              : 
    2578          108 :     async fn compact_with_gc_inner(
    2579          108 :         self: &Arc<Self>,
    2580          108 :         cancel: &CancellationToken,
    2581          108 :         job: GcCompactJob,
    2582          108 :         ctx: &RequestContext,
    2583          108 :     ) -> anyhow::Result<()> {
    2584          108 :         // Block other compaction/GC tasks from running for now. GC-compaction could run along
    2585          108 :         // with legacy compaction tasks in the future. Always ensure the lock order is compaction -> gc.
    2586          108 :         // Note that we already acquired the compaction lock when the outer `compact` function gets called.
    2587          108 : 
    2588          108 :         let gc_lock = async {
    2589          108 :             tokio::select! {
    2590          108 :                 guard = self.gc_lock.lock() => Ok(guard),
    2591              :                 // TODO: refactor to CompactionError to correctly pass cancelled error
    2592          108 :                 _ = cancel.cancelled() => Err(anyhow!("cancelled")),
    2593              :             }
    2594          108 :         };
    2595              : 
    2596          108 :         let gc_lock = crate::timed(
    2597          108 :             gc_lock,
    2598          108 :             "acquires gc lock",
    2599          108 :             std::time::Duration::from_secs(5),
    2600          108 :         )
    2601          108 :         .await?;
    2602              : 
    2603          108 :         let dry_run = job.dry_run;
    2604          108 :         let compact_key_range = job.compact_key_range;
    2605          108 :         let compact_lsn_range = job.compact_lsn_range;
    2606              : 
    2607          108 :         let debug_mode = cfg!(debug_assertions) || cfg!(feature = "testing");
    2608              : 
    2609          108 :         info!(
    2610            0 :             "running enhanced gc bottom-most compaction, dry_run={dry_run}, compact_key_range={}..{}, compact_lsn_range={}..{}",
    2611              :             compact_key_range.start,
    2612              :             compact_key_range.end,
    2613              :             compact_lsn_range.start,
    2614              :             compact_lsn_range.end
    2615              :         );
    2616              : 
    2617          108 :         scopeguard::defer! {
    2618          108 :             info!("done enhanced gc bottom-most compaction");
    2619          108 :         };
    2620          108 : 
    2621          108 :         let mut stat = CompactionStatistics::default();
    2622              : 
    2623              :         // Step 0: pick all delta layers + image layers below/intersect with the GC horizon.
    2624              :         // The layer selection has the following properties:
    2625              :         // 1. If a layer is in the selection, all layers below it are in the selection.
    2626              :         // 2. Inferred from (1), for each key in the layer selection, the value can be reconstructed using only the layers in the selection.
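                      :         // Illustration (not from the original source): if delta layers D1 covering LSN 10..20 and
                      :         // D2 covering LSN 20..30 are both candidates, selecting D2 implies D1 is selected as well
                      :         // (property 1), so the history of any key in D2 can be replayed using only selected layers
                      :         // (property 2).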
    2627          104 :         let job_desc = {
    2628          108 :             let guard = self.layers.read().await;
    2629          108 :             let layers = guard.layer_map()?;
    2630          108 :             let gc_info = self.gc_info.read().unwrap();
    2631          108 :             let mut retain_lsns_below_horizon = Vec::new();
    2632          108 :             let gc_cutoff = {
    2633              :                 // Currently, gc-compaction only kicks in after the legacy gc has updated the gc_cutoff.
    2634              :                 // Therefore, it can only clean up data that cannot be cleaned up with legacy gc, instead of
    2635              :                 // cleaning everything that it theoretically could. In the future, it should use `self.gc_info`
    2636              :                 // as the source of truth.
    2637          108 :                 let real_gc_cutoff = self.get_gc_compaction_watermark();
    2638              :                 // The compaction algorithm will keep all keys above the gc_cutoff while keeping only necessary keys below the gc_cutoff for
    2639              :                 // each of the retain_lsn. Therefore, if the user-provided `compact_lsn_range.end` is larger than the real gc cutoff, we will use
    2640              :                 // the real cutoff.
    2641          108 :                 let mut gc_cutoff = if compact_lsn_range.end == Lsn::MAX {
    2642           96 :                     if real_gc_cutoff == Lsn::INVALID {
    2643              :                         // If the gc_cutoff is not generated yet, we should not compact anything.
    2644            0 :                         tracing::warn!(
    2645            0 :                             "no layers to compact with gc: gc_cutoff not generated yet, skipping gc bottom-most compaction"
    2646              :                         );
    2647            0 :                         return Ok(());
    2648           96 :                     }
    2649           96 :                     real_gc_cutoff
    2650              :                 } else {
    2651           12 :                     compact_lsn_range.end
    2652              :                 };
    2653          108 :                 if gc_cutoff > real_gc_cutoff {
    2654            8 :                     warn!(
    2655            0 :                         "provided compact_lsn_range.end={} is larger than the real_gc_cutoff={}, using the real gc cutoff",
    2656              :                         gc_cutoff, real_gc_cutoff
    2657              :                     );
    2658            8 :                     gc_cutoff = real_gc_cutoff;
    2659          100 :                 }
    2660          108 :                 gc_cutoff
    2661              :             };
    2662          140 :             for (lsn, _timeline_id, _is_offloaded) in &gc_info.retain_lsns {
    2663          140 :                 if lsn < &gc_cutoff {
    2664          140 :                     retain_lsns_below_horizon.push(*lsn);
    2665          140 :                 }
    2666              :             }
    2667          108 :             for lsn in gc_info.leases.keys() {
    2668            0 :                 if lsn < &gc_cutoff {
    2669            0 :                     retain_lsns_below_horizon.push(*lsn);
    2670            0 :                 }
    2671              :             }
    2672          108 :             let mut selected_layers: Vec<Layer> = Vec::new();
    2673          108 :             drop(gc_info);
    2674              :             // Firstly, pick all the layers intersect or below the gc_cutoff, get the largest LSN in the selected layers.
    2675          108 :             let Some(max_layer_lsn) = layers
    2676          108 :                 .iter_historic_layers()
    2677          488 :                 .filter(|desc| desc.get_lsn_range().start <= gc_cutoff)
    2678          416 :                 .map(|desc| desc.get_lsn_range().end)
    2679          108 :                 .max()
    2680              :             else {
    2681            0 :                 info!(
    2682            0 :                     "no layers to compact with gc: no historic layers below gc_cutoff, gc_cutoff={}",
    2683              :                     gc_cutoff
    2684              :                 );
    2685            0 :                 return Ok(());
    2686              :             };
    2687              :             // Next, if the user specifies compact_lsn_range.start, we need to filter some layers out. All the layers (strictly) below
    2688              :             // the min_layer_lsn computed below will be filtered out, and the data will be accessed using the normal read path, as if
    2689              :             // it were a branch.
    2690          108 :             let Some(min_layer_lsn) = layers
    2691          108 :                 .iter_historic_layers()
    2692          488 :                 .filter(|desc| {
    2693          488 :                     if compact_lsn_range.start == Lsn::INVALID {
    2694          396 :                         true // select all layers below if start == Lsn(0)
    2695              :                     } else {
    2696           92 :                         desc.get_lsn_range().end > compact_lsn_range.start // strictly larger than compact_above_lsn
    2697              :                     }
    2698          488 :                 })
    2699          452 :                 .map(|desc| desc.get_lsn_range().start)
    2700          108 :                 .min()
    2701              :             else {
    2702            0 :                 info!(
    2703            0 :                     "no layers to compact with gc: no historic layers above compact_above_lsn, compact_above_lsn={}",
    2704              :                     compact_lsn_range.end
    2705              :                 );
    2706            0 :                 return Ok(());
    2707              :             };
    2708              :             // Then, pick all the layers that are below the max_layer_lsn. This is to ensure we can pick all single-key
    2709              :             // layers to compact.
    2710          108 :             let mut rewrite_layers = Vec::new();
    2711          488 :             for desc in layers.iter_historic_layers() {
    2712          488 :                 if desc.get_lsn_range().end <= max_layer_lsn
    2713          416 :                     && desc.get_lsn_range().start >= min_layer_lsn
    2714          380 :                     && overlaps_with(&desc.get_key_range(), &compact_key_range)
    2715              :                 {
    2716              :                     // If the layer overlaps with the compaction key range, we need to read it to obtain all keys within the range,
    2717              :                     // even if it might contain extra keys
    2718          304 :                     selected_layers.push(guard.get_from_desc(&desc));
    2719          304 :                     // If the layer is not fully contained within the key range, we need to rewrite it if it's a delta layer (it's fine
    2720          304 :                     // to overlap image layers)
    2721          304 :                     if desc.is_delta() && !fully_contains(&compact_key_range, &desc.get_key_range())
    2722            4 :                     {
    2723            4 :                         rewrite_layers.push(desc);
    2724          300 :                     }
    2725          184 :                 }
    2726              :             }
    2727          108 :             if selected_layers.is_empty() {
    2728            4 :                 info!(
    2729            0 :                     "no layers to compact with gc: no layers within the key range, gc_cutoff={}, key_range={}..{}",
    2730              :                     gc_cutoff, compact_key_range.start, compact_key_range.end
    2731              :                 );
    2732            4 :                 return Ok(());
    2733          104 :             }
    2734          104 :             retain_lsns_below_horizon.sort();
    2735          104 :             GcCompactionJobDescription {
    2736          104 :                 selected_layers,
    2737          104 :                 gc_cutoff,
    2738          104 :                 retain_lsns_below_horizon,
    2739          104 :                 min_layer_lsn,
    2740          104 :                 max_layer_lsn,
    2741          104 :                 compaction_key_range: compact_key_range,
    2742          104 :                 rewrite_layers,
    2743          104 :             }
    2744              :         };
    2745          104 :         let (has_data_below, lowest_retain_lsn) = if compact_lsn_range.start != Lsn::INVALID {
    2746              :             // If we only compact above some LSN, we should get the history from the current branch below the specified LSN.
    2747              :             // We use job_desc.min_layer_lsn as if it's the lowest branch point.
    2748           16 :             (true, job_desc.min_layer_lsn)
    2749           88 :         } else if self.ancestor_timeline.is_some() {
    2750              :             // In theory, we can also use min_layer_lsn here, but using ancestor LSN makes sure the delta layers cover the
    2751              :             // LSN ranges all the way to the ancestor timeline.
    2752            4 :             (true, self.ancestor_lsn)
    2753              :         } else {
    2754           84 :             let res = job_desc
    2755           84 :                 .retain_lsns_below_horizon
    2756           84 :                 .first()
    2757           84 :                 .copied()
    2758           84 :                 .unwrap_or(job_desc.gc_cutoff);
    2759           84 :             if debug_mode {
    2760           84 :                 assert_eq!(
    2761           84 :                     res,
    2762           84 :                     job_desc
    2763           84 :                         .retain_lsns_below_horizon
    2764           84 :                         .iter()
    2765           84 :                         .min()
    2766           84 :                         .copied()
    2767           84 :                         .unwrap_or(job_desc.gc_cutoff)
    2768           84 :                 );
    2769            0 :             }
    2770           84 :             (false, res)
    2771              :         };
    2772          104 :         info!(
    2773            0 :             "picked {} layers for compaction ({} layers need rewriting) with max_layer_lsn={} min_layer_lsn={} gc_cutoff={} lowest_retain_lsn={}, key_range={}..{}, has_data_below={}",
    2774            0 :             job_desc.selected_layers.len(),
    2775            0 :             job_desc.rewrite_layers.len(),
    2776              :             job_desc.max_layer_lsn,
    2777              :             job_desc.min_layer_lsn,
    2778              :             job_desc.gc_cutoff,
    2779              :             lowest_retain_lsn,
    2780              :             job_desc.compaction_key_range.start,
    2781              :             job_desc.compaction_key_range.end,
    2782              :             has_data_below,
    2783              :         );
    2784              : 
    2785          408 :         for layer in &job_desc.selected_layers {
    2786          304 :             debug!("read layer: {}", layer.layer_desc().key());
    2787              :         }
    2788          108 :         for layer in &job_desc.rewrite_layers {
    2789            4 :             debug!("rewrite layer: {}", layer.key());
    2790              :         }
    2791              : 
    2792          104 :         self.check_compaction_space(&job_desc.selected_layers)
    2793          104 :             .await?;
    2794              : 
    2795              :         // Generate statistics for the compaction
    2796          408 :         for layer in &job_desc.selected_layers {
    2797          304 :             let desc = layer.layer_desc();
    2798          304 :             if desc.is_delta() {
    2799          172 :                 stat.visit_delta_layer(desc.file_size());
    2800          172 :             } else {
    2801          132 :                 stat.visit_image_layer(desc.file_size());
    2802          132 :             }
    2803              :         }
    2804              : 
    2805              :         // Step 1: construct a k-merge iterator over all layers.
    2806              :         // Also, verify if the layer map can be split by drawing a horizontal line at every LSN start/end split point.
    2807          104 :         let layer_names = job_desc
    2808          104 :             .selected_layers
    2809          104 :             .iter()
    2810          304 :             .map(|layer| layer.layer_desc().layer_name())
    2811          104 :             .collect_vec();
    2812          104 :         if let Some(err) = check_valid_layermap(&layer_names) {
    2813            0 :             bail!(
    2814            0 :                 "gc-compaction layer map check failed because {}, cannot proceed with compaction due to potential data loss",
    2815            0 :                 err
    2816            0 :             );
    2817          104 :         }
    2818          104 :         // The maximum LSN we are processing in this compaction loop
    2819          104 :         let end_lsn = job_desc
    2820          104 :             .selected_layers
    2821          104 :             .iter()
    2822          304 :             .map(|l| l.layer_desc().lsn_range.end)
    2823          104 :             .max()
    2824          104 :             .unwrap();
    2825          104 :         let mut delta_layers = Vec::new();
    2826          104 :         let mut image_layers = Vec::new();
    2827          104 :         let mut downloaded_layers = Vec::new();
    2828          104 :         let mut total_downloaded_size = 0;
    2829          104 :         let mut total_layer_size = 0;
    2830          408 :         for layer in &job_desc.selected_layers {
    2831          304 :             if layer.needs_download().await?.is_some() {
    2832            0 :                 total_downloaded_size += layer.layer_desc().file_size;
    2833          304 :             }
    2834          304 :             total_layer_size += layer.layer_desc().file_size;
    2835          304 :             let resident_layer = layer.download_and_keep_resident(ctx).await?;
    2836          304 :             downloaded_layers.push(resident_layer);
    2837              :         }
    2838          104 :         info!(
    2839            0 :             "finish downloading layers, downloaded={}, total={}, ratio={:.2}",
    2840            0 :             total_downloaded_size,
    2841            0 :             total_layer_size,
    2842            0 :             total_downloaded_size as f64 / total_layer_size as f64
    2843              :         );
    2844          408 :         for resident_layer in &downloaded_layers {
    2845          304 :             if resident_layer.layer_desc().is_delta() {
    2846          172 :                 let layer = resident_layer.get_as_delta(ctx).await?;
    2847          172 :                 delta_layers.push(layer);
    2848              :             } else {
    2849          132 :                 let layer = resident_layer.get_as_image(ctx).await?;
    2850          132 :                 image_layers.push(layer);
    2851              :             }
    2852              :         }
    2853          104 :         let (dense_ks, sparse_ks) = self.collect_gc_compaction_keyspace().await?;
    2854          104 :         let mut merge_iter = FilterIterator::create(
    2855          104 :             MergeIterator::create(&delta_layers, &image_layers, ctx),
    2856          104 :             dense_ks,
    2857          104 :             sparse_ks,
    2858          104 :         )?;
    2859              : 
    2860              :         // Step 2: Produce images+deltas.
    2861          104 :         let mut accumulated_values = Vec::new();
    2862          104 :         let mut last_key: Option<Key> = None;
    2863              : 
    2864              :         // Only create image layers when there are no ancestor branches. TODO: create a covering image layer
    2865              :         // when certain conditions are met.
    2866          104 :         let mut image_layer_writer = if !has_data_below {
    2867              :             Some(
    2868           84 :                 SplitImageLayerWriter::new(
    2869           84 :                     self.conf,
    2870           84 :                     self.timeline_id,
    2871           84 :                     self.tenant_shard_id,
    2872           84 :                     job_desc.compaction_key_range.start,
    2873           84 :                     lowest_retain_lsn,
    2874           84 :                     self.get_compaction_target_size(),
    2875           84 :                     ctx,
    2876           84 :                 )
    2877           84 :                 .await?,
    2878              :             )
    2879              :         } else {
    2880           20 :             None
    2881              :         };
    2882              : 
    2883          104 :         let mut delta_layer_writer = SplitDeltaLayerWriter::new(
    2884          104 :             self.conf,
    2885          104 :             self.timeline_id,
    2886          104 :             self.tenant_shard_id,
    2887          104 :             lowest_retain_lsn..end_lsn,
    2888          104 :             self.get_compaction_target_size(),
    2889          104 :         )
    2890          104 :         .await?;
    2891              : 
    2892              :         #[derive(Default)]
    2893              :         struct RewritingLayers {
    2894              :             before: Option<DeltaLayerWriter>,
    2895              :             after: Option<DeltaLayerWriter>,
    2896              :         }
    2897          104 :         let mut delta_layer_rewriters = HashMap::<Arc<PersistentLayerKey>, RewritingLayers>::new();
    2898              : 
    2899              :         /// When the compaction does not cover a bottom range (= `[0, X)`) of the root branch, we "have data below" (`has_data_below=true`).
    2900              :         /// The two cases are compaction in ancestor branches and when `compact_lsn_range.start` is set.
    2901              :         /// In those cases, we need to pull up data from below the LSN range we're compacting.
    2902              :         ///
    2903              :         /// This function unifies the cases so that later code doesn't have to think about it.
    2904              :         ///
    2905              :         /// Currently, we always get the ancestor image for each key in the child branch no matter whether the image
    2906              :         /// is needed for reconstruction. This should be fixed in the future.
    2907              :         ///
    2908              :         /// Furthermore, we should do vectored get instead of a single get, or better, use k-merge for ancestor
    2909              :         /// images.
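                      :         ///
                      :         /// Call-pattern sketch (mirroring the call sites later in this function):
                      :         ///
                      :         /// ```ignore
                      :         /// // For each unique key emitted by the merge iterator:
                      :         /// let ancestor_img =
                      :         ///     get_ancestor_image(self, key, ctx, has_data_below, lowest_retain_lsn).await?;
                      :         /// // `ancestor_img` is `None` unless there is data below the compacted LSN range.
                      :         /// ```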
    2910         1244 :         async fn get_ancestor_image(
    2911         1244 :             this_tline: &Arc<Timeline>,
    2912         1244 :             key: Key,
    2913         1244 :             ctx: &RequestContext,
    2914         1244 :             has_data_below: bool,
    2915         1244 :             history_lsn_point: Lsn,
    2916         1244 :         ) -> anyhow::Result<Option<(Key, Lsn, Bytes)>> {
    2917         1244 :             if !has_data_below {
    2918         1168 :                 return Ok(None);
    2919           76 :             };
    2920              :             // This function is implemented as a get of the current timeline at ancestor LSN, therefore reusing
    2921              :             // as much existing code as possible.
    2922           76 :             let img = this_tline.get(key, history_lsn_point, ctx).await?;
    2923           76 :             Ok(Some((key, history_lsn_point, img)))
    2924         1244 :         }
    2925              : 
    2926              :         // Actually, we can decide not to write to the image layer at all at this point because
    2927              :         // the key and LSN range are determined. However, to keep things simple here, we still
    2928              :         // create this writer and discard it at the end.
    2929              : 
    2930         1932 :         while let Some(((key, lsn, val), desc)) = merge_iter.next_with_trace().await? {
    2931         1828 :             if cancel.is_cancelled() {
    2932            0 :                 return Err(anyhow!("cancelled")); // TODO: refactor to CompactionError and pass cancel error
    2933         1828 :             }
    2934         1828 :             if self.shard_identity.is_key_disposable(&key) {
    2935              :                 // If this shard does not need to store this key, simply skip it.
    2936              :                 //
    2937              :                 // This is not handled in the filter iterator because shard is determined by hash.
    2938              :                 // Therefore, it does not give us any performance benefit to do things like skip
    2939              :                 // a whole layer file as handling key spaces (ranges).
    2940            0 :                 if cfg!(debug_assertions) {
    2941            0 :                     let shard = self.shard_identity.shard_index();
    2942            0 :                     let owner = self.shard_identity.get_shard_number(&key);
    2943            0 :                     panic!("key {key} does not belong on shard {shard}, owned by {owner}");
    2944            0 :                 }
    2945            0 :                 continue;
    2946         1828 :             }
    2947         1828 :             if !job_desc.compaction_key_range.contains(&key) {
    2948          128 :                 if !desc.is_delta {
    2949          120 :                     continue;
    2950            8 :                 }
    2951            8 :                 let rewriter = delta_layer_rewriters.entry(desc.clone()).or_default();
    2952            8 :                 let rewriter = if key < job_desc.compaction_key_range.start {
    2953            0 :                     if rewriter.before.is_none() {
    2954            0 :                         rewriter.before = Some(
    2955            0 :                             DeltaLayerWriter::new(
    2956            0 :                                 self.conf,
    2957            0 :                                 self.timeline_id,
    2958            0 :                                 self.tenant_shard_id,
    2959            0 :                                 desc.key_range.start,
    2960            0 :                                 desc.lsn_range.clone(),
    2961            0 :                                 ctx,
    2962            0 :                             )
    2963            0 :                             .await?,
    2964              :                         );
    2965            0 :                     }
    2966            0 :                     rewriter.before.as_mut().unwrap()
    2967            8 :                 } else if key >= job_desc.compaction_key_range.end {
    2968            8 :                     if rewriter.after.is_none() {
    2969            4 :                         rewriter.after = Some(
    2970            4 :                             DeltaLayerWriter::new(
    2971            4 :                                 self.conf,
    2972            4 :                                 self.timeline_id,
    2973            4 :                                 self.tenant_shard_id,
    2974            4 :                                 job_desc.compaction_key_range.end,
    2975            4 :                                 desc.lsn_range.clone(),
    2976            4 :                                 ctx,
    2977            4 :                             )
    2978            4 :                             .await?,
    2979              :                         );
    2980            4 :                     }
    2981            8 :                     rewriter.after.as_mut().unwrap()
    2982              :                 } else {
    2983            0 :                     unreachable!()
    2984              :                 };
    2985            8 :                 rewriter.put_value(key, lsn, val, ctx).await?;
    2986            8 :                 continue;
    2987         1700 :             }
    2988         1700 :             match val {
    2989         1220 :                 Value::Image(_) => stat.visit_image_key(&val),
    2990          480 :                 Value::WalRecord(_) => stat.visit_wal_key(&val),
    2991              :             }
    2992         1700 :             if last_key.is_none() || last_key.as_ref() == Some(&key) {
    2993          560 :                 if last_key.is_none() {
    2994          104 :                     last_key = Some(key);
    2995          456 :                 }
    2996          560 :                 accumulated_values.push((key, lsn, val));
    2997              :             } else {
    2998         1140 :                 let last_key: &mut Key = last_key.as_mut().unwrap();
    2999         1140 :                 stat.on_unique_key_visited(); // TODO: adjust statistics for partial compaction
    3000         1140 :                 let retention = self
    3001         1140 :                     .generate_key_retention(
    3002         1140 :                         *last_key,
    3003         1140 :                         &accumulated_values,
    3004         1140 :                         job_desc.gc_cutoff,
    3005         1140 :                         &job_desc.retain_lsns_below_horizon,
    3006         1140 :                         COMPACTION_DELTA_THRESHOLD,
    3007         1140 :                         get_ancestor_image(self, *last_key, ctx, has_data_below, lowest_retain_lsn)
    3008         1140 :                             .await?,
    3009              :                     )
    3010         1140 :                     .await?;
    3011         1140 :                 retention
    3012         1140 :                     .pipe_to(
    3013         1140 :                         *last_key,
    3014         1140 :                         &mut delta_layer_writer,
    3015         1140 :                         image_layer_writer.as_mut(),
    3016         1140 :                         &mut stat,
    3017         1140 :                         ctx,
    3018         1140 :                     )
    3019         1140 :                     .await?;
    3020         1140 :                 accumulated_values.clear();
    3021         1140 :                 *last_key = key;
    3022         1140 :                 accumulated_values.push((key, lsn, val));
    3023              :             }
    3024              :         }
    3025              : 
    3026              :         // TODO: move the below part to the loop body
    3027          104 :         let last_key = last_key.expect("no keys produced during compaction");
    3028          104 :         stat.on_unique_key_visited();
    3029              : 
    3030          104 :         let retention = self
    3031          104 :             .generate_key_retention(
    3032          104 :                 last_key,
    3033          104 :                 &accumulated_values,
    3034          104 :                 job_desc.gc_cutoff,
    3035          104 :                 &job_desc.retain_lsns_below_horizon,
    3036          104 :                 COMPACTION_DELTA_THRESHOLD,
    3037          104 :                 get_ancestor_image(self, last_key, ctx, has_data_below, lowest_retain_lsn).await?,
    3038              :             )
    3039          104 :             .await?;
    3040          104 :         retention
    3041          104 :             .pipe_to(
    3042          104 :                 last_key,
    3043          104 :                 &mut delta_layer_writer,
    3044          104 :                 image_layer_writer.as_mut(),
    3045          104 :                 &mut stat,
    3046          104 :                 ctx,
    3047          104 :             )
    3048          104 :             .await?;
    3049              :         // end: move the above part to the loop body
    3050              : 
    3051          104 :         let mut rewrote_delta_layers = Vec::new();
    3052          108 :         for (key, writers) in delta_layer_rewriters {
    3053            4 :             if let Some(delta_writer_before) = writers.before {
    3054            0 :                 let (desc, path) = delta_writer_before
    3055            0 :                     .finish(job_desc.compaction_key_range.start, ctx)
    3056            0 :                     .await?;
    3057            0 :                 let layer = Layer::finish_creating(self.conf, self, desc, &path)?;
    3058            0 :                 rewrote_delta_layers.push(layer);
    3059            4 :             }
    3060            4 :             if let Some(delta_writer_after) = writers.after {
    3061            4 :                 let (desc, path) = delta_writer_after.finish(key.key_range.end, ctx).await?;
    3062            4 :                 let layer = Layer::finish_creating(self.conf, self, desc, &path)?;
    3063            4 :                 rewrote_delta_layers.push(layer);
    3064            0 :             }
    3065              :         }
    3066              : 
    3067          148 :         let discard = |key: &PersistentLayerKey| {
    3068          148 :             let key = key.clone();
    3069          148 :             async move { KeyHistoryRetention::discard_key(&key, self, dry_run).await }
    3070          148 :         };
    3071              : 
    3072          104 :         let produced_image_layers = if let Some(writer) = image_layer_writer {
    3073           84 :             if !dry_run {
    3074           76 :                 let end_key = job_desc.compaction_key_range.end;
    3075           76 :                 writer
    3076           76 :                     .finish_with_discard_fn(self, ctx, end_key, discard)
    3077           76 :                     .await?
    3078              :             } else {
    3079            8 :                 drop(writer);
    3080            8 :                 Vec::new()
    3081              :             }
    3082              :         } else {
    3083           20 :             Vec::new()
    3084              :         };
    3085              : 
    3086          104 :         let produced_delta_layers = if !dry_run {
    3087           96 :             delta_layer_writer
    3088           96 :                 .finish_with_discard_fn(self, ctx, discard)
    3089           96 :                 .await?
    3090              :         } else {
    3091            8 :             drop(delta_layer_writer);
    3092            8 :             Vec::new()
    3093              :         };
    3094              : 
    3095              :         // TODO: make image/delta/rewrote_delta layers generation atomic. At this point, we already generated resident layers, and if
    3096              :         // compaction is cancelled at this point, we might have some layers that are not cleaned up.
    3097          104 :         let mut compact_to = Vec::new();
    3098          104 :         let mut keep_layers = HashSet::new();
    3099          104 :         let produced_delta_layers_len = produced_delta_layers.len();
    3100          104 :         let produced_image_layers_len = produced_image_layers.len();
    3101          176 :         for action in produced_delta_layers {
    3102           72 :             match action {
    3103           44 :                 BatchWriterResult::Produced(layer) => {
    3104           44 :                     if cfg!(debug_assertions) {
    3105           44 :                         info!("produced delta layer: {}", layer.layer_desc().key());
    3106            0 :                     }
    3107           44 :                     stat.produce_delta_layer(layer.layer_desc().file_size());
    3108           44 :                     compact_to.push(layer);
    3109              :                 }
    3110           28 :                 BatchWriterResult::Discarded(l) => {
    3111           28 :                     if cfg!(debug_assertions) {
    3112           28 :                         info!("discarded delta layer: {}", l);
    3113            0 :                     }
    3114           28 :                     keep_layers.insert(l);
    3115           28 :                     stat.discard_delta_layer();
    3116              :                 }
    3117              :             }
    3118              :         }
    3119          108 :         for layer in &rewrote_delta_layers {
    3120            4 :             debug!(
    3121            0 :                 "produced rewritten delta layer: {}",
    3122            0 :                 layer.layer_desc().key()
    3123              :             );
    3124              :         }
    3125          104 :         compact_to.extend(rewrote_delta_layers);
    3126          180 :         for action in produced_image_layers {
    3127           76 :             match action {
    3128           60 :                 BatchWriterResult::Produced(layer) => {
    3129           60 :                     debug!("produced image layer: {}", layer.layer_desc().key());
    3130           60 :                     stat.produce_image_layer(layer.layer_desc().file_size());
    3131           60 :                     compact_to.push(layer);
    3132              :                 }
    3133           16 :                 BatchWriterResult::Discarded(l) => {
    3134           16 :                     debug!("discarded image layer: {}", l);
    3135           16 :                     keep_layers.insert(l);
    3136           16 :                     stat.discard_image_layer();
    3137              :                 }
    3138              :             }
    3139              :         }
    3140              : 
    3141          104 :         let mut layer_selection = job_desc.selected_layers;
    3142              : 
    3143              :         // Partial compaction might select more data than it processes, e.g., if
    3144              :         // the compaction_key_range only partially overlaps:
    3145              :         //
    3146              :         //         [---compaction_key_range---]
    3147              :         //   [---A----][----B----][----C----][----D----]
    3148              :         //
    3149              :         // For delta layers, we will rewrite the layers so that it is cut exactly at
    3150              :         // the compaction key range, so we can always discard them. However, for image
    3151              :         // layers, as we do not rewrite them for now, we need to handle them differently.
    3152              :         // Assume image layers A, B, C, D are all in the `layer_selection`.
    3153              :         //
    3154              :         // The created image layers contain whatever is needed from B, C, and from
    3155              :         // `----]` of A, and from `[---` of D.
    3156              :         //
    3157              :         // In contrast, `[---A` and `D----]` have not been processed, so, we must
    3158              :         // keep that data.
    3159              :         //
    3160              :         // The solution for now is to keep A and D completely if they are image layers.
    3161              :         // (layer_selection is what we'll remove from the layer map, so, retain what
    3162              :         // is _not_ fully covered by compaction_key_range).
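                      :         // Restated (illustrative): an image layer that only partially overlaps the compaction key
                      :         // range stays in the layer map; image layers fully contained in the range may be removed,
                      :         // since their data is covered by the newly produced image layers.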
    3163          408 :         for layer in &layer_selection {
    3164          304 :             if !layer.layer_desc().is_delta() {
    3165          132 :                 if !overlaps_with(
    3166          132 :                     &layer.layer_desc().key_range,
    3167          132 :                     &job_desc.compaction_key_range,
    3168          132 :                 ) {
    3169            0 :                     bail!("violated constraint: image layer outside of compaction key range");
    3170          132 :                 }
    3171          132 :                 if !fully_contains(
    3172          132 :                     &job_desc.compaction_key_range,
    3173          132 :                     &layer.layer_desc().key_range,
    3174          132 :                 ) {
    3175           16 :                     keep_layers.insert(layer.layer_desc().key());
    3176          116 :                 }
    3177          172 :             }
    3178              :         }
    3179              : 
    3180          304 :         layer_selection.retain(|x| !keep_layers.contains(&x.layer_desc().key()));
    3181          104 : 
    3182          104 :         info!(
    3183            0 :             "gc-compaction statistics: {}",
    3184            0 :             serde_json::to_string(&stat)?
    3185              :         );
    3186              : 
    3187          104 :         if dry_run {
    3188            8 :             return Ok(());
    3189           96 :         }
    3190           96 : 
    3191           96 :         info!(
    3192            0 :             "produced {} delta layers and {} image layers, {} layers are kept",
    3193            0 :             produced_delta_layers_len,
    3194            0 :             produced_image_layers_len,
    3195            0 :             keep_layers.len()
    3196              :         );
    3197              : 
    3198              :         // Step 3: Place back to the layer map.
    3199              : 
    3200              :         // First, do a sanity check to ensure the newly-created layer map does not contain overlaps.
    3201           96 :         let all_layers = {
    3202           96 :             let guard = self.layers.read().await;
    3203           96 :             let layer_map = guard.layer_map()?;
    3204           96 :             layer_map.iter_historic_layers().collect_vec()
    3205           96 :         };
    3206           96 : 
    3207           96 :         let mut final_layers = all_layers
    3208           96 :             .iter()
    3209          428 :             .map(|layer| layer.layer_name())
    3210           96 :             .collect::<HashSet<_>>();
    3211          304 :         for layer in &layer_selection {
    3212          208 :             final_layers.remove(&layer.layer_desc().layer_name());
    3213          208 :         }
    3214          204 :         for layer in &compact_to {
    3215          108 :             final_layers.insert(layer.layer_desc().layer_name());
    3216          108 :         }
    3217           96 :         let final_layers = final_layers.into_iter().collect_vec();
    3218              : 
    3219              :         // TODO: move this check before we call `finish` on image layer writers. However, this will require us to get the layer name before we finish
    3220              :         // the writer, so potentially, we will need a function like `ImageLayerBatchWriter::get_all_pending_layer_keys` to get all the keys that are
    3221              :         // in the writer before finalizing the persistent layers. For now, we would leave some dangling layers on the disk if the check fails.
    3222           96 :         if let Some(err) = check_valid_layermap(&final_layers) {
    3223            0 :             bail!(
    3224            0 :                 "gc-compaction layer map check failed after compaction because {}, compaction result not applied to the layer map due to potential data loss",
    3225            0 :                 err
    3226            0 :             );
    3227           96 :         }
    3228              : 
    3229              :         // Between the sanity check and this compaction update, there could be new layers being flushed, but it should be fine because we only
    3230              :         // operate on L1 layers.
    3231              :         {
    3232              :             // Gc-compaction will rewrite the history of a key. This could happen in two ways:
    3233              :             //
    3234              :             // 1. We create an image layer to replace all the deltas below the compact LSN. In this case, assume
    3235              :             // we have 2 delta layers A and B, both below the compact LSN. We create an image layer I to replace
    3236              :             // A and B at the compact LSN. If the read path finishes reading A, yields, and now we update the layer
    3237              :             // map, the read path then cannot find any keys below A, reporting a missing key error, while the key
    3238              :             // now gets stored in I at the compact LSN.
    3239              :             //
    3240              :             // ---------------                                       ---------------
    3241              :             //   delta1@LSN20                                         image1@LSN20
    3242              :             // ---------------  (read path collects delta@LSN20,  => ---------------  (read path cannot find anything
    3243              :             //   delta1@LSN10    yields)                                               below LSN 20)
    3244              :             // ---------------
    3245              :             //
    3246              :             // 2. We create a delta layer to replace all the deltas below the compact LSN, and in the delta layers,
    3247              :             // we combine the history of a key into a single image. For example, we have deltas at LSN 1, 2, 3, 4.
    3248              :             // Assume one delta layer contains LSN 1, 2, 3 and the other contains LSN 4.
    3249              :             //
    3250              :             // We let gc-compaction combine delta 2, 3, 4 into an image at LSN 4, which produces a delta layer that
    3251              :             // contains the delta at LSN 1 and the image at LSN 4. If the read path finishes reading the original delta
    3252              :             // layer containing LSN 4, yields, and we then update the layer map to install the new delta layer:
    3253              :             //
    3254              :             // ---------------                                      ---------------
    3255              :             //   delta1@LSN4                                          image1@LSN4
    3256              :             // ---------------  (read path collects delta@LSN4,  => ---------------  (read path collects LSN4 and LSN1,
    3257              :             //  delta1@LSN1-3    yields)                              delta1@LSN1     which is an invalid history)
    3258              :             // ---------------                                      ---------------
    3259              :             //
    3260              :             // Therefore, the gc-compaction layer update operation should wait for all ongoing reads, block all pending reads,
    3261              :             // and only allow reads to continue after the update is finished.
    3262              : 
    3263           96 :             let update_guard = self.gc_compaction_layer_update_lock.write().await;
    3264              :             // Acquiring the update guard ensures current read operations end and new read operations are blocked.
    3265              :             // TODO: can we use `latest_gc_cutoff` Rcu to achieve the same effect?
    3266           96 :             let mut guard = self.layers.write().await;
    3267           96 :             guard
    3268           96 :                 .open_mut()?
    3269           96 :                 .finish_gc_compaction(&layer_selection, &compact_to, &self.metrics);
    3270           96 :             drop(update_guard); // Allow new reads to start ONLY after we finished updating the layer map.
    3271           96 :         };
    3272           96 : 
    3273           96 :         // Schedule an index-only upload to update the `latest_gc_cutoff` in the index_part.json.
    3274           96 :         // Otherwise, after restart, the index_part only contains the old `latest_gc_cutoff` and
    3275           96 :         // find_gc_cutoffs will try accessing things below the cutoff. TODO: ideally, this should
    3276           96 :         // be batched into `schedule_compaction_update`.
    3277           96 :         let disk_consistent_lsn = self.disk_consistent_lsn.load();
    3278           96 :         self.schedule_uploads(disk_consistent_lsn, None)?;
    3279              :         // If a layer gets rewritten throughout gc-compaction, we need to keep that layer only in `compact_to` instead
    3280              :         // of `compact_from`.
    3281           96 :         let compact_from = {
    3282           96 :             let mut compact_from = Vec::new();
    3283           96 :             let mut compact_to_set = HashMap::new();
    3284          204 :             for layer in &compact_to {
    3285          108 :                 compact_to_set.insert(layer.layer_desc().key(), layer);
    3286          108 :             }
    3287          304 :             for layer in &layer_selection {
    3288          208 :                 if let Some(to) = compact_to_set.get(&layer.layer_desc().key()) {
    3289            0 :                     tracing::info!(
    3290            0 :                         "skipping delete {} because found same layer key at different generation {}",
    3291              :                         layer,
    3292              :                         to
    3293              :                     );
    3294          208 :                 } else {
    3295          208 :                     compact_from.push(layer.clone());
    3296          208 :                 }
    3297              :             }
    3298           96 :             compact_from
    3299           96 :         };
    3300           96 :         self.remote_client
    3301           96 :             .schedule_compaction_update(&compact_from, &compact_to)?;
    3302              : 
    3303           96 :         drop(gc_lock);
    3304           96 : 
    3305           96 :         Ok(())
    3306          108 :     }
    3307              : }
    3308              : 
    3309              : struct TimelineAdaptor {
    3310              :     timeline: Arc<Timeline>,
    3311              : 
    3312              :     keyspace: (Lsn, KeySpace),
    3313              : 
    3314              :     new_deltas: Vec<ResidentLayer>,
    3315              :     new_images: Vec<ResidentLayer>,
    3316              :     layers_to_delete: Vec<Arc<PersistentLayerDesc>>,
    3317              : }
    3318              : 
    3319              : impl TimelineAdaptor {
    3320            0 :     pub fn new(timeline: &Arc<Timeline>, keyspace: (Lsn, KeySpace)) -> Self {
    3321            0 :         Self {
    3322            0 :             timeline: timeline.clone(),
    3323            0 :             keyspace,
    3324            0 :             new_images: Vec::new(),
    3325            0 :             new_deltas: Vec::new(),
    3326            0 :             layers_to_delete: Vec::new(),
    3327            0 :         }
    3328            0 :     }
    3329              : 
    3330            0 :     pub async fn flush_updates(&mut self) -> Result<(), CompactionError> {
    3331            0 :         let layers_to_delete = {
    3332            0 :             let guard = self.timeline.layers.read().await;
    3333            0 :             self.layers_to_delete
    3334            0 :                 .iter()
    3335            0 :                 .map(|x| guard.get_from_desc(x))
    3336            0 :                 .collect::<Vec<Layer>>()
    3337            0 :         };
    3338            0 :         self.timeline
    3339            0 :             .finish_compact_batch(&self.new_deltas, &self.new_images, &layers_to_delete)
    3340            0 :             .await?;
    3341              : 
    3342            0 :         self.timeline
    3343            0 :             .upload_new_image_layers(std::mem::take(&mut self.new_images))?;
    3344              : 
    3345            0 :         self.new_deltas.clear();
    3346            0 :         self.layers_to_delete.clear();
    3347            0 :         Ok(())
    3348            0 :     }
    3349              : }
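
`TimelineAdaptor` follows an accumulate-then-flush pattern: the compaction callbacks
further down only record new delta layers, new image layers, and layers to delete, and
`flush_updates()` applies the whole batch (via `finish_compact_batch` plus the image
upload) before clearing the buffers. A small standalone sketch of that pattern with a
hypothetical `UpdateBatch` type, not the pageserver API:

    /// Hypothetical buffer mirroring new_deltas / new_images / layers_to_delete.
    #[derive(Default)]
    struct UpdateBatch {
        new_deltas: Vec<String>,
        new_images: Vec<String>,
        to_delete: Vec<String>,
    }

    impl UpdateBatch {
        /// Apply everything recorded so far in one step, then reset for the next round.
        fn flush(&mut self) -> (usize, usize, usize) {
            let applied = (self.new_deltas.len(), self.new_images.len(), self.to_delete.len());
            // The real code would call finish_compact_batch() and upload new images here.
            self.new_deltas.clear();
            self.new_images.clear();
            self.to_delete.clear();
            applied
        }
    }

    fn main() {
        let mut batch = UpdateBatch::default();
        batch.new_deltas.push("delta_000-100__10-20".into());
        batch.to_delete.push("delta_000-100__00-10".into());
        assert_eq!(batch.flush(), (1, 0, 1));
        assert_eq!(batch.flush(), (0, 0, 0)); // buffers were cleared
    }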
    3350              : 
    3351              : #[derive(Clone)]
    3352              : struct ResidentDeltaLayer(ResidentLayer);
    3353              : #[derive(Clone)]
    3354              : struct ResidentImageLayer(ResidentLayer);
    3355              : 
    3356              : impl CompactionJobExecutor for TimelineAdaptor {
    3357              :     type Key = pageserver_api::key::Key;
    3358              : 
    3359              :     type Layer = OwnArc<PersistentLayerDesc>;
    3360              :     type DeltaLayer = ResidentDeltaLayer;
    3361              :     type ImageLayer = ResidentImageLayer;
    3362              : 
    3363              :     type RequestContext = crate::context::RequestContext;
    3364              : 
    3365            0 :     fn get_shard_identity(&self) -> &ShardIdentity {
    3366            0 :         self.timeline.get_shard_identity()
    3367            0 :     }
    3368              : 
    3369            0 :     async fn get_layers(
    3370            0 :         &mut self,
    3371            0 :         key_range: &Range<Key>,
    3372            0 :         lsn_range: &Range<Lsn>,
    3373            0 :         _ctx: &RequestContext,
    3374            0 :     ) -> anyhow::Result<Vec<OwnArc<PersistentLayerDesc>>> {
    3375            0 :         self.flush_updates().await?;
    3376              : 
    3377            0 :         let guard = self.timeline.layers.read().await;
    3378            0 :         let layer_map = guard.layer_map()?;
    3379              : 
    3380            0 :         let result = layer_map
    3381            0 :             .iter_historic_layers()
    3382            0 :             .filter(|l| {
    3383            0 :                 overlaps_with(&l.lsn_range, lsn_range) && overlaps_with(&l.key_range, key_range)
    3384            0 :             })
    3385            0 :             .map(OwnArc)
    3386            0 :             .collect();
    3387            0 :         Ok(result)
    3388            0 :     }
    3389              : 
    3390            0 :     async fn get_keyspace(
    3391            0 :         &mut self,
    3392            0 :         key_range: &Range<Key>,
    3393            0 :         lsn: Lsn,
    3394            0 :         _ctx: &RequestContext,
    3395            0 :     ) -> anyhow::Result<Vec<Range<Key>>> {
    3396            0 :         if lsn == self.keyspace.0 {
    3397            0 :             Ok(pageserver_compaction::helpers::intersect_keyspace(
    3398            0 :                 &self.keyspace.1.ranges,
    3399            0 :                 key_range,
    3400            0 :             ))
    3401              :         } else {
    3402              :             // The current compaction implementation only ever requests the key space
    3403              :             // at the compaction end LSN.
    3404            0 :             anyhow::bail!("keyspace not available for requested lsn");
    3405              :         }
    3406            0 :     }
    3407              : 
    3408            0 :     async fn downcast_delta_layer(
    3409            0 :         &self,
    3410            0 :         layer: &OwnArc<PersistentLayerDesc>,
    3411            0 :         ctx: &RequestContext,
    3412            0 :     ) -> anyhow::Result<Option<ResidentDeltaLayer>> {
    3413            0 :         // this is a lot more complex than a simple downcast...
    3414            0 :         if layer.is_delta() {
    3415            0 :             let l = {
    3416            0 :                 let guard = self.timeline.layers.read().await;
    3417            0 :                 guard.get_from_desc(layer)
    3418              :             };
    3419            0 :             let result = l.download_and_keep_resident(ctx).await?;
    3420              : 
    3421            0 :             Ok(Some(ResidentDeltaLayer(result)))
    3422              :         } else {
    3423            0 :             Ok(None)
    3424              :         }
    3425            0 :     }
    3426              : 
    3427            0 :     async fn create_image(
    3428            0 :         &mut self,
    3429            0 :         lsn: Lsn,
    3430            0 :         key_range: &Range<Key>,
    3431            0 :         ctx: &RequestContext,
    3432            0 :     ) -> anyhow::Result<()> {
    3433            0 :         Ok(self.create_image_impl(lsn, key_range, ctx).await?)
    3434            0 :     }
    3435              : 
    3436            0 :     async fn create_delta(
    3437            0 :         &mut self,
    3438            0 :         lsn_range: &Range<Lsn>,
    3439            0 :         key_range: &Range<Key>,
    3440            0 :         input_layers: &[ResidentDeltaLayer],
    3441            0 :         ctx: &RequestContext,
    3442            0 :     ) -> anyhow::Result<()> {
    3443            0 :         debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end);
    3444              : 
    3445            0 :         let mut all_entries = Vec::new();
    3446            0 :         for dl in input_layers.iter() {
    3447            0 :             all_entries.extend(dl.load_keys(ctx).await?);
    3448              :         }
    3449              : 
    3450              :         // The current stdlib sorting implementation is designed so that it is
    3451              :         // particularly fast when the slice is made up of sorted sub-ranges.
    3452            0 :         all_entries.sort_by_key(|DeltaEntry { key, lsn, .. }| (*key, *lsn));
    3453              : 
    3454            0 :         let mut writer = DeltaLayerWriter::new(
    3455            0 :             self.timeline.conf,
    3456            0 :             self.timeline.timeline_id,
    3457            0 :             self.timeline.tenant_shard_id,
    3458            0 :             key_range.start,
    3459            0 :             lsn_range.clone(),
    3460            0 :             ctx,
    3461            0 :         )
    3462            0 :         .await?;
    3463              : 
    3464            0 :         let mut dup_values = 0;
    3465            0 : 
    3466            0 :         // This iterator walks through all key-value pairs from all the layers
    3467            0 :         // we're compacting, in key, LSN order.
    3468            0 :         let mut prev: Option<(Key, Lsn)> = None;
    3469              :         for &DeltaEntry {
    3470            0 :             key, lsn, ref val, ..
    3471            0 :         } in all_entries.iter()
    3472              :         {
    3473            0 :             if prev == Some((key, lsn)) {
    3474              :                 // This is a duplicate. Skip it.
    3475              :                 //
    3476              :                 // It can happen if compaction is interrupted after writing some
    3477              :                 // layers but not all, and we are compacting the range again.
    3478              :                 // The calculations in the algorithm assume that there are no
    3479              :                 // duplicates, so the math on targeted file size is likely off,
    3480              :                 // and we will create smaller files than expected.
    3481            0 :                 dup_values += 1;
    3482            0 :                 continue;
    3483            0 :             }
    3484              : 
    3485            0 :             let value = val.load(ctx).await?;
    3486              : 
    3487            0 :             writer.put_value(key, lsn, value, ctx).await?;
    3488              : 
    3489            0 :             prev = Some((key, lsn));
    3490              :         }
    3491              : 
    3492            0 :         if dup_values > 0 {
    3493            0 :             warn!("delta layer created with {} duplicate values", dup_values);
    3494            0 :         }
    3495              : 
    3496            0 :         fail_point!("delta-layer-writer-fail-before-finish", |_| {
    3497            0 :             Err(anyhow::anyhow!(
    3498            0 :                 "failpoint delta-layer-writer-fail-before-finish"
    3499            0 :             ))
    3500            0 :         });
    3501              : 
    3502            0 :         let (desc, path) = writer.finish(prev.unwrap().0.next(), ctx).await?;
    3503            0 :         let new_delta_layer =
    3504            0 :             Layer::finish_creating(self.timeline.conf, &self.timeline, desc, &path)?;
    3505              : 
    3506            0 :         self.new_deltas.push(new_delta_layer);
    3507            0 :         Ok(())
    3508            0 :     }
    3509              : 
    3510            0 :     async fn delete_layer(
    3511            0 :         &mut self,
    3512            0 :         layer: &OwnArc<PersistentLayerDesc>,
    3513            0 :         _ctx: &RequestContext,
    3514            0 :     ) -> anyhow::Result<()> {
    3515            0 :         self.layers_to_delete.push(layer.clone().0);
    3516            0 :         Ok(())
    3517            0 :     }
    3518              : }
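
Inside `create_delta()` above, the entries collected from all input layers are sorted
by `(key, lsn)`, and exact duplicates, which can appear when an earlier compaction was
interrupted after writing only some of its layers, are written out once and counted.
A standalone sketch of that merge step, with plain `(key, lsn, value)` tuples standing
in for `DeltaEntry`:

    /// Sort by (key, lsn) and drop exact duplicates; returns the merged entries
    /// and how many duplicates were skipped.
    fn merge_dedup(
        mut entries: Vec<(u64, u64, &'static str)>,
    ) -> (Vec<(u64, u64, &'static str)>, usize) {
        // The stdlib sort is fast here because the input is a concatenation of
        // already-sorted runs (one per input layer).
        entries.sort_by_key(|&(key, lsn, _)| (key, lsn));

        let mut out = Vec::new();
        let mut dup_values = 0;
        let mut prev: Option<(u64, u64)> = None;
        for (key, lsn, val) in entries {
            if prev == Some((key, lsn)) {
                dup_values += 1; // same (key, lsn) seen again: keep the first copy only
                continue;
            }
            out.push((key, lsn, val));
            prev = Some((key, lsn));
        }
        (out, dup_values)
    }

    fn main() {
        // Two input layers with one overlapping entry (key 1 @ lsn 10).
        let layer_a = vec![(1, 10, "a"), (2, 10, "b")];
        let layer_b = vec![(1, 10, "a"), (3, 10, "c")];
        let (merged, dups) = merge_dedup([layer_a, layer_b].concat());
        assert_eq!(merged.len(), 3);
        assert_eq!(dups, 1);
    }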
    3519              : 
    3520              : impl TimelineAdaptor {
    3521            0 :     async fn create_image_impl(
    3522            0 :         &mut self,
    3523            0 :         lsn: Lsn,
    3524            0 :         key_range: &Range<Key>,
    3525            0 :         ctx: &RequestContext,
    3526            0 :     ) -> Result<(), CreateImageLayersError> {
    3527            0 :         let timer = self.timeline.metrics.create_images_time_histo.start_timer();
    3528              : 
    3529            0 :         let image_layer_writer = ImageLayerWriter::new(
    3530            0 :             self.timeline.conf,
    3531            0 :             self.timeline.timeline_id,
    3532            0 :             self.timeline.tenant_shard_id,
    3533            0 :             key_range,
    3534            0 :             lsn,
    3535            0 :             ctx,
    3536            0 :         )
    3537            0 :         .await?;
    3538              : 
    3539            0 :         fail_point!("image-layer-writer-fail-before-finish", |_| {
    3540            0 :             Err(CreateImageLayersError::Other(anyhow::anyhow!(
    3541            0 :                 "failpoint image-layer-writer-fail-before-finish"
    3542            0 :             )))
    3543            0 :         });
    3544              : 
    3545            0 :         let keyspace = KeySpace {
    3546            0 :             ranges: self.get_keyspace(key_range, lsn, ctx).await?,
    3547              :         };
    3548              :         // TODO set proper (stateful) start. The create_image_layer_for_rel_blocks function mostly
    3549            0 :         let outcome = self
    3550            0 :             .timeline
    3551            0 :             .create_image_layer_for_rel_blocks(
    3552            0 :                 &keyspace,
    3553            0 :                 image_layer_writer,
    3554            0 :                 lsn,
    3555            0 :                 ctx,
    3556            0 :                 key_range.clone(),
    3557            0 :                 IoConcurrency::sequential(),
    3558            0 :             )
    3559            0 :             .await?;
    3560              : 
    3561              :         if let ImageLayerCreationOutcome::Generated {
    3562            0 :             unfinished_image_layer,
    3563            0 :         } = outcome
    3564              :         {
    3565            0 :             let (desc, path) = unfinished_image_layer.finish(ctx).await?;
    3566            0 :             let image_layer =
    3567            0 :                 Layer::finish_creating(self.timeline.conf, &self.timeline, desc, &path)?;
    3568            0 :             self.new_images.push(image_layer);
    3569            0 :         }
    3570              : 
    3571            0 :         timer.stop_and_record();
    3572            0 : 
    3573            0 :         Ok(())
    3574            0 :     }
    3575              : }
    3576              : 
    3577              : impl CompactionRequestContext for crate::context::RequestContext {}
    3578              : 
    3579              : #[derive(Debug, Clone)]
    3580              : pub struct OwnArc<T>(pub Arc<T>);
    3581              : 
    3582              : impl<T> Deref for OwnArc<T> {
    3583              :     type Target = <Arc<T> as Deref>::Target;
    3584            0 :     fn deref(&self) -> &Self::Target {
    3585            0 :         &self.0
    3586            0 :     }
    3587              : }
    3588              : 
    3589              : impl<T> AsRef<T> for OwnArc<T> {
    3590            0 :     fn as_ref(&self) -> &T {
    3591            0 :         self.0.as_ref()
    3592            0 :     }
    3593              : }
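
`OwnArc` is a thin newtype around `Arc<T>`. Wrapping the `Arc` in a locally defined
struct is presumably what lets the foreign `CompactionLayer` trait from the compaction
crate be implemented for `Arc`-held descriptors without running into Rust's orphan
rule, while the `Deref`/`AsRef` impls keep the wrapper transparent. A minimal sketch of
the same newtype pattern, using a hypothetical local `ShortId` trait and `Desc` struct:

    use std::ops::Deref;
    use std::sync::Arc;

    // Stand-in for a trait defined in another crate (like CompactionLayer).
    trait ShortId {
        fn short_id(&self) -> String;
    }

    struct Desc {
        name: String,
    }

    // Newtype: locally owned, so implementing traits on it is always allowed.
    #[derive(Clone)]
    struct OwnArc<T>(Arc<T>);

    impl<T> Deref for OwnArc<T> {
        type Target = T;
        fn deref(&self) -> &T {
            &self.0 // deref coercion: &Arc<T> -> &T
        }
    }

    impl ShortId for OwnArc<Desc> {
        fn short_id(&self) -> String {
            self.name.clone() // field access auto-derefs through the newtype
        }
    }

    fn main() {
        let d = OwnArc(Arc::new(Desc { name: "000-100__10-20".into() }));
        assert_eq!(d.short_id(), "000-100__10-20");
    }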
    3594              : 
    3595              : impl CompactionLayer<Key> for OwnArc<PersistentLayerDesc> {
    3596            0 :     fn key_range(&self) -> &Range<Key> {
    3597            0 :         &self.key_range
    3598            0 :     }
    3599            0 :     fn lsn_range(&self) -> &Range<Lsn> {
    3600            0 :         &self.lsn_range
    3601            0 :     }
    3602            0 :     fn file_size(&self) -> u64 {
    3603            0 :         self.file_size
    3604            0 :     }
    3605            0 :     fn short_id(&self) -> std::string::String {
    3606            0 :         self.as_ref().short_id().to_string()
    3607            0 :     }
    3608            0 :     fn is_delta(&self) -> bool {
    3609            0 :         self.as_ref().is_delta()
    3610            0 :     }
    3611              : }
    3612              : 
    3613              : impl CompactionLayer<Key> for OwnArc<DeltaLayer> {
    3614            0 :     fn key_range(&self) -> &Range<Key> {
    3615            0 :         &self.layer_desc().key_range
    3616            0 :     }
    3617            0 :     fn lsn_range(&self) -> &Range<Lsn> {
    3618            0 :         &self.layer_desc().lsn_range
    3619            0 :     }
    3620            0 :     fn file_size(&self) -> u64 {
    3621            0 :         self.layer_desc().file_size
    3622            0 :     }
    3623            0 :     fn short_id(&self) -> std::string::String {
    3624            0 :         self.layer_desc().short_id().to_string()
    3625            0 :     }
    3626            0 :     fn is_delta(&self) -> bool {
    3627            0 :         true
    3628            0 :     }
    3629              : }
    3630              : 
    3631              : use crate::tenant::timeline::DeltaEntry;
    3632              : 
    3633              : impl CompactionLayer<Key> for ResidentDeltaLayer {
    3634            0 :     fn key_range(&self) -> &Range<Key> {
    3635            0 :         &self.0.layer_desc().key_range
    3636            0 :     }
    3637            0 :     fn lsn_range(&self) -> &Range<Lsn> {
    3638            0 :         &self.0.layer_desc().lsn_range
    3639            0 :     }
    3640            0 :     fn file_size(&self) -> u64 {
    3641            0 :         self.0.layer_desc().file_size
    3642            0 :     }
    3643            0 :     fn short_id(&self) -> std::string::String {
    3644            0 :         self.0.layer_desc().short_id().to_string()
    3645            0 :     }
    3646            0 :     fn is_delta(&self) -> bool {
    3647            0 :         true
    3648            0 :     }
    3649              : }
    3650              : 
    3651              : impl CompactionDeltaLayer<TimelineAdaptor> for ResidentDeltaLayer {
    3652              :     type DeltaEntry<'a> = DeltaEntry<'a>;
    3653              : 
    3654            0 :     async fn load_keys(&self, ctx: &RequestContext) -> anyhow::Result<Vec<DeltaEntry<'_>>> {
    3655            0 :         self.0.get_as_delta(ctx).await?.index_entries(ctx).await
    3656            0 :     }
    3657              : }
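
`type DeltaEntry<'a>` above is a generic associated type: each entry returned by
`load_keys` borrows from the resident layer it was read from, so its lifetime is tied
to the `&self` borrow. A small self-contained sketch of the same shape, using a
hypothetical `KeySource`/`Buffer` pair rather than the real compaction traits:

    // Hypothetical trait mirroring the CompactionDeltaLayer::DeltaEntry<'a> shape.
    trait KeySource {
        type Entry<'a>
        where
            Self: 'a;
        fn entries<'a>(&'a self) -> Vec<Self::Entry<'a>>;
    }

    struct Buffer {
        data: Vec<u8>,
    }

    impl KeySource for Buffer {
        // Each entry borrows from the buffer it was loaded from.
        type Entry<'a> = &'a u8 where Self: 'a;

        fn entries<'a>(&'a self) -> Vec<Self::Entry<'a>> {
            self.data.iter().collect()
        }
    }

    fn main() {
        let buf = Buffer { data: vec![1, 2, 3] };
        assert_eq!(buf.entries().len(), 3);
    }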
    3658              : 
    3659              : impl CompactionLayer<Key> for ResidentImageLayer {
    3660            0 :     fn key_range(&self) -> &Range<Key> {
    3661            0 :         &self.0.layer_desc().key_range
    3662            0 :     }
    3663            0 :     fn lsn_range(&self) -> &Range<Lsn> {
    3664            0 :         &self.0.layer_desc().lsn_range
    3665            0 :     }
    3666            0 :     fn file_size(&self) -> u64 {
    3667            0 :         self.0.layer_desc().file_size
    3668            0 :     }
    3669            0 :     fn short_id(&self) -> std::string::String {
    3670            0 :         self.0.layer_desc().short_id().to_string()
    3671            0 :     }
    3672            0 :     fn is_delta(&self) -> bool {
    3673            0 :         false
    3674            0 :     }
    3675              : }
    3676              : impl CompactionImageLayer<TimelineAdaptor> for ResidentImageLayer {}
        

Generated by: LCOV version 2.1-beta