LCOV - code coverage report
Current view: top level - pageserver/src/tenant/timeline - compaction.rs (source / functions) Coverage Total Hit
Test: ccf45ed1c149555259baec52d6229a81013dcd6a.info Lines: 63.3 % 1823 1154
Test Date: 2024-08-21 17:32:46 Functions: 34.2 % 155 53

            Line data    Source code
       1              : //! New compaction implementation. The algorithm itself is implemented in the
       2              : //! compaction crate. This file implements the callbacks and structs that allow
       3              : //! the algorithm to drive the process.
       4              : //!
       5              : //! The old legacy algorithm is implemented directly in `timeline.rs`.
       6              : 
       7              : use std::collections::{BinaryHeap, HashSet};
       8              : use std::ops::{Deref, Range};
       9              : use std::sync::Arc;
      10              : 
      11              : use super::layer_manager::LayerManager;
      12              : use super::{
      13              :     CompactFlags, CreateImageLayersError, DurationRecorder, ImageLayerCreationMode,
      14              :     RecordedDuration, Timeline,
      15              : };
      16              : 
      17              : use anyhow::{anyhow, Context};
      18              : use bytes::Bytes;
      19              : use enumset::EnumSet;
      20              : use fail::fail_point;
      21              : use itertools::Itertools;
      22              : use pageserver_api::key::KEY_SIZE;
      23              : use pageserver_api::keyspace::ShardedRange;
      24              : use pageserver_api::shard::{ShardCount, ShardIdentity, TenantShardId};
      25              : use serde::Serialize;
      26              : use tokio_util::sync::CancellationToken;
      27              : use tracing::{debug, info, info_span, trace, warn, Instrument};
      28              : use utils::id::TimelineId;
      29              : 
      30              : use crate::context::{AccessStatsBehavior, RequestContext, RequestContextBuilder};
      31              : use crate::page_cache;
      32              : use crate::tenant::config::defaults::{DEFAULT_CHECKPOINT_DISTANCE, DEFAULT_COMPACTION_THRESHOLD};
      33              : use crate::tenant::remote_timeline_client::WaitCompletionError;
      34              : use crate::tenant::storage_layer::merge_iterator::MergeIterator;
      35              : use crate::tenant::storage_layer::{
      36              :     AsLayerDesc, PersistentLayerDesc, PersistentLayerKey, ValueReconstructState,
      37              : };
      38              : use crate::tenant::timeline::ImageLayerCreationOutcome;
      39              : use crate::tenant::timeline::{drop_rlock, DeltaLayerWriter, ImageLayerWriter};
      40              : use crate::tenant::timeline::{Layer, ResidentLayer};
      41              : use crate::tenant::DeltaLayer;
      42              : use crate::virtual_file::{MaybeFatalIo, VirtualFile};
      43              : 
      44              : use crate::keyspace::KeySpace;
      45              : use crate::repository::{Key, Value};
      46              : use crate::walrecord::NeonWalRecord;
      47              : 
      48              : use utils::lsn::Lsn;
      49              : 
      50              : use pageserver_compaction::helpers::overlaps_with;
      51              : use pageserver_compaction::interface::*;
      52              : 
      53              : use super::CompactionError;
      54              : 
      55              : /// Maximum number of deltas before generating an image layer in bottom-most compaction.
      56              : const COMPACTION_DELTA_THRESHOLD: usize = 5;
      57              : 
      58              : /// The result of bottom-most compaction for a single key at each LSN.
      59              : #[derive(Debug)]
      60              : #[cfg_attr(test, derive(PartialEq))]
      61              : pub struct KeyLogAtLsn(pub Vec<(Lsn, Value)>);
      62              : 
      63              : /// The result of bottom-most compaction.
      64              : #[derive(Debug)]
      65              : #[cfg_attr(test, derive(PartialEq))]
      66              : pub(crate) struct KeyHistoryRetention {
      67              :     /// Stores logs to reconstruct the value at the given LSN, that is to say, logs <= LSN or image == LSN.
      68              :     pub(crate) below_horizon: Vec<(Lsn, KeyLogAtLsn)>,
      69              :     /// Stores logs to reconstruct the value at any LSN above the horizon, that is to say, log > LSN.
      70              :     pub(crate) above_horizon: KeyLogAtLsn,
      71              : }
      72              : 
      73              : impl KeyHistoryRetention {
      74          338 :     async fn pipe_to(
      75          338 :         self,
      76          338 :         key: Key,
      77          338 :         delta_writer: &mut Vec<(Key, Lsn, Value)>,
      78          338 :         mut image_writer: Option<&mut ImageLayerWriter>,
      79          338 :         stat: &mut CompactionStatistics,
      80          338 :         ctx: &RequestContext,
      81          338 :     ) -> anyhow::Result<()> {
      82          338 :         let mut first_batch = true;
      83         1050 :         for (cutoff_lsn, KeyLogAtLsn(logs)) in self.below_horizon {
      84          712 :             if first_batch {
      85          338 :                 if logs.len() == 1 && logs[0].1.is_image() {
      86          324 :                     let Value::Image(img) = &logs[0].1 else {
      87            0 :                         unreachable!()
      88              :                     };
      89          324 :                     stat.produce_image_key(img);
      90          324 :                     if let Some(image_writer) = image_writer.as_mut() {
      91          328 :                         image_writer.put_image(key, img.clone(), ctx).await?;
      92            0 :                     } else {
      93            0 :                         delta_writer.push((key, cutoff_lsn, Value::Image(img.clone())));
      94            0 :                     }
      95              :                 } else {
      96           28 :                     for (lsn, val) in logs {
      97           14 :                         stat.produce_key(&val);
      98           14 :                         delta_writer.push((key, lsn, val));
      99           14 :                     }
     100              :                 }
     101          338 :                 first_batch = false;
     102              :             } else {
     103          438 :                 for (lsn, val) in logs {
     104           64 :                     stat.produce_key(&val);
     105           64 :                     delta_writer.push((key, lsn, val));
     106           64 :                 }
     107              :             }
     108              :         }
     109          338 :         let KeyLogAtLsn(above_horizon_logs) = self.above_horizon;
     110          352 :         for (lsn, val) in above_horizon_logs {
     111           14 :             stat.produce_key(&val);
     112           14 :             delta_writer.push((key, lsn, val));
     113           14 :         }
     114          338 :         Ok(())
     115          338 :     }
     116              : }
     117              : 
     118              : #[derive(Debug, Serialize, Default)]
     119              : struct CompactionStatisticsNumSize {
     120              :     num: u64,
     121              :     size: u64,
     122              : }
     123              : 
     124              : #[derive(Debug, Serialize, Default)]
     125              : pub struct CompactionStatistics {
     126              :     delta_layer_visited: CompactionStatisticsNumSize,
     127              :     image_layer_visited: CompactionStatisticsNumSize,
     128              :     delta_layer_produced: CompactionStatisticsNumSize,
     129              :     image_layer_produced: CompactionStatisticsNumSize,
     130              :     num_delta_layer_discarded: usize,
     131              :     num_image_layer_discarded: usize,
     132              :     num_unique_keys_visited: usize,
     133              :     wal_keys_visited: CompactionStatisticsNumSize,
     134              :     image_keys_visited: CompactionStatisticsNumSize,
     135              :     wal_produced: CompactionStatisticsNumSize,
     136              :     image_produced: CompactionStatisticsNumSize,
     137              : }
     138              : 
     139              : impl CompactionStatistics {
     140          530 :     fn estimated_size_of_value(val: &Value) -> usize {
     141          194 :         match val {
     142          336 :             Value::Image(img) => img.len(),
     143            0 :             Value::WalRecord(NeonWalRecord::Postgres { rec, .. }) => rec.len(),
     144          194 :             _ => std::mem::size_of::<NeonWalRecord>(),
     145              :         }
     146          530 :     }
     147          856 :     fn estimated_size_of_key() -> usize {
     148          856 :         KEY_SIZE // TODO: distinguish image layer and delta layer (count LSN in delta layer)
     149          856 :     }
     150           54 :     fn visit_delta_layer(&mut self, size: u64) {
     151           54 :         self.delta_layer_visited.num += 1;
     152           54 :         self.delta_layer_visited.size += size;
     153           54 :     }
     154           26 :     fn visit_image_layer(&mut self, size: u64) {
     155           26 :         self.image_layer_visited.num += 1;
     156           26 :         self.image_layer_visited.size += size;
     157           26 :     }
     158          338 :     fn on_unique_key_visited(&mut self) {
     159          338 :         self.num_unique_keys_visited += 1;
     160          338 :     }
     161          104 :     fn visit_wal_key(&mut self, val: &Value) {
     162          104 :         self.wal_keys_visited.num += 1;
     163          104 :         self.wal_keys_visited.size +=
     164          104 :             Self::estimated_size_of_value(val) as u64 + Self::estimated_size_of_key() as u64;
     165          104 :     }
     166          336 :     fn visit_image_key(&mut self, val: &Value) {
     167          336 :         self.image_keys_visited.num += 1;
     168          336 :         self.image_keys_visited.size +=
     169          336 :             Self::estimated_size_of_value(val) as u64 + Self::estimated_size_of_key() as u64;
     170          336 :     }
     171           92 :     fn produce_key(&mut self, val: &Value) {
     172           92 :         match val {
     173            2 :             Value::Image(img) => self.produce_image_key(img),
     174           90 :             Value::WalRecord(_) => self.produce_wal_key(val),
     175              :         }
     176           92 :     }
     177           90 :     fn produce_wal_key(&mut self, val: &Value) {
     178           90 :         self.wal_produced.num += 1;
     179           90 :         self.wal_produced.size +=
     180           90 :             Self::estimated_size_of_value(val) as u64 + Self::estimated_size_of_key() as u64;
     181           90 :     }
     182          326 :     fn produce_image_key(&mut self, val: &Bytes) {
     183          326 :         self.image_produced.num += 1;
     184          326 :         self.image_produced.size += val.len() as u64 + Self::estimated_size_of_key() as u64;
     185          326 :     }
     186           26 :     fn discard_delta_layer(&mut self) {
     187           26 :         self.num_delta_layer_discarded += 1;
     188           26 :     }
     189            6 :     fn discard_image_layer(&mut self) {
     190            6 :         self.num_image_layer_discarded += 1;
     191            6 :     }
     192           32 :     fn produce_delta_layer(&mut self, size: u64) {
     193           32 :         self.delta_layer_produced.num += 1;
     194           32 :         self.delta_layer_produced.size += size;
     195           32 :     }
     196           12 :     fn produce_image_layer(&mut self, size: u64) {
     197           12 :         self.image_layer_produced.num += 1;
     198           12 :         self.image_layer_produced.size += size;
     199           12 :     }
     200              : }
     201              : 
     202              : impl Timeline {
     203              :     /// TODO: cancellation
     204              :     ///
     205              :     /// Returns whether the compaction has pending tasks.
     206          364 :     pub(crate) async fn compact_legacy(
     207          364 :         self: &Arc<Self>,
     208          364 :         cancel: &CancellationToken,
     209          364 :         flags: EnumSet<CompactFlags>,
     210          364 :         ctx: &RequestContext,
     211          364 :     ) -> Result<bool, CompactionError> {
     212          364 :         if flags.contains(CompactFlags::EnhancedGcBottomMostCompaction) {
     213            0 :             self.compact_with_gc(cancel, flags, ctx)
     214            0 :                 .await
     215            0 :                 .map_err(CompactionError::Other)?;
     216            0 :             return Ok(false);
     217          364 :         }
     218          364 : 
     219          364 :         if flags.contains(CompactFlags::DryRun) {
     220            0 :             return Err(CompactionError::Other(anyhow!(
     221            0 :                 "dry-run mode is not supported for legacy compaction for now"
     222            0 :             )));
     223          364 :         }
     224          364 : 
     225          364 :         // High level strategy for compaction / image creation:
     226          364 :         //
     227          364 :         // 1. First, calculate the desired "partitioning" of the
     228          364 :         // currently in-use key space. The goal is to partition the
     229          364 :         // key space into roughly fixed-size chunks, but also take into
     230          364 :         // account any existing image layers, and try to align the
     231          364 :         // chunk boundaries with the existing image layers to avoid
     232          364 :         // too much churn. Also try to align chunk boundaries with
     233          364 :         // relation boundaries.  In principle, we don't know about
     234          364 :         // relation boundaries here, we just deal with key-value
     235          364 :         // pairs, and the code in pgdatadir_mapping.rs knows how to
     236          364 :         // map relations into key-value pairs. But in practice we know
     237          364 :         // that 'field6' is the block number, and the fields 1-5
     238          364 :         // identify a relation. This is just an optimization,
     239          364 :         // though.
     240          364 :         //
     241          364 :         // 2. Once we know the partitioning, for each partition,
     242          364 :         // decide if it's time to create a new image layer. The
     243          364 :         // criteria is: there has been too much "churn" since the last
     244          364 :         // image layer? The "churn" is fuzzy concept, it's a
     245          364 :         // combination of too many delta files, or too much WAL in
     246          364 :         // total in the delta file. Or perhaps: if creating an image
     247          364 :         // file would allow to delete some older files.
     248          364 :         //
     249          364 :         // 3. After that, we compact all level0 delta files if there
     250          364 :         // are too many of them.  While compacting, we also garbage
     251          364 :         // collect any page versions that are no longer needed because
     252          364 :         // of the new image layers we created in step 2.
     253          364 :         //
     254          364 :         // TODO: This high level strategy hasn't been implemented yet.
     255          364 :         // Below are functions compact_level0() and create_image_layers()
     256          364 :         // but they are a bit ad hoc and don't quite work like it's explained
     257          364 :         // above. Rewrite it.
     258          364 : 
     259          364 :         // Is the timeline being deleted?
     260          364 :         if self.is_stopping() {
     261            0 :             trace!("Dropping out of compaction on timeline shutdown");
     262            0 :             return Err(CompactionError::ShuttingDown);
     263          364 :         }
     264          364 : 
     265          364 :         let target_file_size = self.get_checkpoint_distance();
     266              : 
     267              :         // Define partitioning schema if needed
     268              : 
     269              :         // FIXME: the match should only cover repartitioning, not the next steps
     270          364 :         let (partition_count, has_pending_tasks) = match self
     271          364 :             .repartition(
     272          364 :                 self.get_last_record_lsn(),
     273          364 :                 self.get_compaction_target_size(),
     274          364 :                 flags,
     275          364 :                 ctx,
     276          364 :             )
     277        16406 :             .await
     278              :         {
     279          364 :             Ok(((dense_partitioning, sparse_partitioning), lsn)) => {
     280          364 :                 // Disables access_stats updates, so that the files we read remain candidates for eviction after we're done with them
     281          364 :                 let image_ctx = RequestContextBuilder::extend(ctx)
     282          364 :                     .access_stats_behavior(AccessStatsBehavior::Skip)
     283          364 :                     .build();
     284          364 : 
     285          364 :                 // 2. Compact
     286          364 :                 let timer = self.metrics.compact_time_histo.start_timer();
     287        45831 :                 let fully_compacted = self.compact_level0(target_file_size, ctx).await?;
     288          364 :                 timer.stop_and_record();
     289          364 : 
     290          364 :                 let mut partitioning = dense_partitioning;
     291          364 :                 partitioning
     292          364 :                     .parts
     293          364 :                     .extend(sparse_partitioning.into_dense().parts);
     294          364 : 
     295          364 :                 // 3. Create new image layers for partitions that have been modified
     296          364 :                 // "enough". Skip image layer creation if L0 compaction cannot keep up.
     297          364 :                 if fully_compacted {
     298          364 :                     let image_layers = self
     299          364 :                         .create_image_layers(
     300          364 :                             &partitioning,
     301          364 :                             lsn,
     302          364 :                             if flags.contains(CompactFlags::ForceImageLayerCreation) {
     303           14 :                                 ImageLayerCreationMode::Force
     304              :                             } else {
     305          350 :                                 ImageLayerCreationMode::Try
     306              :                             },
     307          364 :                             &image_ctx,
     308              :                         )
     309        14517 :                         .await?;
     310              : 
     311          364 :                     self.upload_new_image_layers(image_layers)?;
     312              :                 } else {
     313            0 :                     info!("skipping image layer generation due to L0 compaction did not include all layers.");
     314              :                 }
     315          364 :                 (partitioning.parts.len(), !fully_compacted)
     316              :             }
     317            0 :             Err(err) => {
     318            0 :                 // no partitioning? This is normal, if the timeline was just created
     319            0 :                 // as an empty timeline. Also in unit tests, when we use the timeline
     320            0 :                 // as a simple key-value store, ignoring the datadir layout. Log the
     321            0 :                 // error but continue.
     322            0 :                 //
     323            0 :                 // Suppress error when it's due to cancellation
     324            0 :                 if !self.cancel.is_cancelled() {
     325            0 :                     tracing::error!("could not compact, repartitioning keyspace failed: {err:?}");
     326            0 :                 }
     327            0 :                 (1, false)
     328              :             }
     329              :         };
     330              : 
     331          364 :         if self.shard_identity.count >= ShardCount::new(2) {
     332              :             // Limit the number of layer rewrites to the number of partitions: this means its
     333              :             // runtime should be comparable to a full round of image layer creations, rather than
     334              :             // being potentially much longer.
     335            0 :             let rewrite_max = partition_count;
     336            0 : 
     337            0 :             self.compact_shard_ancestors(rewrite_max, ctx).await?;
     338          364 :         }
     339              : 
     340          364 :         Ok(has_pending_tasks)
     341          364 :     }
     342              : 
     343              :     /// Check for layers that are elegible to be rewritten:
     344              :     /// - Shard splitting: After a shard split, ancestor layers beyond pitr_interval, so that
     345              :     ///   we don't indefinitely retain keys in this shard that aren't needed.
     346              :     /// - For future use: layers beyond pitr_interval that are in formats we would
     347              :     ///   rather not maintain compatibility with indefinitely.
     348              :     ///
     349              :     /// Note: this phase may read and write many gigabytes of data: use rewrite_max to bound
     350              :     /// how much work it will try to do in each compaction pass.
     351            0 :     async fn compact_shard_ancestors(
     352            0 :         self: &Arc<Self>,
     353            0 :         rewrite_max: usize,
     354            0 :         ctx: &RequestContext,
     355            0 :     ) -> Result<(), CompactionError> {
     356            0 :         let mut drop_layers = Vec::new();
     357            0 :         let mut layers_to_rewrite: Vec<Layer> = Vec::new();
     358            0 : 
     359            0 :         // We will use the Lsn cutoff of the last GC as a threshold for rewriting layers: if a
     360            0 :         // layer is behind this Lsn, it indicates that the layer is being retained beyond the
     361            0 :         // pitr_interval, for example because a branchpoint references it.
     362            0 :         //
     363            0 :         // Holding this read guard also blocks [`Self::gc_timeline`] from entering while we
     364            0 :         // are rewriting layers.
     365            0 :         let latest_gc_cutoff = self.get_latest_gc_cutoff_lsn();
     366            0 : 
     367            0 :         tracing::info!(
     368            0 :             "latest_gc_cutoff: {}, pitr cutoff {}",
     369            0 :             *latest_gc_cutoff,
     370            0 :             self.gc_info.read().unwrap().cutoffs.time
     371              :         );
     372              : 
     373            0 :         let layers = self.layers.read().await;
     374            0 :         for layer_desc in layers.layer_map()?.iter_historic_layers() {
     375            0 :             let layer = layers.get_from_desc(&layer_desc);
     376            0 :             if layer.metadata().shard.shard_count == self.shard_identity.count {
     377              :                 // This layer does not belong to a historic ancestor, no need to re-image it.
     378            0 :                 continue;
     379            0 :             }
     380            0 : 
     381            0 :             // This layer was created on an ancestor shard: check if it contains any data for this shard.
     382            0 :             let sharded_range = ShardedRange::new(layer_desc.get_key_range(), &self.shard_identity);
     383            0 :             let layer_local_page_count = sharded_range.page_count();
     384            0 :             let layer_raw_page_count = ShardedRange::raw_size(&layer_desc.get_key_range());
     385            0 :             if layer_local_page_count == 0 {
     386              :                 // This ancestral layer only covers keys that belong to other shards.
     387              :                 // We include the full metadata in the log: if we had some critical bug that caused
     388              :                 // us to incorrectly drop layers, this would simplify manually debugging + reinstating those layers.
     389            0 :                 info!(%layer, old_metadata=?layer.metadata(),
     390            0 :                     "dropping layer after shard split, contains no keys for this shard.",
     391              :                 );
     392              : 
     393            0 :                 if cfg!(debug_assertions) {
     394              :                     // Expensive, exhaustive check of keys in this layer: this guards against ShardedRange's calculations being
     395              :                     // wrong.  If ShardedRange claims the local page count is zero, then no keys in this layer
     396              :                     // should be !is_key_disposable()
     397            0 :                     let range = layer_desc.get_key_range();
     398            0 :                     let mut key = range.start;
     399            0 :                     while key < range.end {
     400            0 :                         debug_assert!(self.shard_identity.is_key_disposable(&key));
     401            0 :                         key = key.next();
     402              :                     }
     403            0 :                 }
     404              : 
     405            0 :                 drop_layers.push(layer);
     406            0 :                 continue;
     407            0 :             } else if layer_local_page_count != u32::MAX
     408            0 :                 && layer_local_page_count == layer_raw_page_count
     409              :             {
     410            0 :                 debug!(%layer,
     411            0 :                     "layer is entirely shard local ({} keys), no need to filter it",
     412              :                     layer_local_page_count
     413              :                 );
     414            0 :                 continue;
     415            0 :             }
     416            0 : 
     417            0 :             // Don't bother re-writing a layer unless it will at least halve its size
     418            0 :             if layer_local_page_count != u32::MAX
     419            0 :                 && layer_local_page_count > layer_raw_page_count / 2
     420              :             {
     421            0 :                 debug!(%layer,
     422            0 :                     "layer is already mostly local ({}/{}), not rewriting",
     423              :                     layer_local_page_count,
     424              :                     layer_raw_page_count
     425              :                 );
     426            0 :             }
     427              : 
     428              :             // Don't bother re-writing a layer if it is within the PITR window: it will age-out eventually
     429              :             // without incurring the I/O cost of a rewrite.
     430            0 :             if layer_desc.get_lsn_range().end >= *latest_gc_cutoff {
     431            0 :                 debug!(%layer, "Skipping rewrite of layer still in GC window ({} >= {})",
     432            0 :                     layer_desc.get_lsn_range().end, *latest_gc_cutoff);
     433            0 :                 continue;
     434            0 :             }
     435            0 : 
     436            0 :             if layer_desc.is_delta() {
     437              :                 // We do not yet implement rewrite of delta layers
     438            0 :                 debug!(%layer, "Skipping rewrite of delta layer");
     439            0 :                 continue;
     440            0 :             }
     441            0 : 
     442            0 :             // Only rewrite layers if their generations differ.  This guarantees:
     443            0 :             //  - that local rewrite is safe, as local layer paths will differ between existing layer and rewritten one
     444            0 :             //  - that the layer is persistent in remote storage, as we only see old-generation'd layer via loading from remote storage
     445            0 :             if layer.metadata().generation == self.generation {
     446            0 :                 debug!(%layer, "Skipping rewrite, is not from old generation");
     447            0 :                 continue;
     448            0 :             }
     449            0 : 
     450            0 :             if layers_to_rewrite.len() >= rewrite_max {
     451            0 :                 tracing::info!(%layer, "Will rewrite layer on a future compaction, already rewrote {}",
     452            0 :                     layers_to_rewrite.len()
     453              :                 );
     454            0 :                 continue;
     455            0 :             }
     456            0 : 
     457            0 :             // Fall through: all our conditions for doing a rewrite passed.
     458            0 :             layers_to_rewrite.push(layer);
     459              :         }
     460              : 
     461              :         // Drop read lock on layer map before we start doing time-consuming I/O
     462            0 :         drop(layers);
     463            0 : 
     464            0 :         let mut replace_image_layers = Vec::new();
     465              : 
     466            0 :         for layer in layers_to_rewrite {
     467            0 :             tracing::info!(layer=%layer, "Rewriting layer after shard split...");
     468            0 :             let mut image_layer_writer = ImageLayerWriter::new(
     469            0 :                 self.conf,
     470            0 :                 self.timeline_id,
     471            0 :                 self.tenant_shard_id,
     472            0 :                 &layer.layer_desc().key_range,
     473            0 :                 layer.layer_desc().image_layer_lsn(),
     474            0 :                 ctx,
     475            0 :             )
     476            0 :             .await
     477            0 :             .map_err(CompactionError::Other)?;
     478              : 
     479              :             // Safety of layer rewrites:
     480              :             // - We are writing to a different local file path than we are reading from, so the old Layer
     481              :             //   cannot interfere with the new one.
     482              :             // - In the page cache, contents for a particular VirtualFile are stored with a file_id that
     483              :             //   is different for two layers with the same name (in `ImageLayerInner::new` we always
     484              :             //   acquire a fresh id from [`crate::page_cache::next_file_id`].  So readers do not risk
     485              :             //   reading the index from one layer file, and then data blocks from the rewritten layer file.
     486              :             // - Any readers that have a reference to the old layer will keep it alive until they are done
     487              :             //   with it. If they are trying to promote from remote storage, that will fail, but this is the same
     488              :             //   as for compaction generally: compaction is allowed to delete layers that readers might be trying to use.
     489              :             // - We do not run concurrently with other kinds of compaction, so the only layer map writes we race with are:
     490              :             //    - GC, which at worst witnesses us "undelete" a layer that they just deleted.
     491              :             //    - ingestion, which only inserts layers, therefore cannot collide with us.
     492            0 :             let resident = layer.download_and_keep_resident().await?;
     493              : 
     494            0 :             let keys_written = resident
     495            0 :                 .filter(&self.shard_identity, &mut image_layer_writer, ctx)
     496            0 :                 .await?;
     497              : 
     498            0 :             if keys_written > 0 {
     499            0 :                 let new_layer = image_layer_writer
     500            0 :                     .finish(self, ctx)
     501            0 :                     .await
     502            0 :                     .map_err(CompactionError::Other)?;
     503            0 :                 tracing::info!(layer=%new_layer, "Rewrote layer, {} -> {} bytes",
     504            0 :                     layer.metadata().file_size,
     505            0 :                     new_layer.metadata().file_size);
     506              : 
     507            0 :                 replace_image_layers.push((layer, new_layer));
     508            0 :             } else {
     509            0 :                 // Drop the old layer.  Usually for this case we would already have noticed that
     510            0 :                 // the layer has no data for us with the ShardedRange check above, but
     511            0 :                 drop_layers.push(layer);
     512            0 :             }
     513              :         }
     514              : 
     515              :         // At this point, we have replaced local layer files with their rewritten form, but not yet uploaded
     516              :         // metadata to reflect that. If we restart here, the replaced layer files will look invalid (size mismatch
     517              :         // to remote index) and be removed. This is inefficient but safe.
     518              :         fail::fail_point!("compact-shard-ancestors-localonly");
     519              : 
     520              :         // Update the LayerMap so that readers will use the new layers, and enqueue it for writing to remote storage
     521            0 :         self.rewrite_layers(replace_image_layers, drop_layers)
     522            0 :             .await?;
     523              : 
     524              :         fail::fail_point!("compact-shard-ancestors-enqueued");
     525              : 
     526              :         // We wait for all uploads to complete before finishing this compaction stage.  This is not
     527              :         // necessary for correctness, but it simplifies testing, and avoids proceeding with another
     528              :         // Timeline's compaction while this timeline's uploads may be generating lots of disk I/O
     529              :         // load.
     530            0 :         match self.remote_client.wait_completion().await {
     531            0 :             Ok(()) => (),
     532            0 :             Err(WaitCompletionError::NotInitialized(ni)) => return Err(CompactionError::from(ni)),
     533              :             Err(WaitCompletionError::UploadQueueShutDownOrStopped) => {
     534            0 :                 return Err(CompactionError::ShuttingDown)
     535              :             }
     536              :         }
     537              : 
     538              :         fail::fail_point!("compact-shard-ancestors-persistent");
     539              : 
     540            0 :         Ok(())
     541            0 :     }
     542              : 
     543              :     /// Update the LayerVisibilityHint of layers covered by image layers, based on whether there is
     544              :     /// an image layer between them and the most recent readable LSN (branch point or tip of timeline).  The
     545              :     /// purpose of the visibility hint is to record which layers need to be available to service reads.
     546              :     ///
     547              :     /// The result may be used as an input to eviction and secondary downloads to de-prioritize layers
     548              :     /// that we know won't be needed for reads.
     549          538 :     pub(super) async fn update_layer_visibility(
     550          538 :         &self,
     551          538 :     ) -> Result<(), super::layer_manager::Shutdown> {
     552          538 :         let head_lsn = self.get_last_record_lsn();
     553              : 
     554              :         // We will sweep through layers in reverse-LSN order.  We only do historic layers.  L0 deltas
     555              :         // are implicitly left visible, because LayerVisibilityHint's default is Visible, and we never modify it here.
     556              :         // Note that L0 deltas _can_ be covered by image layers, but we consider them 'visible' because we anticipate that
     557              :         // they will be subject to L0->L1 compaction in the near future.
     558          538 :         let layer_manager = self.layers.read().await;
     559          538 :         let layer_map = layer_manager.layer_map()?;
     560              : 
     561          538 :         let readable_points = {
     562          538 :             let children = self.gc_info.read().unwrap().retain_lsns.clone();
     563          538 : 
     564          538 :             let mut readable_points = Vec::with_capacity(children.len() + 1);
     565          538 :             for (child_lsn, _child_timeline_id) in &children {
     566            0 :                 readable_points.push(*child_lsn);
     567            0 :             }
     568          538 :             readable_points.push(head_lsn);
     569          538 :             readable_points
     570          538 :         };
     571          538 : 
     572          538 :         let (layer_visibility, covered) = layer_map.get_visibility(readable_points);
     573         3040 :         for (layer_desc, visibility) in layer_visibility {
     574         2502 :             // FIXME: a more efficiency bulk zip() through the layers rather than NlogN getting each one
     575         2502 :             let layer = layer_manager.get_from_desc(&layer_desc);
     576         2502 :             layer.set_visibility(visibility);
     577         2502 :         }
     578              : 
     579              :         // TODO: publish our covered KeySpace to our parent, so that when they update their visibility, they can
     580              :         // avoid assuming that everything at a branch point is visible.
     581          538 :         drop(covered);
     582          538 :         Ok(())
     583          538 :     }
     584              : 
     585              :     /// Collect a bunch of Level 0 layer files, and compact and reshuffle them as
     586              :     /// as Level 1 files. Returns whether the L0 layers are fully compacted.
     587          364 :     async fn compact_level0(
     588          364 :         self: &Arc<Self>,
     589          364 :         target_file_size: u64,
     590          364 :         ctx: &RequestContext,
     591          364 :     ) -> Result<bool, CompactionError> {
     592              :         let CompactLevel0Phase1Result {
     593          364 :             new_layers,
     594          364 :             deltas_to_compact,
     595          364 :             fully_compacted,
     596              :         } = {
     597          364 :             let phase1_span = info_span!("compact_level0_phase1");
     598          364 :             let ctx = ctx.attached_child();
     599          364 :             let mut stats = CompactLevel0Phase1StatsBuilder {
     600          364 :                 version: Some(2),
     601          364 :                 tenant_id: Some(self.tenant_shard_id),
     602          364 :                 timeline_id: Some(self.timeline_id),
     603          364 :                 ..Default::default()
     604          364 :             };
     605          364 : 
     606          364 :             let begin = tokio::time::Instant::now();
     607          364 :             let phase1_layers_locked = self.layers.read().await;
     608          364 :             let now = tokio::time::Instant::now();
     609          364 :             stats.read_lock_acquisition_micros =
     610          364 :                 DurationRecorder::Recorded(RecordedDuration(now - begin), now);
     611          364 :             self.compact_level0_phase1(phase1_layers_locked, stats, target_file_size, &ctx)
     612          364 :                 .instrument(phase1_span)
     613        45831 :                 .await?
     614              :         };
     615              : 
     616          364 :         if new_layers.is_empty() && deltas_to_compact.is_empty() {
     617              :             // nothing to do
     618          336 :             return Ok(true);
     619           28 :         }
     620           28 : 
     621           28 :         self.finish_compact_batch(&new_layers, &Vec::new(), &deltas_to_compact)
     622            0 :             .await?;
     623           28 :         Ok(fully_compacted)
     624          364 :     }
     625              : 
     626              :     /// Level0 files first phase of compaction, explained in the [`Self::compact_legacy`] comment.
     627          364 :     async fn compact_level0_phase1<'a>(
     628          364 :         self: &'a Arc<Self>,
     629          364 :         guard: tokio::sync::RwLockReadGuard<'a, LayerManager>,
     630          364 :         mut stats: CompactLevel0Phase1StatsBuilder,
     631          364 :         target_file_size: u64,
     632          364 :         ctx: &RequestContext,
     633          364 :     ) -> Result<CompactLevel0Phase1Result, CompactionError> {
     634          364 :         stats.read_lock_held_spawn_blocking_startup_micros =
     635          364 :             stats.read_lock_acquisition_micros.till_now(); // set by caller
     636          364 :         let layers = guard.layer_map()?;
     637          364 :         let level0_deltas = layers.level0_deltas();
     638          364 :         stats.level0_deltas_count = Some(level0_deltas.len());
     639          364 : 
     640          364 :         // Only compact if enough layers have accumulated.
     641          364 :         let threshold = self.get_compaction_threshold();
     642          364 :         if level0_deltas.is_empty() || level0_deltas.len() < threshold {
     643          336 :             debug!(
     644            0 :                 level0_deltas = level0_deltas.len(),
     645            0 :                 threshold, "too few deltas to compact"
     646              :             );
     647          336 :             return Ok(CompactLevel0Phase1Result::default());
     648           28 :         }
     649           28 : 
     650           28 :         let mut level0_deltas = level0_deltas
     651           28 :             .iter()
     652          402 :             .map(|x| guard.get_from_desc(x))
     653           28 :             .collect::<Vec<_>>();
     654           28 : 
     655           28 :         // Gather the files to compact in this iteration.
     656           28 :         //
     657           28 :         // Start with the oldest Level 0 delta file, and collect any other
     658           28 :         // level 0 files that form a contiguous sequence, such that the end
     659           28 :         // LSN of previous file matches the start LSN of the next file.
     660           28 :         //
     661           28 :         // Note that if the files don't form such a sequence, we might
     662           28 :         // "compact" just a single file. That's a bit pointless, but it allows
     663           28 :         // us to get rid of the level 0 file, and compact the other files on
     664           28 :         // the next iteration. This could probably made smarter, but such
     665           28 :         // "gaps" in the sequence of level 0 files should only happen in case
     666           28 :         // of a crash, partial download from cloud storage, or something like
     667           28 :         // that, so it's not a big deal in practice.
     668          748 :         level0_deltas.sort_by_key(|l| l.layer_desc().lsn_range.start);
     669           28 :         let mut level0_deltas_iter = level0_deltas.iter();
     670           28 : 
     671           28 :         let first_level0_delta = level0_deltas_iter.next().unwrap();
     672           28 :         let mut prev_lsn_end = first_level0_delta.layer_desc().lsn_range.end;
     673           28 :         let mut deltas_to_compact = Vec::with_capacity(level0_deltas.len());
     674           28 : 
     675           28 :         // Accumulate the size of layers in `deltas_to_compact`
     676           28 :         let mut deltas_to_compact_bytes = 0;
     677           28 : 
     678           28 :         // Under normal circumstances, we will accumulate up to compaction_interval L0s of size
     679           28 :         // checkpoint_distance each.  To avoid edge cases using extra system resources, bound our
     680           28 :         // work in this function to only operate on this much delta data at once.
     681           28 :         //
     682           28 :         // Take the max of the configured value & the default, so that tests that configure tiny values
     683           28 :         // can still use a sensible amount of memory, but if a deployed system configures bigger values we
     684           28 :         // still let them compact a full stack of L0s in one go.
     685           28 :         let delta_size_limit = std::cmp::max(
     686           28 :             self.get_compaction_threshold(),
     687           28 :             DEFAULT_COMPACTION_THRESHOLD,
     688           28 :         ) as u64
     689           28 :             * std::cmp::max(self.get_checkpoint_distance(), DEFAULT_CHECKPOINT_DISTANCE);
     690           28 : 
     691           28 :         let mut fully_compacted = true;
     692           28 : 
     693           28 :         deltas_to_compact.push(first_level0_delta.download_and_keep_resident().await?);
     694          402 :         for l in level0_deltas_iter {
     695          374 :             let lsn_range = &l.layer_desc().lsn_range;
     696          374 : 
     697          374 :             if lsn_range.start != prev_lsn_end {
     698            0 :                 break;
     699          374 :             }
     700          374 :             deltas_to_compact.push(l.download_and_keep_resident().await?);
     701          374 :             deltas_to_compact_bytes += l.metadata().file_size;
     702          374 :             prev_lsn_end = lsn_range.end;
     703          374 : 
     704          374 :             if deltas_to_compact_bytes >= delta_size_limit {
     705            0 :                 info!(
     706            0 :                     l0_deltas_selected = deltas_to_compact.len(),
     707            0 :                     l0_deltas_total = level0_deltas.len(),
     708            0 :                     "L0 compaction picker hit max delta layer size limit: {}",
     709              :                     delta_size_limit
     710              :                 );
     711            0 :                 fully_compacted = false;
     712            0 : 
     713            0 :                 // Proceed with compaction, but only a subset of L0s
     714            0 :                 break;
     715          374 :             }
     716              :         }
     717           28 :         let lsn_range = Range {
     718           28 :             start: deltas_to_compact
     719           28 :                 .first()
     720           28 :                 .unwrap()
     721           28 :                 .layer_desc()
     722           28 :                 .lsn_range
     723           28 :                 .start,
     724           28 :             end: deltas_to_compact.last().unwrap().layer_desc().lsn_range.end,
     725           28 :         };
     726           28 : 
     727           28 :         info!(
     728            0 :             "Starting Level0 compaction in LSN range {}-{} for {} layers ({} deltas in total)",
     729            0 :             lsn_range.start,
     730            0 :             lsn_range.end,
     731            0 :             deltas_to_compact.len(),
     732            0 :             level0_deltas.len()
     733              :         );
     734              : 
     735          402 :         for l in deltas_to_compact.iter() {
     736          402 :             info!("compact includes {l}");
     737              :         }
     738              : 
     739              :         // We don't need the original list of layers anymore. Drop it so that
     740              :         // we don't accidentally use it later in the function.
     741           28 :         drop(level0_deltas);
     742           28 : 
     743           28 :         stats.read_lock_held_prerequisites_micros = stats
     744           28 :             .read_lock_held_spawn_blocking_startup_micros
     745           28 :             .till_now();
     746              : 
     747              :         // TODO: replace with streaming k-merge
     748           28 :         let all_keys = {
     749           28 :             let mut all_keys = Vec::new();
     750          402 :             for l in deltas_to_compact.iter() {
     751          402 :                 if self.cancel.is_cancelled() {
     752            0 :                     return Err(CompactionError::ShuttingDown);
     753          402 :                 }
     754         2364 :                 all_keys.extend(l.load_keys(ctx).await.map_err(CompactionError::Other)?);
     755              :             }
     756              :             // The current stdlib sorting implementation is designed in a way where it is
     757              :             // particularly fast where the slice is made up of sorted sub-ranges.
     758      4431736 :             all_keys.sort_by_key(|DeltaEntry { key, lsn, .. }| (*key, *lsn));
     759           28 :             all_keys
     760           28 :         };
     761           28 : 
     762           28 :         stats.read_lock_held_key_sort_micros = stats.read_lock_held_prerequisites_micros.till_now();
     763              : 
     764              :         // Determine N largest holes where N is number of compacted layers. The vec is sorted by key range start.
     765              :         //
     766              :         // A hole is a key range for which this compaction doesn't have any WAL records.
     767              :         // Our goal in this compaction iteration is to avoid creating L1s that, in terms of their key range,
     768              :         // cover the hole, but actually don't contain any WAL records for that key range.
     769              :         // The reason is that the mere stack of L1s (`count_deltas`) triggers image layer creation (`create_image_layers`).
     770              :         // That image layer creation would be useless for a hole range covered by L1s that don't contain any WAL records.
     771              :         //
     772              :         // The algorithm chooses holes as follows.
     773              :         // - Slide a 2-window over the keys in key orde to get the hole range (=distance between two keys).
     774              :         // - Filter: min threshold on range length
     775              :         // - Rank: by coverage size (=number of image layers required to reconstruct each key in the range for which we have any data)
     776              :         //
     777              :         // For more details, intuition, and some ASCII art see https://github.com/neondatabase/neon/pull/3597#discussion_r1112704451
     778              :         #[derive(PartialEq, Eq)]
     779              :         struct Hole {
     780              :             key_range: Range<Key>,
     781              :             coverage_size: usize,
     782              :         }
     783           28 :         let holes: Vec<Hole> = {
     784              :             use std::cmp::Ordering;
     785              :             impl Ord for Hole {
     786            0 :                 fn cmp(&self, other: &Self) -> Ordering {
     787            0 :                     self.coverage_size.cmp(&other.coverage_size).reverse()
     788            0 :                 }
     789              :             }
     790              :             impl PartialOrd for Hole {
     791            0 :                 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
     792            0 :                     Some(self.cmp(other))
     793            0 :                 }
     794              :             }
     795           28 :             let max_holes = deltas_to_compact.len();
     796           28 :             let last_record_lsn = self.get_last_record_lsn();
     797           28 :             let min_hole_range = (target_file_size / page_cache::PAGE_SZ as u64) as i128;
     798           28 :             let min_hole_coverage_size = 3; // TODO: something more flexible?
     799           28 :                                             // min-heap (reserve space for one more element added before eviction)
     800           28 :             let mut heap: BinaryHeap<Hole> = BinaryHeap::with_capacity(max_holes + 1);
     801           28 :             let mut prev: Option<Key> = None;
     802              : 
     803      2064038 :             for &DeltaEntry { key: next_key, .. } in all_keys.iter() {
     804      2064038 :                 if let Some(prev_key) = prev {
     805              :                     // just first fast filter, do not create hole entries for metadata keys. The last hole in the
     806              :                     // compaction is the gap between data key and metadata keys.
     807      2064010 :                     if next_key.to_i128() - prev_key.to_i128() >= min_hole_range
     808            0 :                         && !Key::is_metadata_key(&prev_key)
     809              :                     {
     810            0 :                         let key_range = prev_key..next_key;
     811            0 :                         // Measuring hole by just subtraction of i128 representation of key range boundaries
     812            0 :                         // has not so much sense, because largest holes will corresponds field1/field2 changes.
     813            0 :                         // But we are mostly interested to eliminate holes which cause generation of excessive image layers.
     814            0 :                         // That is why it is better to measure size of hole as number of covering image layers.
     815            0 :                         let coverage_size =
     816            0 :                             layers.image_coverage(&key_range, last_record_lsn).len();
     817            0 :                         if coverage_size >= min_hole_coverage_size {
     818            0 :                             heap.push(Hole {
     819            0 :                                 key_range,
     820            0 :                                 coverage_size,
     821            0 :                             });
     822            0 :                             if heap.len() > max_holes {
     823            0 :                                 heap.pop(); // remove smallest hole
     824            0 :                             }
     825            0 :                         }
     826      2064010 :                     }
     827           28 :                 }
     828      2064038 :                 prev = Some(next_key.next());
     829              :             }
     830           28 :             let mut holes = heap.into_vec();
     831           28 :             holes.sort_unstable_by_key(|hole| hole.key_range.start);
     832           28 :             holes
     833           28 :         };
     834           28 :         stats.read_lock_held_compute_holes_micros = stats.read_lock_held_key_sort_micros.till_now();
     835           28 :         drop_rlock(guard);
     836           28 : 
     837           28 :         if self.cancel.is_cancelled() {
     838            0 :             return Err(CompactionError::ShuttingDown);
     839           28 :         }
     840           28 : 
     841           28 :         stats.read_lock_drop_micros = stats.read_lock_held_compute_holes_micros.till_now();
     842              : 
     843              :         // This iterator walks through all key-value pairs from all the layers
     844              :         // we're compacting, in key, LSN order.
     845              :         // If there's both a Value::Image and Value::WalRecord for the same (key,lsn),
     846              :         // then the Value::Image is ordered before Value::WalRecord.
     847              :         //
     848              :         // TODO(https://github.com/neondatabase/neon/issues/8184): remove the page cached blob_io
     849              :         // option and validation code once we've reached confidence.
     850              :         enum AllValuesIter<'a> {
     851              :             PageCachedBlobIo {
     852              :                 all_keys_iter: VecIter<'a>,
     853              :             },
     854              :             StreamingKmergeBypassingPageCache {
     855              :                 merge_iter: MergeIterator<'a>,
     856              :             },
     857              :             ValidatingStreamingKmergeBypassingPageCache {
     858              :                 mode: CompactL0BypassPageCacheValidation,
     859              :                 merge_iter: MergeIterator<'a>,
     860              :                 all_keys_iter: VecIter<'a>,
     861              :             },
     862              :         }
     863              :         type VecIter<'a> = std::slice::Iter<'a, DeltaEntry<'a>>; // TODO: distinguished lifetimes
     864              :         impl AllValuesIter<'_> {
     865      2064066 :             async fn next_all_keys_iter(
     866      2064066 :                 iter: &mut VecIter<'_>,
     867      2064066 :                 ctx: &RequestContext,
     868      2064066 :             ) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
     869              :                 let Some(DeltaEntry {
     870      2064038 :                     key,
     871      2064038 :                     lsn,
     872      2064038 :                     val: value_ref,
     873              :                     ..
     874      2064066 :                 }) = iter.next()
     875              :                 else {
     876           28 :                     return Ok(None);
     877              :                 };
     878      2064038 :                 let value = value_ref.load(ctx).await?;
     879      2064038 :                 Ok(Some((*key, *lsn, value)))
     880      2064066 :             }
     881      2064066 :             async fn next(
     882      2064066 :                 &mut self,
     883      2064066 :                 ctx: &RequestContext,
     884      2064066 :             ) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
     885      2064066 :                 match self {
     886            0 :                     AllValuesIter::PageCachedBlobIo { all_keys_iter: iter } => {
     887            0 :                       Self::next_all_keys_iter(iter, ctx).await
     888              :                     }
     889            0 :                     AllValuesIter::StreamingKmergeBypassingPageCache { merge_iter } => merge_iter.next().await,
     890      2064066 :                     AllValuesIter::ValidatingStreamingKmergeBypassingPageCache { mode, merge_iter, all_keys_iter } => async {
     891              :                         // advance both iterators
     892      2064066 :                         let all_keys_iter_item = Self::next_all_keys_iter(all_keys_iter, ctx).await;
     893      2064066 :                         let merge_iter_item = merge_iter.next().await;
     894              :                         // compare results & log warnings as needed
     895              :                         macro_rules! rate_limited_warn {
     896              :                             ($($arg:tt)*) => {{
     897              :                                 if cfg!(debug_assertions) || cfg!(feature = "testing") {
     898              :                                     warn!($($arg)*);
     899              :                                     panic!("CompactL0BypassPageCacheValidation failure, check logs");
     900              :                                 }
     901              :                                 use once_cell::sync::Lazy;
     902              :                                 use utils::rate_limit::RateLimit;
     903              :                                 use std::sync::Mutex;
     904              :                                 use std::time::Duration;
     905              :                                 static LOGGED: Lazy<Mutex<RateLimit>> =
     906            0 :                                     Lazy::new(|| Mutex::new(RateLimit::new(Duration::from_secs(10))));
     907              :                                 let mut rate_limit = LOGGED.lock().unwrap();
     908            0 :                                 rate_limit.call(|| {
     909            0 :                                     warn!($($arg)*);
     910            0 :                                 });
     911              :                             }}
     912              :                         }
     913      2064066 :                         match (&all_keys_iter_item, &merge_iter_item) {
     914            0 :                             (Err(_), Err(_)) => {
     915            0 :                                 // don't bother asserting equivality of the errors
     916            0 :                             }
     917            0 :                             (Err(all_keys), Ok(merge)) => {
     918            0 :                                 rate_limited_warn!(?merge, "all_keys_iter returned an error where merge did not: {all_keys:?}");
     919            0 :                             },
     920            0 :                             (Ok(all_keys), Err(merge)) => {
     921            0 :                                 rate_limited_warn!(?all_keys, "merge returned an error where all_keys_iter did not: {merge:?}");
     922            0 :                             },
     923           28 :                             (Ok(None), Ok(None)) => { }
     924            0 :                             (Ok(Some(all_keys)), Ok(None)) => {
     925            0 :                                 rate_limited_warn!(?all_keys, "merge returned None where all_keys_iter returned Some");
     926            0 :                             }
     927            0 :                             (Ok(None), Ok(Some(merge))) => {
     928            0 :                                 rate_limited_warn!(?merge, "all_keys_iter returned None where merge returned Some");
     929            0 :                             }
     930      2064038 :                             (Ok(Some((all_keys_key, all_keys_lsn, all_keys_value))), Ok(Some((merge_key, merge_lsn, merge_value)))) => {
     931      2064038 :                                 match mode {
     932              :                                     // TODO: in this mode, we still load the value from disk for both iterators, even though we only need the all_keys_iter one
     933              :                                     CompactL0BypassPageCacheValidation::KeyLsn => {
     934            0 :                                         let all_keys = (all_keys_key, all_keys_lsn);
     935            0 :                                         let merge = (merge_key, merge_lsn);
     936            0 :                                         if all_keys != merge {
     937            0 :                                             rate_limited_warn!(?all_keys, ?merge, "merge returned a different (Key,LSN) than all_keys_iter");
     938            0 :                                         }
     939              :                                     }
     940              :                                     CompactL0BypassPageCacheValidation::KeyLsnValue => {
     941      2064038 :                                         let all_keys = (all_keys_key, all_keys_lsn, all_keys_value);
     942      2064038 :                                         let merge = (merge_key, merge_lsn, merge_value);
     943      2064038 :                                         if all_keys != merge {
     944            0 :                                             rate_limited_warn!(?all_keys, ?merge, "merge returned a different (Key,LSN,Value) than all_keys_iter");
     945      2064038 :                                         }
     946              :                                     }
     947              :                                 }
     948              :                             }
     949              :                         }
     950              :                         // in case of mismatch, trust the legacy all_keys_iter_item
     951      2064066 :                         all_keys_iter_item
     952      2064066 :                     }.instrument(info_span!("next")).await
     953              :                 }
     954      2064066 :             }
     955              :         }
     956           28 :         let mut all_values_iter = match &self.conf.compact_level0_phase1_value_access {
     957            0 :             CompactL0Phase1ValueAccess::PageCachedBlobIo => AllValuesIter::PageCachedBlobIo {
     958            0 :                 all_keys_iter: all_keys.iter(),
     959            0 :             },
     960           28 :             CompactL0Phase1ValueAccess::StreamingKmerge { validate } => {
     961           28 :                 let merge_iter = {
     962           28 :                     let mut deltas = Vec::with_capacity(deltas_to_compact.len());
     963          402 :                     for l in deltas_to_compact.iter() {
     964          402 :                         let l = l.get_as_delta(ctx).await.map_err(CompactionError::Other)?;
     965          402 :                         deltas.push(l);
     966              :                     }
     967           28 :                     MergeIterator::create(&deltas, &[], ctx)
     968           28 :                 };
     969           28 :                 match validate {
     970            0 :                     None => AllValuesIter::StreamingKmergeBypassingPageCache { merge_iter },
     971           28 :                     Some(validate) => AllValuesIter::ValidatingStreamingKmergeBypassingPageCache {
     972           28 :                         mode: validate.clone(),
     973           28 :                         merge_iter,
     974           28 :                         all_keys_iter: all_keys.iter(),
     975           28 :                     },
     976              :                 }
     977              :             }
     978              :         };
     979              : 
     980              :         // This iterator walks through all keys and is needed to calculate size used by each key
     981           28 :         let mut all_keys_iter = all_keys
     982           28 :             .iter()
     983      2064038 :             .map(|DeltaEntry { key, lsn, size, .. }| (*key, *lsn, *size))
     984      2064010 :             .coalesce(|mut prev, cur| {
     985      2064010 :                 // Coalesce keys that belong to the same key pair.
     986      2064010 :                 // This ensures that compaction doesn't put them
     987      2064010 :                 // into different layer files.
     988      2064010 :                 // Still limit this by the target file size,
     989      2064010 :                 // so that we keep the size of the files in
     990      2064010 :                 // check.
     991      2064010 :                 if prev.0 == cur.0 && prev.2 < target_file_size {
     992        40038 :                     prev.2 += cur.2;
     993        40038 :                     Ok(prev)
     994              :                 } else {
     995      2023972 :                     Err((prev, cur))
     996              :                 }
     997      2064010 :             });
     998           28 : 
     999           28 :         // Merge the contents of all the input delta layers into a new set
    1000           28 :         // of delta layers, based on the current partitioning.
    1001           28 :         //
    1002           28 :         // We split the new delta layers on the key dimension. We iterate through the key space, and for each key, check if including the next key to the current output layer we're building would cause the layer to become too large. If so, dump the current output layer and start new one.
    1003           28 :         // It's possible that there is a single key with so many page versions that storing all of them in a single layer file
    1004           28 :         // would be too large. In that case, we also split on the LSN dimension.
    1005           28 :         //
    1006           28 :         // LSN
    1007           28 :         //  ^
    1008           28 :         //  |
    1009           28 :         //  | +-----------+            +--+--+--+--+
    1010           28 :         //  | |           |            |  |  |  |  |
    1011           28 :         //  | +-----------+            |  |  |  |  |
    1012           28 :         //  | |           |            |  |  |  |  |
    1013           28 :         //  | +-----------+     ==>    |  |  |  |  |
    1014           28 :         //  | |           |            |  |  |  |  |
    1015           28 :         //  | +-----------+            |  |  |  |  |
    1016           28 :         //  | |           |            |  |  |  |  |
    1017           28 :         //  | +-----------+            +--+--+--+--+
    1018           28 :         //  |
    1019           28 :         //  +--------------> key
    1020           28 :         //
    1021           28 :         //
    1022           28 :         // If one key (X) has a lot of page versions:
    1023           28 :         //
    1024           28 :         // LSN
    1025           28 :         //  ^
    1026           28 :         //  |                                 (X)
    1027           28 :         //  | +-----------+            +--+--+--+--+
    1028           28 :         //  | |           |            |  |  |  |  |
    1029           28 :         //  | +-----------+            |  |  +--+  |
    1030           28 :         //  | |           |            |  |  |  |  |
    1031           28 :         //  | +-----------+     ==>    |  |  |  |  |
    1032           28 :         //  | |           |            |  |  +--+  |
    1033           28 :         //  | +-----------+            |  |  |  |  |
    1034           28 :         //  | |           |            |  |  |  |  |
    1035           28 :         //  | +-----------+            +--+--+--+--+
    1036           28 :         //  |
    1037           28 :         //  +--------------> key
    1038           28 :         // TODO: this actually divides the layers into fixed-size chunks, not
    1039           28 :         // based on the partitioning.
    1040           28 :         //
    1041           28 :         // TODO: we should also opportunistically materialize and
    1042           28 :         // garbage collect what we can.
    1043           28 :         let mut new_layers = Vec::new();
    1044           28 :         let mut prev_key: Option<Key> = None;
    1045           28 :         let mut writer: Option<DeltaLayerWriter> = None;
    1046           28 :         let mut key_values_total_size = 0u64;
    1047           28 :         let mut dup_start_lsn: Lsn = Lsn::INVALID; // start LSN of layer containing values of the single key
    1048           28 :         let mut dup_end_lsn: Lsn = Lsn::INVALID; // end LSN of layer containing values of the single key
    1049           28 :         let mut next_hole = 0; // index of next hole in holes vector
    1050           28 : 
    1051           28 :         let mut keys = 0;
    1052              : 
    1053      2064066 :         while let Some((key, lsn, value)) = all_values_iter
    1054      2064066 :             .next(ctx)
    1055        39353 :             .await
    1056      2064066 :             .map_err(CompactionError::Other)?
    1057              :         {
    1058      2064038 :             keys += 1;
    1059      2064038 : 
    1060      2064038 :             if keys % 32_768 == 0 && self.cancel.is_cancelled() {
    1061              :                 // avoid hitting the cancellation token on every key. in benches, we end up
    1062              :                 // shuffling an order of million keys per layer, this means we'll check it
    1063              :                 // around tens of times per layer.
    1064            0 :                 return Err(CompactionError::ShuttingDown);
    1065      2064038 :             }
    1066      2064038 : 
    1067      2064038 :             let same_key = prev_key.map_or(false, |prev_key| prev_key == key);
    1068      2064038 :             // We need to check key boundaries once we reach next key or end of layer with the same key
    1069      2064038 :             if !same_key || lsn == dup_end_lsn {
    1070      2024000 :                 let mut next_key_size = 0u64;
    1071      2024000 :                 let is_dup_layer = dup_end_lsn.is_valid();
    1072      2024000 :                 dup_start_lsn = Lsn::INVALID;
    1073      2024000 :                 if !same_key {
    1074      2024000 :                     dup_end_lsn = Lsn::INVALID;
    1075      2024000 :                 }
    1076              :                 // Determine size occupied by this key. We stop at next key or when size becomes larger than target_file_size
    1077      2024000 :                 for (next_key, next_lsn, next_size) in all_keys_iter.by_ref() {
    1078      2024000 :                     next_key_size = next_size;
    1079      2024000 :                     if key != next_key {
    1080      2023972 :                         if dup_end_lsn.is_valid() {
    1081            0 :                             // We are writting segment with duplicates:
    1082            0 :                             // place all remaining values of this key in separate segment
    1083            0 :                             dup_start_lsn = dup_end_lsn; // new segments starts where old stops
    1084            0 :                             dup_end_lsn = lsn_range.end; // there are no more values of this key till end of LSN range
    1085      2023972 :                         }
    1086      2023972 :                         break;
    1087           28 :                     }
    1088           28 :                     key_values_total_size += next_size;
    1089           28 :                     // Check if it is time to split segment: if total keys size is larger than target file size.
    1090           28 :                     // We need to avoid generation of empty segments if next_size > target_file_size.
    1091           28 :                     if key_values_total_size > target_file_size && lsn != next_lsn {
    1092              :                         // Split key between multiple layers: such layer can contain only single key
    1093            0 :                         dup_start_lsn = if dup_end_lsn.is_valid() {
    1094            0 :                             dup_end_lsn // new segment with duplicates starts where old one stops
    1095              :                         } else {
    1096            0 :                             lsn // start with the first LSN for this key
    1097              :                         };
    1098            0 :                         dup_end_lsn = next_lsn; // upper LSN boundary is exclusive
    1099            0 :                         break;
    1100           28 :                     }
    1101              :                 }
    1102              :                 // handle case when loop reaches last key: in this case dup_end is non-zero but dup_start is not set.
    1103      2024000 :                 if dup_end_lsn.is_valid() && !dup_start_lsn.is_valid() {
    1104            0 :                     dup_start_lsn = dup_end_lsn;
    1105            0 :                     dup_end_lsn = lsn_range.end;
    1106      2024000 :                 }
    1107      2024000 :                 if writer.is_some() {
    1108      2023972 :                     let written_size = writer.as_mut().unwrap().size();
    1109      2023972 :                     let contains_hole =
    1110      2023972 :                         next_hole < holes.len() && key >= holes[next_hole].key_range.end;
    1111              :                     // check if key cause layer overflow or contains hole...
    1112      2023972 :                     if is_dup_layer
    1113      2023972 :                         || dup_end_lsn.is_valid()
    1114      2023972 :                         || written_size + key_values_total_size > target_file_size
    1115      2023692 :                         || contains_hole
    1116              :                     {
    1117              :                         // ... if so, flush previous layer and prepare to write new one
    1118          280 :                         let (desc, path) = writer
    1119          280 :                             .take()
    1120          280 :                             .unwrap()
    1121          280 :                             .finish(prev_key.unwrap().next(), ctx)
    1122          720 :                             .await
    1123          280 :                             .map_err(CompactionError::Other)?;
    1124          280 :                         let new_delta = Layer::finish_creating(self.conf, self, desc, &path)
    1125          280 :                             .map_err(CompactionError::Other)?;
    1126              : 
    1127          280 :                         new_layers.push(new_delta);
    1128          280 :                         writer = None;
    1129          280 : 
    1130          280 :                         if contains_hole {
    1131            0 :                             // skip hole
    1132            0 :                             next_hole += 1;
    1133          280 :                         }
    1134      2023692 :                     }
    1135           28 :                 }
    1136              :                 // Remember size of key value because at next iteration we will access next item
    1137      2024000 :                 key_values_total_size = next_key_size;
    1138        40038 :             }
    1139      2064038 :             fail_point!("delta-layer-writer-fail-before-finish", |_| {
    1140            0 :                 Err(CompactionError::Other(anyhow::anyhow!(
    1141            0 :                     "failpoint delta-layer-writer-fail-before-finish"
    1142            0 :                 )))
    1143      2064038 :             });
    1144              : 
    1145      2064038 :             if !self.shard_identity.is_key_disposable(&key) {
    1146      2064038 :                 if writer.is_none() {
    1147          308 :                     if self.cancel.is_cancelled() {
    1148              :                         // to be somewhat responsive to cancellation, check for each new layer
    1149            0 :                         return Err(CompactionError::ShuttingDown);
    1150          308 :                     }
    1151              :                     // Create writer if not initiaized yet
    1152          308 :                     writer = Some(
    1153              :                         DeltaLayerWriter::new(
    1154          308 :                             self.conf,
    1155          308 :                             self.timeline_id,
    1156          308 :                             self.tenant_shard_id,
    1157          308 :                             key,
    1158          308 :                             if dup_end_lsn.is_valid() {
    1159              :                                 // this is a layer containing slice of values of the same key
    1160            0 :                                 debug!("Create new dup layer {}..{}", dup_start_lsn, dup_end_lsn);
    1161            0 :                                 dup_start_lsn..dup_end_lsn
    1162              :                             } else {
    1163          308 :                                 debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end);
    1164          308 :                                 lsn_range.clone()
    1165              :                             },
    1166          308 :                             ctx,
    1167              :                         )
    1168          154 :                         .await
    1169          308 :                         .map_err(CompactionError::Other)?,
    1170              :                     );
    1171              : 
    1172          308 :                     keys = 0;
    1173      2063730 :                 }
    1174              : 
    1175      2064038 :                 writer
    1176      2064038 :                     .as_mut()
    1177      2064038 :                     .unwrap()
    1178      2064038 :                     .put_value(key, lsn, value, ctx)
    1179         1220 :                     .await
    1180      2064038 :                     .map_err(CompactionError::Other)?;
    1181              :             } else {
    1182            0 :                 debug!(
    1183            0 :                     "Dropping key {} during compaction (it belongs on shard {:?})",
    1184            0 :                     key,
    1185            0 :                     self.shard_identity.get_shard_number(&key)
    1186              :                 );
    1187              :             }
    1188              : 
    1189      2064038 :             if !new_layers.is_empty() {
    1190        19786 :                 fail_point!("after-timeline-compacted-first-L1");
    1191      2044252 :             }
    1192              : 
    1193      2064038 :             prev_key = Some(key);
    1194              :         }
    1195           28 :         if let Some(writer) = writer {
    1196           28 :             let (desc, path) = writer
    1197           28 :                 .finish(prev_key.unwrap().next(), ctx)
    1198         1992 :                 .await
    1199           28 :                 .map_err(CompactionError::Other)?;
    1200           28 :             let new_delta = Layer::finish_creating(self.conf, self, desc, &path)
    1201           28 :                 .map_err(CompactionError::Other)?;
    1202           28 :             new_layers.push(new_delta);
    1203            0 :         }
    1204              : 
    1205              :         // Sync layers
    1206           28 :         if !new_layers.is_empty() {
    1207              :             // Print a warning if the created layer is larger than double the target size
    1208              :             // Add two pages for potential overhead. This should in theory be already
    1209              :             // accounted for in the target calculation, but for very small targets,
    1210              :             // we still might easily hit the limit otherwise.
    1211           28 :             let warn_limit = target_file_size * 2 + page_cache::PAGE_SZ as u64 * 2;
    1212          308 :             for layer in new_layers.iter() {
    1213          308 :                 if layer.layer_desc().file_size > warn_limit {
    1214            0 :                     warn!(
    1215              :                         %layer,
    1216            0 :                         "created delta file of size {} larger than double of target of {target_file_size}", layer.layer_desc().file_size
    1217              :                     );
    1218          308 :                 }
    1219              :             }
    1220              : 
    1221              :             // The writer.finish() above already did the fsync of the inodes.
    1222              :             // We just need to fsync the directory in which these inodes are linked,
    1223              :             // which we know to be the timeline directory.
    1224              :             //
    1225              :             // We use fatal_err() below because the after writer.finish() returns with success,
    1226              :             // the in-memory state of the filesystem already has the layer file in its final place,
    1227              :             // and subsequent pageserver code could think it's durable while it really isn't.
    1228           28 :             let timeline_dir = VirtualFile::open(
    1229           28 :                 &self
    1230           28 :                     .conf
    1231           28 :                     .timeline_path(&self.tenant_shard_id, &self.timeline_id),
    1232           28 :                 ctx,
    1233           28 :             )
    1234           14 :             .await
    1235           28 :             .fatal_err("VirtualFile::open for timeline dir fsync");
    1236           28 :             timeline_dir
    1237           28 :                 .sync_all()
    1238           14 :                 .await
    1239           28 :                 .fatal_err("VirtualFile::sync_all timeline dir");
    1240            0 :         }
    1241              : 
    1242           28 :         stats.write_layer_files_micros = stats.read_lock_drop_micros.till_now();
    1243           28 :         stats.new_deltas_count = Some(new_layers.len());
    1244          308 :         stats.new_deltas_size = Some(new_layers.iter().map(|l| l.layer_desc().file_size).sum());
    1245           28 : 
    1246           28 :         match TryInto::<CompactLevel0Phase1Stats>::try_into(stats)
    1247           28 :             .and_then(|stats| serde_json::to_string(&stats).context("serde_json::to_string"))
    1248              :         {
    1249           28 :             Ok(stats_json) => {
    1250           28 :                 info!(
    1251            0 :                     stats_json = stats_json.as_str(),
    1252            0 :                     "compact_level0_phase1 stats available"
    1253              :                 )
    1254              :             }
    1255            0 :             Err(e) => {
    1256            0 :                 warn!("compact_level0_phase1 stats failed to serialize: {:#}", e);
    1257              :             }
    1258              :         }
    1259              : 
    1260              :         // Without this, rustc complains about deltas_to_compact still
    1261              :         // being borrowed when we `.into_iter()` below.
    1262           28 :         drop(all_values_iter);
    1263           28 : 
    1264           28 :         Ok(CompactLevel0Phase1Result {
    1265           28 :             new_layers,
    1266           28 :             deltas_to_compact: deltas_to_compact
    1267           28 :                 .into_iter()
    1268          402 :                 .map(|x| x.drop_eviction_guard())
    1269           28 :                 .collect::<Vec<_>>(),
    1270           28 :             fully_compacted,
    1271           28 :         })
    1272          364 :     }
    1273              : }
    1274              : 
    1275              : #[derive(Default)]
    1276              : struct CompactLevel0Phase1Result {
    1277              :     new_layers: Vec<ResidentLayer>,
    1278              :     deltas_to_compact: Vec<Layer>,
    1279              :     // Whether we have included all L0 layers, or selected only part of them due to the
    1280              :     // L0 compaction size limit.
    1281              :     fully_compacted: bool,
    1282              : }
    1283              : 
    1284              : #[derive(Default)]
    1285              : struct CompactLevel0Phase1StatsBuilder {
    1286              :     version: Option<u64>,
    1287              :     tenant_id: Option<TenantShardId>,
    1288              :     timeline_id: Option<TimelineId>,
    1289              :     read_lock_acquisition_micros: DurationRecorder,
    1290              :     read_lock_held_spawn_blocking_startup_micros: DurationRecorder,
    1291              :     read_lock_held_key_sort_micros: DurationRecorder,
    1292              :     read_lock_held_prerequisites_micros: DurationRecorder,
    1293              :     read_lock_held_compute_holes_micros: DurationRecorder,
    1294              :     read_lock_drop_micros: DurationRecorder,
    1295              :     write_layer_files_micros: DurationRecorder,
    1296              :     level0_deltas_count: Option<usize>,
    1297              :     new_deltas_count: Option<usize>,
    1298              :     new_deltas_size: Option<u64>,
    1299              : }
    1300              : 
    1301              : #[derive(serde::Serialize)]
    1302              : struct CompactLevel0Phase1Stats {
    1303              :     version: u64,
    1304              :     tenant_id: TenantShardId,
    1305              :     timeline_id: TimelineId,
    1306              :     read_lock_acquisition_micros: RecordedDuration,
    1307              :     read_lock_held_spawn_blocking_startup_micros: RecordedDuration,
    1308              :     read_lock_held_key_sort_micros: RecordedDuration,
    1309              :     read_lock_held_prerequisites_micros: RecordedDuration,
    1310              :     read_lock_held_compute_holes_micros: RecordedDuration,
    1311              :     read_lock_drop_micros: RecordedDuration,
    1312              :     write_layer_files_micros: RecordedDuration,
    1313              :     level0_deltas_count: usize,
    1314              :     new_deltas_count: usize,
    1315              :     new_deltas_size: u64,
    1316              : }
    1317              : 
    1318              : impl TryFrom<CompactLevel0Phase1StatsBuilder> for CompactLevel0Phase1Stats {
    1319              :     type Error = anyhow::Error;
    1320              : 
    1321           28 :     fn try_from(value: CompactLevel0Phase1StatsBuilder) -> Result<Self, Self::Error> {
    1322           28 :         Ok(Self {
    1323           28 :             version: value.version.ok_or_else(|| anyhow!("version not set"))?,
    1324           28 :             tenant_id: value
    1325           28 :                 .tenant_id
    1326           28 :                 .ok_or_else(|| anyhow!("tenant_id not set"))?,
    1327           28 :             timeline_id: value
    1328           28 :                 .timeline_id
    1329           28 :                 .ok_or_else(|| anyhow!("timeline_id not set"))?,
    1330           28 :             read_lock_acquisition_micros: value
    1331           28 :                 .read_lock_acquisition_micros
    1332           28 :                 .into_recorded()
    1333           28 :                 .ok_or_else(|| anyhow!("read_lock_acquisition_micros not set"))?,
    1334           28 :             read_lock_held_spawn_blocking_startup_micros: value
    1335           28 :                 .read_lock_held_spawn_blocking_startup_micros
    1336           28 :                 .into_recorded()
    1337           28 :                 .ok_or_else(|| anyhow!("read_lock_held_spawn_blocking_startup_micros not set"))?,
    1338           28 :             read_lock_held_key_sort_micros: value
    1339           28 :                 .read_lock_held_key_sort_micros
    1340           28 :                 .into_recorded()
    1341           28 :                 .ok_or_else(|| anyhow!("read_lock_held_key_sort_micros not set"))?,
    1342           28 :             read_lock_held_prerequisites_micros: value
    1343           28 :                 .read_lock_held_prerequisites_micros
    1344           28 :                 .into_recorded()
    1345           28 :                 .ok_or_else(|| anyhow!("read_lock_held_prerequisites_micros not set"))?,
    1346           28 :             read_lock_held_compute_holes_micros: value
    1347           28 :                 .read_lock_held_compute_holes_micros
    1348           28 :                 .into_recorded()
    1349           28 :                 .ok_or_else(|| anyhow!("read_lock_held_compute_holes_micros not set"))?,
    1350           28 :             read_lock_drop_micros: value
    1351           28 :                 .read_lock_drop_micros
    1352           28 :                 .into_recorded()
    1353           28 :                 .ok_or_else(|| anyhow!("read_lock_drop_micros not set"))?,
    1354           28 :             write_layer_files_micros: value
    1355           28 :                 .write_layer_files_micros
    1356           28 :                 .into_recorded()
    1357           28 :                 .ok_or_else(|| anyhow!("write_layer_files_micros not set"))?,
    1358           28 :             level0_deltas_count: value
    1359           28 :                 .level0_deltas_count
    1360           28 :                 .ok_or_else(|| anyhow!("level0_deltas_count not set"))?,
    1361           28 :             new_deltas_count: value
    1362           28 :                 .new_deltas_count
    1363           28 :                 .ok_or_else(|| anyhow!("new_deltas_count not set"))?,
    1364           28 :             new_deltas_size: value
    1365           28 :                 .new_deltas_size
    1366           28 :                 .ok_or_else(|| anyhow!("new_deltas_size not set"))?,
    1367              :         })
    1368           28 :     }
    1369              : }
    1370              : 
    1371            0 : #[derive(Debug, PartialEq, Eq, Clone, serde::Deserialize, serde::Serialize)]
    1372              : #[serde(tag = "mode", rename_all = "kebab-case", deny_unknown_fields)]
    1373              : pub enum CompactL0Phase1ValueAccess {
    1374              :     /// The old way.
    1375              :     PageCachedBlobIo,
    1376              :     /// The new way.
    1377              :     StreamingKmerge {
    1378              :         /// If set, we run both the old way and the new way, validate that
    1379              :         /// they are identical (=> [`CompactL0BypassPageCacheValidation`]),
    1380              :         /// and if the validation fails,
    1381              :         /// - in tests: fail them with a panic or
    1382              :         /// - in prod, log a rate-limited warning and use the old way's results.
    1383              :         ///
    1384              :         /// If not set, we only run the new way and trust its results.
    1385              :         validate: Option<CompactL0BypassPageCacheValidation>,
    1386              :     },
    1387              : }
    1388              : 
    1389              : /// See [`CompactL0Phase1ValueAccess::StreamingKmerge`].
    1390            0 : #[derive(Debug, PartialEq, Eq, Clone, serde::Deserialize, serde::Serialize)]
    1391              : #[serde(rename_all = "kebab-case")]
    1392              : pub enum CompactL0BypassPageCacheValidation {
    1393              :     /// Validate that the series of (key, lsn) pairs are the same.
    1394              :     KeyLsn,
    1395              :     /// Validate that the entire output of old and new way is identical.
    1396              :     KeyLsnValue,
    1397              : }
    1398              : 
    1399              : impl Default for CompactL0Phase1ValueAccess {
    1400          212 :     fn default() -> Self {
    1401          212 :         CompactL0Phase1ValueAccess::StreamingKmerge {
    1402          212 :             // TODO(https://github.com/neondatabase/neon/issues/8184): change to None once confident
    1403          212 :             validate: Some(CompactL0BypassPageCacheValidation::KeyLsnValue),
    1404          212 :         }
    1405          212 :     }
    1406              : }
    1407              : 
    1408              : impl Timeline {
    1409              :     /// Entry point for new tiered compaction algorithm.
    1410              :     ///
    1411              :     /// All the real work is in the implementation in the pageserver_compaction
    1412              :     /// crate. The code here would apply to any algorithm implemented by the
    1413              :     /// same interface, but tiered is the only one at the moment.
    1414              :     ///
    1415              :     /// TODO: cancellation
    1416            0 :     pub(crate) async fn compact_tiered(
    1417            0 :         self: &Arc<Self>,
    1418            0 :         _cancel: &CancellationToken,
    1419            0 :         ctx: &RequestContext,
    1420            0 :     ) -> Result<(), CompactionError> {
    1421            0 :         let fanout = self.get_compaction_threshold() as u64;
    1422            0 :         let target_file_size = self.get_checkpoint_distance();
    1423              : 
    1424              :         // Find the top of the historical layers
    1425            0 :         let end_lsn = {
    1426            0 :             let guard = self.layers.read().await;
    1427            0 :             let layers = guard.layer_map()?;
    1428              : 
    1429            0 :             let l0_deltas = layers.level0_deltas();
    1430            0 : 
    1431            0 :             // As an optimization, if we find that there are too few L0 layers,
    1432            0 :             // bail out early. We know that the compaction algorithm would do
    1433            0 :             // nothing in that case.
    1434            0 :             if l0_deltas.len() < fanout as usize {
    1435              :                 // doesn't need compacting
    1436            0 :                 return Ok(());
    1437            0 :             }
    1438            0 :             l0_deltas.iter().map(|l| l.lsn_range.end).max().unwrap()
    1439            0 :         };
    1440            0 : 
    1441            0 :         // Is the timeline being deleted?
    1442            0 :         if self.is_stopping() {
    1443            0 :             trace!("Dropping out of compaction on timeline shutdown");
    1444            0 :             return Err(CompactionError::ShuttingDown);
    1445            0 :         }
    1446              : 
    1447            0 :         let (dense_ks, _sparse_ks) = self.collect_keyspace(end_lsn, ctx).await?;
    1448              :         // TODO(chi): ignore sparse_keyspace for now, compact it in the future.
    1449            0 :         let mut adaptor = TimelineAdaptor::new(self, (end_lsn, dense_ks));
    1450            0 : 
    1451            0 :         pageserver_compaction::compact_tiered::compact_tiered(
    1452            0 :             &mut adaptor,
    1453            0 :             end_lsn,
    1454            0 :             target_file_size,
    1455            0 :             fanout,
    1456            0 :             ctx,
    1457            0 :         )
    1458            0 :         .await
    1459              :         // TODO: compact_tiered needs to return CompactionError
    1460            0 :         .map_err(CompactionError::Other)?;
    1461              : 
    1462            0 :         adaptor.flush_updates().await?;
    1463            0 :         Ok(())
    1464            0 :     }
    1465              : 
    1466              :     /// Take a list of images and deltas, produce images and deltas according to GC horizon and retain_lsns.
    1467              :     ///
    1468              :     /// It takes a key, the values of the key within the compaction process, a GC horizon, and all retain_lsns below the horizon.
    1469              :     /// For now, it requires the `accumulated_values` contains the full history of the key (i.e., the key with the lowest LSN is
    1470              :     /// an image or a WAL not requiring a base image). This restriction will be removed once we implement gc-compaction on branch.
    1471              :     ///
    1472              :     /// The function returns the deltas and the base image that need to be placed at each of the retain LSN. For example, we have:
    1473              :     ///
    1474              :     /// A@0x10, +B@0x20, +C@0x30, +D@0x40, +E@0x50, +F@0x60
    1475              :     /// horizon = 0x50, retain_lsn = 0x20, 0x40, delta_threshold=3
    1476              :     ///
    1477              :     /// The function will produce:
    1478              :     ///
    1479              :     /// ```plain
    1480              :     /// 0x20(retain_lsn) -> img=AB@0x20                  always produce a single image below the lowest retain LSN
    1481              :     /// 0x40(retain_lsn) -> deltas=[+C@0x30, +D@0x40]    two deltas since the last base image, keeping the deltas
    1482              :     /// 0x50(horizon)    -> deltas=[ABCDE@0x50]          three deltas since the last base image, generate an image but put it in the delta
    1483              :     /// above_horizon    -> deltas=[+F@0x60]             full history above the horizon
    1484              :     /// ```
    1485              :     ///
    1486              :     /// Note that `accumulated_values` must be sorted by LSN and should belong to a single key.
    1487          346 :     pub(crate) async fn generate_key_retention(
    1488          346 :         self: &Arc<Timeline>,
    1489          346 :         key: Key,
    1490          346 :         full_history: &[(Key, Lsn, Value)],
    1491          346 :         horizon: Lsn,
    1492          346 :         retain_lsn_below_horizon: &[Lsn],
    1493          346 :         delta_threshold_cnt: usize,
    1494          346 :         base_img_from_ancestor: Option<(Key, Lsn, Bytes)>,
    1495          346 :     ) -> anyhow::Result<KeyHistoryRetention> {
    1496          346 :         // Pre-checks for the invariants
    1497          346 :         if cfg!(debug_assertions) {
    1498          836 :             for (log_key, _, _) in full_history {
    1499          490 :                 assert_eq!(log_key, &key, "mismatched key");
    1500              :             }
    1501          346 :             for i in 1..full_history.len() {
    1502          144 :                 assert!(full_history[i - 1].1 <= full_history[i].1, "unordered LSN");
    1503          144 :                 if full_history[i - 1].1 == full_history[i].1 {
    1504            0 :                     assert!(
    1505            0 :                         matches!(full_history[i - 1].2, Value::Image(_)),
    1506            0 :                         "unordered delta/image, or duplicated delta"
    1507              :                     );
    1508          144 :                 }
    1509              :             }
    1510              :             // There was an assertion for no base image that checks if the first
    1511              :             // record in the history is `will_init` before, but it was removed.
    1512              :             // This is explained in the test cases for generate_key_retention.
    1513              :             // Search "incomplete history" for more information.
    1514          732 :             for lsn in retain_lsn_below_horizon {
    1515          386 :                 assert!(lsn < &horizon, "retain lsn must be below horizon")
    1516              :             }
    1517          346 :             for i in 1..retain_lsn_below_horizon.len() {
    1518          186 :                 assert!(
    1519          186 :                     retain_lsn_below_horizon[i - 1] <= retain_lsn_below_horizon[i],
    1520            0 :                     "unordered LSN"
    1521              :                 );
    1522              :             }
    1523            0 :         }
    1524          346 :         let has_ancestor = base_img_from_ancestor.is_some();
    1525              :         // Step 1: split history into len(retain_lsn_below_horizon) + 2 buckets, where the last bucket is for all deltas above the horizon,
    1526              :         // and the second-to-last bucket is for the horizon. Each bucket contains lsn_last_bucket < deltas <= lsn_this_bucket.
    1527          346 :         let (mut split_history, lsn_split_points) = {
    1528          346 :             let mut split_history = Vec::new();
    1529          346 :             split_history.resize_with(retain_lsn_below_horizon.len() + 2, Vec::new);
    1530          346 :             let mut lsn_split_points = Vec::with_capacity(retain_lsn_below_horizon.len() + 1);
    1531          732 :             for lsn in retain_lsn_below_horizon {
    1532          386 :                 lsn_split_points.push(*lsn);
    1533          386 :             }
    1534          346 :             lsn_split_points.push(horizon);
    1535          346 :             let mut current_idx = 0;
    1536          836 :             for item @ (_, lsn, _) in full_history {
    1537          598 :                 while current_idx < lsn_split_points.len() && *lsn > lsn_split_points[current_idx] {
    1538          108 :                     current_idx += 1;
    1539          108 :                 }
    1540          490 :                 split_history[current_idx].push(item);
    1541              :             }
    1542          346 :             (split_history, lsn_split_points)
    1543              :         };
    1544              :         // Step 2: filter out duplicated records due to the k-merge of image/delta layers
    1545         1424 :         for split_for_lsn in &mut split_history {
    1546         1078 :             let mut prev_lsn = None;
    1547         1078 :             let mut new_split_for_lsn = Vec::with_capacity(split_for_lsn.len());
    1548         1078 :             for record @ (_, lsn, _) in std::mem::take(split_for_lsn) {
    1549          490 :                 if let Some(prev_lsn) = &prev_lsn {
    1550           60 :                     if *prev_lsn == lsn {
    1551              :                         // The case that we have an LSN with both data from the delta layer and the image layer. As
    1552              :                         // `ValueWrapper` ensures that an image is ordered before a delta at the same LSN, we simply
    1553              :                         // drop this delta and keep the image.
    1554              :                         //
    1555              :                         // For example, we have delta layer key1@0x10, key1@0x20, and image layer key1@0x10, we will
    1556              :                         // keep the image for key1@0x10 and the delta for key1@0x20. key1@0x10 delta will be simply
    1557              :                         // dropped.
    1558              :                         //
    1559              :                         // TODO: in case we have both delta + images for a given LSN and it does not exceed the delta
    1560              :                         // threshold, we could have kept delta instead to save space. This is an optimization for the future.
    1561            0 :                         continue;
    1562           60 :                     }
    1563          430 :                 }
    1564          490 :                 prev_lsn = Some(lsn);
    1565          490 :                 new_split_for_lsn.push(record);
    1566              :             }
    1567         1078 :             *split_for_lsn = new_split_for_lsn;
    1568              :         }
    1569              :         // Step 3: generate images when necessary
    1570          346 :         let mut retention = Vec::with_capacity(split_history.len());
    1571          346 :         let mut records_since_last_image = 0;
    1572          346 :         let batch_cnt = split_history.len();
    1573          346 :         assert!(
    1574          346 :             batch_cnt >= 2,
    1575            0 :             "should have at least below + above horizon batches"
    1576              :         );
    1577          346 :         let mut replay_history: Vec<(Key, Lsn, Value)> = Vec::new();
    1578          346 :         if let Some((key, lsn, img)) = base_img_from_ancestor {
    1579           18 :             replay_history.push((key, lsn, Value::Image(img)));
    1580          328 :         }
    1581              : 
    1582              :         /// Generate debug information for the replay history
    1583            0 :         fn generate_history_trace(replay_history: &[(Key, Lsn, Value)]) -> String {
    1584            0 :             use std::fmt::Write;
    1585            0 :             let mut output = String::new();
    1586            0 :             if let Some((key, _, _)) = replay_history.first() {
    1587            0 :                 write!(output, "key={} ", key).unwrap();
    1588            0 :                 let mut cnt = 0;
    1589            0 :                 for (_, lsn, val) in replay_history {
    1590            0 :                     if val.is_image() {
    1591            0 :                         write!(output, "i@{} ", lsn).unwrap();
    1592            0 :                     } else if val.will_init() {
    1593            0 :                         write!(output, "di@{} ", lsn).unwrap();
    1594            0 :                     } else {
    1595            0 :                         write!(output, "d@{} ", lsn).unwrap();
    1596            0 :                     }
    1597            0 :                     cnt += 1;
    1598            0 :                     if cnt >= 128 {
    1599            0 :                         write!(output, "... and more").unwrap();
    1600            0 :                         break;
    1601            0 :                     }
    1602              :                 }
    1603            0 :             } else {
    1604            0 :                 write!(output, "<no history>").unwrap();
    1605            0 :             }
    1606            0 :             output
    1607            0 :         }
    1608              : 
    1609            0 :         fn generate_debug_trace(
    1610            0 :             replay_history: Option<&[(Key, Lsn, Value)]>,
    1611            0 :             full_history: &[(Key, Lsn, Value)],
    1612            0 :             lsns: &[Lsn],
    1613            0 :             horizon: Lsn,
    1614            0 :         ) -> String {
    1615            0 :             use std::fmt::Write;
    1616            0 :             let mut output = String::new();
    1617            0 :             if let Some(replay_history) = replay_history {
    1618            0 :                 writeln!(
    1619            0 :                     output,
    1620            0 :                     "replay_history: {}",
    1621            0 :                     generate_history_trace(replay_history)
    1622            0 :                 )
    1623            0 :                 .unwrap();
    1624            0 :             } else {
    1625            0 :                 writeln!(output, "replay_history: <disabled>",).unwrap();
    1626            0 :             }
    1627            0 :             writeln!(
    1628            0 :                 output,
    1629            0 :                 "full_history: {}",
    1630            0 :                 generate_history_trace(full_history)
    1631            0 :             )
    1632            0 :             .unwrap();
    1633            0 :             writeln!(
    1634            0 :                 output,
    1635            0 :                 "when processing: [{}] horizon={}",
    1636            0 :                 lsns.iter().map(|l| format!("{l}")).join(","),
    1637            0 :                 horizon
    1638            0 :             )
    1639            0 :             .unwrap();
    1640            0 :             output
    1641            0 :         }
    1642              : 
    1643         1078 :         for (i, split_for_lsn) in split_history.into_iter().enumerate() {
    1644              :             // TODO: there could be image keys inside the splits, and we can compute records_since_last_image accordingly.
    1645         1078 :             records_since_last_image += split_for_lsn.len();
    1646         1078 :             let generate_image = if i == 0 && !has_ancestor {
    1647              :                 // We always generate images for the first batch (below horizon / lowest retain_lsn)
    1648          328 :                 true
    1649          750 :             } else if i == batch_cnt - 1 {
    1650              :                 // Do not generate images for the last batch (above horizon)
    1651          346 :                 false
    1652          404 :             } else if records_since_last_image >= delta_threshold_cnt {
    1653              :                 // Generate images when there are too many records
    1654            6 :                 true
    1655              :             } else {
    1656          398 :                 false
    1657              :             };
    1658         1078 :             replay_history.extend(split_for_lsn.iter().map(|x| (*x).clone()));
    1659              :             // Only retain the items after the last image record
    1660         1346 :             for idx in (0..replay_history.len()).rev() {
    1661         1346 :                 if replay_history[idx].2.will_init() {
    1662         1078 :                     replay_history = replay_history[idx..].to_vec();
    1663         1078 :                     break;
    1664          268 :                 }
    1665              :             }
    1666         1078 :             if let Some((_, _, val)) = replay_history.first() {
    1667         1078 :                 if !val.will_init() {
    1668            0 :                     return Err(anyhow::anyhow!("invalid history, no base image")).with_context(
    1669            0 :                         || {
    1670            0 :                             generate_debug_trace(
    1671            0 :                                 Some(&replay_history),
    1672            0 :                                 full_history,
    1673            0 :                                 retain_lsn_below_horizon,
    1674            0 :                                 horizon,
    1675            0 :                             )
    1676            0 :                         },
    1677            0 :                     );
    1678         1078 :                 }
    1679            0 :             }
    1680         1078 :             if generate_image && records_since_last_image > 0 {
    1681          334 :                 records_since_last_image = 0;
    1682          334 :                 let replay_history_for_debug = if cfg!(debug_assertions) {
    1683          334 :                     Some(replay_history.clone())
    1684              :                 } else {
    1685            0 :                     None
    1686              :                 };
    1687          334 :                 let replay_history_for_debug_ref = replay_history_for_debug.as_deref();
    1688          334 :                 let history = std::mem::take(&mut replay_history);
    1689          334 :                 let mut img = None;
    1690          334 :                 let mut records = Vec::with_capacity(history.len());
    1691          334 :                 if let (_, lsn, Value::Image(val)) = history.first().as_ref().unwrap() {
    1692          334 :                     img = Some((*lsn, val.clone()));
    1693          334 :                     for (_, lsn, val) in history.into_iter().skip(1) {
    1694           34 :                         let Value::WalRecord(rec) = val else {
    1695            0 :                             return Err(anyhow::anyhow!(
    1696            0 :                                 "invalid record, first record is image, expect walrecords"
    1697            0 :                             ))
    1698            0 :                             .with_context(|| {
    1699            0 :                                 generate_debug_trace(
    1700            0 :                                     replay_history_for_debug_ref,
    1701            0 :                                     full_history,
    1702            0 :                                     retain_lsn_below_horizon,
    1703            0 :                                     horizon,
    1704            0 :                                 )
    1705            0 :                             });
    1706              :                         };
    1707           34 :                         records.push((lsn, rec));
    1708              :                     }
    1709              :                 } else {
    1710            0 :                     for (_, lsn, val) in history.into_iter() {
    1711            0 :                         let Value::WalRecord(rec) = val else {
    1712            0 :                             return Err(anyhow::anyhow!("invalid record, first record is walrecord, expect rest are walrecord"))
    1713            0 :                                 .with_context(|| generate_debug_trace(
    1714            0 :                                     replay_history_for_debug_ref,
    1715            0 :                                     full_history,
    1716            0 :                                     retain_lsn_below_horizon,
    1717            0 :                                     horizon,
    1718            0 :                                 ));
    1719              :                         };
    1720            0 :                         records.push((lsn, rec));
    1721              :                     }
    1722              :                 }
    1723          334 :                 records.reverse();
    1724          334 :                 let state = ValueReconstructState { img, records };
    1725          334 :                 let request_lsn = lsn_split_points[i]; // last batch does not generate image so i is always in range
    1726          334 :                 let img = self.reconstruct_value(key, request_lsn, state).await?;
    1727          334 :                 replay_history.push((key, request_lsn, Value::Image(img.clone())));
    1728          334 :                 retention.push(vec![(request_lsn, Value::Image(img))]);
    1729          744 :             } else {
    1730          744 :                 let deltas = split_for_lsn
    1731          744 :                     .iter()
    1732          744 :                     .map(|(_, lsn, value)| (*lsn, value.clone()))
    1733          744 :                     .collect_vec();
    1734          744 :                 retention.push(deltas);
    1735          744 :             }
    1736              :         }
    1737          346 :         let mut result = Vec::with_capacity(retention.len());
    1738          346 :         assert_eq!(retention.len(), lsn_split_points.len() + 1);
    1739         1078 :         for (idx, logs) in retention.into_iter().enumerate() {
    1740         1078 :             if idx == lsn_split_points.len() {
    1741          346 :                 return Ok(KeyHistoryRetention {
    1742          346 :                     below_horizon: result,
    1743          346 :                     above_horizon: KeyLogAtLsn(logs),
    1744          346 :                 });
    1745          732 :             } else {
    1746          732 :                 result.push((lsn_split_points[idx], KeyLogAtLsn(logs)));
    1747          732 :             }
    1748              :         }
    1749            0 :         unreachable!("key retention is empty")
    1750          346 :     }
    1751              : 
    1752              :     /// An experimental compaction building block that combines compaction with garbage collection.
    1753              :     ///
    1754              :     /// The current implementation picks all delta + image layers that are below or intersecting with
    1755              :     /// the GC horizon without considering retain_lsns. Then, it does a full compaction over all these delta
    1756              :     /// layers and image layers, which generates image layers on the gc horizon, drop deltas below gc horizon,
    1757              :     /// and create delta layers with all deltas >= gc horizon.
    1758           20 :     pub(crate) async fn compact_with_gc(
    1759           20 :         self: &Arc<Self>,
    1760           20 :         cancel: &CancellationToken,
    1761           20 :         flags: EnumSet<CompactFlags>,
    1762           20 :         ctx: &RequestContext,
    1763           20 :     ) -> anyhow::Result<()> {
    1764           20 :         use std::collections::BTreeSet;
    1765           20 : 
    1766           20 :         // Block other compaction/GC tasks from running for now. GC-compaction could run along
    1767           20 :         // with legacy compaction tasks in the future. Always ensure the lock order is compaction -> gc.
    1768           20 :         // Note that we already acquired the compaction lock when the outer `compact` function gets called.
    1769           20 : 
    1770           20 :         let gc_lock = async {
    1771              :             tokio::select! {
    1772              :                 guard = self.gc_lock.lock() => Ok(guard),
    1773              :                 // TODO: refactor to CompactionError to correctly pass cancelled error
    1774              :                 _ = cancel.cancelled() => Err(anyhow!("cancelled")),
    1775              :             }
    1776           20 :         };
    1777              : 
    1778           20 :         let gc_lock = crate::timed(
    1779           20 :             gc_lock,
    1780           20 :             "acquires gc lock",
    1781           20 :             std::time::Duration::from_secs(5),
    1782           20 :         )
    1783            0 :         .await?;
    1784              : 
    1785           20 :         let dry_run = flags.contains(CompactFlags::DryRun);
    1786           20 : 
    1787           20 :         info!("running enhanced gc bottom-most compaction, dry_run={dry_run}");
    1788              : 
    1789              :         scopeguard::defer! {
    1790              :             info!("done enhanced gc bottom-most compaction");
    1791              :         };
    1792              : 
    1793           20 :         let mut stat = CompactionStatistics::default();
    1794              : 
    1795              :         // Step 0: pick all delta layers + image layers below/intersect with the GC horizon.
    1796              :         // The layer selection has the following properties:
    1797              :         // 1. If a layer is in the selection, all layers below it are in the selection.
    1798              :         // 2. Inferred from (1), for each key in the layer selection, the value can be reconstructed only with the layers in the layer selection.
    1799           20 :         let (layer_selection, gc_cutoff, retain_lsns_below_horizon) = {
    1800           20 :             let guard = self.layers.read().await;
    1801           20 :             let layers = guard.layer_map()?;
    1802           20 :             let gc_info = self.gc_info.read().unwrap();
    1803           20 :             let mut retain_lsns_below_horizon = Vec::new();
    1804           20 :             let gc_cutoff = gc_info.cutoffs.select_min();
    1805           22 :             for (lsn, _timeline_id) in &gc_info.retain_lsns {
    1806           22 :                 if lsn < &gc_cutoff {
    1807           22 :                     retain_lsns_below_horizon.push(*lsn);
    1808           22 :                 }
    1809              :             }
    1810           20 :             for lsn in gc_info.leases.keys() {
    1811            0 :                 if lsn < &gc_cutoff {
    1812            0 :                     retain_lsns_below_horizon.push(*lsn);
    1813            0 :                 }
    1814              :             }
    1815           20 :             let mut selected_layers = Vec::new();
    1816           20 :             drop(gc_info);
    1817           98 :             for desc in layers.iter_historic_layers() {
    1818           98 :                 if desc.get_lsn_range().start <= gc_cutoff {
    1819           80 :                     selected_layers.push(guard.get_from_desc(&desc));
    1820           80 :                 }
    1821              :             }
    1822           20 :             retain_lsns_below_horizon.sort();
    1823           20 :             (selected_layers, gc_cutoff, retain_lsns_below_horizon)
    1824              :         };
    1825           20 :         let lowest_retain_lsn = if self.ancestor_timeline.is_some() {
    1826            2 :             Lsn(self.ancestor_lsn.0 + 1)
    1827              :         } else {
    1828           18 :             let res = retain_lsns_below_horizon
    1829           18 :                 .first()
    1830           18 :                 .copied()
    1831           18 :                 .unwrap_or(gc_cutoff);
    1832           18 :             if cfg!(debug_assertions) {
    1833           18 :                 assert_eq!(
    1834           18 :                     res,
    1835           18 :                     retain_lsns_below_horizon
    1836           18 :                         .iter()
    1837           18 :                         .min()
    1838           18 :                         .copied()
    1839           18 :                         .unwrap_or(gc_cutoff)
    1840           18 :                 );
    1841            0 :             }
    1842           18 :             res
    1843              :         };
    1844           20 :         info!(
    1845            0 :             "picked {} layers for compaction with gc_cutoff={} lowest_retain_lsn={}",
    1846            0 :             layer_selection.len(),
    1847              :             gc_cutoff,
    1848              :             lowest_retain_lsn
    1849              :         );
    1850              :         // Step 1: (In the future) construct a k-merge iterator over all layers. For now, simply collect all keys + LSNs.
    1851              :         // Also, collect the layer information to decide when to split the new delta layers.
    1852           20 :         let mut downloaded_layers = Vec::new();
    1853           20 :         let mut delta_split_points = BTreeSet::new();
    1854          100 :         for layer in &layer_selection {
    1855           80 :             let resident_layer = layer.download_and_keep_resident().await?;
    1856           80 :             downloaded_layers.push(resident_layer);
    1857           80 : 
    1858           80 :             let desc = layer.layer_desc();
    1859           80 :             if desc.is_delta() {
    1860           54 :                 // TODO: is it correct to only record split points for deltas intersecting with the GC horizon? (exclude those below/above the horizon)
    1861           54 :                 // so that we can avoid having too many small delta layers.
    1862           54 :                 let key_range = desc.get_key_range();
    1863           54 :                 delta_split_points.insert(key_range.start);
    1864           54 :                 delta_split_points.insert(key_range.end);
    1865           54 :                 stat.visit_delta_layer(desc.file_size());
    1866           54 :             } else {
    1867           26 :                 stat.visit_image_layer(desc.file_size());
    1868           26 :             }
    1869              :         }
    1870           20 :         let mut delta_layers = Vec::new();
    1871           20 :         let mut image_layers = Vec::new();
    1872          100 :         for resident_layer in &downloaded_layers {
    1873           80 :             if resident_layer.layer_desc().is_delta() {
    1874           54 :                 let layer = resident_layer.get_as_delta(ctx).await?;
    1875           54 :                 delta_layers.push(layer);
    1876           26 :             } else {
    1877           26 :                 let layer = resident_layer.get_as_image(ctx).await?;
    1878           26 :                 image_layers.push(layer);
    1879              :             }
    1880              :         }
    1881           20 :         let mut merge_iter = MergeIterator::create(&delta_layers, &image_layers, ctx);
    1882           20 :         // Step 2: Produce images+deltas. TODO: ensure newly-produced delta does not overlap with other deltas.
    1883           20 :         // Data of the same key.
    1884           20 :         let mut accumulated_values = Vec::new();
    1885           20 :         let mut last_key: Option<Key> = None;
    1886           20 : 
    1887           20 :         enum FlushDeltaResult {
    1888           20 :             /// Create a new resident layer
    1889           20 :             CreateResidentLayer(ResidentLayer),
    1890           20 :             /// Keep an original delta layer
    1891           20 :             KeepLayer(PersistentLayerKey),
    1892           20 :         }
    1893           20 : 
    1894           20 :         #[allow(clippy::too_many_arguments)]
    1895          338 :         async fn flush_deltas(
    1896          338 :             deltas: &mut Vec<(Key, Lsn, crate::repository::Value)>,
    1897          338 :             last_key: Key,
    1898          338 :             delta_split_points: &[Key],
    1899          338 :             current_delta_split_point: &mut usize,
    1900          338 :             tline: &Arc<Timeline>,
    1901          338 :             lowest_retain_lsn: Lsn,
    1902          338 :             ctx: &RequestContext,
    1903          338 :             stats: &mut CompactionStatistics,
    1904          338 :             dry_run: bool,
    1905          338 :             last_batch: bool,
    1906          338 :         ) -> anyhow::Result<Option<FlushDeltaResult>> {
    1907          338 :             // Check if we need to split the delta layer. We split at the original delta layer boundary to avoid
    1908          338 :             // overlapping layers.
    1909          338 :             //
    1910          338 :             // If we have a structure like this:
    1911          338 :             //
    1912          338 :             // | Delta 1 |         | Delta 4 |
    1913          338 :             // |---------| Delta 2 |---------|
    1914          338 :             // | Delta 3 |         | Delta 5 |
    1915          338 :             //
    1916          338 :             // And we choose to compact delta 2+3+5. We will get an overlapping delta layer with delta 1+4.
    1917          338 :             // A simple solution here is to split the delta layers using the original boundary, while this
    1918          338 :             // might produce a lot of small layers. This should be improved and fixed in the future.
    1919          338 :             let mut need_split = false;
    1920          428 :             while *current_delta_split_point < delta_split_points.len()
    1921          362 :                 && last_key >= delta_split_points[*current_delta_split_point]
    1922           90 :             {
    1923           90 :                 *current_delta_split_point += 1;
    1924           90 :                 need_split = true;
    1925           90 :             }
    1926          338 :             if !need_split && !last_batch {
    1927          232 :                 return Ok(None);
    1928          106 :             }
    1929          106 :             let deltas: Vec<(Key, Lsn, Value)> = std::mem::take(deltas);
    1930          106 :             if deltas.is_empty() {
    1931           48 :                 return Ok(None);
    1932           58 :             }
    1933           92 :             let end_lsn = deltas.iter().map(|(_, lsn, _)| lsn).max().copied().unwrap() + 1;
    1934           58 :             let delta_key = PersistentLayerKey {
    1935           58 :                 key_range: {
    1936           58 :                     let key_start = deltas.first().unwrap().0;
    1937           58 :                     let key_end = deltas.last().unwrap().0.next();
    1938           58 :                     key_start..key_end
    1939           58 :                 },
    1940           58 :                 lsn_range: lowest_retain_lsn..end_lsn,
    1941           58 :                 is_delta: true,
    1942           58 :             };
    1943           20 :             {
    1944           20 :                 // Hack: skip delta layer if we need to produce a layer of a same key-lsn.
    1945           20 :                 //
    1946           20 :                 // This can happen if we have removed some deltas in "the middle" of some existing layer's key-lsn-range.
    1947           20 :                 // For example, consider the case where a single delta with range [0x10,0x50) exists.
    1948           20 :                 // And we have branches at LSN 0x10, 0x20, 0x30.
    1949           20 :                 // Then we delete branch @ 0x20.
    1950           20 :                 // Bottom-most compaction may now delete the delta [0x20,0x30).
    1951           20 :                 // And that wouldnt' change the shape of the layer.
    1952           20 :                 //
    1953           20 :                 // Note that bottom-most-gc-compaction never _adds_ new data in that case, only removes.
    1954           20 :                 // That's why it's safe to skip.
    1955           58 :                 let guard = tline.layers.read().await;
    1956           20 : 
    1957           58 :                 if guard.contains_key(&delta_key) {
    1958           26 :                     let layer_generation = guard.get_from_key(&delta_key).metadata().generation;
    1959           26 :                     drop(guard);
    1960           26 :                     if layer_generation == tline.generation {
    1961           26 :                         stats.discard_delta_layer();
    1962           26 :                         // TODO: depending on whether we design this compaction process to run along with
    1963           26 :                         // other compactions, there could be layer map modifications after we drop the
    1964           26 :                         // layer guard, and in case it creates duplicated layer key, we will still error
    1965           26 :                         // in the end.
    1966           26 :                         info!(
    1967           20 :                             key=%delta_key,
    1968           20 :                             ?layer_generation,
    1969           20 :                             "discard delta layer due to duplicated layer in the same generation"
    1970           20 :                         );
    1971           26 :                         return Ok(Some(FlushDeltaResult::KeepLayer(delta_key)));
    1972           20 :                     }
    1973           32 :                 }
    1974           20 :             }
    1975           20 : 
    1976           32 :             let mut delta_layer_writer = DeltaLayerWriter::new(
    1977           32 :                 tline.conf,
    1978           32 :                 tline.timeline_id,
    1979           32 :                 tline.tenant_shard_id,
    1980           32 :                 delta_key.key_range.start,
    1981           32 :                 lowest_retain_lsn..end_lsn,
    1982           32 :                 ctx,
    1983           32 :             )
    1984           20 :             .await?;
    1985           90 :             for (key, lsn, val) in deltas {
    1986           58 :                 delta_layer_writer.put_value(key, lsn, val, ctx).await?;
    1987           20 :             }
    1988           20 : 
    1989           32 :             stats.produce_delta_layer(delta_layer_writer.size());
    1990           32 :             if dry_run {
    1991           20 :                 return Ok(None);
    1992           24 :             }
    1993           20 : 
    1994           24 :             let (desc, path) = delta_layer_writer
    1995           24 :                 .finish(delta_key.key_range.end, ctx)
    1996           62 :                 .await?;
    1997           24 :             let delta_layer = Layer::finish_creating(tline.conf, tline, desc, &path)?;
    1998           24 :             Ok(Some(FlushDeltaResult::CreateResidentLayer(delta_layer)))
    1999          338 :         }
    2000           20 : 
    2001           20 :         // Hack the key range to be min..(max-1). Otherwise, the image layer will be
    2002           20 :         // interpreted as an L0 delta layer.
    2003           20 :         let hack_image_layer_range = {
    2004           20 :             let mut end_key = Key::MAX;
    2005           20 :             end_key.field6 -= 1;
    2006           20 :             Key::MIN..end_key
    2007              :         };
    2008              : 
    2009              :         // Only create image layers when there is no ancestor branches. TODO: create covering image layer
    2010              :         // when some condition meet.
    2011           20 :         let mut image_layer_writer = if self.ancestor_timeline.is_none() {
    2012              :             Some(
    2013           18 :                 ImageLayerWriter::new(
    2014           18 :                     self.conf,
    2015           18 :                     self.timeline_id,
    2016           18 :                     self.tenant_shard_id,
    2017           18 :                     &hack_image_layer_range, // covers the full key range
    2018           18 :                     lowest_retain_lsn,
    2019           18 :                     ctx,
    2020           18 :                 )
    2021            9 :                 .await?,
    2022              :             )
    2023              :         } else {
    2024            2 :             None
    2025              :         };
    2026              : 
    2027              :         /// Returns None if there is no ancestor branch. Throw an error when the key is not found.
    2028              :         ///
    2029              :         /// Currently, we always get the ancestor image for each key in the child branch no matter whether the image
    2030              :         /// is needed for reconstruction. This should be fixed in the future.
    2031              :         ///
    2032              :         /// Furthermore, we should do vectored get instead of a single get, or better, use k-merge for ancestor
    2033              :         /// images.
    2034          338 :         async fn get_ancestor_image(
    2035          338 :             tline: &Arc<Timeline>,
    2036          338 :             key: Key,
    2037          338 :             ctx: &RequestContext,
    2038          338 :         ) -> anyhow::Result<Option<(Key, Lsn, Bytes)>> {
    2039          338 :             if tline.ancestor_timeline.is_none() {
    2040          324 :                 return Ok(None);
    2041           14 :             };
    2042              :             // This function is implemented as a get of the current timeline at ancestor LSN, therefore reusing
    2043              :             // as much existing code as possible.
    2044           14 :             let img = tline.get(key, tline.ancestor_lsn, ctx).await?;
    2045           14 :             Ok(Some((key, tline.ancestor_lsn, img)))
    2046          338 :         }
    2047           20 :         let image_layer_key = PersistentLayerKey {
    2048           20 :             key_range: hack_image_layer_range,
    2049           20 :             lsn_range: PersistentLayerDesc::image_layer_lsn_range(lowest_retain_lsn),
    2050           20 :             is_delta: false,
    2051           20 :         };
    2052              : 
    2053              :         // Like with delta layers, it can happen that we re-produce an already existing image layer.
    2054              :         // This could happen when a user triggers force compaction and image generation. In this case,
    2055              :         // it's always safe to rewrite the layer.
    2056           20 :         let discard_image_layer = {
    2057           20 :             let guard = self.layers.read().await;
    2058           20 :             if guard.contains_key(&image_layer_key) {
    2059            6 :                 let layer_generation = guard.get_from_key(&image_layer_key).metadata().generation;
    2060            6 :                 drop(guard);
    2061            6 :                 if layer_generation == self.generation {
    2062              :                     // TODO: depending on whether we design this compaction process to run along with
    2063              :                     // other compactions, there could be layer map modifications after we drop the
    2064              :                     // layer guard, and in case it creates duplicated layer key, we will still error
    2065              :                     // in the end.
    2066            6 :                     info!(
    2067              :                         key=%image_layer_key,
    2068              :                         ?layer_generation,
    2069            0 :                         "discard image layer due to duplicated layer key in the same generation",
    2070              :                     );
    2071            6 :                     true
    2072              :                 } else {
    2073            0 :                     false
    2074              :                 }
    2075              :             } else {
    2076           14 :                 false
    2077              :             }
    2078              :         };
    2079              : 
    2080              :         // Actually, we can decide not to write to the image layer at all at this point because
    2081              :         // the key and LSN range are determined. However, to keep things simple here, we still
    2082              :         // create this writer, and discard the writer in the end.
    2083              : 
    2084           20 :         let mut delta_values = Vec::new();
    2085           20 :         let delta_split_points = delta_split_points.into_iter().collect_vec();
    2086           20 :         let mut current_delta_split_point = 0;
    2087           20 :         let mut delta_layers = Vec::new();
    2088          460 :         while let Some((key, lsn, val)) = merge_iter.next().await? {
    2089          440 :             if cancel.is_cancelled() {
    2090            0 :                 return Err(anyhow!("cancelled")); // TODO: refactor to CompactionError and pass cancel error
    2091          440 :             }
    2092          440 :             match val {
    2093          336 :                 Value::Image(_) => stat.visit_image_key(&val),
    2094          104 :                 Value::WalRecord(_) => stat.visit_wal_key(&val),
    2095              :             }
    2096          440 :             if last_key.is_none() || last_key.as_ref() == Some(&key) {
    2097          122 :                 if last_key.is_none() {
    2098           20 :                     last_key = Some(key);
    2099          102 :                 }
    2100          122 :                 accumulated_values.push((key, lsn, val));
    2101              :             } else {
    2102          318 :                 let last_key = last_key.as_mut().unwrap();
    2103          318 :                 stat.on_unique_key_visited();
    2104          318 :                 let retention = self
    2105          318 :                     .generate_key_retention(
    2106          318 :                         *last_key,
    2107          318 :                         &accumulated_values,
    2108          318 :                         gc_cutoff,
    2109          318 :                         &retain_lsns_below_horizon,
    2110          318 :                         COMPACTION_DELTA_THRESHOLD,
    2111          318 :                         get_ancestor_image(self, *last_key, ctx).await?,
    2112              :                     )
    2113            0 :                     .await?;
    2114              :                 // Put the image into the image layer. Currently we have a single big layer for the compaction.
    2115          318 :                 retention
    2116          318 :                     .pipe_to(
    2117          318 :                         *last_key,
    2118          318 :                         &mut delta_values,
    2119          318 :                         image_layer_writer.as_mut(),
    2120          318 :                         &mut stat,
    2121          318 :                         ctx,
    2122          318 :                     )
    2123          310 :                     .await?;
    2124          318 :                 delta_layers.extend(
    2125          318 :                     flush_deltas(
    2126          318 :                         &mut delta_values,
    2127          318 :                         *last_key,
    2128          318 :                         &delta_split_points,
    2129          318 :                         &mut current_delta_split_point,
    2130          318 :                         self,
    2131          318 :                         lowest_retain_lsn,
    2132          318 :                         ctx,
    2133          318 :                         &mut stat,
    2134          318 :                         dry_run,
    2135          318 :                         false,
    2136          318 :                     )
    2137           71 :                     .await?,
    2138              :                 );
    2139          318 :                 accumulated_values.clear();
    2140          318 :                 *last_key = key;
    2141          318 :                 accumulated_values.push((key, lsn, val));
    2142              :             }
    2143              :         }
    2144              : 
    2145           20 :         let last_key = last_key.expect("no keys produced during compaction");
    2146           20 :         // TODO: move this part to the loop body
    2147           20 :         stat.on_unique_key_visited();
    2148           20 :         let retention = self
    2149           20 :             .generate_key_retention(
    2150           20 :                 last_key,
    2151           20 :                 &accumulated_values,
    2152           20 :                 gc_cutoff,
    2153           20 :                 &retain_lsns_below_horizon,
    2154           20 :                 COMPACTION_DELTA_THRESHOLD,
    2155           20 :                 get_ancestor_image(self, last_key, ctx).await?,
    2156              :             )
    2157            0 :             .await?;
    2158              :         // Put the image into the image layer. Currently we have a single big layer for the compaction.
    2159           20 :         retention
    2160           20 :             .pipe_to(
    2161           20 :                 last_key,
    2162           20 :                 &mut delta_values,
    2163           20 :                 image_layer_writer.as_mut(),
    2164           20 :                 &mut stat,
    2165           20 :                 ctx,
    2166           20 :             )
    2167           18 :             .await?;
    2168           20 :         delta_layers.extend(
    2169           20 :             flush_deltas(
    2170           20 :                 &mut delta_values,
    2171           20 :                 last_key,
    2172           20 :                 &delta_split_points,
    2173           20 :                 &mut current_delta_split_point,
    2174           20 :                 self,
    2175           20 :                 lowest_retain_lsn,
    2176           20 :                 ctx,
    2177           20 :                 &mut stat,
    2178           20 :                 dry_run,
    2179           20 :                 true,
    2180           20 :             )
    2181            7 :             .await?,
    2182              :         );
    2183           20 :         assert!(delta_values.is_empty(), "unprocessed keys");
    2184              : 
    2185           20 :         let image_layer = if discard_image_layer {
    2186            6 :             stat.discard_image_layer();
    2187            6 :             None
    2188           14 :         } else if let Some(writer) = image_layer_writer {
    2189           12 :             stat.produce_image_layer(writer.size());
    2190           12 :             if !dry_run {
    2191           20 :                 Some(writer.finish(self, ctx).await?)
    2192              :             } else {
    2193            2 :                 None
    2194              :             }
    2195              :         } else {
    2196            2 :             None
    2197              :         };
    2198              : 
    2199           20 :         info!(
    2200            0 :             "gc-compaction statistics: {}",
    2201            0 :             serde_json::to_string(&stat)?
    2202              :         );
    2203              : 
    2204           20 :         if dry_run {
    2205            2 :             return Ok(());
    2206           18 :         }
    2207           18 : 
    2208           18 :         info!(
    2209            0 :             "produced {} delta layers and {} image layers",
    2210            0 :             delta_layers.len(),
    2211            0 :             if image_layer.is_some() { 1 } else { 0 }
    2212              :         );
    2213           18 :         let mut compact_to = Vec::new();
    2214           18 :         let mut keep_layers = HashSet::new();
    2215           68 :         for action in delta_layers {
    2216           50 :             match action {
    2217           24 :                 FlushDeltaResult::CreateResidentLayer(layer) => {
    2218           24 :                     compact_to.push(layer);
    2219           24 :                 }
    2220           26 :                 FlushDeltaResult::KeepLayer(l) => {
    2221           26 :                     keep_layers.insert(l);
    2222           26 :                 }
    2223              :             }
    2224              :         }
    2225           18 :         if discard_image_layer {
    2226            6 :             keep_layers.insert(image_layer_key);
    2227           12 :         }
    2228           18 :         let mut layer_selection = layer_selection;
    2229           72 :         layer_selection.retain(|x| !keep_layers.contains(&x.layer_desc().key()));
    2230           18 :         compact_to.extend(image_layer);
    2231              : 
    2232              :         // Step 3: Place back to the layer map.
    2233              :         {
    2234           18 :             let mut guard = self.layers.write().await;
    2235           18 :             guard
    2236           18 :                 .open_mut()?
    2237           18 :                 .finish_gc_compaction(&layer_selection, &compact_to, &self.metrics)
    2238           18 :         };
    2239           18 :         self.remote_client
    2240           18 :             .schedule_compaction_update(&layer_selection, &compact_to)?;
    2241              : 
    2242           18 :         drop(gc_lock);
    2243           18 : 
    2244           18 :         Ok(())
    2245           20 :     }
    2246              : }
    2247              : 
    2248              : struct TimelineAdaptor {
    2249              :     timeline: Arc<Timeline>,
    2250              : 
    2251              :     keyspace: (Lsn, KeySpace),
    2252              : 
    2253              :     new_deltas: Vec<ResidentLayer>,
    2254              :     new_images: Vec<ResidentLayer>,
    2255              :     layers_to_delete: Vec<Arc<PersistentLayerDesc>>,
    2256              : }
    2257              : 
    2258              : impl TimelineAdaptor {
    2259            0 :     pub fn new(timeline: &Arc<Timeline>, keyspace: (Lsn, KeySpace)) -> Self {
    2260            0 :         Self {
    2261            0 :             timeline: timeline.clone(),
    2262            0 :             keyspace,
    2263            0 :             new_images: Vec::new(),
    2264            0 :             new_deltas: Vec::new(),
    2265            0 :             layers_to_delete: Vec::new(),
    2266            0 :         }
    2267            0 :     }
    2268              : 
    2269            0 :     pub async fn flush_updates(&mut self) -> Result<(), CompactionError> {
    2270            0 :         let layers_to_delete = {
    2271            0 :             let guard = self.timeline.layers.read().await;
    2272            0 :             self.layers_to_delete
    2273            0 :                 .iter()
    2274            0 :                 .map(|x| guard.get_from_desc(x))
    2275            0 :                 .collect::<Vec<Layer>>()
    2276            0 :         };
    2277            0 :         self.timeline
    2278            0 :             .finish_compact_batch(&self.new_deltas, &self.new_images, &layers_to_delete)
    2279            0 :             .await?;
    2280              : 
    2281            0 :         self.timeline
    2282            0 :             .upload_new_image_layers(std::mem::take(&mut self.new_images))?;
    2283              : 
    2284            0 :         self.new_deltas.clear();
    2285            0 :         self.layers_to_delete.clear();
    2286            0 :         Ok(())
    2287            0 :     }
    2288              : }
    2289              : 
    2290              : #[derive(Clone)]
    2291              : struct ResidentDeltaLayer(ResidentLayer);
    2292              : #[derive(Clone)]
    2293              : struct ResidentImageLayer(ResidentLayer);
    2294              : 
    2295              : impl CompactionJobExecutor for TimelineAdaptor {
    2296              :     type Key = crate::repository::Key;
    2297              : 
    2298              :     type Layer = OwnArc<PersistentLayerDesc>;
    2299              :     type DeltaLayer = ResidentDeltaLayer;
    2300              :     type ImageLayer = ResidentImageLayer;
    2301              : 
    2302              :     type RequestContext = crate::context::RequestContext;
    2303              : 
    2304            0 :     fn get_shard_identity(&self) -> &ShardIdentity {
    2305            0 :         self.timeline.get_shard_identity()
    2306            0 :     }
    2307              : 
    2308            0 :     async fn get_layers(
    2309            0 :         &mut self,
    2310            0 :         key_range: &Range<Key>,
    2311            0 :         lsn_range: &Range<Lsn>,
    2312            0 :         _ctx: &RequestContext,
    2313            0 :     ) -> anyhow::Result<Vec<OwnArc<PersistentLayerDesc>>> {
    2314            0 :         self.flush_updates().await?;
    2315              : 
    2316            0 :         let guard = self.timeline.layers.read().await;
    2317            0 :         let layer_map = guard.layer_map()?;
    2318              : 
    2319            0 :         let result = layer_map
    2320            0 :             .iter_historic_layers()
    2321            0 :             .filter(|l| {
    2322            0 :                 overlaps_with(&l.lsn_range, lsn_range) && overlaps_with(&l.key_range, key_range)
    2323            0 :             })
    2324            0 :             .map(OwnArc)
    2325            0 :             .collect();
    2326            0 :         Ok(result)
    2327            0 :     }
    2328              : 
    2329            0 :     async fn get_keyspace(
    2330            0 :         &mut self,
    2331            0 :         key_range: &Range<Key>,
    2332            0 :         lsn: Lsn,
    2333            0 :         _ctx: &RequestContext,
    2334            0 :     ) -> anyhow::Result<Vec<Range<Key>>> {
    2335            0 :         if lsn == self.keyspace.0 {
    2336            0 :             Ok(pageserver_compaction::helpers::intersect_keyspace(
    2337            0 :                 &self.keyspace.1.ranges,
    2338            0 :                 key_range,
    2339            0 :             ))
    2340              :         } else {
    2341              :             // The current compaction implementation only ever requests the key space
    2342              :             // at the compaction end LSN.
    2343            0 :             anyhow::bail!("keyspace not available for requested lsn");
    2344              :         }
    2345            0 :     }
    2346              : 
    2347            0 :     async fn downcast_delta_layer(
    2348            0 :         &self,
    2349            0 :         layer: &OwnArc<PersistentLayerDesc>,
    2350            0 :     ) -> anyhow::Result<Option<ResidentDeltaLayer>> {
    2351            0 :         // this is a lot more complex than a simple downcast...
    2352            0 :         if layer.is_delta() {
    2353            0 :             let l = {
    2354            0 :                 let guard = self.timeline.layers.read().await;
    2355            0 :                 guard.get_from_desc(layer)
    2356              :             };
    2357            0 :             let result = l.download_and_keep_resident().await?;
    2358              : 
    2359            0 :             Ok(Some(ResidentDeltaLayer(result)))
    2360              :         } else {
    2361            0 :             Ok(None)
    2362              :         }
    2363            0 :     }
    2364              : 
    2365            0 :     async fn create_image(
    2366            0 :         &mut self,
    2367            0 :         lsn: Lsn,
    2368            0 :         key_range: &Range<Key>,
    2369            0 :         ctx: &RequestContext,
    2370            0 :     ) -> anyhow::Result<()> {
    2371            0 :         Ok(self.create_image_impl(lsn, key_range, ctx).await?)
    2372            0 :     }
    2373              : 
    2374            0 :     async fn create_delta(
    2375            0 :         &mut self,
    2376            0 :         lsn_range: &Range<Lsn>,
    2377            0 :         key_range: &Range<Key>,
    2378            0 :         input_layers: &[ResidentDeltaLayer],
    2379            0 :         ctx: &RequestContext,
    2380            0 :     ) -> anyhow::Result<()> {
    2381            0 :         debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end);
    2382              : 
    2383            0 :         let mut all_entries = Vec::new();
    2384            0 :         for dl in input_layers.iter() {
    2385            0 :             all_entries.extend(dl.load_keys(ctx).await?);
    2386              :         }
    2387              : 
    2388              :         // The current stdlib sorting implementation is designed in a way where it is
    2389              :         // particularly fast where the slice is made up of sorted sub-ranges.
    2390            0 :         all_entries.sort_by_key(|DeltaEntry { key, lsn, .. }| (*key, *lsn));
    2391              : 
    2392            0 :         let mut writer = DeltaLayerWriter::new(
    2393            0 :             self.timeline.conf,
    2394            0 :             self.timeline.timeline_id,
    2395            0 :             self.timeline.tenant_shard_id,
    2396            0 :             key_range.start,
    2397            0 :             lsn_range.clone(),
    2398            0 :             ctx,
    2399            0 :         )
    2400            0 :         .await?;
    2401              : 
    2402            0 :         let mut dup_values = 0;
    2403            0 : 
    2404            0 :         // This iterator walks through all key-value pairs from all the layers
    2405            0 :         // we're compacting, in key, LSN order.
    2406            0 :         let mut prev: Option<(Key, Lsn)> = None;
    2407              :         for &DeltaEntry {
    2408            0 :             key, lsn, ref val, ..
    2409            0 :         } in all_entries.iter()
    2410              :         {
    2411            0 :             if prev == Some((key, lsn)) {
    2412              :                 // This is a duplicate. Skip it.
    2413              :                 //
    2414              :                 // It can happen if compaction is interrupted after writing some
    2415              :                 // layers but not all, and we are compacting the range again.
    2416              :                 // The calculations in the algorithm assume that there are no
    2417              :                 // duplicates, so the math on targeted file size is likely off,
    2418              :                 // and we will create smaller files than expected.
    2419            0 :                 dup_values += 1;
    2420            0 :                 continue;
    2421            0 :             }
    2422              : 
    2423            0 :             let value = val.load(ctx).await?;
    2424              : 
    2425            0 :             writer.put_value(key, lsn, value, ctx).await?;
    2426              : 
    2427            0 :             prev = Some((key, lsn));
    2428              :         }
    2429              : 
    2430            0 :         if dup_values > 0 {
    2431            0 :             warn!("delta layer created with {} duplicate values", dup_values);
    2432            0 :         }
    2433              : 
    2434            0 :         fail_point!("delta-layer-writer-fail-before-finish", |_| {
    2435            0 :             Err(anyhow::anyhow!(
    2436            0 :                 "failpoint delta-layer-writer-fail-before-finish"
    2437            0 :             ))
    2438            0 :         });
    2439              : 
    2440            0 :         let (desc, path) = writer.finish(prev.unwrap().0.next(), ctx).await?;
    2441            0 :         let new_delta_layer =
    2442            0 :             Layer::finish_creating(self.timeline.conf, &self.timeline, desc, &path)?;
    2443              : 
    2444            0 :         self.new_deltas.push(new_delta_layer);
    2445            0 :         Ok(())
    2446            0 :     }
    2447              : 
    2448            0 :     async fn delete_layer(
    2449            0 :         &mut self,
    2450            0 :         layer: &OwnArc<PersistentLayerDesc>,
    2451            0 :         _ctx: &RequestContext,
    2452            0 :     ) -> anyhow::Result<()> {
    2453            0 :         self.layers_to_delete.push(layer.clone().0);
    2454            0 :         Ok(())
    2455            0 :     }
    2456              : }
    2457              : 
    2458              : impl TimelineAdaptor {
    2459            0 :     async fn create_image_impl(
    2460            0 :         &mut self,
    2461            0 :         lsn: Lsn,
    2462            0 :         key_range: &Range<Key>,
    2463            0 :         ctx: &RequestContext,
    2464            0 :     ) -> Result<(), CreateImageLayersError> {
    2465            0 :         let timer = self.timeline.metrics.create_images_time_histo.start_timer();
    2466              : 
    2467            0 :         let image_layer_writer = ImageLayerWriter::new(
    2468            0 :             self.timeline.conf,
    2469            0 :             self.timeline.timeline_id,
    2470            0 :             self.timeline.tenant_shard_id,
    2471            0 :             key_range,
    2472            0 :             lsn,
    2473            0 :             ctx,
    2474            0 :         )
    2475            0 :         .await?;
    2476              : 
    2477            0 :         fail_point!("image-layer-writer-fail-before-finish", |_| {
    2478            0 :             Err(CreateImageLayersError::Other(anyhow::anyhow!(
    2479            0 :                 "failpoint image-layer-writer-fail-before-finish"
    2480            0 :             )))
    2481            0 :         });
    2482              : 
    2483            0 :         let keyspace = KeySpace {
    2484            0 :             ranges: self.get_keyspace(key_range, lsn, ctx).await?,
    2485              :         };
    2486              :         // TODO set proper (stateful) start. The create_image_layer_for_rel_blocks function mostly
    2487            0 :         let start = Key::MIN;
    2488              :         let ImageLayerCreationOutcome {
    2489            0 :             image,
    2490              :             next_start_key: _,
    2491            0 :         } = self
    2492            0 :             .timeline
    2493            0 :             .create_image_layer_for_rel_blocks(
    2494            0 :                 &keyspace,
    2495            0 :                 image_layer_writer,
    2496            0 :                 lsn,
    2497            0 :                 ctx,
    2498            0 :                 key_range.clone(),
    2499            0 :                 start,
    2500            0 :             )
    2501            0 :             .await?;
    2502              : 
    2503            0 :         if let Some(image_layer) = image {
    2504            0 :             self.new_images.push(image_layer);
    2505            0 :         }
    2506              : 
    2507            0 :         timer.stop_and_record();
    2508            0 : 
    2509            0 :         Ok(())
    2510            0 :     }
    2511              : }
    2512              : 
    2513              : impl CompactionRequestContext for crate::context::RequestContext {}
    2514              : 
    2515              : #[derive(Debug, Clone)]
    2516              : pub struct OwnArc<T>(pub Arc<T>);
    2517              : 
    2518              : impl<T> Deref for OwnArc<T> {
    2519              :     type Target = <Arc<T> as Deref>::Target;
    2520            0 :     fn deref(&self) -> &Self::Target {
    2521            0 :         &self.0
    2522            0 :     }
    2523              : }
    2524              : 
    2525              : impl<T> AsRef<T> for OwnArc<T> {
    2526            0 :     fn as_ref(&self) -> &T {
    2527            0 :         self.0.as_ref()
    2528            0 :     }
    2529              : }
    2530              : 
    2531              : impl CompactionLayer<Key> for OwnArc<PersistentLayerDesc> {
    2532            0 :     fn key_range(&self) -> &Range<Key> {
    2533            0 :         &self.key_range
    2534            0 :     }
    2535            0 :     fn lsn_range(&self) -> &Range<Lsn> {
    2536            0 :         &self.lsn_range
    2537            0 :     }
    2538            0 :     fn file_size(&self) -> u64 {
    2539            0 :         self.file_size
    2540            0 :     }
    2541            0 :     fn short_id(&self) -> std::string::String {
    2542            0 :         self.as_ref().short_id().to_string()
    2543            0 :     }
    2544            0 :     fn is_delta(&self) -> bool {
    2545            0 :         self.as_ref().is_delta()
    2546            0 :     }
    2547              : }
    2548              : 
    2549              : impl CompactionLayer<Key> for OwnArc<DeltaLayer> {
    2550            0 :     fn key_range(&self) -> &Range<Key> {
    2551            0 :         &self.layer_desc().key_range
    2552            0 :     }
    2553            0 :     fn lsn_range(&self) -> &Range<Lsn> {
    2554            0 :         &self.layer_desc().lsn_range
    2555            0 :     }
    2556            0 :     fn file_size(&self) -> u64 {
    2557            0 :         self.layer_desc().file_size
    2558            0 :     }
    2559            0 :     fn short_id(&self) -> std::string::String {
    2560            0 :         self.layer_desc().short_id().to_string()
    2561            0 :     }
    2562            0 :     fn is_delta(&self) -> bool {
    2563            0 :         true
    2564            0 :     }
    2565              : }
    2566              : 
    2567              : use crate::tenant::timeline::DeltaEntry;
    2568              : 
    2569              : impl CompactionLayer<Key> for ResidentDeltaLayer {
    2570            0 :     fn key_range(&self) -> &Range<Key> {
    2571            0 :         &self.0.layer_desc().key_range
    2572            0 :     }
    2573            0 :     fn lsn_range(&self) -> &Range<Lsn> {
    2574            0 :         &self.0.layer_desc().lsn_range
    2575            0 :     }
    2576            0 :     fn file_size(&self) -> u64 {
    2577            0 :         self.0.layer_desc().file_size
    2578            0 :     }
    2579            0 :     fn short_id(&self) -> std::string::String {
    2580            0 :         self.0.layer_desc().short_id().to_string()
    2581            0 :     }
    2582            0 :     fn is_delta(&self) -> bool {
    2583            0 :         true
    2584            0 :     }
    2585              : }
    2586              : 
    2587              : impl CompactionDeltaLayer<TimelineAdaptor> for ResidentDeltaLayer {
    2588              :     type DeltaEntry<'a> = DeltaEntry<'a>;
    2589              : 
    2590            0 :     async fn load_keys<'a>(&self, ctx: &RequestContext) -> anyhow::Result<Vec<DeltaEntry<'_>>> {
    2591            0 :         self.0.load_keys(ctx).await
    2592            0 :     }
    2593              : }
    2594              : 
    2595              : impl CompactionLayer<Key> for ResidentImageLayer {
    2596            0 :     fn key_range(&self) -> &Range<Key> {
    2597            0 :         &self.0.layer_desc().key_range
    2598            0 :     }
    2599            0 :     fn lsn_range(&self) -> &Range<Lsn> {
    2600            0 :         &self.0.layer_desc().lsn_range
    2601            0 :     }
    2602            0 :     fn file_size(&self) -> u64 {
    2603            0 :         self.0.layer_desc().file_size
    2604            0 :     }
    2605            0 :     fn short_id(&self) -> std::string::String {
    2606            0 :         self.0.layer_desc().short_id().to_string()
    2607            0 :     }
    2608            0 :     fn is_delta(&self) -> bool {
    2609            0 :         false
    2610            0 :     }
    2611              : }
    2612              : impl CompactionImageLayer<TimelineAdaptor> for ResidentImageLayer {}
        

Generated by: LCOV version 2.1-beta