LCOV - code coverage report
Current view: top level - pageserver/src/tenant/timeline - compaction.rs (source / functions) Coverage Total Hit
Test: 47d527da5e8405637e322911c55c08727c2fd272.info Lines: 55.6 % 2257 1256
Test Date: 2025-01-16 17:37:50 Functions: 39.8 % 161 64

            Line data    Source code
       1              : //! New compaction implementation. The algorithm itself is implemented in the
       2              : //! compaction crate. This file implements the callbacks and structs that allow
       3              : //! the algorithm to drive the process.
       4              : //!
       5              : //! The old legacy algorithm is implemented directly in `timeline.rs`.
       6              : 
       7              : use std::collections::{BinaryHeap, HashMap, HashSet, VecDeque};
       8              : use std::ops::{Deref, Range};
       9              : use std::sync::Arc;
      10              : 
      11              : use super::layer_manager::LayerManager;
      12              : use super::{
      13              :     CompactFlags, CompactOptions, CreateImageLayersError, DurationRecorder, ImageLayerCreationMode,
      14              :     RecordedDuration, Timeline,
      15              : };
      16              : 
      17              : use anyhow::{anyhow, bail, Context};
      18              : use bytes::Bytes;
      19              : use enumset::EnumSet;
      20              : use fail::fail_point;
      21              : use itertools::Itertools;
      22              : use pageserver_api::key::KEY_SIZE;
      23              : use pageserver_api::keyspace::ShardedRange;
      24              : use pageserver_api::models::CompactInfoResponse;
      25              : use pageserver_api::shard::{ShardCount, ShardIdentity, TenantShardId};
      26              : use serde::Serialize;
      27              : use tokio_util::sync::CancellationToken;
      28              : use tracing::{debug, info, info_span, trace, warn, Instrument};
      29              : use utils::id::TimelineId;
      30              : 
      31              : use crate::context::{AccessStatsBehavior, RequestContext, RequestContextBuilder};
      32              : use crate::page_cache;
      33              : use crate::statvfs::Statvfs;
      34              : use crate::tenant::checks::check_valid_layermap;
      35              : use crate::tenant::gc_block::GcBlock;
      36              : use crate::tenant::remote_timeline_client::WaitCompletionError;
      37              : use crate::tenant::storage_layer::batch_split_writer::{
      38              :     BatchWriterResult, SplitDeltaLayerWriter, SplitImageLayerWriter,
      39              : };
      40              : use crate::tenant::storage_layer::filter_iterator::FilterIterator;
      41              : use crate::tenant::storage_layer::merge_iterator::MergeIterator;
      42              : use crate::tenant::storage_layer::{
      43              :     AsLayerDesc, PersistentLayerDesc, PersistentLayerKey, ValueReconstructState,
      44              : };
      45              : use crate::tenant::timeline::ImageLayerCreationOutcome;
      46              : use crate::tenant::timeline::{drop_rlock, DeltaLayerWriter, ImageLayerWriter};
      47              : use crate::tenant::timeline::{Layer, ResidentLayer};
      48              : use crate::tenant::{gc_block, DeltaLayer, MaybeOffloaded};
      49              : use crate::virtual_file::{MaybeFatalIo, VirtualFile};
      50              : use pageserver_api::config::tenant_conf_defaults::{
      51              :     DEFAULT_CHECKPOINT_DISTANCE, DEFAULT_COMPACTION_THRESHOLD,
      52              : };
      53              : 
      54              : use pageserver_api::key::Key;
      55              : use pageserver_api::keyspace::KeySpace;
      56              : use pageserver_api::record::NeonWalRecord;
      57              : use pageserver_api::value::Value;
      58              : 
      59              : use utils::lsn::Lsn;
      60              : 
      61              : use pageserver_compaction::helpers::{fully_contains, overlaps_with};
      62              : use pageserver_compaction::interface::*;
      63              : 
      64              : use super::CompactionError;
      65              : 
      66              : /// Maximum number of deltas before generating an image layer in bottom-most compaction.
      67              : const COMPACTION_DELTA_THRESHOLD: usize = 5;
      68              : 
      69              : #[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
      70              : pub struct GcCompactionJobId(pub usize);
      71              : 
      72              : impl std::fmt::Display for GcCompactionJobId {
      73            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
      74            0 :         write!(f, "{}", self.0)
      75            0 :     }
      76              : }
      77              : 
      78              : #[derive(Debug, Clone)]
      79              : pub enum GcCompactionQueueItem {
      80              :     Manual(CompactOptions),
      81              :     SubCompactionJob(CompactOptions),
      82              :     #[allow(dead_code)]
      83              :     UpdateL2Lsn(Lsn),
      84              :     Notify(GcCompactionJobId),
      85              : }
      86              : 
      87              : impl GcCompactionQueueItem {
      88            0 :     pub fn into_compact_info_resp(
      89            0 :         self,
      90            0 :         id: GcCompactionJobId,
      91            0 :         running: bool,
      92            0 :     ) -> Option<CompactInfoResponse> {
      93            0 :         match self {
      94            0 :             GcCompactionQueueItem::Manual(options) => Some(CompactInfoResponse {
      95            0 :                 compact_key_range: options.compact_key_range,
      96            0 :                 compact_lsn_range: options.compact_lsn_range,
      97            0 :                 sub_compaction: options.sub_compaction,
      98            0 :                 running,
      99            0 :                 job_id: id.0,
     100            0 :             }),
     101            0 :             GcCompactionQueueItem::SubCompactionJob(options) => Some(CompactInfoResponse {
     102            0 :                 compact_key_range: options.compact_key_range,
     103            0 :                 compact_lsn_range: options.compact_lsn_range,
     104            0 :                 sub_compaction: options.sub_compaction,
     105            0 :                 running,
     106            0 :                 job_id: id.0,
     107            0 :             }),
     108            0 :             GcCompactionQueueItem::UpdateL2Lsn(_) => None,
     109            0 :             GcCompactionQueueItem::Notify(_) => None,
     110              :         }
     111            0 :     }
     112              : }
     113              : 
     114              : struct GcCompactionQueueInner {
     115              :     running: Option<(GcCompactionJobId, GcCompactionQueueItem)>,
     116              :     queued: VecDeque<(GcCompactionJobId, GcCompactionQueueItem)>,
     117              :     notify: HashMap<GcCompactionJobId, tokio::sync::oneshot::Sender<()>>,
     118              :     gc_guards: HashMap<GcCompactionJobId, gc_block::Guard>,
     119              :     last_id: GcCompactionJobId,
     120              : }
     121              : 
     122              : impl GcCompactionQueueInner {
     123            0 :     fn next_id(&mut self) -> GcCompactionJobId {
     124            0 :         let id = self.last_id;
     125            0 :         self.last_id = GcCompactionJobId(id.0 + 1);
     126            0 :         id
     127            0 :     }
     128              : }
     129              : 
     130              : /// A structure to store gc_compaction jobs.
     131              : pub struct GcCompactionQueue {
     132              :     /// All items in the queue, and the currently-running job.
     133              :     inner: std::sync::Mutex<GcCompactionQueueInner>,
     134              :     /// Ensure only one thread is consuming the queue.
     135              :     consumer_lock: tokio::sync::Mutex<()>,
     136              : }
     137              : 
     138              : impl GcCompactionQueue {
     139            0 :     pub fn new() -> Self {
     140            0 :         GcCompactionQueue {
     141            0 :             inner: std::sync::Mutex::new(GcCompactionQueueInner {
     142            0 :                 running: None,
     143            0 :                 queued: VecDeque::new(),
     144            0 :                 notify: HashMap::new(),
     145            0 :                 gc_guards: HashMap::new(),
     146            0 :                 last_id: GcCompactionJobId(0),
     147            0 :             }),
     148            0 :             consumer_lock: tokio::sync::Mutex::new(()),
     149            0 :         }
     150            0 :     }
     151              : 
     152            0 :     pub fn cancel_scheduled(&self) {
     153            0 :         let mut guard = self.inner.lock().unwrap();
     154            0 :         guard.queued.clear();
     155            0 :         guard.notify.clear();
     156            0 :         guard.gc_guards.clear();
     157            0 :     }
     158              : 
     159              :     /// Schedule a manual compaction job.
     160            0 :     pub fn schedule_manual_compaction(
     161            0 :         &self,
     162            0 :         options: CompactOptions,
     163            0 :         notify: Option<tokio::sync::oneshot::Sender<()>>,
     164            0 :     ) -> GcCompactionJobId {
     165            0 :         let mut guard = self.inner.lock().unwrap();
     166            0 :         let id = guard.next_id();
     167            0 :         guard
     168            0 :             .queued
     169            0 :             .push_back((id, GcCompactionQueueItem::Manual(options)));
     170            0 :         if let Some(notify) = notify {
     171            0 :             guard.notify.insert(id, notify);
     172            0 :         }
     173            0 :         info!("scheduled compaction job id={}", id);
     174            0 :         id
     175            0 :     }
     176              : 
     177              :     /// Trigger an auto compaction.
     178              :     #[allow(dead_code)]
     179            0 :     pub fn trigger_auto_compaction(&self, _: &Arc<Timeline>) {}
     180              : 
     181              :     /// Notify the caller the job has finished and unblock GC.
     182            0 :     fn notify_and_unblock(&self, id: GcCompactionJobId) {
     183            0 :         info!("compaction job id={} finished", id);
     184            0 :         let mut guard = self.inner.lock().unwrap();
     185            0 :         if let Some(blocking) = guard.gc_guards.remove(&id) {
     186            0 :             drop(blocking)
     187            0 :         }
     188            0 :         if let Some(tx) = guard.notify.remove(&id) {
     189            0 :             let _ = tx.send(());
     190            0 :         }
     191            0 :     }
     192              : 
     193            0 :     async fn handle_sub_compaction(
     194            0 :         &self,
     195            0 :         id: GcCompactionJobId,
     196            0 :         options: CompactOptions,
     197            0 :         timeline: &Arc<Timeline>,
     198            0 :         gc_block: &GcBlock,
     199            0 :     ) -> Result<(), CompactionError> {
     200            0 :         info!("running scheduled enhanced gc bottom-most compaction with sub-compaction, splitting compaction jobs");
     201            0 :         let jobs: Vec<GcCompactJob> = timeline
     202            0 :             .gc_compaction_split_jobs(
     203            0 :                 GcCompactJob::from_compact_options(options.clone()),
     204            0 :                 options.sub_compaction_max_job_size_mb,
     205            0 :             )
     206            0 :             .await
     207            0 :             .map_err(CompactionError::Other)?;
     208            0 :         if jobs.is_empty() {
     209            0 :             info!("no jobs to run, skipping scheduled compaction task");
     210            0 :             self.notify_and_unblock(id);
     211              :         } else {
     212            0 :             let gc_guard = match gc_block.start().await {
     213            0 :                 Ok(guard) => guard,
     214            0 :                 Err(e) => {
     215            0 :                     return Err(CompactionError::Other(anyhow!(
     216            0 :                         "cannot run gc-compaction because gc is blocked: {}",
     217            0 :                         e
     218            0 :                     )));
     219              :                 }
     220              :             };
     221              : 
     222            0 :             let jobs_len = jobs.len();
     223            0 :             let mut pending_tasks = Vec::new();
     224            0 :             for job in jobs {
     225              :                 // Unfortunately we need to convert the `GcCompactJob` back to `CompactionOptions`
     226              :                 // until we do further refactors to allow directly call `compact_with_gc`.
     227            0 :                 let mut flags: EnumSet<CompactFlags> = EnumSet::default();
     228            0 :                 flags |= CompactFlags::EnhancedGcBottomMostCompaction;
     229            0 :                 if job.dry_run {
     230            0 :                     flags |= CompactFlags::DryRun;
     231            0 :                 }
     232            0 :                 let options = CompactOptions {
     233            0 :                     flags,
     234            0 :                     sub_compaction: false,
     235            0 :                     compact_key_range: Some(job.compact_key_range.into()),
     236            0 :                     compact_lsn_range: Some(job.compact_lsn_range.into()),
     237            0 :                     sub_compaction_max_job_size_mb: None,
     238            0 :                 };
     239            0 :                 pending_tasks.push(GcCompactionQueueItem::SubCompactionJob(options));
     240              :             }
     241            0 :             pending_tasks.push(GcCompactionQueueItem::Notify(id));
     242            0 :             {
     243            0 :                 let mut guard = self.inner.lock().unwrap();
     244            0 :                 guard.gc_guards.insert(id, gc_guard);
     245            0 :                 let mut tasks = Vec::new();
     246            0 :                 for task in pending_tasks {
     247            0 :                     let id = guard.next_id();
     248            0 :                     tasks.push((id, task));
     249            0 :                 }
     250            0 :                 tasks.reverse();
     251            0 :                 for item in tasks {
     252            0 :                     guard.queued.push_front(item);
     253            0 :                 }
     254              :             }
     255            0 :             info!("scheduled enhanced gc bottom-most compaction with sub-compaction, split into {} jobs", jobs_len);
     256              :         }
     257            0 :         Ok(())
     258            0 :     }
     259              : 
     260              :     /// Take a job from the queue and process it. Returns if there are still pending tasks.
     261            0 :     pub async fn iteration(
     262            0 :         &self,
     263            0 :         cancel: &CancellationToken,
     264            0 :         ctx: &RequestContext,
     265            0 :         gc_block: &GcBlock,
     266            0 :         timeline: &Arc<Timeline>,
     267            0 :     ) -> Result<bool, CompactionError> {
     268            0 :         let _one_op_at_a_time_guard = self.consumer_lock.lock().await;
     269              :         let has_pending_tasks;
     270            0 :         let (id, item) = {
     271            0 :             let mut guard = self.inner.lock().unwrap();
     272            0 :             let Some((id, item)) = guard.queued.pop_front() else {
     273            0 :                 return Ok(false);
     274              :             };
     275            0 :             guard.running = Some((id, item.clone()));
     276            0 :             has_pending_tasks = !guard.queued.is_empty();
     277            0 :             (id, item)
     278            0 :         };
     279            0 : 
     280            0 :         match item {
     281            0 :             GcCompactionQueueItem::Manual(options) => {
     282            0 :                 if !options
     283            0 :                     .flags
     284            0 :                     .contains(CompactFlags::EnhancedGcBottomMostCompaction)
     285              :                 {
     286            0 :                     warn!("ignoring scheduled compaction task: scheduled task must be gc compaction: {:?}", options);
     287            0 :                 } else if options.sub_compaction {
     288            0 :                     self.handle_sub_compaction(id, options, timeline, gc_block)
     289            0 :                         .await?;
     290              :                 } else {
     291            0 :                     let gc_guard = match gc_block.start().await {
     292            0 :                         Ok(guard) => guard,
     293            0 :                         Err(e) => {
     294            0 :                             return Err(CompactionError::Other(anyhow!(
     295            0 :                                 "cannot run gc-compaction because gc is blocked: {}",
     296            0 :                                 e
     297            0 :                             )));
     298              :                         }
     299              :                     };
     300            0 :                     {
     301            0 :                         let mut guard = self.inner.lock().unwrap();
     302            0 :                         guard.gc_guards.insert(id, gc_guard);
     303            0 :                     }
     304            0 :                     let _ = timeline
     305            0 :                         .compact_with_options(cancel, options, ctx)
     306            0 :                         .instrument(info_span!("scheduled_compact_timeline", %timeline.timeline_id))
     307            0 :                         .await?;
     308            0 :                     self.notify_and_unblock(id);
     309              :                 }
     310              :             }
     311            0 :             GcCompactionQueueItem::SubCompactionJob(options) => {
     312            0 :                 let _ = timeline
     313            0 :                     .compact_with_options(cancel, options, ctx)
     314            0 :                     .instrument(info_span!("scheduled_compact_timeline", %timeline.timeline_id))
     315            0 :                     .await?;
     316              :             }
     317            0 :             GcCompactionQueueItem::Notify(id) => {
     318            0 :                 self.notify_and_unblock(id);
     319            0 :             }
     320              :             GcCompactionQueueItem::UpdateL2Lsn(_) => {
     321            0 :                 unreachable!()
     322              :             }
     323              :         }
     324            0 :         {
     325            0 :             let mut guard = self.inner.lock().unwrap();
     326            0 :             guard.running = None;
     327            0 :         }
     328            0 :         Ok(has_pending_tasks)
     329            0 :     }
     330              : 
     331              :     #[allow(clippy::type_complexity)]
     332            0 :     pub fn remaining_jobs(
     333            0 :         &self,
     334            0 :     ) -> (
     335            0 :         Option<(GcCompactionJobId, GcCompactionQueueItem)>,
     336            0 :         VecDeque<(GcCompactionJobId, GcCompactionQueueItem)>,
     337            0 :     ) {
     338            0 :         let guard = self.inner.lock().unwrap();
     339            0 :         (guard.running.clone(), guard.queued.clone())
     340            0 :     }
     341              : 
     342              :     #[allow(dead_code)]
     343            0 :     pub fn remaining_jobs_num(&self) -> usize {
     344            0 :         let guard = self.inner.lock().unwrap();
     345            0 :         guard.queued.len() + if guard.running.is_some() { 1 } else { 0 }
     346            0 :     }
     347              : }
     348              : 
     349              : /// A job description for the gc-compaction job. This structure describes the rectangle range that the job will
     350              : /// process. The exact layers that need to be compacted/rewritten will be generated when `compact_with_gc` gets
     351              : /// called.
     352              : #[derive(Debug, Clone)]
     353              : pub(crate) struct GcCompactJob {
     354              :     pub dry_run: bool,
     355              :     /// The key range to be compacted. The compaction algorithm will only regenerate key-value pairs within this range
     356              :     /// [left inclusive, right exclusive), and other pairs will be rewritten into new files if necessary.
     357              :     pub compact_key_range: Range<Key>,
     358              :     /// The LSN range to be compacted. The compaction algorithm will use this range to determine the layers to be
     359              :     /// selected for the compaction, and it does not guarantee the generated layers will have exactly the same LSN range
     360              :     /// as specified here. The true range being compacted is `min_lsn/max_lsn` in [`GcCompactionJobDescription`].
     361              :     /// min_lsn will always <= the lower bound specified here, and max_lsn will always >= the upper bound specified here.
     362              :     pub compact_lsn_range: Range<Lsn>,
     363              : }
     364              : 
     365              : impl GcCompactJob {
     366           54 :     pub fn from_compact_options(options: CompactOptions) -> Self {
     367           54 :         GcCompactJob {
     368           54 :             dry_run: options.flags.contains(CompactFlags::DryRun),
     369           54 :             compact_key_range: options
     370           54 :                 .compact_key_range
     371           54 :                 .map(|x| x.into())
     372           54 :                 .unwrap_or(Key::MIN..Key::MAX),
     373           54 :             compact_lsn_range: options
     374           54 :                 .compact_lsn_range
     375           54 :                 .map(|x| x.into())
     376           54 :                 .unwrap_or(Lsn::INVALID..Lsn::MAX),
     377           54 :         }
     378           54 :     }
     379              : }
     380              : 
     381              : /// A job description for the gc-compaction job. This structure is generated when `compact_with_gc` is called
     382              : /// and contains the exact layers we want to compact.
     383              : pub struct GcCompactionJobDescription {
     384              :     /// All layers to read in the compaction job
     385              :     selected_layers: Vec<Layer>,
     386              :     /// GC cutoff of the job. This is the lowest LSN that will be accessed by the read/GC path and we need to
     387              :     /// keep all deltas <= this LSN or generate an image == this LSN.
     388              :     gc_cutoff: Lsn,
     389              :     /// LSNs to retain for the job. Read path will use this LSN so we need to keep deltas <= this LSN or
     390              :     /// generate an image == this LSN.
     391              :     retain_lsns_below_horizon: Vec<Lsn>,
     392              :     /// Maximum layer LSN processed in this compaction, that is max(end_lsn of layers). Exclusive. All data
     393              :     /// \>= this LSN will be kept and will not be rewritten.
     394              :     max_layer_lsn: Lsn,
     395              :     /// Minimum layer LSN processed in this compaction, that is min(start_lsn of layers). Inclusive.
     396              :     /// All access below (strict lower than `<`) this LSN will be routed through the normal read path instead of
     397              :     /// k-merge within gc-compaction.
     398              :     min_layer_lsn: Lsn,
     399              :     /// Only compact layers overlapping with this range.
     400              :     compaction_key_range: Range<Key>,
     401              :     /// When partial compaction is enabled, these layers need to be rewritten to ensure no overlap.
     402              :     /// This field is here solely for debugging. The field will not be read once the compaction
     403              :     /// description is generated.
     404              :     rewrite_layers: Vec<Arc<PersistentLayerDesc>>,
     405              : }
     406              : 
     407              : /// The result of bottom-most compaction for a single key at each LSN.
     408              : #[derive(Debug)]
     409              : #[cfg_attr(test, derive(PartialEq))]
     410              : pub struct KeyLogAtLsn(pub Vec<(Lsn, Value)>);
     411              : 
     412              : /// The result of bottom-most compaction.
     413              : #[derive(Debug)]
     414              : #[cfg_attr(test, derive(PartialEq))]
     415              : pub(crate) struct KeyHistoryRetention {
     416              :     /// Stores logs to reconstruct the value at the given LSN, that is to say, logs <= LSN or image == LSN.
     417              :     pub(crate) below_horizon: Vec<(Lsn, KeyLogAtLsn)>,
     418              :     /// Stores logs to reconstruct the value at any LSN above the horizon, that is to say, log > LSN.
     419              :     pub(crate) above_horizon: KeyLogAtLsn,
     420              : }
     421              : 
     422              : impl KeyHistoryRetention {
     423              :     /// Hack: skip delta layer if we need to produce a layer of a same key-lsn.
     424              :     ///
     425              :     /// This can happen if we have removed some deltas in "the middle" of some existing layer's key-lsn-range.
     426              :     /// For example, consider the case where a single delta with range [0x10,0x50) exists.
     427              :     /// And we have branches at LSN 0x10, 0x20, 0x30.
     428              :     /// Then we delete branch @ 0x20.
     429              :     /// Bottom-most compaction may now delete the delta [0x20,0x30).
     430              :     /// And that wouldnt' change the shape of the layer.
     431              :     ///
     432              :     /// Note that bottom-most-gc-compaction never _adds_ new data in that case, only removes.
     433              :     ///
     434              :     /// `discard_key` will only be called when the writer reaches its target (instead of for every key), so it's fine to grab a lock inside.
     435           74 :     async fn discard_key(key: &PersistentLayerKey, tline: &Arc<Timeline>, dry_run: bool) -> bool {
     436           74 :         if dry_run {
     437            0 :             return true;
     438           74 :         }
     439           74 :         let guard = tline.layers.read().await;
     440           74 :         if !guard.contains_key(key) {
     441           52 :             return false;
     442           22 :         }
     443           22 :         let layer_generation = guard.get_from_key(key).metadata().generation;
     444           22 :         drop(guard);
     445           22 :         if layer_generation == tline.generation {
     446           22 :             info!(
     447              :                 key=%key,
     448              :                 ?layer_generation,
     449            0 :                 "discard layer due to duplicated layer key in the same generation",
     450              :             );
     451           22 :             true
     452              :         } else {
     453            0 :             false
     454              :         }
     455           74 :     }
     456              : 
     457              :     /// Pipe a history of a single key to the writers.
     458              :     ///
     459              :     /// If `image_writer` is none, the images will be placed into the delta layers.
     460              :     /// The delta writer will contain all images and deltas (below and above the horizon) except the bottom-most images.
     461              :     #[allow(clippy::too_many_arguments)]
     462          622 :     async fn pipe_to(
     463          622 :         self,
     464          622 :         key: Key,
     465          622 :         delta_writer: &mut SplitDeltaLayerWriter,
     466          622 :         mut image_writer: Option<&mut SplitImageLayerWriter>,
     467          622 :         stat: &mut CompactionStatistics,
     468          622 :         ctx: &RequestContext,
     469          622 :     ) -> anyhow::Result<()> {
     470          622 :         let mut first_batch = true;
     471         2012 :         for (cutoff_lsn, KeyLogAtLsn(logs)) in self.below_horizon {
     472         1390 :             if first_batch {
     473          622 :                 if logs.len() == 1 && logs[0].1.is_image() {
     474          584 :                     let Value::Image(img) = &logs[0].1 else {
     475            0 :                         unreachable!()
     476              :                     };
     477          584 :                     stat.produce_image_key(img);
     478          584 :                     if let Some(image_writer) = image_writer.as_mut() {
     479          584 :                         image_writer.put_image(key, img.clone(), ctx).await?;
     480              :                     } else {
     481            0 :                         delta_writer
     482            0 :                             .put_value(key, cutoff_lsn, Value::Image(img.clone()), ctx)
     483            0 :                             .await?;
     484              :                     }
     485              :                 } else {
     486           66 :                     for (lsn, val) in logs {
     487           28 :                         stat.produce_key(&val);
     488           28 :                         delta_writer.put_value(key, lsn, val, ctx).await?;
     489              :                     }
     490              :                 }
     491          622 :                 first_batch = false;
     492              :             } else {
     493          884 :                 for (lsn, val) in logs {
     494          116 :                     stat.produce_key(&val);
     495          116 :                     delta_writer.put_value(key, lsn, val, ctx).await?;
     496              :                 }
     497              :             }
     498              :         }
     499          622 :         let KeyLogAtLsn(above_horizon_logs) = self.above_horizon;
     500          680 :         for (lsn, val) in above_horizon_logs {
     501           58 :             stat.produce_key(&val);
     502           58 :             delta_writer.put_value(key, lsn, val, ctx).await?;
     503              :         }
     504          622 :         Ok(())
     505          622 :     }
     506              : }
     507              : 
     508              : #[derive(Debug, Serialize, Default)]
     509              : struct CompactionStatisticsNumSize {
     510              :     num: u64,
     511              :     size: u64,
     512              : }
     513              : 
     514              : #[derive(Debug, Serialize, Default)]
     515              : pub struct CompactionStatistics {
     516              :     delta_layer_visited: CompactionStatisticsNumSize,
     517              :     image_layer_visited: CompactionStatisticsNumSize,
     518              :     delta_layer_produced: CompactionStatisticsNumSize,
     519              :     image_layer_produced: CompactionStatisticsNumSize,
     520              :     num_delta_layer_discarded: usize,
     521              :     num_image_layer_discarded: usize,
     522              :     num_unique_keys_visited: usize,
     523              :     wal_keys_visited: CompactionStatisticsNumSize,
     524              :     image_keys_visited: CompactionStatisticsNumSize,
     525              :     wal_produced: CompactionStatisticsNumSize,
     526              :     image_produced: CompactionStatisticsNumSize,
     527              : }
     528              : 
     529              : impl CompactionStatistics {
     530         1042 :     fn estimated_size_of_value(val: &Value) -> usize {
     531          432 :         match val {
     532          610 :             Value::Image(img) => img.len(),
     533            0 :             Value::WalRecord(NeonWalRecord::Postgres { rec, .. }) => rec.len(),
     534          432 :             _ => std::mem::size_of::<NeonWalRecord>(),
     535              :         }
     536         1042 :     }
     537         1636 :     fn estimated_size_of_key() -> usize {
     538         1636 :         KEY_SIZE // TODO: distinguish image layer and delta layer (count LSN in delta layer)
     539         1636 :     }
     540           86 :     fn visit_delta_layer(&mut self, size: u64) {
     541           86 :         self.delta_layer_visited.num += 1;
     542           86 :         self.delta_layer_visited.size += size;
     543           86 :     }
     544           66 :     fn visit_image_layer(&mut self, size: u64) {
     545           66 :         self.image_layer_visited.num += 1;
     546           66 :         self.image_layer_visited.size += size;
     547           66 :     }
     548          622 :     fn on_unique_key_visited(&mut self) {
     549          622 :         self.num_unique_keys_visited += 1;
     550          622 :     }
     551          240 :     fn visit_wal_key(&mut self, val: &Value) {
     552          240 :         self.wal_keys_visited.num += 1;
     553          240 :         self.wal_keys_visited.size +=
     554          240 :             Self::estimated_size_of_value(val) as u64 + Self::estimated_size_of_key() as u64;
     555          240 :     }
     556          610 :     fn visit_image_key(&mut self, val: &Value) {
     557          610 :         self.image_keys_visited.num += 1;
     558          610 :         self.image_keys_visited.size +=
     559          610 :             Self::estimated_size_of_value(val) as u64 + Self::estimated_size_of_key() as u64;
     560          610 :     }
     561          202 :     fn produce_key(&mut self, val: &Value) {
     562          202 :         match val {
     563           10 :             Value::Image(img) => self.produce_image_key(img),
     564          192 :             Value::WalRecord(_) => self.produce_wal_key(val),
     565              :         }
     566          202 :     }
     567          192 :     fn produce_wal_key(&mut self, val: &Value) {
     568          192 :         self.wal_produced.num += 1;
     569          192 :         self.wal_produced.size +=
     570          192 :             Self::estimated_size_of_value(val) as u64 + Self::estimated_size_of_key() as u64;
     571          192 :     }
     572          594 :     fn produce_image_key(&mut self, val: &Bytes) {
     573          594 :         self.image_produced.num += 1;
     574          594 :         self.image_produced.size += val.len() as u64 + Self::estimated_size_of_key() as u64;
     575          594 :     }
     576           14 :     fn discard_delta_layer(&mut self) {
     577           14 :         self.num_delta_layer_discarded += 1;
     578           14 :     }
     579            8 :     fn discard_image_layer(&mut self) {
     580            8 :         self.num_image_layer_discarded += 1;
     581            8 :     }
     582           22 :     fn produce_delta_layer(&mut self, size: u64) {
     583           22 :         self.delta_layer_produced.num += 1;
     584           22 :         self.delta_layer_produced.size += size;
     585           22 :     }
     586           30 :     fn produce_image_layer(&mut self, size: u64) {
     587           30 :         self.image_layer_produced.num += 1;
     588           30 :         self.image_layer_produced.size += size;
     589           30 :     }
     590              : }
     591              : 
     592              : impl Timeline {
     593              :     /// TODO: cancellation
     594              :     ///
     595              :     /// Returns whether the compaction has pending tasks.
     596          364 :     pub(crate) async fn compact_legacy(
     597          364 :         self: &Arc<Self>,
     598          364 :         cancel: &CancellationToken,
     599          364 :         options: CompactOptions,
     600          364 :         ctx: &RequestContext,
     601          364 :     ) -> Result<bool, CompactionError> {
     602          364 :         if options
     603          364 :             .flags
     604          364 :             .contains(CompactFlags::EnhancedGcBottomMostCompaction)
     605              :         {
     606            0 :             self.compact_with_gc(cancel, options, ctx)
     607            0 :                 .await
     608            0 :                 .map_err(CompactionError::Other)?;
     609            0 :             return Ok(false);
     610          364 :         }
     611          364 : 
     612          364 :         if options.flags.contains(CompactFlags::DryRun) {
     613            0 :             return Err(CompactionError::Other(anyhow!(
     614            0 :                 "dry-run mode is not supported for legacy compaction for now"
     615            0 :             )));
     616          364 :         }
     617          364 : 
     618          364 :         if options.compact_key_range.is_some() || options.compact_lsn_range.is_some() {
     619              :             // maybe useful in the future? could implement this at some point
     620            0 :             return Err(CompactionError::Other(anyhow!(
     621            0 :                 "compaction range is not supported for legacy compaction for now"
     622            0 :             )));
     623          364 :         }
     624          364 : 
     625          364 :         // High level strategy for compaction / image creation:
     626          364 :         //
     627          364 :         // 1. First, calculate the desired "partitioning" of the
     628          364 :         // currently in-use key space. The goal is to partition the
     629          364 :         // key space into roughly fixed-size chunks, but also take into
     630          364 :         // account any existing image layers, and try to align the
     631          364 :         // chunk boundaries with the existing image layers to avoid
     632          364 :         // too much churn. Also try to align chunk boundaries with
     633          364 :         // relation boundaries.  In principle, we don't know about
     634          364 :         // relation boundaries here, we just deal with key-value
     635          364 :         // pairs, and the code in pgdatadir_mapping.rs knows how to
     636          364 :         // map relations into key-value pairs. But in practice we know
     637          364 :         // that 'field6' is the block number, and the fields 1-5
     638          364 :         // identify a relation. This is just an optimization,
     639          364 :         // though.
     640          364 :         //
     641          364 :         // 2. Once we know the partitioning, for each partition,
     642          364 :         // decide if it's time to create a new image layer. The
     643          364 :         // criteria is: there has been too much "churn" since the last
     644          364 :         // image layer? The "churn" is fuzzy concept, it's a
     645          364 :         // combination of too many delta files, or too much WAL in
     646          364 :         // total in the delta file. Or perhaps: if creating an image
     647          364 :         // file would allow to delete some older files.
     648          364 :         //
     649          364 :         // 3. After that, we compact all level0 delta files if there
     650          364 :         // are too many of them.  While compacting, we also garbage
     651          364 :         // collect any page versions that are no longer needed because
     652          364 :         // of the new image layers we created in step 2.
     653          364 :         //
     654          364 :         // TODO: This high level strategy hasn't been implemented yet.
     655          364 :         // Below are functions compact_level0() and create_image_layers()
     656          364 :         // but they are a bit ad hoc and don't quite work like it's explained
     657          364 :         // above. Rewrite it.
     658          364 : 
     659          364 :         // Is the timeline being deleted?
     660          364 :         if self.is_stopping() {
     661            0 :             trace!("Dropping out of compaction on timeline shutdown");
     662            0 :             return Err(CompactionError::ShuttingDown);
     663          364 :         }
     664          364 : 
     665          364 :         let target_file_size = self.get_checkpoint_distance();
     666              : 
     667              :         // Define partitioning schema if needed
     668              : 
     669              :         // FIXME: the match should only cover repartitioning, not the next steps
     670          364 :         let (partition_count, has_pending_tasks) = match self
     671          364 :             .repartition(
     672          364 :                 self.get_last_record_lsn(),
     673          364 :                 self.get_compaction_target_size(),
     674          364 :                 options.flags,
     675          364 :                 ctx,
     676          364 :             )
     677          364 :             .await
     678              :         {
     679          364 :             Ok(((dense_partitioning, sparse_partitioning), lsn)) => {
     680          364 :                 // Disables access_stats updates, so that the files we read remain candidates for eviction after we're done with them
     681          364 :                 let image_ctx = RequestContextBuilder::extend(ctx)
     682          364 :                     .access_stats_behavior(AccessStatsBehavior::Skip)
     683          364 :                     .build();
     684          364 : 
     685          364 :                 // 2. Compact
     686          364 :                 let timer = self.metrics.compact_time_histo.start_timer();
     687          364 :                 let fully_compacted = self
     688          364 :                     .compact_level0(
     689          364 :                         target_file_size,
     690          364 :                         options.flags.contains(CompactFlags::ForceL0Compaction),
     691          364 :                         ctx,
     692          364 :                     )
     693          364 :                     .await?;
     694          364 :                 timer.stop_and_record();
     695          364 : 
     696          364 :                 let mut partitioning = dense_partitioning;
     697          364 :                 partitioning
     698          364 :                     .parts
     699          364 :                     .extend(sparse_partitioning.into_dense().parts);
     700          364 : 
     701          364 :                 // 3. Create new image layers for partitions that have been modified
     702          364 :                 // "enough". Skip image layer creation if L0 compaction cannot keep up.
     703          364 :                 if fully_compacted {
     704          364 :                     let image_layers = self
     705          364 :                         .create_image_layers(
     706          364 :                             &partitioning,
     707          364 :                             lsn,
     708          364 :                             if options
     709          364 :                                 .flags
     710          364 :                                 .contains(CompactFlags::ForceImageLayerCreation)
     711              :                             {
     712           14 :                                 ImageLayerCreationMode::Force
     713              :                             } else {
     714          350 :                                 ImageLayerCreationMode::Try
     715              :                             },
     716          364 :                             &image_ctx,
     717          364 :                         )
     718          364 :                         .await?;
     719              : 
     720          364 :                     self.upload_new_image_layers(image_layers)?;
     721              :                 } else {
     722            0 :                     info!("skipping image layer generation due to L0 compaction did not include all layers.");
     723              :                 }
     724          364 :                 (partitioning.parts.len(), !fully_compacted)
     725              :             }
     726            0 :             Err(err) => {
     727            0 :                 // no partitioning? This is normal, if the timeline was just created
     728            0 :                 // as an empty timeline. Also in unit tests, when we use the timeline
     729            0 :                 // as a simple key-value store, ignoring the datadir layout. Log the
     730            0 :                 // error but continue.
     731            0 :                 //
     732            0 :                 // Suppress error when it's due to cancellation
     733            0 :                 if !self.cancel.is_cancelled() && !err.is_cancelled() {
     734            0 :                     tracing::error!("could not compact, repartitioning keyspace failed: {err:?}");
     735            0 :                 }
     736            0 :                 (1, false)
     737              :             }
     738              :         };
     739              : 
     740          364 :         if self.shard_identity.count >= ShardCount::new(2) {
     741              :             // Limit the number of layer rewrites to the number of partitions: this means its
     742              :             // runtime should be comparable to a full round of image layer creations, rather than
     743              :             // being potentially much longer.
     744            0 :             let rewrite_max = partition_count;
     745            0 : 
     746            0 :             self.compact_shard_ancestors(rewrite_max, ctx).await?;
     747          364 :         }
     748              : 
     749          364 :         Ok(has_pending_tasks)
     750          364 :     }
     751              : 
     752              :     /// Check for layers that are elegible to be rewritten:
     753              :     /// - Shard splitting: After a shard split, ancestor layers beyond pitr_interval, so that
     754              :     ///   we don't indefinitely retain keys in this shard that aren't needed.
     755              :     /// - For future use: layers beyond pitr_interval that are in formats we would
     756              :     ///   rather not maintain compatibility with indefinitely.
     757              :     ///
     758              :     /// Note: this phase may read and write many gigabytes of data: use rewrite_max to bound
     759              :     /// how much work it will try to do in each compaction pass.
     760            0 :     async fn compact_shard_ancestors(
     761            0 :         self: &Arc<Self>,
     762            0 :         rewrite_max: usize,
     763            0 :         ctx: &RequestContext,
     764            0 :     ) -> Result<(), CompactionError> {
     765            0 :         let mut drop_layers = Vec::new();
     766            0 :         let mut layers_to_rewrite: Vec<Layer> = Vec::new();
     767            0 : 
     768            0 :         // We will use the Lsn cutoff of the last GC as a threshold for rewriting layers: if a
     769            0 :         // layer is behind this Lsn, it indicates that the layer is being retained beyond the
     770            0 :         // pitr_interval, for example because a branchpoint references it.
     771            0 :         //
     772            0 :         // Holding this read guard also blocks [`Self::gc_timeline`] from entering while we
     773            0 :         // are rewriting layers.
     774            0 :         let latest_gc_cutoff = self.get_latest_gc_cutoff_lsn();
     775            0 : 
     776            0 :         tracing::info!(
     777            0 :             "latest_gc_cutoff: {}, pitr cutoff {}",
     778            0 :             *latest_gc_cutoff,
     779            0 :             self.gc_info.read().unwrap().cutoffs.time
     780              :         );
     781              : 
     782            0 :         let layers = self.layers.read().await;
     783            0 :         for layer_desc in layers.layer_map()?.iter_historic_layers() {
     784            0 :             let layer = layers.get_from_desc(&layer_desc);
     785            0 :             if layer.metadata().shard.shard_count == self.shard_identity.count {
     786              :                 // This layer does not belong to a historic ancestor, no need to re-image it.
     787            0 :                 continue;
     788            0 :             }
     789            0 : 
     790            0 :             // This layer was created on an ancestor shard: check if it contains any data for this shard.
     791            0 :             let sharded_range = ShardedRange::new(layer_desc.get_key_range(), &self.shard_identity);
     792            0 :             let layer_local_page_count = sharded_range.page_count();
     793            0 :             let layer_raw_page_count = ShardedRange::raw_size(&layer_desc.get_key_range());
     794            0 :             if layer_local_page_count == 0 {
     795              :                 // This ancestral layer only covers keys that belong to other shards.
     796              :                 // We include the full metadata in the log: if we had some critical bug that caused
     797              :                 // us to incorrectly drop layers, this would simplify manually debugging + reinstating those layers.
     798            0 :                 info!(%layer, old_metadata=?layer.metadata(),
     799            0 :                     "dropping layer after shard split, contains no keys for this shard.",
     800              :                 );
     801              : 
     802            0 :                 if cfg!(debug_assertions) {
     803              :                     // Expensive, exhaustive check of keys in this layer: this guards against ShardedRange's calculations being
     804              :                     // wrong.  If ShardedRange claims the local page count is zero, then no keys in this layer
     805              :                     // should be !is_key_disposable()
     806            0 :                     let range = layer_desc.get_key_range();
     807            0 :                     let mut key = range.start;
     808            0 :                     while key < range.end {
     809            0 :                         debug_assert!(self.shard_identity.is_key_disposable(&key));
     810            0 :                         key = key.next();
     811              :                     }
     812            0 :                 }
     813              : 
     814            0 :                 drop_layers.push(layer);
     815            0 :                 continue;
     816            0 :             } else if layer_local_page_count != u32::MAX
     817            0 :                 && layer_local_page_count == layer_raw_page_count
     818              :             {
     819            0 :                 debug!(%layer,
     820            0 :                     "layer is entirely shard local ({} keys), no need to filter it",
     821              :                     layer_local_page_count
     822              :                 );
     823            0 :                 continue;
     824            0 :             }
     825            0 : 
     826            0 :             // Don't bother re-writing a layer unless it will at least halve its size
     827            0 :             if layer_local_page_count != u32::MAX
     828            0 :                 && layer_local_page_count > layer_raw_page_count / 2
     829              :             {
     830            0 :                 debug!(%layer,
     831            0 :                     "layer is already mostly local ({}/{}), not rewriting",
     832              :                     layer_local_page_count,
     833              :                     layer_raw_page_count
     834              :                 );
     835            0 :             }
     836              : 
     837              :             // Don't bother re-writing a layer if it is within the PITR window: it will age-out eventually
     838              :             // without incurring the I/O cost of a rewrite.
     839            0 :             if layer_desc.get_lsn_range().end >= *latest_gc_cutoff {
     840            0 :                 debug!(%layer, "Skipping rewrite of layer still in GC window ({} >= {})",
     841            0 :                     layer_desc.get_lsn_range().end, *latest_gc_cutoff);
     842            0 :                 continue;
     843            0 :             }
     844            0 : 
     845            0 :             if layer_desc.is_delta() {
     846              :                 // We do not yet implement rewrite of delta layers
     847            0 :                 debug!(%layer, "Skipping rewrite of delta layer");
     848            0 :                 continue;
     849            0 :             }
     850            0 : 
     851            0 :             // Only rewrite layers if their generations differ.  This guarantees:
     852            0 :             //  - that local rewrite is safe, as local layer paths will differ between existing layer and rewritten one
     853            0 :             //  - that the layer is persistent in remote storage, as we only see old-generation'd layer via loading from remote storage
     854            0 :             if layer.metadata().generation == self.generation {
     855            0 :                 debug!(%layer, "Skipping rewrite, is not from old generation");
     856            0 :                 continue;
     857            0 :             }
     858            0 : 
     859            0 :             if layers_to_rewrite.len() >= rewrite_max {
     860            0 :                 tracing::info!(%layer, "Will rewrite layer on a future compaction, already rewrote {}",
     861            0 :                     layers_to_rewrite.len()
     862              :                 );
     863            0 :                 continue;
     864            0 :             }
     865            0 : 
     866            0 :             // Fall through: all our conditions for doing a rewrite passed.
     867            0 :             layers_to_rewrite.push(layer);
     868              :         }
     869              : 
     870              :         // Drop read lock on layer map before we start doing time-consuming I/O
     871            0 :         drop(layers);
     872            0 : 
     873            0 :         let mut replace_image_layers = Vec::new();
     874              : 
     875            0 :         for layer in layers_to_rewrite {
     876            0 :             tracing::info!(layer=%layer, "Rewriting layer after shard split...");
     877            0 :             let mut image_layer_writer = ImageLayerWriter::new(
     878            0 :                 self.conf,
     879            0 :                 self.timeline_id,
     880            0 :                 self.tenant_shard_id,
     881            0 :                 &layer.layer_desc().key_range,
     882            0 :                 layer.layer_desc().image_layer_lsn(),
     883            0 :                 ctx,
     884            0 :             )
     885            0 :             .await
     886            0 :             .map_err(CompactionError::Other)?;
     887              : 
     888              :             // Safety of layer rewrites:
     889              :             // - We are writing to a different local file path than we are reading from, so the old Layer
     890              :             //   cannot interfere with the new one.
     891              :             // - In the page cache, contents for a particular VirtualFile are stored with a file_id that
     892              :             //   is different for two layers with the same name (in `ImageLayerInner::new` we always
     893              :             //   acquire a fresh id from [`crate::page_cache::next_file_id`].  So readers do not risk
     894              :             //   reading the index from one layer file, and then data blocks from the rewritten layer file.
     895              :             // - Any readers that have a reference to the old layer will keep it alive until they are done
     896              :             //   with it. If they are trying to promote from remote storage, that will fail, but this is the same
     897              :             //   as for compaction generally: compaction is allowed to delete layers that readers might be trying to use.
     898              :             // - We do not run concurrently with other kinds of compaction, so the only layer map writes we race with are:
     899              :             //    - GC, which at worst witnesses us "undelete" a layer that they just deleted.
     900              :             //    - ingestion, which only inserts layers, therefore cannot collide with us.
     901            0 :             let resident = layer.download_and_keep_resident().await?;
     902              : 
     903            0 :             let keys_written = resident
     904            0 :                 .filter(&self.shard_identity, &mut image_layer_writer, ctx)
     905            0 :                 .await?;
     906              : 
     907            0 :             if keys_written > 0 {
     908            0 :                 let (desc, path) = image_layer_writer
     909            0 :                     .finish(ctx)
     910            0 :                     .await
     911            0 :                     .map_err(CompactionError::Other)?;
     912            0 :                 let new_layer = Layer::finish_creating(self.conf, self, desc, &path)
     913            0 :                     .map_err(CompactionError::Other)?;
     914            0 :                 tracing::info!(layer=%new_layer, "Rewrote layer, {} -> {} bytes",
     915            0 :                     layer.metadata().file_size,
     916            0 :                     new_layer.metadata().file_size);
     917              : 
     918            0 :                 replace_image_layers.push((layer, new_layer));
     919            0 :             } else {
     920            0 :                 // Drop the old layer.  Usually for this case we would already have noticed that
     921            0 :                 // the layer has no data for us with the ShardedRange check above, but
     922            0 :                 drop_layers.push(layer);
     923            0 :             }
     924              :         }
     925              : 
     926              :         // At this point, we have replaced local layer files with their rewritten form, but not yet uploaded
     927              :         // metadata to reflect that. If we restart here, the replaced layer files will look invalid (size mismatch
     928              :         // to remote index) and be removed. This is inefficient but safe.
     929            0 :         fail::fail_point!("compact-shard-ancestors-localonly");
     930            0 : 
     931            0 :         // Update the LayerMap so that readers will use the new layers, and enqueue it for writing to remote storage
     932            0 :         self.rewrite_layers(replace_image_layers, drop_layers)
     933            0 :             .await?;
     934              : 
     935            0 :         fail::fail_point!("compact-shard-ancestors-enqueued");
     936            0 : 
     937            0 :         // We wait for all uploads to complete before finishing this compaction stage.  This is not
     938            0 :         // necessary for correctness, but it simplifies testing, and avoids proceeding with another
     939            0 :         // Timeline's compaction while this timeline's uploads may be generating lots of disk I/O
     940            0 :         // load.
     941            0 :         match self.remote_client.wait_completion().await {
     942            0 :             Ok(()) => (),
     943            0 :             Err(WaitCompletionError::NotInitialized(ni)) => return Err(CompactionError::from(ni)),
     944              :             Err(WaitCompletionError::UploadQueueShutDownOrStopped) => {
     945            0 :                 return Err(CompactionError::ShuttingDown)
     946              :             }
     947              :         }
     948              : 
     949            0 :         fail::fail_point!("compact-shard-ancestors-persistent");
     950            0 : 
     951            0 :         Ok(())
     952            0 :     }
     953              : 
     954              :     /// Update the LayerVisibilityHint of layers covered by image layers, based on whether there is
     955              :     /// an image layer between them and the most recent readable LSN (branch point or tip of timeline).  The
     956              :     /// purpose of the visibility hint is to record which layers need to be available to service reads.
     957              :     ///
     958              :     /// The result may be used as an input to eviction and secondary downloads to de-prioritize layers
     959              :     /// that we know won't be needed for reads.  Returns `Ok(())` unless `layer_map()` fails, in which case its `Shutdown` error is propagated.
     960          226 :     pub(super) async fn update_layer_visibility(
     961          226 :         &self,
     962          226 :     ) -> Result<(), super::layer_manager::Shutdown> {
     963          226 :         let head_lsn = self.get_last_record_lsn();
     964              : 
     965              :         // We will sweep through layers in reverse-LSN order.  We only do historic layers.  L0 deltas
     966              :         // are implicitly left visible, because LayerVisibilityHint's default is Visible, and we never modify it here.
     967              :         // Note that L0 deltas _can_ be covered by image layers, but we consider them 'visible' because we anticipate that
     968              :         // they will be subject to L0->L1 compaction in the near future.
     969          226 :         let layer_manager = self.layers.read().await;
     970          226 :         let layer_map = layer_manager.layer_map()?;
     971              : 
     972          226 :         let readable_points = {
     973          226 :             let children = self.gc_info.read().unwrap().retain_lsns.clone(); // cloned, so the temporary gc_info read guard is only held for this statement
     974          226 : 
     975          226 :             let mut readable_points = Vec::with_capacity(children.len() + 1); // +1 for head_lsn, pushed below
     976          226 :             for (child_lsn, _child_timeline_id, is_offloaded) in &children {
     977            0 :                 if *is_offloaded == MaybeOffloaded::Yes { // offloaded children are skipped: their LSN is not added as a readable point
     978            0 :                     continue;
     979            0 :                 }
     980            0 :                 readable_points.push(*child_lsn);
     981              :             }
     982          226 :             readable_points.push(head_lsn); // this timeline's last record LSN is always a readable point
     983          226 :             readable_points
     984          226 :         };
     985          226 : 
     986          226 :         let (layer_visibility, covered) = layer_map.get_visibility(readable_points);
     987          572 :         for (layer_desc, visibility) in layer_visibility {
     988          346 :             // FIXME: use a more efficient bulk zip() through the layers rather than NlogN getting each one
     989          346 :             let layer = layer_manager.get_from_desc(&layer_desc);
     990          346 :             layer.set_visibility(visibility);
     991          346 :         }
     992              : 
     993              :         // TODO: publish our covered KeySpace to our parent, so that when they update their visibility, they can
     994              :         // avoid assuming that everything at a branch point is visible.
     995          226 :         drop(covered);
     996          226 :         Ok(())
     997          226 :     }
     998              : 
     999              :     /// Collect a bunch of Level 0 layer files, and compact and reshuffle them as
    1000              :     /// Level 1 files. Returns whether the L0 layers are fully compacted.
    1001          364 :     async fn compact_level0(
    1002          364 :         self: &Arc<Self>,
    1003          364 :         target_file_size: u64,
    1004          364 :         force_compaction_ignore_threshold: bool,
    1005          364 :         ctx: &RequestContext,
    1006          364 :     ) -> Result<bool, CompactionError> {
    1007              :         let CompactLevel0Phase1Result {
    1008          364 :             new_layers,
    1009          364 :             deltas_to_compact,
    1010          364 :             fully_compacted,
    1011              :         } = {
    1012          364 :             let phase1_span = info_span!("compact_level0_phase1");
    1013          364 :             let ctx = ctx.attached_child();
    1014          364 :             let mut stats = CompactLevel0Phase1StatsBuilder {
    1015          364 :                 version: Some(2),
    1016          364 :                 tenant_id: Some(self.tenant_shard_id),
    1017          364 :                 timeline_id: Some(self.timeline_id),
    1018          364 :                 ..Default::default()
    1019          364 :             };
    1020          364 : 
    1021          364 :             let begin = tokio::time::Instant::now(); // taken before waiting for the layers read lock
    1022          364 :             let phase1_layers_locked = self.layers.read().await;
    1023          364 :             let now = tokio::time::Instant::now(); // `now - begin` is the time spent acquiring the read lock
    1024          364 :             stats.read_lock_acquisition_micros =
    1025          364 :                 DurationRecorder::Recorded(RecordedDuration(now - begin), now);
    1026          364 :             self.compact_level0_phase1( // takes ownership of both the read guard and the stats builder
    1027          364 :                 phase1_layers_locked,
    1028          364 :                 stats,
    1029          364 :                 target_file_size,
    1030          364 :                 force_compaction_ignore_threshold,
    1031          364 :                 &ctx,
    1032          364 :             )
    1033          364 :             .instrument(phase1_span)
    1034          364 :             .await?
    1035              :         };
    1036              : 
    1037          364 :         if new_layers.is_empty() && deltas_to_compact.is_empty() {
    1038              :             // nothing to do: phase 1 produced no new layers and selected no deltas, so report `true` (fully compacted)
    1039          336 :             return Ok(true);
    1040           28 :         }
    1041           28 : 
    1042           28 :         self.finish_compact_batch(&new_layers, &Vec::new(), &deltas_to_compact) // NOTE(review): the empty Vec is presumably the list of new image layers -- confirm against finish_compact_batch
    1043           28 :             .await?;
    1044           28 :         Ok(fully_compacted)
    1045          364 :     }
    1046              : 
    1047              :     /// Level0 files first phase of compaction, explained in the [`Self::compact_legacy`] comment.
    1048          364 :     async fn compact_level0_phase1<'a>(
    1049          364 :         self: &'a Arc<Self>,
    1050          364 :         guard: tokio::sync::RwLockReadGuard<'a, LayerManager>,
    1051          364 :         mut stats: CompactLevel0Phase1StatsBuilder,
    1052          364 :         target_file_size: u64,
    1053          364 :         force_compaction_ignore_threshold: bool,
    1054          364 :         ctx: &RequestContext,
    1055          364 :     ) -> Result<CompactLevel0Phase1Result, CompactionError> {
    1056          364 :         stats.read_lock_held_spawn_blocking_startup_micros =
    1057          364 :             stats.read_lock_acquisition_micros.till_now(); // set by caller
    1058          364 :         let layers = guard.layer_map()?;
    1059          364 :         let level0_deltas = layers.level0_deltas();
    1060          364 :         stats.level0_deltas_count = Some(level0_deltas.len());
    1061          364 : 
    1062          364 :         // Only compact if enough layers have accumulated.
    1063          364 :         let threshold = self.get_compaction_threshold();
    1064          364 :         if level0_deltas.is_empty() || level0_deltas.len() < threshold {
    1065          336 :             if force_compaction_ignore_threshold {
    1066            0 :                 if !level0_deltas.is_empty() {
    1067            0 :                     info!(
    1068            0 :                         level0_deltas = level0_deltas.len(),
    1069            0 :                         threshold, "too few deltas to compact, but forcing compaction"
    1070              :                     );
    1071              :                 } else {
    1072            0 :                     info!(
    1073            0 :                         level0_deltas = level0_deltas.len(),
    1074            0 :                         threshold, "too few deltas to compact, cannot force compaction"
    1075              :                     );
    1076            0 :                     return Ok(CompactLevel0Phase1Result::default());
    1077              :                 }
    1078              :             } else {
    1079          336 :                 debug!(
    1080            0 :                     level0_deltas = level0_deltas.len(),
    1081            0 :                     threshold, "too few deltas to compact"
    1082              :                 );
    1083          336 :                 return Ok(CompactLevel0Phase1Result::default());
    1084              :             }
    1085           28 :         }
    1086              : 
    1087           28 :         let mut level0_deltas = level0_deltas
    1088           28 :             .iter()
    1089          402 :             .map(|x| guard.get_from_desc(x))
    1090           28 :             .collect::<Vec<_>>();
    1091           28 : 
    1092           28 :         // Gather the files to compact in this iteration.
    1093           28 :         //
    1094           28 :         // Start with the oldest Level 0 delta file, and collect any other
    1095           28 :         // level 0 files that form a contiguous sequence, such that the end
    1096           28 :         // LSN of previous file matches the start LSN of the next file.
    1097           28 :         //
    1098           28 :         // Note that if the files don't form such a sequence, we might
    1099           28 :         // "compact" just a single file. That's a bit pointless, but it allows
    1100           28 :         // us to get rid of the level 0 file, and compact the other files on
    1101           28 :         // the next iteration. This could probably be made smarter, but such
    1102           28 :         // "gaps" in the sequence of level 0 files should only happen in case
    1103           28 :         // of a crash, partial download from cloud storage, or something like
    1104           28 :         // that, so it's not a big deal in practice.
    1105          748 :         level0_deltas.sort_by_key(|l| l.layer_desc().lsn_range.start);
    1106           28 :         let mut level0_deltas_iter = level0_deltas.iter();
    1107           28 : 
    1108           28 :         let first_level0_delta = level0_deltas_iter.next().unwrap();
    1109           28 :         let mut prev_lsn_end = first_level0_delta.layer_desc().lsn_range.end;
    1110           28 :         let mut deltas_to_compact = Vec::with_capacity(level0_deltas.len());
    1111           28 : 
    1112           28 :         // Accumulate the size of layers in `deltas_to_compact`
    1113           28 :         let mut deltas_to_compact_bytes = 0;
    1114           28 : 
    1115           28 :         // Under normal circumstances, we will accumulate up to compaction_interval L0s of size
    1116           28 :         // checkpoint_distance each.  To avoid edge cases using extra system resources, bound our
    1117           28 :         // work in this function to only operate on this much delta data at once.
    1118           28 :         //
    1119           28 :         // Take the max of the configured value & the default, so that tests that configure tiny values
    1120           28 :         // can still use a sensible amount of memory, but if a deployed system configures bigger values we
    1121           28 :         // still let them compact a full stack of L0s in one go.
    1122           28 :         let delta_size_limit = std::cmp::max(
    1123           28 :             self.get_compaction_threshold(),
    1124           28 :             DEFAULT_COMPACTION_THRESHOLD,
    1125           28 :         ) as u64
    1126           28 :             * std::cmp::max(self.get_checkpoint_distance(), DEFAULT_CHECKPOINT_DISTANCE);
    1127           28 : 
    1128           28 :         let mut fully_compacted = true;
    1129           28 : 
    1130           28 :         deltas_to_compact.push(first_level0_delta.download_and_keep_resident().await?);
    1131          402 :         for l in level0_deltas_iter {
    1132          374 :             let lsn_range = &l.layer_desc().lsn_range;
    1133          374 : 
    1134          374 :             if lsn_range.start != prev_lsn_end {
    1135            0 :                 break;
    1136          374 :             }
    1137          374 :             deltas_to_compact.push(l.download_and_keep_resident().await?);
    1138          374 :             deltas_to_compact_bytes += l.metadata().file_size;
    1139          374 :             prev_lsn_end = lsn_range.end;
    1140          374 : 
    1141          374 :             if deltas_to_compact_bytes >= delta_size_limit {
    1142            0 :                 info!(
    1143            0 :                     l0_deltas_selected = deltas_to_compact.len(),
    1144            0 :                     l0_deltas_total = level0_deltas.len(),
    1145            0 :                     "L0 compaction picker hit max delta layer size limit: {}",
    1146              :                     delta_size_limit
    1147              :                 );
    1148            0 :                 fully_compacted = false;
    1149            0 : 
    1150            0 :                 // Proceed with compaction, but only a subset of L0s
    1151            0 :                 break;
    1152          374 :             }
    1153              :         }
    1154           28 :         let lsn_range = Range {
    1155           28 :             start: deltas_to_compact
    1156           28 :                 .first()
    1157           28 :                 .unwrap()
    1158           28 :                 .layer_desc()
    1159           28 :                 .lsn_range
    1160           28 :                 .start,
    1161           28 :             end: deltas_to_compact.last().unwrap().layer_desc().lsn_range.end,
    1162           28 :         };
    1163           28 : 
    1164           28 :         info!(
    1165            0 :             "Starting Level0 compaction in LSN range {}-{} for {} layers ({} deltas in total)",
    1166            0 :             lsn_range.start,
    1167            0 :             lsn_range.end,
    1168            0 :             deltas_to_compact.len(),
    1169            0 :             level0_deltas.len()
    1170              :         );
    1171              : 
    1172          402 :         for l in deltas_to_compact.iter() {
    1173          402 :             info!("compact includes {l}");
    1174              :         }
    1175              : 
    1176              :         // We don't need the original list of layers anymore. Drop it so that
    1177              :         // we don't accidentally use it later in the function.
    1178           28 :         drop(level0_deltas);
    1179           28 : 
    1180           28 :         stats.read_lock_held_prerequisites_micros = stats
    1181           28 :             .read_lock_held_spawn_blocking_startup_micros
    1182           28 :             .till_now();
    1183              : 
    1184              :         // TODO: replace with streaming k-merge
    1185           28 :         let all_keys = {
    1186           28 :             let mut all_keys = Vec::new();
    1187          402 :             for l in deltas_to_compact.iter() {
    1188          402 :                 if self.cancel.is_cancelled() {
    1189            0 :                     return Err(CompactionError::ShuttingDown);
    1190          402 :                 }
    1191          402 :                 let delta = l.get_as_delta(ctx).await.map_err(CompactionError::Other)?;
    1192          402 :                 let keys = delta
    1193          402 :                     .index_entries(ctx)
    1194          402 :                     .await
    1195          402 :                     .map_err(CompactionError::Other)?;
    1196          402 :                 all_keys.extend(keys);
    1197              :             }
    1198              :             // The current stdlib sorting implementation is designed in a way where it is
    1199              :             // particularly fast where the slice is made up of sorted sub-ranges.
    1200      4423790 :             all_keys.sort_by_key(|DeltaEntry { key, lsn, .. }| (*key, *lsn));
    1201           28 :             all_keys
    1202           28 :         };
    1203           28 : 
    1204           28 :         stats.read_lock_held_key_sort_micros = stats.read_lock_held_prerequisites_micros.till_now();
    1205              : 
    1206              :         // Determine N largest holes where N is number of compacted layers. The vec is sorted by key range start.
    1207              :         //
    1208              :         // A hole is a key range for which this compaction doesn't have any WAL records.
    1209              :         // Our goal in this compaction iteration is to avoid creating L1s that, in terms of their key range,
    1210              :         // cover the hole, but actually don't contain any WAL records for that key range.
    1211              :         // The reason is that the mere stack of L1s (`count_deltas`) triggers image layer creation (`create_image_layers`).
    1212              :         // That image layer creation would be useless for a hole range covered by L1s that don't contain any WAL records.
    1213              :         //
    1214              :         // The algorithm chooses holes as follows.
    1215              :         // - Slide a 2-window over the keys in key order to get the hole range (=distance between two keys).
    1216              :         // - Filter: min threshold on range length
    1217              :         // - Rank: by coverage size (=number of image layers required to reconstruct each key in the range for which we have any data)
    1218              :         //
    1219              :         // For more details, intuition, and some ASCII art see https://github.com/neondatabase/neon/pull/3597#discussion_r1112704451
    1220              :         #[derive(PartialEq, Eq)]
    1221              :         struct Hole {
    1222              :             key_range: Range<Key>,
    1223              :             coverage_size: usize,
    1224              :         }
    1225           28 :         let holes: Vec<Hole> = {
    1226              :             use std::cmp::Ordering;
    1227              :             impl Ord for Hole {
    1228            0 :                 fn cmp(&self, other: &Self) -> Ordering {
    1229            0 :                     self.coverage_size.cmp(&other.coverage_size).reverse()
    1230            0 :                 }
    1231              :             }
    1232              :             impl PartialOrd for Hole {
    1233            0 :                 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
    1234            0 :                     Some(self.cmp(other))
    1235            0 :                 }
    1236              :             }
    1237           28 :             let max_holes = deltas_to_compact.len();
    1238           28 :             let last_record_lsn = self.get_last_record_lsn();
    1239           28 :             let min_hole_range = (target_file_size / page_cache::PAGE_SZ as u64) as i128;
    1240           28 :             let min_hole_coverage_size = 3; // TODO: something more flexible?
    1241           28 :                                             // min-heap (reserve space for one more element added before eviction)
    1242           28 :             let mut heap: BinaryHeap<Hole> = BinaryHeap::with_capacity(max_holes + 1);
    1243           28 :             let mut prev: Option<Key> = None;
    1244              : 
    1245      2064038 :             for &DeltaEntry { key: next_key, .. } in all_keys.iter() {
    1246      2064038 :                 if let Some(prev_key) = prev {
    1247              :                     // just first fast filter, do not create hole entries for metadata keys. The last hole in the
    1248              :                     // compaction is the gap between data key and metadata keys.
    1249      2064010 :                     if next_key.to_i128() - prev_key.to_i128() >= min_hole_range
    1250            0 :                         && !Key::is_metadata_key(&prev_key)
    1251              :                     {
    1252            0 :                         let key_range = prev_key..next_key;
    1253            0 :                         // Measuring hole by just subtraction of i128 representation of key range boundaries
    1254            0 :                         // does not make much sense, because the largest holes will correspond to field1/field2 changes.
    1255            0 :                         // But we are mostly interested in eliminating holes which cause generation of excessive image layers.
    1256            0 :                         // That is why it is better to measure size of hole as number of covering image layers.
    1257            0 :                         let coverage_size =
    1258            0 :                             layers.image_coverage(&key_range, last_record_lsn).len();
    1259            0 :                         if coverage_size >= min_hole_coverage_size {
    1260            0 :                             heap.push(Hole {
    1261            0 :                                 key_range,
    1262            0 :                                 coverage_size,
    1263            0 :                             });
    1264            0 :                             if heap.len() > max_holes {
    1265            0 :                                 heap.pop(); // remove smallest hole
    1266            0 :                             }
    1267            0 :                         }
    1268      2064010 :                     }
    1269           28 :                 }
    1270      2064038 :                 prev = Some(next_key.next());
    1271              :             }
    1272           28 :             let mut holes = heap.into_vec();
    1273           28 :             holes.sort_unstable_by_key(|hole| hole.key_range.start);
    1274           28 :             holes
    1275           28 :         };
    1276           28 :         stats.read_lock_held_compute_holes_micros = stats.read_lock_held_key_sort_micros.till_now();
    1277           28 :         drop_rlock(guard);
    1278           28 : 
    1279           28 :         if self.cancel.is_cancelled() {
    1280            0 :             return Err(CompactionError::ShuttingDown);
    1281           28 :         }
    1282           28 : 
    1283           28 :         stats.read_lock_drop_micros = stats.read_lock_held_compute_holes_micros.till_now();
    1284              : 
    1285              :         // This iterator walks through all key-value pairs from all the layers
    1286              :         // we're compacting, in key, LSN order.
    1287              :         // If there's both a Value::Image and Value::WalRecord for the same (key,lsn),
    1288              :         // then the Value::Image is ordered before Value::WalRecord.
    1289           28 :         let mut all_values_iter = {
    1290           28 :             let mut deltas = Vec::with_capacity(deltas_to_compact.len());
    1291          402 :             for l in deltas_to_compact.iter() {
    1292          402 :                 let l = l.get_as_delta(ctx).await.map_err(CompactionError::Other)?;
    1293          402 :                 deltas.push(l);
    1294              :             }
    1295           28 :             MergeIterator::create(&deltas, &[], ctx)
    1296           28 :         };
    1297           28 : 
    1298           28 :         // This iterator walks through all keys and is needed to calculate size used by each key
    1299           28 :         let mut all_keys_iter = all_keys
    1300           28 :             .iter()
    1301      2064038 :             .map(|DeltaEntry { key, lsn, size, .. }| (*key, *lsn, *size))
    1302      2064010 :             .coalesce(|mut prev, cur| {
    1303      2064010 :                 // Coalesce keys that belong to the same key pair.
    1304      2064010 :                 // This ensures that compaction doesn't put them
    1305      2064010 :                 // into different layer files.
    1306      2064010 :                 // Still limit this by the target file size,
    1307      2064010 :                 // so that we keep the size of the files in
    1308      2064010 :                 // check.
    1309      2064010 :                 if prev.0 == cur.0 && prev.2 < target_file_size {
    1310        40038 :                     prev.2 += cur.2;
    1311        40038 :                     Ok(prev)
    1312              :                 } else {
    1313      2023972 :                     Err((prev, cur))
    1314              :                 }
    1315      2064010 :             });
    1316           28 : 
    1317           28 :         // Merge the contents of all the input delta layers into a new set
    1318           28 :         // of delta layers, based on the current partitioning.
    1319           28 :         //
    1320           28 :         // We split the new delta layers on the key dimension. We iterate through the key space, and for each key, check if including the next key to the current output layer we're building would cause the layer to become too large. If so, dump the current output layer and start new one.
    1321           28 :         // It's possible that there is a single key with so many page versions that storing all of them in a single layer file
    1322           28 :         // would be too large. In that case, we also split on the LSN dimension.
    1323           28 :         //
    1324           28 :         // LSN
    1325           28 :         //  ^
    1326           28 :         //  |
    1327           28 :         //  | +-----------+            +--+--+--+--+
    1328           28 :         //  | |           |            |  |  |  |  |
    1329           28 :         //  | +-----------+            |  |  |  |  |
    1330           28 :         //  | |           |            |  |  |  |  |
    1331           28 :         //  | +-----------+     ==>    |  |  |  |  |
    1332           28 :         //  | |           |            |  |  |  |  |
    1333           28 :         //  | +-----------+            |  |  |  |  |
    1334           28 :         //  | |           |            |  |  |  |  |
    1335           28 :         //  | +-----------+            +--+--+--+--+
    1336           28 :         //  |
    1337           28 :         //  +--------------> key
    1338           28 :         //
    1339           28 :         //
    1340           28 :         // If one key (X) has a lot of page versions:
    1341           28 :         //
    1342           28 :         // LSN
    1343           28 :         //  ^
    1344           28 :         //  |                                 (X)
    1345           28 :         //  | +-----------+            +--+--+--+--+
    1346           28 :         //  | |           |            |  |  |  |  |
    1347           28 :         //  | +-----------+            |  |  +--+  |
    1348           28 :         //  | |           |            |  |  |  |  |
    1349           28 :         //  | +-----------+     ==>    |  |  |  |  |
    1350           28 :         //  | |           |            |  |  +--+  |
    1351           28 :         //  | +-----------+            |  |  |  |  |
    1352           28 :         //  | |           |            |  |  |  |  |
    1353           28 :         //  | +-----------+            +--+--+--+--+
    1354           28 :         //  |
    1355           28 :         //  +--------------> key
    1356           28 :         // TODO: this actually divides the layers into fixed-size chunks, not
    1357           28 :         // based on the partitioning.
    1358           28 :         //
    1359           28 :         // TODO: we should also opportunistically materialize and
    1360           28 :         // garbage collect what we can.
    1361           28 :         let mut new_layers = Vec::new();
    1362           28 :         let mut prev_key: Option<Key> = None;
    1363           28 :         let mut writer: Option<DeltaLayerWriter> = None;
    1364           28 :         let mut key_values_total_size = 0u64;
    1365           28 :         let mut dup_start_lsn: Lsn = Lsn::INVALID; // start LSN of layer containing values of the single key
    1366           28 :         let mut dup_end_lsn: Lsn = Lsn::INVALID; // end LSN of layer containing values of the single key
    1367           28 :         let mut next_hole = 0; // index of next hole in holes vector
    1368           28 : 
    1369           28 :         let mut keys = 0;
    1370              : 
    1371      2064066 :         while let Some((key, lsn, value)) = all_values_iter
    1372      2064066 :             .next()
    1373      2064066 :             .await
    1374      2064066 :             .map_err(CompactionError::Other)?
    1375              :         {
    1376      2064038 :             keys += 1;
    1377      2064038 : 
    1378      2064038 :             if keys % 32_768 == 0 && self.cancel.is_cancelled() {
    1379              :                 // avoid hitting the cancellation token on every key. in benches, we end up
    1380              :                 // shuffling an order of million keys per layer, this means we'll check it
    1381              :                 // around tens of times per layer.
    1382            0 :                 return Err(CompactionError::ShuttingDown);
    1383      2064038 :             }
    1384      2064038 : 
    1385      2064038 :             let same_key = prev_key == Some(key);
    1386      2064038 :             // We need to check key boundaries once we reach next key or end of layer with the same key
    1387      2064038 :             if !same_key || lsn == dup_end_lsn {
    1388      2024000 :                 let mut next_key_size = 0u64;
    1389      2024000 :                 let is_dup_layer = dup_end_lsn.is_valid();
    1390      2024000 :                 dup_start_lsn = Lsn::INVALID;
    1391      2024000 :                 if !same_key {
    1392      2024000 :                     dup_end_lsn = Lsn::INVALID;
    1393      2024000 :                 }
    1394              :                 // Determine size occupied by this key. We stop at next key or when size becomes larger than target_file_size
    1395      2024000 :                 for (next_key, next_lsn, next_size) in all_keys_iter.by_ref() {
    1396      2024000 :                     next_key_size = next_size;
    1397      2024000 :                     if key != next_key {
    1398      2023972 :                         if dup_end_lsn.is_valid() {
    1399            0 :                             // We are writing a segment with duplicates:
    1400            0 :                             // place all remaining values of this key in separate segment
    1401            0 :                             dup_start_lsn = dup_end_lsn; // new segments starts where old stops
    1402            0 :                             dup_end_lsn = lsn_range.end; // there are no more values of this key till end of LSN range
    1403      2023972 :                         }
    1404      2023972 :                         break;
    1405           28 :                     }
    1406           28 :                     key_values_total_size += next_size;
    1407           28 :                     // Check if it is time to split segment: if total keys size is larger than target file size.
    1408           28 :                     // We need to avoid generation of empty segments if next_size > target_file_size.
    1409           28 :                     if key_values_total_size > target_file_size && lsn != next_lsn {
    1410              :                         // Split key between multiple layers: such layer can contain only single key
    1411            0 :                         dup_start_lsn = if dup_end_lsn.is_valid() {
    1412            0 :                             dup_end_lsn // new segment with duplicates starts where old one stops
    1413              :                         } else {
    1414            0 :                             lsn // start with the first LSN for this key
    1415              :                         };
    1416            0 :                         dup_end_lsn = next_lsn; // upper LSN boundary is exclusive
    1417            0 :                         break;
    1418           28 :                     }
    1419              :                 }
    1420              :                 // handle case when loop reaches last key: in this case dup_end is non-zero but dup_start is not set.
    1421      2024000 :                 if dup_end_lsn.is_valid() && !dup_start_lsn.is_valid() {
    1422            0 :                     dup_start_lsn = dup_end_lsn;
    1423            0 :                     dup_end_lsn = lsn_range.end;
    1424      2024000 :                 }
    1425      2024000 :                 if writer.is_some() {
    1426      2023972 :                     let written_size = writer.as_mut().unwrap().size();
    1427      2023972 :                     let contains_hole =
    1428      2023972 :                         next_hole < holes.len() && key >= holes[next_hole].key_range.end;
    1429              :                     // check if key cause layer overflow or contains hole...
    1430      2023972 :                     if is_dup_layer
    1431      2023972 :                         || dup_end_lsn.is_valid()
    1432      2023972 :                         || written_size + key_values_total_size > target_file_size
    1433      2023692 :                         || contains_hole
    1434              :                     {
    1435              :                         // ... if so, flush previous layer and prepare to write new one
    1436          280 :                         let (desc, path) = writer
    1437          280 :                             .take()
    1438          280 :                             .unwrap()
    1439          280 :                             .finish(prev_key.unwrap().next(), ctx)
    1440          280 :                             .await
    1441          280 :                             .map_err(CompactionError::Other)?;
    1442          280 :                         let new_delta = Layer::finish_creating(self.conf, self, desc, &path)
    1443          280 :                             .map_err(CompactionError::Other)?;
    1444              : 
    1445          280 :                         new_layers.push(new_delta);
    1446          280 :                         writer = None;
    1447          280 : 
    1448          280 :                         if contains_hole {
    1449            0 :                             // skip hole
    1450            0 :                             next_hole += 1;
    1451          280 :                         }
    1452      2023692 :                     }
    1453           28 :                 }
    1454              :                 // Remember size of key value because at next iteration we will access next item
    1455      2024000 :                 key_values_total_size = next_key_size;
    1456        40038 :             }
    1457      2064038 :             fail_point!("delta-layer-writer-fail-before-finish", |_| {
    1458            0 :                 Err(CompactionError::Other(anyhow::anyhow!(
    1459            0 :                     "failpoint delta-layer-writer-fail-before-finish"
    1460            0 :                 )))
    1461      2064038 :             });
    1462              : 
    1463      2064038 :             if !self.shard_identity.is_key_disposable(&key) {
    1464      2064038 :                 if writer.is_none() {
    1465          308 :                     if self.cancel.is_cancelled() {
    1466              :                         // to be somewhat responsive to cancellation, check for each new layer
    1467            0 :                         return Err(CompactionError::ShuttingDown);
    1468          308 :                     }
    1469              :                     // Create writer if not initialized yet
    1470          308 :                     writer = Some(
    1471              :                         DeltaLayerWriter::new(
    1472          308 :                             self.conf,
    1473          308 :                             self.timeline_id,
    1474          308 :                             self.tenant_shard_id,
    1475          308 :                             key,
    1476          308 :                             if dup_end_lsn.is_valid() {
    1477              :                                 // this is a layer containing slice of values of the same key
    1478            0 :                                 debug!("Create new dup layer {}..{}", dup_start_lsn, dup_end_lsn);
    1479            0 :                                 dup_start_lsn..dup_end_lsn
    1480              :                             } else {
    1481          308 :                                 debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end);
    1482          308 :                                 lsn_range.clone()
    1483              :                             },
    1484          308 :                             ctx,
    1485          308 :                         )
    1486          308 :                         .await
    1487          308 :                         .map_err(CompactionError::Other)?,
    1488              :                     );
    1489              : 
    1490          308 :                     keys = 0;
    1491      2063730 :                 }
    1492              : 
    1493      2064038 :                 writer
    1494      2064038 :                     .as_mut()
    1495      2064038 :                     .unwrap()
    1496      2064038 :                     .put_value(key, lsn, value, ctx)
    1497      2064038 :                     .await
    1498      2064038 :                     .map_err(CompactionError::Other)?;
    1499              :             } else {
    1500            0 :                 let shard = self.shard_identity.shard_index();
    1501            0 :                 let owner = self.shard_identity.get_shard_number(&key);
    1502            0 :                 if cfg!(debug_assertions) {
    1503            0 :                     panic!("key {key} does not belong on shard {shard}, owned by {owner}");
    1504            0 :                 }
    1505            0 :                 debug!("dropping key {key} during compaction (it belongs on shard {owner})");
    1506              :             }
    1507              : 
    1508      2064038 :             if !new_layers.is_empty() {
    1509        19786 :                 fail_point!("after-timeline-compacted-first-L1");
    1510      2044252 :             }
    1511              : 
    1512      2064038 :             prev_key = Some(key);
    1513              :         }
    1514           28 :         if let Some(writer) = writer {
    1515           28 :             let (desc, path) = writer
    1516           28 :                 .finish(prev_key.unwrap().next(), ctx)
    1517           28 :                 .await
    1518           28 :                 .map_err(CompactionError::Other)?;
    1519           28 :             let new_delta = Layer::finish_creating(self.conf, self, desc, &path)
    1520           28 :                 .map_err(CompactionError::Other)?;
    1521           28 :             new_layers.push(new_delta);
    1522            0 :         }
    1523              : 
    1524              :         // Sync layers
    1525           28 :         if !new_layers.is_empty() {
    1526              :             // Print a warning if the created layer is larger than double the target size
    1527              :             // Add two pages for potential overhead. This should in theory be already
    1528              :             // accounted for in the target calculation, but for very small targets,
    1529              :             // we still might easily hit the limit otherwise.
    1530           28 :             let warn_limit = target_file_size * 2 + page_cache::PAGE_SZ as u64 * 2;
    1531          308 :             for layer in new_layers.iter() {
    1532          308 :                 if layer.layer_desc().file_size > warn_limit {
    1533            0 :                     warn!(
    1534              :                         %layer,
    1535            0 :                         "created delta file of size {} larger than double of target of {target_file_size}", layer.layer_desc().file_size
    1536              :                     );
    1537          308 :                 }
    1538              :             }
    1539              : 
    1540              :             // The writer.finish() above already did the fsync of the inodes.
    1541              :             // We just need to fsync the directory in which these inodes are linked,
    1542              :             // which we know to be the timeline directory.
    1543              :             //
    1544              :             // We use fatal_err() below because after writer.finish() returns with success,
    1545              :             // the in-memory state of the filesystem already has the layer file in its final place,
    1546              :             // and subsequent pageserver code could think it's durable while it really isn't.
    1547           28 :             let timeline_dir = VirtualFile::open(
    1548           28 :                 &self
    1549           28 :                     .conf
    1550           28 :                     .timeline_path(&self.tenant_shard_id, &self.timeline_id),
    1551           28 :                 ctx,
    1552           28 :             )
    1553           28 :             .await
    1554           28 :             .fatal_err("VirtualFile::open for timeline dir fsync");
    1555           28 :             timeline_dir
    1556           28 :                 .sync_all()
    1557           28 :                 .await
    1558           28 :                 .fatal_err("VirtualFile::sync_all timeline dir");
    1559            0 :         }
    1560              : 
    1561           28 :         stats.write_layer_files_micros = stats.read_lock_drop_micros.till_now();
    1562           28 :         stats.new_deltas_count = Some(new_layers.len());
    1563          308 :         stats.new_deltas_size = Some(new_layers.iter().map(|l| l.layer_desc().file_size).sum());
    1564           28 : 
    1565           28 :         match TryInto::<CompactLevel0Phase1Stats>::try_into(stats)
    1566           28 :             .and_then(|stats| serde_json::to_string(&stats).context("serde_json::to_string"))
    1567              :         {
    1568           28 :             Ok(stats_json) => {
    1569           28 :                 info!(
    1570            0 :                     stats_json = stats_json.as_str(),
    1571            0 :                     "compact_level0_phase1 stats available"
    1572              :                 )
    1573              :             }
    1574            0 :             Err(e) => {
    1575            0 :                 warn!("compact_level0_phase1 stats failed to serialize: {:#}", e);
    1576              :             }
    1577              :         }
    1578              : 
    1579              :         // Without this, rustc complains about deltas_to_compact still
    1580              :         // being borrowed when we `.into_iter()` below.
    1581           28 :         drop(all_values_iter);
    1582           28 : 
    1583           28 :         Ok(CompactLevel0Phase1Result {
    1584           28 :             new_layers,
    1585           28 :             deltas_to_compact: deltas_to_compact
    1586           28 :                 .into_iter()
    1587          402 :                 .map(|x| x.drop_eviction_guard())
    1588           28 :                 .collect::<Vec<_>>(),
    1589           28 :             fully_compacted,
    1590           28 :         })
    1591          364 :     }
    1592              : }
    1593              : 
/// Result of phase 1 of level-0 compaction.
#[derive(Default)]
struct CompactLevel0Phase1Result {
    /// Delta layers written by this phase; each entry is the product of
    /// `Layer::finish_creating`.
    new_layers: Vec<ResidentLayer>,
    /// Layers that were selected as compaction input, handed back after
    /// `drop_eviction_guard()` has been called on each of them.
    deltas_to_compact: Vec<Layer>,
    /// Whether we have included all L0 layers, or selected only part of them due to the
    /// L0 compaction size limit.
    fully_compacted: bool,
}
    1602              : 
/// Incrementally filled statistics for phase 1 of level-0 compaction.
///
/// Every field starts out unset (via `Default`). The builder is turned into
/// [`CompactLevel0Phase1Stats`] through `TryFrom`, which fails with a
/// "`<field>` not set" error if any field was never populated.
#[derive(Default)]
struct CompactLevel0Phase1StatsBuilder {
    version: Option<u64>,
    tenant_id: Option<TenantShardId>,
    timeline_id: Option<TimelineId>,
    read_lock_acquisition_micros: DurationRecorder,
    read_lock_held_spawn_blocking_startup_micros: DurationRecorder,
    read_lock_held_key_sort_micros: DurationRecorder,
    read_lock_held_prerequisites_micros: DurationRecorder,
    read_lock_held_compute_holes_micros: DurationRecorder,
    read_lock_drop_micros: DurationRecorder,
    /// Assigned from `read_lock_drop_micros.till_now()` once the new layer
    /// files have been written and the timeline directory has been synced.
    write_layer_files_micros: DurationRecorder,
    level0_deltas_count: Option<usize>,
    /// Number of newly created delta layers.
    new_deltas_count: Option<usize>,
    /// Sum of `layer_desc().file_size` over the newly created delta layers.
    new_deltas_size: Option<u64>,
}
    1619              : 
/// Finalized statistics for phase 1 of level-0 compaction.
///
/// Produced from a fully populated [`CompactLevel0Phase1StatsBuilder`] via
/// `TryFrom`; the result is serialized to JSON and emitted in the
/// "compact_level0_phase1 stats available" log line.
#[derive(serde::Serialize)]
struct CompactLevel0Phase1Stats {
    version: u64,
    tenant_id: TenantShardId,
    timeline_id: TimelineId,
    read_lock_acquisition_micros: RecordedDuration,
    read_lock_held_spawn_blocking_startup_micros: RecordedDuration,
    read_lock_held_key_sort_micros: RecordedDuration,
    read_lock_held_prerequisites_micros: RecordedDuration,
    read_lock_held_compute_holes_micros: RecordedDuration,
    read_lock_drop_micros: RecordedDuration,
    write_layer_files_micros: RecordedDuration,
    level0_deltas_count: usize,
    /// Number of newly created delta layers.
    new_deltas_count: usize,
    /// Total file size of the newly created delta layers.
    new_deltas_size: u64,
}
    1636              : 
    1637              : impl TryFrom<CompactLevel0Phase1StatsBuilder> for CompactLevel0Phase1Stats {
    1638              :     type Error = anyhow::Error;
    1639              : 
    1640           28 :     fn try_from(value: CompactLevel0Phase1StatsBuilder) -> Result<Self, Self::Error> {
    1641           28 :         Ok(Self {
    1642           28 :             version: value.version.ok_or_else(|| anyhow!("version not set"))?,
    1643           28 :             tenant_id: value
    1644           28 :                 .tenant_id
    1645           28 :                 .ok_or_else(|| anyhow!("tenant_id not set"))?,
    1646           28 :             timeline_id: value
    1647           28 :                 .timeline_id
    1648           28 :                 .ok_or_else(|| anyhow!("timeline_id not set"))?,
    1649           28 :             read_lock_acquisition_micros: value
    1650           28 :                 .read_lock_acquisition_micros
    1651           28 :                 .into_recorded()
    1652           28 :                 .ok_or_else(|| anyhow!("read_lock_acquisition_micros not set"))?,
    1653           28 :             read_lock_held_spawn_blocking_startup_micros: value
    1654           28 :                 .read_lock_held_spawn_blocking_startup_micros
    1655           28 :                 .into_recorded()
    1656           28 :                 .ok_or_else(|| anyhow!("read_lock_held_spawn_blocking_startup_micros not set"))?,
    1657           28 :             read_lock_held_key_sort_micros: value
    1658           28 :                 .read_lock_held_key_sort_micros
    1659           28 :                 .into_recorded()
    1660           28 :                 .ok_or_else(|| anyhow!("read_lock_held_key_sort_micros not set"))?,
    1661           28 :             read_lock_held_prerequisites_micros: value
    1662           28 :                 .read_lock_held_prerequisites_micros
    1663           28 :                 .into_recorded()
    1664           28 :                 .ok_or_else(|| anyhow!("read_lock_held_prerequisites_micros not set"))?,
    1665           28 :             read_lock_held_compute_holes_micros: value
    1666           28 :                 .read_lock_held_compute_holes_micros
    1667           28 :                 .into_recorded()
    1668           28 :                 .ok_or_else(|| anyhow!("read_lock_held_compute_holes_micros not set"))?,
    1669           28 :             read_lock_drop_micros: value
    1670           28 :                 .read_lock_drop_micros
    1671           28 :                 .into_recorded()
    1672           28 :                 .ok_or_else(|| anyhow!("read_lock_drop_micros not set"))?,
    1673           28 :             write_layer_files_micros: value
    1674           28 :                 .write_layer_files_micros
    1675           28 :                 .into_recorded()
    1676           28 :                 .ok_or_else(|| anyhow!("write_layer_files_micros not set"))?,
    1677           28 :             level0_deltas_count: value
    1678           28 :                 .level0_deltas_count
    1679           28 :                 .ok_or_else(|| anyhow!("level0_deltas_count not set"))?,
    1680           28 :             new_deltas_count: value
    1681           28 :                 .new_deltas_count
    1682           28 :                 .ok_or_else(|| anyhow!("new_deltas_count not set"))?,
    1683           28 :             new_deltas_size: value
    1684           28 :                 .new_deltas_size
    1685           28 :                 .ok_or_else(|| anyhow!("new_deltas_size not set"))?,
    1686              :         })
    1687           28 :     }
    1688              : }
    1689              : 
    1690              : impl Timeline {
    1691              :     /// Entry point for new tiered compaction algorithm.
    1692              :     ///
    1693              :     /// All the real work is in the implementation in the pageserver_compaction
    1694              :     /// crate. The code here would apply to any algorithm implemented by the
    1695              :     /// same interface, but tiered is the only one at the moment.
    1696              :     ///
    1697              :     /// TODO: cancellation
    1698            0 :     pub(crate) async fn compact_tiered(
    1699            0 :         self: &Arc<Self>,
    1700            0 :         _cancel: &CancellationToken,
    1701            0 :         ctx: &RequestContext,
    1702            0 :     ) -> Result<(), CompactionError> {
    1703            0 :         let fanout = self.get_compaction_threshold() as u64;
    1704            0 :         let target_file_size = self.get_checkpoint_distance();
    1705              : 
    1706              :         // Find the top of the historical layers
    1707            0 :         let end_lsn = {
    1708            0 :             let guard = self.layers.read().await;
    1709            0 :             let layers = guard.layer_map()?;
    1710              : 
    1711            0 :             let l0_deltas = layers.level0_deltas();
    1712            0 : 
    1713            0 :             // As an optimization, if we find that there are too few L0 layers,
    1714            0 :             // bail out early. We know that the compaction algorithm would do
    1715            0 :             // nothing in that case.
    1716            0 :             if l0_deltas.len() < fanout as usize {
    1717              :                 // doesn't need compacting
    1718            0 :                 return Ok(());
    1719            0 :             }
    1720            0 :             l0_deltas.iter().map(|l| l.lsn_range.end).max().unwrap()
    1721            0 :         };
    1722            0 : 
    1723            0 :         // Is the timeline being deleted?
    1724            0 :         if self.is_stopping() {
    1725            0 :             trace!("Dropping out of compaction on timeline shutdown");
    1726            0 :             return Err(CompactionError::ShuttingDown);
    1727            0 :         }
    1728              : 
    1729            0 :         let (dense_ks, _sparse_ks) = self.collect_keyspace(end_lsn, ctx).await?;
    1730              :         // TODO(chi): ignore sparse_keyspace for now, compact it in the future.
    1731            0 :         let mut adaptor = TimelineAdaptor::new(self, (end_lsn, dense_ks));
    1732            0 : 
    1733            0 :         pageserver_compaction::compact_tiered::compact_tiered(
    1734            0 :             &mut adaptor,
    1735            0 :             end_lsn,
    1736            0 :             target_file_size,
    1737            0 :             fanout,
    1738            0 :             ctx,
    1739            0 :         )
    1740            0 :         .await
    1741              :         // TODO: compact_tiered needs to return CompactionError
    1742            0 :         .map_err(CompactionError::Other)?;
    1743              : 
    1744            0 :         adaptor.flush_updates().await?;
    1745            0 :         Ok(())
    1746            0 :     }
    1747              : 
    1748              :     /// Take a list of images and deltas, produce images and deltas according to GC horizon and retain_lsns.
    1749              :     ///
    1750              :     /// It takes a key, the values of the key within the compaction process, a GC horizon, and all retain_lsns below the horizon.
    1751              :     /// For now, it requires the `accumulated_values` contains the full history of the key (i.e., the key with the lowest LSN is
    1752              :     /// an image or a WAL not requiring a base image). This restriction will be removed once we implement gc-compaction on branch.
    1753              :     ///
    1754              :     /// The function returns the deltas and the base image that need to be placed at each of the retain LSN. For example, we have:
    1755              :     ///
    1756              :     /// A@0x10, +B@0x20, +C@0x30, +D@0x40, +E@0x50, +F@0x60
    1757              :     /// horizon = 0x50, retain_lsn = 0x20, 0x40, delta_threshold=3
    1758              :     ///
    1759              :     /// The function will produce:
    1760              :     ///
    1761              :     /// ```plain
    1762              :     /// 0x20(retain_lsn) -> img=AB@0x20                  always produce a single image below the lowest retain LSN
    1763              :     /// 0x40(retain_lsn) -> deltas=[+C@0x30, +D@0x40]    two deltas since the last base image, keeping the deltas
    1764              :     /// 0x50(horizon)    -> deltas=[ABCDE@0x50]          three deltas since the last base image, generate an image but put it in the delta
    1765              :     /// above_horizon    -> deltas=[+F@0x60]             full history above the horizon
    1766              :     /// ```
    1767              :     ///
    1768              :     /// Note that `accumulated_values` must be sorted by LSN and should belong to a single key.
    1769          630 :     pub(crate) async fn generate_key_retention(
    1770          630 :         self: &Arc<Timeline>,
    1771          630 :         key: Key,
    1772          630 :         full_history: &[(Key, Lsn, Value)],
    1773          630 :         horizon: Lsn,
    1774          630 :         retain_lsn_below_horizon: &[Lsn],
    1775          630 :         delta_threshold_cnt: usize,
    1776          630 :         base_img_from_ancestor: Option<(Key, Lsn, Bytes)>,
    1777          630 :     ) -> anyhow::Result<KeyHistoryRetention> {
    1778              :         // Pre-checks for the invariants
    1779              : 
    1780          630 :         let debug_mode = cfg!(debug_assertions) || cfg!(feature = "testing");
    1781              : 
    1782          630 :         if debug_mode {
    1783         1530 :             for (log_key, _, _) in full_history {
    1784          900 :                 assert_eq!(log_key, &key, "mismatched key");
    1785              :             }
    1786          630 :             for i in 1..full_history.len() {
    1787          270 :                 assert!(full_history[i - 1].1 <= full_history[i].1, "unordered LSN");
    1788          270 :                 if full_history[i - 1].1 == full_history[i].1 {
    1789            0 :                     assert!(
    1790            0 :                         matches!(full_history[i - 1].2, Value::Image(_)),
    1791            0 :                         "unordered delta/image, or duplicated delta"
    1792              :                     );
    1793          270 :                 }
    1794              :             }
    1795              :             // There was an assertion for no base image that checks if the first
    1796              :             // record in the history is `will_init` before, but it was removed.
    1797              :             // This is explained in the test cases for generate_key_retention.
    1798              :             // Search "incomplete history" for more information.
    1799         1410 :             for lsn in retain_lsn_below_horizon {
    1800          780 :                 assert!(lsn < &horizon, "retain lsn must be below horizon")
    1801              :             }
    1802          630 :             for i in 1..retain_lsn_below_horizon.len() {
    1803          356 :                 assert!(
    1804          356 :                     retain_lsn_below_horizon[i - 1] <= retain_lsn_below_horizon[i],
    1805            0 :                     "unordered LSN"
    1806              :                 );
    1807              :             }
    1808            0 :         }
    1809          630 :         let has_ancestor = base_img_from_ancestor.is_some();
    1810              :         // Step 1: split history into len(retain_lsn_below_horizon) + 2 buckets, where the last bucket is for all deltas above the horizon,
    1811              :         // and the second-to-last bucket is for the horizon. Each bucket contains lsn_last_bucket < deltas <= lsn_this_bucket.
    1812          630 :         let (mut split_history, lsn_split_points) = {
    1813          630 :             let mut split_history = Vec::new();
    1814          630 :             split_history.resize_with(retain_lsn_below_horizon.len() + 2, Vec::new);
    1815          630 :             let mut lsn_split_points = Vec::with_capacity(retain_lsn_below_horizon.len() + 1);
    1816         1410 :             for lsn in retain_lsn_below_horizon {
    1817          780 :                 lsn_split_points.push(*lsn);
    1818          780 :             }
    1819          630 :             lsn_split_points.push(horizon);
    1820          630 :             let mut current_idx = 0;
    1821         1530 :             for item @ (_, lsn, _) in full_history {
    1822         1144 :                 while current_idx < lsn_split_points.len() && *lsn > lsn_split_points[current_idx] {
    1823          244 :                     current_idx += 1;
    1824          244 :                 }
    1825          900 :                 split_history[current_idx].push(item);
    1826              :             }
    1827          630 :             (split_history, lsn_split_points)
    1828              :         };
    1829              :         // Step 2: filter out duplicated records due to the k-merge of image/delta layers
    1830         2670 :         for split_for_lsn in &mut split_history {
    1831         2040 :             let mut prev_lsn = None;
    1832         2040 :             let mut new_split_for_lsn = Vec::with_capacity(split_for_lsn.len());
    1833         2040 :             for record @ (_, lsn, _) in std::mem::take(split_for_lsn) {
    1834          900 :                 if let Some(prev_lsn) = &prev_lsn {
    1835          118 :                     if *prev_lsn == lsn {
    1836              :                         // The case that we have an LSN with both data from the delta layer and the image layer. As
    1837              :                         // `ValueWrapper` ensures that an image is ordered before a delta at the same LSN, we simply
    1838              :                         // drop this delta and keep the image.
    1839              :                         //
    1840              :                         // For example, we have delta layer key1@0x10, key1@0x20, and image layer key1@0x10, we will
    1841              :                         // keep the image for key1@0x10 and the delta for key1@0x20. key1@0x10 delta will be simply
    1842              :                         // dropped.
    1843              :                         //
    1844              :                         // TODO: in case we have both delta + images for a given LSN and it does not exceed the delta
    1845              :                         // threshold, we could have kept delta instead to save space. This is an optimization for the future.
    1846            0 :                         continue;
    1847          118 :                     }
    1848          782 :                 }
    1849          900 :                 prev_lsn = Some(lsn);
    1850          900 :                 new_split_for_lsn.push(record);
    1851              :             }
    1852         2040 :             *split_for_lsn = new_split_for_lsn;
    1853              :         }
    1854              :         // Step 3: generate images when necessary
    1855          630 :         let mut retention = Vec::with_capacity(split_history.len());
    1856          630 :         let mut records_since_last_image = 0;
    1857          630 :         let batch_cnt = split_history.len();
    1858          630 :         assert!(
    1859          630 :             batch_cnt >= 2,
    1860            0 :             "should have at least below + above horizon batches"
    1861              :         );
    1862          630 :         let mut replay_history: Vec<(Key, Lsn, Value)> = Vec::new();
    1863          630 :         if let Some((key, lsn, img)) = base_img_from_ancestor {
    1864           42 :             replay_history.push((key, lsn, Value::Image(img)));
    1865          588 :         }
    1866              : 
    1867              :         /// Render a key history as a one-line debug trace: `key=..` taken from the first entry, then one `i@lsn` / `di@lsn` / `d@lsn` token per entry; `<no history>` if the slice is empty.
    1868            0 :         fn generate_history_trace(replay_history: &[(Key, Lsn, Value)]) -> String {
    1869              :             use std::fmt::Write;
    1870            0 :             let mut output = String::new();
    1871            0 :             if let Some((key, _, _)) = replay_history.first() { // only the first entry's key is printed
    1872            0 :                 write!(output, "key={} ", key).unwrap();
    1873            0 :                 let mut cnt = 0;
    1874            0 :                 for (_, lsn, val) in replay_history {
    1875            0 :                     if val.is_image() { // `i@`: image value
    1876            0 :                         write!(output, "i@{} ", lsn).unwrap();
    1877            0 :                     } else if val.will_init() { // `di@`: non-image value for which `will_init()` is true
    1878            0 :                         write!(output, "di@{} ", lsn).unwrap();
    1879            0 :                     } else { // `d@`: any other value
    1880            0 :                         write!(output, "d@{} ", lsn).unwrap();
    1881            0 :                     }
    1882            0 :                     cnt += 1;
    1883            0 :                     if cnt >= 128 { // cap the trace at 128 entries so the resulting message stays bounded
    1884            0 :                         write!(output, "... and more").unwrap();
    1885            0 :                         break;
    1886            0 :                     }
    1887              :                 }
    1888            0 :             } else {
    1889            0 :                 write!(output, "<no history>").unwrap();
    1890            0 :             }
    1891            0 :             output
    1892            0 :         }
    1893              :         /// Build the multi-line context attached to key-history errors: the replay history (or `<disabled>` when `None`), the full history, then the `lsns` list and `horizon`.
    1894            0 :         fn generate_debug_trace(
    1895            0 :             replay_history: Option<&[(Key, Lsn, Value)]>,
    1896            0 :             full_history: &[(Key, Lsn, Value)],
    1897            0 :             lsns: &[Lsn],
    1898            0 :             horizon: Lsn,
    1899            0 :         ) -> String {
    1900              :             use std::fmt::Write;
    1901            0 :             let mut output = String::new();
    1902            0 :             if let Some(replay_history) = replay_history {
    1903            0 :                 writeln!(
    1904            0 :                     output,
    1905            0 :                     "replay_history: {}",
    1906            0 :                     generate_history_trace(replay_history)
    1907            0 :                 )
    1908            0 :                 .unwrap();
    1909            0 :             } else { // caller passed no replay history
    1910            0 :                 writeln!(output, "replay_history: <disabled>",).unwrap();
    1911            0 :             }
    1912            0 :             writeln!(
    1913            0 :                 output,
    1914            0 :                 "full_history: {}",
    1915            0 :                 generate_history_trace(full_history)
    1916            0 :             )
    1917            0 :             .unwrap();
    1918            0 :             writeln!(
    1919            0 :                 output,
    1920            0 :                 "when processing: [{}] horizon={}",
    1921            0 :                 lsns.iter().map(|l| format!("{l}")).join(","), // comma-separated list of the given LSNs
    1922            0 :                 horizon
    1923            0 :             )
    1924            0 :             .unwrap();
    1925            0 :             output
    1926            0 :         }
    1927              : 
    1928          630 :         let mut key_exists = false;
    1929         2040 :         for (i, split_for_lsn) in split_history.into_iter().enumerate() {
    1930              :             // TODO: there could be image keys inside the splits, and we can compute records_since_last_image accordingly.
    1931         2040 :             records_since_last_image += split_for_lsn.len();
    1932              :             // Whether to produce an image into the final layer files
    1933         2040 :             let produce_image = if i == 0 && !has_ancestor {
    1934              :                 // We always generate images for the first batch (below horizon / lowest retain_lsn)
    1935          588 :                 true
    1936         1452 :             } else if i == batch_cnt - 1 {
    1937              :                 // Do not generate images for the last batch (above horizon)
    1938          630 :                 false
    1939          822 :             } else if records_since_last_image == 0 {
    1940          644 :                 false
    1941          178 :             } else if records_since_last_image >= delta_threshold_cnt {
    1942              :                 // Generate images when there are too many records
    1943            6 :                 true
    1944              :             } else {
    1945          172 :                 false
    1946              :             };
    1947         2040 :             replay_history.extend(split_for_lsn.iter().map(|x| (*x).clone()));
    1948              :             // Only retain the items after the last image record
    1949         2514 :             for idx in (0..replay_history.len()).rev() {
    1950         2514 :                 if replay_history[idx].2.will_init() {
    1951         2040 :                     replay_history = replay_history[idx..].to_vec();
    1952         2040 :                     break;
    1953          474 :                 }
    1954              :             }
    1955         2040 :             if replay_history.is_empty() && !key_exists {
    1956              :                 // The key does not exist at earlier LSN, we can skip this iteration.
    1957            0 :                 retention.push(Vec::new());
    1958            0 :                 continue;
    1959         2040 :             } else {
    1960         2040 :                 key_exists = true;
    1961         2040 :             }
    1962         2040 :             let Some((_, _, val)) = replay_history.first() else {
    1963            0 :                 unreachable!("replay history should not be empty once it exists")
    1964              :             };
    1965         2040 :             if !val.will_init() {
    1966            0 :                 return Err(anyhow::anyhow!("invalid history, no base image")).with_context(|| {
    1967            0 :                     generate_debug_trace(
    1968            0 :                         Some(&replay_history),
    1969            0 :                         full_history,
    1970            0 :                         retain_lsn_below_horizon,
    1971            0 :                         horizon,
    1972            0 :                     )
    1973            0 :                 });
    1974         2040 :             }
    1975              :             // Whether to reconstruct the image. In debug mode, we will generate an image
    1976              :             // at every retain_lsn to ensure data is not corrupted, but we won't put the
    1977              :             // image into the final layer.
    1978         2040 :             let generate_image = produce_image || debug_mode;
    1979         2040 :             if produce_image {
    1980          594 :                 records_since_last_image = 0;
    1981         1446 :             }
    1982         2040 :             let img_and_lsn = if generate_image {
    1983         2040 :                 let replay_history_for_debug = if debug_mode {
    1984         2040 :                     Some(replay_history.clone())
    1985              :                 } else {
    1986            0 :                     None
    1987              :                 };
    1988         2040 :                 let replay_history_for_debug_ref = replay_history_for_debug.as_deref();
    1989         2040 :                 let history = if produce_image {
    1990          594 :                     std::mem::take(&mut replay_history)
    1991              :                 } else {
    1992         1446 :                     replay_history.clone()
    1993              :                 };
    1994         2040 :                 let mut img = None;
    1995         2040 :                 let mut records = Vec::with_capacity(history.len());
    1996         2040 :                 if let (_, lsn, Value::Image(val)) = history.first().as_ref().unwrap() {
    1997         2018 :                     img = Some((*lsn, val.clone()));
    1998         2018 :                     for (_, lsn, val) in history.into_iter().skip(1) {
    1999          460 :                         let Value::WalRecord(rec) = val else {
    2000            0 :                             return Err(anyhow::anyhow!(
    2001            0 :                                 "invalid record, first record is image, expect walrecords"
    2002            0 :                             ))
    2003            0 :                             .with_context(|| {
    2004            0 :                                 generate_debug_trace(
    2005            0 :                                     replay_history_for_debug_ref,
    2006            0 :                                     full_history,
    2007            0 :                                     retain_lsn_below_horizon,
    2008            0 :                                     horizon,
    2009            0 :                                 )
    2010            0 :                             });
    2011              :                         };
    2012          460 :                         records.push((lsn, rec));
    2013              :                     }
    2014              :                 } else {
    2015           36 :                     for (_, lsn, val) in history.into_iter() {
    2016           36 :                         let Value::WalRecord(rec) = val else {
    2017            0 :                             return Err(anyhow::anyhow!("invalid record, first record is walrecord, expect rest are walrecord"))
    2018            0 :                                 .with_context(|| generate_debug_trace(
    2019            0 :                                     replay_history_for_debug_ref,
    2020            0 :                                     full_history,
    2021            0 :                                     retain_lsn_below_horizon,
    2022            0 :                                     horizon,
    2023            0 :                                 ));
    2024              :                         };
    2025           36 :                         records.push((lsn, rec));
    2026              :                     }
    2027              :                 }
    2028         2040 :                 records.reverse();
    2029         2040 :                 let state = ValueReconstructState { img, records };
    2030              :                 // last batch does not generate image so i is always in range, unless we force generate
    2031              :                 // an image during testing
    2032         2040 :                 let request_lsn = if i >= lsn_split_points.len() {
    2033          630 :                     Lsn::MAX
    2034              :                 } else {
    2035         1410 :                     lsn_split_points[i]
    2036              :                 };
    2037         2040 :                 let img = self.reconstruct_value(key, request_lsn, state).await?;
    2038         2040 :                 Some((request_lsn, img))
    2039              :             } else {
    2040            0 :                 None
    2041              :             };
    2042         2040 :             if produce_image {
    2043          594 :                 let (request_lsn, img) = img_and_lsn.unwrap();
    2044          594 :                 replay_history.push((key, request_lsn, Value::Image(img.clone())));
    2045          594 :                 retention.push(vec![(request_lsn, Value::Image(img))]);
    2046         1446 :             } else {
    2047         1446 :                 let deltas = split_for_lsn
    2048         1446 :                     .iter()
    2049         1446 :                     .map(|(_, lsn, value)| (*lsn, value.clone()))
    2050         1446 :                     .collect_vec();
    2051         1446 :                 retention.push(deltas);
    2052         1446 :             }
    2053              :         }
    2054          630 :         let mut result = Vec::with_capacity(retention.len());
    2055          630 :         assert_eq!(retention.len(), lsn_split_points.len() + 1);
    2056         2040 :         for (idx, logs) in retention.into_iter().enumerate() {
    2057         2040 :             if idx == lsn_split_points.len() {
    2058          630 :                 return Ok(KeyHistoryRetention {
    2059          630 :                     below_horizon: result,
    2060          630 :                     above_horizon: KeyLogAtLsn(logs),
    2061          630 :                 });
    2062         1410 :             } else {
    2063         1410 :                 result.push((lsn_split_points[idx], KeyLogAtLsn(logs)));
    2064         1410 :             }
    2065              :         }
    2066            0 :         unreachable!("key retention is empty")
    2067          630 :     }
    2068              : 
    2069              :     /// Return the available bytes that `Statvfs` reports for the tenants directory (`self.conf.tenants_path()`); errors if the statvfs call fails.
    2070           52 :     async fn check_available_space(self: &Arc<Self>) -> anyhow::Result<u64> {
    2071           52 :         let tenants_dir = self.conf.tenants_path();
    2072              : 
    2073           52 :         let stat = Statvfs::get(&tenants_dir, None)
    2074           52 :             .context("statvfs failed, presumably directory got unlinked")?;
    2075              : 
    2076           52 :         let (avail_bytes, _) = stat.get_avail_total_bytes(); // second tuple element (presumably total bytes, going by the method name) is unused
    2077           52 : 
    2078           52 :         Ok(avail_bytes)
    2079           52 :     }
    2080              : 
    2081              :     /// Check if the compaction can proceed safely without running out of space. We assume the size
    2082              :     /// upper bound of the produced files of a compaction job is the same as all layers involved in
    2083              :     /// the compaction; layers for which `needs_download()` returns `Some` are counted a second time
    2084              :     /// as download space. Returns an error if that sum exceeds 80% of the available space.
    2085           52 :     async fn check_compaction_space(
    2086           52 :         self: &Arc<Self>,
    2087           52 :         layer_selection: &[Layer],
    2088           52 :     ) -> anyhow::Result<()> {
    2089           52 :         let available_space = self.check_available_space().await?;
    2090           52 :         let mut remote_layer_size = 0;
    2091           52 :         let mut all_layer_size = 0;
    2092          204 :         for layer in layer_selection {
    2093          152 :             let needs_download = layer.needs_download().await?;
    2094          152 :             if needs_download.is_some() {
    2095            0 :                 remote_layer_size += layer.layer_desc().file_size;
    2096          152 :             }
    2097          152 :             all_layer_size += layer.layer_desc().file_size;
    2098              :         }
    2099           52 :         let allocated_space = (available_space as f64 * 0.8) as u64; /* reserve 20% space for other tasks */
    2100           52 :         if all_layer_size /* space needed for newly-generated file */ + remote_layer_size /* space for downloading layers */ > allocated_space
    2101              :         {
    2102            0 :             return Err(anyhow!("not enough space for compaction: available_space={}, allocated_space={}, all_layer_size={}, remote_layer_size={}, required_space={}",
    2103            0 :                 available_space, allocated_space, all_layer_size, remote_layer_size, all_layer_size + remote_layer_size));
    2104           52 :         }
    2105           52 :         Ok(())
    2106           52 :     }
    2107              : 
    2108              :     /// Get a watermark for gc-compaction, that is the lowest LSN that we can use as the `gc_horizon` for
    2109              :     /// the compaction algorithm. It is min(`gc_info.min_cutoff()`, latest_gc_cutoff); standby_horizon is NOT applied yet (see the TODO in the body).
    2110              :     /// Leases and retain_lsns are considered in the gc-compaction job itself so we don't need to account for them
    2111              :     /// here.
    2112           54 :     pub(crate) fn get_gc_compaction_watermark(self: &Arc<Self>) -> Lsn {
    2113           54 :         let gc_cutoff_lsn = {
    2114           54 :             let gc_info = self.gc_info.read().unwrap();
    2115           54 :             gc_info.min_cutoff()
    2116           54 :         }; // the `gc_info` read guard is dropped at the end of this block
    2117           54 : 
    2118           54 :         // TODO: standby horizon should use leases so we don't really need to consider it here.
    2119           54 :         // let watermark = watermark.min(self.standby_horizon.load());
    2120           54 : 
    2121           54 :         // TODO: ensure the child branches will not use anything below the watermark, or consider
    2122           54 :         // them when computing the watermark.
    2123           54 :         gc_cutoff_lsn.min(*self.get_latest_gc_cutoff_lsn())
    2124           54 :     }
    2125              : 
    2126              :     /// Split a gc-compaction job into multiple compaction jobs. The split is based on the key range and the estimated size of the compaction job.
    2127              :     /// The function returns a list of compaction jobs that can be executed separately. If the upper bound of the compact LSN
    2128              :     /// range is not specified (`Lsn::MAX`), we will use the gc-compaction watermark as the upper bound, so that all jobs in the jobset act
    2129              :     /// like a full compaction of the specified keyspace. `sub_compaction_max_job_size_mb` falls back to 4096 (about 4GB per job) when `None`.
    2130            0 :     pub(crate) async fn gc_compaction_split_jobs(
    2131            0 :         self: &Arc<Self>,
    2132            0 :         job: GcCompactJob,
    2133            0 :         sub_compaction_max_job_size_mb: Option<u64>,
    2134            0 :     ) -> anyhow::Result<Vec<GcCompactJob>> {
    2135            0 :         let compact_below_lsn = if job.compact_lsn_range.end != Lsn::MAX {
    2136            0 :             job.compact_lsn_range.end
    2137              :         } else {
    2138            0 :             self.get_gc_compaction_watermark()
    2139              :         };
    2140              : 
    2141              :         // Default size cap per sub-job: 4 * 1024 MB, i.e. about 4GB each
    2142              :         const GC_COMPACT_MAX_SIZE_MB: u64 = 4 * 1024;
    2143            0 :         let sub_compaction_max_job_size_mb =
    2144            0 :             sub_compaction_max_job_size_mb.unwrap_or(GC_COMPACT_MAX_SIZE_MB);
    2145            0 : 
    2146            0 :         let mut compact_jobs = Vec::new();
    2147            0 :         // For now, we simply use the key partitioning information; we should do a more fine-grained partitioning
    2148            0 :         // by estimating the amount of files read for a compaction job. We should also partition on LSN.
    2149            0 :         let ((dense_ks, sparse_ks), _) = self.partitioning.read().as_ref().clone();
    2150              :         // Intersect a partition's key range with the user-specified compaction range; `None` if the intersection is empty.
    2151            0 :         fn truncate_to(
    2152            0 :             source_start: &Key,
    2153            0 :             source_end: &Key,
    2154            0 :             target_start: &Key,
    2155            0 :             target_end: &Key,
    2156            0 :         ) -> Option<(Key, Key)> {
    2157            0 :             let start = source_start.max(target_start);
    2158            0 :             let end = source_end.min(target_end);
    2159            0 :             if start < end {
    2160            0 :                 Some((*start, *end))
    2161              :             } else {
    2162            0 :                 None
    2163              :             }
    2164            0 :         }
    2165            0 :         let mut split_key_ranges = Vec::new();
    2166            0 :         let ranges = dense_ks
    2167            0 :             .parts
    2168            0 :             .iter()
    2169            0 :             .map(|partition| partition.ranges.iter())
    2170            0 :             .chain(sparse_ks.parts.iter().map(|x| x.0.ranges.iter()))
    2171            0 :             .flatten()
    2172            0 :             .cloned()
    2173            0 :             .collect_vec();
    2174            0 :         for range in ranges.iter() {
    2175            0 :             let Some((start, end)) = truncate_to(
    2176            0 :                 &range.start,
    2177            0 :                 &range.end,
    2178            0 :                 &job.compact_key_range.start,
    2179            0 :                 &job.compact_key_range.end,
    2180            0 :             ) else {
    2181            0 :                 continue;
    2182              :             };
    2183            0 :             split_key_ranges.push((start, end));
    2184              :         }
    2185            0 :         split_key_ranges.sort();
    2186            0 :         let guard = self.layers.read().await;
    2187            0 :         let layer_map = guard.layer_map()?;
    2188            0 :         let mut current_start = None;
    2189            0 :         let ranges_num = split_key_ranges.len();
    2190            0 :         for (idx, (start, end)) in split_key_ranges.into_iter().enumerate() {
    2191            0 :             if current_start.is_none() {
    2192            0 :                 current_start = Some(start);
    2193            0 :             }
    2194            0 :             let start = current_start.unwrap(); // start of the job being accumulated; it only advances when a job is emitted
    2195            0 :             if start >= end {
    2196              :                 // An earlier job was extended past the end of this partition, so it is already covered.
    2197            0 :                 continue;
    2198            0 :             }
    2199            0 :             let res = layer_map.range_search(start..end, compact_below_lsn);
    2200            0 :             let total_size = res.found.keys().map(|x| x.layer.file_size()).sum::<u64>();
    2201            0 :             if total_size > sub_compaction_max_job_size_mb * 1024 * 1024 || ranges_num == idx + 1 { // size cap exceeded, or last partition: emit a job
    2202              :                 // Try to extend the compaction range so that we include at least one full layer file
    2203            0 :                 let extended_end = res
    2204            0 :                     .found
    2205            0 :                     .keys()
    2206            0 :                     .map(|layer| layer.layer.key_range.end)
    2207            0 :                     .min();
    2208              :                 // It is possible that the search range does not contain any layer files when we reach the end of the loop.
    2209              :                 // In this case, we simply use the specified key range end.
    2210            0 :                 let end = if let Some(extended_end) = extended_end {
    2211            0 :                     extended_end.max(end) // never shrink the job below the partition end
    2212              :                 } else {
    2213            0 :                     end
    2214              :                 };
    2215            0 :                 info!(
    2216            0 :                     "splitting compaction job: {}..{}, estimated_size={}",
    2217              :                     start, end, total_size
    2218              :                 );
    2219            0 :                 compact_jobs.push(GcCompactJob {
    2220            0 :                     dry_run: job.dry_run,
    2221            0 :                     compact_key_range: start..end,
    2222            0 :                     compact_lsn_range: job.compact_lsn_range.start..compact_below_lsn,
    2223            0 :                 });
    2224            0 :                 current_start = Some(end);
    2225            0 :             }
    2226              :         }
    2227            0 :         drop(guard);
    2228            0 :         Ok(compact_jobs)
    2229            0 :     }
    2230              : 
    2231              :     /// An experimental compaction building block that combines compaction with garbage collection.
    2232              :     ///
    2233              :     /// The current implementation picks all delta + image layers that are below or intersecting with
    2234              :     /// the GC horizon without considering retain_lsns. Then, it does a full compaction over all these delta
    2235              :     /// layers and image layers, which generates image layers on the gc horizon, drops deltas below the gc horizon,
    2236              :     /// and creates delta layers with all deltas >= gc horizon.
    2237              :     ///
    2238              :     /// If `options.compact_range` is provided, it will only compact the keys within the range, aka partial compaction.
    2239              :     /// Partial compaction will read and process all layers overlapping with the key range, even if they might
    2240              :     /// contain extra keys. After the gc-compaction phase completes, delta layers that are not fully contained
    2241              :     /// within the key range will be rewritten to ensure they do not overlap with the delta layers. Providing
    2242              :     /// `Key::MIN..Key::MAX` to the function indicates a full compaction, though technically, `Key::MAX` is not
    2243              :     /// part of the range.
    2244              :     ///
    2245              :     /// If `options.compact_lsn_range.end` is provided, the compaction will only compact layers below or intersecting with
    2246              :     /// the LSN. Otherwise, it will use the gc cutoff by default. If `options.sub_compaction` is set, the job is first split by `gc_compaction_split_jobs` and the sub-jobs run one after another, stopping at the first error.
    2247           54 :     pub(crate) async fn compact_with_gc(
    2248           54 :         self: &Arc<Self>,
    2249           54 :         cancel: &CancellationToken,
    2250           54 :         options: CompactOptions,
    2251           54 :         ctx: &RequestContext,
    2252           54 :     ) -> anyhow::Result<()> {
    2253           54 :         let sub_compaction = options.sub_compaction;
    2254           54 :         let job = GcCompactJob::from_compact_options(options.clone()); // cloned because `options` is read again below
    2255           54 :         if sub_compaction {
    2256            0 :             info!("running enhanced gc bottom-most compaction with sub-compaction, splitting compaction jobs");
    2257            0 :             let jobs = self
    2258            0 :                 .gc_compaction_split_jobs(job, options.sub_compaction_max_job_size_mb)
    2259            0 :                 .await?;
    2260            0 :             let jobs_len = jobs.len();
    2261            0 :             for (idx, job) in jobs.into_iter().enumerate() {
    2262            0 :                 info!(
    2263            0 :                     "running enhanced gc bottom-most compaction, sub-compaction {}/{}",
    2264            0 :                     idx + 1,
    2265              :                     jobs_len
    2266              :                 );
    2267            0 :                 self.compact_with_gc_inner(cancel, job, ctx).await?;
    2268              :             }
    2269            0 :             if jobs_len == 0 {
    2270            0 :                 info!("no jobs to run, skipping gc bottom-most compaction");
    2271            0 :             }
    2272            0 :             return Ok(());
    2273           54 :         }
    2274           54 :         self.compact_with_gc_inner(cancel, job, ctx).await
    2275           54 :     }
    2276              : 
    2277           54 :     async fn compact_with_gc_inner(
    2278           54 :         self: &Arc<Self>,
    2279           54 :         cancel: &CancellationToken,
    2280           54 :         job: GcCompactJob,
    2281           54 :         ctx: &RequestContext,
    2282           54 :     ) -> anyhow::Result<()> {
    2283           54 :         // Block other compaction/GC tasks from running for now. GC-compaction could run along
    2284           54 :         // with legacy compaction tasks in the future. Always ensure the lock order is compaction -> gc.
    2285           54 :         // Note that we already acquired the compaction lock when the outer `compact` function gets called.
    2286           54 : 
    2287           54 :         let gc_lock = async {
    2288           54 :             tokio::select! {
    2289           54 :                 guard = self.gc_lock.lock() => Ok(guard),
    2290              :                 // TODO: refactor to CompactionError to correctly pass cancelled error
    2291           54 :                 _ = cancel.cancelled() => Err(anyhow!("cancelled")),
    2292              :             }
    2293           54 :         };
    2294              : 
    2295           54 :         let gc_lock = crate::timed(
    2296           54 :             gc_lock,
    2297           54 :             "acquires gc lock",
    2298           54 :             std::time::Duration::from_secs(5),
    2299           54 :         )
    2300           54 :         .await?;
    2301              : 
    2302           54 :         let dry_run = job.dry_run;
    2303           54 :         let compact_key_range = job.compact_key_range;
    2304           54 :         let compact_lsn_range = job.compact_lsn_range;
    2305              : 
    2306           54 :         let debug_mode = cfg!(debug_assertions) || cfg!(feature = "testing");
    2307              : 
    2308           54 :         info!("running enhanced gc bottom-most compaction, dry_run={dry_run}, compact_key_range={}..{}, compact_lsn_range={}..{}", compact_key_range.start, compact_key_range.end, compact_lsn_range.start, compact_lsn_range.end);
    2309              : 
    2310           54 :         scopeguard::defer! {
    2311           54 :             info!("done enhanced gc bottom-most compaction");
    2312           54 :         };
    2313           54 : 
    2314           54 :         let mut stat = CompactionStatistics::default();
    2315              : 
    2316              :         // Step 0: pick all delta layers + image layers below/intersect with the GC horizon.
    2317              :         // The layer selection has the following properties:
    2318              :         // 1. If a layer is in the selection, all layers below it are in the selection.
    2319              :         // 2. Inferred from (1), for each key in the layer selection, the value can be reconstructed only with the layers in the layer selection.
    2320           52 :         let job_desc = {
    2321           54 :             let guard = self.layers.read().await;
    2322           54 :             let layers = guard.layer_map()?;
    2323           54 :             let gc_info = self.gc_info.read().unwrap();
    2324           54 :             let mut retain_lsns_below_horizon = Vec::new();
    2325           54 :             let gc_cutoff = {
    2326              :                 // Currently, gc-compaction only kicks in after the legacy gc has updated the gc_cutoff.
    2327              :                 // Therefore, it can only clean up data that cannot be cleaned up with legacy gc, instead of
    2328              :                 // cleaning everything that it theoretically could. In the future, it should use `self.gc_info`
    2329              :                 // to get the ground-truth data.
    2330           54 :                 let real_gc_cutoff = self.get_gc_compaction_watermark();
    2331              :                 // The compaction algorithm will keep all keys above the gc_cutoff while keeping only necessary keys below the gc_cutoff for
    2332              :                 // each of the retain_lsn. Therefore, if the user-provided `compact_lsn_range.end` is larger than the real gc cutoff, we will use
    2333              :                 // the real cutoff.
    2334           54 :                 let mut gc_cutoff = if compact_lsn_range.end == Lsn::MAX {
    2335           48 :                     real_gc_cutoff
    2336              :                 } else {
    2337            6 :                     compact_lsn_range.end
    2338              :                 };
    2339           54 :                 if gc_cutoff > real_gc_cutoff {
    2340            4 :                     warn!("provided compact_lsn_range.end={} is larger than the real_gc_cutoff={}, using the real gc cutoff", gc_cutoff, real_gc_cutoff);
    2341            4 :                     gc_cutoff = real_gc_cutoff;
    2342           50 :                 }
    2343           54 :                 gc_cutoff
    2344              :             };
    2345           70 :             for (lsn, _timeline_id, _is_offloaded) in &gc_info.retain_lsns {
    2346           70 :                 if lsn < &gc_cutoff {
    2347           70 :                     retain_lsns_below_horizon.push(*lsn);
    2348           70 :                 }
    2349              :             }
    2350           54 :             for lsn in gc_info.leases.keys() {
    2351            0 :                 if lsn < &gc_cutoff {
    2352            0 :                     retain_lsns_below_horizon.push(*lsn);
    2353            0 :                 }
    2354              :             }
    2355           54 :             let mut selected_layers: Vec<Layer> = Vec::new();
    2356           54 :             drop(gc_info);
    2357              :             // First, pick all the layers that intersect or lie below the gc_cutoff, and get the largest LSN among the selected layers.
    2358           54 :             let Some(max_layer_lsn) = layers
    2359           54 :                 .iter_historic_layers()
    2360          244 :                 .filter(|desc| desc.get_lsn_range().start <= gc_cutoff)
    2361          208 :                 .map(|desc| desc.get_lsn_range().end)
    2362           54 :                 .max()
    2363              :             else {
    2364            0 :                 info!("no layers to compact with gc: no historic layers below gc_cutoff, gc_cutoff={}", gc_cutoff);
    2365            0 :                 return Ok(());
    2366              :             };
    2367              :             // Next, if the user specifies compact_lsn_range.start, we need to filter some layers out. All the layers (strictly) below
    2368              :             // the min_layer_lsn computed as below will be filtered out and the data will be accessed using the normal read path, as if
    2369              :             // it is a branch.
    2370           54 :             let Some(min_layer_lsn) = layers
    2371           54 :                 .iter_historic_layers()
    2372          244 :                 .filter(|desc| {
    2373          244 :                     if compact_lsn_range.start == Lsn::INVALID {
    2374          198 :                         true // select all layers below if start == Lsn(0)
    2375              :                     } else {
    2376           46 :                         desc.get_lsn_range().end > compact_lsn_range.start // strictly larger than compact_above_lsn
    2377              :                     }
    2378          244 :                 })
    2379          226 :                 .map(|desc| desc.get_lsn_range().start)
    2380           54 :                 .min()
    2381              :             else {
    2382            0 :                 info!("no layers to compact with gc: no historic layers above compact_above_lsn, compact_above_lsn={}", compact_lsn_range.end);
    2383            0 :                 return Ok(());
    2384              :             };
    2385              :             // Then, pick all the layers that are below the max_layer_lsn. This is to ensure we can pick all single-key
    2386              :             // layers to compact.
    2387           54 :             let mut rewrite_layers = Vec::new();
    2388          244 :             for desc in layers.iter_historic_layers() {
    2389          244 :                 if desc.get_lsn_range().end <= max_layer_lsn
    2390          208 :                     && desc.get_lsn_range().start >= min_layer_lsn
    2391          190 :                     && overlaps_with(&desc.get_key_range(), &compact_key_range)
    2392              :                 {
    2393              :                     // If the layer overlaps with the compaction key range, we need to read it to obtain all keys within the range,
    2394              :                     // even if it might contain extra keys
    2395          152 :                     selected_layers.push(guard.get_from_desc(&desc));
    2396          152 :                     // If the layer is not fully contained within the key range, we need to rewrite it if it's a delta layer (it's fine
    2397          152 :                     // to overlap image layers)
    2398          152 :                     if desc.is_delta() && !fully_contains(&compact_key_range, &desc.get_key_range())
    2399            2 :                     {
    2400            2 :                         rewrite_layers.push(desc);
    2401          150 :                     }
    2402           92 :                 }
    2403              :             }
    2404           54 :             if selected_layers.is_empty() {
    2405            2 :                 info!("no layers to compact with gc: no layers within the key range, gc_cutoff={}, key_range={}..{}", gc_cutoff, compact_key_range.start, compact_key_range.end);
    2406            2 :                 return Ok(());
    2407           52 :             }
    2408           52 :             retain_lsns_below_horizon.sort();
    2409           52 :             GcCompactionJobDescription {
    2410           52 :                 selected_layers,
    2411           52 :                 gc_cutoff,
    2412           52 :                 retain_lsns_below_horizon,
    2413           52 :                 min_layer_lsn,
    2414           52 :                 max_layer_lsn,
    2415           52 :                 compaction_key_range: compact_key_range,
    2416           52 :                 rewrite_layers,
    2417           52 :             }
    2418              :         };
    2419           52 :         let (has_data_below, lowest_retain_lsn) = if compact_lsn_range.start != Lsn::INVALID {
    2420              :             // If we only compact above some LSN, we should get the history from the current branch below the specified LSN.
    2421              :             // We use job_desc.min_layer_lsn as if it's the lowest branch point.
    2422            8 :             (true, job_desc.min_layer_lsn)
    2423           44 :         } else if self.ancestor_timeline.is_some() {
    2424              :             // In theory, we can also use min_layer_lsn here, but using ancestor LSN makes sure the delta layers cover the
    2425              :             // LSN ranges all the way to the ancestor timeline.
    2426            2 :             (true, self.ancestor_lsn)
    2427              :         } else {
    2428           42 :             let res = job_desc
    2429           42 :                 .retain_lsns_below_horizon
    2430           42 :                 .first()
    2431           42 :                 .copied()
    2432           42 :                 .unwrap_or(job_desc.gc_cutoff);
    2433           42 :             if debug_mode {
    2434           42 :                 assert_eq!(
    2435           42 :                     res,
    2436           42 :                     job_desc
    2437           42 :                         .retain_lsns_below_horizon
    2438           42 :                         .iter()
    2439           42 :                         .min()
    2440           42 :                         .copied()
    2441           42 :                         .unwrap_or(job_desc.gc_cutoff)
    2442           42 :                 );
    2443            0 :             }
    2444           42 :             (false, res)
    2445              :         };
    2446           52 :         info!(
    2447            0 :             "picked {} layers for compaction ({} layers need rewriting) with max_layer_lsn={} min_layer_lsn={} gc_cutoff={} lowest_retain_lsn={}, key_range={}..{}, has_data_below={}",
    2448            0 :             job_desc.selected_layers.len(),
    2449            0 :             job_desc.rewrite_layers.len(),
    2450              :             job_desc.max_layer_lsn,
    2451              :             job_desc.min_layer_lsn,
    2452              :             job_desc.gc_cutoff,
    2453              :             lowest_retain_lsn,
    2454              :             job_desc.compaction_key_range.start,
    2455              :             job_desc.compaction_key_range.end,
    2456              :             has_data_below,
    2457              :         );
    2458              : 
    2459          204 :         for layer in &job_desc.selected_layers {
    2460          152 :             debug!("read layer: {}", layer.layer_desc().key());
    2461              :         }
    2462           54 :         for layer in &job_desc.rewrite_layers {
    2463            2 :             debug!("rewrite layer: {}", layer.key());
    2464              :         }
    2465              : 
    2466           52 :         self.check_compaction_space(&job_desc.selected_layers)
    2467           52 :             .await?;
    2468              : 
    2469              :         // Generate statistics for the compaction
    2470          204 :         for layer in &job_desc.selected_layers {
    2471          152 :             let desc = layer.layer_desc();
    2472          152 :             if desc.is_delta() {
    2473           86 :                 stat.visit_delta_layer(desc.file_size());
    2474           86 :             } else {
    2475           66 :                 stat.visit_image_layer(desc.file_size());
    2476           66 :             }
    2477              :         }
    2478              : 
    2479              :         // Step 1: construct a k-merge iterator over all layers.
    2480              :         // Also, verify if the layer map can be split by drawing a horizontal line at every LSN start/end split point.
    2481           52 :         let layer_names = job_desc
    2482           52 :             .selected_layers
    2483           52 :             .iter()
    2484          152 :             .map(|layer| layer.layer_desc().layer_name())
    2485           52 :             .collect_vec();
    2486           52 :         if let Some(err) = check_valid_layermap(&layer_names) {
    2487            0 :             bail!("gc-compaction layer map check failed because {}, cannot proceed with compaction due to potential data loss", err);
    2488           52 :         }
    2489           52 :         // The maximum LSN we are processing in this compaction loop
    2490           52 :         let end_lsn = job_desc
    2491           52 :             .selected_layers
    2492           52 :             .iter()
    2493          152 :             .map(|l| l.layer_desc().lsn_range.end)
    2494           52 :             .max()
    2495           52 :             .unwrap();
    2496           52 :         let mut delta_layers = Vec::new();
    2497           52 :         let mut image_layers = Vec::new();
    2498           52 :         let mut downloaded_layers = Vec::new();
    2499           52 :         let mut total_downloaded_size = 0;
    2500           52 :         let mut total_layer_size = 0;
    2501          204 :         for layer in &job_desc.selected_layers {
    2502          152 :             if layer.needs_download().await?.is_some() {
    2503            0 :                 total_downloaded_size += layer.layer_desc().file_size;
    2504          152 :             }
    2505          152 :             total_layer_size += layer.layer_desc().file_size;
    2506          152 :             let resident_layer = layer.download_and_keep_resident().await?;
    2507          152 :             downloaded_layers.push(resident_layer);
    2508              :         }
    2509           52 :         info!(
    2510            0 :             "finish downloading layers, downloaded={}, total={}, ratio={:.2}",
    2511            0 :             total_downloaded_size,
    2512            0 :             total_layer_size,
    2513            0 :             total_downloaded_size as f64 / total_layer_size as f64
    2514              :         );
    2515          204 :         for resident_layer in &downloaded_layers {
    2516          152 :             if resident_layer.layer_desc().is_delta() {
    2517           86 :                 let layer = resident_layer.get_as_delta(ctx).await?;
    2518           86 :                 delta_layers.push(layer);
    2519              :             } else {
    2520           66 :                 let layer = resident_layer.get_as_image(ctx).await?;
    2521           66 :                 image_layers.push(layer);
    2522              :             }
    2523              :         }
    2524           52 :         let (dense_ks, sparse_ks) = self.collect_gc_compaction_keyspace().await?;
    2525           52 :         let mut merge_iter = FilterIterator::create(
    2526           52 :             MergeIterator::create(&delta_layers, &image_layers, ctx),
    2527           52 :             dense_ks,
    2528           52 :             sparse_ks,
    2529           52 :         )?;
    2530              : 
    2531              :         // Step 2: Produce images+deltas.
    2532           52 :         let mut accumulated_values = Vec::new();
    2533           52 :         let mut last_key: Option<Key> = None;
    2534              : 
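    2535              :         // Only create image layers when there is no data below the compacted range (`has_data_below == false`, i.e. no
    2536              :         // ancestor branch and no `compact_lsn_range.start`). TODO: create a covering image layer when some condition is met.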
    2537           52 :         let mut image_layer_writer = if !has_data_below {
    2538              :             Some(
    2539           42 :                 SplitImageLayerWriter::new(
    2540           42 :                     self.conf,
    2541           42 :                     self.timeline_id,
    2542           42 :                     self.tenant_shard_id,
    2543           42 :                     job_desc.compaction_key_range.start,
    2544           42 :                     lowest_retain_lsn,
    2545           42 :                     self.get_compaction_target_size(),
    2546           42 :                     ctx,
    2547           42 :                 )
    2548           42 :                 .await?,
    2549              :             )
    2550              :         } else {
    2551           10 :             None
    2552              :         };
    2553              : 
    2554           52 :         let mut delta_layer_writer = SplitDeltaLayerWriter::new(
    2555           52 :             self.conf,
    2556           52 :             self.timeline_id,
    2557           52 :             self.tenant_shard_id,
    2558           52 :             lowest_retain_lsn..end_lsn,
    2559           52 :             self.get_compaction_target_size(),
    2560           52 :         )
    2561           52 :         .await?;
    2562              : 
    2563              :         #[derive(Default)]
    2564              :         struct RewritingLayers { // per-delta-layer writers for keys that fall outside `compaction_key_range`; both are created lazily
    2565              :             before: Option<DeltaLayerWriter>, // receives keys < `compaction_key_range.start`
    2566              :             after: Option<DeltaLayerWriter>, // receives keys >= `compaction_key_range.end`
    2567              :         }
    2568           52 :         let mut delta_layer_rewriters = HashMap::<Arc<PersistentLayerKey>, RewritingLayers>::new();
    2569              : 
    2570              :         /// When compacting not at a bottom range (=`[0,X)`) of the root branch, we "have data below" (`has_data_below=true`).
    2571              :         /// The two cases are compaction of a branch that has an ancestor timeline and compaction with `compact_lsn_range.start` set.
    2572              :         /// In those cases, we need to pull up data from below the LSN range we're compacting.
    2573              :         ///
    2574              :         /// This function unifies the cases so that later code doesn't have to think about it.
    2575              :         ///
    2576              :         /// Currently, we always get the ancestor image for each key in the child branch no matter whether the image
    2577              :         /// is needed for reconstruction. This should be fixed in the future.
    2578              :         ///
    2579              :         /// Furthermore, we should do vectored get instead of a single get, or better, use k-merge for ancestor
    2580              :         /// images.
    2581          622 :         async fn get_ancestor_image(
    2582          622 :             this_tline: &Arc<Timeline>,
    2583          622 :             key: Key,
    2584          622 :             ctx: &RequestContext,
    2585          622 :             has_data_below: bool, // when false, nothing is read and `Ok(None)` is returned
    2586          622 :             history_lsn_point: Lsn, // LSN at which the image for `key` is read
    2587          622 :         ) -> anyhow::Result<Option<(Key, Lsn, Bytes)>> { // `Some((key, history_lsn_point, image))` when there is data below
    2588          622 :             if !has_data_below {
    2589          584 :                 return Ok(None);
    2590           38 :             };
    2591              :             // This is implemented as a regular `get` on the current timeline at `history_lsn_point`, therefore reusing
    2592              :             // as much existing code as possible. Errors from the read are propagated to the caller via `?`.
    2593           38 :             let img = this_tline.get(key, history_lsn_point, ctx).await?;
    2594           38 :             Ok(Some((key, history_lsn_point, img)))
    2595          622 :         }
    2596              : 
    2597              :         // Actually, we can decide not to write to the image layer at all at this point because
    2598              :         // the key and LSN range are determined. However, to keep things simple here, we still
    2599              :         // create this writer, and discard the writer in the end.
    2600              : 
    2601          966 :         while let Some(((key, lsn, val), desc)) = merge_iter.next_with_trace().await? {
    2602          914 :             if cancel.is_cancelled() {
    2603            0 :                 return Err(anyhow!("cancelled")); // TODO: refactor to CompactionError and pass cancel error
    2604          914 :             }
    2605          914 :             if self.shard_identity.is_key_disposable(&key) {
    2606              :                 // If this shard does not need to store this key, simply skip it.
    2607              :                 //
    2608              :                 // This is not handled in the filter iterator because shard is determined by hash.
    2609              :                 // Therefore, it does not give us any performance benefit to do things like skip
    2610              :                 // a whole layer file as handling key spaces (ranges).
    2611            0 :                 if cfg!(debug_assertions) {
    2612            0 :                     let shard = self.shard_identity.shard_index();
    2613            0 :                     let owner = self.shard_identity.get_shard_number(&key);
    2614            0 :                     panic!("key {key} does not belong on shard {shard}, owned by {owner}");
    2615            0 :                 }
    2616            0 :                 continue;
    2617          914 :             }
    2618          914 :             if !job_desc.compaction_key_range.contains(&key) {
    2619           64 :                 if !desc.is_delta {
    2620           60 :                     continue;
    2621            4 :                 }
    2622            4 :                 let rewriter = delta_layer_rewriters.entry(desc.clone()).or_default();
    2623            4 :                 let rewriter = if key < job_desc.compaction_key_range.start {
    2624            0 :                     if rewriter.before.is_none() {
    2625            0 :                         rewriter.before = Some(
    2626            0 :                             DeltaLayerWriter::new(
    2627            0 :                                 self.conf,
    2628            0 :                                 self.timeline_id,
    2629            0 :                                 self.tenant_shard_id,
    2630            0 :                                 desc.key_range.start,
    2631            0 :                                 desc.lsn_range.clone(),
    2632            0 :                                 ctx,
    2633            0 :                             )
    2634            0 :                             .await?,
    2635              :                         );
    2636            0 :                     }
    2637            0 :                     rewriter.before.as_mut().unwrap()
    2638            4 :                 } else if key >= job_desc.compaction_key_range.end {
    2639            4 :                     if rewriter.after.is_none() {
    2640            2 :                         rewriter.after = Some(
    2641            2 :                             DeltaLayerWriter::new(
    2642            2 :                                 self.conf,
    2643            2 :                                 self.timeline_id,
    2644            2 :                                 self.tenant_shard_id,
    2645            2 :                                 job_desc.compaction_key_range.end,
    2646            2 :                                 desc.lsn_range.clone(),
    2647            2 :                                 ctx,
    2648            2 :                             )
    2649            2 :                             .await?,
    2650              :                         );
    2651            2 :                     }
    2652            4 :                     rewriter.after.as_mut().unwrap()
    2653              :                 } else {
    2654            0 :                     unreachable!()
    2655              :                 };
    2656            4 :                 rewriter.put_value(key, lsn, val, ctx).await?;
    2657            4 :                 continue;
    2658          850 :             }
    2659          850 :             match val {
    2660          610 :                 Value::Image(_) => stat.visit_image_key(&val),
    2661          240 :                 Value::WalRecord(_) => stat.visit_wal_key(&val),
    2662              :             }
    2663          850 :             if last_key.is_none() || last_key.as_ref() == Some(&key) {
    2664          280 :                 if last_key.is_none() {
    2665           52 :                     last_key = Some(key);
    2666          228 :                 }
    2667          280 :                 accumulated_values.push((key, lsn, val));
    2668              :             } else {
    2669          570 :                 let last_key: &mut Key = last_key.as_mut().unwrap();
    2670          570 :                 stat.on_unique_key_visited(); // TODO: adjust statistics for partial compaction
    2671          570 :                 let retention = self
    2672          570 :                     .generate_key_retention(
    2673          570 :                         *last_key,
    2674          570 :                         &accumulated_values,
    2675          570 :                         job_desc.gc_cutoff,
    2676          570 :                         &job_desc.retain_lsns_below_horizon,
    2677          570 :                         COMPACTION_DELTA_THRESHOLD,
    2678          570 :                         get_ancestor_image(self, *last_key, ctx, has_data_below, lowest_retain_lsn)
    2679          570 :                             .await?,
    2680              :                     )
    2681          570 :                     .await?;
    2682          570 :                 retention
    2683          570 :                     .pipe_to(
    2684          570 :                         *last_key,
    2685          570 :                         &mut delta_layer_writer,
    2686          570 :                         image_layer_writer.as_mut(),
    2687          570 :                         &mut stat,
    2688          570 :                         ctx,
    2689          570 :                     )
    2690          570 :                     .await?;
    2691          570 :                 accumulated_values.clear();
    2692          570 :                 *last_key = key;
    2693          570 :                 accumulated_values.push((key, lsn, val));
    2694              :             }
    2695              :         }
    2696              : 
    2697              :         // TODO: move the below part to the loop body
    2698           52 :         let last_key = last_key.expect("no keys produced during compaction");
    2699           52 :         stat.on_unique_key_visited();
    2700              : 
    2701           52 :         let retention = self
    2702           52 :             .generate_key_retention(
    2703           52 :                 last_key,
    2704           52 :                 &accumulated_values,
    2705           52 :                 job_desc.gc_cutoff,
    2706           52 :                 &job_desc.retain_lsns_below_horizon,
    2707           52 :                 COMPACTION_DELTA_THRESHOLD,
    2708           52 :                 get_ancestor_image(self, last_key, ctx, has_data_below, lowest_retain_lsn).await?,
    2709              :             )
    2710           52 :             .await?;
    2711           52 :         retention
    2712           52 :             .pipe_to(
    2713           52 :                 last_key,
    2714           52 :                 &mut delta_layer_writer,
    2715           52 :                 image_layer_writer.as_mut(),
    2716           52 :                 &mut stat,
    2717           52 :                 ctx,
    2718           52 :             )
    2719           52 :             .await?;
    2720              :         // end: move the above part to the loop body
    2721              : 
    2722           52 :         let mut rewrote_delta_layers = Vec::new();
    2723           54 :         for (key, writers) in delta_layer_rewriters {
    2724            2 :             if let Some(delta_writer_before) = writers.before {
    2725            0 :                 let (desc, path) = delta_writer_before
    2726            0 :                     .finish(job_desc.compaction_key_range.start, ctx)
    2727            0 :                     .await?;
    2728            0 :                 let layer = Layer::finish_creating(self.conf, self, desc, &path)?;
    2729            0 :                 rewrote_delta_layers.push(layer);
    2730            2 :             }
    2731            2 :             if let Some(delta_writer_after) = writers.after {
    2732            2 :                 let (desc, path) = delta_writer_after.finish(key.key_range.end, ctx).await?;
    2733            2 :                 let layer = Layer::finish_creating(self.conf, self, desc, &path)?;
    2734            2 :                 rewrote_delta_layers.push(layer);
    2735            0 :             }
    2736              :         }
    2737              : 
    2738           74 :         let discard = |key: &PersistentLayerKey| {
    2739           74 :             let key = key.clone();
    2740           74 :             async move { KeyHistoryRetention::discard_key(&key, self, dry_run).await }
    2741           74 :         };
    2742              : 
    2743           52 :         let produced_image_layers = if let Some(writer) = image_layer_writer {
    2744           42 :             if !dry_run {
    2745           38 :                 let end_key = job_desc.compaction_key_range.end;
    2746           38 :                 writer
    2747           38 :                     .finish_with_discard_fn(self, ctx, end_key, discard)
    2748           38 :                     .await?
    2749              :             } else {
    2750            4 :                 drop(writer);
    2751            4 :                 Vec::new()
    2752              :             }
    2753              :         } else {
    2754           10 :             Vec::new()
    2755              :         };
    2756              : 
    2757           52 :         let produced_delta_layers = if !dry_run {
    2758           48 :             delta_layer_writer
    2759           48 :                 .finish_with_discard_fn(self, ctx, discard)
    2760           48 :                 .await?
    2761              :         } else {
    2762            4 :             drop(delta_layer_writer);
    2763            4 :             Vec::new()
    2764              :         };
    2765              : 
    2766              :         // TODO: make image/delta/rewrote_delta layers generation atomic. At this point, we already generated resident layers, and if
    2767              :         // compaction is cancelled at this point, we might have some layers that are not cleaned up.
    2768           52 :         let mut compact_to = Vec::new();
    2769           52 :         let mut keep_layers = HashSet::new();
    2770           52 :         let produced_delta_layers_len = produced_delta_layers.len();
    2771           52 :         let produced_image_layers_len = produced_image_layers.len();
    2772           88 :         for action in produced_delta_layers {
    2773           36 :             match action {
    2774           22 :                 BatchWriterResult::Produced(layer) => {
    2775           22 :                     if cfg!(debug_assertions) {
    2776           22 :                         info!("produced delta layer: {}", layer.layer_desc().key());
    2777            0 :                     }
    2778           22 :                     stat.produce_delta_layer(layer.layer_desc().file_size());
    2779           22 :                     compact_to.push(layer);
    2780              :                 }
    2781           14 :                 BatchWriterResult::Discarded(l) => {
    2782           14 :                     if cfg!(debug_assertions) {
    2783           14 :                         info!("discarded delta layer: {}", l);
    2784            0 :                     }
    2785           14 :                     keep_layers.insert(l);
    2786           14 :                     stat.discard_delta_layer();
    2787              :                 }
    2788              :             }
    2789              :         }
    2790           54 :         for layer in &rewrote_delta_layers {
    2791            2 :             debug!(
    2792            0 :                 "produced rewritten delta layer: {}",
    2793            0 :                 layer.layer_desc().key()
    2794              :             );
    2795              :         }
    2796           52 :         compact_to.extend(rewrote_delta_layers);
    2797           90 :         for action in produced_image_layers {
    2798           38 :             match action {
    2799           30 :                 BatchWriterResult::Produced(layer) => {
    2800           30 :                     debug!("produced image layer: {}", layer.layer_desc().key());
    2801           30 :                     stat.produce_image_layer(layer.layer_desc().file_size());
    2802           30 :                     compact_to.push(layer);
    2803              :                 }
    2804            8 :                 BatchWriterResult::Discarded(l) => {
    2805            8 :                     debug!("discarded image layer: {}", l);
    2806            8 :                     keep_layers.insert(l);
    2807            8 :                     stat.discard_image_layer();
    2808              :                 }
    2809              :             }
    2810              :         }
    2811              : 
    2812           52 :         let mut layer_selection = job_desc.selected_layers;
    2813              : 
    2814              :         // Partial compaction might select more data than it processes, e.g., if
    2815              :         // the compaction_key_range only partially overlaps:
    2816              :         //
    2817              :         //         [---compaction_key_range---]
    2818              :         //   [---A----][----B----][----C----][----D----]
    2819              :         //
    2820              :         // For delta layers, we will rewrite the layers so that it is cut exactly at
    2821              :         // the compaction key range, so we can always discard them. However, for image
    2822              :         // layers, as we do not rewrite them for now, we need to handle them differently.
    2823              :         // Assume image layers  A, B, C, D are all in the `layer_selection`.
    2824              :         //
    2825              :         // The created image layers contain whatever is needed from B, C, and from
    2826              :         // `----]` of A, and from  `[---` of D.
    2827              :         //
    2828              :         // In contrast, `[---A` and `D----]` have not been processed, so, we must
    2829              :         // keep that data.
    2830              :         //
    2831              :         // The solution for now is to keep A and D completely if they are image layers.
    2832              :         // (layer_selection is what we'll remove from the layer map, so, retain what
    2833              :         // is _not_ fully covered by compaction_key_range).
    2834          204 :         for layer in &layer_selection {
    2835          152 :             if !layer.layer_desc().is_delta() {
    2836           66 :                 if !overlaps_with(
    2837           66 :                     &layer.layer_desc().key_range,
    2838           66 :                     &job_desc.compaction_key_range,
    2839           66 :                 ) {
    2840            0 :                     bail!("violated constraint: image layer outside of compaction key range");
    2841           66 :                 }
    2842           66 :                 if !fully_contains(
    2843           66 :                     &job_desc.compaction_key_range,
    2844           66 :                     &layer.layer_desc().key_range,
    2845           66 :                 ) {
    2846            8 :                     keep_layers.insert(layer.layer_desc().key());
    2847           58 :                 }
    2848           86 :             }
    2849              :         }
    2850              : 
    2851          152 :         layer_selection.retain(|x| !keep_layers.contains(&x.layer_desc().key()));
    2852           52 : 
    2853           52 :         info!(
    2854            0 :             "gc-compaction statistics: {}",
    2855            0 :             serde_json::to_string(&stat)?
    2856              :         );
    2857              : 
    2858           52 :         if dry_run {
    2859            4 :             return Ok(());
    2860           48 :         }
    2861           48 : 
    2862           48 :         info!(
    2863            0 :             "produced {} delta layers and {} image layers, {} layers are kept",
    2864            0 :             produced_delta_layers_len,
    2865            0 :             produced_image_layers_len,
    2866            0 :             layer_selection.len()
    2867              :         );
    2868              : 
    2869              :         // Step 3: Place back to the layer map.
    2870              : 
    2871              :         // First, do a sanity check to ensure the newly-created layer map does not contain overlaps.
    2872           48 :         let all_layers = {
    2873           48 :             let guard = self.layers.read().await;
    2874           48 :             let layer_map = guard.layer_map()?;
    2875           48 :             layer_map.iter_historic_layers().collect_vec()
    2876           48 :         };
    2877           48 : 
    2878           48 :         let mut final_layers = all_layers
    2879           48 :             .iter()
    2880          214 :             .map(|layer| layer.layer_name())
    2881           48 :             .collect::<HashSet<_>>();
    2882          152 :         for layer in &layer_selection {
    2883          104 :             final_layers.remove(&layer.layer_desc().layer_name());
    2884          104 :         }
    2885          102 :         for layer in &compact_to {
    2886           54 :             final_layers.insert(layer.layer_desc().layer_name());
    2887           54 :         }
    2888           48 :         let final_layers = final_layers.into_iter().collect_vec();
    2889              : 
    2890              :         // TODO: move this check before we call `finish` on image layer writers. However, this will require us to get the layer name before we finish
    2891              :         // the writer, so potentially, we will need a function like `ImageLayerBatchWriter::get_all_pending_layer_keys` to get all the keys that are
    2892              :         // in the writer before finalizing the persistent layers. Now we would leave some dangling layers on the disk if the check fails.
    2893           48 :         if let Some(err) = check_valid_layermap(&final_layers) {
    2894            0 :             bail!("gc-compaction layer map check failed after compaction because {}, compaction result not applied to the layer map due to potential data loss", err);
    2895           48 :         }
    2896              : 
    2897              :         // Between the sanity check and this compaction update, there could be new layers being flushed, but it should be fine because we only
    2898              :         // operate on L1 layers.
    2899              :         {
    2900           48 :             let mut guard = self.layers.write().await;
    2901           48 :             guard
    2902           48 :                 .open_mut()?
    2903           48 :                 .finish_gc_compaction(&layer_selection, &compact_to, &self.metrics)
    2904           48 :         };
    2905           48 : 
    2906           48 :         // Schedule an index-only upload to update the `latest_gc_cutoff` in the index_part.json.
    2907           48 :         // Otherwise, after restart, the index_part only contains the old `latest_gc_cutoff` and
    2908           48 :         // find_gc_cutoffs will try accessing things below the cutoff. TODO: ideally, this should
    2909           48 :         // be batched into `schedule_compaction_update`.
    2910           48 :         let disk_consistent_lsn = self.disk_consistent_lsn.load();
    2911           48 :         self.schedule_uploads(disk_consistent_lsn, None)?;
    2912           48 :         self.remote_client
    2913           48 :             .schedule_compaction_update(&layer_selection, &compact_to)?;
    2914              : 
    2915           48 :         drop(gc_lock);
    2916           48 : 
    2917           48 :         Ok(())
    2918           54 :     }
    2919              : }
    2920              : 
    2921              : struct TimelineAdaptor {
    2922              :     timeline: Arc<Timeline>,
    2923              : 
    2924              :     keyspace: (Lsn, KeySpace),
    2925              : 
    2926              :     new_deltas: Vec<ResidentLayer>,
    2927              :     new_images: Vec<ResidentLayer>,
    2928              :     layers_to_delete: Vec<Arc<PersistentLayerDesc>>,
    2929              : }
    2930              : 
    2931              : impl TimelineAdaptor {
    2932            0 :     pub fn new(timeline: &Arc<Timeline>, keyspace: (Lsn, KeySpace)) -> Self {
    2933            0 :         Self {
    2934            0 :             timeline: timeline.clone(),
    2935            0 :             keyspace,
    2936            0 :             new_images: Vec::new(),
    2937            0 :             new_deltas: Vec::new(),
    2938            0 :             layers_to_delete: Vec::new(),
    2939            0 :         }
    2940            0 :     }
    2941              : 
    2942            0 :     pub async fn flush_updates(&mut self) -> Result<(), CompactionError> {
    2943            0 :         let layers_to_delete = {
    2944            0 :             let guard = self.timeline.layers.read().await;
    2945            0 :             self.layers_to_delete
    2946            0 :                 .iter()
    2947            0 :                 .map(|x| guard.get_from_desc(x))
    2948            0 :                 .collect::<Vec<Layer>>()
    2949            0 :         };
    2950            0 :         self.timeline
    2951            0 :             .finish_compact_batch(&self.new_deltas, &self.new_images, &layers_to_delete)
    2952            0 :             .await?;
    2953              : 
    2954            0 :         self.timeline
    2955            0 :             .upload_new_image_layers(std::mem::take(&mut self.new_images))?;
    2956              : 
    2957            0 :         self.new_deltas.clear();
    2958            0 :         self.layers_to_delete.clear();
    2959            0 :         Ok(())
    2960            0 :     }
    2961              : }
    2962              : 
    2963              : #[derive(Clone)]
    2964              : struct ResidentDeltaLayer(ResidentLayer);
    2965              : #[derive(Clone)]
    2966              : struct ResidentImageLayer(ResidentLayer);
    2967              : 
    2968              : impl CompactionJobExecutor for TimelineAdaptor {
    2969              :     type Key = pageserver_api::key::Key;
    2970              : 
    2971              :     type Layer = OwnArc<PersistentLayerDesc>;
    2972              :     type DeltaLayer = ResidentDeltaLayer;
    2973              :     type ImageLayer = ResidentImageLayer;
    2974              : 
    2975              :     type RequestContext = crate::context::RequestContext;
    2976              : 
    2977            0 :     fn get_shard_identity(&self) -> &ShardIdentity {
    2978            0 :         self.timeline.get_shard_identity()
    2979            0 :     }
    2980              : 
    2981            0 :     async fn get_layers(
    2982            0 :         &mut self,
    2983            0 :         key_range: &Range<Key>,
    2984            0 :         lsn_range: &Range<Lsn>,
    2985            0 :         _ctx: &RequestContext,
    2986            0 :     ) -> anyhow::Result<Vec<OwnArc<PersistentLayerDesc>>> {
    2987            0 :         self.flush_updates().await?;
    2988              : 
    2989            0 :         let guard = self.timeline.layers.read().await;
    2990            0 :         let layer_map = guard.layer_map()?;
    2991              : 
    2992            0 :         let result = layer_map
    2993            0 :             .iter_historic_layers()
    2994            0 :             .filter(|l| {
    2995            0 :                 overlaps_with(&l.lsn_range, lsn_range) && overlaps_with(&l.key_range, key_range)
    2996            0 :             })
    2997            0 :             .map(OwnArc)
    2998            0 :             .collect();
    2999            0 :         Ok(result)
    3000            0 :     }
    3001              : 
    3002            0 :     async fn get_keyspace(
    3003            0 :         &mut self,
    3004            0 :         key_range: &Range<Key>,
    3005            0 :         lsn: Lsn,
    3006            0 :         _ctx: &RequestContext,
    3007            0 :     ) -> anyhow::Result<Vec<Range<Key>>> {
    3008            0 :         if lsn == self.keyspace.0 {
    3009            0 :             Ok(pageserver_compaction::helpers::intersect_keyspace(
    3010            0 :                 &self.keyspace.1.ranges,
    3011            0 :                 key_range,
    3012            0 :             ))
    3013              :         } else {
    3014              :             // The current compaction implementation only ever requests the key space
    3015              :             // at the compaction end LSN.
    3016            0 :             anyhow::bail!("keyspace not available for requested lsn");
    3017              :         }
    3018            0 :     }
    3019              : 
    3020            0 :     async fn downcast_delta_layer(
    3021            0 :         &self,
    3022            0 :         layer: &OwnArc<PersistentLayerDesc>,
    3023            0 :     ) -> anyhow::Result<Option<ResidentDeltaLayer>> {
    3024            0 :         // this is a lot more complex than a simple downcast...
    3025            0 :         if layer.is_delta() {
    3026            0 :             let l = {
    3027            0 :                 let guard = self.timeline.layers.read().await;
    3028            0 :                 guard.get_from_desc(layer)
    3029              :             };
    3030            0 :             let result = l.download_and_keep_resident().await?;
    3031              : 
    3032            0 :             Ok(Some(ResidentDeltaLayer(result)))
    3033              :         } else {
    3034            0 :             Ok(None)
    3035              :         }
    3036            0 :     }
    3037              : 
    3038            0 :     async fn create_image(
    3039            0 :         &mut self,
    3040            0 :         lsn: Lsn,
    3041            0 :         key_range: &Range<Key>,
    3042            0 :         ctx: &RequestContext,
    3043            0 :     ) -> anyhow::Result<()> {
    3044            0 :         Ok(self.create_image_impl(lsn, key_range, ctx).await?)
    3045            0 :     }
    3046              : 
    3047            0 :     async fn create_delta(
    3048            0 :         &mut self,
    3049            0 :         lsn_range: &Range<Lsn>,
    3050            0 :         key_range: &Range<Key>,
    3051            0 :         input_layers: &[ResidentDeltaLayer],
    3052            0 :         ctx: &RequestContext,
    3053            0 :     ) -> anyhow::Result<()> {
    3054            0 :         debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end);
    3055              : 
    3056            0 :         let mut all_entries = Vec::new();
    3057            0 :         for dl in input_layers.iter() {
    3058            0 :             all_entries.extend(dl.load_keys(ctx).await?);
    3059              :         }
    3060              : 
    3061              :         // The current stdlib sorting implementation is designed in a way where it is
    3062              :         // particularly fast where the slice is made up of sorted sub-ranges.
    3063            0 :         all_entries.sort_by_key(|DeltaEntry { key, lsn, .. }| (*key, *lsn));
    3064              : 
    3065            0 :         let mut writer = DeltaLayerWriter::new(
    3066            0 :             self.timeline.conf,
    3067            0 :             self.timeline.timeline_id,
    3068            0 :             self.timeline.tenant_shard_id,
    3069            0 :             key_range.start,
    3070            0 :             lsn_range.clone(),
    3071            0 :             ctx,
    3072            0 :         )
    3073            0 :         .await?;
    3074              : 
    3075            0 :         let mut dup_values = 0;
    3076            0 : 
    3077            0 :         // This iterator walks through all key-value pairs from all the layers
    3078            0 :         // we're compacting, in key, LSN order.
    3079            0 :         let mut prev: Option<(Key, Lsn)> = None;
    3080              :         for &DeltaEntry {
    3081            0 :             key, lsn, ref val, ..
    3082            0 :         } in all_entries.iter()
    3083              :         {
    3084            0 :             if prev == Some((key, lsn)) {
    3085              :                 // This is a duplicate. Skip it.
    3086              :                 //
    3087              :                 // It can happen if compaction is interrupted after writing some
    3088              :                 // layers but not all, and we are compacting the range again.
    3089              :                 // The calculations in the algorithm assume that there are no
    3090              :                 // duplicates, so the math on targeted file size is likely off,
    3091              :                 // and we will create smaller files than expected.
    3092            0 :                 dup_values += 1;
    3093            0 :                 continue;
    3094            0 :             }
    3095              : 
    3096            0 :             let value = val.load(ctx).await?;
    3097              : 
    3098            0 :             writer.put_value(key, lsn, value, ctx).await?;
    3099              : 
    3100            0 :             prev = Some((key, lsn));
    3101              :         }
    3102              : 
    3103            0 :         if dup_values > 0 {
    3104            0 :             warn!("delta layer created with {} duplicate values", dup_values);
    3105            0 :         }
    3106              : 
    3107            0 :         fail_point!("delta-layer-writer-fail-before-finish", |_| {
    3108            0 :             Err(anyhow::anyhow!(
    3109            0 :                 "failpoint delta-layer-writer-fail-before-finish"
    3110            0 :             ))
    3111            0 :         });
    3112              : 
    3113            0 :         let (desc, path) = writer.finish(prev.unwrap().0.next(), ctx).await?;
    3114            0 :         let new_delta_layer =
    3115            0 :             Layer::finish_creating(self.timeline.conf, &self.timeline, desc, &path)?;
    3116              : 
    3117            0 :         self.new_deltas.push(new_delta_layer);
    3118            0 :         Ok(())
    3119            0 :     }
    3120              : 
    3121            0 :     async fn delete_layer(
    3122            0 :         &mut self,
    3123            0 :         layer: &OwnArc<PersistentLayerDesc>,
    3124            0 :         _ctx: &RequestContext,
    3125            0 :     ) -> anyhow::Result<()> {
    3126            0 :         self.layers_to_delete.push(layer.clone().0);
    3127            0 :         Ok(())
    3128            0 :     }
    3129              : }
    3130              : 
    3131              : impl TimelineAdaptor {
    3132            0 :     async fn create_image_impl(
    3133            0 :         &mut self,
    3134            0 :         lsn: Lsn,
    3135            0 :         key_range: &Range<Key>,
    3136            0 :         ctx: &RequestContext,
    3137            0 :     ) -> Result<(), CreateImageLayersError> {
    3138            0 :         let timer = self.timeline.metrics.create_images_time_histo.start_timer();
    3139              : 
    3140            0 :         let image_layer_writer = ImageLayerWriter::new(
    3141            0 :             self.timeline.conf,
    3142            0 :             self.timeline.timeline_id,
    3143            0 :             self.timeline.tenant_shard_id,
    3144            0 :             key_range,
    3145            0 :             lsn,
    3146            0 :             ctx,
    3147            0 :         )
    3148            0 :         .await?;
    3149              : 
    3150            0 :         fail_point!("image-layer-writer-fail-before-finish", |_| {
    3151            0 :             Err(CreateImageLayersError::Other(anyhow::anyhow!(
    3152            0 :                 "failpoint image-layer-writer-fail-before-finish"
    3153            0 :             )))
    3154            0 :         });
    3155              : 
    3156            0 :         let keyspace = KeySpace {
    3157            0 :             ranges: self.get_keyspace(key_range, lsn, ctx).await?,
    3158              :         };
    3159              :         // TODO set proper (stateful) start. The create_image_layer_for_rel_blocks function mostly
    3160            0 :         let start = Key::MIN;
    3161              :         let ImageLayerCreationOutcome {
    3162            0 :             image,
    3163              :             next_start_key: _,
    3164            0 :         } = self
    3165            0 :             .timeline
    3166            0 :             .create_image_layer_for_rel_blocks(
    3167            0 :                 &keyspace,
    3168            0 :                 image_layer_writer,
    3169            0 :                 lsn,
    3170            0 :                 ctx,
    3171            0 :                 key_range.clone(),
    3172            0 :                 start,
    3173            0 :             )
    3174            0 :             .await?;
    3175              : 
    3176            0 :         if let Some(image_layer) = image {
    3177            0 :             self.new_images.push(image_layer);
    3178            0 :         }
    3179              : 
    3180            0 :         timer.stop_and_record();
    3181            0 : 
    3182            0 :         Ok(())
    3183            0 :     }
    3184              : }
    3185              : 
    3186              : impl CompactionRequestContext for crate::context::RequestContext {}
    3187              : 
    3188              : #[derive(Debug, Clone)]
    3189              : pub struct OwnArc<T>(pub Arc<T>);
    3190              : 
    3191              : impl<T> Deref for OwnArc<T> {
    3192              :     type Target = <Arc<T> as Deref>::Target;
    3193            0 :     fn deref(&self) -> &Self::Target {
    3194            0 :         &self.0
    3195            0 :     }
    3196              : }
    3197              : 
    3198              : impl<T> AsRef<T> for OwnArc<T> {
    3199            0 :     fn as_ref(&self) -> &T {
    3200            0 :         self.0.as_ref()
    3201            0 :     }
    3202              : }
    3203              : 
    3204              : impl CompactionLayer<Key> for OwnArc<PersistentLayerDesc> {
    3205            0 :     fn key_range(&self) -> &Range<Key> {
    3206            0 :         &self.key_range
    3207            0 :     }
    3208            0 :     fn lsn_range(&self) -> &Range<Lsn> {
    3209            0 :         &self.lsn_range
    3210            0 :     }
    3211            0 :     fn file_size(&self) -> u64 {
    3212            0 :         self.file_size
    3213            0 :     }
    3214            0 :     fn short_id(&self) -> std::string::String {
    3215            0 :         self.as_ref().short_id().to_string()
    3216            0 :     }
    3217            0 :     fn is_delta(&self) -> bool {
    3218            0 :         self.as_ref().is_delta()
    3219            0 :     }
    3220              : }
    3221              : 
    3222              : impl CompactionLayer<Key> for OwnArc<DeltaLayer> {
    3223            0 :     fn key_range(&self) -> &Range<Key> {
    3224            0 :         &self.layer_desc().key_range
    3225            0 :     }
    3226            0 :     fn lsn_range(&self) -> &Range<Lsn> {
    3227            0 :         &self.layer_desc().lsn_range
    3228            0 :     }
    3229            0 :     fn file_size(&self) -> u64 {
    3230            0 :         self.layer_desc().file_size
    3231            0 :     }
    3232            0 :     fn short_id(&self) -> std::string::String {
    3233            0 :         self.layer_desc().short_id().to_string()
    3234            0 :     }
    3235            0 :     fn is_delta(&self) -> bool {
    3236            0 :         true
    3237            0 :     }
    3238              : }
    3239              : 
    3240              : use crate::tenant::timeline::DeltaEntry;
    3241              : 
    3242              : impl CompactionLayer<Key> for ResidentDeltaLayer {
    3243            0 :     fn key_range(&self) -> &Range<Key> {
    3244            0 :         &self.0.layer_desc().key_range
    3245            0 :     }
    3246            0 :     fn lsn_range(&self) -> &Range<Lsn> {
    3247            0 :         &self.0.layer_desc().lsn_range
    3248            0 :     }
    3249            0 :     fn file_size(&self) -> u64 {
    3250            0 :         self.0.layer_desc().file_size
    3251            0 :     }
    3252            0 :     fn short_id(&self) -> std::string::String {
    3253            0 :         self.0.layer_desc().short_id().to_string()
    3254            0 :     }
    3255            0 :     fn is_delta(&self) -> bool {
    3256            0 :         true
    3257            0 :     }
    3258              : }
    3259              : 
    3260              : impl CompactionDeltaLayer<TimelineAdaptor> for ResidentDeltaLayer {
    3261              :     type DeltaEntry<'a> = DeltaEntry<'a>;
    3262              : 
    3263            0 :     async fn load_keys(&self, ctx: &RequestContext) -> anyhow::Result<Vec<DeltaEntry<'_>>> {
    3264            0 :         self.0.get_as_delta(ctx).await?.index_entries(ctx).await
    3265            0 :     }
    3266              : }
    3267              : 
    3268              : impl CompactionLayer<Key> for ResidentImageLayer {
    3269            0 :     fn key_range(&self) -> &Range<Key> {
    3270            0 :         &self.0.layer_desc().key_range
    3271            0 :     }
    3272            0 :     fn lsn_range(&self) -> &Range<Lsn> {
    3273            0 :         &self.0.layer_desc().lsn_range
    3274            0 :     }
    3275            0 :     fn file_size(&self) -> u64 {
    3276            0 :         self.0.layer_desc().file_size
    3277            0 :     }
    3278            0 :     fn short_id(&self) -> std::string::String {
    3279            0 :         self.0.layer_desc().short_id().to_string()
    3280            0 :     }
    3281            0 :     fn is_delta(&self) -> bool {
    3282            0 :         false
    3283            0 :     }
    3284              : }
    3285              : impl CompactionImageLayer<TimelineAdaptor> for ResidentImageLayer {}
        

Generated by: LCOV version 2.1-beta