LCOV - code coverage report
Current view: top level - pageserver/src/tenant/timeline - compaction.rs (source / functions) Coverage Total Hit
Test: ae32e90d936a6b0438b35d9ab8babc961b1ac107.info Lines: 55.4 % 2280 1264
Test Date: 2025-01-29 17:08:55 Functions: 39.8 % 161 64

            Line data    Source code
       1              : //! New compaction implementation. The algorithm itself is implemented in the
       2              : //! compaction crate. This file implements the callbacks and structs that allow
       3              : //! the algorithm to drive the process.
       4              : //!
       5              : //! The old legacy algorithm is implemented directly in `timeline.rs`.
       6              : 
       7              : use std::collections::{BinaryHeap, HashMap, HashSet, VecDeque};
       8              : use std::ops::{Deref, Range};
       9              : use std::sync::Arc;
      10              : 
      11              : use super::layer_manager::LayerManager;
      12              : use super::{
      13              :     CompactFlags, CompactOptions, CreateImageLayersError, DurationRecorder, ImageLayerCreationMode,
      14              :     RecordedDuration, Timeline,
      15              : };
      16              : 
      17              : use anyhow::{anyhow, bail, Context};
      18              : use bytes::Bytes;
      19              : use enumset::EnumSet;
      20              : use fail::fail_point;
      21              : use itertools::Itertools;
      22              : use pageserver_api::key::KEY_SIZE;
      23              : use pageserver_api::keyspace::ShardedRange;
      24              : use pageserver_api::models::CompactInfoResponse;
      25              : use pageserver_api::shard::{ShardCount, ShardIdentity, TenantShardId};
      26              : use serde::Serialize;
      27              : use tokio_util::sync::CancellationToken;
      28              : use tracing::{debug, info, info_span, trace, warn, Instrument};
      29              : use utils::id::TimelineId;
      30              : 
      31              : use crate::context::{AccessStatsBehavior, RequestContext, RequestContextBuilder};
      32              : use crate::page_cache;
      33              : use crate::statvfs::Statvfs;
      34              : use crate::tenant::checks::check_valid_layermap;
      35              : use crate::tenant::gc_block::GcBlock;
      36              : use crate::tenant::remote_timeline_client::WaitCompletionError;
      37              : use crate::tenant::storage_layer::batch_split_writer::{
      38              :     BatchWriterResult, SplitDeltaLayerWriter, SplitImageLayerWriter,
      39              : };
      40              : use crate::tenant::storage_layer::filter_iterator::FilterIterator;
      41              : use crate::tenant::storage_layer::merge_iterator::MergeIterator;
      42              : use crate::tenant::storage_layer::{
      43              :     AsLayerDesc, PersistentLayerDesc, PersistentLayerKey, ValueReconstructState,
      44              : };
      45              : use crate::tenant::timeline::{drop_rlock, DeltaLayerWriter, ImageLayerWriter};
      46              : use crate::tenant::timeline::{ImageLayerCreationOutcome, IoConcurrency};
      47              : use crate::tenant::timeline::{Layer, ResidentLayer};
      48              : use crate::tenant::{gc_block, DeltaLayer, MaybeOffloaded};
      49              : use crate::virtual_file::{MaybeFatalIo, VirtualFile};
      50              : use pageserver_api::config::tenant_conf_defaults::DEFAULT_CHECKPOINT_DISTANCE;
      51              : 
      52              : use pageserver_api::key::Key;
      53              : use pageserver_api::keyspace::KeySpace;
      54              : use pageserver_api::record::NeonWalRecord;
      55              : use pageserver_api::value::Value;
      56              : 
      57              : use utils::lsn::Lsn;
      58              : 
      59              : use pageserver_compaction::helpers::{fully_contains, overlaps_with};
      60              : use pageserver_compaction::interface::*;
      61              : 
      62              : use super::CompactionError;
      63              : 
      64              : /// Maximum number of deltas before generating an image layer in bottom-most compaction.
      65              : const COMPACTION_DELTA_THRESHOLD: usize = 5;
      66              : 
      67              : #[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
      68              : pub struct GcCompactionJobId(pub usize);
      69              : 
      70              : impl std::fmt::Display for GcCompactionJobId {
      71            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
      72            0 :         write!(f, "{}", self.0)
      73            0 :     }
      74              : }
      75              : 
      76              : #[derive(Debug, Clone)]
      77              : pub enum GcCompactionQueueItem {
      78              :     Manual(CompactOptions),
      79              :     SubCompactionJob(CompactOptions),
      80              :     #[allow(dead_code)]
      81              :     UpdateL2Lsn(Lsn),
      82              :     Notify(GcCompactionJobId),
      83              : }
      84              : 
      85              : impl GcCompactionQueueItem {
      86            0 :     pub fn into_compact_info_resp(
      87            0 :         self,
      88            0 :         id: GcCompactionJobId,
      89            0 :         running: bool,
      90            0 :     ) -> Option<CompactInfoResponse> {
      91            0 :         match self {
      92            0 :             GcCompactionQueueItem::Manual(options) => Some(CompactInfoResponse {
      93            0 :                 compact_key_range: options.compact_key_range,
      94            0 :                 compact_lsn_range: options.compact_lsn_range,
      95            0 :                 sub_compaction: options.sub_compaction,
      96            0 :                 running,
      97            0 :                 job_id: id.0,
      98            0 :             }),
      99            0 :             GcCompactionQueueItem::SubCompactionJob(options) => Some(CompactInfoResponse {
     100            0 :                 compact_key_range: options.compact_key_range,
     101            0 :                 compact_lsn_range: options.compact_lsn_range,
     102            0 :                 sub_compaction: options.sub_compaction,
     103            0 :                 running,
     104            0 :                 job_id: id.0,
     105            0 :             }),
     106            0 :             GcCompactionQueueItem::UpdateL2Lsn(_) => None,
     107            0 :             GcCompactionQueueItem::Notify(_) => None,
     108              :         }
     109            0 :     }
     110              : }
     111              : 
     112              : struct GcCompactionQueueInner {
     113              :     running: Option<(GcCompactionJobId, GcCompactionQueueItem)>,
     114              :     queued: VecDeque<(GcCompactionJobId, GcCompactionQueueItem)>,
     115              :     notify: HashMap<GcCompactionJobId, tokio::sync::oneshot::Sender<()>>,
     116              :     gc_guards: HashMap<GcCompactionJobId, gc_block::Guard>,
     117              :     last_id: GcCompactionJobId,
     118              : }
     119              : 
     120              : impl GcCompactionQueueInner {
     121            0 :     fn next_id(&mut self) -> GcCompactionJobId {
     122            0 :         let id = self.last_id;
     123            0 :         self.last_id = GcCompactionJobId(id.0 + 1);
     124            0 :         id
     125            0 :     }
     126              : }
     127              : 
     128              : /// A structure to store gc_compaction jobs.
     129              : pub struct GcCompactionQueue {
     130              :     /// All items in the queue, and the currently-running job.
     131              :     inner: std::sync::Mutex<GcCompactionQueueInner>,
     132              :     /// Ensure only one thread is consuming the queue.
     133              :     consumer_lock: tokio::sync::Mutex<()>,
     134              : }
     135              : 
     136              : impl GcCompactionQueue {
     137            0 :     pub fn new() -> Self {
     138            0 :         GcCompactionQueue {
     139            0 :             inner: std::sync::Mutex::new(GcCompactionQueueInner {
     140            0 :                 running: None,
     141            0 :                 queued: VecDeque::new(),
     142            0 :                 notify: HashMap::new(),
     143            0 :                 gc_guards: HashMap::new(),
     144            0 :                 last_id: GcCompactionJobId(0),
     145            0 :             }),
     146            0 :             consumer_lock: tokio::sync::Mutex::new(()),
     147            0 :         }
     148            0 :     }
     149              : 
     150            0 :     pub fn cancel_scheduled(&self) {
     151            0 :         let mut guard = self.inner.lock().unwrap();
     152            0 :         guard.queued.clear();
     153            0 :         guard.notify.clear();
     154            0 :         guard.gc_guards.clear();
     155            0 :     }
     156              : 
     157              :     /// Schedule a manual compaction job.
     158            0 :     pub fn schedule_manual_compaction(
     159            0 :         &self,
     160            0 :         options: CompactOptions,
     161            0 :         notify: Option<tokio::sync::oneshot::Sender<()>>,
     162            0 :     ) -> GcCompactionJobId {
     163            0 :         let mut guard = self.inner.lock().unwrap();
     164            0 :         let id = guard.next_id();
     165            0 :         guard
     166            0 :             .queued
     167            0 :             .push_back((id, GcCompactionQueueItem::Manual(options)));
     168            0 :         if let Some(notify) = notify {
     169            0 :             guard.notify.insert(id, notify);
     170            0 :         }
     171            0 :         info!("scheduled compaction job id={}", id);
     172            0 :         id
     173            0 :     }
     174              : 
     175              :     /// Trigger an auto compaction.
     176              :     #[allow(dead_code)]
     177            0 :     pub fn trigger_auto_compaction(&self, _: &Arc<Timeline>) {}
     178              : 
     179              :     /// Notify the caller the job has finished and unblock GC.
     180            0 :     fn notify_and_unblock(&self, id: GcCompactionJobId) {
     181            0 :         info!("compaction job id={} finished", id);
     182            0 :         let mut guard = self.inner.lock().unwrap();
     183            0 :         if let Some(blocking) = guard.gc_guards.remove(&id) {
     184            0 :             drop(blocking)
     185            0 :         }
     186            0 :         if let Some(tx) = guard.notify.remove(&id) {
     187            0 :             let _ = tx.send(());
     188            0 :         }
     189            0 :     }
     190              : 
     191            0 :     async fn handle_sub_compaction(
     192            0 :         &self,
     193            0 :         id: GcCompactionJobId,
     194            0 :         options: CompactOptions,
     195            0 :         timeline: &Arc<Timeline>,
     196            0 :         gc_block: &GcBlock,
     197            0 :     ) -> Result<(), CompactionError> {
     198            0 :         info!("running scheduled enhanced gc bottom-most compaction with sub-compaction, splitting compaction jobs");
     199            0 :         let jobs: Vec<GcCompactJob> = timeline
     200            0 :             .gc_compaction_split_jobs(
     201            0 :                 GcCompactJob::from_compact_options(options.clone()),
     202            0 :                 options.sub_compaction_max_job_size_mb,
     203            0 :             )
     204            0 :             .await
     205            0 :             .map_err(CompactionError::Other)?;
     206            0 :         if jobs.is_empty() {
     207            0 :             info!("no jobs to run, skipping scheduled compaction task");
     208            0 :             self.notify_and_unblock(id);
     209              :         } else {
     210            0 :             let gc_guard = match gc_block.start().await {
     211            0 :                 Ok(guard) => guard,
     212            0 :                 Err(e) => {
     213            0 :                     return Err(CompactionError::Other(anyhow!(
     214            0 :                         "cannot run gc-compaction because gc is blocked: {}",
     215            0 :                         e
     216            0 :                     )));
     217              :                 }
     218              :             };
     219              : 
     220            0 :             let jobs_len = jobs.len();
     221            0 :             let mut pending_tasks = Vec::new();
     222            0 :             for job in jobs {
     223              :                 // Unfortunately we need to convert the `GcCompactJob` back to `CompactionOptions`
     224              :                 // until we do further refactors to allow directly call `compact_with_gc`.
     225            0 :                 let mut flags: EnumSet<CompactFlags> = EnumSet::default();
     226            0 :                 flags |= CompactFlags::EnhancedGcBottomMostCompaction;
     227            0 :                 if job.dry_run {
     228            0 :                     flags |= CompactFlags::DryRun;
     229            0 :                 }
     230            0 :                 let options = CompactOptions {
     231            0 :                     flags,
     232            0 :                     sub_compaction: false,
     233            0 :                     compact_key_range: Some(job.compact_key_range.into()),
     234            0 :                     compact_lsn_range: Some(job.compact_lsn_range.into()),
     235            0 :                     sub_compaction_max_job_size_mb: None,
     236            0 :                 };
     237            0 :                 pending_tasks.push(GcCompactionQueueItem::SubCompactionJob(options));
     238              :             }
     239            0 :             pending_tasks.push(GcCompactionQueueItem::Notify(id));
     240            0 :             {
     241            0 :                 let mut guard = self.inner.lock().unwrap();
     242            0 :                 guard.gc_guards.insert(id, gc_guard);
     243            0 :                 let mut tasks = Vec::new();
     244            0 :                 for task in pending_tasks {
     245            0 :                     let id = guard.next_id();
     246            0 :                     tasks.push((id, task));
     247            0 :                 }
     248            0 :                 tasks.reverse();
     249            0 :                 for item in tasks {
     250            0 :                     guard.queued.push_front(item);
     251            0 :                 }
     252              :             }
     253            0 :             info!("scheduled enhanced gc bottom-most compaction with sub-compaction, split into {} jobs", jobs_len);
     254              :         }
     255            0 :         Ok(())
     256            0 :     }
     257              : 
     258              :     /// Take a job from the queue and process it. Returns if there are still pending tasks.
     259            0 :     pub async fn iteration(
     260            0 :         &self,
     261            0 :         cancel: &CancellationToken,
     262            0 :         ctx: &RequestContext,
     263            0 :         gc_block: &GcBlock,
     264            0 :         timeline: &Arc<Timeline>,
     265            0 :     ) -> Result<bool, CompactionError> {
     266            0 :         let _one_op_at_a_time_guard = self.consumer_lock.lock().await;
     267              :         let has_pending_tasks;
     268            0 :         let (id, item) = {
     269            0 :             let mut guard = self.inner.lock().unwrap();
     270            0 :             let Some((id, item)) = guard.queued.pop_front() else {
     271            0 :                 return Ok(false);
     272              :             };
     273            0 :             guard.running = Some((id, item.clone()));
     274            0 :             has_pending_tasks = !guard.queued.is_empty();
     275            0 :             (id, item)
     276            0 :         };
     277            0 : 
     278            0 :         match item {
     279            0 :             GcCompactionQueueItem::Manual(options) => {
     280            0 :                 if !options
     281            0 :                     .flags
     282            0 :                     .contains(CompactFlags::EnhancedGcBottomMostCompaction)
     283              :                 {
     284            0 :                     warn!("ignoring scheduled compaction task: scheduled task must be gc compaction: {:?}", options);
     285            0 :                 } else if options.sub_compaction {
     286            0 :                     self.handle_sub_compaction(id, options, timeline, gc_block)
     287            0 :                         .await?;
     288              :                 } else {
     289            0 :                     let gc_guard = match gc_block.start().await {
     290            0 :                         Ok(guard) => guard,
     291            0 :                         Err(e) => {
     292            0 :                             return Err(CompactionError::Other(anyhow!(
     293            0 :                                 "cannot run gc-compaction because gc is blocked: {}",
     294            0 :                                 e
     295            0 :                             )));
     296              :                         }
     297              :                     };
     298            0 :                     {
     299            0 :                         let mut guard = self.inner.lock().unwrap();
     300            0 :                         guard.gc_guards.insert(id, gc_guard);
     301            0 :                     }
     302            0 :                     let _ = timeline
     303            0 :                         .compact_with_options(cancel, options, ctx)
     304            0 :                         .instrument(info_span!("scheduled_compact_timeline", %timeline.timeline_id))
     305            0 :                         .await?;
     306            0 :                     self.notify_and_unblock(id);
     307              :                 }
     308              :             }
     309            0 :             GcCompactionQueueItem::SubCompactionJob(options) => {
     310            0 :                 let _ = timeline
     311            0 :                     .compact_with_options(cancel, options, ctx)
     312            0 :                     .instrument(info_span!("scheduled_compact_timeline", %timeline.timeline_id))
     313            0 :                     .await?;
     314              :             }
     315            0 :             GcCompactionQueueItem::Notify(id) => {
     316            0 :                 self.notify_and_unblock(id);
     317            0 :             }
     318              :             GcCompactionQueueItem::UpdateL2Lsn(_) => {
     319            0 :                 unreachable!()
     320              :             }
     321              :         }
     322            0 :         {
     323            0 :             let mut guard = self.inner.lock().unwrap();
     324            0 :             guard.running = None;
     325            0 :         }
     326            0 :         Ok(has_pending_tasks)
     327            0 :     }
     328              : 
     329              :     #[allow(clippy::type_complexity)]
     330            0 :     pub fn remaining_jobs(
     331            0 :         &self,
     332            0 :     ) -> (
     333            0 :         Option<(GcCompactionJobId, GcCompactionQueueItem)>,
     334            0 :         VecDeque<(GcCompactionJobId, GcCompactionQueueItem)>,
     335            0 :     ) {
     336            0 :         let guard = self.inner.lock().unwrap();
     337            0 :         (guard.running.clone(), guard.queued.clone())
     338            0 :     }
     339              : 
     340              :     #[allow(dead_code)]
     341            0 :     pub fn remaining_jobs_num(&self) -> usize {
     342            0 :         let guard = self.inner.lock().unwrap();
     343            0 :         guard.queued.len() + if guard.running.is_some() { 1 } else { 0 }
     344            0 :     }
     345              : }
     346              : 
     347              : /// A job description for the gc-compaction job. This structure describes the rectangle range that the job will
     348              : /// process. The exact layers that need to be compacted/rewritten will be generated when `compact_with_gc` gets
     349              : /// called.
     350              : #[derive(Debug, Clone)]
     351              : pub(crate) struct GcCompactJob {
     352              :     pub dry_run: bool,
     353              :     /// The key range to be compacted. The compaction algorithm will only regenerate key-value pairs within this range
     354              :     /// [left inclusive, right exclusive), and other pairs will be rewritten into new files if necessary.
     355              :     pub compact_key_range: Range<Key>,
     356              :     /// The LSN range to be compacted. The compaction algorithm will use this range to determine the layers to be
     357              :     /// selected for the compaction, and it does not guarantee the generated layers will have exactly the same LSN range
     358              :     /// as specified here. The true range being compacted is `min_lsn/max_lsn` in [`GcCompactionJobDescription`].
     359              :     /// min_lsn will always <= the lower bound specified here, and max_lsn will always >= the upper bound specified here.
     360              :     pub compact_lsn_range: Range<Lsn>,
     361              : }
     362              : 
     363              : impl GcCompactJob {
     364          108 :     pub fn from_compact_options(options: CompactOptions) -> Self {
     365          108 :         GcCompactJob {
     366          108 :             dry_run: options.flags.contains(CompactFlags::DryRun),
     367          108 :             compact_key_range: options
     368          108 :                 .compact_key_range
     369          108 :                 .map(|x| x.into())
     370          108 :                 .unwrap_or(Key::MIN..Key::MAX),
     371          108 :             compact_lsn_range: options
     372          108 :                 .compact_lsn_range
     373          108 :                 .map(|x| x.into())
     374          108 :                 .unwrap_or(Lsn::INVALID..Lsn::MAX),
     375          108 :         }
     376          108 :     }
     377              : }
     378              : 
     379              : /// A job description for the gc-compaction job. This structure is generated when `compact_with_gc` is called
     380              : /// and contains the exact layers we want to compact.
     381              : pub struct GcCompactionJobDescription {
     382              :     /// All layers to read in the compaction job
     383              :     selected_layers: Vec<Layer>,
     384              :     /// GC cutoff of the job. This is the lowest LSN that will be accessed by the read/GC path and we need to
     385              :     /// keep all deltas <= this LSN or generate an image == this LSN.
     386              :     gc_cutoff: Lsn,
     387              :     /// LSNs to retain for the job. Read path will use this LSN so we need to keep deltas <= this LSN or
     388              :     /// generate an image == this LSN.
     389              :     retain_lsns_below_horizon: Vec<Lsn>,
     390              :     /// Maximum layer LSN processed in this compaction, that is max(end_lsn of layers). Exclusive. All data
     391              :     /// \>= this LSN will be kept and will not be rewritten.
     392              :     max_layer_lsn: Lsn,
     393              :     /// Minimum layer LSN processed in this compaction, that is min(start_lsn of layers). Inclusive.
     394              :     /// All access below (strict lower than `<`) this LSN will be routed through the normal read path instead of
     395              :     /// k-merge within gc-compaction.
     396              :     min_layer_lsn: Lsn,
     397              :     /// Only compact layers overlapping with this range.
     398              :     compaction_key_range: Range<Key>,
     399              :     /// When partial compaction is enabled, these layers need to be rewritten to ensure no overlap.
     400              :     /// This field is here solely for debugging. The field will not be read once the compaction
     401              :     /// description is generated.
     402              :     rewrite_layers: Vec<Arc<PersistentLayerDesc>>,
     403              : }
     404              : 
     405              : /// The result of bottom-most compaction for a single key at each LSN.
     406              : #[derive(Debug)]
     407              : #[cfg_attr(test, derive(PartialEq))]
     408              : pub struct KeyLogAtLsn(pub Vec<(Lsn, Value)>);
     409              : 
     410              : /// The result of bottom-most compaction.
     411              : #[derive(Debug)]
     412              : #[cfg_attr(test, derive(PartialEq))]
     413              : pub(crate) struct KeyHistoryRetention {
     414              :     /// Stores logs to reconstruct the value at the given LSN, that is to say, logs <= LSN or image == LSN.
     415              :     pub(crate) below_horizon: Vec<(Lsn, KeyLogAtLsn)>,
     416              :     /// Stores logs to reconstruct the value at any LSN above the horizon, that is to say, log > LSN.
     417              :     pub(crate) above_horizon: KeyLogAtLsn,
     418              : }
     419              : 
     420              : impl KeyHistoryRetention {
     421              :     /// Hack: skip delta layer if we need to produce a layer of a same key-lsn.
     422              :     ///
     423              :     /// This can happen if we have removed some deltas in "the middle" of some existing layer's key-lsn-range.
     424              :     /// For example, consider the case where a single delta with range [0x10,0x50) exists.
     425              :     /// And we have branches at LSN 0x10, 0x20, 0x30.
     426              :     /// Then we delete branch @ 0x20.
     427              :     /// Bottom-most compaction may now delete the delta [0x20,0x30).
     428              :     /// And that wouldnt' change the shape of the layer.
     429              :     ///
     430              :     /// Note that bottom-most-gc-compaction never _adds_ new data in that case, only removes.
     431              :     ///
     432              :     /// `discard_key` will only be called when the writer reaches its target (instead of for every key), so it's fine to grab a lock inside.
     433          148 :     async fn discard_key(key: &PersistentLayerKey, tline: &Arc<Timeline>, dry_run: bool) -> bool {
     434          148 :         if dry_run {
     435            0 :             return true;
     436          148 :         }
     437              :         let layer_generation;
     438              :         {
     439          148 :             let guard = tline.layers.read().await;
     440          148 :             if !guard.contains_key(key) {
     441          104 :                 return false;
     442           44 :             }
     443           44 :             layer_generation = guard.get_from_key(key).metadata().generation;
     444           44 :         }
     445           44 :         if layer_generation == tline.generation {
     446           44 :             info!(
     447              :                 key=%key,
     448              :                 ?layer_generation,
     449            0 :                 "discard layer due to duplicated layer key in the same generation",
     450              :             );
     451           44 :             true
     452              :         } else {
     453            0 :             false
     454              :         }
     455          148 :     }
     456              : 
     457              :     /// Pipe a history of a single key to the writers.
     458              :     ///
     459              :     /// If `image_writer` is none, the images will be placed into the delta layers.
     460              :     /// The delta writer will contain all images and deltas (below and above the horizon) except the bottom-most images.
     461              :     #[allow(clippy::too_many_arguments)]
     462         1244 :     async fn pipe_to(
     463         1244 :         self,
     464         1244 :         key: Key,
     465         1244 :         delta_writer: &mut SplitDeltaLayerWriter,
     466         1244 :         mut image_writer: Option<&mut SplitImageLayerWriter>,
     467         1244 :         stat: &mut CompactionStatistics,
     468         1244 :         ctx: &RequestContext,
     469         1244 :     ) -> anyhow::Result<()> {
     470         1244 :         let mut first_batch = true;
     471         4024 :         for (cutoff_lsn, KeyLogAtLsn(logs)) in self.below_horizon {
     472         2780 :             if first_batch {
     473         1244 :                 if logs.len() == 1 && logs[0].1.is_image() {
     474         1168 :                     let Value::Image(img) = &logs[0].1 else {
     475            0 :                         unreachable!()
     476              :                     };
     477         1168 :                     stat.produce_image_key(img);
     478         1168 :                     if let Some(image_writer) = image_writer.as_mut() {
     479         1168 :                         image_writer.put_image(key, img.clone(), ctx).await?;
     480              :                     } else {
     481            0 :                         delta_writer
     482            0 :                             .put_value(key, cutoff_lsn, Value::Image(img.clone()), ctx)
     483            0 :                             .await?;
     484              :                     }
     485              :                 } else {
     486          132 :                     for (lsn, val) in logs {
     487           56 :                         stat.produce_key(&val);
     488           56 :                         delta_writer.put_value(key, lsn, val, ctx).await?;
     489              :                     }
     490              :                 }
     491         1244 :                 first_batch = false;
     492              :             } else {
     493         1768 :                 for (lsn, val) in logs {
     494          232 :                     stat.produce_key(&val);
     495          232 :                     delta_writer.put_value(key, lsn, val, ctx).await?;
     496              :                 }
     497              :             }
     498              :         }
     499         1244 :         let KeyLogAtLsn(above_horizon_logs) = self.above_horizon;
     500         1360 :         for (lsn, val) in above_horizon_logs {
     501          116 :             stat.produce_key(&val);
     502          116 :             delta_writer.put_value(key, lsn, val, ctx).await?;
     503              :         }
     504         1244 :         Ok(())
     505         1244 :     }
     506              : }
     507              : 
     508              : #[derive(Debug, Serialize, Default)]
     509              : struct CompactionStatisticsNumSize {
     510              :     num: u64,
     511              :     size: u64,
     512              : }
     513              : 
     514              : #[derive(Debug, Serialize, Default)]
     515              : pub struct CompactionStatistics {
     516              :     delta_layer_visited: CompactionStatisticsNumSize,
     517              :     image_layer_visited: CompactionStatisticsNumSize,
     518              :     delta_layer_produced: CompactionStatisticsNumSize,
     519              :     image_layer_produced: CompactionStatisticsNumSize,
     520              :     num_delta_layer_discarded: usize,
     521              :     num_image_layer_discarded: usize,
     522              :     num_unique_keys_visited: usize,
     523              :     wal_keys_visited: CompactionStatisticsNumSize,
     524              :     image_keys_visited: CompactionStatisticsNumSize,
     525              :     wal_produced: CompactionStatisticsNumSize,
     526              :     image_produced: CompactionStatisticsNumSize,
     527              : }
     528              : 
     529              : impl CompactionStatistics {
     530         2084 :     fn estimated_size_of_value(val: &Value) -> usize {
     531          864 :         match val {
     532         1220 :             Value::Image(img) => img.len(),
     533            0 :             Value::WalRecord(NeonWalRecord::Postgres { rec, .. }) => rec.len(),
     534          864 :             _ => std::mem::size_of::<NeonWalRecord>(),
     535              :         }
     536         2084 :     }
     537         3272 :     fn estimated_size_of_key() -> usize {
     538         3272 :         KEY_SIZE // TODO: distinguish image layer and delta layer (count LSN in delta layer)
     539         3272 :     }
     540          172 :     fn visit_delta_layer(&mut self, size: u64) {
     541          172 :         self.delta_layer_visited.num += 1;
     542          172 :         self.delta_layer_visited.size += size;
     543          172 :     }
     544          132 :     fn visit_image_layer(&mut self, size: u64) {
     545          132 :         self.image_layer_visited.num += 1;
     546          132 :         self.image_layer_visited.size += size;
     547          132 :     }
     548         1244 :     fn on_unique_key_visited(&mut self) {
     549         1244 :         self.num_unique_keys_visited += 1;
     550         1244 :     }
     551          480 :     fn visit_wal_key(&mut self, val: &Value) {
     552          480 :         self.wal_keys_visited.num += 1;
     553          480 :         self.wal_keys_visited.size +=
     554          480 :             Self::estimated_size_of_value(val) as u64 + Self::estimated_size_of_key() as u64;
     555          480 :     }
     556         1220 :     fn visit_image_key(&mut self, val: &Value) {
     557         1220 :         self.image_keys_visited.num += 1;
     558         1220 :         self.image_keys_visited.size +=
     559         1220 :             Self::estimated_size_of_value(val) as u64 + Self::estimated_size_of_key() as u64;
     560         1220 :     }
     561          404 :     fn produce_key(&mut self, val: &Value) {
     562          404 :         match val {
     563           20 :             Value::Image(img) => self.produce_image_key(img),
     564          384 :             Value::WalRecord(_) => self.produce_wal_key(val),
     565              :         }
     566          404 :     }
     567          384 :     fn produce_wal_key(&mut self, val: &Value) {
     568          384 :         self.wal_produced.num += 1;
     569          384 :         self.wal_produced.size +=
     570          384 :             Self::estimated_size_of_value(val) as u64 + Self::estimated_size_of_key() as u64;
     571          384 :     }
     572         1188 :     fn produce_image_key(&mut self, val: &Bytes) {
     573         1188 :         self.image_produced.num += 1;
     574         1188 :         self.image_produced.size += val.len() as u64 + Self::estimated_size_of_key() as u64;
     575         1188 :     }
     576           28 :     fn discard_delta_layer(&mut self) {
     577           28 :         self.num_delta_layer_discarded += 1;
     578           28 :     }
     579           16 :     fn discard_image_layer(&mut self) {
     580           16 :         self.num_image_layer_discarded += 1;
     581           16 :     }
     582           44 :     fn produce_delta_layer(&mut self, size: u64) {
     583           44 :         self.delta_layer_produced.num += 1;
     584           44 :         self.delta_layer_produced.size += size;
     585           44 :     }
     586           60 :     fn produce_image_layer(&mut self, size: u64) {
     587           60 :         self.image_layer_produced.num += 1;
     588           60 :         self.image_layer_produced.size += size;
     589           60 :     }
     590              : }
     591              : 
     592              : impl Timeline {
     593              :     /// TODO: cancellation
     594              :     ///
     595              :     /// Returns whether the compaction has pending tasks.
     596          728 :     pub(crate) async fn compact_legacy(
     597          728 :         self: &Arc<Self>,
     598          728 :         cancel: &CancellationToken,
     599          728 :         options: CompactOptions,
     600          728 :         ctx: &RequestContext,
     601          728 :     ) -> Result<bool, CompactionError> {
     602          728 :         if options
     603          728 :             .flags
     604          728 :             .contains(CompactFlags::EnhancedGcBottomMostCompaction)
     605              :         {
     606            0 :             self.compact_with_gc(cancel, options, ctx)
     607            0 :                 .await
     608            0 :                 .map_err(CompactionError::Other)?;
     609            0 :             return Ok(false);
     610          728 :         }
     611          728 : 
     612          728 :         if options.flags.contains(CompactFlags::DryRun) {
     613            0 :             return Err(CompactionError::Other(anyhow!(
     614            0 :                 "dry-run mode is not supported for legacy compaction for now"
     615            0 :             )));
     616          728 :         }
     617          728 : 
     618          728 :         if options.compact_key_range.is_some() || options.compact_lsn_range.is_some() {
     619              :             // maybe useful in the future? could implement this at some point
     620            0 :             return Err(CompactionError::Other(anyhow!(
     621            0 :                 "compaction range is not supported for legacy compaction for now"
     622            0 :             )));
     623          728 :         }
     624          728 : 
     625          728 :         // High level strategy for compaction / image creation:
     626          728 :         //
     627          728 :         // 1. First, calculate the desired "partitioning" of the
     628          728 :         // currently in-use key space. The goal is to partition the
     629          728 :         // key space into roughly fixed-size chunks, but also take into
     630          728 :         // account any existing image layers, and try to align the
     631          728 :         // chunk boundaries with the existing image layers to avoid
     632          728 :         // too much churn. Also try to align chunk boundaries with
     633          728 :         // relation boundaries.  In principle, we don't know about
     634          728 :         // relation boundaries here, we just deal with key-value
     635          728 :         // pairs, and the code in pgdatadir_mapping.rs knows how to
     636          728 :         // map relations into key-value pairs. But in practice we know
     637          728 :         // that 'field6' is the block number, and the fields 1-5
     638          728 :         // identify a relation. This is just an optimization,
     639          728 :         // though.
     640          728 :         //
     641          728 :         // 2. Once we know the partitioning, for each partition,
     642          728 :         // decide if it's time to create a new image layer. The
     643          728 :         // criterion is: has there been too much "churn" since the last
     644          728 :         // image layer? The "churn" is a fuzzy concept: it's a
     645          728 :         // combination of too many delta files, or too much WAL in
     646          728 :         // total in the delta file. Or perhaps: if creating an image
     647          728 :         // file would allow deleting some older files.
     648          728 :         //
     649          728 :         // 3. After that, we compact all level0 delta files if there
     650          728 :         // are too many of them.  While compacting, we also garbage
     651          728 :         // collect any page versions that are no longer needed because
     652          728 :         // of the new image layers we created in step 2.
     653          728 :         //
     654          728 :         // TODO: This high level strategy hasn't been implemented yet.
     655          728 :         // Below are functions compact_level0() and create_image_layers()
     656          728 :         // but they are a bit ad hoc and don't quite work like it's explained
     657          728 :         // above. Rewrite it.
     658          728 : 
     659          728 :         // Is the timeline being deleted?
     660          728 :         if self.is_stopping() {
     661            0 :             trace!("Dropping out of compaction on timeline shutdown");
     662            0 :             return Err(CompactionError::ShuttingDown);
     663          728 :         }
     664          728 : 
     665          728 :         let target_file_size = self.get_checkpoint_distance();
     666              : 
     667              :         // Define partitioning schema if needed
     668              : 
     669              :         // FIXME: the match should only cover repartitioning, not the next steps
     670          728 :         let (partition_count, has_pending_tasks) = match self
     671          728 :             .repartition(
     672          728 :                 self.get_last_record_lsn(),
     673          728 :                 self.get_compaction_target_size(),
     674          728 :                 options.flags,
     675          728 :                 ctx,
     676          728 :             )
     677          728 :             .await
     678              :         {
     679          728 :             Ok(((dense_partitioning, sparse_partitioning), lsn)) => {
     680          728 :                 // Disables access_stats updates, so that the files we read remain candidates for eviction after we're done with them
     681          728 :                 let image_ctx = RequestContextBuilder::extend(ctx)
     682          728 :                     .access_stats_behavior(AccessStatsBehavior::Skip)
     683          728 :                     .build();
     684          728 : 
     685          728 :                 // 2. Compact
     686          728 :                 let timer = self.metrics.compact_time_histo.start_timer();
     687          728 :                 let fully_compacted = self
     688          728 :                     .compact_level0(
     689          728 :                         target_file_size,
     690          728 :                         options.flags.contains(CompactFlags::ForceL0Compaction),
     691          728 :                         ctx,
     692          728 :                     )
     693          728 :                     .await?;
     694          728 :                 timer.stop_and_record();
     695          728 : 
     696          728 :                 let mut partitioning = dense_partitioning;
     697          728 :                 partitioning
     698          728 :                     .parts
     699          728 :                     .extend(sparse_partitioning.into_dense().parts);
     700          728 : 
     701          728 :                 // 3. Create new image layers for partitions that have been modified
     702          728 :                 // "enough". Skip image layer creation if L0 compaction cannot keep up.
     703          728 :                 if fully_compacted {
     704          728 :                     let image_layers = self
     705          728 :                         .create_image_layers(
     706          728 :                             &partitioning,
     707          728 :                             lsn,
     708          728 :                             if options
     709          728 :                                 .flags
     710          728 :                                 .contains(CompactFlags::ForceImageLayerCreation)
     711              :                             {
     712           28 :                                 ImageLayerCreationMode::Force
     713              :                             } else {
     714          700 :                                 ImageLayerCreationMode::Try
     715              :                             },
     716          728 :                             &image_ctx,
     717          728 :                         )
     718          728 :                         .await?;
     719              : 
     720          728 :                     self.upload_new_image_layers(image_layers)?;
     721              :                 } else {
     722            0 :                     info!("skipping image layer generation due to L0 compaction did not include all layers.");
     723              :                 }
     724          728 :                 (partitioning.parts.len(), !fully_compacted)
     725              :             }
     726            0 :             Err(err) => {
     727            0 :                 // no partitioning? This is normal, if the timeline was just created
     728            0 :                 // as an empty timeline. Also in unit tests, when we use the timeline
     729            0 :                 // as a simple key-value store, ignoring the datadir layout. Log the
     730            0 :                 // error but continue.
     731            0 :                 //
     732            0 :                 // Suppress error when it's due to cancellation
     733            0 :                 if !self.cancel.is_cancelled() && !err.is_cancelled() {
     734            0 :                     tracing::error!("could not compact, repartitioning keyspace failed: {err:?}");
     735            0 :                 }
     736            0 :                 (1, false)
     737              :             }
     738              :         };
     739              : 
     740          728 :         if self.shard_identity.count >= ShardCount::new(2) {
     741              :             // Limit the number of layer rewrites to the number of partitions: this means its
     742              :             // runtime should be comparable to a full round of image layer creations, rather than
     743              :             // being potentially much longer.
     744            0 :             let rewrite_max = partition_count;
     745            0 : 
     746            0 :             self.compact_shard_ancestors(rewrite_max, ctx).await?;
     747          728 :         }
     748              : 
     749          728 :         Ok(has_pending_tasks)
     750          728 :     }
     751              : 
     752              :     /// Check for layers that are eligible to be rewritten:
     753              :     /// - Shard splitting: After a shard split, ancestor layers beyond pitr_interval are rewritten, so that
     754              :     ///   we don't indefinitely retain keys in this shard that aren't needed.
     755              :     /// - For future use: layers beyond pitr_interval that are in formats we would
     756              :     ///   rather not maintain compatibility with indefinitely.
     757              :     ///
     758              :     /// Note: this phase may read and write many gigabytes of data: use rewrite_max to bound
     759              :     /// how much work it will try to do in each compaction pass.
     760            0 :     async fn compact_shard_ancestors(
     761            0 :         self: &Arc<Self>,
     762            0 :         rewrite_max: usize,
     763            0 :         ctx: &RequestContext,
     764            0 :     ) -> Result<(), CompactionError> {
     765            0 :         let mut drop_layers = Vec::new();
     766            0 :         let mut layers_to_rewrite: Vec<Layer> = Vec::new();
     767            0 : 
     768            0 :         // We will use the Lsn cutoff of the last GC as a threshold for rewriting layers: if a
     769            0 :         // layer is behind this Lsn, it indicates that the layer is being retained beyond the
     770            0 :         // pitr_interval, for example because a branchpoint references it.
     771            0 :         //
     772            0 :         // Holding this read guard also blocks [`Self::gc_timeline`] from entering while we
     773            0 :         // are rewriting layers.
     774            0 :         let latest_gc_cutoff = self.get_latest_gc_cutoff_lsn();
     775            0 : 
     776            0 :         tracing::info!(
     777            0 :             "latest_gc_cutoff: {}, pitr cutoff {}",
     778            0 :             *latest_gc_cutoff,
     779            0 :             self.gc_info.read().unwrap().cutoffs.time
     780              :         );
     781              : 
     782            0 :         let layers = self.layers.read().await;
     783            0 :         for layer_desc in layers.layer_map()?.iter_historic_layers() {
     784            0 :             let layer = layers.get_from_desc(&layer_desc);
     785            0 :             if layer.metadata().shard.shard_count == self.shard_identity.count {
     786              :                 // This layer does not belong to a historic ancestor, no need to re-image it.
     787            0 :                 continue;
     788            0 :             }
     789            0 : 
     790            0 :             // This layer was created on an ancestor shard: check if it contains any data for this shard.
     791            0 :             let sharded_range = ShardedRange::new(layer_desc.get_key_range(), &self.shard_identity);
     792            0 :             let layer_local_page_count = sharded_range.page_count();
     793            0 :             let layer_raw_page_count = ShardedRange::raw_size(&layer_desc.get_key_range());
     794            0 :             if layer_local_page_count == 0 {
     795              :                 // This ancestral layer only covers keys that belong to other shards.
     796              :                 // We include the full metadata in the log: if we had some critical bug that caused
     797              :                 // us to incorrectly drop layers, this would simplify manually debugging + reinstating those layers.
     798            0 :                 info!(%layer, old_metadata=?layer.metadata(),
     799            0 :                     "dropping layer after shard split, contains no keys for this shard.",
     800              :                 );
     801              : 
     802            0 :                 if cfg!(debug_assertions) {
     803              :                     // Expensive, exhaustive check of keys in this layer: this guards against ShardedRange's calculations being
     804              :                     // wrong.  If ShardedRange claims the local page count is zero, then no keys in this layer
     805              :                     // should be !is_key_disposable()
     806            0 :                     let range = layer_desc.get_key_range();
     807            0 :                     let mut key = range.start;
     808            0 :                     while key < range.end {
     809            0 :                         debug_assert!(self.shard_identity.is_key_disposable(&key));
     810            0 :                         key = key.next();
     811              :                     }
     812            0 :                 }
     813              : 
     814            0 :                 drop_layers.push(layer);
     815            0 :                 continue;
     816            0 :             } else if layer_local_page_count != u32::MAX
     817            0 :                 && layer_local_page_count == layer_raw_page_count
     818              :             {
     819            0 :                 debug!(%layer,
     820            0 :                     "layer is entirely shard local ({} keys), no need to filter it",
     821              :                     layer_local_page_count
     822              :                 );
     823            0 :                 continue;
     824            0 :             }
     825            0 : 
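     826            0 :             // Don't bother re-writing a layer unless it will at least halve its size. FIXME(review): this branch only logs and has no `continue`, so such layers still fall through to the checks below.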
     827            0 :             if layer_local_page_count != u32::MAX
     828            0 :                 && layer_local_page_count > layer_raw_page_count / 2
     829              :             {
     830            0 :                 debug!(%layer,
     831            0 :                     "layer is already mostly local ({}/{}), not rewriting",
     832              :                     layer_local_page_count,
     833              :                     layer_raw_page_count
     834              :                 );
     835            0 :             }
     836              : 
     837              :             // Don't bother re-writing a layer if it is within the PITR window: it will age-out eventually
     838              :             // without incurring the I/O cost of a rewrite.
     839            0 :             if layer_desc.get_lsn_range().end >= *latest_gc_cutoff {
     840            0 :                 debug!(%layer, "Skipping rewrite of layer still in GC window ({} >= {})",
     841            0 :                     layer_desc.get_lsn_range().end, *latest_gc_cutoff);
     842            0 :                 continue;
     843            0 :             }
     844            0 : 
     845            0 :             if layer_desc.is_delta() {
     846              :                 // We do not yet implement rewrite of delta layers
     847            0 :                 debug!(%layer, "Skipping rewrite of delta layer");
     848            0 :                 continue;
     849            0 :             }
     850            0 : 
     851            0 :             // Only rewrite layers if their generations differ.  This guarantees:
     852            0 :             //  - that local rewrite is safe, as local layer paths will differ between existing layer and rewritten one
     853            0 :             //  - that the layer is persistent in remote storage, as we only see old-generation'd layer via loading from remote storage
     854            0 :             if layer.metadata().generation == self.generation {
     855            0 :                 debug!(%layer, "Skipping rewrite, is not from old generation");
     856            0 :                 continue;
     857            0 :             }
     858            0 : 
     859            0 :             if layers_to_rewrite.len() >= rewrite_max {
     860            0 :                 tracing::info!(%layer, "Will rewrite layer on a future compaction, already rewrote {}",
     861            0 :                     layers_to_rewrite.len()
     862              :                 );
     863            0 :                 continue;
     864            0 :             }
     865            0 : 
     866            0 :             // Fall through: all our conditions for doing a rewrite passed.
     867            0 :             layers_to_rewrite.push(layer);
     868              :         }
     869              : 
     870              :         // Drop read lock on layer map before we start doing time-consuming I/O
     871            0 :         drop(layers);
     872            0 : 
     873            0 :         let mut replace_image_layers = Vec::new();
     874              : 
     875            0 :         for layer in layers_to_rewrite {
     876            0 :             tracing::info!(layer=%layer, "Rewriting layer after shard split...");
     877            0 :             let mut image_layer_writer = ImageLayerWriter::new(
     878            0 :                 self.conf,
     879            0 :                 self.timeline_id,
     880            0 :                 self.tenant_shard_id,
     881            0 :                 &layer.layer_desc().key_range,
     882            0 :                 layer.layer_desc().image_layer_lsn(),
     883            0 :                 ctx,
     884            0 :             )
     885            0 :             .await
     886            0 :             .map_err(CompactionError::Other)?;
     887              : 
     888              :             // Safety of layer rewrites:
     889              :             // - We are writing to a different local file path than we are reading from, so the old Layer
     890              :             //   cannot interfere with the new one.
     891              :             // - In the page cache, contents for a particular VirtualFile are stored with a file_id that
     892              :             //   is different for two layers with the same name (in `ImageLayerInner::new` we always
     893              :             //   acquire a fresh id from [`crate::page_cache::next_file_id`]).  So readers do not risk
     894              :             //   reading the index from one layer file, and then data blocks from the rewritten layer file.
     895              :             // - Any readers that have a reference to the old layer will keep it alive until they are done
     896              :             //   with it. If they are trying to promote from remote storage, that will fail, but this is the same
     897              :             //   as for compaction generally: compaction is allowed to delete layers that readers might be trying to use.
     898              :             // - We do not run concurrently with other kinds of compaction, so the only layer map writes we race with are:
     899              :             //    - GC, which at worst witnesses us "undelete" a layer that they just deleted.
     900              :             //    - ingestion, which only inserts layers, therefore cannot collide with us.
     901            0 :             let resident = layer.download_and_keep_resident().await?;
     902              : 
     903            0 :             let keys_written = resident
     904            0 :                 .filter(&self.shard_identity, &mut image_layer_writer, ctx)
     905            0 :                 .await?;
     906              : 
     907            0 :             if keys_written > 0 {
     908            0 :                 let (desc, path) = image_layer_writer
     909            0 :                     .finish(ctx)
     910            0 :                     .await
     911            0 :                     .map_err(CompactionError::Other)?;
     912            0 :                 let new_layer = Layer::finish_creating(self.conf, self, desc, &path)
     913            0 :                     .map_err(CompactionError::Other)?;
     914            0 :                 tracing::info!(layer=%new_layer, "Rewrote layer, {} -> {} bytes",
     915            0 :                     layer.metadata().file_size,
     916            0 :                     new_layer.metadata().file_size);
     917              : 
     918            0 :                 replace_image_layers.push((layer, new_layer));
     919            0 :             } else {
     920            0 :                 // Drop the old layer.  Usually for this case we would already have noticed that
     921            0 :                 // the layer has no data for us with the ShardedRange check above, but that check only
     922            0 :                 // looks at the layer's key range, so filtering can still end up writing no keys.
     923            0 :             }
     924              :         }
     925              : 
     926              :         // At this point, we have replaced local layer files with their rewritten form, but not yet uploaded
     927              :         // metadata to reflect that. If we restart here, the replaced layer files will look invalid (size mismatch
     928              :         // to remote index) and be removed. This is inefficient but safe.
     929            0 :         fail::fail_point!("compact-shard-ancestors-localonly");
     930            0 : 
     931            0 :         // Update the LayerMap so that readers will use the new layers, and enqueue it for writing to remote storage
     932            0 :         self.rewrite_layers(replace_image_layers, drop_layers)
     933            0 :             .await?;
     934              : 
     935            0 :         fail::fail_point!("compact-shard-ancestors-enqueued");
     936            0 : 
     937            0 :         // We wait for all uploads to complete before finishing this compaction stage.  This is not
     938            0 :         // necessary for correctness, but it simplifies testing, and avoids proceeding with another
     939            0 :         // Timeline's compaction while this timeline's uploads may be generating lots of disk I/O
     940            0 :         // load.
     941            0 :         match self.remote_client.wait_completion().await {
     942            0 :             Ok(()) => (),
     943            0 :             Err(WaitCompletionError::NotInitialized(ni)) => return Err(CompactionError::from(ni)),
     944              :             Err(WaitCompletionError::UploadQueueShutDownOrStopped) => {
     945            0 :                 return Err(CompactionError::ShuttingDown)
     946              :             }
     947              :         }
     948              : 
     949            0 :         fail::fail_point!("compact-shard-ancestors-persistent");
     950            0 : 
     951            0 :         Ok(())
     952            0 :     }
     953              : 
     954              :     /// Update the LayerVisibilityHint of layers covered by image layers, based on whether there is
     955              :     /// an image layer between them and the most recent readable LSN (branch point or tip of timeline).  The
     956              :     /// purpose of the visibility hint is to record which layers need to be available to service reads.
     957              :     ///
     958              :     /// The result may be used as an input to eviction and secondary downloads to de-prioritize layers
     959              :     /// that we know won't be needed for reads.  The only failure path is `layer_map()` returning an error.
     960          452 :     pub(super) async fn update_layer_visibility(
     961          452 :         &self,
     962          452 :     ) -> Result<(), super::layer_manager::Shutdown> {
     963          452 :         let head_lsn = self.get_last_record_lsn();
     964              : 
     965              :         // We will sweep through layers in reverse-LSN order.  We only do historic layers.  L0 deltas
     966              :         // are implicitly left visible, because LayerVisibilityHint's default is Visible, and we never modify it here.
     967              :         // Note that L0 deltas _can_ be covered by image layers, but we consider them 'visible' because we anticipate that
     968              :         // they will be subject to L0->L1 compaction in the near future.
     969          452 :         let layer_manager = self.layers.read().await;
     970          452 :         let layer_map = layer_manager.layer_map()?;
     971              : 
     972          452 :         let readable_points = {
     973          452 :             let children = self.gc_info.read().unwrap().retain_lsns.clone();
     974          452 :             // One readable point per `retain_lsns` entry that is not marked offloaded, plus `head_lsn` last.
     975          452 :             let mut readable_points = Vec::with_capacity(children.len() + 1);
     976          452 :             for (child_lsn, _child_timeline_id, is_offloaded) in &children {
     977            0 :                 if *is_offloaded == MaybeOffloaded::Yes {
     978            0 :                     continue;
     979            0 :                 }
     980            0 :                 readable_points.push(*child_lsn);
     981              :             }
     982          452 :             readable_points.push(head_lsn);
     983          452 :             readable_points
     984          452 :         };
     985          452 :         // `get_visibility` yields a visibility per layer descriptor plus `covered`, which is unused for now (see TODO below).
     986          452 :         let (layer_visibility, covered) = layer_map.get_visibility(readable_points);
     987         1144 :         for (layer_desc, visibility) in layer_visibility {
     988          692 :             // FIXME: use a more efficient bulk zip() through the layers, rather than the N log N cost of getting each one
     989          692 :             let layer = layer_manager.get_from_desc(&layer_desc);
     990          692 :             layer.set_visibility(visibility);
     991          692 :         }
     992              : 
     993              :         // TODO: publish our covered KeySpace to our parent, so that when they update their visibility, they can
     994              :         // avoid assuming that everything at a branch point is visible.
     995          452 :         drop(covered);
     996          452 :         Ok(())
     997          452 :     }
     998              : 
     999              :     /// Collect a bunch of Level 0 layer files, and compact and reshuffle them
    1000              :     /// as Level 1 files. Returns whether the L0 layers are fully compacted (`true` if phase 1 found nothing to do).
    1001          728 :     async fn compact_level0(
    1002          728 :         self: &Arc<Self>,
    1003          728 :         target_file_size: u64,
    1004          728 :         force_compaction_ignore_threshold: bool,
    1005          728 :         ctx: &RequestContext,
    1006          728 :     ) -> Result<bool, CompactionError> {
    1007              :         let CompactLevel0Phase1Result {
    1008          728 :             new_layers,
    1009          728 :             deltas_to_compact,
    1010          728 :             fully_compacted,
    1011              :         } = {
    1012          728 :             let phase1_span = info_span!("compact_level0_phase1");
    1013          728 :             let ctx = ctx.attached_child();
    1014          728 :             let mut stats = CompactLevel0Phase1StatsBuilder {
    1015          728 :                 version: Some(2),
    1016          728 :                 tenant_id: Some(self.tenant_shard_id),
    1017          728 :                 timeline_id: Some(self.timeline_id),
    1018          728 :                 ..Default::default()
    1019          728 :             };
    1020          728 :             // Record how long acquiring the layers read lock takes; the guard itself is handed over to phase 1.
    1021          728 :             let begin = tokio::time::Instant::now();
    1022          728 :             let phase1_layers_locked = self.layers.read().await;
    1023          728 :             let now = tokio::time::Instant::now();
    1024          728 :             stats.read_lock_acquisition_micros =
    1025          728 :                 DurationRecorder::Recorded(RecordedDuration(now - begin), now);
    1026          728 :             self.compact_level0_phase1(
    1027          728 :                 phase1_layers_locked,
    1028          728 :                 stats,
    1029          728 :                 target_file_size,
    1030          728 :                 force_compaction_ignore_threshold,
    1031          728 :                 &ctx,
    1032          728 :             )
    1033          728 :             .instrument(phase1_span)
    1034          728 :             .await?
    1035              :         };
    1036              : 
    1037          728 :         if new_layers.is_empty() && deltas_to_compact.is_empty() {
    1038              :             // Phase 1 produced no new layers and selected no deltas: nothing to do.
    1039          672 :             return Ok(true);
    1040           56 :         }
    1041           56 :         // Hand the phase 1 output to `finish_compact_batch`; its second argument is an empty list here.
    1042           56 :         self.finish_compact_batch(&new_layers, &Vec::new(), &deltas_to_compact)
    1043           56 :             .await?;
    1044           56 :         Ok(fully_compacted)
    1045          728 :     }
    1046              : 
    1047              :     /// Level0 files first phase of compaction, explained in the [`Self::compact_legacy`] comment.
    1048          728 :     async fn compact_level0_phase1<'a>(
    1049          728 :         self: &'a Arc<Self>,
    1050          728 :         guard: tokio::sync::RwLockReadGuard<'a, LayerManager>,
    1051          728 :         mut stats: CompactLevel0Phase1StatsBuilder,
    1052          728 :         target_file_size: u64,
    1053          728 :         force_compaction_ignore_threshold: bool,
    1054          728 :         ctx: &RequestContext,
    1055          728 :     ) -> Result<CompactLevel0Phase1Result, CompactionError> {
    1056          728 :         stats.read_lock_held_spawn_blocking_startup_micros =
    1057          728 :             stats.read_lock_acquisition_micros.till_now(); // set by caller
    1058          728 :         let layers = guard.layer_map()?;
    1059          728 :         let level0_deltas = layers.level0_deltas();
    1060          728 :         stats.level0_deltas_count = Some(level0_deltas.len());
    1061          728 : 
    1062          728 :         // Only compact if enough layers have accumulated.
    1063          728 :         let threshold = self.get_compaction_threshold();
    1064          728 :         if level0_deltas.is_empty() || level0_deltas.len() < threshold {
    1065          672 :             if force_compaction_ignore_threshold {
    1066            0 :                 if !level0_deltas.is_empty() {
    1067            0 :                     info!(
    1068            0 :                         level0_deltas = level0_deltas.len(),
    1069            0 :                         threshold, "too few deltas to compact, but forcing compaction"
    1070              :                     );
    1071              :                 } else {
    1072            0 :                     info!(
    1073            0 :                         level0_deltas = level0_deltas.len(),
    1074            0 :                         threshold, "too few deltas to compact, cannot force compaction"
    1075              :                     );
    1076            0 :                     return Ok(CompactLevel0Phase1Result::default());
    1077              :                 }
    1078              :             } else {
    1079          672 :                 debug!(
    1080            0 :                     level0_deltas = level0_deltas.len(),
    1081            0 :                     threshold, "too few deltas to compact"
    1082              :                 );
    1083          672 :                 return Ok(CompactLevel0Phase1Result::default());
    1084              :             }
    1085           56 :         }
    1086              : 
    1087           56 :         let mut level0_deltas = level0_deltas
    1088           56 :             .iter()
    1089          804 :             .map(|x| guard.get_from_desc(x))
    1090           56 :             .collect::<Vec<_>>();
    1091           56 : 
    1092           56 :         // Gather the files to compact in this iteration.
    1093           56 :         //
    1094           56 :         // Start with the oldest Level 0 delta file, and collect any other
    1095           56 :         // level 0 files that form a contiguous sequence, such that the end
    1096           56 :         // LSN of previous file matches the start LSN of the next file.
    1097           56 :         //
    1098           56 :         // Note that if the files don't form such a sequence, we might
    1099           56 :         // "compact" just a single file. That's a bit pointless, but it allows
    1100           56 :         // us to get rid of the level 0 file, and compact the other files on
    1101           56 :         // the next iteration. This could probably be made smarter, but such
    1102           56 :         // "gaps" in the sequence of level 0 files should only happen in case
    1103           56 :         // of a crash, partial download from cloud storage, or something like
    1104           56 :         // that, so it's not a big deal in practice.
    1105         1496 :         level0_deltas.sort_by_key(|l| l.layer_desc().lsn_range.start);
    1106           56 :         let mut level0_deltas_iter = level0_deltas.iter();
    1107           56 : 
    1108           56 :         let first_level0_delta = level0_deltas_iter.next().unwrap();
    1109           56 :         let mut prev_lsn_end = first_level0_delta.layer_desc().lsn_range.end;
    1110           56 :         let mut deltas_to_compact = Vec::with_capacity(level0_deltas.len());
    1111           56 : 
    1112           56 :         // Accumulate the size of layers in `deltas_to_compact`
    1113           56 :         let mut deltas_to_compact_bytes = 0;
    1114           56 : 
    1115           56 :         // Under normal circumstances, we will accumulate up to compaction_interval L0s of size
    1116           56 :         // checkpoint_distance each.  To avoid edge cases using extra system resources, bound our
    1117           56 :         // work in this function to only operate on this much delta data at once.
    1118           56 :         let delta_size_limit = self.get_compaction_upper_limit() as u64
    1119           56 :             * std::cmp::max(self.get_checkpoint_distance(), DEFAULT_CHECKPOINT_DISTANCE);
    1120           56 : 
    1121           56 :         let mut fully_compacted = true;
    1122           56 : 
    1123           56 :         deltas_to_compact.push(first_level0_delta.download_and_keep_resident().await?);
    1124          804 :         for l in level0_deltas_iter {
    1125          748 :             let lsn_range = &l.layer_desc().lsn_range;
    1126          748 : 
    1127          748 :             if lsn_range.start != prev_lsn_end {
    1128            0 :                 break;
    1129          748 :             }
    1130          748 :             deltas_to_compact.push(l.download_and_keep_resident().await?);
    1131          748 :             deltas_to_compact_bytes += l.metadata().file_size;
    1132          748 :             prev_lsn_end = lsn_range.end;
    1133          748 : 
    1134          748 :             if deltas_to_compact_bytes >= delta_size_limit {
    1135            0 :                 info!(
    1136            0 :                     l0_deltas_selected = deltas_to_compact.len(),
    1137            0 :                     l0_deltas_total = level0_deltas.len(),
    1138            0 :                     "L0 compaction picker hit max delta layer size limit: {}",
    1139              :                     delta_size_limit
    1140              :                 );
    1141            0 :                 fully_compacted = false;
    1142            0 : 
    1143            0 :                 // Proceed with compaction, but only a subset of L0s
    1144            0 :                 break;
    1145          748 :             }
    1146              :         }
    1147           56 :         let lsn_range = Range {
    1148           56 :             start: deltas_to_compact
    1149           56 :                 .first()
    1150           56 :                 .unwrap()
    1151           56 :                 .layer_desc()
    1152           56 :                 .lsn_range
    1153           56 :                 .start,
    1154           56 :             end: deltas_to_compact.last().unwrap().layer_desc().lsn_range.end,
    1155           56 :         };
    1156           56 : 
    1157           56 :         info!(
    1158            0 :             "Starting Level0 compaction in LSN range {}-{} for {} layers ({} deltas in total)",
    1159            0 :             lsn_range.start,
    1160            0 :             lsn_range.end,
    1161            0 :             deltas_to_compact.len(),
    1162            0 :             level0_deltas.len()
    1163              :         );
    1164              : 
    1165          804 :         for l in deltas_to_compact.iter() {
    1166          804 :             info!("compact includes {l}");
    1167              :         }
    1168              : 
    1169              :         // We don't need the original list of layers anymore. Drop it so that
    1170              :         // we don't accidentally use it later in the function.
    1171           56 :         drop(level0_deltas);
    1172           56 : 
    1173           56 :         stats.read_lock_held_prerequisites_micros = stats
    1174           56 :             .read_lock_held_spawn_blocking_startup_micros
    1175           56 :             .till_now();
    1176              : 
    1177              :         // TODO: replace with streaming k-merge
    1178           56 :         let all_keys = {
    1179           56 :             let mut all_keys = Vec::new();
    1180          804 :             for l in deltas_to_compact.iter() {
    1181          804 :                 if self.cancel.is_cancelled() {
    1182            0 :                     return Err(CompactionError::ShuttingDown);
    1183          804 :                 }
    1184          804 :                 let delta = l.get_as_delta(ctx).await.map_err(CompactionError::Other)?;
    1185          804 :                 let keys = delta
    1186          804 :                     .index_entries(ctx)
    1187          804 :                     .await
    1188          804 :                     .map_err(CompactionError::Other)?;
    1189          804 :                 all_keys.extend(keys);
    1190              :             }
    1191              :             // The current stdlib sorting implementation is designed in a way where it is
    1192              :             // particularly fast where the slice is made up of sorted sub-ranges.
    1193      8847586 :             all_keys.sort_by_key(|DeltaEntry { key, lsn, .. }| (*key, *lsn));
    1194           56 :             all_keys
    1195           56 :         };
    1196           56 : 
    1197           56 :         stats.read_lock_held_key_sort_micros = stats.read_lock_held_prerequisites_micros.till_now();
    1198              : 
    1199              :         // Determine N largest holes where N is number of compacted layers. The vec is sorted by key range start.
    1200              :         //
    1201              :         // A hole is a key range for which this compaction doesn't have any WAL records.
    1202              :         // Our goal in this compaction iteration is to avoid creating L1s that, in terms of their key range,
    1203              :         // cover the hole, but actually don't contain any WAL records for that key range.
    1204              :         // The reason is that the mere stack of L1s (`count_deltas`) triggers image layer creation (`create_image_layers`).
    1205              :         // That image layer creation would be useless for a hole range covered by L1s that don't contain any WAL records.
    1206              :         //
    1207              :         // The algorithm chooses holes as follows.
    1208              :         // - Slide a 2-window over the keys in key order to get the hole range (=distance between two keys).
    1209              :         // - Filter: min threshold on range length
    1210              :         // - Rank: by coverage size (=number of image layers required to reconstruct each key in the range for which we have any data)
    1211              :         //
    1212              :         // For more details, intuition, and some ASCII art see https://github.com/neondatabase/neon/pull/3597#discussion_r1112704451
    1213              :         #[derive(PartialEq, Eq)]
    1214              :         struct Hole {
    1215              :             key_range: Range<Key>,
    1216              :             coverage_size: usize,
    1217              :         }
    1218           56 :         let holes: Vec<Hole> = {
    1219              :             use std::cmp::Ordering;
    1220              :             impl Ord for Hole {
    1221            0 :                 fn cmp(&self, other: &Self) -> Ordering {
    1222            0 :                     self.coverage_size.cmp(&other.coverage_size).reverse()
    1223            0 :                 }
    1224              :             }
    1225              :             impl PartialOrd for Hole {
    1226            0 :                 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
    1227            0 :                     Some(self.cmp(other))
    1228            0 :                 }
    1229              :             }
    1230           56 :             let max_holes = deltas_to_compact.len();
    1231           56 :             let last_record_lsn = self.get_last_record_lsn();
    1232           56 :             let min_hole_range = (target_file_size / page_cache::PAGE_SZ as u64) as i128;
    1233           56 :             let min_hole_coverage_size = 3; // TODO: something more flexible?
    1234           56 :                                             // min-heap (reserve space for one more element added before eviction)
    1235           56 :             let mut heap: BinaryHeap<Hole> = BinaryHeap::with_capacity(max_holes + 1);
    1236           56 :             let mut prev: Option<Key> = None;
    1237              : 
    1238      4128076 :             for &DeltaEntry { key: next_key, .. } in all_keys.iter() {
    1239      4128076 :                 if let Some(prev_key) = prev {
    1240              :                     // just first fast filter, do not create hole entries for metadata keys. The last hole in the
    1241              :                     // compaction is the gap between data key and metadata keys.
    1242      4128020 :                     if next_key.to_i128() - prev_key.to_i128() >= min_hole_range
    1243            0 :                         && !Key::is_metadata_key(&prev_key)
    1244              :                     {
    1245            0 :                         let key_range = prev_key..next_key;
    1246            0 :                         // Measuring a hole by simply subtracting the i128 representations of the key range boundaries
    1247            0 :                         // does not make much sense, because the largest holes will correspond to field1/field2 changes.
    1248            0 :                         // But we are mostly interested in eliminating holes which cause generation of excessive image layers.
    1249            0 :                         // That is why it is better to measure the size of a hole as the number of covering image layers.
    1250            0 :                         let coverage_size =
    1251            0 :                             layers.image_coverage(&key_range, last_record_lsn).len();
    1252            0 :                         if coverage_size >= min_hole_coverage_size {
    1253            0 :                             heap.push(Hole {
    1254            0 :                                 key_range,
    1255            0 :                                 coverage_size,
    1256            0 :                             });
    1257            0 :                             if heap.len() > max_holes {
    1258            0 :                                 heap.pop(); // remove smallest hole
    1259            0 :                             }
    1260            0 :                         }
    1261      4128020 :                     }
    1262           56 :                 }
    1263      4128076 :                 prev = Some(next_key.next());
    1264              :             }
    1265           56 :             let mut holes = heap.into_vec();
    1266           56 :             holes.sort_unstable_by_key(|hole| hole.key_range.start);
    1267           56 :             holes
    1268           56 :         };
    1269           56 :         stats.read_lock_held_compute_holes_micros = stats.read_lock_held_key_sort_micros.till_now();
    1270           56 :         drop_rlock(guard);
    1271           56 : 
    1272           56 :         if self.cancel.is_cancelled() {
    1273            0 :             return Err(CompactionError::ShuttingDown);
    1274           56 :         }
    1275           56 : 
    1276           56 :         stats.read_lock_drop_micros = stats.read_lock_held_compute_holes_micros.till_now();
    1277              : 
    1278              :         // This iterator walks through all key-value pairs from all the layers
    1279              :         // we're compacting, in key, LSN order.
    1280              :         // If there's both a Value::Image and Value::WalRecord for the same (key,lsn),
    1281              :         // then the Value::Image is ordered before Value::WalRecord.
    1282           56 :         let mut all_values_iter = {
    1283           56 :             let mut deltas = Vec::with_capacity(deltas_to_compact.len());
    1284          804 :             for l in deltas_to_compact.iter() {
    1285          804 :                 let l = l.get_as_delta(ctx).await.map_err(CompactionError::Other)?;
    1286          804 :                 deltas.push(l);
    1287              :             }
    1288           56 :             MergeIterator::create(&deltas, &[], ctx)
    1289           56 :         };
    1290           56 : 
    1291           56 :         // This iterator walks through all keys and is needed to calculate size used by each key
    1292           56 :         let mut all_keys_iter = all_keys
    1293           56 :             .iter()
    1294      4128076 :             .map(|DeltaEntry { key, lsn, size, .. }| (*key, *lsn, *size))
    1295      4128020 :             .coalesce(|mut prev, cur| {
    1296      4128020 :                 // Coalesce keys that belong to the same key pair.
    1297      4128020 :                 // This ensures that compaction doesn't put them
    1298      4128020 :                 // into different layer files.
    1299      4128020 :                 // Still limit this by the target file size,
    1300      4128020 :                 // so that we keep the size of the files in
    1301      4128020 :                 // check.
    1302      4128020 :                 if prev.0 == cur.0 && prev.2 < target_file_size {
    1303        80076 :                     prev.2 += cur.2;
    1304        80076 :                     Ok(prev)
    1305              :                 } else {
    1306      4047944 :                     Err((prev, cur))
    1307              :                 }
    1308      4128020 :             });
    1309           56 : 
    1310           56 :         // Merge the contents of all the input delta layers into a new set
    1311           56 :         // of delta layers, based on the current partitioning.
    1312           56 :         //
    1313           56 :         // We split the new delta layers on the key dimension. We iterate through the key space, and for each key, check if including the next key to the current output layer we're building would cause the layer to become too large. If so, dump the current output layer and start new one.
    1314           56 :         // It's possible that there is a single key with so many page versions that storing all of them in a single layer file
    1315           56 :         // would be too large. In that case, we also split on the LSN dimension.
    1316           56 :         //
    1317           56 :         // LSN
    1318           56 :         //  ^
    1319           56 :         //  |
    1320           56 :         //  | +-----------+            +--+--+--+--+
    1321           56 :         //  | |           |            |  |  |  |  |
    1322           56 :         //  | +-----------+            |  |  |  |  |
    1323           56 :         //  | |           |            |  |  |  |  |
    1324           56 :         //  | +-----------+     ==>    |  |  |  |  |
    1325           56 :         //  | |           |            |  |  |  |  |
    1326           56 :         //  | +-----------+            |  |  |  |  |
    1327           56 :         //  | |           |            |  |  |  |  |
    1328           56 :         //  | +-----------+            +--+--+--+--+
    1329           56 :         //  |
    1330           56 :         //  +--------------> key
    1331           56 :         //
    1332           56 :         //
    1333           56 :         // If one key (X) has a lot of page versions:
    1334           56 :         //
    1335           56 :         // LSN
    1336           56 :         //  ^
    1337           56 :         //  |                                 (X)
    1338           56 :         //  | +-----------+            +--+--+--+--+
    1339           56 :         //  | |           |            |  |  |  |  |
    1340           56 :         //  | +-----------+            |  |  +--+  |
    1341           56 :         //  | |           |            |  |  |  |  |
    1342           56 :         //  | +-----------+     ==>    |  |  |  |  |
    1343           56 :         //  | |           |            |  |  +--+  |
    1344           56 :         //  | +-----------+            |  |  |  |  |
    1345           56 :         //  | |           |            |  |  |  |  |
    1346           56 :         //  | +-----------+            +--+--+--+--+
    1347           56 :         //  |
    1348           56 :         //  +--------------> key
    1349           56 :         // TODO: this actually divides the layers into fixed-size chunks, not
    1350           56 :         // based on the partitioning.
    1351           56 :         //
    1352           56 :         // TODO: we should also opportunistically materialize and
    1353           56 :         // garbage collect what we can.
    1354           56 :         let mut new_layers = Vec::new();
    1355           56 :         let mut prev_key: Option<Key> = None;
    1356           56 :         let mut writer: Option<DeltaLayerWriter> = None;
    1357           56 :         let mut key_values_total_size = 0u64;
    1358           56 :         let mut dup_start_lsn: Lsn = Lsn::INVALID; // start LSN of layer containing values of the single key
    1359           56 :         let mut dup_end_lsn: Lsn = Lsn::INVALID; // end LSN of layer containing values of the single key
    1360           56 :         let mut next_hole = 0; // index of next hole in holes vector
    1361           56 : 
    1362           56 :         let mut keys = 0;
    1363              : 
    1364      4128132 :         while let Some((key, lsn, value)) = all_values_iter
    1365      4128132 :             .next()
    1366      4128132 :             .await
    1367      4128132 :             .map_err(CompactionError::Other)?
    1368              :         {
    1369      4128076 :             keys += 1;
    1370      4128076 : 
    1371      4128076 :             if keys % 32_768 == 0 && self.cancel.is_cancelled() {
    1372              :                 // avoid hitting the cancellation token on every key. in benches, we end up
    1373              :                 // shuffling an order of million keys per layer, this means we'll check it
    1374              :                 // around tens of times per layer.
    1375            0 :                 return Err(CompactionError::ShuttingDown);
    1376      4128076 :             }
    1377      4128076 : 
    1378      4128076 :             let same_key = prev_key == Some(key);
    1379      4128076 :             // We need to check key boundaries once we reach next key or end of layer with the same key
    1380      4128076 :             if !same_key || lsn == dup_end_lsn {
    1381      4048000 :                 let mut next_key_size = 0u64;
    1382      4048000 :                 let is_dup_layer = dup_end_lsn.is_valid();
    1383      4048000 :                 dup_start_lsn = Lsn::INVALID;
    1384      4048000 :                 if !same_key {
    1385      4048000 :                     dup_end_lsn = Lsn::INVALID;
    1386      4048000 :                 }
    1387              :                 // Determine size occupied by this key. We stop at next key or when size becomes larger than target_file_size
    1388      4048000 :                 for (next_key, next_lsn, next_size) in all_keys_iter.by_ref() {
    1389      4048000 :                     next_key_size = next_size;
    1390      4048000 :                     if key != next_key {
    1391      4047944 :                         if dup_end_lsn.is_valid() {
    1392            0 :                             // We are writing a segment with duplicates:
    1393            0 :                             // place all remaining values of this key in separate segment
    1394            0 :                             dup_start_lsn = dup_end_lsn; // new segments starts where old stops
    1395            0 :                             dup_end_lsn = lsn_range.end; // there are no more values of this key till end of LSN range
    1396      4047944 :                         }
    1397      4047944 :                         break;
    1398           56 :                     }
    1399           56 :                     key_values_total_size += next_size;
    1400           56 :                     // Check if it is time to split segment: if total keys size is larger than target file size.
    1401           56 :                     // We need to avoid generation of empty segments if next_size > target_file_size.
    1402           56 :                     if key_values_total_size > target_file_size && lsn != next_lsn {
    1403              :                         // Split key between multiple layers: such layer can contain only single key
    1404            0 :                         dup_start_lsn = if dup_end_lsn.is_valid() {
    1405            0 :                             dup_end_lsn // new segment with duplicates starts where old one stops
    1406              :                         } else {
    1407            0 :                             lsn // start with the first LSN for this key
    1408              :                         };
    1409            0 :                         dup_end_lsn = next_lsn; // upper LSN boundary is exclusive
    1410            0 :                         break;
    1411           56 :                     }
    1412              :                 }
    1413              :                 // handle case when loop reaches last key: in this case dup_end is non-zero but dup_start is not set.
    1414      4048000 :                 if dup_end_lsn.is_valid() && !dup_start_lsn.is_valid() {
    1415            0 :                     dup_start_lsn = dup_end_lsn;
    1416            0 :                     dup_end_lsn = lsn_range.end;
    1417      4048000 :                 }
    1418      4048000 :                 if writer.is_some() {
    1419      4047944 :                     let written_size = writer.as_mut().unwrap().size();
    1420      4047944 :                     let contains_hole =
    1421      4047944 :                         next_hole < holes.len() && key >= holes[next_hole].key_range.end;
    1422              :                     // check if key cause layer overflow or contains hole...
    1423      4047944 :                     if is_dup_layer
    1424      4047944 :                         || dup_end_lsn.is_valid()
    1425      4047944 :                         || written_size + key_values_total_size > target_file_size
    1426      4047384 :                         || contains_hole
    1427              :                     {
    1428              :                         // ... if so, flush previous layer and prepare to write new one
    1429          560 :                         let (desc, path) = writer
    1430          560 :                             .take()
    1431          560 :                             .unwrap()
    1432          560 :                             .finish(prev_key.unwrap().next(), ctx)
    1433          560 :                             .await
    1434          560 :                             .map_err(CompactionError::Other)?;
    1435          560 :                         let new_delta = Layer::finish_creating(self.conf, self, desc, &path)
    1436          560 :                             .map_err(CompactionError::Other)?;
    1437              : 
    1438          560 :                         new_layers.push(new_delta);
    1439          560 :                         writer = None;
    1440          560 : 
    1441          560 :                         if contains_hole {
    1442            0 :                             // skip hole
    1443            0 :                             next_hole += 1;
    1444          560 :                         }
    1445      4047384 :                     }
    1446           56 :                 }
    1447              :                 // Remember size of key value because at next iteration we will access next item
    1448      4048000 :                 key_values_total_size = next_key_size;
    1449        80076 :             }
    1450      4128076 :             fail_point!("delta-layer-writer-fail-before-finish", |_| {
    1451            0 :                 Err(CompactionError::Other(anyhow::anyhow!(
    1452            0 :                     "failpoint delta-layer-writer-fail-before-finish"
    1453            0 :                 )))
    1454      4128076 :             });
    1455              : 
    1456      4128076 :             if !self.shard_identity.is_key_disposable(&key) {
    1457      4128076 :                 if writer.is_none() {
    1458          616 :                     if self.cancel.is_cancelled() {
    1459              :                         // to be somewhat responsive to cancellation, check for each new layer
    1460            0 :                         return Err(CompactionError::ShuttingDown);
    1461          616 :                     }
    1462              :                     // Create writer if not initialized yet
    1463          616 :                     writer = Some(
    1464              :                         DeltaLayerWriter::new(
    1465          616 :                             self.conf,
    1466          616 :                             self.timeline_id,
    1467          616 :                             self.tenant_shard_id,
    1468          616 :                             key,
    1469          616 :                             if dup_end_lsn.is_valid() {
    1470              :                                 // this is a layer containing slice of values of the same key
    1471            0 :                                 debug!("Create new dup layer {}..{}", dup_start_lsn, dup_end_lsn);
    1472            0 :                                 dup_start_lsn..dup_end_lsn
    1473              :                             } else {
    1474          616 :                                 debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end);
    1475          616 :                                 lsn_range.clone()
    1476              :                             },
    1477          616 :                             ctx,
    1478          616 :                         )
    1479          616 :                         .await
    1480          616 :                         .map_err(CompactionError::Other)?,
    1481              :                     );
    1482              : 
    1483          616 :                     keys = 0;
    1484      4127460 :                 }
    1485              : 
    1486      4128076 :                 writer
    1487      4128076 :                     .as_mut()
    1488      4128076 :                     .unwrap()
    1489      4128076 :                     .put_value(key, lsn, value, ctx)
    1490      4128076 :                     .await
    1491      4128076 :                     .map_err(CompactionError::Other)?;
    1492              :             } else {
    1493            0 :                 let shard = self.shard_identity.shard_index();
    1494            0 :                 let owner = self.shard_identity.get_shard_number(&key);
    1495            0 :                 if cfg!(debug_assertions) {
    1496            0 :                     panic!("key {key} does not belong on shard {shard}, owned by {owner}");
    1497            0 :                 }
    1498            0 :                 debug!("dropping key {key} during compaction (it belongs on shard {owner})");
    1499              :             }
    1500              : 
    1501      4128076 :             if !new_layers.is_empty() {
    1502        39572 :                 fail_point!("after-timeline-compacted-first-L1");
    1503      4088504 :             }
    1504              : 
    1505      4128076 :             prev_key = Some(key);
    1506              :         }
    1507           56 :         if let Some(writer) = writer {
    1508           56 :             let (desc, path) = writer
    1509           56 :                 .finish(prev_key.unwrap().next(), ctx)
    1510           56 :                 .await
    1511           56 :                 .map_err(CompactionError::Other)?;
    1512           56 :             let new_delta = Layer::finish_creating(self.conf, self, desc, &path)
    1513           56 :                 .map_err(CompactionError::Other)?;
    1514           56 :             new_layers.push(new_delta);
    1515            0 :         }
    1516              : 
    1517              :         // Sync layers
    1518           56 :         if !new_layers.is_empty() {
    1519              :             // Print a warning if the created layer is larger than double the target size
    1520              :             // Add two pages for potential overhead. This should in theory be already
    1521              :             // accounted for in the target calculation, but for very small targets,
    1522              :             // we still might easily hit the limit otherwise.
    1523           56 :             let warn_limit = target_file_size * 2 + page_cache::PAGE_SZ as u64 * 2;
    1524          616 :             for layer in new_layers.iter() {
    1525          616 :                 if layer.layer_desc().file_size > warn_limit {
    1526            0 :                     warn!(
    1527              :                         %layer,
    1528            0 :                         "created delta file of size {} larger than double of target of {target_file_size}", layer.layer_desc().file_size
    1529              :                     );
    1530          616 :                 }
    1531              :             }
    1532              : 
    1533              :             // The writer.finish() above already did the fsync of the inodes.
    1534              :             // We just need to fsync the directory in which these inodes are linked,
    1535              :             // which we know to be the timeline directory.
    1536              :             //
    1537              :             // We use fatal_err() below because after writer.finish() returns with success,
    1538              :             // the in-memory state of the filesystem already has the layer file in its final place,
    1539              :             // and subsequent pageserver code could think it's durable while it really isn't.
    1540           56 :             let timeline_dir = VirtualFile::open(
    1541           56 :                 &self
    1542           56 :                     .conf
    1543           56 :                     .timeline_path(&self.tenant_shard_id, &self.timeline_id),
    1544           56 :                 ctx,
    1545           56 :             )
    1546           56 :             .await
    1547           56 :             .fatal_err("VirtualFile::open for timeline dir fsync");
    1548           56 :             timeline_dir
    1549           56 :                 .sync_all()
    1550           56 :                 .await
    1551           56 :                 .fatal_err("VirtualFile::sync_all timeline dir");
    1552            0 :         }
    1553              : 
    1554           56 :         stats.write_layer_files_micros = stats.read_lock_drop_micros.till_now();
    1555           56 :         stats.new_deltas_count = Some(new_layers.len());
    1556          616 :         stats.new_deltas_size = Some(new_layers.iter().map(|l| l.layer_desc().file_size).sum());
    1557           56 : 
    1558           56 :         match TryInto::<CompactLevel0Phase1Stats>::try_into(stats)
    1559           56 :             .and_then(|stats| serde_json::to_string(&stats).context("serde_json::to_string"))
    1560              :         {
    1561           56 :             Ok(stats_json) => {
    1562           56 :                 info!(
    1563            0 :                     stats_json = stats_json.as_str(),
    1564            0 :                     "compact_level0_phase1 stats available"
    1565              :                 )
    1566              :             }
    1567            0 :             Err(e) => {
    1568            0 :                 warn!("compact_level0_phase1 stats failed to serialize: {:#}", e);
    1569              :             }
    1570              :         }
    1571              : 
    1572              :         // Without this, rustc complains about deltas_to_compact still
    1573              :         // being borrowed when we `.into_iter()` below.
    1574           56 :         drop(all_values_iter);
    1575           56 : 
    1576           56 :         Ok(CompactLevel0Phase1Result {
    1577           56 :             new_layers,
    1578           56 :             deltas_to_compact: deltas_to_compact
    1579           56 :                 .into_iter()
    1580          804 :                 .map(|x| x.drop_eviction_guard())
    1581           56 :                 .collect::<Vec<_>>(),
    1582           56 :             fully_compacted,
    1583           56 :         })
    1584          728 :     }
    1585              : }
    1586              : 
    1587              : #[derive(Default)]
    1588              : struct CompactLevel0Phase1Result { // Value returned by the level-0 phase-1 compaction routine above.
    1589              :     new_layers: Vec<ResidentLayer>, // Delta layers that phase 1 finished writing.
    1590              :     deltas_to_compact: Vec<Layer>, // Input deltas, handed back after `drop_eviction_guard()`.
    1591              :     // Whether we have included all L0 layers, or selected only part of them due to the
    1592              :     // L0 compaction size limit.
    1593              :     fully_compacted: bool,
    1594              : }
    1595              : 
    1596              : #[derive(Default)]
    1597              : struct CompactLevel0Phase1StatsBuilder { // Stats collected during phase 1; finalized via `TryFrom` into `CompactLevel0Phase1Stats`.
    1598              :     version: Option<u64>,
    1599              :     tenant_id: Option<TenantShardId>, // Note: holds a `TenantShardId`, despite the field name.
    1600              :     timeline_id: Option<TimelineId>,
    1601              :     read_lock_acquisition_micros: DurationRecorder,
    1602              :     read_lock_held_spawn_blocking_startup_micros: DurationRecorder,
    1603              :     read_lock_held_key_sort_micros: DurationRecorder,
    1604              :     read_lock_held_prerequisites_micros: DurationRecorder,
    1605              :     read_lock_held_compute_holes_micros: DurationRecorder,
    1606              :     read_lock_drop_micros: DurationRecorder,
    1607              :     write_layer_files_micros: DurationRecorder, // Set from `read_lock_drop_micros.till_now()` once the new layers are written.
    1608              :     level0_deltas_count: Option<usize>,
    1609              :     new_deltas_count: Option<usize>, // Number of layers in `new_layers`.
    1610              :     new_deltas_size: Option<u64>, // Sum of `file_size` over the new layers.
    1611              : }
    1612              : 
    1613              : #[derive(serde::Serialize)]
    1614              : struct CompactLevel0Phase1Stats { // Finalized stats; phase 1 serializes this with `serde_json::to_string` and logs it.
    1615              :     version: u64,
    1616              :     tenant_id: TenantShardId, // Note: holds a `TenantShardId`, despite the field name.
    1617              :     timeline_id: TimelineId,
    1618              :     read_lock_acquisition_micros: RecordedDuration,
    1619              :     read_lock_held_spawn_blocking_startup_micros: RecordedDuration,
    1620              :     read_lock_held_key_sort_micros: RecordedDuration,
    1621              :     read_lock_held_prerequisites_micros: RecordedDuration,
    1622              :     read_lock_held_compute_holes_micros: RecordedDuration,
    1623              :     read_lock_drop_micros: RecordedDuration,
    1624              :     write_layer_files_micros: RecordedDuration,
    1625              :     level0_deltas_count: usize,
    1626              :     new_deltas_count: usize, // Count of delta layers produced by phase 1.
    1627              :     new_deltas_size: u64, // Summed `file_size` of those layers.
    1628              : }
    1629              : 
    1630              : impl TryFrom<CompactLevel0Phase1StatsBuilder> for CompactLevel0Phase1Stats { // Finalizes a builder into serializable stats.
    1631              :     type Error = anyhow::Error;
    1632              : 
    1633           56 :     fn try_from(value: CompactLevel0Phase1StatsBuilder) -> Result<Self, Self::Error> { // Errors with "<field> not set" for the first unset field, checked in the order below.
    1634           56 :         Ok(Self {
    1635           56 :             version: value.version.ok_or_else(|| anyhow!("version not set"))?,
    1636           56 :             tenant_id: value
    1637           56 :                 .tenant_id
    1638           56 :                 .ok_or_else(|| anyhow!("tenant_id not set"))?,
    1639           56 :             timeline_id: value
    1640           56 :                 .timeline_id
    1641           56 :                 .ok_or_else(|| anyhow!("timeline_id not set"))?,
    1642           56 :             read_lock_acquisition_micros: value
    1643           56 :                 .read_lock_acquisition_micros
    1644           56 :                 .into_recorded() // Optional result; `None` is turned into the error on the next line.
    1645           56 :                 .ok_or_else(|| anyhow!("read_lock_acquisition_micros not set"))?,
    1646           56 :             read_lock_held_spawn_blocking_startup_micros: value
    1647           56 :                 .read_lock_held_spawn_blocking_startup_micros
    1648           56 :                 .into_recorded()
    1649           56 :                 .ok_or_else(|| anyhow!("read_lock_held_spawn_blocking_startup_micros not set"))?,
    1650           56 :             read_lock_held_key_sort_micros: value
    1651           56 :                 .read_lock_held_key_sort_micros
    1652           56 :                 .into_recorded()
    1653           56 :                 .ok_or_else(|| anyhow!("read_lock_held_key_sort_micros not set"))?,
    1654           56 :             read_lock_held_prerequisites_micros: value
    1655           56 :                 .read_lock_held_prerequisites_micros
    1656           56 :                 .into_recorded()
    1657           56 :                 .ok_or_else(|| anyhow!("read_lock_held_prerequisites_micros not set"))?,
    1658           56 :             read_lock_held_compute_holes_micros: value
    1659           56 :                 .read_lock_held_compute_holes_micros
    1660           56 :                 .into_recorded()
    1661           56 :                 .ok_or_else(|| anyhow!("read_lock_held_compute_holes_micros not set"))?,
    1662           56 :             read_lock_drop_micros: value
    1663           56 :                 .read_lock_drop_micros
    1664           56 :                 .into_recorded()
    1665           56 :                 .ok_or_else(|| anyhow!("read_lock_drop_micros not set"))?,
    1666           56 :             write_layer_files_micros: value
    1667           56 :                 .write_layer_files_micros
    1668           56 :                 .into_recorded()
    1669           56 :                 .ok_or_else(|| anyhow!("write_layer_files_micros not set"))?,
    1670           56 :             level0_deltas_count: value
    1671           56 :                 .level0_deltas_count
    1672           56 :                 .ok_or_else(|| anyhow!("level0_deltas_count not set"))?,
    1673           56 :             new_deltas_count: value
    1674           56 :                 .new_deltas_count
    1675           56 :                 .ok_or_else(|| anyhow!("new_deltas_count not set"))?,
    1676           56 :             new_deltas_size: value
    1677           56 :                 .new_deltas_size
    1678           56 :                 .ok_or_else(|| anyhow!("new_deltas_size not set"))?,
    1679              :         })
    1680           56 :     }
    1681              : }
    1682              : 
    1683              : impl Timeline {
    1684              :     /// Entry point for new tiered compaction algorithm.
    1685              :     ///
    1686              :     /// All the real work is in the implementation in the pageserver_compaction
    1687              :     /// crate. The code here would apply to any algorithm implemented by the
    1688              :     /// same interface, but tiered is the only one at the moment.
    1689              :     /// Returns `Ok(())` without compacting when there are fewer L0 deltas than the compaction threshold.
    1690              :     /// TODO: cancellation (the `_cancel` token is not consulted yet).
    1691            0 :     pub(crate) async fn compact_tiered(
    1692            0 :         self: &Arc<Self>,
    1693            0 :         _cancel: &CancellationToken,
    1694            0 :         ctx: &RequestContext,
    1695            0 :     ) -> Result<(), CompactionError> {
    1696            0 :         let fanout = self.get_compaction_threshold() as u64; // the compaction threshold doubles as the algorithm's fanout
    1697            0 :         let target_file_size = self.get_checkpoint_distance(); // checkpoint distance is used as the target file size
    1698              : 
    1699              :         // Find the top of the historical layers
    1700            0 :         let end_lsn = {
    1701            0 :             let guard = self.layers.read().await; // guard is scoped to this block only
    1702            0 :             let layers = guard.layer_map()?;
    1703              : 
    1704            0 :             let l0_deltas = layers.level0_deltas();
    1705            0 : 
    1706            0 :             // As an optimization, if we find that there are too few L0 layers,
    1707            0 :             // bail out early. We know that the compaction algorithm would do
    1708            0 :             // nothing in that case.
    1709            0 :             if l0_deltas.len() < fanout as usize {
    1710              :                 // doesn't need compacting
    1711            0 :                 return Ok(());
    1712            0 :             }
    1713            0 :             l0_deltas.iter().map(|l| l.lsn_range.end).max().unwrap() // NOTE(review): panics on an empty list; safe only if fanout >= 1 - confirm
    1714            0 :         };
    1715            0 : 
    1716            0 :         // Is the timeline being deleted?
    1717            0 :         if self.is_stopping() {
    1718            0 :             trace!("Dropping out of compaction on timeline shutdown");
    1719            0 :             return Err(CompactionError::ShuttingDown);
    1720            0 :         }
    1721              : 
    1722            0 :         let (dense_ks, _sparse_ks) = self.collect_keyspace(end_lsn, ctx).await?;
    1723              :         // TODO(chi): ignore sparse_keyspace for now, compact it in the future.
    1724            0 :         let mut adaptor = TimelineAdaptor::new(self, (end_lsn, dense_ks)); // only the dense keyspace is passed on
    1725            0 : 
    1726            0 :         pageserver_compaction::compact_tiered::compact_tiered(
    1727            0 :             &mut adaptor,
    1728            0 :             end_lsn,
    1729            0 :             target_file_size,
    1730            0 :             fanout,
    1731            0 :             ctx,
    1732            0 :         )
    1733            0 :         .await
    1734              :         // TODO: compact_tiered needs to return CompactionError
    1735            0 :         .map_err(CompactionError::Other)?;
    1736              : 
    1737            0 :         adaptor.flush_updates().await?; // presumably applies changes buffered in the adaptor - confirm against TimelineAdaptor
    1738            0 :         Ok(())
    1739            0 :     }
    1740              : 
    1741              :     /// Take a list of images and deltas, produce images and deltas according to GC horizon and retain_lsns.
    1742              :     ///
    1743              :     /// It takes a key, the values of the key within the compaction process, a GC horizon, and all retain_lsns below the horizon.
    1744              :     /// For now, it requires the `accumulated_values` contains the full history of the key (i.e., the key with the lowest LSN is
    1745              :     /// an image or a WAL not requiring a base image). This restriction will be removed once we implement gc-compaction on branch.
    1746              :     ///
    1747              :     /// The function returns the deltas and the base image that need to be placed at each of the retain LSN. For example, we have:
    1748              :     ///
    1749              :     /// A@0x10, +B@0x20, +C@0x30, +D@0x40, +E@0x50, +F@0x60
    1750              :     /// horizon = 0x50, retain_lsn = 0x20, 0x40, delta_threshold=3
    1751              :     ///
    1752              :     /// The function will produce:
    1753              :     ///
    1754              :     /// ```plain
    1755              :     /// 0x20(retain_lsn) -> img=AB@0x20                  always produce a single image below the lowest retain LSN
    1756              :     /// 0x40(retain_lsn) -> deltas=[+C@0x30, +D@0x40]    two deltas since the last base image, keeping the deltas
    1757              :     /// 0x50(horizon)    -> deltas=[ABCDE@0x50]          three deltas since the last base image, generate an image but put it in the delta
    1758              :     /// above_horizon    -> deltas=[+F@0x60]             full history above the horizon
    1759              :     /// ```
    1760              :     ///
    1761              :     /// Note that `full_history` must be sorted by LSN and should belong to a single key.
    1762         1260 :     pub(crate) async fn generate_key_retention(
    1763         1260 :         self: &Arc<Timeline>,
    1764         1260 :         key: Key,
    1765         1260 :         full_history: &[(Key, Lsn, Value)],
    1766         1260 :         horizon: Lsn,
    1767         1260 :         retain_lsn_below_horizon: &[Lsn],
    1768         1260 :         delta_threshold_cnt: usize,
    1769         1260 :         base_img_from_ancestor: Option<(Key, Lsn, Bytes)>,
    1770         1260 :     ) -> anyhow::Result<KeyHistoryRetention> {
    1771              :         // Pre-checks for the invariants
    1772              : 
    1773         1260 :         let debug_mode = cfg!(debug_assertions) || cfg!(feature = "testing");
    1774              : 
    1775         1260 :         if debug_mode {
    1776         3060 :             for (log_key, _, _) in full_history {
    1777         1800 :                 assert_eq!(log_key, &key, "mismatched key");
    1778              :             }
    1779         1260 :             for i in 1..full_history.len() {
    1780          540 :                 assert!(full_history[i - 1].1 <= full_history[i].1, "unordered LSN");
    1781          540 :                 if full_history[i - 1].1 == full_history[i].1 {
    1782            0 :                     assert!(
    1783            0 :                         matches!(full_history[i - 1].2, Value::Image(_)),
    1784            0 :                         "unordered delta/image, or duplicated delta"
    1785              :                     );
    1786          540 :                 }
    1787              :             }
    1788              :             // There was an assertion for no base image that checks if the first
    1789              :             // record in the history is `will_init` before, but it was removed.
    1790              :             // This is explained in the test cases for generate_key_retention.
    1791              :             // Search "incomplete history" for more information.
    1792         2820 :             for lsn in retain_lsn_below_horizon {
    1793         1560 :                 assert!(lsn < &horizon, "retain lsn must be below horizon")
    1794              :             }
    1795         1260 :             for i in 1..retain_lsn_below_horizon.len() {
    1796          712 :                 assert!(
    1797          712 :                     retain_lsn_below_horizon[i - 1] <= retain_lsn_below_horizon[i],
    1798            0 :                     "unordered LSN"
    1799              :                 );
    1800              :             }
    1801            0 :         }
    1802         1260 :         let has_ancestor = base_img_from_ancestor.is_some();
    1803              :         // Step 1: split history into len(retain_lsn_below_horizon) + 2 buckets, where the last bucket is for all deltas above the horizon,
    1804              :         // and the second-to-last bucket is for the horizon. Each bucket contains lsn_last_bucket < deltas <= lsn_this_bucket.
    1805         1260 :         let (mut split_history, lsn_split_points) = {
    1806         1260 :             let mut split_history = Vec::new();
    1807         1260 :             split_history.resize_with(retain_lsn_below_horizon.len() + 2, Vec::new);
    1808         1260 :             let mut lsn_split_points = Vec::with_capacity(retain_lsn_below_horizon.len() + 1);
    1809         2820 :             for lsn in retain_lsn_below_horizon {
    1810         1560 :                 lsn_split_points.push(*lsn);
    1811         1560 :             }
    1812         1260 :             lsn_split_points.push(horizon);
    1813         1260 :             let mut current_idx = 0;
    1814         3060 :             for item @ (_, lsn, _) in full_history {
    1815         2288 :                 while current_idx < lsn_split_points.len() && *lsn > lsn_split_points[current_idx] {
    1816          488 :                     current_idx += 1;
    1817          488 :                 }
    1818         1800 :                 split_history[current_idx].push(item);
    1819              :             }
    1820         1260 :             (split_history, lsn_split_points)
    1821              :         };
    1822              :         // Step 2: filter out duplicated records due to the k-merge of image/delta layers
    1823         5340 :         for split_for_lsn in &mut split_history {
    1824         4080 :             let mut prev_lsn = None;
    1825         4080 :             let mut new_split_for_lsn = Vec::with_capacity(split_for_lsn.len());
    1826         4080 :             for record @ (_, lsn, _) in std::mem::take(split_for_lsn) {
    1827         1800 :                 if let Some(prev_lsn) = &prev_lsn {
    1828          236 :                     if *prev_lsn == lsn {
    1829              :                         // The case that we have an LSN with both data from the delta layer and the image layer. As
    1830              :                         // `ValueWrapper` ensures that an image is ordered before a delta at the same LSN, we simply
    1831              :                         // drop this delta and keep the image.
    1832              :                         //
    1833              :                         // For example, we have delta layer key1@0x10, key1@0x20, and image layer key1@0x10, we will
    1834              :                         // keep the image for key1@0x10 and the delta for key1@0x20. key1@0x10 delta will be simply
    1835              :                         // dropped.
    1836              :                         //
    1837              :                         // TODO: in case we have both delta + images for a given LSN and it does not exceed the delta
    1838              :                         // threshold, we could have kept delta instead to save space. This is an optimization for the future.
    1839            0 :                         continue;
    1840          236 :                     }
    1841         1564 :                 }
    1842         1800 :                 prev_lsn = Some(lsn);
    1843         1800 :                 new_split_for_lsn.push(record);
    1844              :             }
    1845         4080 :             *split_for_lsn = new_split_for_lsn;
    1846              :         }
    1847              :         // Step 3: generate images when necessary
    1848         1260 :         let mut retention = Vec::with_capacity(split_history.len());
    1849         1260 :         let mut records_since_last_image = 0;
    1850         1260 :         let batch_cnt = split_history.len();
    1851         1260 :         assert!(
    1852         1260 :             batch_cnt >= 2,
    1853            0 :             "should have at least below + above horizon batches"
    1854              :         );
    1855         1260 :         let mut replay_history: Vec<(Key, Lsn, Value)> = Vec::new();
    1856         1260 :         if let Some((key, lsn, img)) = base_img_from_ancestor {
    1857           84 :             replay_history.push((key, lsn, Value::Image(img)));
    1858         1176 :         }
    1859              : 
    1860              :         /// Generate debug information for the replay history
    1861            0 :         fn generate_history_trace(replay_history: &[(Key, Lsn, Value)]) -> String {
    1862              :             use std::fmt::Write;
    1863            0 :             let mut output = String::new();
    1864            0 :             if let Some((key, _, _)) = replay_history.first() {
    1865            0 :                 write!(output, "key={} ", key).unwrap();
    1866            0 :                 let mut cnt = 0;
    1867            0 :                 for (_, lsn, val) in replay_history {
    1868            0 :                     if val.is_image() {
    1869            0 :                         write!(output, "i@{} ", lsn).unwrap();
    1870            0 :                     } else if val.will_init() {
    1871            0 :                         write!(output, "di@{} ", lsn).unwrap();
    1872            0 :                     } else {
    1873            0 :                         write!(output, "d@{} ", lsn).unwrap();
    1874            0 :                     }
    1875            0 :                     cnt += 1;
    1876            0 :                     if cnt >= 128 {
    1877            0 :                         write!(output, "... and more").unwrap();
    1878            0 :                         break;
    1879            0 :                     }
    1880              :                 }
    1881            0 :             } else {
    1882            0 :                 write!(output, "<no history>").unwrap();
    1883            0 :             }
    1884            0 :             output
    1885            0 :         }
    1886              : 
    1887            0 :         fn generate_debug_trace(
    1888            0 :             replay_history: Option<&[(Key, Lsn, Value)]>,
    1889            0 :             full_history: &[(Key, Lsn, Value)],
    1890            0 :             lsns: &[Lsn],
    1891            0 :             horizon: Lsn,
    1892            0 :         ) -> String {
    1893              :             use std::fmt::Write;
    1894            0 :             let mut output = String::new();
    1895            0 :             if let Some(replay_history) = replay_history {
    1896            0 :                 writeln!(
    1897            0 :                     output,
    1898            0 :                     "replay_history: {}",
    1899            0 :                     generate_history_trace(replay_history)
    1900            0 :                 )
    1901            0 :                 .unwrap();
    1902            0 :             } else {
    1903            0 :                 writeln!(output, "replay_history: <disabled>",).unwrap();
    1904            0 :             }
    1905            0 :             writeln!(
    1906            0 :                 output,
    1907            0 :                 "full_history: {}",
    1908            0 :                 generate_history_trace(full_history)
    1909            0 :             )
    1910            0 :             .unwrap();
    1911            0 :             writeln!(
    1912            0 :                 output,
    1913            0 :                 "when processing: [{}] horizon={}",
    1914            0 :                 lsns.iter().map(|l| format!("{l}")).join(","),
    1915            0 :                 horizon
    1916            0 :             )
    1917            0 :             .unwrap();
    1918            0 :             output
    1919            0 :         }
    1920              : 
    1921         1260 :         let mut key_exists = false;
    1922         4080 :         for (i, split_for_lsn) in split_history.into_iter().enumerate() {
    1923              :             // TODO: there could be image keys inside the splits, and we can compute records_since_last_image accordingly.
    1924         4080 :             records_since_last_image += split_for_lsn.len();
    1925              :             // Whether to produce an image into the final layer files
    1926         4080 :             let produce_image = if i == 0 && !has_ancestor {
    1927              :                 // We always generate images for the first batch (below horizon / lowest retain_lsn)
    1928         1176 :                 true
    1929         2904 :             } else if i == batch_cnt - 1 {
    1930              :                 // Do not generate images for the last batch (above horizon)
    1931         1260 :                 false
    1932         1644 :             } else if records_since_last_image == 0 {
    1933         1288 :                 false
    1934          356 :             } else if records_since_last_image >= delta_threshold_cnt {
    1935              :                 // Generate images when there are too many records
    1936           12 :                 true
    1937              :             } else {
    1938          344 :                 false
    1939              :             };
    1940         4080 :             replay_history.extend(split_for_lsn.iter().map(|x| (*x).clone()));
    1941              :             // Only retain the items after the last image record
    1942         5028 :             for idx in (0..replay_history.len()).rev() {
    1943         5028 :                 if replay_history[idx].2.will_init() {
    1944         4080 :                     replay_history = replay_history[idx..].to_vec();
    1945         4080 :                     break;
    1946          948 :                 }
    1947              :             }
    1948         4080 :             if replay_history.is_empty() && !key_exists {
    1949              :                 // The key does not exist at earlier LSN, we can skip this iteration.
    1950            0 :                 retention.push(Vec::new());
    1951            0 :                 continue;
    1952         4080 :             } else {
    1953         4080 :                 key_exists = true;
    1954         4080 :             }
    1955         4080 :             let Some((_, _, val)) = replay_history.first() else {
    1956            0 :                 unreachable!("replay history should not be empty once it exists")
    1957              :             };
    1958         4080 :             if !val.will_init() {
    1959            0 :                 return Err(anyhow::anyhow!("invalid history, no base image")).with_context(|| {
    1960            0 :                     generate_debug_trace(
    1961            0 :                         Some(&replay_history),
    1962            0 :                         full_history,
    1963            0 :                         retain_lsn_below_horizon,
    1964            0 :                         horizon,
    1965            0 :                     )
    1966            0 :                 });
    1967         4080 :             }
    1968              :             // Whether to reconstruct the image. In debug mode, we will generate an image
    1969              :             // at every retain_lsn to ensure data is not corrupted, but we won't put the
    1970              :             // image into the final layer.
    1971         4080 :             let generate_image = produce_image || debug_mode;
    1972         4080 :             if produce_image {
    1973         1188 :                 records_since_last_image = 0;
    1974         2892 :             }
    1975         4080 :             let img_and_lsn = if generate_image {
    1976         4080 :                 let replay_history_for_debug = if debug_mode {
    1977         4080 :                     Some(replay_history.clone())
    1978              :                 } else {
    1979            0 :                     None
    1980              :                 };
    1981         4080 :                 let replay_history_for_debug_ref = replay_history_for_debug.as_deref();
    1982         4080 :                 let history = if produce_image {
    1983         1188 :                     std::mem::take(&mut replay_history)
    1984              :                 } else {
    1985         2892 :                     replay_history.clone()
    1986              :                 };
    1987         4080 :                 let mut img = None;
    1988         4080 :                 let mut records = Vec::with_capacity(history.len());
    1989         4080 :                 if let (_, lsn, Value::Image(val)) = history.first().as_ref().unwrap() {
    1990         4036 :                     img = Some((*lsn, val.clone()));
    1991         4036 :                     for (_, lsn, val) in history.into_iter().skip(1) {
    1992          920 :                         let Value::WalRecord(rec) = val else {
    1993            0 :                             return Err(anyhow::anyhow!(
    1994            0 :                                 "invalid record, first record is image, expect walrecords"
    1995            0 :                             ))
    1996            0 :                             .with_context(|| {
    1997            0 :                                 generate_debug_trace(
    1998            0 :                                     replay_history_for_debug_ref,
    1999            0 :                                     full_history,
    2000            0 :                                     retain_lsn_below_horizon,
    2001            0 :                                     horizon,
    2002            0 :                                 )
    2003            0 :                             });
    2004              :                         };
    2005          920 :                         records.push((lsn, rec));
    2006              :                     }
    2007              :                 } else {
    2008           72 :                     for (_, lsn, val) in history.into_iter() {
    2009           72 :                         let Value::WalRecord(rec) = val else {
    2010            0 :                             return Err(anyhow::anyhow!("invalid record, first record is walrecord, expect rest are walrecord"))
    2011            0 :                                 .with_context(|| generate_debug_trace(
    2012            0 :                                     replay_history_for_debug_ref,
    2013            0 :                                     full_history,
    2014            0 :                                     retain_lsn_below_horizon,
    2015            0 :                                     horizon,
    2016            0 :                                 ));
    2017              :                         };
    2018           72 :                         records.push((lsn, rec));
    2019              :                     }
    2020              :                 }
    2021         4080 :                 records.reverse();
    2022         4080 :                 let state = ValueReconstructState { img, records };
    2023              :                 // last batch does not generate image so i is always in range, unless we force generate
    2024              :                 // an image during testing
    2025         4080 :                 let request_lsn = if i >= lsn_split_points.len() {
    2026         1260 :                     Lsn::MAX
    2027              :                 } else {
    2028         2820 :                     lsn_split_points[i]
    2029              :                 };
    2030         4080 :                 let img = self.reconstruct_value(key, request_lsn, state).await?;
    2031         4080 :                 Some((request_lsn, img))
    2032              :             } else {
    2033            0 :                 None
    2034              :             };
    2035         4080 :             if produce_image {
    2036         1188 :                 let (request_lsn, img) = img_and_lsn.unwrap();
    2037         1188 :                 replay_history.push((key, request_lsn, Value::Image(img.clone())));
    2038         1188 :                 retention.push(vec![(request_lsn, Value::Image(img))]);
    2039         2892 :             } else {
    2040         2892 :                 let deltas = split_for_lsn
    2041         2892 :                     .iter()
    2042         2892 :                     .map(|(_, lsn, value)| (*lsn, value.clone()))
    2043         2892 :                     .collect_vec();
    2044         2892 :                 retention.push(deltas);
    2045         2892 :             }
    2046              :         }
    2047         1260 :         let mut result = Vec::with_capacity(retention.len());
    2048         1260 :         assert_eq!(retention.len(), lsn_split_points.len() + 1);
    2049         4080 :         for (idx, logs) in retention.into_iter().enumerate() {
    2050         4080 :             if idx == lsn_split_points.len() {
    2051         1260 :                 return Ok(KeyHistoryRetention {
    2052         1260 :                     below_horizon: result,
    2053         1260 :                     above_horizon: KeyLogAtLsn(logs),
    2054         1260 :                 });
    2055         2820 :             } else {
    2056         2820 :                 result.push((lsn_split_points[idx], KeyLogAtLsn(logs)));
    2057         2820 :             }
    2058              :         }
    2059            0 :         unreachable!("key retention is empty")
    2060         1260 :     }
    2061              : 
    2062              :     /// Check how much space is left on the disk
    2063          104 :     async fn check_available_space(self: &Arc<Self>) -> anyhow::Result<u64> {
    2064          104 :         let tenants_dir = self.conf.tenants_path();
    2065              : 
    2066          104 :         let stat = Statvfs::get(&tenants_dir, None)
    2067          104 :             .context("statvfs failed, presumably directory got unlinked")?;
    2068              : 
    2069          104 :         let (avail_bytes, _) = stat.get_avail_total_bytes();
    2070          104 : 
    2071          104 :         Ok(avail_bytes)
    2072          104 :     }
    2073              : 
    2074              :     /// Check if the compaction can proceed safely without running out of space. We assume the size
    2075              :     /// upper bound of the produced files of a compaction job is the same as all layers involved in
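    2076              :     /// the compaction. Therefore, the free space (minus a 20% reserve) must cover the size of all selected layers
    2077              :     /// for the new files plus the size of the layers to download, i.e. at most `2 * layers_to_be_compacted_size`.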
    2078          104 :     async fn check_compaction_space(
    2079          104 :         self: &Arc<Self>,
    2080          104 :         layer_selection: &[Layer],
    2081          104 :     ) -> anyhow::Result<()> {
    2082          104 :         let available_space = self.check_available_space().await?;
    2083          104 :         let mut remote_layer_size = 0;
    2084          104 :         let mut all_layer_size = 0;
    2085          408 :         for layer in layer_selection {
    2086          304 :             let needs_download = layer.needs_download().await?;
    2087          304 :             if needs_download.is_some() {
    2088            0 :                 remote_layer_size += layer.layer_desc().file_size;
    2089          304 :             }
    2090          304 :             all_layer_size += layer.layer_desc().file_size;
    2091              :         }
    2092          104 :         let allocated_space = (available_space as f64 * 0.8) as u64; /* reserve 20% space for other tasks */
    2093          104 :         if all_layer_size /* space needed for newly-generated file */ + remote_layer_size /* space for downloading layers */ > allocated_space
    2094              :         {
    2095            0 :             return Err(anyhow!("not enough space for compaction: available_space={}, allocated_space={}, all_layer_size={}, remote_layer_size={}, required_space={}",
    2096            0 :                 available_space, allocated_space, all_layer_size, remote_layer_size, all_layer_size + remote_layer_size));
    2097          104 :         }
    2098          104 :         Ok(())
    2099          104 :     }
    2100              : 
    2101              :     /// Get a watermark for gc-compaction, that is the lowest LSN that we can use as the `gc_horizon` for
    2102              :     /// the compaction algorithm. It is min(space_cutoff, time_cutoff, latest_gc_cutoff); standby_horizon is not included yet (see the TODO in the body).
    2103              :     /// Leases and retain_lsns are considered in the gc-compaction job itself so we don't need to account for them
    2104              :     /// here.
    2105          108 :     pub(crate) fn get_gc_compaction_watermark(self: &Arc<Self>) -> Lsn {
    2106          108 :         let gc_cutoff_lsn = {
    2107          108 :             let gc_info = self.gc_info.read().unwrap();
    2108          108 :             gc_info.min_cutoff()
    2109          108 :         };
    2110          108 : 
    2111          108 :         // TODO: standby horizon should use leases so we don't really need to consider it here.
    2112          108 :         // let watermark = watermark.min(self.standby_horizon.load());
    2113          108 : 
    2114          108 :         // TODO: ensure the child branches will not use anything below the watermark, or consider
    2115          108 :         // them when computing the watermark.
    2116          108 :         gc_cutoff_lsn.min(*self.get_latest_gc_cutoff_lsn())
    2117          108 :     }
    2118              : 
    2119              :     /// Split a gc-compaction job into multiple compaction jobs. The split is based on the key range and the estimated size of the compaction job.
    2120              :     /// The function returns a list of compaction jobs that can be executed separately. If the upper bound of the compact LSN
    2121              :     /// range is not specified, we will use the latest gc_cutoff as the upper bound, so that all jobs in the jobset act
    2122              :     /// like a full compaction of the specified keyspace.
    2123            0 :     pub(crate) async fn gc_compaction_split_jobs(
    2124            0 :         self: &Arc<Self>,
    2125            0 :         job: GcCompactJob,
    2126            0 :         sub_compaction_max_job_size_mb: Option<u64>,
    2127            0 :     ) -> anyhow::Result<Vec<GcCompactJob>> {
    2128            0 :         let compact_below_lsn = if job.compact_lsn_range.end != Lsn::MAX {
    2129            0 :             job.compact_lsn_range.end
    2130              :         } else {
    2131            0 :             self.get_gc_compaction_watermark()
    2132              :         };
    2133              : 
    2134            0 :         if compact_below_lsn == Lsn::INVALID {
    2135            0 :             tracing::warn!("no layers to compact with gc: gc_cutoff not generated yet, skipping gc bottom-most compaction");
    2136            0 :             return Ok(vec![]);
    2137            0 :         }
    2138              : 
    2139              :         // Split compaction job to about 4GB each
    2140              :         const GC_COMPACT_MAX_SIZE_MB: u64 = 4 * 1024;
    2141            0 :         let sub_compaction_max_job_size_mb =
    2142            0 :             sub_compaction_max_job_size_mb.unwrap_or(GC_COMPACT_MAX_SIZE_MB);
    2143            0 : 
    2144            0 :         let mut compact_jobs = Vec::new();
    2145            0 :         // For now, we simply use the key partitioning information; we should do a more fine-grained partitioning
    2146            0 :         // by estimating the amount of files read for a compaction job. We should also partition on LSN.
    2147            0 :         let ((dense_ks, sparse_ks), _) = self.partitioning.read().as_ref().clone();
    2148              :         // Truncate the key range to be within user specified compaction range.
    2149            0 :         fn truncate_to(
    2150            0 :             source_start: &Key,
    2151            0 :             source_end: &Key,
    2152            0 :             target_start: &Key,
    2153            0 :             target_end: &Key,
    2154            0 :         ) -> Option<(Key, Key)> {
    2155            0 :             let start = source_start.max(target_start);
    2156            0 :             let end = source_end.min(target_end);
    2157            0 :             if start < end {
    2158            0 :                 Some((*start, *end))
    2159              :             } else {
    2160            0 :                 None
    2161              :             }
    2162            0 :         }
    2163            0 :         let mut split_key_ranges = Vec::new();
    2164            0 :         let ranges = dense_ks
    2165            0 :             .parts
    2166            0 :             .iter()
    2167            0 :             .map(|partition| partition.ranges.iter())
    2168            0 :             .chain(sparse_ks.parts.iter().map(|x| x.0.ranges.iter()))
    2169            0 :             .flatten()
    2170            0 :             .cloned()
    2171            0 :             .collect_vec();
    2172            0 :         for range in ranges.iter() {
    2173            0 :             let Some((start, end)) = truncate_to(
    2174            0 :                 &range.start,
    2175            0 :                 &range.end,
    2176            0 :                 &job.compact_key_range.start,
    2177            0 :                 &job.compact_key_range.end,
    2178            0 :             ) else {
    2179            0 :                 continue;
    2180              :             };
    2181            0 :             split_key_ranges.push((start, end));
    2182              :         }
    2183            0 :         split_key_ranges.sort();
    2184            0 :         let guard = self.layers.read().await;
    2185            0 :         let layer_map = guard.layer_map()?;
    2186            0 :         let mut current_start = None;
    2187            0 :         let ranges_num = split_key_ranges.len();
    2188            0 :         for (idx, (start, end)) in split_key_ranges.into_iter().enumerate() {
    2189            0 :             if current_start.is_none() {
    2190            0 :                 current_start = Some(start);
    2191            0 :             }
    2192            0 :             let start = current_start.unwrap();
    2193            0 :             if start >= end {
    2194              :                 // We have already processed this partition.
    2195            0 :                 continue;
    2196            0 :             }
    2197            0 :             let res = layer_map.range_search(start..end, compact_below_lsn);
    2198            0 :             let total_size = res.found.keys().map(|x| x.layer.file_size()).sum::<u64>();
    2199            0 :             if total_size > sub_compaction_max_job_size_mb * 1024 * 1024 || ranges_num == idx + 1 {
    2200              :                 // Try to extend the compaction range so that we include at least one full layer file.
    2201            0 :                 let extended_end = res
    2202            0 :                     .found
    2203            0 :                     .keys()
    2204            0 :                     .map(|layer| layer.layer.key_range.end)
    2205            0 :                     .min();
    2206              :                 // It is possible that the search range does not contain any layer files when we reach the end of the loop.
    2207              :                 // In this case, we simply use the specified key range end.
    2208            0 :                 let end = if let Some(extended_end) = extended_end {
    2209            0 :                     extended_end.max(end)
    2210              :                 } else {
    2211            0 :                     end
    2212              :                 };
    2213            0 :                 let end = if ranges_num == idx + 1 {
    2214              :                     // extend the compaction range to the end of the key range if it's the last partition
    2215            0 :                     end.max(job.compact_key_range.end)
    2216              :                 } else {
    2217            0 :                     end
    2218              :                 };
    2219            0 :                 info!(
    2220            0 :                     "splitting compaction job: {}..{}, estimated_size={}",
    2221              :                     start, end, total_size
    2222              :                 );
    2223            0 :                 compact_jobs.push(GcCompactJob {
    2224            0 :                     dry_run: job.dry_run,
    2225            0 :                     compact_key_range: start..end,
    2226            0 :                     compact_lsn_range: job.compact_lsn_range.start..compact_below_lsn,
    2227            0 :                 });
    2228            0 :                 current_start = Some(end);
    2229            0 :             }
    2230              :         }
    2231            0 :         drop(guard);
    2232            0 :         Ok(compact_jobs)
    2233            0 :     }
    2234              : 
    2235              :     /// An experimental compaction building block that combines compaction with garbage collection.
    2236              :     ///
    2237              :     /// The current implementation picks all delta + image layers that are below or intersecting with
    2238              :     /// the GC horizon without considering retain_lsns. Then, it does a full compaction over all these delta
    2239              :     /// layers and image layers, which generates image layers on the gc horizon, drops deltas below the gc horizon,
    2240              :     /// and creates delta layers with all deltas >= gc horizon.
    2241              :     ///
    2242              :     /// If `options.compact_range` is provided, it will only compact the keys within the range, aka partial compaction.
    2243              :     /// Partial compaction will read and process all layers overlapping with the key range, even if it might
    2244              :     /// contain extra keys. After the gc-compaction phase completes, delta layers that are not fully contained
    2245              :     /// within the key range will be rewritten to ensure they do not overlap with the delta layers. Providing
    2246              :     /// Key::MIN..Key::MAX to the function indicates a full compaction, though technically, `Key::MAX` is not
    2247              :     /// part of the range.
    2248              :     ///
    2249              :     /// If `options.compact_lsn_range.end` is provided, the compaction will only compact layers below or intersecting with
    2250              :     /// the LSN. Otherwise, it will use the gc cutoff by default.
    2251          108 :     pub(crate) async fn compact_with_gc(
    2252          108 :         self: &Arc<Self>,
    2253          108 :         cancel: &CancellationToken,
    2254          108 :         options: CompactOptions,
    2255          108 :         ctx: &RequestContext,
    2256          108 :     ) -> anyhow::Result<()> {
    2257          108 :         let sub_compaction = options.sub_compaction;
    2258          108 :         let job = GcCompactJob::from_compact_options(options.clone());
    2259          108 :         if sub_compaction {
    2260            0 :             info!("running enhanced gc bottom-most compaction with sub-compaction, splitting compaction jobs");
    2261            0 :             let jobs = self
    2262            0 :                 .gc_compaction_split_jobs(job, options.sub_compaction_max_job_size_mb)
    2263            0 :                 .await?;
    2264            0 :             let jobs_len = jobs.len();
    2265            0 :             for (idx, job) in jobs.into_iter().enumerate() {
    2266            0 :                 info!(
    2267            0 :                     "running enhanced gc bottom-most compaction, sub-compaction {}/{}",
    2268            0 :                     idx + 1,
    2269              :                     jobs_len
    2270              :                 );
    2271            0 :                 self.compact_with_gc_inner(cancel, job, ctx).await?;
    2272              :             }
    2273            0 :             if jobs_len == 0 {
    2274            0 :                 info!("no jobs to run, skipping gc bottom-most compaction");
    2275            0 :             }
    2276            0 :             return Ok(());
    2277          108 :         }
    2278          108 :         self.compact_with_gc_inner(cancel, job, ctx).await
    2279          108 :     }
    2280              : 
    2281          108 :     async fn compact_with_gc_inner(
    2282          108 :         self: &Arc<Self>,
    2283          108 :         cancel: &CancellationToken,
    2284          108 :         job: GcCompactJob,
    2285          108 :         ctx: &RequestContext,
    2286          108 :     ) -> anyhow::Result<()> {
    2287          108 :         // Block other compaction/GC tasks from running for now. GC-compaction could run along
    2288          108 :         // with legacy compaction tasks in the future. Always ensure the lock order is compaction -> gc.
    2289          108 :         // Note that we already acquired the compaction lock when the outer `compact` function gets called.
    2290          108 : 
    2291          108 :         let gc_lock = async {
    2292          108 :             tokio::select! {
    2293          108 :                 guard = self.gc_lock.lock() => Ok(guard),
    2294              :                 // TODO: refactor to CompactionError to correctly pass cancelled error
    2295          108 :                 _ = cancel.cancelled() => Err(anyhow!("cancelled")),
    2296              :             }
    2297          108 :         };
    2298              : 
    2299          108 :         let gc_lock = crate::timed(
    2300          108 :             gc_lock,
    2301          108 :             "acquires gc lock",
    2302          108 :             std::time::Duration::from_secs(5),
    2303          108 :         )
    2304          108 :         .await?;
    2305              : 
    2306          108 :         let dry_run = job.dry_run;
    2307          108 :         let compact_key_range = job.compact_key_range;
    2308          108 :         let compact_lsn_range = job.compact_lsn_range;
    2309              : 
    2310          108 :         let debug_mode = cfg!(debug_assertions) || cfg!(feature = "testing");
    2311              : 
    2312          108 :         info!("running enhanced gc bottom-most compaction, dry_run={dry_run}, compact_key_range={}..{}, compact_lsn_range={}..{}", compact_key_range.start, compact_key_range.end, compact_lsn_range.start, compact_lsn_range.end);
    2313              : 
    2314          108 :         scopeguard::defer! {
    2315          108 :             info!("done enhanced gc bottom-most compaction");
    2316          108 :         };
    2317          108 : 
    2318          108 :         let mut stat = CompactionStatistics::default();
    2319              : 
    2320              :         // Step 0: pick all delta layers + image layers below/intersect with the GC horizon.
    2321              :         // The layer selection has the following properties:
    2322              :         // 1. If a layer is in the selection, all layers below it are in the selection.
    2323              :         // 2. Inferred from (1), for each key in the layer selection, the value can be reconstructed using only the layers in the layer selection.
    2324          104 :         let job_desc = {
    2325          108 :             let guard = self.layers.read().await;
    2326          108 :             let layers = guard.layer_map()?;
    2327          108 :             let gc_info = self.gc_info.read().unwrap();
    2328          108 :             let mut retain_lsns_below_horizon = Vec::new();
    2329          108 :             let gc_cutoff = {
    2330              :                 // Currently, gc-compaction only kicks in after the legacy gc has updated the gc_cutoff.
    2331              :                 // Therefore, it can only clean up data that cannot be cleaned up with legacy gc, instead of
    2332              :                 // cleaning everything that it theoretically could. In the future, it should use `self.gc_info`
    2333              :                 // to get the ground-truth data.
    2334          108 :                 let real_gc_cutoff = self.get_gc_compaction_watermark();
    2335              :                 // The compaction algorithm will keep all keys above the gc_cutoff while keeping only necessary keys below the gc_cutoff for
    2336              :                 // each of the retain_lsn. Therefore, if the user-provided `compact_lsn_range.end` is larger than the real gc cutoff, we will use
    2337              :                 // the real cutoff.
    2338          108 :                 let mut gc_cutoff = if compact_lsn_range.end == Lsn::MAX {
    2339           96 :                     if real_gc_cutoff == Lsn::INVALID {
    2340              :                         // If the gc_cutoff is not generated yet, we should not compact anything.
    2341            0 :                         tracing::warn!("no layers to compact with gc: gc_cutoff not generated yet, skipping gc bottom-most compaction");
    2342            0 :                         return Ok(());
    2343           96 :                     }
    2344           96 :                     real_gc_cutoff
    2345              :                 } else {
    2346           12 :                     compact_lsn_range.end
    2347              :                 };
    2348          108 :                 if gc_cutoff > real_gc_cutoff {
    2349            8 :                     warn!("provided compact_lsn_range.end={} is larger than the real_gc_cutoff={}, using the real gc cutoff", gc_cutoff, real_gc_cutoff);
    2350            8 :                     gc_cutoff = real_gc_cutoff;
    2351          100 :                 }
    2352          108 :                 gc_cutoff
    2353              :             };
    2354          140 :             for (lsn, _timeline_id, _is_offloaded) in &gc_info.retain_lsns {
    2355          140 :                 if lsn < &gc_cutoff {
    2356          140 :                     retain_lsns_below_horizon.push(*lsn);
    2357          140 :                 }
    2358              :             }
    2359          108 :             for lsn in gc_info.leases.keys() {
    2360            0 :                 if lsn < &gc_cutoff {
    2361            0 :                     retain_lsns_below_horizon.push(*lsn);
    2362            0 :                 }
    2363              :             }
    2364          108 :             let mut selected_layers: Vec<Layer> = Vec::new();
    2365          108 :             drop(gc_info);
    2366              :             // First, pick all the layers that intersect or are below the gc_cutoff, and get the largest LSN in the selected layers.
    2367          108 :             let Some(max_layer_lsn) = layers
    2368          108 :                 .iter_historic_layers()
    2369          488 :                 .filter(|desc| desc.get_lsn_range().start <= gc_cutoff)
    2370          416 :                 .map(|desc| desc.get_lsn_range().end)
    2371          108 :                 .max()
    2372              :             else {
    2373            0 :                 info!("no layers to compact with gc: no historic layers below gc_cutoff, gc_cutoff={}", gc_cutoff);
    2374            0 :                 return Ok(());
    2375              :             };
    2376              :             // Next, if the user specifies compact_lsn_range.start, we need to filter some layers out. All the layers (strictly) below
    2377              :             // the min_layer_lsn computed as below will be filtered out and the data will be accessed using the normal read path, as if
    2378              :             // it is a branch.
    2379          108 :             let Some(min_layer_lsn) = layers
    2380          108 :                 .iter_historic_layers()
    2381          488 :                 .filter(|desc| {
    2382          488 :                     if compact_lsn_range.start == Lsn::INVALID {
    2383          396 :                         true // select all layers below if start == Lsn(0)
    2384              :                     } else {
    2385           92 :                         desc.get_lsn_range().end > compact_lsn_range.start // strictly larger than compact_above_lsn
    2386              :                     }
    2387          488 :                 })
    2388          452 :                 .map(|desc| desc.get_lsn_range().start)
    2389          108 :                 .min()
    2390              :             else {
    2391            0 :                 info!("no layers to compact with gc: no historic layers above compact_above_lsn, compact_above_lsn={}", compact_lsn_range.end);
    2392            0 :                 return Ok(());
    2393              :             };
    2394              :             // Then, pick all the layers that are below the max_layer_lsn. This is to ensure we can pick all single-key
    2395              :             // layers to compact.
    2396          108 :             let mut rewrite_layers = Vec::new();
    2397          488 :             for desc in layers.iter_historic_layers() {
    2398          488 :                 if desc.get_lsn_range().end <= max_layer_lsn
    2399          416 :                     && desc.get_lsn_range().start >= min_layer_lsn
    2400          380 :                     && overlaps_with(&desc.get_key_range(), &compact_key_range)
    2401              :                 {
    2402              :                     // If the layer overlaps with the compaction key range, we need to read it to obtain all keys within the range,
    2403              :                     // even if it might contain extra keys
    2404          304 :                     selected_layers.push(guard.get_from_desc(&desc));
    2405          304 :                     // If the layer is not fully contained within the key range, we need to rewrite it if it's a delta layer (it's fine
    2406          304 :                     // to overlap image layers)
    2407          304 :                     if desc.is_delta() && !fully_contains(&compact_key_range, &desc.get_key_range())
    2408            4 :                     {
    2409            4 :                         rewrite_layers.push(desc);
    2410          300 :                     }
    2411          184 :                 }
    2412              :             }
    2413          108 :             if selected_layers.is_empty() {
    2414            4 :                 info!("no layers to compact with gc: no layers within the key range, gc_cutoff={}, key_range={}..{}", gc_cutoff, compact_key_range.start, compact_key_range.end);
    2415            4 :                 return Ok(());
    2416          104 :             }
    2417          104 :             retain_lsns_below_horizon.sort();
    2418          104 :             GcCompactionJobDescription {
    2419          104 :                 selected_layers,
    2420          104 :                 gc_cutoff,
    2421          104 :                 retain_lsns_below_horizon,
    2422          104 :                 min_layer_lsn,
    2423          104 :                 max_layer_lsn,
    2424          104 :                 compaction_key_range: compact_key_range,
    2425          104 :                 rewrite_layers,
    2426          104 :             }
    2427              :         };
    2428          104 :         let (has_data_below, lowest_retain_lsn) = if compact_lsn_range.start != Lsn::INVALID {
    2429              :             // If we only compact above some LSN, we should get the history from the current branch below the specified LSN.
    2430              :             // We use job_desc.min_layer_lsn as if it's the lowest branch point.
    2431           16 :             (true, job_desc.min_layer_lsn)
    2432           88 :         } else if self.ancestor_timeline.is_some() {
    2433              :             // In theory, we can also use min_layer_lsn here, but using ancestor LSN makes sure the delta layers cover the
    2434              :             // LSN ranges all the way to the ancestor timeline.
    2435            4 :             (true, self.ancestor_lsn)
    2436              :         } else {
    2437           84 :             let res = job_desc
    2438           84 :                 .retain_lsns_below_horizon
    2439           84 :                 .first()
    2440           84 :                 .copied()
    2441           84 :                 .unwrap_or(job_desc.gc_cutoff);
    2442           84 :             if debug_mode {
    2443           84 :                 assert_eq!(
    2444           84 :                     res,
    2445           84 :                     job_desc
    2446           84 :                         .retain_lsns_below_horizon
    2447           84 :                         .iter()
    2448           84 :                         .min()
    2449           84 :                         .copied()
    2450           84 :                         .unwrap_or(job_desc.gc_cutoff)
    2451           84 :                 );
    2452            0 :             }
    2453           84 :             (false, res)
    2454              :         };
    2455          104 :         info!(
    2456            0 :             "picked {} layers for compaction ({} layers need rewriting) with max_layer_lsn={} min_layer_lsn={} gc_cutoff={} lowest_retain_lsn={}, key_range={}..{}, has_data_below={}",
    2457            0 :             job_desc.selected_layers.len(),
    2458            0 :             job_desc.rewrite_layers.len(),
    2459              :             job_desc.max_layer_lsn,
    2460              :             job_desc.min_layer_lsn,
    2461              :             job_desc.gc_cutoff,
    2462              :             lowest_retain_lsn,
    2463              :             job_desc.compaction_key_range.start,
    2464              :             job_desc.compaction_key_range.end,
    2465              :             has_data_below,
    2466              :         );
    2467              : 
    2468          408 :         for layer in &job_desc.selected_layers {
    2469          304 :             debug!("read layer: {}", layer.layer_desc().key());
    2470              :         }
    2471          108 :         for layer in &job_desc.rewrite_layers {
    2472            4 :             debug!("rewrite layer: {}", layer.key());
    2473              :         }
    2474              : 
    2475          104 :         self.check_compaction_space(&job_desc.selected_layers)
    2476          104 :             .await?;
    2477              : 
    2478              :         // Generate statistics for the compaction
    2479          408 :         for layer in &job_desc.selected_layers {
    2480          304 :             let desc = layer.layer_desc();
    2481          304 :             if desc.is_delta() {
    2482          172 :                 stat.visit_delta_layer(desc.file_size());
    2483          172 :             } else {
    2484          132 :                 stat.visit_image_layer(desc.file_size());
    2485          132 :             }
    2486              :         }
    2487              : 
    2488              :         // Step 1: construct a k-merge iterator over all layers.
    2489              :         // Also, verify if the layer map can be split by drawing a horizontal line at every LSN start/end split point.
    2490          104 :         let layer_names = job_desc
    2491          104 :             .selected_layers
    2492          104 :             .iter()
    2493          304 :             .map(|layer| layer.layer_desc().layer_name())
    2494          104 :             .collect_vec();
    2495          104 :         if let Some(err) = check_valid_layermap(&layer_names) {
    2496            0 :             bail!("gc-compaction layer map check failed because {}, cannot proceed with compaction due to potential data loss", err);
    2497          104 :         }
    2498          104 :         // The maximum LSN we are processing in this compaction loop
    2499          104 :         let end_lsn = job_desc
    2500          104 :             .selected_layers
    2501          104 :             .iter()
    2502          304 :             .map(|l| l.layer_desc().lsn_range.end)
    2503          104 :             .max()
    2504          104 :             .unwrap();
    2505          104 :         let mut delta_layers = Vec::new();
    2506          104 :         let mut image_layers = Vec::new();
    2507          104 :         let mut downloaded_layers = Vec::new();
    2508          104 :         let mut total_downloaded_size = 0;
    2509          104 :         let mut total_layer_size = 0;
    2510          408 :         for layer in &job_desc.selected_layers {
    2511          304 :             if layer.needs_download().await?.is_some() {
    2512            0 :                 total_downloaded_size += layer.layer_desc().file_size;
    2513          304 :             }
    2514          304 :             total_layer_size += layer.layer_desc().file_size;
    2515          304 :             let resident_layer = layer.download_and_keep_resident().await?;
    2516          304 :             downloaded_layers.push(resident_layer);
    2517              :         }
    2518          104 :         info!(
    2519            0 :             "finish downloading layers, downloaded={}, total={}, ratio={:.2}",
    2520            0 :             total_downloaded_size,
    2521            0 :             total_layer_size,
    2522            0 :             total_downloaded_size as f64 / total_layer_size as f64
    2523              :         );
    2524          408 :         for resident_layer in &downloaded_layers {
    2525          304 :             if resident_layer.layer_desc().is_delta() {
    2526          172 :                 let layer = resident_layer.get_as_delta(ctx).await?;
    2527          172 :                 delta_layers.push(layer);
    2528              :             } else {
    2529          132 :                 let layer = resident_layer.get_as_image(ctx).await?;
    2530          132 :                 image_layers.push(layer);
    2531              :             }
    2532              :         }
    2533          104 :         let (dense_ks, sparse_ks) = self.collect_gc_compaction_keyspace().await?;
    2534          104 :         let mut merge_iter = FilterIterator::create(
    2535          104 :             MergeIterator::create(&delta_layers, &image_layers, ctx),
    2536          104 :             dense_ks,
    2537          104 :             sparse_ks,
    2538          104 :         )?;
    2539              : 
    2540              :         // Step 2: Produce images+deltas.
    2541          104 :         let mut accumulated_values = Vec::new();
    2542          104 :         let mut last_key: Option<Key> = None;
    2543              : 
    2544              :         // Only create image layers when there are no ancestor branches. TODO: create a covering image layer
    2545              :         // when some condition is met.
    2546          104 :         let mut image_layer_writer = if !has_data_below {
    2547              :             Some(
    2548           84 :                 SplitImageLayerWriter::new(
    2549           84 :                     self.conf,
    2550           84 :                     self.timeline_id,
    2551           84 :                     self.tenant_shard_id,
    2552           84 :                     job_desc.compaction_key_range.start,
    2553           84 :                     lowest_retain_lsn,
    2554           84 :                     self.get_compaction_target_size(),
    2555           84 :                     ctx,
    2556           84 :                 )
    2557           84 :                 .await?,
    2558              :             )
    2559              :         } else {
    2560           20 :             None
    2561              :         };
    2562              : 
    2563          104 :         let mut delta_layer_writer = SplitDeltaLayerWriter::new(
    2564          104 :             self.conf,
    2565          104 :             self.timeline_id,
    2566          104 :             self.tenant_shard_id,
    2567          104 :             lowest_retain_lsn..end_lsn,
    2568          104 :             self.get_compaction_target_size(),
    2569          104 :         )
    2570          104 :         .await?;
    2571              : 
    2572              :         #[derive(Default)]
    2573              :         struct RewritingLayers { // writers for the out-of-range keys of one rewritten delta layer; each is created on first use
    2574              :             before: Option<DeltaLayerWriter>, // receives keys below `compaction_key_range.start`
    2575              :             after: Option<DeltaLayerWriter>, // receives keys at or above `compaction_key_range.end`
    2576              :         }
    2577          104 :         let mut delta_layer_rewriters = HashMap::<Arc<PersistentLayerKey>, RewritingLayers>::new();
    2578              : 
    2579              :         /// When compacting anything other than a bottom range (=`[0,X)`) of the root branch, we "have data below" (`has_data_below=true`).
    2580              :         /// The two cases are compacting a timeline that has an ancestor and compacting with `compact_lsn_range.start` set.
    2581              :         /// In those cases, we need to pull up data from below the LSN range we're compacting.
    2582              :         ///
    2583              :         /// This function unifies the cases so that later code doesn't have to think about it: it returns `Ok(None)` when `has_data_below` is false, and otherwise `Ok(Some((key, history_lsn_point, image)))`.
    2584              :         ///
    2585              :         /// Currently, we always get the ancestor image for each key in the child branch no matter whether the image
    2586              :         /// is needed for reconstruction. This should be fixed in the future.
    2587              :         ///
    2588              :         /// Furthermore, we should do a vectored get instead of a single get, or better, use k-merge for ancestor
    2589              :         /// images.
    2590         1244 :         async fn get_ancestor_image(
    2591         1244 :             this_tline: &Arc<Timeline>,
    2592         1244 :             key: Key,
    2593         1244 :             ctx: &RequestContext,
    2594         1244 :             has_data_below: bool,
    2595         1244 :             history_lsn_point: Lsn,
    2596         1244 :         ) -> anyhow::Result<Option<(Key, Lsn, Bytes)>> {
    2597         1244 :             if !has_data_below {
    2598         1168 :                 return Ok(None);
    2599           76 :             };
    2600              :             // This is implemented as a regular `get` on the current timeline at `history_lsn_point`, therefore reusing
    2601              :             // as much existing code as possible.
    2602           76 :             let img = this_tline.get(key, history_lsn_point, ctx).await?;
    2603           76 :             Ok(Some((key, history_lsn_point, img)))
    2604         1244 :         }
    2605              : 
    2606              :         // Actually, we can decide not to write to the image layer at all at this point because
    2607              :         // the key and LSN range are determined. However, to keep things simple here, we still
    2608              :         // create this writer, and discard the writer in the end.
    2609              : 
    2610         1932 :         while let Some(((key, lsn, val), desc)) = merge_iter.next_with_trace().await? {
    2611         1828 :             if cancel.is_cancelled() {
    2612            0 :                 return Err(anyhow!("cancelled")); // TODO: refactor to CompactionError and pass cancel error
    2613         1828 :             }
    2614         1828 :             if self.shard_identity.is_key_disposable(&key) {
    2615              :                 // If this shard does not need to store this key, simply skip it.
    2616              :                 //
    2617              :                 // This is not handled in the filter iterator because shard is determined by hash.
    2618              :                 // Therefore, it does not give us any performance benefit to do things like skip
    2619              :                 // a whole layer file as handling key spaces (ranges).
    2620            0 :                 if cfg!(debug_assertions) {
    2621            0 :                     let shard = self.shard_identity.shard_index();
    2622            0 :                     let owner = self.shard_identity.get_shard_number(&key);
    2623            0 :                     panic!("key {key} does not belong on shard {shard}, owned by {owner}");
    2624            0 :                 }
    2625            0 :                 continue;
    2626         1828 :             }
    2627         1828 :             if !job_desc.compaction_key_range.contains(&key) {
    2628          128 :                 if !desc.is_delta {
    2629          120 :                     continue;
    2630            8 :                 }
    2631            8 :                 let rewriter = delta_layer_rewriters.entry(desc.clone()).or_default();
    2632            8 :                 let rewriter = if key < job_desc.compaction_key_range.start {
    2633            0 :                     if rewriter.before.is_none() {
    2634            0 :                         rewriter.before = Some(
    2635            0 :                             DeltaLayerWriter::new(
    2636            0 :                                 self.conf,
    2637            0 :                                 self.timeline_id,
    2638            0 :                                 self.tenant_shard_id,
    2639            0 :                                 desc.key_range.start,
    2640            0 :                                 desc.lsn_range.clone(),
    2641            0 :                                 ctx,
    2642            0 :                             )
    2643            0 :                             .await?,
    2644              :                         );
    2645            0 :                     }
    2646            0 :                     rewriter.before.as_mut().unwrap()
    2647            8 :                 } else if key >= job_desc.compaction_key_range.end {
    2648            8 :                     if rewriter.after.is_none() {
    2649            4 :                         rewriter.after = Some(
    2650            4 :                             DeltaLayerWriter::new(
    2651            4 :                                 self.conf,
    2652            4 :                                 self.timeline_id,
    2653            4 :                                 self.tenant_shard_id,
    2654            4 :                                 job_desc.compaction_key_range.end,
    2655            4 :                                 desc.lsn_range.clone(),
    2656            4 :                                 ctx,
    2657            4 :                             )
    2658            4 :                             .await?,
    2659              :                         );
    2660            4 :                     }
    2661            8 :                     rewriter.after.as_mut().unwrap()
    2662              :                 } else {
    2663            0 :                     unreachable!()
    2664              :                 };
    2665            8 :                 rewriter.put_value(key, lsn, val, ctx).await?;
    2666            8 :                 continue;
    2667         1700 :             }
    2668         1700 :             match val {
    2669         1220 :                 Value::Image(_) => stat.visit_image_key(&val),
    2670          480 :                 Value::WalRecord(_) => stat.visit_wal_key(&val),
    2671              :             }
    2672         1700 :             if last_key.is_none() || last_key.as_ref() == Some(&key) {
    2673          560 :                 if last_key.is_none() {
    2674          104 :                     last_key = Some(key);
    2675          456 :                 }
    2676          560 :                 accumulated_values.push((key, lsn, val));
    2677              :             } else {
    2678         1140 :                 let last_key: &mut Key = last_key.as_mut().unwrap();
    2679         1140 :                 stat.on_unique_key_visited(); // TODO: adjust statistics for partial compaction
    2680         1140 :                 let retention = self
    2681         1140 :                     .generate_key_retention(
    2682         1140 :                         *last_key,
    2683         1140 :                         &accumulated_values,
    2684         1140 :                         job_desc.gc_cutoff,
    2685         1140 :                         &job_desc.retain_lsns_below_horizon,
    2686         1140 :                         COMPACTION_DELTA_THRESHOLD,
    2687         1140 :                         get_ancestor_image(self, *last_key, ctx, has_data_below, lowest_retain_lsn)
    2688         1140 :                             .await?,
    2689              :                     )
    2690         1140 :                     .await?;
    2691         1140 :                 retention
    2692         1140 :                     .pipe_to(
    2693         1140 :                         *last_key,
    2694         1140 :                         &mut delta_layer_writer,
    2695         1140 :                         image_layer_writer.as_mut(),
    2696         1140 :                         &mut stat,
    2697         1140 :                         ctx,
    2698         1140 :                     )
    2699         1140 :                     .await?;
    2700         1140 :                 accumulated_values.clear();
    2701         1140 :                 *last_key = key;
    2702         1140 :                 accumulated_values.push((key, lsn, val));
    2703              :             }
    2704              :         }
    2705              : 
    2706              :         // TODO: move the below part to the loop body
    2707          104 :         let last_key = last_key.expect("no keys produced during compaction");
    2708          104 :         stat.on_unique_key_visited();
    2709              : 
    2710          104 :         let retention = self
    2711          104 :             .generate_key_retention(
    2712          104 :                 last_key,
    2713          104 :                 &accumulated_values,
    2714          104 :                 job_desc.gc_cutoff,
    2715          104 :                 &job_desc.retain_lsns_below_horizon,
    2716          104 :                 COMPACTION_DELTA_THRESHOLD,
    2717          104 :                 get_ancestor_image(self, last_key, ctx, has_data_below, lowest_retain_lsn).await?,
    2718              :             )
    2719          104 :             .await?;
    2720          104 :         retention
    2721          104 :             .pipe_to(
    2722          104 :                 last_key,
    2723          104 :                 &mut delta_layer_writer,
    2724          104 :                 image_layer_writer.as_mut(),
    2725          104 :                 &mut stat,
    2726          104 :                 ctx,
    2727          104 :             )
    2728          104 :             .await?;
    2729              :         // end: move the above part to the loop body
    2730              : 
    2731          104 :         let mut rewrote_delta_layers = Vec::new();
    2732          108 :         for (key, writers) in delta_layer_rewriters {
    2733            4 :             if let Some(delta_writer_before) = writers.before {
    2734            0 :                 let (desc, path) = delta_writer_before
    2735            0 :                     .finish(job_desc.compaction_key_range.start, ctx)
    2736            0 :                     .await?;
    2737            0 :                 let layer = Layer::finish_creating(self.conf, self, desc, &path)?;
    2738            0 :                 rewrote_delta_layers.push(layer);
    2739            4 :             }
    2740            4 :             if let Some(delta_writer_after) = writers.after {
    2741            4 :                 let (desc, path) = delta_writer_after.finish(key.key_range.end, ctx).await?;
    2742            4 :                 let layer = Layer::finish_creating(self.conf, self, desc, &path)?;
    2743            4 :                 rewrote_delta_layers.push(layer);
    2744            0 :             }
    2745              :         }
    2746              : 
    2747          148 :         let discard = |key: &PersistentLayerKey| {
    2748          148 :             let key = key.clone();
    2749          148 :             async move { KeyHistoryRetention::discard_key(&key, self, dry_run).await }
    2750          148 :         };
    2751              : 
    2752          104 :         let produced_image_layers = if let Some(writer) = image_layer_writer {
    2753           84 :             if !dry_run {
    2754           76 :                 let end_key = job_desc.compaction_key_range.end;
    2755           76 :                 writer
    2756           76 :                     .finish_with_discard_fn(self, ctx, end_key, discard)
    2757           76 :                     .await?
    2758              :             } else {
    2759            8 :                 drop(writer);
    2760            8 :                 Vec::new()
    2761              :             }
    2762              :         } else {
    2763           20 :             Vec::new()
    2764              :         };
    2765              : 
    2766          104 :         let produced_delta_layers = if !dry_run {
    2767           96 :             delta_layer_writer
    2768           96 :                 .finish_with_discard_fn(self, ctx, discard)
    2769           96 :                 .await?
    2770              :         } else {
    2771            8 :             drop(delta_layer_writer);
    2772            8 :             Vec::new()
    2773              :         };
    2774              : 
    2775              :         // TODO: make image/delta/rewrote_delta layers generation atomic. At this point, we already generated resident layers, and if
    2776              :         // compaction is cancelled at this point, we might have some layers that are not cleaned up.
    2777          104 :         let mut compact_to = Vec::new();
    2778          104 :         let mut keep_layers = HashSet::new();
    2779          104 :         let produced_delta_layers_len = produced_delta_layers.len();
    2780          104 :         let produced_image_layers_len = produced_image_layers.len();
    2781          176 :         for action in produced_delta_layers {
    2782           72 :             match action {
    2783           44 :                 BatchWriterResult::Produced(layer) => {
    2784           44 :                     if cfg!(debug_assertions) {
    2785           44 :                         info!("produced delta layer: {}", layer.layer_desc().key());
    2786            0 :                     }
    2787           44 :                     stat.produce_delta_layer(layer.layer_desc().file_size());
    2788           44 :                     compact_to.push(layer);
    2789              :                 }
    2790           28 :                 BatchWriterResult::Discarded(l) => {
    2791           28 :                     if cfg!(debug_assertions) {
    2792           28 :                         info!("discarded delta layer: {}", l);
    2793            0 :                     }
    2794           28 :                     keep_layers.insert(l);
    2795           28 :                     stat.discard_delta_layer();
    2796              :                 }
    2797              :             }
    2798              :         }
    2799          108 :         for layer in &rewrote_delta_layers {
    2800            4 :             debug!(
    2801            0 :                 "produced rewritten delta layer: {}",
    2802            0 :                 layer.layer_desc().key()
    2803              :             );
    2804              :         }
    2805          104 :         compact_to.extend(rewrote_delta_layers);
    2806          180 :         for action in produced_image_layers {
    2807           76 :             match action {
    2808           60 :                 BatchWriterResult::Produced(layer) => {
    2809           60 :                     debug!("produced image layer: {}", layer.layer_desc().key());
    2810           60 :                     stat.produce_image_layer(layer.layer_desc().file_size());
    2811           60 :                     compact_to.push(layer);
    2812              :                 }
    2813           16 :                 BatchWriterResult::Discarded(l) => {
    2814           16 :                     debug!("discarded image layer: {}", l);
    2815           16 :                     keep_layers.insert(l);
    2816           16 :                     stat.discard_image_layer();
    2817              :                 }
    2818              :             }
    2819              :         }
    2820              : 
    2821          104 :         let mut layer_selection = job_desc.selected_layers;
    2822              : 
    2823              :         // Partial compaction might select more data than it processes, e.g., if
    2824              :         // the compaction_key_range only partially overlaps:
    2825              :         //
    2826              :         //         [---compaction_key_range---]
    2827              :         //   [---A----][----B----][----C----][----D----]
    2828              :         //
    2829              :         // For delta layers, we will rewrite the layers so that it is cut exactly at
    2830              :         // the compaction key range, so we can always discard them. However, for image
    2831              :         // layers, as we do not rewrite them for now, we need to handle them differently.
    2832              :         // Assume image layers  A, B, C, D are all in the `layer_selection`.
    2833              :         //
    2834              :         // The created image layers contain whatever is needed from B, C, and from
    2835              :         // `----]` of A, and from  `[---` of D.
    2836              :         //
    2837              :         // In contrast, `[---A` and `D----]` have not been processed, so, we must
    2838              :         // keep that data.
    2839              :         //
    2840              :         // The solution for now is to keep A and D completely if they are image layers.
    2841              :         // (layer_selection is what we'll remove from the layer map, so, retain what
    2842              :         // is _not_ fully covered by compaction_key_range).
    2843          408 :         for layer in &layer_selection {
    2844          304 :             if !layer.layer_desc().is_delta() {
    2845          132 :                 if !overlaps_with(
    2846          132 :                     &layer.layer_desc().key_range,
    2847          132 :                     &job_desc.compaction_key_range,
    2848          132 :                 ) {
    2849            0 :                     bail!("violated constraint: image layer outside of compaction key range");
    2850          132 :                 }
    2851          132 :                 if !fully_contains(
    2852          132 :                     &job_desc.compaction_key_range,
    2853          132 :                     &layer.layer_desc().key_range,
    2854          132 :                 ) {
    2855           16 :                     keep_layers.insert(layer.layer_desc().key());
    2856          116 :                 }
    2857          172 :             }
    2858              :         }
    2859              : 
    2860          304 :         layer_selection.retain(|x| !keep_layers.contains(&x.layer_desc().key()));
    2861          104 : 
    2862          104 :         info!(
    2863            0 :             "gc-compaction statistics: {}",
    2864            0 :             serde_json::to_string(&stat)?
    2865              :         );
    2866              : 
    2867          104 :         if dry_run {
    2868            8 :             return Ok(());
    2869           96 :         }
    2870           96 : 
    2871           96 :         info!(
    2872            0 :             "produced {} delta layers and {} image layers, {} layers are kept",
    2873            0 :             produced_delta_layers_len,
    2874            0 :             produced_image_layers_len,
    2875            0 :             keep_layers.len()
    2876              :         );
    2877              : 
    2878              :         // Step 3: Place back to the layer map.
    2879              : 
    2880              :         // First, do a sanity check to ensure the newly-created layer map does not contain overlaps.
    2881           96 :         let all_layers = {
    2882           96 :             let guard = self.layers.read().await;
    2883           96 :             let layer_map = guard.layer_map()?;
    2884           96 :             layer_map.iter_historic_layers().collect_vec()
    2885           96 :         };
    2886           96 : 
    2887           96 :         let mut final_layers = all_layers
    2888           96 :             .iter()
    2889          428 :             .map(|layer| layer.layer_name())
    2890           96 :             .collect::<HashSet<_>>();
    2891          304 :         for layer in &layer_selection {
    2892          208 :             final_layers.remove(&layer.layer_desc().layer_name());
    2893          208 :         }
    2894          204 :         for layer in &compact_to {
    2895          108 :             final_layers.insert(layer.layer_desc().layer_name());
    2896          108 :         }
    2897           96 :         let final_layers = final_layers.into_iter().collect_vec();
    2898              : 
    2899              :         // TODO: move this check before we call `finish` on image layer writers. However, this will require us to get the layer name before we finish
    2900              :         // the writer, so potentially, we will need a function like `ImageLayerBatchWriter::get_all_pending_layer_keys` to get all the keys that are
    2901              :         // in the writer before finalizing the persistent layers. Now we would leave some dangling layers on the disk if the check fails.
    2902           96 :         if let Some(err) = check_valid_layermap(&final_layers) {
    2903            0 :             bail!("gc-compaction layer map check failed after compaction because {}, compaction result not applied to the layer map due to potential data loss", err);
    2904           96 :         }
    2905              : 
    2906              :         // Between the sanity check and this compaction update, there could be new layers being flushed, but it should be fine because we only
    2907              :         // operate on L1 layers.
    2908              :         {
    2909           96 :             let mut guard = self.layers.write().await;
    2910           96 :             guard
    2911           96 :                 .open_mut()?
    2912           96 :                 .finish_gc_compaction(&layer_selection, &compact_to, &self.metrics)
    2913           96 :         };
    2914           96 : 
    2915           96 :         // Schedule an index-only upload to update the `latest_gc_cutoff` in the index_part.json.
    2916           96 :         // Otherwise, after restart, the index_part only contains the old `latest_gc_cutoff` and
    2917           96 :         // find_gc_cutoffs will try accessing things below the cutoff. TODO: ideally, this should
    2918           96 :         // be batched into `schedule_compaction_update`.
    2919           96 :         let disk_consistent_lsn = self.disk_consistent_lsn.load();
    2920           96 :         self.schedule_uploads(disk_consistent_lsn, None)?;
    2921              :         // If a layer gets rewritten throughout gc-compaction, we need to keep that layer only in `compact_to` instead
    2922              :         // of `compact_from`.
    2923           96 :         let compact_from = {
    2924           96 :             let mut compact_from = Vec::new();
    2925           96 :             let mut compact_to_set = HashMap::new();
    2926          204 :             for layer in &compact_to {
    2927          108 :                 compact_to_set.insert(layer.layer_desc().key(), layer);
    2928          108 :             }
    2929          304 :             for layer in &layer_selection {
    2930          208 :                 if let Some(to) = compact_to_set.get(&layer.layer_desc().key()) {
    2931            0 :                     tracing::info!(
    2932            0 :                         "skipping delete {} because found same layer key at different generation {}",
    2933              :                         layer, to
    2934              :                     );
    2935          208 :                 } else {
    2936          208 :                     compact_from.push(layer.clone());
    2937          208 :                 }
    2938              :             }
    2939           96 :             compact_from
    2940           96 :         };
    2941           96 :         self.remote_client
    2942           96 :             .schedule_compaction_update(&compact_from, &compact_to)?;
    2943              : 
    2944           96 :         drop(gc_lock);
    2945           96 : 
    2946           96 :         Ok(())
    2947          108 :     }
    2948              : }
    2949              : 
    2950              : struct TimelineAdaptor {
                            // State behind the `CompactionJobExecutor` impl for a timeline: layers that
                            // the callbacks create or mark for deletion are buffered in the vectors
                            // below until `flush_updates` applies them.
    2951              :     timeline: Arc<Timeline>,
    2952              : 
                            // Key space paired with the one LSN for which `get_keyspace` will serve it.
    2953              :     keyspace: (Lsn, KeySpace),
    2954              : 
                            // Pushed to by `create_delta` and `create_image_impl` respectively.
    2955              :     new_deltas: Vec<ResidentLayer>,
    2956              :     new_images: Vec<ResidentLayer>,
                            // Pushed to by `delete_layer`.
    2957              :     layers_to_delete: Vec<Arc<PersistentLayerDesc>>,
    2958              : }
    2959              : 
    2960              : impl TimelineAdaptor {
                            /// Creates an adaptor holding its own clone of the `Arc<Timeline>` and the
                            /// given `(Lsn, KeySpace)` pair; all pending-change lists start empty.
    2961            0 :     pub fn new(timeline: &Arc<Timeline>, keyspace: (Lsn, KeySpace)) -> Self {
    2962            0 :         Self {
    2963            0 :             timeline: timeline.clone(),
    2964            0 :             keyspace,
    2965            0 :             new_images: Vec::new(),
    2966            0 :             new_deltas: Vec::new(),
    2967            0 :             layers_to_delete: Vec::new(),
    2968            0 :         }
    2969            0 :     }
    2970              : 
                            /// Applies the buffered changes to the timeline and empties the buffers.
                            ///
                            /// Passes the pending deltas, images and to-be-deleted layers to
                            /// `finish_compact_batch`, then hands the new image layers to
                            /// `upload_new_image_layers`. If either call fails, the error is returned
                            /// before the `clear()` calls below run.
    2971            0 :     pub async fn flush_updates(&mut self) -> Result<(), CompactionError> {
                                // Resolve the queued descriptors to `Layer` handles. The read guard lives
                                // only inside this block, so it is released before the awaits below.
    2972            0 :         let layers_to_delete = {
    2973            0 :             let guard = self.timeline.layers.read().await;
    2974            0 :             self.layers_to_delete
    2975            0 :                 .iter()
    2976            0 :                 .map(|x| guard.get_from_desc(x))
    2977            0 :                 .collect::<Vec<Layer>>()
    2978            0 :         };
    2979            0 :         self.timeline
    2980            0 :             .finish_compact_batch(&self.new_deltas, &self.new_images, &layers_to_delete)
    2981            0 :             .await?;
    2982              : 
                                // `mem::take` moves the image layers out and leaves `new_images` empty.
    2983            0 :         self.timeline
    2984            0 :             .upload_new_image_layers(std::mem::take(&mut self.new_images))?;
    2985              : 
    2986            0 :         self.new_deltas.clear();
    2987            0 :         self.layers_to_delete.clear();
    2988            0 :         Ok(())
    2989            0 :     }
    2990              : }
    2991              : 
    2992              : #[derive(Clone)]
                        /// Newtype over `ResidentLayer`; used as `CompactionJobExecutor::DeltaLayer`
                        /// for `TimelineAdaptor`, and its `is_delta()` always reports `true`.
    2993              : struct ResidentDeltaLayer(ResidentLayer);
    2994              : #[derive(Clone)]
                        /// Newtype over `ResidentLayer`; used as `CompactionJobExecutor::ImageLayer`
                        /// for `TimelineAdaptor`, and its `is_delta()` always reports `false`.
    2995              : struct ResidentImageLayer(ResidentLayer);
    2996              : 
    2997              : impl CompactionJobExecutor for TimelineAdaptor {
                            // Callbacks through which the compaction algorithm reads and modifies the
                            // wrapped timeline. Created and deleted layers are only queued here; they
                            // are applied by `flush_updates`.
    2998              :     type Key = pageserver_api::key::Key;
    2999              : 
    3000              :     type Layer = OwnArc<PersistentLayerDesc>;
    3001              :     type DeltaLayer = ResidentDeltaLayer;
    3002              :     type ImageLayer = ResidentImageLayer;
    3003              : 
    3004              :     type RequestContext = crate::context::RequestContext;
    3005              : 
                            /// Returns the shard identity of the wrapped timeline.
    3006            0 :     fn get_shard_identity(&self) -> &ShardIdentity {
    3007            0 :         self.timeline.get_shard_identity()
    3008            0 :     }
    3009              : 
                            /// Returns descriptors of all historic layers whose LSN range and key range
                            /// both overlap the requested ranges.
                            ///
                            /// Pending updates are flushed first, presumably so that the layer map read
                            /// below already reflects them.
    3010            0 :     async fn get_layers(
    3011            0 :         &mut self,
    3012            0 :         key_range: &Range<Key>,
    3013            0 :         lsn_range: &Range<Lsn>,
    3014            0 :         _ctx: &RequestContext,
    3015            0 :     ) -> anyhow::Result<Vec<OwnArc<PersistentLayerDesc>>> {
    3016            0 :         self.flush_updates().await?;
    3017              : 
    3018            0 :         let guard = self.timeline.layers.read().await;
    3019            0 :         let layer_map = guard.layer_map()?;
    3020              : 
    3021            0 :         let result = layer_map
    3022            0 :             .iter_historic_layers()
    3023            0 :             .filter(|l| {
    3024            0 :                 overlaps_with(&l.lsn_range, lsn_range) && overlaps_with(&l.key_range, key_range)
    3025            0 :             })
    3026            0 :             .map(OwnArc)
    3027            0 :             .collect();
    3028            0 :         Ok(result)
    3029            0 :     }
    3030              : 
                            /// Returns the stored key space intersected with `key_range`.
                            ///
                            /// Fails unless `lsn` equals the LSN stored alongside the key space in
                            /// `self.keyspace`.
    3031            0 :     async fn get_keyspace(
    3032            0 :         &mut self,
    3033            0 :         key_range: &Range<Key>,
    3034            0 :         lsn: Lsn,
    3035            0 :         _ctx: &RequestContext,
    3036            0 :     ) -> anyhow::Result<Vec<Range<Key>>> {
    3037            0 :         if lsn == self.keyspace.0 {
    3038            0 :             Ok(pageserver_compaction::helpers::intersect_keyspace(
    3039            0 :                 &self.keyspace.1.ranges,
    3040            0 :                 key_range,
    3041            0 :             ))
    3042              :         } else {
    3043              :             // The current compaction implementation only ever requests the key space
    3044              :             // at the compaction end LSN.
    3045            0 :             anyhow::bail!("keyspace not available for requested lsn");
    3046              :         }
    3047            0 :     }
    3048              : 
                            /// Turns a layer descriptor into a `ResidentDeltaLayer` if it describes a
                            /// delta layer, otherwise returns `Ok(None)`.
                            ///
                            /// The delta case looks the layer up in the layer manager and awaits
                            /// `download_and_keep_resident`, whose error is propagated.
    3049            0 :     async fn downcast_delta_layer(
    3050            0 :         &self,
    3051            0 :         layer: &OwnArc<PersistentLayerDesc>,
    3052            0 :     ) -> anyhow::Result<Option<ResidentDeltaLayer>> {
    3053            0 :         // this is a lot more complex than a simple downcast...
    3054            0 :         if layer.is_delta() {
                                    // The read guard is scoped to this block, so it is released before the
                                    // download is awaited.
    3055            0 :             let l = {
    3056            0 :                 let guard = self.timeline.layers.read().await;
    3057            0 :                 guard.get_from_desc(layer)
    3058              :             };
    3059            0 :             let result = l.download_and_keep_resident().await?;
    3060              : 
    3061            0 :             Ok(Some(ResidentDeltaLayer(result)))
    3062              :         } else {
    3063            0 :             Ok(None)
    3064              :         }
    3065            0 :     }
    3066              : 
                            /// Delegates to `create_image_impl`; `?` converts its
                            /// `CreateImageLayersError` into `anyhow::Error`.
    3067            0 :     async fn create_image(
    3068            0 :         &mut self,
    3069            0 :         lsn: Lsn,
    3070            0 :         key_range: &Range<Key>,
    3071            0 :         ctx: &RequestContext,
    3072            0 :     ) -> anyhow::Result<()> {
    3073            0 :         Ok(self.create_image_impl(lsn, key_range, ctx).await?)
    3074            0 :     }
    3075              : 
                            /// Merges the entries of `input_layers` into one new delta layer and queues
                            /// it in `self.new_deltas`.
                            ///
                            /// Entries of all inputs are collected, sorted by `(key, lsn)` and written
                            /// in that order; an entry whose `(key, lsn)` equals the previously written
                            /// one is skipped and counted as a duplicate.
                            ///
                            /// NOTE(review): panics at `prev.unwrap()` if no entry was written (for
                            /// example when `input_layers` is empty) - confirm callers never do that.
    3076            0 :     async fn create_delta(
    3077            0 :         &mut self,
    3078            0 :         lsn_range: &Range<Lsn>,
    3079            0 :         key_range: &Range<Key>,
    3080            0 :         input_layers: &[ResidentDeltaLayer],
    3081            0 :         ctx: &RequestContext,
    3082            0 :     ) -> anyhow::Result<()> {
    3083            0 :         debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end);
    3084              : 
    3085            0 :         let mut all_entries = Vec::new();
    3086            0 :         for dl in input_layers.iter() {
    3087            0 :             all_entries.extend(dl.load_keys(ctx).await?);
    3088              :         }
    3089              : 
    3090              :         // The current stdlib sorting implementation is designed in a way where it is
    3091              :         // particularly fast where the slice is made up of sorted sub-ranges.
    3092            0 :         all_entries.sort_by_key(|DeltaEntry { key, lsn, .. }| (*key, *lsn));
    3093              : 
    3094            0 :         let mut writer = DeltaLayerWriter::new(
    3095            0 :             self.timeline.conf,
    3096            0 :             self.timeline.timeline_id,
    3097            0 :             self.timeline.tenant_shard_id,
    3098            0 :             key_range.start,
    3099            0 :             lsn_range.clone(),
    3100            0 :             ctx,
    3101            0 :         )
    3102            0 :         .await?;
    3103              : 
    3104            0 :         let mut dup_values = 0;
    3105            0 : 
    3106            0 :         // This iterator walks through all key-value pairs from all the layers
    3107            0 :         // we're compacting, in key, LSN order.
                                // `prev` is the (key, lsn) of the last entry actually written; it stays
                                // `None` until the first `put_value` succeeds.
    3108            0 :         let mut prev: Option<(Key, Lsn)> = None;
    3109              :         for &DeltaEntry {
    3110            0 :             key, lsn, ref val, ..
    3111            0 :         } in all_entries.iter()
    3112              :         {
    3113            0 :             if prev == Some((key, lsn)) {
    3114              :                 // This is a duplicate. Skip it.
    3115              :                 //
    3116              :                 // It can happen if compaction is interrupted after writing some
    3117              :                 // layers but not all, and we are compacting the range again.
    3118              :                 // The calculations in the algorithm assume that there are no
    3119              :                 // duplicates, so the math on targeted file size is likely off,
    3120              :                 // and we will create smaller files than expected.
    3121            0 :                 dup_values += 1;
    3122            0 :                 continue;
    3123            0 :             }
    3124              : 
    3125            0 :             let value = val.load(ctx).await?;
    3126              : 
    3127            0 :             writer.put_value(key, lsn, value, ctx).await?;
    3128              : 
    3129            0 :             prev = Some((key, lsn));
    3130              :         }
    3131              : 
    3132            0 :         if dup_values > 0 {
    3133            0 :             warn!("delta layer created with {} duplicate values", dup_values);
    3134            0 :         }
    3135              : 
                                // Failpoint: when enabled, returns an error before the writer is finished.
    3136            0 :         fail_point!("delta-layer-writer-fail-before-finish", |_| {
    3137            0 :             Err(anyhow::anyhow!(
    3138            0 :                 "failpoint delta-layer-writer-fail-before-finish"
    3139            0 :             ))
    3140            0 :         });
    3141              : 
                                // The key following the last written key is passed to `finish`
                                // (presumably as the exclusive end key - TODO confirm).
                                // NOTE(review): `unwrap()` panics here when `prev` is still `None`.
    3142            0 :         let (desc, path) = writer.finish(prev.unwrap().0.next(), ctx).await?;
    3143            0 :         let new_delta_layer =
    3144            0 :             Layer::finish_creating(self.timeline.conf, &self.timeline, desc, &path)?;
    3145              : 
    3146            0 :         self.new_deltas.push(new_delta_layer);
    3147            0 :         Ok(())
    3148            0 :     }
    3149              : 
                            /// Queues the layer's descriptor in `self.layers_to_delete`; nothing is
                            /// removed in this call.
    3150            0 :     async fn delete_layer(
    3151            0 :         &mut self,
    3152            0 :         layer: &OwnArc<PersistentLayerDesc>,
    3153            0 :         _ctx: &RequestContext,
    3154            0 :     ) -> anyhow::Result<()> {
    3155            0 :         self.layers_to_delete.push(layer.clone().0);
    3156            0 :         Ok(())
    3157            0 :     }
    3158              : }
    3159              : 
    3160              : impl TimelineAdaptor {
                            /// Builds an image layer for `key_range` at `lsn` and queues it in
                            /// `self.new_images`.
                            ///
                            /// The key space comes from `get_keyspace`, so this fails when `lsn` is not
                            /// the LSN stored in `self.keyspace`. A layer is only finished and queued
                            /// when `create_image_layer_for_rel_blocks` hands back an unfinished writer.
    3161            0 :     async fn create_image_impl(
    3162            0 :         &mut self,
    3163            0 :         lsn: Lsn,
    3164            0 :         key_range: &Range<Key>,
    3165            0 :         ctx: &RequestContext,
    3166            0 :     ) -> Result<(), CreateImageLayersError> {
                                // NOTE(review): `stop_and_record` is only called on the success path; on the
                                // `?` early returns the timer is simply dropped - confirm that is intended.
    3167            0 :         let timer = self.timeline.metrics.create_images_time_histo.start_timer();
    3168              : 
    3169            0 :         let image_layer_writer = ImageLayerWriter::new(
    3170            0 :             self.timeline.conf,
    3171            0 :             self.timeline.timeline_id,
    3172            0 :             self.timeline.tenant_shard_id,
    3173            0 :             key_range,
    3174            0 :             lsn,
    3175            0 :             ctx,
    3176            0 :         )
    3177            0 :         .await?;
    3178              : 
                                // Failpoint: when enabled, returns an error right after the writer is created.
    3179            0 :         fail_point!("image-layer-writer-fail-before-finish", |_| {
    3180            0 :             Err(CreateImageLayersError::Other(anyhow::anyhow!(
    3181            0 :                 "failpoint image-layer-writer-fail-before-finish"
    3182            0 :             )))
    3183            0 :         });
    3184              : 
    3185            0 :         let keyspace = KeySpace {
    3186            0 :             ranges: self.get_keyspace(key_range, lsn, ctx).await?,
    3187              :         };
    3188              :         // TODO set proper (stateful) start. The create_image_layer_for_rel_blocks function mostly
                                // NOTE(review): the TODO above is cut off mid-sentence; as written, `start`
                                // is always `Key::MIN` and the returned `next_start_key` is discarded.
    3189            0 :         let start = Key::MIN;
    3190              :         let ImageLayerCreationOutcome {
    3191            0 :             unfinished_image_layer,
    3192              :             next_start_key: _,
    3193            0 :         } = self
    3194            0 :             .timeline
    3195            0 :             .create_image_layer_for_rel_blocks(
    3196            0 :                 &keyspace,
    3197            0 :                 image_layer_writer,
    3198            0 :                 lsn,
    3199            0 :                 ctx,
    3200            0 :                 key_range.clone(),
    3201            0 :                 start,
    3202            0 :                 IoConcurrency::sequential(),
    3203            0 :             )
    3204            0 :             .await?;
    3205              : 
                                // `None` means there is no writer to finish, so no layer is queued.
    3206            0 :         if let Some(image_layer_writer) = unfinished_image_layer {
    3207            0 :             let (desc, path) = image_layer_writer.finish(ctx).await?;
    3208            0 :             let image_layer =
    3209            0 :                 Layer::finish_creating(self.timeline.conf, &self.timeline, desc, &path)?;
    3210            0 :             self.new_images.push(image_layer);
    3211            0 :         }
    3212              : 
    3213            0 :         timer.stop_and_record();
    3214            0 : 
    3215            0 :         Ok(())
    3216            0 :     }
    3217              : }
    3218              : 
    3219              : impl CompactionRequestContext for crate::context::RequestContext {} // empty impl: no items are overridden here
    3220              : 
    3221              : #[derive(Debug, Clone)]
                        /// Local newtype around `Arc<T>`, so that this file can implement traits such
                        /// as `CompactionLayer` on it. Dereferences to `T`.
    3222              : pub struct OwnArc<T>(pub Arc<T>);
    3223              : 
    3224              : impl<T> Deref for OwnArc<T> {
                            // `<Arc<T> as Deref>::Target` is `T`, so an `OwnArc<T>` derefs straight to
                            // the value inside the `Arc`.
    3225              :     type Target = <Arc<T> as Deref>::Target;
    3226            0 :     fn deref(&self) -> &Self::Target {
    3227            0 :         &self.0
    3228            0 :     }
    3229              : }
    3230              : 
    3231              : impl<T> AsRef<T> for OwnArc<T> {
                            // Borrows the inner value by forwarding to `Arc::as_ref`.
    3232            0 :     fn as_ref(&self) -> &T {
    3233            0 :         self.0.as_ref()
    3234            0 :     }
    3235              : }
    3236              : 
    3237              : impl CompactionLayer<Key> for OwnArc<PersistentLayerDesc> {
                            // Every accessor reads from the wrapped `PersistentLayerDesc`: the first
                            // three return its fields (reached through `Deref`), the last two forward
                            // to its methods of the same name.
    3238            0 :     fn key_range(&self) -> &Range<Key> {
    3239            0 :         &self.key_range
    3240            0 :     }
    3241            0 :     fn lsn_range(&self) -> &Range<Lsn> {
    3242            0 :         &self.lsn_range
    3243            0 :     }
    3244            0 :     fn file_size(&self) -> u64 {
    3245            0 :         self.file_size
    3246            0 :     }
    3247            0 :     fn short_id(&self) -> std::string::String {
    3248            0 :         self.as_ref().short_id().to_string()
    3249            0 :     }
    3250            0 :     fn is_delta(&self) -> bool {
    3251            0 :         self.as_ref().is_delta()
    3252            0 :     }
    3253              : }
    3254              : 
    3255              : impl CompactionLayer<Key> for OwnArc<DeltaLayer> {
                            // Ranges, size and id are read from the delta layer's `layer_desc()`;
                            // `is_delta` is hard-coded to `true`.
    3256            0 :     fn key_range(&self) -> &Range<Key> {
    3257            0 :         &self.layer_desc().key_range
    3258            0 :     }
    3259            0 :     fn lsn_range(&self) -> &Range<Lsn> {
    3260            0 :         &self.layer_desc().lsn_range
    3261            0 :     }
    3262            0 :     fn file_size(&self) -> u64 {
    3263            0 :         self.layer_desc().file_size
    3264            0 :     }
    3265            0 :     fn short_id(&self) -> std::string::String {
    3266            0 :         self.layer_desc().short_id().to_string()
    3267            0 :     }
    3268            0 :     fn is_delta(&self) -> bool {
    3269            0 :         true
    3270            0 :     }
    3271              : }
    3272              : 
    3273              : use crate::tenant::timeline::DeltaEntry;
    3274              : 
    3275              : impl CompactionLayer<Key> for ResidentDeltaLayer {
                            // Ranges, size and id are read from the wrapped `ResidentLayer`'s
                            // `layer_desc()`; `is_delta` is hard-coded to `true`.
    3276            0 :     fn key_range(&self) -> &Range<Key> {
    3277            0 :         &self.0.layer_desc().key_range
    3278            0 :     }
    3279            0 :     fn lsn_range(&self) -> &Range<Lsn> {
    3280            0 :         &self.0.layer_desc().lsn_range
    3281            0 :     }
    3282            0 :     fn file_size(&self) -> u64 {
    3283            0 :         self.0.layer_desc().file_size
    3284            0 :     }
    3285            0 :     fn short_id(&self) -> std::string::String {
    3286            0 :         self.0.layer_desc().short_id().to_string()
    3287            0 :     }
    3288            0 :     fn is_delta(&self) -> bool {
    3289            0 :         true
    3290            0 :     }
    3291              : }
    3292              : 
    3293              : impl CompactionDeltaLayer<TimelineAdaptor> for ResidentDeltaLayer {
    3294              :     type DeltaEntry<'a> = DeltaEntry<'a>;
    3295              : 
                            /// Obtains the delta-layer view of the wrapped layer via `get_as_delta` and
                            /// returns its `index_entries`; errors from either step are propagated.
    3296            0 :     async fn load_keys(&self, ctx: &RequestContext) -> anyhow::Result<Vec<DeltaEntry<'_>>> {
    3297            0 :         self.0.get_as_delta(ctx).await?.index_entries(ctx).await
    3298            0 :     }
    3299              : }
    3300              : 
    3301              : impl CompactionLayer<Key> for ResidentImageLayer {
                            // Ranges, size and id are read from the wrapped `ResidentLayer`'s
                            // `layer_desc()`; `is_delta` is hard-coded to `false`.
    3302            0 :     fn key_range(&self) -> &Range<Key> {
    3303            0 :         &self.0.layer_desc().key_range
    3304            0 :     }
    3305            0 :     fn lsn_range(&self) -> &Range<Lsn> {
    3306            0 :         &self.0.layer_desc().lsn_range
    3307            0 :     }
    3308            0 :     fn file_size(&self) -> u64 {
    3309            0 :         self.0.layer_desc().file_size
    3310            0 :     }
    3311            0 :     fn short_id(&self) -> std::string::String {
    3312            0 :         self.0.layer_desc().short_id().to_string()
    3313            0 :     }
    3314            0 :     fn is_delta(&self) -> bool {
    3315            0 :         false
    3316            0 :     }
    3317              : }
    3318              : impl CompactionImageLayer<TimelineAdaptor> for ResidentImageLayer {} // empty impl: no items are overridden here
        

Generated by: LCOV version 2.1-beta