LCOV - code coverage report
Current view: top level - pageserver/src/tenant/timeline - compaction.rs (source / functions)
Test:       42f947419473a288706e86ecdf7c2863d760d5d7.info
Test Date:  2024-08-02 21:34:27
Coverage:   Lines: 60.7 % (1029 of 1695)    Functions: 27.7 % (39 of 141)

            Line data    Source code
       1              : //! New compaction implementation. The algorithm itself is implemented in the
       2              : //! compaction crate. This file implements the callbacks and structs that allow
       3              : //! the algorithm to drive the process.
       4              : //!
       5              : //! The old legacy algorithm is implemented directly in `timeline.rs`.
       6              : 
       7              : use std::collections::{BinaryHeap, HashSet};
       8              : use std::ops::{Deref, Range};
       9              : use std::sync::Arc;
      10              : 
      11              : use super::layer_manager::LayerManager;
      12              : use super::{
      13              :     CompactFlags, CreateImageLayersError, DurationRecorder, ImageLayerCreationMode,
      14              :     RecordedDuration, Timeline,
      15              : };
      16              : 
      17              : use anyhow::{anyhow, Context};
      18              : use bytes::Bytes;
      19              : use enumset::EnumSet;
      20              : use fail::fail_point;
      21              : use itertools::Itertools;
      22              : use pageserver_api::keyspace::ShardedRange;
      23              : use pageserver_api::shard::{ShardCount, ShardIdentity, TenantShardId};
      24              : use tokio_util::sync::CancellationToken;
      25              : use tracing::{debug, info, info_span, trace, warn, Instrument};
      26              : use utils::id::TimelineId;
      27              : 
      28              : use crate::context::{AccessStatsBehavior, RequestContext, RequestContextBuilder};
      29              : use crate::page_cache;
      30              : use crate::tenant::config::defaults::{DEFAULT_CHECKPOINT_DISTANCE, DEFAULT_COMPACTION_THRESHOLD};
      31              : use crate::tenant::remote_timeline_client::WaitCompletionError;
      32              : use crate::tenant::storage_layer::merge_iterator::MergeIterator;
      33              : use crate::tenant::storage_layer::{
      34              :     AsLayerDesc, PersistentLayerDesc, PersistentLayerKey, ValueReconstructState,
      35              : };
      36              : use crate::tenant::timeline::ImageLayerCreationOutcome;
      37              : use crate::tenant::timeline::{drop_rlock, DeltaLayerWriter, ImageLayerWriter};
      38              : use crate::tenant::timeline::{Layer, ResidentLayer};
      39              : use crate::tenant::DeltaLayer;
      40              : use crate::virtual_file::{MaybeFatalIo, VirtualFile};
      41              : 
      42              : use crate::keyspace::KeySpace;
      43              : use crate::repository::{Key, Value};
      44              : 
      45              : use utils::lsn::Lsn;
      46              : 
      47              : use pageserver_compaction::helpers::overlaps_with;
      48              : use pageserver_compaction::interface::*;
      49              : 
      50              : use super::CompactionError;
      51              : 
      52              : /// Maximum number of deltas before generating an image layer in bottom-most compaction.
      53              : const COMPACTION_DELTA_THRESHOLD: usize = 5;
      54              : 
      55              : /// The result of bottom-most compaction for a single key at each LSN.
      56              : #[derive(Debug)]
      57              : #[cfg_attr(test, derive(PartialEq))]
      58              : pub struct KeyLogAtLsn(pub Vec<(Lsn, Value)>);
      59              : 
      60              : /// The result of bottom-most compaction.
      61              : #[derive(Debug)]
      62              : #[cfg_attr(test, derive(PartialEq))]
      63              : pub(crate) struct KeyHistoryRetention {
      64              :     /// Stores logs to reconstruct the value at the given LSN, that is to say, logs <= LSN or image == LSN.
      65              :     pub(crate) below_horizon: Vec<(Lsn, KeyLogAtLsn)>,
      66              :     /// Stores logs to reconstruct the value at any LSN above the horizon, that is to say, log > LSN.
      67              :     pub(crate) above_horizon: KeyLogAtLsn,
      68              : }
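                       : 
                       : // Illustrative example (hypothetical LSNs, not taken from this file): for a key whose
                       : // history is (0x20, image), (0x30, delta), (0x40, delta), with GC horizons at
                       : // 0x20 and 0x38, bottom-most compaction would yield
                       : //     below_horizon = [(0x20, [(0x20, image)]), (0x38, [(0x30, delta)])]
                       : //     above_horizon = [(0x40, delta)]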
      69              : 
      70              : impl KeyHistoryRetention {
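                       :     /// Flush this key's retention result into the output writers: if the first
                       :     /// below-horizon batch is a single image and an `image_writer` is provided, the
                       :     /// image is written to the image layer; all other entries, including the
                       :     /// above-horizon logs (and the image itself when no image writer is given), are
                       :     /// appended to `delta_writer` as (key, lsn, value) tuples.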
      71          302 :     async fn pipe_to(
      72          302 :         self,
      73          302 :         key: Key,
      74          302 :         delta_writer: &mut Vec<(Key, Lsn, Value)>,
      75          302 :         mut image_writer: Option<&mut ImageLayerWriter>,
      76          302 :         ctx: &RequestContext,
      77          302 :     ) -> anyhow::Result<()> {
      78          302 :         let mut first_batch = true;
      79          906 :         for (cutoff_lsn, KeyLogAtLsn(logs)) in self.below_horizon {
      80          604 :             if first_batch {
      81          302 :                 if logs.len() == 1 && logs[0].1.is_image() {
      82          288 :                     let Value::Image(img) = &logs[0].1 else {
      83            0 :                         unreachable!()
      84              :                     };
      85          288 :                     if let Some(image_writer) = image_writer.as_mut() {
      86          291 :                         image_writer.put_image(key, img.clone(), ctx).await?;
      87            0 :                     } else {
      88            0 :                         delta_writer.push((key, cutoff_lsn, Value::Image(img.clone())));
      89            0 :                     }
      90              :                 } else {
      91           28 :                     for (lsn, val) in logs {
      92           14 :                         delta_writer.push((key, lsn, val));
      93           14 :                     }
      94              :                 }
      95          302 :                 first_batch = false;
      96              :             } else {
      97          354 :                 for (lsn, val) in logs {
      98           52 :                     delta_writer.push((key, lsn, val));
      99           52 :                 }
     100              :             }
     101              :         }
     102          302 :         let KeyLogAtLsn(above_horizon_logs) = self.above_horizon;
     103          314 :         for (lsn, val) in above_horizon_logs {
     104           12 :             delta_writer.push((key, lsn, val));
     105           12 :         }
     106          302 :         Ok(())
     107          302 :     }
     108              : }
     109              : 
     110              : impl Timeline {
     111              :     /// TODO: cancellation
     112              :     ///
     113              :     /// Returns whether the compaction has pending tasks.
     114          364 :     pub(crate) async fn compact_legacy(
     115          364 :         self: &Arc<Self>,
     116          364 :         cancel: &CancellationToken,
     117          364 :         flags: EnumSet<CompactFlags>,
     118          364 :         ctx: &RequestContext,
     119          364 :     ) -> Result<bool, CompactionError> {
     120          364 :         if flags.contains(CompactFlags::EnhancedGcBottomMostCompaction) {
     121            0 :             self.compact_with_gc(cancel, ctx)
     122            0 :                 .await
     123            0 :                 .map_err(CompactionError::Other)?;
     124            0 :             return Ok(false);
     125          364 :         }
     126          364 : 
     127          364 :         // High level strategy for compaction / image creation:
     128          364 :         //
     129          364 :         // 1. First, calculate the desired "partitioning" of the
     130          364 :         // currently in-use key space. The goal is to partition the
     131          364 :         // key space into roughly fixed-size chunks, but also take into
     132          364 :         // account any existing image layers, and try to align the
     133          364 :         // chunk boundaries with the existing image layers to avoid
     134          364 :         // too much churn. Also try to align chunk boundaries with
     135          364 :         // relation boundaries.  In principle, we don't know about
     136          364 :         // relation boundaries here, we just deal with key-value
     137          364 :         // pairs, and the code in pgdatadir_mapping.rs knows how to
     138          364 :         // map relations into key-value pairs. But in practice we know
     139          364 :         // that 'field6' is the block number, and the fields 1-5
     140          364 :         // identify a relation. This is just an optimization,
     141          364 :         // though.
     142          364 :         //
     143          364 :         // 2. Once we know the partitioning, for each partition,
     144          364 :         // decide if it's time to create a new image layer. The
      145          364 :         // criterion is: has there been too much "churn" since the last
      146          364 :         // image layer? "Churn" is a fuzzy concept; it's a
      147          364 :         // combination of too many delta files, or too much WAL in
      148          364 :         // total in the delta files. Or perhaps: would creating an image
      149          364 :         // file allow us to delete some older files?
     150          364 :         //
     151          364 :         // 3. After that, we compact all level0 delta files if there
     152          364 :         // are too many of them.  While compacting, we also garbage
     153          364 :         // collect any page versions that are no longer needed because
     154          364 :         // of the new image layers we created in step 2.
     155          364 :         //
     156          364 :         // TODO: This high level strategy hasn't been implemented yet.
     157          364 :         // Below are functions compact_level0() and create_image_layers()
     158          364 :         // but they are a bit ad hoc and don't quite work like it's explained
      159          364 :         // but they are a bit ad hoc and don't quite work as explained
     160          364 : 
     161          364 :         // Is the timeline being deleted?
     162          364 :         if self.is_stopping() {
     163            0 :             trace!("Dropping out of compaction on timeline shutdown");
     164            0 :             return Err(CompactionError::ShuttingDown);
     165          364 :         }
     166          364 : 
     167          364 :         let target_file_size = self.get_checkpoint_distance();
     168              : 
     169              :         // Define partitioning schema if needed
     170              : 
     171              :         // FIXME: the match should only cover repartitioning, not the next steps
     172          364 :         let (partition_count, has_pending_tasks) = match self
     173          364 :             .repartition(
     174          364 :                 self.get_last_record_lsn(),
     175          364 :                 self.get_compaction_target_size(),
     176          364 :                 flags,
     177          364 :                 ctx,
     178          364 :             )
     179        16708 :             .await
     180              :         {
     181          364 :             Ok(((dense_partitioning, sparse_partitioning), lsn)) => {
     182          364 :                 // Disables access_stats updates, so that the files we read remain candidates for eviction after we're done with them
     183          364 :                 let image_ctx = RequestContextBuilder::extend(ctx)
     184          364 :                     .access_stats_behavior(AccessStatsBehavior::Skip)
     185          364 :                     .build();
     186          364 : 
     187          364 :                 // 2. Compact
     188          364 :                 let timer = self.metrics.compact_time_histo.start_timer();
     189        45834 :                 let fully_compacted = self.compact_level0(target_file_size, ctx).await?;
     190          364 :                 timer.stop_and_record();
     191          364 : 
     192          364 :                 let mut partitioning = dense_partitioning;
     193          364 :                 partitioning
     194          364 :                     .parts
     195          364 :                     .extend(sparse_partitioning.into_dense().parts);
     196          364 : 
     197          364 :                 // 3. Create new image layers for partitions that have been modified
     198          364 :                 // "enough". Skip image layer creation if L0 compaction cannot keep up.
     199          364 :                 if fully_compacted {
     200          364 :                     let image_layers = self
     201          364 :                         .create_image_layers(
     202          364 :                             &partitioning,
     203          364 :                             lsn,
     204          364 :                             if flags.contains(CompactFlags::ForceImageLayerCreation) {
     205           14 :                                 ImageLayerCreationMode::Force
     206              :                             } else {
     207          350 :                                 ImageLayerCreationMode::Try
     208              :                             },
     209          364 :                             &image_ctx,
     210              :                         )
     211        14446 :                         .await?;
     212              : 
     213          364 :                     self.upload_new_image_layers(image_layers)?;
     214              :                 } else {
      215            0 :                     info!("skipping image layer generation because L0 compaction did not include all layers.");
     216              :                 }
     217          364 :                 (partitioning.parts.len(), !fully_compacted)
     218              :             }
     219            0 :             Err(err) => {
     220            0 :                 // no partitioning? This is normal, if the timeline was just created
     221            0 :                 // as an empty timeline. Also in unit tests, when we use the timeline
     222            0 :                 // as a simple key-value store, ignoring the datadir layout. Log the
     223            0 :                 // error but continue.
     224            0 :                 //
     225            0 :                 // Suppress error when it's due to cancellation
     226            0 :                 if !self.cancel.is_cancelled() {
     227            0 :                     tracing::error!("could not compact, repartitioning keyspace failed: {err:?}");
     228            0 :                 }
     229            0 :                 (1, false)
     230              :             }
     231              :         };
     232              : 
     233          364 :         if self.shard_identity.count >= ShardCount::new(2) {
     234              :             // Limit the number of layer rewrites to the number of partitions: this means its
     235              :             // runtime should be comparable to a full round of image layer creations, rather than
     236              :             // being potentially much longer.
     237            0 :             let rewrite_max = partition_count;
     238            0 : 
     239            0 :             self.compact_shard_ancestors(rewrite_max, ctx).await?;
     240          364 :         }
     241              : 
     242          364 :         Ok(has_pending_tasks)
     243          364 :     }
     244              : 
      245              :     /// Check for layers that are eligible to be rewritten:
      246              :     /// - Shard splitting: after a shard split, rewrite ancestor layers beyond pitr_interval, so that
      247              :     ///   we don't indefinitely retain keys in this shard that aren't needed.
     248              :     /// - For future use: layers beyond pitr_interval that are in formats we would
     249              :     ///   rather not maintain compatibility with indefinitely.
     250              :     ///
     251              :     /// Note: this phase may read and write many gigabytes of data: use rewrite_max to bound
     252              :     /// how much work it will try to do in each compaction pass.
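                       :     ///
                       :     /// Illustrative example (hypothetical numbers): a post-split layer spanning 10_000
                       :     /// pages of which only 1_000 belong to this shard is a rewrite candidate (its local
                       :     /// page count is at most half the raw page count), provided it is an image layer
                       :     /// outside the GC window and from an older generation.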
     253            0 :     async fn compact_shard_ancestors(
     254            0 :         self: &Arc<Self>,
     255            0 :         rewrite_max: usize,
     256            0 :         ctx: &RequestContext,
     257            0 :     ) -> Result<(), CompactionError> {
     258            0 :         let mut drop_layers = Vec::new();
     259            0 :         let mut layers_to_rewrite: Vec<Layer> = Vec::new();
     260            0 : 
     261            0 :         // We will use the Lsn cutoff of the last GC as a threshold for rewriting layers: if a
     262            0 :         // layer is behind this Lsn, it indicates that the layer is being retained beyond the
     263            0 :         // pitr_interval, for example because a branchpoint references it.
     264            0 :         //
     265            0 :         // Holding this read guard also blocks [`Self::gc_timeline`] from entering while we
     266            0 :         // are rewriting layers.
     267            0 :         let latest_gc_cutoff = self.get_latest_gc_cutoff_lsn();
     268            0 : 
     269            0 :         tracing::info!(
     270            0 :             "latest_gc_cutoff: {}, pitr cutoff {}",
     271            0 :             *latest_gc_cutoff,
     272            0 :             self.gc_info.read().unwrap().cutoffs.time
     273              :         );
     274              : 
     275            0 :         let layers = self.layers.read().await;
     276            0 :         for layer_desc in layers.layer_map().iter_historic_layers() {
     277            0 :             let layer = layers.get_from_desc(&layer_desc);
     278            0 :             if layer.metadata().shard.shard_count == self.shard_identity.count {
     279              :                 // This layer does not belong to a historic ancestor, no need to re-image it.
     280            0 :                 continue;
     281            0 :             }
     282            0 : 
     283            0 :             // This layer was created on an ancestor shard: check if it contains any data for this shard.
     284            0 :             let sharded_range = ShardedRange::new(layer_desc.get_key_range(), &self.shard_identity);
     285            0 :             let layer_local_page_count = sharded_range.page_count();
     286            0 :             let layer_raw_page_count = ShardedRange::raw_size(&layer_desc.get_key_range());
     287            0 :             if layer_local_page_count == 0 {
     288              :                 // This ancestral layer only covers keys that belong to other shards.
     289              :                 // We include the full metadata in the log: if we had some critical bug that caused
     290              :                 // us to incorrectly drop layers, this would simplify manually debugging + reinstating those layers.
     291            0 :                 info!(%layer, old_metadata=?layer.metadata(),
     292            0 :                     "dropping layer after shard split, contains no keys for this shard.",
     293              :                 );
     294              : 
     295            0 :                 if cfg!(debug_assertions) {
     296              :                     // Expensive, exhaustive check of keys in this layer: this guards against ShardedRange's calculations being
     297              :                     // wrong.  If ShardedRange claims the local page count is zero, then no keys in this layer
     298              :                     // should be !is_key_disposable()
     299            0 :                     let range = layer_desc.get_key_range();
     300            0 :                     let mut key = range.start;
     301            0 :                     while key < range.end {
     302            0 :                         debug_assert!(self.shard_identity.is_key_disposable(&key));
     303            0 :                         key = key.next();
     304              :                     }
     305            0 :                 }
     306              : 
     307            0 :                 drop_layers.push(layer);
     308            0 :                 continue;
     309            0 :             } else if layer_local_page_count != u32::MAX
     310            0 :                 && layer_local_page_count == layer_raw_page_count
     311              :             {
     312            0 :                 debug!(%layer,
     313            0 :                     "layer is entirely shard local ({} keys), no need to filter it",
     314              :                     layer_local_page_count
     315              :                 );
     316            0 :                 continue;
     317            0 :             }
     318            0 : 
     319            0 :             // Don't bother re-writing a layer unless it will at least halve its size
     320            0 :             if layer_local_page_count != u32::MAX
     321            0 :                 && layer_local_page_count > layer_raw_page_count / 2
     322              :             {
     323            0 :                 debug!(%layer,
     324            0 :                     "layer is already mostly local ({}/{}), not rewriting",
     325              :                     layer_local_page_count,
     326              :                     layer_raw_page_count
      327              :                 );
                       :                 continue;
      328            0 :             }
     329              : 
     330              :             // Don't bother re-writing a layer if it is within the PITR window: it will age-out eventually
     331              :             // without incurring the I/O cost of a rewrite.
     332            0 :             if layer_desc.get_lsn_range().end >= *latest_gc_cutoff {
     333            0 :                 debug!(%layer, "Skipping rewrite of layer still in GC window ({} >= {})",
     334            0 :                     layer_desc.get_lsn_range().end, *latest_gc_cutoff);
     335            0 :                 continue;
     336            0 :             }
     337            0 : 
     338            0 :             if layer_desc.is_delta() {
     339              :                 // We do not yet implement rewrite of delta layers
     340            0 :                 debug!(%layer, "Skipping rewrite of delta layer");
     341            0 :                 continue;
     342            0 :             }
     343            0 : 
     344            0 :             // Only rewrite layers if their generations differ.  This guarantees:
     345            0 :             //  - that local rewrite is safe, as local layer paths will differ between existing layer and rewritten one
     346            0 :             //  - that the layer is persistent in remote storage, as we only see old-generation'd layer via loading from remote storage
     347            0 :             if layer.metadata().generation == self.generation {
     348            0 :                 debug!(%layer, "Skipping rewrite, is not from old generation");
     349            0 :                 continue;
     350            0 :             }
     351            0 : 
     352            0 :             if layers_to_rewrite.len() >= rewrite_max {
     353            0 :                 tracing::info!(%layer, "Will rewrite layer on a future compaction, already rewrote {}",
     354            0 :                     layers_to_rewrite.len()
     355              :                 );
     356            0 :                 continue;
     357            0 :             }
     358            0 : 
     359            0 :             // Fall through: all our conditions for doing a rewrite passed.
     360            0 :             layers_to_rewrite.push(layer);
     361              :         }
     362              : 
     363              :         // Drop read lock on layer map before we start doing time-consuming I/O
     364            0 :         drop(layers);
     365            0 : 
     366            0 :         let mut replace_image_layers = Vec::new();
     367              : 
     368            0 :         for layer in layers_to_rewrite {
     369            0 :             tracing::info!(layer=%layer, "Rewriting layer after shard split...");
     370            0 :             let mut image_layer_writer = ImageLayerWriter::new(
     371            0 :                 self.conf,
     372            0 :                 self.timeline_id,
     373            0 :                 self.tenant_shard_id,
     374            0 :                 &layer.layer_desc().key_range,
     375            0 :                 layer.layer_desc().image_layer_lsn(),
     376            0 :                 ctx,
     377            0 :             )
     378            0 :             .await
     379            0 :             .map_err(CompactionError::Other)?;
     380              : 
     381              :             // Safety of layer rewrites:
     382              :             // - We are writing to a different local file path than we are reading from, so the old Layer
     383              :             //   cannot interfere with the new one.
     384              :             // - In the page cache, contents for a particular VirtualFile are stored with a file_id that
     385              :             //   is different for two layers with the same name (in `ImageLayerInner::new` we always
      386              :             //   is different for two layers with the same name (in `ImageLayerInner::new` we always
     387              :             //   reading the index from one layer file, and then data blocks from the rewritten layer file.
     388              :             // - Any readers that have a reference to the old layer will keep it alive until they are done
     389              :             //   with it. If they are trying to promote from remote storage, that will fail, but this is the same
     390              :             //   as for compaction generally: compaction is allowed to delete layers that readers might be trying to use.
     391              :             // - We do not run concurrently with other kinds of compaction, so the only layer map writes we race with are:
     392              :             //    - GC, which at worst witnesses us "undelete" a layer that they just deleted.
     393              :             //    - ingestion, which only inserts layers, therefore cannot collide with us.
     394            0 :             let resident = layer
     395            0 :                 .download_and_keep_resident()
     396            0 :                 .await
     397            0 :                 .map_err(CompactionError::input_layer_download_failed)?;
     398              : 
     399            0 :             let keys_written = resident
     400            0 :                 .filter(&self.shard_identity, &mut image_layer_writer, ctx)
     401            0 :                 .await?;
     402              : 
     403            0 :             if keys_written > 0 {
     404            0 :                 let new_layer = image_layer_writer
     405            0 :                     .finish(self, ctx)
     406            0 :                     .await
     407            0 :                     .map_err(CompactionError::Other)?;
     408            0 :                 tracing::info!(layer=%new_layer, "Rewrote layer, {} -> {} bytes",
     409            0 :                     layer.metadata().file_size,
     410            0 :                     new_layer.metadata().file_size);
     411              : 
     412            0 :                 replace_image_layers.push((layer, new_layer));
     413            0 :             } else {
     414            0 :                 // Drop the old layer.  Usually for this case we would already have noticed that
      415            0 :                 // the layer has no data for us with the ShardedRange check above, but filtering here catches any cases that check missed.
     416            0 :                 drop_layers.push(layer);
     417            0 :             }
     418              :         }
     419              : 
     420              :         // At this point, we have replaced local layer files with their rewritten form, but not yet uploaded
     421              :         // metadata to reflect that. If we restart here, the replaced layer files will look invalid (size mismatch
     422              :         // to remote index) and be removed. This is inefficient but safe.
     423              :         fail::fail_point!("compact-shard-ancestors-localonly");
     424              : 
     425              :         // Update the LayerMap so that readers will use the new layers, and enqueue it for writing to remote storage
     426            0 :         self.rewrite_layers(replace_image_layers, drop_layers)
     427            0 :             .await?;
     428              : 
     429              :         fail::fail_point!("compact-shard-ancestors-enqueued");
     430              : 
     431              :         // We wait for all uploads to complete before finishing this compaction stage.  This is not
     432              :         // necessary for correctness, but it simplifies testing, and avoids proceeding with another
     433              :         // Timeline's compaction while this timeline's uploads may be generating lots of disk I/O
     434              :         // load.
     435            0 :         match self.remote_client.wait_completion().await {
     436            0 :             Ok(()) => (),
     437            0 :             Err(WaitCompletionError::NotInitialized(ni)) => return Err(CompactionError::from(ni)),
     438              :             Err(WaitCompletionError::UploadQueueShutDownOrStopped) => {
     439            0 :                 return Err(CompactionError::ShuttingDown)
     440              :             }
     441              :         }
     442              : 
     443              :         fail::fail_point!("compact-shard-ancestors-persistent");
     444              : 
     445            0 :         Ok(())
     446            0 :     }
     447              : 
     448              :     /// Update the LayerVisibilityHint of layers covered by image layers, based on whether there is
     449              :     /// an image layer between them and the most recent readable LSN (branch point or tip of timeline).  The
     450              :     /// purpose of the visibility hint is to record which layers need to be available to service reads.
     451              :     ///
     452              :     /// The result may be used as an input to eviction and secondary downloads to de-prioritize layers
     453              :     /// that we know won't be needed for reads.
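                       :     ///
                       :     /// For example (hypothetical layout): a historic delta layer that lies entirely below
                       :     /// an image layer covering its whole key range, with no branch point between the two,
                       :     /// is not needed to serve reads at any readable LSN and can be hinted accordingly.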
     454          528 :     pub(super) async fn update_layer_visibility(&self) {
     455          528 :         let head_lsn = self.get_last_record_lsn();
     456              : 
     457              :         // We will sweep through layers in reverse-LSN order.  We only do historic layers.  L0 deltas
     458              :         // are implicitly left visible, because LayerVisibilityHint's default is Visible, and we never modify it here.
     459              :         // Note that L0 deltas _can_ be covered by image layers, but we consider them 'visible' because we anticipate that
     460              :         // they will be subject to L0->L1 compaction in the near future.
     461          528 :         let layer_manager = self.layers.read().await;
     462          528 :         let layer_map = layer_manager.layer_map();
     463              : 
     464          528 :         let readable_points = {
     465          528 :             let children = self.gc_info.read().unwrap().retain_lsns.clone();
     466          528 : 
     467          528 :             let mut readable_points = Vec::with_capacity(children.len() + 1);
     468          528 :             for (child_lsn, _child_timeline_id) in &children {
     469            0 :                 readable_points.push(*child_lsn);
     470            0 :             }
     471          528 :             readable_points.push(head_lsn);
     472          528 :             readable_points
     473          528 :         };
     474          528 : 
     475          528 :         let (layer_visibility, covered) = layer_map.get_visibility(readable_points);
     476         3012 :         for (layer_desc, visibility) in layer_visibility {
      477         2484 :             // FIXME: a more efficient bulk zip() through the layers, rather than N log N lookups getting each one
     478         2484 :             let layer = layer_manager.get_from_desc(&layer_desc);
     479         2484 :             layer.set_visibility(visibility);
     480         2484 :         }
     481              : 
     482              :         // TODO: publish our covered KeySpace to our parent, so that when they update their visibility, they can
     483              :         // avoid assuming that everything at a branch point is visible.
     484          528 :         drop(covered);
     485          528 :     }
     486              : 
      487              :     /// Collect a bunch of Level 0 layer files, and compact and reshuffle them
      488              :     /// as Level 1 files. Returns whether the L0 layers are fully compacted.
     489          364 :     async fn compact_level0(
     490          364 :         self: &Arc<Self>,
     491          364 :         target_file_size: u64,
     492          364 :         ctx: &RequestContext,
     493          364 :     ) -> Result<bool, CompactionError> {
     494              :         let CompactLevel0Phase1Result {
     495          364 :             new_layers,
     496          364 :             deltas_to_compact,
     497          364 :             fully_compacted,
     498              :         } = {
     499          364 :             let phase1_span = info_span!("compact_level0_phase1");
     500          364 :             let ctx = ctx.attached_child();
     501          364 :             let mut stats = CompactLevel0Phase1StatsBuilder {
     502          364 :                 version: Some(2),
     503          364 :                 tenant_id: Some(self.tenant_shard_id),
     504          364 :                 timeline_id: Some(self.timeline_id),
     505          364 :                 ..Default::default()
     506          364 :             };
     507          364 : 
     508          364 :             let begin = tokio::time::Instant::now();
     509          364 :             let phase1_layers_locked = self.layers.read().await;
     510          364 :             let now = tokio::time::Instant::now();
     511          364 :             stats.read_lock_acquisition_micros =
     512          364 :                 DurationRecorder::Recorded(RecordedDuration(now - begin), now);
     513          364 :             self.compact_level0_phase1(phase1_layers_locked, stats, target_file_size, &ctx)
     514          364 :                 .instrument(phase1_span)
     515        45834 :                 .await?
     516              :         };
     517              : 
     518          364 :         if new_layers.is_empty() && deltas_to_compact.is_empty() {
     519              :             // nothing to do
     520          336 :             return Ok(true);
     521           28 :         }
     522           28 : 
     523           28 :         self.finish_compact_batch(&new_layers, &Vec::new(), &deltas_to_compact)
     524            0 :             .await?;
     525           28 :         Ok(fully_compacted)
     526          364 :     }
     527              : 
      528              :     /// First phase of Level 0 compaction, explained in the [`Self::compact_legacy`] comment.
     529          364 :     async fn compact_level0_phase1<'a>(
     530          364 :         self: &'a Arc<Self>,
     531          364 :         guard: tokio::sync::RwLockReadGuard<'a, LayerManager>,
     532          364 :         mut stats: CompactLevel0Phase1StatsBuilder,
     533          364 :         target_file_size: u64,
     534          364 :         ctx: &RequestContext,
     535          364 :     ) -> Result<CompactLevel0Phase1Result, CompactionError> {
     536          364 :         stats.read_lock_held_spawn_blocking_startup_micros =
     537          364 :             stats.read_lock_acquisition_micros.till_now(); // set by caller
     538          364 :         let layers = guard.layer_map();
     539          364 :         let level0_deltas = layers.get_level0_deltas();
     540          364 :         let mut level0_deltas = level0_deltas
     541          364 :             .into_iter()
     542         1616 :             .map(|x| guard.get_from_desc(&x))
     543          364 :             .collect_vec();
     544          364 :         stats.level0_deltas_count = Some(level0_deltas.len());
     545          364 : 
     546          364 :         // Only compact if enough layers have accumulated.
     547          364 :         let threshold = self.get_compaction_threshold();
     548          364 :         if level0_deltas.is_empty() || level0_deltas.len() < threshold {
     549          336 :             debug!(
     550            0 :                 level0_deltas = level0_deltas.len(),
     551            0 :                 threshold, "too few deltas to compact"
     552              :             );
     553          336 :             return Ok(CompactLevel0Phase1Result::default());
     554           28 :         }
     555           28 : 
     556           28 :         // Gather the files to compact in this iteration.
     557           28 :         //
     558           28 :         // Start with the oldest Level 0 delta file, and collect any other
     559           28 :         // level 0 files that form a contiguous sequence, such that the end
      560           28 :         // LSN of the previous file matches the start LSN of the next file.
     561           28 :         //
     562           28 :         // Note that if the files don't form such a sequence, we might
     563           28 :         // "compact" just a single file. That's a bit pointless, but it allows
     564           28 :         // us to get rid of the level 0 file, and compact the other files on
     565           28 :         // the next iteration. This could probably made smarter, but such
     566           28 :         // "gaps" in the sequence of level 0 files should only happen in case
      567           28 :         // the next iteration. This could probably be made smarter, but such
     568           28 :         // that, so it's not a big deal in practice.
     569          748 :         level0_deltas.sort_by_key(|l| l.layer_desc().lsn_range.start);
     570           28 :         let mut level0_deltas_iter = level0_deltas.iter();
     571           28 : 
     572           28 :         let first_level0_delta = level0_deltas_iter.next().unwrap();
     573           28 :         let mut prev_lsn_end = first_level0_delta.layer_desc().lsn_range.end;
     574           28 :         let mut deltas_to_compact = Vec::with_capacity(level0_deltas.len());
     575           28 : 
     576           28 :         // Accumulate the size of layers in `deltas_to_compact`
     577           28 :         let mut deltas_to_compact_bytes = 0;
     578           28 : 
     579           28 :         // Under normal circumstances, we will accumulate up to compaction_interval L0s of size
     580           28 :         // checkpoint_distance each.  To avoid edge cases using extra system resources, bound our
     581           28 :         // work in this function to only operate on this much delta data at once.
     582           28 :         //
     583           28 :         // Take the max of the configured value & the default, so that tests that configure tiny values
     584           28 :         // can still use a sensible amount of memory, but if a deployed system configures bigger values we
     585           28 :         // still let them compact a full stack of L0s in one go.
     586           28 :         let delta_size_limit = std::cmp::max(
     587           28 :             self.get_compaction_threshold(),
     588           28 :             DEFAULT_COMPACTION_THRESHOLD,
     589           28 :         ) as u64
     590           28 :             * std::cmp::max(self.get_checkpoint_distance(), DEFAULT_CHECKPOINT_DISTANCE);
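                       :         // Worked example (hypothetical config, defaults may differ): with a compaction
                       :         // threshold of 10 and a checkpoint distance of 256 MiB, delta_size_limit is
                       :         // 10 * 256 MiB = 2.5 GiB of L0 delta data per compaction pass.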
     591           28 : 
     592           28 :         let mut fully_compacted = true;
     593           28 : 
     594           28 :         deltas_to_compact.push(
     595           28 :             first_level0_delta
     596           28 :                 .download_and_keep_resident()
     597            0 :                 .await
     598           28 :                 .map_err(CompactionError::input_layer_download_failed)?,
     599              :         );
     600          402 :         for l in level0_deltas_iter {
     601          374 :             let lsn_range = &l.layer_desc().lsn_range;
     602          374 : 
     603          374 :             if lsn_range.start != prev_lsn_end {
     604            0 :                 break;
     605          374 :             }
     606          374 :             deltas_to_compact.push(
     607          374 :                 l.download_and_keep_resident()
     608            0 :                     .await
     609          374 :                     .map_err(CompactionError::input_layer_download_failed)?,
     610              :             );
     611          374 :             deltas_to_compact_bytes += l.metadata().file_size;
     612          374 :             prev_lsn_end = lsn_range.end;
     613          374 : 
     614          374 :             if deltas_to_compact_bytes >= delta_size_limit {
     615            0 :                 info!(
     616            0 :                     l0_deltas_selected = deltas_to_compact.len(),
     617            0 :                     l0_deltas_total = level0_deltas.len(),
     618            0 :                     "L0 compaction picker hit max delta layer size limit: {}",
     619              :                     delta_size_limit
     620              :                 );
     621            0 :                 fully_compacted = false;
     622            0 : 
     623            0 :                 // Proceed with compaction, but only a subset of L0s
     624            0 :                 break;
     625          374 :             }
     626              :         }
     627           28 :         let lsn_range = Range {
     628           28 :             start: deltas_to_compact
     629           28 :                 .first()
     630           28 :                 .unwrap()
     631           28 :                 .layer_desc()
     632           28 :                 .lsn_range
     633           28 :                 .start,
     634           28 :             end: deltas_to_compact.last().unwrap().layer_desc().lsn_range.end,
     635           28 :         };
     636           28 : 
     637           28 :         info!(
     638            0 :             "Starting Level0 compaction in LSN range {}-{} for {} layers ({} deltas in total)",
     639            0 :             lsn_range.start,
     640            0 :             lsn_range.end,
     641            0 :             deltas_to_compact.len(),
     642            0 :             level0_deltas.len()
     643              :         );
     644              : 
     645          402 :         for l in deltas_to_compact.iter() {
     646          402 :             info!("compact includes {l}");
     647              :         }
     648              : 
     649              :         // We don't need the original list of layers anymore. Drop it so that
     650              :         // we don't accidentally use it later in the function.
     651           28 :         drop(level0_deltas);
     652           28 : 
     653           28 :         stats.read_lock_held_prerequisites_micros = stats
     654           28 :             .read_lock_held_spawn_blocking_startup_micros
     655           28 :             .till_now();
     656              : 
     657              :         // TODO: replace with streaming k-merge
     658           28 :         let all_keys = {
     659           28 :             let mut all_keys = Vec::new();
     660          402 :             for l in deltas_to_compact.iter() {
     661         2363 :                 all_keys.extend(l.load_keys(ctx).await.map_err(CompactionError::Other)?);
     662              :             }
      663              :             // The current stdlib sorting implementation is designed so that it is
      664              :             // particularly fast when the slice is made up of sorted sub-ranges.
     665      4431788 :             all_keys.sort_by_key(|DeltaEntry { key, lsn, .. }| (*key, *lsn));
     666           28 :             all_keys
     667           28 :         };
     668           28 : 
     669           28 :         stats.read_lock_held_key_sort_micros = stats.read_lock_held_prerequisites_micros.till_now();
     670              : 
      671              :         // Determine the N largest holes, where N is the number of compacted layers. The vec is sorted by key range start.
     672              :         //
     673              :         // A hole is a key range for which this compaction doesn't have any WAL records.
     674              :         // Our goal in this compaction iteration is to avoid creating L1s that, in terms of their key range,
     675              :         // cover the hole, but actually don't contain any WAL records for that key range.
     676              :         // The reason is that the mere stack of L1s (`count_deltas`) triggers image layer creation (`create_image_layers`).
     677              :         // That image layer creation would be useless for a hole range covered by L1s that don't contain any WAL records.
     678              :         //
     679              :         // The algorithm chooses holes as follows.
      680              :         // - Slide a 2-element window over the keys in key order to get the hole range (= distance between two keys).
     681              :         // - Filter: min threshold on range length
     682              :         // - Rank: by coverage size (=number of image layers required to reconstruct each key in the range for which we have any data)
     683              :         //
     684              :         // For more details, intuition, and some ASCII art see https://github.com/neondatabase/neon/pull/3597#discussion_r1112704451
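                       :         //
                       :         // Worked example (hypothetical numbers): with a 256 MiB target file size and 8 KiB
                       :         // pages, min_hole_range below is 32768 key slots; a gap of, say, 50_000 slots between
                       :         // consecutive delta keys becomes a hole candidate, and it is kept only if at least
                       :         // min_hole_coverage_size (3) image layers cover that range.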
     685              :         #[derive(PartialEq, Eq)]
     686              :         struct Hole {
     687              :             key_range: Range<Key>,
     688              :             coverage_size: usize,
     689              :         }
     690           28 :         let holes: Vec<Hole> = {
     691              :             use std::cmp::Ordering;
     692              :             impl Ord for Hole {
     693            0 :                 fn cmp(&self, other: &Self) -> Ordering {
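                       :                 // Comparison is reversed so that std's max-heap `BinaryHeap` behaves as a
                       :                 // min-heap over coverage_size: once the heap grows past max_holes, popping
                       :                 // evicts the smallest hole, keeping only the N largest.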
     694            0 :                     self.coverage_size.cmp(&other.coverage_size).reverse()
     695            0 :                 }
     696              :             }
     697              :             impl PartialOrd for Hole {
     698            0 :                 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
     699            0 :                     Some(self.cmp(other))
     700            0 :                 }
     701              :             }
     702           28 :             let max_holes = deltas_to_compact.len();
     703           28 :             let last_record_lsn = self.get_last_record_lsn();
     704           28 :             let min_hole_range = (target_file_size / page_cache::PAGE_SZ as u64) as i128;
     705           28 :             let min_hole_coverage_size = 3; // TODO: something more flexible?
     706           28 :                                             // min-heap (reserve space for one more element added before eviction)
     707           28 :             let mut heap: BinaryHeap<Hole> = BinaryHeap::with_capacity(max_holes + 1);
     708           28 :             let mut prev: Option<Key> = None;
     709              : 
     710      2064038 :             for &DeltaEntry { key: next_key, .. } in all_keys.iter() {
     711      2064038 :                 if let Some(prev_key) = prev {
      712              :                     // Just a first, fast filter: do not create hole entries for metadata keys. The last hole in the
      713              :                     // compaction is the gap between data keys and metadata keys.
     714      2064010 :                     if next_key.to_i128() - prev_key.to_i128() >= min_hole_range
     715            0 :                         && !Key::is_metadata_key(&prev_key)
     716              :                     {
     717            0 :                         let key_range = prev_key..next_key;
      718            0 :                         // Measuring a hole by simply subtracting the i128 representations of the key range boundaries
      719            0 :                         // doesn't make much sense, because the largest holes would correspond to field1/field2 changes.
      720            0 :                         // But we are mostly interested in eliminating holes which cause generation of excessive image layers.
      721            0 :                         // That is why it is better to measure the size of a hole as the number of covering image layers.
     722            0 :                         let coverage_size =
     723            0 :                             layers.image_coverage(&key_range, last_record_lsn).len();
     724            0 :                         if coverage_size >= min_hole_coverage_size {
     725            0 :                             heap.push(Hole {
     726            0 :                                 key_range,
     727            0 :                                 coverage_size,
     728            0 :                             });
     729            0 :                             if heap.len() > max_holes {
     730            0 :                                 heap.pop(); // remove smallest hole
     731            0 :                             }
     732            0 :                         }
     733      2064010 :                     }
     734           28 :                 }
     735      2064038 :                 prev = Some(next_key.next());
     736              :             }
     737           28 :             let mut holes = heap.into_vec();
     738           28 :             holes.sort_unstable_by_key(|hole| hole.key_range.start);
     739           28 :             holes
     740           28 :         };
     741           28 :         stats.read_lock_held_compute_holes_micros = stats.read_lock_held_key_sort_micros.till_now();
     742           28 :         drop_rlock(guard);
     743           28 :         stats.read_lock_drop_micros = stats.read_lock_held_compute_holes_micros.till_now();
     744              : 
     745              :         // This iterator walks through all key-value pairs from all the layers
     746              :         // we're compacting, in key, LSN order.
     747              :         // If there's both a Value::Image and Value::WalRecord for the same (key,lsn),
     748              :         // then the Value::Image is ordered before Value::WalRecord.
     749              :         //
     750              :         // TODO(https://github.com/neondatabase/neon/issues/8184): remove the page cached blob_io
     751              :         // option and validation code once we've reached confidence.
     752              :         enum AllValuesIter<'a> {
     753              :             PageCachedBlobIo {
     754              :                 all_keys_iter: VecIter<'a>,
     755              :             },
     756              :             StreamingKmergeBypassingPageCache {
     757              :                 merge_iter: MergeIterator<'a>,
     758              :             },
     759              :             ValidatingStreamingKmergeBypassingPageCache {
     760              :                 mode: CompactL0BypassPageCacheValidation,
     761              :                 merge_iter: MergeIterator<'a>,
     762              :                 all_keys_iter: VecIter<'a>,
     763              :             },
     764              :         }
     765              :         type VecIter<'a> = std::slice::Iter<'a, DeltaEntry<'a>>; // TODO: distinguished lifetimes
     766              :         impl AllValuesIter<'_> {
     767      2064066 :             async fn next_all_keys_iter(
     768      2064066 :                 iter: &mut VecIter<'_>,
     769      2064066 :                 ctx: &RequestContext,
     770      2064066 :             ) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
     771              :                 let Some(DeltaEntry {
     772      2064038 :                     key,
     773      2064038 :                     lsn,
     774      2064038 :                     val: value_ref,
     775              :                     ..
     776      2064066 :                 }) = iter.next()
     777              :                 else {
     778           28 :                     return Ok(None);
     779              :                 };
     780      2064038 :                 let value = value_ref.load(ctx).await?;
     781      2064038 :                 Ok(Some((*key, *lsn, value)))
     782      2064066 :             }
     783      2064066 :             async fn next(
     784      2064066 :                 &mut self,
     785      2064066 :                 ctx: &RequestContext,
     786      2064066 :             ) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
     787      2064066 :                 match self {
     788            0 :                     AllValuesIter::PageCachedBlobIo { all_keys_iter: iter } => {
     789            0 :                       Self::next_all_keys_iter(iter, ctx).await
     790              :                     }
     791            0 :                     AllValuesIter::StreamingKmergeBypassingPageCache { merge_iter } => merge_iter.next().await,
     792      2064066 :                     AllValuesIter::ValidatingStreamingKmergeBypassingPageCache { mode, merge_iter, all_keys_iter } => async {
     793              :                         // advance both iterators
     794      2064066 :                         let all_keys_iter_item = Self::next_all_keys_iter(all_keys_iter, ctx).await;
     795      2064066 :                         let merge_iter_item = merge_iter.next().await;
     796              :                         // compare results & log warnings as needed
     797              :                         macro_rules! rate_limited_warn {
     798              :                             ($($arg:tt)*) => {{
     799              :                                 if cfg!(debug_assertions) || cfg!(feature = "testing") {
     800              :                                     warn!($($arg)*);
     801              :                                     panic!("CompactL0BypassPageCacheValidation failure, check logs");
     802              :                                 }
     803              :                                 use once_cell::sync::Lazy;
     804              :                                 use utils::rate_limit::RateLimit;
     805              :                                 use std::sync::Mutex;
     806              :                                 use std::time::Duration;
     807              :                                 static LOGGED: Lazy<Mutex<RateLimit>> =
     808            0 :                                     Lazy::new(|| Mutex::new(RateLimit::new(Duration::from_secs(10))));
     809              :                                 let mut rate_limit = LOGGED.lock().unwrap();
     810            0 :                                 rate_limit.call(|| {
     811            0 :                                     warn!($($arg)*);
     812            0 :                                 });
     813              :                             }}
     814              :                         }
     815      2064066 :                         match (&all_keys_iter_item, &merge_iter_item) {
     816            0 :                             (Err(_), Err(_)) => {
     817            0 :                                 // don't bother asserting equivalence of the errors
     818            0 :                             }
     819            0 :                             (Err(all_keys), Ok(merge)) => {
     820            0 :                                 rate_limited_warn!(?merge, "all_keys_iter returned an error where merge did not: {all_keys:?}");
     821            0 :                             },
     822            0 :                             (Ok(all_keys), Err(merge)) => {
     823            0 :                                 rate_limited_warn!(?all_keys, "merge returned an error where all_keys_iter did not: {merge:?}");
     824            0 :                             },
     825           28 :                             (Ok(None), Ok(None)) => { }
     826            0 :                             (Ok(Some(all_keys)), Ok(None)) => {
     827            0 :                                 rate_limited_warn!(?all_keys, "merge returned None where all_keys_iter returned Some");
     828            0 :                             }
     829            0 :                             (Ok(None), Ok(Some(merge))) => {
     830            0 :                                 rate_limited_warn!(?merge, "all_keys_iter returned None where merge returned Some");
     831            0 :                             }
     832      2064038 :                             (Ok(Some((all_keys_key, all_keys_lsn, all_keys_value))), Ok(Some((merge_key, merge_lsn, merge_value)))) => {
     833      2064038 :                                 match mode {
     834              :                                     // TODO: in this mode, we still load the value from disk for both iterators, even though we only need the all_keys_iter one
     835              :                                     CompactL0BypassPageCacheValidation::KeyLsn => {
     836            0 :                                         let all_keys = (all_keys_key, all_keys_lsn);
     837            0 :                                         let merge = (merge_key, merge_lsn);
     838            0 :                                         if all_keys != merge {
     839            0 :                                             rate_limited_warn!(?all_keys, ?merge, "merge returned a different (Key,LSN) than all_keys_iter");
     840            0 :                                         }
     841              :                                     }
     842              :                                     CompactL0BypassPageCacheValidation::KeyLsnValue => {
     843      2064038 :                                         let all_keys = (all_keys_key, all_keys_lsn, all_keys_value);
     844      2064038 :                                         let merge = (merge_key, merge_lsn, merge_value);
     845      2064038 :                                         if all_keys != merge {
     846            0 :                                             rate_limited_warn!(?all_keys, ?merge, "merge returned a different (Key,LSN,Value) than all_keys_iter");
     847      2064038 :                                         }
     848              :                                     }
     849              :                                 }
     850              :                             }
     851              :                         }
     852              :                         // in case of mismatch, trust the legacy all_keys_iter_item
     853      2064066 :                         all_keys_iter_item
     854      2064066 :                     }.instrument(info_span!("next")).await
     855              :                 }
     856      2064066 :             }
     857              :         }
     858           28 :         let mut all_values_iter = match &self.conf.compact_level0_phase1_value_access {
     859            0 :             CompactL0Phase1ValueAccess::PageCachedBlobIo => AllValuesIter::PageCachedBlobIo {
     860            0 :                 all_keys_iter: all_keys.iter(),
     861            0 :             },
     862           28 :             CompactL0Phase1ValueAccess::StreamingKmerge { validate } => {
     863           28 :                 let merge_iter = {
     864           28 :                     let mut deltas = Vec::with_capacity(deltas_to_compact.len());
     865          402 :                     for l in deltas_to_compact.iter() {
     866          402 :                         let l = l.get_as_delta(ctx).await.map_err(CompactionError::Other)?;
     867          402 :                         deltas.push(l);
     868              :                     }
     869           28 :                     MergeIterator::create(&deltas, &[], ctx)
     870           28 :                 };
     871           28 :                 match validate {
     872            0 :                     None => AllValuesIter::StreamingKmergeBypassingPageCache { merge_iter },
     873           28 :                     Some(validate) => AllValuesIter::ValidatingStreamingKmergeBypassingPageCache {
     874           28 :                         mode: validate.clone(),
     875           28 :                         merge_iter,
     876           28 :                         all_keys_iter: all_keys.iter(),
     877           28 :                     },
     878              :                 }
     879              :             }
     880              :         };
     881              : 
     882              :         // This iterator walks through all keys and is needed to calculate the size used by each key
     883           28 :         let mut all_keys_iter = all_keys
     884           28 :             .iter()
     885      2064038 :             .map(|DeltaEntry { key, lsn, size, .. }| (*key, *lsn, *size))
     886      2064010 :             .coalesce(|mut prev, cur| {
     887      2064010 :                 // Coalesce adjacent entries that belong to the same key.
     888      2064010 :                 // This ensures that compaction doesn't put them
     889      2064010 :                 // into different layer files.
     890      2064010 :                 // Still limit this by the target file size,
     891      2064010 :                 // so that we keep the size of the files in
     892      2064010 :                 // check.
     893      2064010 :                 if prev.0 == cur.0 && prev.2 < target_file_size {
     894        40038 :                     prev.2 += cur.2;
     895        40038 :                     Ok(prev)
     896              :                 } else {
     897      2023972 :                     Err((prev, cur))
     898              :                 }
     899      2064010 :             });
     900           28 : 
     901           28 :         // Merge the contents of all the input delta layers into a new set
     902           28 :         // of delta layers, based on the current partitioning.
     903           28 :         //
     904           28 :         // We split the new delta layers on the key dimension. We iterate through the key space, and for each key, check whether adding the next key to the current output layer we're building would cause the layer to become too large. If so, dump the current output layer and start a new one.
     905           28 :         // It's possible that there is a single key with so many page versions that storing all of them in a single layer file
     906           28 :         // would be too large. In that case, we also split on the LSN dimension.
     907           28 :         //
     908           28 :         // LSN
     909           28 :         //  ^
     910           28 :         //  |
     911           28 :         //  | +-----------+            +--+--+--+--+
     912           28 :         //  | |           |            |  |  |  |  |
     913           28 :         //  | +-----------+            |  |  |  |  |
     914           28 :         //  | |           |            |  |  |  |  |
     915           28 :         //  | +-----------+     ==>    |  |  |  |  |
     916           28 :         //  | |           |            |  |  |  |  |
     917           28 :         //  | +-----------+            |  |  |  |  |
     918           28 :         //  | |           |            |  |  |  |  |
     919           28 :         //  | +-----------+            +--+--+--+--+
     920           28 :         //  |
     921           28 :         //  +--------------> key
     922           28 :         //
     923           28 :         //
     924           28 :         // If one key (X) has a lot of page versions:
     925           28 :         //
     926           28 :         // LSN
     927           28 :         //  ^
     928           28 :         //  |                                 (X)
     929           28 :         //  | +-----------+            +--+--+--+--+
     930           28 :         //  | |           |            |  |  |  |  |
     931           28 :         //  | +-----------+            |  |  +--+  |
     932           28 :         //  | |           |            |  |  |  |  |
     933           28 :         //  | +-----------+     ==>    |  |  |  |  |
     934           28 :         //  | |           |            |  |  +--+  |
     935           28 :         //  | +-----------+            |  |  |  |  |
     936           28 :         //  | |           |            |  |  |  |  |
     937           28 :         //  | +-----------+            +--+--+--+--+
     938           28 :         //  |
     939           28 :         //  +--------------> key
     940           28 :         // TODO: this actually divides the layers into fixed-size chunks, not
     941           28 :         // based on the partitioning.
     942           28 :         //
     943           28 :         // TODO: we should also opportunistically materialize and
     944           28 :         // garbage collect what we can.
     945           28 :         let mut new_layers = Vec::new();
     946           28 :         let mut prev_key: Option<Key> = None;
     947           28 :         let mut writer: Option<DeltaLayerWriter> = None;
     948           28 :         let mut key_values_total_size = 0u64;
     949           28 :         let mut dup_start_lsn: Lsn = Lsn::INVALID; // start LSN of layer containing values of the single key
     950           28 :         let mut dup_end_lsn: Lsn = Lsn::INVALID; // end LSN of layer containing values of the single key
     951           28 :         let mut next_hole = 0; // index of next hole in holes vector
     952              : 
     953      2064066 :         while let Some((key, lsn, value)) = all_values_iter
     954      2064066 :             .next(ctx)
     955        39360 :             .await
     956      2064066 :             .map_err(CompactionError::Other)?
     957              :         {
     958      2064038 :             let same_key = prev_key.map_or(false, |prev_key| prev_key == key);
     959      2064038 :             // We need to check key boundaries once we reach the next key or the end of a layer with the same key
     960      2064038 :             if !same_key || lsn == dup_end_lsn {
     961      2024000 :                 let mut next_key_size = 0u64;
     962      2024000 :                 let is_dup_layer = dup_end_lsn.is_valid();
     963      2024000 :                 dup_start_lsn = Lsn::INVALID;
     964      2024000 :                 if !same_key {
     965      2024000 :                     dup_end_lsn = Lsn::INVALID;
     966      2024000 :                 }
     967              :                 // Determine the size occupied by this key. We stop at the next key or when the size becomes larger than target_file_size
     968      2024000 :                 for (next_key, next_lsn, next_size) in all_keys_iter.by_ref() {
     969      2024000 :                     next_key_size = next_size;
     970      2024000 :                     if key != next_key {
     971      2023972 :                         if dup_end_lsn.is_valid() {
     972            0 :                             // We are writing a segment with duplicates:
     973            0 :                             // place all remaining values of this key in a separate segment
     974            0 :                             dup_start_lsn = dup_end_lsn; // the new segment starts where the old one stops
     975            0 :                             dup_end_lsn = lsn_range.end; // there are no more values of this key until the end of the LSN range
     976      2023972 :                         }
     977      2023972 :                         break;
     978           28 :                     }
     979           28 :                     key_values_total_size += next_size;
     980           28 :                     // Check if it is time to split the segment: if the total size of the key's values is larger than the target file size.
     981           28 :                     // We need to avoid generating empty segments if next_size > target_file_size.
     982           28 :                     if key_values_total_size > target_file_size && lsn != next_lsn {
     983              :                         // Split key between multiple layers: such layer can contain only single key
     984            0 :                         dup_start_lsn = if dup_end_lsn.is_valid() {
     985            0 :                             dup_end_lsn // new segment with duplicates starts where old one stops
     986              :                         } else {
     987            0 :                             lsn // start with the first LSN for this key
     988              :                         };
     989            0 :                         dup_end_lsn = next_lsn; // upper LSN boundary is exclusive
     990            0 :                         break;
     991           28 :                     }
     992              :                 }
     993              :                 // handle case when loop reaches last key: in this case dup_end is non-zero but dup_start is not set.
     994      2024000 :                 if dup_end_lsn.is_valid() && !dup_start_lsn.is_valid() {
     995            0 :                     dup_start_lsn = dup_end_lsn;
     996            0 :                     dup_end_lsn = lsn_range.end;
     997      2024000 :                 }
     998      2024000 :                 if writer.is_some() {
     999      2023972 :                     let written_size = writer.as_mut().unwrap().size();
    1000      2023972 :                     let contains_hole =
    1001      2023972 :                         next_hole < holes.len() && key >= holes[next_hole].key_range.end;
    1002              :                     // check if the key causes layer overflow or contains a hole...
    1003      2023972 :                     if is_dup_layer
    1004      2023972 :                         || dup_end_lsn.is_valid()
    1005      2023972 :                         || written_size + key_values_total_size > target_file_size
    1006      2023692 :                         || contains_hole
    1007              :                     {
    1008              :                         // ... if so, flush the previous layer and prepare to write a new one
    1009          280 :                         new_layers.push(
    1010          280 :                             writer
    1011          280 :                                 .take()
    1012          280 :                                 .unwrap()
    1013          280 :                                 .finish(prev_key.unwrap().next(), self, ctx)
    1014          720 :                                 .await
    1015          280 :                                 .map_err(CompactionError::Other)?,
    1016              :                         );
    1017          280 :                         writer = None;
    1018          280 : 
    1019          280 :                         if contains_hole {
    1020            0 :                             // skip hole
    1021            0 :                             next_hole += 1;
    1022          280 :                         }
    1023      2023692 :                     }
    1024           28 :                 }
    1025              :                 // Remember the size of this key's values because at the next iteration we will access the next item
    1026      2024000 :                 key_values_total_size = next_key_size;
    1027        40038 :             }
    1028      2064038 :             fail_point!("delta-layer-writer-fail-before-finish", |_| {
    1029            0 :                 Err(CompactionError::Other(anyhow::anyhow!(
    1030            0 :                     "failpoint delta-layer-writer-fail-before-finish"
    1031            0 :                 )))
    1032      2064038 :             });
    1033              : 
    1034      2064038 :             if !self.shard_identity.is_key_disposable(&key) {
    1035      2064038 :                 if writer.is_none() {
    1036          308 :                     // Create a writer if not initialized yet
    1037          308 :                     writer = Some(
    1038              :                         DeltaLayerWriter::new(
    1039          308 :                             self.conf,
    1040          308 :                             self.timeline_id,
    1041          308 :                             self.tenant_shard_id,
    1042          308 :                             key,
    1043          308 :                             if dup_end_lsn.is_valid() {
    1044              :                                 // this is a layer containing a slice of values of the same key
    1045            0 :                                 debug!("Create new dup layer {}..{}", dup_start_lsn, dup_end_lsn);
    1046            0 :                                 dup_start_lsn..dup_end_lsn
    1047              :                             } else {
    1048          308 :                                 debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end);
    1049          308 :                                 lsn_range.clone()
    1050              :                             },
    1051          308 :                             ctx,
    1052              :                         )
    1053          154 :                         .await
    1054          308 :                         .map_err(CompactionError::Other)?,
    1055              :                     );
    1056      2063730 :                 }
    1057              : 
    1058      2064038 :                 writer
    1059      2064038 :                     .as_mut()
    1060      2064038 :                     .unwrap()
    1061      2064038 :                     .put_value(key, lsn, value, ctx)
    1062         1217 :                     .await
    1063      2064038 :                     .map_err(CompactionError::Other)?;
    1064              :             } else {
    1065            0 :                 debug!(
    1066            0 :                     "Dropping key {} during compaction (it belongs on shard {:?})",
    1067            0 :                     key,
    1068            0 :                     self.shard_identity.get_shard_number(&key)
    1069              :                 );
    1070              :             }
    1071              : 
    1072      2064038 :             if !new_layers.is_empty() {
    1073        19786 :                 fail_point!("after-timeline-compacted-first-L1");
    1074      2044252 :             }
    1075              : 
    1076      2064038 :             prev_key = Some(key);
    1077              :         }
    1078           28 :         if let Some(writer) = writer {
    1079           28 :             new_layers.push(
    1080           28 :                 writer
    1081           28 :                     .finish(prev_key.unwrap().next(), self, ctx)
    1082         1992 :                     .await
    1083           28 :                     .map_err(CompactionError::Other)?,
    1084              :             );
    1085            0 :         }
    1086              : 
    1087              :         // Sync layers
    1088           28 :         if !new_layers.is_empty() {
    1089              :             // Print a warning if the created layer is larger than double the target size
    1090              :             // Add two pages for potential overhead. This should in theory be already
    1091              :             // accounted for in the target calculation, but for very small targets,
    1092              :             // we still might easily hit the limit otherwise.
    1093           28 :             let warn_limit = target_file_size * 2 + page_cache::PAGE_SZ as u64 * 2;
    1094          308 :             for layer in new_layers.iter() {
    1095          308 :                 if layer.layer_desc().file_size > warn_limit {
    1096            0 :                     warn!(
    1097              :                         %layer,
    1098            0 :                         "created delta file of size {} larger than double the target of {target_file_size}", layer.layer_desc().file_size
    1099              :                     );
    1100          308 :                 }
    1101              :             }
    1102              : 
    1103              :             // The writer.finish() above already did the fsync of the inodes.
    1104              :             // We just need to fsync the directory in which these inodes are linked,
    1105              :             // which we know to be the timeline directory.
    1106              :             //
    1107              :             // We use fatal_err() below because after writer.finish() returns with success,
    1108              :             // the in-memory state of the filesystem already has the layer file in its final place,
    1109              :             // and subsequent pageserver code could think it's durable while it really isn't.
    1110           28 :             let timeline_dir = VirtualFile::open(
    1111           28 :                 &self
    1112           28 :                     .conf
    1113           28 :                     .timeline_path(&self.tenant_shard_id, &self.timeline_id),
    1114           28 :                 ctx,
    1115           28 :             )
    1116           14 :             .await
    1117           28 :             .fatal_err("VirtualFile::open for timeline dir fsync");
    1118           28 :             timeline_dir
    1119           28 :                 .sync_all()
    1120           14 :                 .await
    1121           28 :                 .fatal_err("VirtualFile::sync_all timeline dir");
    1122            0 :         }
    1123              : 
    1124           28 :         stats.write_layer_files_micros = stats.read_lock_drop_micros.till_now();
    1125           28 :         stats.new_deltas_count = Some(new_layers.len());
    1126          308 :         stats.new_deltas_size = Some(new_layers.iter().map(|l| l.layer_desc().file_size).sum());
    1127           28 : 
    1128           28 :         match TryInto::<CompactLevel0Phase1Stats>::try_into(stats)
    1129           28 :             .and_then(|stats| serde_json::to_string(&stats).context("serde_json::to_string"))
    1130              :         {
    1131           28 :             Ok(stats_json) => {
    1132           28 :                 info!(
    1133            0 :                     stats_json = stats_json.as_str(),
    1134            0 :                     "compact_level0_phase1 stats available"
    1135              :                 )
    1136              :             }
    1137            0 :             Err(e) => {
    1138            0 :                 warn!("compact_level0_phase1 stats failed to serialize: {:#}", e);
    1139              :             }
    1140              :         }
    1141              : 
    1142              :         // Without this, rustc complains about deltas_to_compact still
    1143              :         // being borrowed when we `.into_iter()` below.
    1144           28 :         drop(all_values_iter);
    1145           28 : 
    1146           28 :         Ok(CompactLevel0Phase1Result {
    1147           28 :             new_layers,
    1148           28 :             deltas_to_compact: deltas_to_compact
    1149           28 :                 .into_iter()
    1150          402 :                 .map(|x| x.drop_eviction_guard())
    1151           28 :                 .collect::<Vec<_>>(),
    1152           28 :             fully_compacted,
    1153           28 :         })
    1154          364 :     }
    1155              : }
    1156              : 
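
The `.coalesce(..)` call in `compact_level0_phase1` above merges adjacent entries of the same key until the accumulated size reaches the target file size, so that a key's versions feed the layer-split decisions as one unit. The following is a minimal, self-contained sketch of that rule using the same itertools adapter; the keys and sizes are invented for illustration and are not taken from the source above.

    use itertools::Itertools;

    fn main() {
        let target_file_size = 100u64;
        // Toy (key, size) entries; LSNs are omitted for brevity.
        let entries = vec![("a", 40u64), ("a", 70), ("a", 30), ("b", 20)];
        let coalesced: Vec<_> = entries
            .into_iter()
            .coalesce(|mut prev, cur| {
                // Same rule as above: keep accumulating into `prev` while the key
                // matches and the accumulated size is still below the target.
                if prev.0 == cur.0 && prev.1 < target_file_size {
                    prev.1 += cur.1;
                    Ok(prev)
                } else {
                    // Emit `prev` unchanged and continue coalescing from `cur`.
                    Err((prev, cur))
                }
            })
            .collect();
        // The first two "a" entries merge into ("a", 110); since 110 >= 100, the
        // third "a" entry starts a new group instead of growing the first one.
        assert_eq!(coalesced, vec![("a", 110), ("a", 30), ("b", 20)]);
    }

The real code applies the same rule to (key, lsn, size) triples, which is why the accumulated size can exceed the target by at most one entry before a new group starts.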
    1157              : #[derive(Default)]
    1158              : struct CompactLevel0Phase1Result {
    1159              :     new_layers: Vec<ResidentLayer>,
    1160              :     deltas_to_compact: Vec<Layer>,
    1161              :     // Whether we have included all L0 layers, or selected only part of them due to the
    1162              :     // L0 compaction size limit.
    1163              :     fully_compacted: bool,
    1164              : }
    1165              : 
    1166              : #[derive(Default)]
    1167              : struct CompactLevel0Phase1StatsBuilder {
    1168              :     version: Option<u64>,
    1169              :     tenant_id: Option<TenantShardId>,
    1170              :     timeline_id: Option<TimelineId>,
    1171              :     read_lock_acquisition_micros: DurationRecorder,
    1172              :     read_lock_held_spawn_blocking_startup_micros: DurationRecorder,
    1173              :     read_lock_held_key_sort_micros: DurationRecorder,
    1174              :     read_lock_held_prerequisites_micros: DurationRecorder,
    1175              :     read_lock_held_compute_holes_micros: DurationRecorder,
    1176              :     read_lock_drop_micros: DurationRecorder,
    1177              :     write_layer_files_micros: DurationRecorder,
    1178              :     level0_deltas_count: Option<usize>,
    1179              :     new_deltas_count: Option<usize>,
    1180              :     new_deltas_size: Option<u64>,
    1181              : }
    1182              : 
    1183              : #[derive(serde::Serialize)]
    1184              : struct CompactLevel0Phase1Stats {
    1185              :     version: u64,
    1186              :     tenant_id: TenantShardId,
    1187              :     timeline_id: TimelineId,
    1188              :     read_lock_acquisition_micros: RecordedDuration,
    1189              :     read_lock_held_spawn_blocking_startup_micros: RecordedDuration,
    1190              :     read_lock_held_key_sort_micros: RecordedDuration,
    1191              :     read_lock_held_prerequisites_micros: RecordedDuration,
    1192              :     read_lock_held_compute_holes_micros: RecordedDuration,
    1193              :     read_lock_drop_micros: RecordedDuration,
    1194              :     write_layer_files_micros: RecordedDuration,
    1195              :     level0_deltas_count: usize,
    1196              :     new_deltas_count: usize,
    1197              :     new_deltas_size: u64,
    1198              : }
    1199              : 
    1200              : impl TryFrom<CompactLevel0Phase1StatsBuilder> for CompactLevel0Phase1Stats {
    1201              :     type Error = anyhow::Error;
    1202              : 
    1203           28 :     fn try_from(value: CompactLevel0Phase1StatsBuilder) -> Result<Self, Self::Error> {
    1204           28 :         Ok(Self {
    1205           28 :             version: value.version.ok_or_else(|| anyhow!("version not set"))?,
    1206           28 :             tenant_id: value
    1207           28 :                 .tenant_id
    1208           28 :                 .ok_or_else(|| anyhow!("tenant_id not set"))?,
    1209           28 :             timeline_id: value
    1210           28 :                 .timeline_id
    1211           28 :                 .ok_or_else(|| anyhow!("timeline_id not set"))?,
    1212           28 :             read_lock_acquisition_micros: value
    1213           28 :                 .read_lock_acquisition_micros
    1214           28 :                 .into_recorded()
    1215           28 :                 .ok_or_else(|| anyhow!("read_lock_acquisition_micros not set"))?,
    1216           28 :             read_lock_held_spawn_blocking_startup_micros: value
    1217           28 :                 .read_lock_held_spawn_blocking_startup_micros
    1218           28 :                 .into_recorded()
    1219           28 :                 .ok_or_else(|| anyhow!("read_lock_held_spawn_blocking_startup_micros not set"))?,
    1220           28 :             read_lock_held_key_sort_micros: value
    1221           28 :                 .read_lock_held_key_sort_micros
    1222           28 :                 .into_recorded()
    1223           28 :                 .ok_or_else(|| anyhow!("read_lock_held_key_sort_micros not set"))?,
    1224           28 :             read_lock_held_prerequisites_micros: value
    1225           28 :                 .read_lock_held_prerequisites_micros
    1226           28 :                 .into_recorded()
    1227           28 :                 .ok_or_else(|| anyhow!("read_lock_held_prerequisites_micros not set"))?,
    1228           28 :             read_lock_held_compute_holes_micros: value
    1229           28 :                 .read_lock_held_compute_holes_micros
    1230           28 :                 .into_recorded()
    1231           28 :                 .ok_or_else(|| anyhow!("read_lock_held_compute_holes_micros not set"))?,
    1232           28 :             read_lock_drop_micros: value
    1233           28 :                 .read_lock_drop_micros
    1234           28 :                 .into_recorded()
    1235           28 :                 .ok_or_else(|| anyhow!("read_lock_drop_micros not set"))?,
    1236           28 :             write_layer_files_micros: value
    1237           28 :                 .write_layer_files_micros
    1238           28 :                 .into_recorded()
    1239           28 :                 .ok_or_else(|| anyhow!("write_layer_files_micros not set"))?,
    1240           28 :             level0_deltas_count: value
    1241           28 :                 .level0_deltas_count
    1242           28 :                 .ok_or_else(|| anyhow!("level0_deltas_count not set"))?,
    1243           28 :             new_deltas_count: value
    1244           28 :                 .new_deltas_count
    1245           28 :                 .ok_or_else(|| anyhow!("new_deltas_count not set"))?,
    1246           28 :             new_deltas_size: value
    1247           28 :                 .new_deltas_size
    1248           28 :                 .ok_or_else(|| anyhow!("new_deltas_size not set"))?,
    1249              :         })
    1250           28 :     }
    1251              : }
    1252              : 
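
A small usage sketch of the conversion above (illustrative only, not a test from the repository): a default builder leaves every field unset, so the conversion surfaces the first missing field as an error.

    // Illustrative only: the default builder has no fields set, so the
    // conversion fails on the first check and reports "version not set".
    let builder = CompactLevel0Phase1StatsBuilder::default();
    match CompactLevel0Phase1Stats::try_from(builder) {
        Ok(_) => unreachable!("a default builder cannot produce complete stats"),
        Err(e) => assert!(e.to_string().contains("version not set")),
    }
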
    1253            0 : #[derive(Debug, PartialEq, Eq, Clone, serde::Deserialize, serde::Serialize)]
    1254              : #[serde(tag = "mode", rename_all = "kebab-case", deny_unknown_fields)]
    1255              : pub enum CompactL0Phase1ValueAccess {
    1256              :     /// The old way.
    1257              :     PageCachedBlobIo,
    1258              :     /// The new way.
    1259              :     StreamingKmerge {
    1260              :         /// If set, we run both the old way and the new way, validate that
    1261              :         /// they are identical (=> [`CompactL0BypassPageCacheValidation`]),
    1262              :         /// and if the validation fails,
    1263              :         /// - in tests: fail them with a panic or
    1264              :         /// - in prod, log a rate-limited warning and use the old way's results.
    1265              :         ///
    1266              :         /// If not set, we only run the new way and trust its results.
    1267              :         validate: Option<CompactL0BypassPageCacheValidation>,
    1268              :     },
    1269              : }
    1270              : 
    1271              : /// See [`CompactL0Phase1ValueAccess::StreamingKmerge`].
    1272            0 : #[derive(Debug, PartialEq, Eq, Clone, serde::Deserialize, serde::Serialize)]
    1273              : #[serde(rename_all = "kebab-case")]
    1274              : pub enum CompactL0BypassPageCacheValidation {
    1275              :     /// Validate that the series of (key, lsn) pairs are the same.
    1276              :     KeyLsn,
    1277              :     /// Validate that the entire output of old and new way is identical.
    1278              :     KeyLsnValue,
    1279              : }
    1280              : 
    1281              : impl Default for CompactL0Phase1ValueAccess {
    1282          202 :     fn default() -> Self {
    1283          202 :         CompactL0Phase1ValueAccess::StreamingKmerge {
    1284          202 :             // TODO(https://github.com/neondatabase/neon/issues/8184): change to None once confident
    1285          202 :             validate: Some(CompactL0BypassPageCacheValidation::KeyLsnValue),
    1286          202 :         }
    1287          202 :     }
    1288              : }
    1289              : 
    1290              : impl Timeline {
    1291              :     /// Entry point for new tiered compaction algorithm.
    1292              :     ///
    1293              :     /// All the real work is in the implementation in the pageserver_compaction
    1294              :     /// crate. The code here would apply to any algorithm implemented by the
    1295              :     /// same interface, but tiered is the only one at the moment.
    1296              :     ///
    1297              :     /// TODO: cancellation
    1298            0 :     pub(crate) async fn compact_tiered(
    1299            0 :         self: &Arc<Self>,
    1300            0 :         _cancel: &CancellationToken,
    1301            0 :         ctx: &RequestContext,
    1302            0 :     ) -> Result<(), CompactionError> {
    1303            0 :         let fanout = self.get_compaction_threshold() as u64;
    1304            0 :         let target_file_size = self.get_checkpoint_distance();
    1305              : 
    1306              :         // Find the top of the historical layers
    1307            0 :         let end_lsn = {
    1308            0 :             let guard = self.layers.read().await;
    1309            0 :             let layers = guard.layer_map();
    1310            0 : 
    1311            0 :             let l0_deltas = layers.get_level0_deltas();
    1312            0 :             drop(guard);
    1313            0 : 
    1314            0 :             // As an optimization, if we find that there are too few L0 layers,
    1315            0 :             // bail out early. We know that the compaction algorithm would do
    1316            0 :             // nothing in that case.
    1317            0 :             if l0_deltas.len() < fanout as usize {
    1318              :                 // doesn't need compacting
    1319            0 :                 return Ok(());
    1320            0 :             }
    1321            0 :             l0_deltas.iter().map(|l| l.lsn_range.end).max().unwrap()
    1322            0 :         };
    1323            0 : 
    1324            0 :         // Is the timeline being deleted?
    1325            0 :         if self.is_stopping() {
    1326            0 :             trace!("Dropping out of compaction on timeline shutdown");
    1327            0 :             return Err(CompactionError::ShuttingDown);
    1328            0 :         }
    1329              : 
    1330            0 :         let (dense_ks, _sparse_ks) = self.collect_keyspace(end_lsn, ctx).await?;
    1331              :         // TODO(chi): ignore sparse_keyspace for now, compact it in the future.
    1332            0 :         let mut adaptor = TimelineAdaptor::new(self, (end_lsn, dense_ks));
    1333            0 : 
    1334            0 :         pageserver_compaction::compact_tiered::compact_tiered(
    1335            0 :             &mut adaptor,
    1336            0 :             end_lsn,
    1337            0 :             target_file_size,
    1338            0 :             fanout,
    1339            0 :             ctx,
    1340            0 :         )
    1341            0 :         .await
    1342              :         // TODO: compact_tiered needs to return CompactionError
    1343            0 :         .map_err(CompactionError::Other)?;
    1344              : 
    1345            0 :         adaptor.flush_updates().await?;
    1346            0 :         Ok(())
    1347            0 :     }
    1348              : 
    1349              :     /// Take a list of images and deltas, produce images and deltas according to GC horizon and retain_lsns.
    1350              :     ///
    1351              :     /// It takes a key, the values of the key within the compaction process, a GC horizon, and all retain_lsns below the horizon.
    1352              :     /// For now, it requires that `accumulated_values` contain the full history of the key (i.e., the key with the lowest LSN is
    1353              :     /// an image or a WAL record not requiring a base image). This restriction will be removed once we implement gc-compaction on branch.
    1354              :     ///
    1355              :     /// The function returns the deltas and the base image that need to be placed at each of the retain LSNs. For example, we have:
    1356              :     ///
    1357              :     /// A@0x10, +B@0x20, +C@0x30, +D@0x40, +E@0x50, +F@0x60
    1358              :     /// horizon = 0x50, retain_lsn = 0x20, 0x40, delta_threshold=3
    1359              :     ///
    1360              :     /// The function will produce:
    1361              :     ///
    1362              :     /// ```plain
    1363              :     /// 0x20(retain_lsn) -> img=AB@0x20                  always produce a single image below the lowest retain LSN
    1364              :     /// 0x40(retain_lsn) -> deltas=[+C@0x30, +D@0x40]    two deltas since the last base image, keeping the deltas
    1365              :     /// 0x50(horizon)    -> deltas=[ABCDE@0x50]          three deltas since the last base image, generate an image but put it in the delta
    1366              :     /// above_horizon    -> deltas=[+F@0x60]             full history above the horizon
    1367              :     /// ```
    1368              :     ///
    1369              :     /// Note that `accumulated_values` must be sorted by LSN and should belong to a single key.
    1370          310 :     pub(crate) async fn generate_key_retention(
    1371          310 :         self: &Arc<Timeline>,
    1372          310 :         key: Key,
    1373          310 :         full_history: &[(Key, Lsn, Value)],
    1374          310 :         horizon: Lsn,
    1375          310 :         retain_lsn_below_horizon: &[Lsn],
    1376          310 :         delta_threshold_cnt: usize,
    1377          310 :         base_img_from_ancestor: Option<(Key, Lsn, Bytes)>,
    1378          310 :     ) -> anyhow::Result<KeyHistoryRetention> {
    1379          310 :         // Pre-checks for the invariants
    1380          310 :         if cfg!(debug_assertions) {
    1381          750 :             for (log_key, _, _) in full_history {
    1382          440 :                 assert_eq!(log_key, &key, "mismatched key");
    1383              :             }
    1384          310 :             for i in 1..full_history.len() {
    1385          130 :                 assert!(full_history[i - 1].1 <= full_history[i].1, "unordered LSN");
    1386          130 :                 if full_history[i - 1].1 == full_history[i].1 {
    1387            0 :                     assert!(
    1388            0 :                         matches!(full_history[i - 1].2, Value::Image(_)),
    1389            0 :                         "unordered delta/image, or duplicated delta"
    1390              :                     );
    1391          130 :                 }
    1392              :             }
    1393              :             // There used to be an assertion checking that the first record in the
    1394              :             // history is `will_init` when there is no base image, but it was removed.
    1395              :             // This is explained in the test cases for generate_key_retention.
    1396              :             // Search "incomplete history" for more information.
    1397          624 :             for lsn in retain_lsn_below_horizon {
    1398          314 :                 assert!(lsn < &horizon, "retain lsn must be below horizon")
    1399              :             }
    1400          310 :             for i in 1..retain_lsn_below_horizon.len() {
    1401          150 :                 assert!(
    1402          150 :                     retain_lsn_below_horizon[i - 1] <= retain_lsn_below_horizon[i],
    1403            0 :                     "unordered LSN"
    1404              :                 );
    1405              :             }
    1406            0 :         }
    1407          310 :         let has_ancestor = base_img_from_ancestor.is_some();
    1408              :         // Step 1: split history into len(retain_lsn_below_horizon) + 2 buckets, where the last bucket is for all deltas above the horizon,
    1409              :         // and the second-to-last bucket is for the horizon. Each bucket contains lsn_last_bucket < deltas <= lsn_this_bucket.
    1410          310 :         let (mut split_history, lsn_split_points) = {
    1411          310 :             let mut split_history = Vec::new();
    1412          310 :             split_history.resize_with(retain_lsn_below_horizon.len() + 2, Vec::new);
    1413          310 :             let mut lsn_split_points = Vec::with_capacity(retain_lsn_below_horizon.len() + 1);
    1414          624 :             for lsn in retain_lsn_below_horizon {
    1415          314 :                 lsn_split_points.push(*lsn);
    1416          314 :             }
    1417          310 :             lsn_split_points.push(horizon);
    1418          310 :             let mut current_idx = 0;
    1419          750 :             for item @ (_, lsn, _) in full_history {
    1420          532 :                 while current_idx < lsn_split_points.len() && *lsn > lsn_split_points[current_idx] {
    1421           92 :                     current_idx += 1;
    1422           92 :                 }
    1423          440 :                 split_history[current_idx].push(item);
    1424              :             }
    1425          310 :             (split_history, lsn_split_points)
    1426              :         };
    1427              :         // Step 2: filter out duplicated records due to the k-merge of image/delta layers
    1428         1244 :         for split_for_lsn in &mut split_history {
    1429          934 :             let mut prev_lsn = None;
    1430          934 :             let mut new_split_for_lsn = Vec::with_capacity(split_for_lsn.len());
    1431          934 :             for record @ (_, lsn, _) in std::mem::take(split_for_lsn) {
    1432          440 :                 if let Some(prev_lsn) = &prev_lsn {
    1433           58 :                     if *prev_lsn == lsn {
    1434              :                         // This is the case where an LSN has data from both the delta layer and the image layer. As
    1435              :                         // `ValueWrapper` ensures that an image is ordered before a delta at the same LSN, we simply
    1436              :                         // drop this delta and keep the image.
    1437              :                         //
    1438              :                         // For example, if we have delta layer entries key1@0x10 and key1@0x20 and an image layer entry
    1439              :                         // key1@0x10, we will keep the image for key1@0x10 and the delta for key1@0x20; the key1@0x10
    1440              :                         // delta is simply dropped.
    1441              :                         //
    1442              :                         // TODO: in case we have both delta + images for a given LSN and it does not exceed the delta
    1443              :                         // threshold, we could have kept delta instead to save space. This is an optimization for the future.
    1444            0 :                         continue;
    1445           58 :                     }
    1446          382 :                 }
    1447          440 :                 prev_lsn = Some(lsn);
    1448          440 :                 new_split_for_lsn.push(record);
    1449              :             }
    1450          934 :             *split_for_lsn = new_split_for_lsn;
    1451              :         }
    1452              :         // Step 3: generate images when necessary
    1453          310 :         let mut retention = Vec::with_capacity(split_history.len());
    1454          310 :         let mut records_since_last_image = 0;
    1455          310 :         let batch_cnt = split_history.len();
    1456          310 :         assert!(
    1457          310 :             batch_cnt >= 2,
    1458            0 :             "should have at least below + above horizon batches"
    1459              :         );
    1460          310 :         let mut replay_history: Vec<(Key, Lsn, Value)> = Vec::new();
    1461          310 :         if let Some((key, lsn, img)) = base_img_from_ancestor {
    1462           18 :             replay_history.push((key, lsn, Value::Image(img)));
    1463          292 :         }
    1464              : 
    1465              :         /// Generate debug information for the replay history
    1466            0 :         fn generate_history_trace(replay_history: &[(Key, Lsn, Value)]) -> String {
    1467            0 :             use std::fmt::Write;
    1468            0 :             let mut output = String::new();
    1469            0 :             if let Some((key, _, _)) = replay_history.first() {
    1470            0 :                 write!(output, "key={} ", key).unwrap();
    1471            0 :                 let mut cnt = 0;
    1472            0 :                 for (_, lsn, val) in replay_history {
    1473            0 :                     if val.is_image() {
    1474            0 :                         write!(output, "i@{} ", lsn).unwrap();
    1475            0 :                     } else if val.will_init() {
    1476            0 :                         write!(output, "di@{} ", lsn).unwrap();
    1477            0 :                     } else {
    1478            0 :                         write!(output, "d@{} ", lsn).unwrap();
    1479            0 :                     }
    1480            0 :                     cnt += 1;
    1481            0 :                     if cnt >= 128 {
    1482            0 :                         write!(output, "... and more").unwrap();
    1483            0 :                         break;
    1484            0 :                     }
    1485              :                 }
    1486            0 :             } else {
    1487            0 :                 write!(output, "<no history>").unwrap();
    1488            0 :             }
    1489            0 :             output
    1490            0 :         }
    1491              : 
    1492            0 :         fn generate_debug_trace(
    1493            0 :             replay_history: Option<&[(Key, Lsn, Value)]>,
    1494            0 :             full_history: &[(Key, Lsn, Value)],
    1495            0 :             lsns: &[Lsn],
    1496            0 :             horizon: Lsn,
    1497            0 :         ) -> String {
    1498            0 :             use std::fmt::Write;
    1499            0 :             let mut output = String::new();
    1500            0 :             if let Some(replay_history) = replay_history {
    1501            0 :                 writeln!(
    1502            0 :                     output,
    1503            0 :                     "replay_history: {}",
    1504            0 :                     generate_history_trace(replay_history)
    1505            0 :                 )
    1506            0 :                 .unwrap();
    1507            0 :             } else {
    1508            0 :                 writeln!(output, "replay_history: <disabled>",).unwrap();
    1509            0 :             }
    1510            0 :             writeln!(
    1511            0 :                 output,
    1512            0 :                 "full_history: {}",
    1513            0 :                 generate_history_trace(full_history)
    1514            0 :             )
    1515            0 :             .unwrap();
    1516            0 :             writeln!(
    1517            0 :                 output,
    1518            0 :                 "when processing: [{}] horizon={}",
    1519            0 :                 lsns.iter().map(|l| format!("{l}")).join(","),
    1520            0 :                 horizon
    1521            0 :             )
    1522            0 :             .unwrap();
    1523            0 :             output
    1524            0 :         }
    1525              : 
    1526          934 :         for (i, split_for_lsn) in split_history.into_iter().enumerate() {
    1527              :             // TODO: there could be image keys inside the splits, and we can compute records_since_last_image accordingly.
    1528          934 :             records_since_last_image += split_for_lsn.len();
    1529          934 :             let generate_image = if i == 0 && !has_ancestor {
    1530              :                 // We always generate images for the first batch (below horizon / lowest retain_lsn)
    1531          292 :                 true
    1532          642 :             } else if i == batch_cnt - 1 {
    1533              :                 // Do not generate images for the last batch (above horizon)
    1534          310 :                 false
    1535          332 :             } else if records_since_last_image >= delta_threshold_cnt {
    1536              :                 // Generate images when there are too many records
    1537            6 :                 true
    1538              :             } else {
    1539          326 :                 false
    1540              :             };
    1541          934 :             replay_history.extend(split_for_lsn.iter().map(|x| (*x).clone()));
    1542              :             // Only retain the items after the last image record
    1543         1170 :             for idx in (0..replay_history.len()).rev() {
    1544         1170 :                 if replay_history[idx].2.will_init() {
    1545          934 :                     replay_history = replay_history[idx..].to_vec();
    1546          934 :                     break;
    1547          236 :                 }
    1548              :             }
    1549          934 :             if let Some((_, _, val)) = replay_history.first() {
    1550          934 :                 if !val.will_init() {
    1551            0 :                     return Err(anyhow::anyhow!("invalid history, no base image")).with_context(
    1552            0 :                         || {
    1553            0 :                             generate_debug_trace(
    1554            0 :                                 Some(&replay_history),
    1555            0 :                                 full_history,
    1556            0 :                                 retain_lsn_below_horizon,
    1557            0 :                                 horizon,
    1558            0 :                             )
    1559            0 :                         },
    1560            0 :                     );
    1561          934 :                 }
    1562            0 :             }
    1563          934 :             if generate_image && records_since_last_image > 0 {
    1564          298 :                 records_since_last_image = 0;
    1565          298 :                 let replay_history_for_debug = if cfg!(debug_assertions) {
    1566          298 :                     Some(replay_history.clone())
    1567              :                 } else {
    1568            0 :                     None
    1569              :                 };
    1570          298 :                 let replay_history_for_debug_ref = replay_history_for_debug.as_deref();
    1571          298 :                 let history = std::mem::take(&mut replay_history);
    1572          298 :                 let mut img = None;
    1573          298 :                 let mut records = Vec::with_capacity(history.len());
    1574          298 :                 if let (_, lsn, Value::Image(val)) = history.first().as_ref().unwrap() {
    1575          298 :                     img = Some((*lsn, val.clone()));
    1576          298 :                     for (_, lsn, val) in history.into_iter().skip(1) {
    1577           34 :                         let Value::WalRecord(rec) = val else {
    1578            0 :                             return Err(anyhow::anyhow!(
    1579            0 :                                 "invalid record, first record is image, expect walrecords"
    1580            0 :                             ))
    1581            0 :                             .with_context(|| {
    1582            0 :                                 generate_debug_trace(
    1583            0 :                                     replay_history_for_debug_ref,
    1584            0 :                                     full_history,
    1585            0 :                                     retain_lsn_below_horizon,
    1586            0 :                                     horizon,
    1587            0 :                                 )
    1588            0 :                             });
    1589              :                         };
    1590           34 :                         records.push((lsn, rec));
    1591              :                     }
    1592              :                 } else {
    1593            0 :                     for (_, lsn, val) in history.into_iter() {
    1594            0 :                         let Value::WalRecord(rec) = val else {
    1595            0 :                             return Err(anyhow::anyhow!("invalid record, first record is walrecord, expect rest are walrecord"))
    1596            0 :                                 .with_context(|| generate_debug_trace(
    1597            0 :                                     replay_history_for_debug_ref,
    1598            0 :                                     full_history,
    1599            0 :                                     retain_lsn_below_horizon,
    1600            0 :                                     horizon,
    1601            0 :                                 ));
    1602              :                         };
    1603            0 :                         records.push((lsn, rec));
    1604              :                     }
    1605              :                 }
    1606          298 :                 records.reverse();
    1607          298 :                 let state = ValueReconstructState { img, records };
    1608          298 :                 let request_lsn = lsn_split_points[i]; // last batch does not generate image so i is always in range
    1609          298 :                 let img = self.reconstruct_value(key, request_lsn, state).await?;
    1610          298 :                 replay_history.push((key, request_lsn, Value::Image(img.clone())));
    1611          298 :                 retention.push(vec![(request_lsn, Value::Image(img))]);
    1612          636 :             } else {
    1613          636 :                 let deltas = split_for_lsn
    1614          636 :                     .iter()
    1615          636 :                     .map(|(_, lsn, value)| (*lsn, value.clone()))
    1616          636 :                     .collect_vec();
    1617          636 :                 retention.push(deltas);
    1618          636 :             }
    1619              :         }
    1620          310 :         let mut result = Vec::with_capacity(retention.len());
    1621          310 :         assert_eq!(retention.len(), lsn_split_points.len() + 1);
    1622          934 :         for (idx, logs) in retention.into_iter().enumerate() {
    1623          934 :             if idx == lsn_split_points.len() {
    1624          310 :                 return Ok(KeyHistoryRetention {
    1625          310 :                     below_horizon: result,
    1626          310 :                     above_horizon: KeyLogAtLsn(logs),
    1627          310 :                 });
    1628          624 :             } else {
    1629          624 :                 result.push((lsn_split_points[idx], KeyLogAtLsn(logs)));
    1630          624 :             }
    1631              :         }
    1632            0 :         unreachable!("key retention is empty")
    1633          310 :     }
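
A minimal standalone sketch of how the batches collected above are turned into the final per-key retention result, using plain u64 LSNs and String placeholders for the per-LSN logs (simplified stand-ins, not the real pageserver types): every batch except the last is paired with its LSN split point, and the final batch becomes the above-horizon tail.

struct KeyHistoryRetentionSketch {
    below_horizon: Vec<(u64, Vec<String>)>,
    above_horizon: Vec<String>,
}

fn assemble_retention_sketch(
    lsn_split_points: &[u64],
    mut retention: Vec<Vec<String>>,
) -> KeyHistoryRetentionSketch {
    // One batch per split point, plus a trailing batch for everything above the horizon.
    assert_eq!(retention.len(), lsn_split_points.len() + 1);
    let above_horizon = retention.pop().expect("length asserted above");
    KeyHistoryRetentionSketch {
        below_horizon: lsn_split_points.iter().copied().zip(retention).collect(),
        above_horizon,
    }
}
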
    1634              : 
    1635              :     /// An experimental compaction building block that combines compaction with garbage collection.
    1636              :     ///
    1637              :     /// The current implementation picks all delta + image layers that are below or intersecting with
    1638              :     /// the GC horizon without considering retain_lsns. Then, it does a full compaction over all these delta
    1639              :     /// layers and image layers, which generates image layers on the gc horizon, drops deltas below the gc horizon,
    1640              :     /// and creates delta layers with all deltas >= gc horizon (see the sketch after this impl block).
    1641           18 :     pub(crate) async fn compact_with_gc(
    1642           18 :         self: &Arc<Self>,
    1643           18 :         cancel: &CancellationToken,
    1644           18 :         ctx: &RequestContext,
    1645           18 :     ) -> anyhow::Result<()> {
    1646           18 :         use std::collections::BTreeSet;
    1647           18 : 
    1648           18 :         // Block other compaction/GC tasks from running for now. GC-compaction could run along
    1649           18 :         // with legacy compaction tasks in the future. Always ensure the lock order is compaction -> gc.
    1650           18 :         // Note that we already acquired the compaction lock when the outer `compact` function gets called.
    1651           18 : 
    1652           18 :         let gc_lock = async {
    1653              :             tokio::select! {
    1654              :                 guard = self.gc_lock.lock() => Ok(guard),
    1655              :                 // TODO: refactor to CompactionError to correctly pass cancelled error
    1656              :                 _ = cancel.cancelled() => Err(anyhow!("cancelled")),
    1657              :             }
    1658           18 :         };
    1659              : 
    1660           18 :         let gc_lock = crate::timed(
    1661           18 :             gc_lock,
    1662           18 :             "acquires gc lock",
    1663           18 :             std::time::Duration::from_secs(5),
    1664           18 :         )
    1665            0 :         .await?;
    1666              : 
    1667           18 :         info!("running enhanced gc bottom-most compaction");
    1668              : 
    1669              :         scopeguard::defer! {
    1670              :             info!("done enhanced gc bottom-most compaction");
    1671              :         };
    1672              : 
    1673              :         // Step 0: pick all delta layers + image layers below/intersect with the GC horizon.
    1674              :         // The layer selection has the following properties:
    1675              :         // 1. If a layer is in the selection, all layers below it are in the selection.
    1676              :         // 2. Inferred from (1), for each key in the layer selection, the value can be reconstructed using only the layers in the selection.
    1677           18 :         let (layer_selection, gc_cutoff, retain_lsns_below_horizon) = {
    1678           18 :             let guard = self.layers.read().await;
    1679           18 :             let layers = guard.layer_map();
    1680           18 :             let gc_info = self.gc_info.read().unwrap();
    1681           18 :             let mut retain_lsns_below_horizon = Vec::new();
    1682           18 :             let gc_cutoff = gc_info.cutoffs.select_min();
    1683           18 :             for (lsn, _timeline_id) in &gc_info.retain_lsns {
    1684           18 :                 if lsn < &gc_cutoff {
    1685           18 :                     retain_lsns_below_horizon.push(*lsn);
    1686           18 :                 }
    1687              :             }
    1688           18 :             for lsn in gc_info.leases.keys() {
    1689            0 :                 if lsn < &gc_cutoff {
    1690            0 :                     retain_lsns_below_horizon.push(*lsn);
    1691            0 :                 }
    1692              :             }
    1693           18 :             let mut selected_layers = Vec::new();
    1694           18 :             drop(gc_info);
    1695           88 :             for desc in layers.iter_historic_layers() {
    1696           88 :                 if desc.get_lsn_range().start <= gc_cutoff {
    1697           72 :                     selected_layers.push(guard.get_from_desc(&desc));
    1698           72 :                 }
    1699              :             }
    1700           18 :             retain_lsns_below_horizon.sort();
    1701           18 :             (selected_layers, gc_cutoff, retain_lsns_below_horizon)
    1702              :         };
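
A minimal standalone sketch of the selection rule above, using plain (lsn_start, lsn_end) pairs instead of real layer descriptors: a layer is picked iff its LSN range starts at or below the GC cutoff, which gives property (1); property (2) then follows because reconstructing a value at or below the cutoff only needs layers whose ranges also start at or below it.

/// Stand-in for a layer's LSN range, with start <= end.
type LsnRangeSketch = (u64, u64);

fn select_layers_sketch(layers: &[LsnRangeSketch], gc_cutoff: u64) -> Vec<LsnRangeSketch> {
    layers
        .iter()
        .copied()
        .filter(|(start, _end)| *start <= gc_cutoff)
        .collect()
}
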
    1703           18 :         let lowest_retain_lsn = if self.ancestor_timeline.is_some() {
    1704            2 :             Lsn(self.ancestor_lsn.0 + 1)
    1705              :         } else {
    1706           16 :             let res = retain_lsns_below_horizon
    1707           16 :                 .first()
    1708           16 :                 .copied()
    1709           16 :                 .unwrap_or(gc_cutoff);
    1710           16 :             if cfg!(debug_assertions) {
    1711           16 :                 assert_eq!(
    1712           16 :                     res,
    1713           16 :                     retain_lsns_below_horizon
    1714           16 :                         .iter()
    1715           16 :                         .min()
    1716           16 :                         .copied()
    1717           16 :                         .unwrap_or(gc_cutoff)
    1718           16 :                 );
    1719            0 :             }
    1720           16 :             res
    1721              :         };
    1722           18 :         info!(
    1723            0 :             "picked {} layers for compaction with gc_cutoff={} lowest_retain_lsn={}",
    1724            0 :             layer_selection.len(),
    1725              :             gc_cutoff,
    1726              :             lowest_retain_lsn
    1727              :         );
    1728              :         // Step 1: download all selected layers and construct a k-merge iterator over them.
    1729              :         // Also, collect the layer information to decide when to split the new delta layers.
    1730           18 :         let mut downloaded_layers = Vec::new();
    1731           18 :         let mut delta_split_points = BTreeSet::new();
    1732           90 :         for layer in &layer_selection {
    1733           72 :             let resident_layer = layer.download_and_keep_resident().await?;
    1734           72 :             downloaded_layers.push(resident_layer);
    1735           72 : 
    1736           72 :             let desc = layer.layer_desc();
    1737           72 :             if desc.is_delta() {
    1738           50 :                 // TODO: would it be correct to only record split points for deltas intersecting the GC horizon
    1739           50 :                 // (i.e. excluding those entirely below/above it), so that we avoid producing too many small delta layers?
    1740           50 :                 let key_range = desc.get_key_range();
    1741           50 :                 delta_split_points.insert(key_range.start);
    1742           50 :                 delta_split_points.insert(key_range.end);
    1743           50 :             }
    1744              :         }
    1745           18 :         let mut delta_layers = Vec::new();
    1746           18 :         let mut image_layers = Vec::new();
    1747           90 :         for resident_layer in &downloaded_layers {
    1748           72 :             if resident_layer.layer_desc().is_delta() {
    1749           50 :                 let layer = resident_layer.get_as_delta(ctx).await?;
    1750           50 :                 delta_layers.push(layer);
    1751           22 :             } else {
    1752           22 :                 let layer = resident_layer.get_as_image(ctx).await?;
    1753           22 :                 image_layers.push(layer);
    1754              :             }
    1755              :         }
    1756           18 :         let mut merge_iter = MergeIterator::create(&delta_layers, &image_layers, ctx);
    1757           18 :         // Step 2: Produce images+deltas. TODO: ensure newly-produced deltas do not overlap with other deltas.
    1758           18 :         // Accumulated values for the key currently being processed.
    1759           18 :         let mut accumulated_values = Vec::new();
    1760           18 :         let mut last_key: Option<Key> = None;
    1761           18 : 
    1762           18 :         enum FlushDeltaResult {
    1763           18 :             /// Create a new resident layer
    1764           18 :             CreateResidentLayer(ResidentLayer),
    1765           18 :             /// Keep an original delta layer
    1766           18 :             KeepLayer(PersistentLayerKey),
    1767           18 :         }
    1768           18 : 
    1769           18 :         #[allow(clippy::too_many_arguments)]
    1770          302 :         async fn flush_deltas(
    1771          302 :             deltas: &mut Vec<(Key, Lsn, crate::repository::Value)>,
    1772          302 :             last_key: Key,
    1773          302 :             delta_split_points: &[Key],
    1774          302 :             current_delta_split_point: &mut usize,
    1775          302 :             tline: &Arc<Timeline>,
    1776          302 :             lowest_retain_lsn: Lsn,
    1777          302 :             ctx: &RequestContext,
    1778          302 :             last_batch: bool,
    1779          302 :         ) -> anyhow::Result<Option<FlushDeltaResult>> {
    1780          302 :             // Check if we need to split the delta layer. We split at the original delta layer boundary to avoid
    1781          302 :             // overlapping layers.
    1782          302 :             //
    1783          302 :             // If we have a structure like this:
    1784          302 :             //
    1785          302 :             // | Delta 1 |         | Delta 4 |
    1786          302 :             // |---------| Delta 2 |---------|
    1787          302 :             // | Delta 3 |         | Delta 5 |
    1788          302 :             //
    1789          302 :             // If we choose to compact deltas 2+3+5, we would produce a delta layer that overlaps with deltas 1 and 4.
    1790          302 :             // A simple solution here is to split the new delta layers along the original boundaries, even though this
    1791          302 :             // might produce a lot of small layers. This should be improved in the future.
    1792          302 :             let mut need_split = false;
    1793          384 :             while *current_delta_split_point < delta_split_points.len()
    1794          324 :                 && last_key >= delta_split_points[*current_delta_split_point]
    1795           82 :             {
    1796           82 :                 *current_delta_split_point += 1;
    1797           82 :                 need_split = true;
    1798           82 :             }
    1799          302 :             if !need_split && !last_batch {
    1800          206 :                 return Ok(None);
    1801           96 :             }
    1802           96 :             let deltas: Vec<(Key, Lsn, Value)> = std::mem::take(deltas);
    1803           96 :             if deltas.is_empty() {
    1804           46 :                 return Ok(None);
    1805           50 :             }
    1806           78 :             let end_lsn = deltas.iter().map(|(_, lsn, _)| lsn).max().copied().unwrap() + 1;
    1807           50 :             let delta_key = PersistentLayerKey {
    1808           50 :                 key_range: {
    1809           50 :                     let key_start = deltas.first().unwrap().0;
    1810           50 :                     let key_end = deltas.last().unwrap().0.next();
    1811           50 :                     key_start..key_end
    1812           50 :                 },
    1813           50 :                 lsn_range: lowest_retain_lsn..end_lsn,
    1814           50 :                 is_delta: true,
    1815           50 :             };
    1816           18 :             {
    1817           18 :                 // Hack: skip writing the delta layer if we would produce one with the same key-lsn range as an existing layer.
    1818           18 :                 //
    1819           18 :                 // This can happen if we have removed some deltas in "the middle" of some existing layer's key-lsn-range.
    1820           18 :                 // For example, consider the case where a single delta with range [0x10,0x50) exists.
    1821           18 :                 // And we have branches at LSN 0x10, 0x20, 0x30.
    1822           18 :                 // Then we delete branch @ 0x20.
    1823           18 :                 // Bottom-most compaction may now delete the delta [0x20,0x30).
    1824           18 :                 // And that wouldn't change the shape of the layer.
    1825           18 :                 //
    1826           18 :                 // Note that bottom-most-gc-compaction never _adds_ new data in that case, only removes.
    1827           18 :                 // That's why it's safe to skip.
    1828           50 :                 let guard = tline.layers.read().await;
    1829           18 : 
    1830           50 :                 if guard.contains_key(&delta_key) {
    1831           26 :                     let layer_generation = guard.get_from_key(&delta_key).metadata().generation;
    1832           26 :                     drop(guard);
    1833           26 :                     if layer_generation == tline.generation {
    1834           18 :                         // TODO: depending on whether we design this compaction process to run along with
    1835           18 :                         // other compactions, there could be layer map modifications after we drop the
    1836           18 :                         // layer guard, and in case it creates duplicated layer key, we will still error
    1837           18 :                         // in the end.
    1838           26 :                         info!(
    1839           18 :                             key=%delta_key,
    1840           18 :                             ?layer_generation,
    1841           18 :                             "discard delta layer due to duplicated layer in the same generation"
    1842           18 :                         );
    1843           26 :                         return Ok(Some(FlushDeltaResult::KeepLayer(delta_key)));
    1844           18 :                     }
    1845           24 :                 }
    1846           18 :             }
    1847           18 : 
    1848           24 :             let mut delta_layer_writer = DeltaLayerWriter::new(
    1849           24 :                 tline.conf,
    1850           24 :                 tline.timeline_id,
    1851           24 :                 tline.tenant_shard_id,
    1852           24 :                 delta_key.key_range.start,
    1853           24 :                 lowest_retain_lsn..end_lsn,
    1854           24 :                 ctx,
    1855           24 :             )
    1856           18 :             .await?;
    1857           68 :             for (key, lsn, val) in deltas {
    1858           44 :                 delta_layer_writer.put_value(key, lsn, val, ctx).await?;
    1859           18 :             }
    1860           24 :             let delta_layer = delta_layer_writer
    1861           24 :                 .finish(delta_key.key_range.end, tline, ctx)
    1862           62 :                 .await?;
    1863           24 :             Ok(Some(FlushDeltaResult::CreateResidentLayer(delta_layer)))
    1864          302 :         }
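
A minimal standalone sketch of the split rule at the top of flush_deltas, over plain u64 keys (a hypothetical helper with simplified types): the cursor advances past every original-layer boundary that last_key has reached, and a flush is warranted only when at least one boundary was crossed or this is the final batch.

fn should_flush_sketch(
    last_key: u64,
    split_points: &[u64],
    cursor: &mut usize,
    last_batch: bool,
) -> bool {
    let mut crossed = false;
    // Advance past every boundary the current key has already reached.
    while *cursor < split_points.len() && last_key >= split_points[*cursor] {
        *cursor += 1;
        crossed = true;
    }
    crossed || last_batch
}

For example, with split_points = [10, 20] and keys arriving as 5, 12, 25, a flush is warranted at 12 and at 25 but not at 5, so each new delta layer stays within one original boundary segment.
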
    1865           18 : 
    1866           18 :         // Hack the key range to be min..(max-1). Otherwise, the image layer will be
    1867           18 :         // interpreted as an L0 delta layer.
    1868           18 :         let hack_image_layer_range = {
    1869           18 :             let mut end_key = Key::MAX;
    1870           18 :             end_key.field6 -= 1;
    1871           18 :             Key::MIN..end_key
    1872              :         };
    1873              : 
    1874              :         // Only create image layers when there are no ancestor branches. TODO: create a covering image layer
    1875              :         // when certain conditions are met.
    1876           18 :         let mut image_layer_writer = if self.ancestor_timeline.is_none() {
    1877              :             Some(
    1878           16 :                 ImageLayerWriter::new(
    1879           16 :                     self.conf,
    1880           16 :                     self.timeline_id,
    1881           16 :                     self.tenant_shard_id,
    1882           16 :                     &hack_image_layer_range, // covers the full key range
    1883           16 :                     lowest_retain_lsn,
    1884           16 :                     ctx,
    1885           16 :                 )
    1886            8 :                 .await?,
    1887              :             )
    1888              :         } else {
    1889            2 :             None
    1890              :         };
    1891              : 
    1892              :         /// Returns None if there is no ancestor branch. Throws an error when the key is not found.
    1893              :         ///
    1894              :         /// Currently, we always get the ancestor image for each key in the child branch no matter whether the image
    1895              :         /// is needed for reconstruction. This should be fixed in the future.
    1896              :         ///
    1897              :         /// Furthermore, we should use a vectored get instead of a per-key get, or better, use k-merge for ancestor
    1898              :         /// images.
    1899          302 :         async fn get_ancestor_image(
    1900          302 :             tline: &Arc<Timeline>,
    1901          302 :             key: Key,
    1902          302 :             ctx: &RequestContext,
    1903          302 :         ) -> anyhow::Result<Option<(Key, Lsn, Bytes)>> {
    1904          302 :             if tline.ancestor_timeline.is_none() {
    1905          288 :                 return Ok(None);
    1906           14 :             };
    1907              :             // This function is implemented as a get of the current timeline at ancestor LSN, therefore reusing
    1908              :             // as much existing code as possible.
    1909           14 :             let img = tline.get(key, tline.ancestor_lsn, ctx).await?;
    1910           14 :             Ok(Some((key, tline.ancestor_lsn, img)))
    1911          302 :         }
    1912           18 :         let image_layer_key = PersistentLayerKey {
    1913           18 :             key_range: hack_image_layer_range,
    1914           18 :             lsn_range: PersistentLayerDesc::image_layer_lsn_range(lowest_retain_lsn),
    1915           18 :             is_delta: false,
    1916           18 :         };
    1917              : 
    1918              :         // Like with delta layers, it can happen that we re-produce an already existing image layer.
    1919              :         // This could happen when a user triggers force compaction and image generation. In this case,
    1920              :         // it's always safe to rewrite the layer.
    1921           18 :         let discard_image_layer = {
    1922           18 :             let guard = self.layers.read().await;
    1923           18 :             if guard.contains_key(&image_layer_key) {
    1924            6 :                 let layer_generation = guard.get_from_key(&image_layer_key).metadata().generation;
    1925            6 :                 drop(guard);
    1926            6 :                 if layer_generation == self.generation {
    1927              :                     // TODO: depending on whether we design this compaction process to run along with
    1928              :                     // other compactions, there could be layer map modifications after we drop the
    1929              :                     // layer guard, and in case it creates duplicated layer key, we will still error
    1930              :                     // in the end.
    1931            6 :                     info!(
    1932              :                         key=%image_layer_key,
    1933              :                         ?layer_generation,
    1934            0 :                         "discard image layer due to duplicated layer key in the same generation",
    1935              :                     );
    1936            6 :                     true
    1937              :                 } else {
    1938            0 :                     false
    1939              :                 }
    1940              :             } else {
    1941           12 :                 false
    1942              :             }
    1943              :         };
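
A minimal standalone sketch of the discard rule applied to both the delta layer (inside flush_deltas) and the image layer above, using a plain map from layer key to generation in place of the layer manager (hypothetical helper, simplified types): a newly produced layer is discarded, and the existing one kept, only when a layer with the same persistent key was already written by the same generation; otherwise it is safe to rewrite it.

use std::collections::HashMap;

/// `existing` maps a layer's persistent key to the generation that wrote it.
/// Returns true when the newly produced layer should be discarded in favour of
/// the existing one.
fn should_discard_new_layer_sketch(
    existing: &HashMap<String, u32>,
    layer_key: &str,
    current_generation: u32,
) -> bool {
    existing.get(layer_key) == Some(&current_generation)
}
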
    1944              : 
    1945              :         // At this point we could decide not to write to the image layer at all, because
    1946              :         // its key and LSN range are already determined. However, to keep things simple here, we still
    1947              :         // create the writer and discard it at the end.
    1948              : 
    1949           18 :         let mut delta_values = Vec::new();
    1950           18 :         let delta_split_points = delta_split_points.into_iter().collect_vec();
    1951           18 :         let mut current_delta_split_point = 0;
    1952           18 :         let mut delta_layers = Vec::new();
    1953          408 :         while let Some((key, lsn, val)) = merge_iter.next().await? {
    1954          390 :             if last_key.is_none() || last_key.as_ref() == Some(&key) {
    1955          106 :                 if last_key.is_none() {
    1956           18 :                     last_key = Some(key);
    1957           88 :                 }
    1958          106 :                 accumulated_values.push((key, lsn, val));
    1959              :             } else {
    1960          284 :                 let last_key = last_key.as_mut().unwrap();
    1961          284 :                 let retention = self
    1962          284 :                     .generate_key_retention(
    1963          284 :                         *last_key,
    1964          284 :                         &accumulated_values,
    1965          284 :                         gc_cutoff,
    1966          284 :                         &retain_lsns_below_horizon,
    1967          284 :                         COMPACTION_DELTA_THRESHOLD,
    1968          284 :                         get_ancestor_image(self, *last_key, ctx).await?,
    1969              :                     )
    1970            0 :                     .await?;
    1971              :                 // Put the image into the image layer. Currently we have a single big layer for the compaction.
    1972          284 :                 retention
    1973          284 :                     .pipe_to(
    1974          284 :                         *last_key,
    1975          284 :                         &mut delta_values,
    1976          284 :                         image_layer_writer.as_mut(),
    1977          284 :                         ctx,
    1978          284 :                     )
    1979          275 :                     .await?;
    1980          284 :                 delta_layers.extend(
    1981          284 :                     flush_deltas(
    1982          284 :                         &mut delta_values,
    1983          284 :                         *last_key,
    1984          284 :                         &delta_split_points,
    1985          284 :                         &mut current_delta_split_point,
    1986          284 :                         self,
    1987          284 :                         lowest_retain_lsn,
    1988          284 :                         ctx,
    1989          284 :                         false,
    1990          284 :                     )
    1991           68 :                     .await?,
    1992              :                 );
    1993          284 :                 accumulated_values.clear();
    1994          284 :                 *last_key = key;
    1995          284 :                 accumulated_values.push((key, lsn, val));
    1996              :             }
    1997              :         }
    1998              : 
    1999           18 :         let last_key = last_key.expect("no keys produced during compaction");
    2000              :         // TODO: move this part to the loop body
    2001           18 :         let retention = self
    2002           18 :             .generate_key_retention(
    2003           18 :                 last_key,
    2004           18 :                 &accumulated_values,
    2005           18 :                 gc_cutoff,
    2006           18 :                 &retain_lsns_below_horizon,
    2007           18 :                 COMPACTION_DELTA_THRESHOLD,
    2008           18 :                 get_ancestor_image(self, last_key, ctx).await?,
    2009              :             )
    2010            0 :             .await?;
    2011              :         // Put the image into the image layer. Currently we have a single big layer for the compaction.
    2012           18 :         retention
    2013           18 :             .pipe_to(
    2014           18 :                 last_key,
    2015           18 :                 &mut delta_values,
    2016           18 :                 image_layer_writer.as_mut(),
    2017           18 :                 ctx,
    2018           18 :             )
    2019           16 :             .await?;
    2020           18 :         delta_layers.extend(
    2021           18 :             flush_deltas(
    2022           18 :                 &mut delta_values,
    2023           18 :                 last_key,
    2024           18 :                 &delta_split_points,
    2025           18 :                 &mut current_delta_split_point,
    2026           18 :                 self,
    2027           18 :                 lowest_retain_lsn,
    2028           18 :                 ctx,
    2029           18 :                 true,
    2030           18 :             )
    2031            7 :             .await?,
    2032              :         );
    2033           18 :         assert!(delta_values.is_empty(), "unprocessed keys");
    2034              : 
    2035           18 :         let image_layer = if discard_image_layer {
    2036            6 :             None
    2037           12 :         } else if let Some(writer) = image_layer_writer {
    2038           20 :             Some(writer.finish(self, ctx).await?)
    2039              :         } else {
    2040            2 :             None
    2041              :         };
    2042           18 :         info!(
    2043            0 :             "produced {} delta layers and {} image layers",
    2044            0 :             delta_layers.len(),
    2045            0 :             if image_layer.is_some() { 1 } else { 0 }
    2046              :         );
    2047           18 :         let mut compact_to = Vec::new();
    2048           18 :         let mut keep_layers = HashSet::new();
    2049           68 :         for action in delta_layers {
    2050           50 :             match action {
    2051           24 :                 FlushDeltaResult::CreateResidentLayer(layer) => {
    2052           24 :                     compact_to.push(layer);
    2053           24 :                 }
    2054           26 :                 FlushDeltaResult::KeepLayer(l) => {
    2055           26 :                     keep_layers.insert(l);
    2056           26 :                 }
    2057              :             }
    2058              :         }
    2059           18 :         if discard_image_layer {
    2060            6 :             keep_layers.insert(image_layer_key);
    2061           12 :         }
    2062           18 :         let mut layer_selection = layer_selection;
    2063           72 :         layer_selection.retain(|x| !keep_layers.contains(&x.layer_desc().key()));
    2064           18 :         compact_to.extend(image_layer);
    2065              :         // Step 3: Place the new layers back into the layer map.
    2066              :         {
    2067           18 :             let mut guard = self.layers.write().await;
    2068           18 :             guard.finish_gc_compaction(&layer_selection, &compact_to, &self.metrics)
    2069           18 :         };
    2070           18 :         self.remote_client
    2071           18 :             .schedule_compaction_update(&layer_selection, &compact_to)?;
    2072              : 
    2073           18 :         drop(gc_lock);
    2074           18 : 
    2075           18 :         Ok(())
    2076           18 :     }
    2077              : }
    2078              : 
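
The doc comment on compact_with_gc above summarizes the overall effect: everything at or below the GC horizon collapses into one image per key, while deltas above the horizon are carried over. The following standalone sketch shows that shape on a pre-merged (key, lsn) -> value map with plain integers and byte vectors; it ignores retain_lsns and branch points and treats every value as a full image (the real code replays WAL records via reconstruct_value), so it illustrates the data flow only, not the pageserver API.

use std::collections::BTreeMap;

/// Stand-in for the k-merged stream of all entries in the selected layers, ordered by (key, lsn).
type MergedEntriesSketch = BTreeMap<(u64, u64), Vec<u8>>;

struct GcCompactionOutputSketch {
    /// One reconstructed image per key, positioned at the GC cutoff.
    images_at_cutoff: Vec<(u64, Vec<u8>)>,
    /// Deltas above the cutoff are kept as-is.
    deltas_above_cutoff: Vec<((u64, u64), Vec<u8>)>,
}

fn gc_compact_sketch(entries: &MergedEntriesSketch, gc_cutoff: u64) -> GcCompactionOutputSketch {
    let mut images_at_cutoff: BTreeMap<u64, Vec<u8>> = BTreeMap::new();
    let mut deltas_above_cutoff = Vec::new();
    for (&(key, lsn), value) in entries {
        if lsn <= gc_cutoff {
            // Iteration is in (key, lsn) order, so the entry that survives the insert
            // is the newest value at or below the cutoff: the image on the GC horizon.
            images_at_cutoff.insert(key, value.clone());
        } else {
            deltas_above_cutoff.push(((key, lsn), value.clone()));
        }
    }
    GcCompactionOutputSketch {
        images_at_cutoff: images_at_cutoff.into_iter().collect(),
        deltas_above_cutoff,
    }
}
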
    2079              : struct TimelineAdaptor {
    2080              :     timeline: Arc<Timeline>,
    2081              : 
    2082              :     keyspace: (Lsn, KeySpace),
    2083              : 
    2084              :     new_deltas: Vec<ResidentLayer>,
    2085              :     new_images: Vec<ResidentLayer>,
    2086              :     layers_to_delete: Vec<Arc<PersistentLayerDesc>>,
    2087              : }
    2088              : 
    2089              : impl TimelineAdaptor {
    2090            0 :     pub fn new(timeline: &Arc<Timeline>, keyspace: (Lsn, KeySpace)) -> Self {
    2091            0 :         Self {
    2092            0 :             timeline: timeline.clone(),
    2093            0 :             keyspace,
    2094            0 :             new_images: Vec::new(),
    2095            0 :             new_deltas: Vec::new(),
    2096            0 :             layers_to_delete: Vec::new(),
    2097            0 :         }
    2098            0 :     }
    2099              : 
    2100            0 :     pub async fn flush_updates(&mut self) -> Result<(), CompactionError> {
    2101            0 :         let layers_to_delete = {
    2102            0 :             let guard = self.timeline.layers.read().await;
    2103            0 :             self.layers_to_delete
    2104            0 :                 .iter()
    2105            0 :                 .map(|x| guard.get_from_desc(x))
    2106            0 :                 .collect::<Vec<Layer>>()
    2107            0 :         };
    2108            0 :         self.timeline
    2109            0 :             .finish_compact_batch(&self.new_deltas, &self.new_images, &layers_to_delete)
    2110            0 :             .await?;
    2111              : 
    2112            0 :         self.timeline
    2113            0 :             .upload_new_image_layers(std::mem::take(&mut self.new_images))?;
    2114              : 
    2115            0 :         self.new_deltas.clear();
    2116            0 :         self.layers_to_delete.clear();
    2117            0 :         Ok(())
    2118            0 :     }
    2119              : }
    2120              : 
    2121              : #[derive(Clone)]
    2122              : struct ResidentDeltaLayer(ResidentLayer);
    2123              : #[derive(Clone)]
    2124              : struct ResidentImageLayer(ResidentLayer);
    2125              : 
    2126              : impl CompactionJobExecutor for TimelineAdaptor {
    2127              :     type Key = crate::repository::Key;
    2128              : 
    2129              :     type Layer = OwnArc<PersistentLayerDesc>;
    2130              :     type DeltaLayer = ResidentDeltaLayer;
    2131              :     type ImageLayer = ResidentImageLayer;
    2132              : 
    2133              :     type RequestContext = crate::context::RequestContext;
    2134              : 
    2135            0 :     fn get_shard_identity(&self) -> &ShardIdentity {
    2136            0 :         self.timeline.get_shard_identity()
    2137            0 :     }
    2138              : 
    2139            0 :     async fn get_layers(
    2140            0 :         &mut self,
    2141            0 :         key_range: &Range<Key>,
    2142            0 :         lsn_range: &Range<Lsn>,
    2143            0 :         _ctx: &RequestContext,
    2144            0 :     ) -> anyhow::Result<Vec<OwnArc<PersistentLayerDesc>>> {
    2145            0 :         self.flush_updates().await?;
    2146              : 
    2147            0 :         let guard = self.timeline.layers.read().await;
    2148            0 :         let layer_map = guard.layer_map();
    2149            0 : 
    2150            0 :         let result = layer_map
    2151            0 :             .iter_historic_layers()
    2152            0 :             .filter(|l| {
    2153            0 :                 overlaps_with(&l.lsn_range, lsn_range) && overlaps_with(&l.key_range, key_range)
    2154            0 :             })
    2155            0 :             .map(OwnArc)
    2156            0 :             .collect();
    2157            0 :         Ok(result)
    2158            0 :     }
    2159              : 
    2160            0 :     async fn get_keyspace(
    2161            0 :         &mut self,
    2162            0 :         key_range: &Range<Key>,
    2163            0 :         lsn: Lsn,
    2164            0 :         _ctx: &RequestContext,
    2165            0 :     ) -> anyhow::Result<Vec<Range<Key>>> {
    2166            0 :         if lsn == self.keyspace.0 {
    2167            0 :             Ok(pageserver_compaction::helpers::intersect_keyspace(
    2168            0 :                 &self.keyspace.1.ranges,
    2169            0 :                 key_range,
    2170            0 :             ))
    2171              :         } else {
    2172              :             // The current compaction implementation only ever requests the key space
    2173              :             // at the compaction end LSN.
    2174            0 :             anyhow::bail!("keyspace not available for requested lsn");
    2175              :         }
    2176            0 :     }
    2177              : 
    2178            0 :     async fn downcast_delta_layer(
    2179            0 :         &self,
    2180            0 :         layer: &OwnArc<PersistentLayerDesc>,
    2181            0 :     ) -> anyhow::Result<Option<ResidentDeltaLayer>> {
    2182            0 :         // this is a lot more complex than a simple downcast...
    2183            0 :         if layer.is_delta() {
    2184            0 :             let l = {
    2185            0 :                 let guard = self.timeline.layers.read().await;
    2186            0 :                 guard.get_from_desc(layer)
    2187              :             };
    2188            0 :             let result = l.download_and_keep_resident().await?;
    2189              : 
    2190            0 :             Ok(Some(ResidentDeltaLayer(result)))
    2191              :         } else {
    2192            0 :             Ok(None)
    2193              :         }
    2194            0 :     }
    2195              : 
    2196            0 :     async fn create_image(
    2197            0 :         &mut self,
    2198            0 :         lsn: Lsn,
    2199            0 :         key_range: &Range<Key>,
    2200            0 :         ctx: &RequestContext,
    2201            0 :     ) -> anyhow::Result<()> {
    2202            0 :         Ok(self.create_image_impl(lsn, key_range, ctx).await?)
    2203            0 :     }
    2204              : 
    2205            0 :     async fn create_delta(
    2206            0 :         &mut self,
    2207            0 :         lsn_range: &Range<Lsn>,
    2208            0 :         key_range: &Range<Key>,
    2209            0 :         input_layers: &[ResidentDeltaLayer],
    2210            0 :         ctx: &RequestContext,
    2211            0 :     ) -> anyhow::Result<()> {
    2212            0 :         debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end);
    2213              : 
    2214            0 :         let mut all_entries = Vec::new();
    2215            0 :         for dl in input_layers.iter() {
    2216            0 :             all_entries.extend(dl.load_keys(ctx).await?);
    2217              :         }
    2218              : 
    2219              :         // The current stdlib sorting implementation is designed to be
    2220              :         // particularly fast when the slice is made up of sorted sub-ranges.
    2221            0 :         all_entries.sort_by_key(|DeltaEntry { key, lsn, .. }| (*key, *lsn));
    2222              : 
    2223            0 :         let mut writer = DeltaLayerWriter::new(
    2224            0 :             self.timeline.conf,
    2225            0 :             self.timeline.timeline_id,
    2226            0 :             self.timeline.tenant_shard_id,
    2227            0 :             key_range.start,
    2228            0 :             lsn_range.clone(),
    2229            0 :             ctx,
    2230            0 :         )
    2231            0 :         .await?;
    2232              : 
    2233            0 :         let mut dup_values = 0;
    2234            0 : 
    2235            0 :         // This iterator walks through all key-value pairs from all the layers
    2236            0 :         // we're compacting, in key, LSN order.
    2237            0 :         let mut prev: Option<(Key, Lsn)> = None;
    2238              :         for &DeltaEntry {
    2239            0 :             key, lsn, ref val, ..
    2240            0 :         } in all_entries.iter()
    2241              :         {
    2242            0 :             if prev == Some((key, lsn)) {
    2243              :                 // This is a duplicate. Skip it.
    2244              :                 //
    2245              :                 // It can happen if compaction is interrupted after writing some
    2246              :                 // layers but not all, and we are compacting the range again.
    2247              :                 // The calculations in the algorithm assume that there are no
    2248              :                 // duplicates, so the math on targeted file size is likely off,
    2249              :                 // and we will create smaller files than expected.
    2250            0 :                 dup_values += 1;
    2251            0 :                 continue;
    2252            0 :             }
    2253              : 
    2254            0 :             let value = val.load(ctx).await?;
    2255              : 
    2256            0 :             writer.put_value(key, lsn, value, ctx).await?;
    2257              : 
    2258            0 :             prev = Some((key, lsn));
    2259              :         }
    2260              : 
    2261            0 :         if dup_values > 0 {
    2262            0 :             warn!("delta layer created with {} duplicate values", dup_values);
    2263            0 :         }
    2264              : 
    2265            0 :         fail_point!("delta-layer-writer-fail-before-finish", |_| {
    2266            0 :             Err(anyhow::anyhow!(
    2267            0 :                 "failpoint delta-layer-writer-fail-before-finish"
    2268            0 :             ))
    2269            0 :         });
    2270              : 
    2271            0 :         let new_delta_layer = writer
    2272            0 :             .finish(prev.unwrap().0.next(), &self.timeline, ctx)
    2273            0 :             .await?;
    2274              : 
    2275            0 :         self.new_deltas.push(new_delta_layer);
    2276            0 :         Ok(())
    2277            0 :     }
    2278              : 
    2279            0 :     async fn delete_layer(
    2280            0 :         &mut self,
    2281            0 :         layer: &OwnArc<PersistentLayerDesc>,
    2282            0 :         _ctx: &RequestContext,
    2283            0 :     ) -> anyhow::Result<()> {
    2284            0 :         self.layers_to_delete.push(layer.clone().0);
    2285            0 :         Ok(())
    2286            0 :     }
    2287              : }
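
A minimal standalone sketch of the duplicate handling in create_delta above, on plain (key, lsn) pairs (hypothetical helper, not part of the adaptor): after sorting, the duplicates that an interrupted compaction can leave behind sit next to each other, so keeping only the first entry of each equal run is equivalent to the `prev` check in the loop.

fn dedup_sorted_entries_sketch(mut entries: Vec<(u64, u64)>) -> Vec<(u64, u64)> {
    // Sort by (key, lsn), then drop adjacent repeats; this mirrors skipping an entry
    // when it equals the previously written (key, lsn) pair.
    entries.sort_unstable();
    entries.dedup();
    entries
}
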
    2288              : 
    2289              : impl TimelineAdaptor {
    2290            0 :     async fn create_image_impl(
    2291            0 :         &mut self,
    2292            0 :         lsn: Lsn,
    2293            0 :         key_range: &Range<Key>,
    2294            0 :         ctx: &RequestContext,
    2295            0 :     ) -> Result<(), CreateImageLayersError> {
    2296            0 :         let timer = self.timeline.metrics.create_images_time_histo.start_timer();
    2297              : 
    2298            0 :         let image_layer_writer = ImageLayerWriter::new(
    2299            0 :             self.timeline.conf,
    2300            0 :             self.timeline.timeline_id,
    2301            0 :             self.timeline.tenant_shard_id,
    2302            0 :             key_range,
    2303            0 :             lsn,
    2304            0 :             ctx,
    2305            0 :         )
    2306            0 :         .await?;
    2307              : 
    2308            0 :         fail_point!("image-layer-writer-fail-before-finish", |_| {
    2309            0 :             Err(CreateImageLayersError::Other(anyhow::anyhow!(
    2310            0 :                 "failpoint image-layer-writer-fail-before-finish"
    2311            0 :             )))
    2312            0 :         });
    2313              : 
    2314            0 :         let keyspace = KeySpace {
    2315            0 :             ranges: self.get_keyspace(key_range, lsn, ctx).await?,
    2316              :         };
    2317              :         // TODO set proper (stateful) start. The create_image_layer_for_rel_blocks function mostly
    2318            0 :         let start = Key::MIN;
    2319              :         let ImageLayerCreationOutcome {
    2320            0 :             image,
    2321              :             next_start_key: _,
    2322            0 :         } = self
    2323            0 :             .timeline
    2324            0 :             .create_image_layer_for_rel_blocks(
    2325            0 :                 &keyspace,
    2326            0 :                 image_layer_writer,
    2327            0 :                 lsn,
    2328            0 :                 ctx,
    2329            0 :                 key_range.clone(),
    2330            0 :                 start,
    2331            0 :             )
    2332            0 :             .await?;
    2333              : 
    2334            0 :         if let Some(image_layer) = image {
    2335            0 :             self.new_images.push(image_layer);
    2336            0 :         }
    2337              : 
    2338            0 :         timer.stop_and_record();
    2339            0 : 
    2340            0 :         Ok(())
    2341            0 :     }
    2342              : }
    2343              : 
    2344              : impl CompactionRequestContext for crate::context::RequestContext {}
    2345              : 
    2346              : #[derive(Debug, Clone)]
    2347              : pub struct OwnArc<T>(pub Arc<T>);
    2348              : 
    2349              : impl<T> Deref for OwnArc<T> {
    2350              :     type Target = <Arc<T> as Deref>::Target;
    2351            0 :     fn deref(&self) -> &Self::Target {
    2352            0 :         &self.0
    2353            0 :     }
    2354              : }
    2355              : 
    2356              : impl<T> AsRef<T> for OwnArc<T> {
    2357            0 :     fn as_ref(&self) -> &T {
    2358            0 :         self.0.as_ref()
    2359            0 :     }
    2360              : }
    2361              : 
    2362              : impl CompactionLayer<Key> for OwnArc<PersistentLayerDesc> {
    2363            0 :     fn key_range(&self) -> &Range<Key> {
    2364            0 :         &self.key_range
    2365            0 :     }
    2366            0 :     fn lsn_range(&self) -> &Range<Lsn> {
    2367            0 :         &self.lsn_range
    2368            0 :     }
    2369            0 :     fn file_size(&self) -> u64 {
    2370            0 :         self.file_size
    2371            0 :     }
    2372            0 :     fn short_id(&self) -> std::string::String {
    2373            0 :         self.as_ref().short_id().to_string()
    2374            0 :     }
    2375            0 :     fn is_delta(&self) -> bool {
    2376            0 :         self.as_ref().is_delta()
    2377            0 :     }
    2378              : }
    2379              : 
    2380              : impl CompactionLayer<Key> for OwnArc<DeltaLayer> {
    2381            0 :     fn key_range(&self) -> &Range<Key> {
    2382            0 :         &self.layer_desc().key_range
    2383            0 :     }
    2384            0 :     fn lsn_range(&self) -> &Range<Lsn> {
    2385            0 :         &self.layer_desc().lsn_range
    2386            0 :     }
    2387            0 :     fn file_size(&self) -> u64 {
    2388            0 :         self.layer_desc().file_size
    2389            0 :     }
    2390            0 :     fn short_id(&self) -> std::string::String {
    2391            0 :         self.layer_desc().short_id().to_string()
    2392            0 :     }
    2393            0 :     fn is_delta(&self) -> bool {
    2394            0 :         true
    2395            0 :     }
    2396              : }
    2397              : 
    2398              : use crate::tenant::timeline::DeltaEntry;
    2399              : 
    2400              : impl CompactionLayer<Key> for ResidentDeltaLayer {
    2401            0 :     fn key_range(&self) -> &Range<Key> {
    2402            0 :         &self.0.layer_desc().key_range
    2403            0 :     }
    2404            0 :     fn lsn_range(&self) -> &Range<Lsn> {
    2405            0 :         &self.0.layer_desc().lsn_range
    2406            0 :     }
    2407            0 :     fn file_size(&self) -> u64 {
    2408            0 :         self.0.layer_desc().file_size
    2409            0 :     }
    2410            0 :     fn short_id(&self) -> std::string::String {
    2411            0 :         self.0.layer_desc().short_id().to_string()
    2412            0 :     }
    2413            0 :     fn is_delta(&self) -> bool {
    2414            0 :         true
    2415            0 :     }
    2416              : }
    2417              : 
    2418              : impl CompactionDeltaLayer<TimelineAdaptor> for ResidentDeltaLayer {
    2419              :     type DeltaEntry<'a> = DeltaEntry<'a>;
    2420              : 
    2421            0 :     async fn load_keys<'a>(&self, ctx: &RequestContext) -> anyhow::Result<Vec<DeltaEntry<'_>>> {
    2422            0 :         self.0.load_keys(ctx).await
    2423            0 :     }
    2424              : }
    2425              : 
    2426              : impl CompactionLayer<Key> for ResidentImageLayer {
    2427            0 :     fn key_range(&self) -> &Range<Key> {
    2428            0 :         &self.0.layer_desc().key_range
    2429            0 :     }
    2430            0 :     fn lsn_range(&self) -> &Range<Lsn> {
    2431            0 :         &self.0.layer_desc().lsn_range
    2432            0 :     }
    2433            0 :     fn file_size(&self) -> u64 {
    2434            0 :         self.0.layer_desc().file_size
    2435            0 :     }
    2436            0 :     fn short_id(&self) -> std::string::String {
    2437            0 :         self.0.layer_desc().short_id().to_string()
    2438            0 :     }
    2439            0 :     fn is_delta(&self) -> bool {
    2440            0 :         false
    2441            0 :     }
    2442              : }
    2443              : impl CompactionImageLayer<TimelineAdaptor> for ResidentImageLayer {}
        

Generated by: LCOV version 2.1-beta