LCOV - code coverage report
Current view: top level - pageserver/src/tenant/timeline - compaction.rs (source / functions) Coverage Total Hit
Test: bb522999b2ee0ee028df22bb188d3a84170ba700.info Lines: 57.2 % 1165 666
Test Date: 2024-07-21 16:16:09 Functions: 25.3 % 91 23

            Line data    Source code
       1              : //! New compaction implementation. The algorithm itself is implemented in the
       2              : //! compaction crate. This file implements the callbacks and structs that allow
       3              : //! the algorithm to drive the process.
       4              : //!
       5              : //! The old legacy algorithm is implemented directly in `timeline.rs`.
       6              : 
       7              : use std::collections::BinaryHeap;
       8              : use std::ops::{Deref, Range};
       9              : use std::sync::Arc;
      10              : 
      11              : use super::layer_manager::LayerManager;
      12              : use super::{
      13              :     CompactFlags, CreateImageLayersError, DurationRecorder, ImageLayerCreationMode,
      14              :     RecordedDuration, Timeline,
      15              : };
      16              : 
      17              : use anyhow::{anyhow, Context};
      18              : use enumset::EnumSet;
      19              : use fail::fail_point;
      20              : use itertools::Itertools;
      21              : use pageserver_api::keyspace::ShardedRange;
      22              : use pageserver_api::shard::{ShardCount, ShardIdentity, TenantShardId};
      23              : use tokio_util::sync::CancellationToken;
      24              : use tracing::{debug, info, info_span, trace, warn, Instrument};
      25              : use utils::id::TimelineId;
      26              : 
      27              : use crate::context::{AccessStatsBehavior, RequestContext, RequestContextBuilder};
      28              : use crate::page_cache;
      29              : use crate::tenant::config::defaults::{DEFAULT_CHECKPOINT_DISTANCE, DEFAULT_COMPACTION_THRESHOLD};
      30              : use crate::tenant::storage_layer::merge_iterator::MergeIterator;
      31              : use crate::tenant::storage_layer::{AsLayerDesc, PersistentLayerDesc};
      32              : use crate::tenant::timeline::{drop_rlock, DeltaLayerWriter, ImageLayerWriter};
      33              : use crate::tenant::timeline::{Hole, ImageLayerCreationOutcome};
      34              : use crate::tenant::timeline::{Layer, ResidentLayer};
      35              : use crate::tenant::DeltaLayer;
      36              : use crate::virtual_file::{MaybeFatalIo, VirtualFile};
      37              : 
      38              : use crate::keyspace::KeySpace;
      39              : use crate::repository::Key;
      40              : 
      41              : use utils::lsn::Lsn;
      42              : 
      43              : use pageserver_compaction::helpers::overlaps_with;
      44              : use pageserver_compaction::interface::*;
      45              : 
      46              : use super::CompactionError;
      47              : 
      48              : impl Timeline {
      49              :     /// Runs one legacy compaction pass: repartition, compact L0 deltas, create image layers, then rewrite shard-ancestor layers on sharded tenants. TODO: cancellation
      50          364 :     pub(crate) async fn compact_legacy(
      51          364 :         self: &Arc<Self>,
      52          364 :         cancel: &CancellationToken,
      53          364 :         flags: EnumSet<CompactFlags>,
      54          364 :         ctx: &RequestContext,
      55          364 :     ) -> Result<(), CompactionError> {
      56          364 :         if flags.contains(CompactFlags::EnhancedGcBottomMostCompaction) {
      57            0 :             return self.compact_with_gc(cancel, ctx).await;
      58          364 :         }
      59          364 : 
      60          364 :         // High level strategy for compaction / image creation:
      61          364 :         //
      62          364 :         // 1. First, calculate the desired "partitioning" of the
      63          364 :         // currently in-use key space. The goal is to partition the
      64          364 :         // key space into roughly fixed-size chunks, but also take into
      65          364 :         // account any existing image layers, and try to align the
      66          364 :         // chunk boundaries with the existing image layers to avoid
      67          364 :         // too much churn. Also try to align chunk boundaries with
      68          364 :         // relation boundaries.  In principle, we don't know about
      69          364 :         // relation boundaries here, we just deal with key-value
      70          364 :         // pairs, and the code in pgdatadir_mapping.rs knows how to
      71          364 :         // map relations into key-value pairs. But in practice we know
      72          364 :         // that 'field6' is the block number, and the fields 1-5
      73          364 :         // identify a relation. This is just an optimization,
      74          364 :         // though.
      75          364 :         //
      76          364 :         // 2. Once we know the partitioning, for each partition,
      77          364 :         // decide if it's time to create a new image layer. The
      78          364 :         // criterion is: has there been too much "churn" since the last
      79          364 :         // image layer? The "churn" is a fuzzy concept, it's a
      80          364 :         // combination of too many delta files, or too much WAL in
      81          364 :         // total in the delta file. Or perhaps: if creating an image
      82          364 :         // file would allow deleting some older files.
      83          364 :         //
      84          364 :         // 3. After that, we compact all level0 delta files if there
      85          364 :         // are too many of them.  While compacting, we also garbage
      86          364 :         // collect any page versions that are no longer needed because
      87          364 :         // of the new image layers we created in step 2.
      88          364 :         //
      89          364 :         // TODO: This high level strategy hasn't been implemented yet.
      90          364 :         // Below are functions compact_level0() and create_image_layers()
      91          364 :         // but they are a bit ad hoc and don't quite work like it's explained
      92          364 :         // above. Rewrite it.
      93          364 : 
      94          364 :         // Is the timeline being deleted?
      95          364 :         if self.is_stopping() {
      96            0 :             trace!("Dropping out of compaction on timeline shutdown");
      97            0 :             return Err(CompactionError::ShuttingDown);
      98          364 :         }
      99          364 : 
     100          364 :         let target_file_size = self.get_checkpoint_distance();
     101              : 
     102              :         // Define partitioning schema if needed
     103              : 
     104              :         // FIXME: the match should only cover repartitioning, not the next steps
     105          364 :         let partition_count = match self
     106          364 :             .repartition(
     107          364 :                 self.get_last_record_lsn(),
     108          364 :                 self.get_compaction_target_size(),
     109          364 :                 flags,
     110          364 :                 ctx,
     111          364 :             )
     112        14135 :             .await
     113              :         {
     114          364 :             Ok(((dense_partitioning, sparse_partitioning), lsn)) => {
     115          364 :                 // Disables access_stats updates, so that the files we read remain candidates for eviction after we're done with them
     116          364 :                 let image_ctx = RequestContextBuilder::extend(ctx)
     117          364 :                     .access_stats_behavior(AccessStatsBehavior::Skip)
     118          364 :                     .build();
     119          364 : 
     120          364 :                 // 2. Compact level0 deltas
     121          364 :                 let timer = self.metrics.compact_time_histo.start_timer();
     122        42647 :                 self.compact_level0(target_file_size, ctx).await?;
     123          364 :                 timer.stop_and_record();
     124          364 : 
     125          364 :                 // 3. Create new image layers for partitions that have been modified
     126          364 :                 // "enough".
     127          364 :                 let mut partitioning = dense_partitioning;
     128          364 :                 partitioning
     129          364 :                     .parts
     130          364 :                     .extend(sparse_partitioning.into_dense().parts);
     131          364 :                 let image_layers = self
     132          364 :                     .create_image_layers(
     133          364 :                         &partitioning,
     134          364 :                         lsn,
     135          364 :                         if flags.contains(CompactFlags::ForceImageLayerCreation) {
     136           14 :                             ImageLayerCreationMode::Force
     137              :                         } else {
     138          350 :                             ImageLayerCreationMode::Try
     139              :                         },
     140          364 :                         &image_ctx,
     141              :                     )
     142        14415 :                     .await?;
     143              : 
     144          364 :                 self.upload_new_image_layers(image_layers)?;
     145          364 :                 partitioning.parts.len()
     146              :             }
     147            0 :             Err(err) => {
     148            0 :                 // no partitioning? This is normal, if the timeline was just created
     149            0 :                 // as an empty timeline. Also in unit tests, when we use the timeline
     150            0 :                 // as a simple key-value store, ignoring the datadir layout. Log the
     151            0 :                 // error but continue.
     152            0 :                 //
     153            0 :                 // Suppress error when it's due to cancellation
     154            0 :                 if !self.cancel.is_cancelled() {
     155            0 :                     tracing::error!("could not compact, repartitioning keyspace failed: {err:?}");
     156            0 :                 }
     157            0 :                 1
     158              :             }
     159              :         };
     160              : 
     161          364 :         if self.shard_identity.count >= ShardCount::new(2) {
     162              :             // Limit the number of layer rewrites to the number of partitions: this means its
     163              :             // runtime should be comparable to a full round of image layer creations, rather than
     164              :             // being potentially much longer.
     165            0 :             let rewrite_max = partition_count;
     166            0 : 
     167            0 :             self.compact_shard_ancestors(rewrite_max, ctx).await?;
     168          364 :         }
     169              : 
     170          364 :         Ok(())
     171          364 :     }
     172              : 
     173              :     /// Check for layers that are eligible to be rewritten:
     174              :     /// - Shard splitting: after a shard split, rewrite ancestor layers beyond pitr_interval, so that
     175              :     ///   we don't indefinitely retain keys in this shard that aren't needed.
     176              :     /// - For future use: layers beyond pitr_interval that are in formats we would
     177              :     ///   rather not maintain compatibility with indefinitely.
     178              :     ///
     179              :     /// Note: this phase may read and write many gigabytes of data: use rewrite_max to bound
     180              :     /// how much work it will try to do in each compaction pass.
     181            0 :     async fn compact_shard_ancestors(
     182            0 :         self: &Arc<Self>,
     183            0 :         rewrite_max: usize,
     184            0 :         ctx: &RequestContext,
     185            0 :     ) -> anyhow::Result<()> {
     186            0 :         let mut drop_layers = Vec::new();
     187            0 :         let mut layers_to_rewrite: Vec<Layer> = Vec::new();
     188            0 : 
     189            0 :         // We will use the Lsn cutoff of the last GC as a threshold for rewriting layers: if a
     190            0 :         // layer is behind this Lsn, it indicates that the layer is being retained beyond the
     191            0 :         // pitr_interval, for example because a branchpoint references it.
     192            0 :         //
     193            0 :         // Holding this read guard also blocks [`Self::gc_timeline`] from entering while we
     194            0 :         // are rewriting layers.
     195            0 :         let latest_gc_cutoff = self.get_latest_gc_cutoff_lsn();
     196            0 : 
     197            0 :         tracing::info!(
     198            0 :             "latest_gc_cutoff: {}, pitr cutoff {}",
     199            0 :             *latest_gc_cutoff,
     200            0 :             self.gc_info.read().unwrap().cutoffs.time
     201              :         );
     202              : 
     203            0 :         let layers = self.layers.read().await;
     204            0 :         for layer_desc in layers.layer_map().iter_historic_layers() {
     205            0 :             let layer = layers.get_from_desc(&layer_desc);
     206            0 :             if layer.metadata().shard.shard_count == self.shard_identity.count {
     207              :                 // This layer does not belong to a historic ancestor, no need to re-image it.
     208            0 :                 continue;
     209            0 :             }
     210            0 : 
     211            0 :             // This layer was created on an ancestor shard: check if it contains any data for this shard.
     212            0 :             let sharded_range = ShardedRange::new(layer_desc.get_key_range(), &self.shard_identity);
     213            0 :             let layer_local_page_count = sharded_range.page_count();
     214            0 :             let layer_raw_page_count = ShardedRange::raw_size(&layer_desc.get_key_range());
     215            0 :             if layer_local_page_count == 0 {
     216              :                 // This ancestral layer only covers keys that belong to other shards.
     217              :                 // We include the full metadata in the log: if we had some critical bug that caused
     218              :                 // us to incorrectly drop layers, this would simplify manually debugging + reinstating those layers.
     219            0 :                 info!(%layer, old_metadata=?layer.metadata(),
     220            0 :                     "dropping layer after shard split, contains no keys for this shard.",
     221              :                 );
     222              : 
     223            0 :                 if cfg!(debug_assertions) {
     224              :                     // Expensive, exhaustive check of keys in this layer: this guards against ShardedRange's calculations being
     225              :                     // wrong.  If ShardedRange claims the local page count is zero, then no keys in this layer
     226              :                     // should be !is_key_disposable()
     227            0 :                     let range = layer_desc.get_key_range();
     228            0 :                     let mut key = range.start;
     229            0 :                     while key < range.end {
     230            0 :                         debug_assert!(self.shard_identity.is_key_disposable(&key));
     231            0 :                         key = key.next();
     232              :                     }
     233            0 :                 }
     234              : 
     235            0 :                 drop_layers.push(layer);
     236            0 :                 continue;
     237            0 :             } else if layer_local_page_count != u32::MAX
     238            0 :                 && layer_local_page_count == layer_raw_page_count
     239              :             {
     240            0 :                 debug!(%layer,
     241            0 :                     "layer is entirely shard local ({} keys), no need to filter it",
     242              :                     layer_local_page_count
     243              :                 );
     244            0 :                 continue;
     245            0 :             }
     246            0 : 
     247            0 :             // Don't bother re-writing a layer unless it will at least halve its size
     248            0 :             if layer_local_page_count != u32::MAX
     249            0 :                 && layer_local_page_count > layer_raw_page_count / 2
     250              :             {
     251            0 :                 debug!(%layer,
     252            0 :                     "layer is already mostly local ({}/{}), not rewriting",
     253              :                     layer_local_page_count,
     254              :                     layer_raw_page_count
     255              :                 );
     256            0 :                 continue;
     257            0 :             }
     258              :             // Don't bother re-writing a layer if it is within the PITR window: it will age-out eventually
     259              :             // without incurring the I/O cost of a rewrite.
     260            0 :             if layer_desc.get_lsn_range().end >= *latest_gc_cutoff {
     261            0 :                 debug!(%layer, "Skipping rewrite of layer still in GC window ({} >= {})",
     262            0 :                     layer_desc.get_lsn_range().end, *latest_gc_cutoff);
     263            0 :                 continue;
     264            0 :             }
     265            0 : 
     266            0 :             if layer_desc.is_delta() {
     267              :                 // We do not yet implement rewrite of delta layers
     268            0 :                 debug!(%layer, "Skipping rewrite of delta layer");
     269            0 :                 continue;
     270            0 :             }
     271            0 : 
     272            0 :             // Only rewrite layers if their generations differ.  This guarantees:
     273            0 :             //  - that local rewrite is safe, as local layer paths will differ between existing layer and rewritten one
     274            0 :             //  - that the layer is persistent in remote storage, as we only see old-generation'd layer via loading from remote storage
     275            0 :             if layer.metadata().generation == self.generation {
     276            0 :                 debug!(%layer, "Skipping rewrite, is not from old generation");
     277            0 :                 continue;
     278            0 :             }
     279            0 : 
     280            0 :             if layers_to_rewrite.len() >= rewrite_max {
     281            0 :                 tracing::info!(%layer, "Will rewrite layer on a future compaction, already rewrote {}",
     282            0 :                     layers_to_rewrite.len()
     283              :                 );
     284            0 :                 continue;
     285            0 :             }
     286            0 : 
     287            0 :             // Fall through: all our conditions for doing a rewrite passed.
     288            0 :             layers_to_rewrite.push(layer);
     289              :         }
     290              : 
     291              :         // Drop read lock on layer map before we start doing time-consuming I/O
     292            0 :         drop(layers);
     293            0 : 
     294            0 :         let mut replace_image_layers = Vec::new();
     295              : 
     296            0 :         for layer in layers_to_rewrite {
     297            0 :             tracing::info!(layer=%layer, "Rewriting layer after shard split...");
     298            0 :             let mut image_layer_writer = ImageLayerWriter::new(
     299            0 :                 self.conf,
     300            0 :                 self.timeline_id,
     301            0 :                 self.tenant_shard_id,
     302            0 :                 &layer.layer_desc().key_range,
     303            0 :                 layer.layer_desc().image_layer_lsn(),
     304            0 :                 ctx,
     305            0 :             )
     306            0 :             .await?;
     307              : 
     308              :             // Safety of layer rewrites:
     309              :             // - We are writing to a different local file path than we are reading from, so the old Layer
     310              :             //   cannot interfere with the new one.
     311              :             // - In the page cache, contents for a particular VirtualFile are stored with a file_id that
     312              :             //   is different for two layers with the same name (in `ImageLayerInner::new` we always
     313              :             //   acquire a fresh id from [`crate::page_cache::next_file_id`].  So readers do not risk
     314              :             //   reading the index from one layer file, and then data blocks from the rewritten layer file.
     315              :             // - Any readers that have a reference to the old layer will keep it alive until they are done
     316              :             //   with it. If they are trying to promote from remote storage, that will fail, but this is the same
     317              :             //   as for compaction generally: compaction is allowed to delete layers that readers might be trying to use.
     318              :             // - We do not run concurrently with other kinds of compaction, so the only layer map writes we race with are:
     319              :             //    - GC, which at worst witnesses us "undelete" a layer that they just deleted.
     320              :             //    - ingestion, which only inserts layers, therefore cannot collide with us.
     321            0 :             let resident = layer.download_and_keep_resident().await?;
     322              : 
     323            0 :             let keys_written = resident
     324            0 :                 .filter(&self.shard_identity, &mut image_layer_writer, ctx)
     325            0 :                 .await?;
     326              : 
     327            0 :             if keys_written > 0 {
     328            0 :                 let new_layer = image_layer_writer.finish(self, ctx).await?;
     329            0 :                 tracing::info!(layer=%new_layer, "Rewrote layer, {} -> {} bytes",
     330            0 :                     layer.metadata().file_size,
     331            0 :                     new_layer.metadata().file_size);
     332              : 
     333            0 :                 replace_image_layers.push((layer, new_layer));
     334            0 :             } else {
     335            0 :                 // Drop the old layer.  Usually for this case we would already have noticed that
     336            0 :                 // the layer has no data for us with the ShardedRange check above, but that check only looks at the layer's key range, not the keys actually stored in it.
     337            0 :                 drop_layers.push(layer);
     338            0 :             }
     339              :         }
     340              : 
     341              :         // At this point, we have replaced local layer files with their rewritten form, but not yet uploaded
     342              :         // metadata to reflect that. If we restart here, the replaced layer files will look invalid (size mismatch
     343              :         // to remote index) and be removed. This is inefficient but safe.
     344              :         fail::fail_point!("compact-shard-ancestors-localonly");
     345              : 
     346              :         // Update the LayerMap so that readers will use the new layers, and enqueue it for writing to remote storage
     347            0 :         self.rewrite_layers(replace_image_layers, drop_layers)
     348            0 :             .await?;
     349              : 
     350              :         fail::fail_point!("compact-shard-ancestors-enqueued");
     351              : 
     352              :         // We wait for all uploads to complete before finishing this compaction stage.  This is not
     353              :         // necessary for correctness, but it simplifies testing, and avoids proceeding with another
     354              :         // Timeline's compaction while this timeline's uploads may be generating lots of disk I/O
     355              :         // load.
     356            0 :         self.remote_client.wait_completion().await?;
     357              : 
     358              :         fail::fail_point!("compact-shard-ancestors-persistent");
     359              : 
     360            0 :         Ok(())
     361            0 :     }
     362              : 
     363              :     /// Collect a bunch of Level 0 layer files, and compact and reshuffle them
     364              :     /// as Level 1 files.
     365          364 :     async fn compact_level0(
     366          364 :         self: &Arc<Self>,
     367          364 :         target_file_size: u64,
     368          364 :         ctx: &RequestContext,
     369          364 :     ) -> Result<(), CompactionError> {
     370              :         let CompactLevel0Phase1Result {
     371          364 :             new_layers,
     372          364 :             deltas_to_compact,
     373              :         } = {
     374          364 :             let phase1_span = info_span!("compact_level0_phase1");
     375          364 :             let ctx = ctx.attached_child();
     376          364 :             let mut stats = CompactLevel0Phase1StatsBuilder {
     377          364 :                 version: Some(2),
     378          364 :                 tenant_id: Some(self.tenant_shard_id),
     379          364 :                 timeline_id: Some(self.timeline_id),
     380          364 :                 ..Default::default()
     381          364 :             };
     382          364 : 
     383          364 :             let begin = tokio::time::Instant::now();
     384          364 :             let phase1_layers_locked = self.layers.read().await;
     385          364 :             let now = tokio::time::Instant::now();
     386          364 :             stats.read_lock_acquisition_micros =
     387          364 :                 DurationRecorder::Recorded(RecordedDuration(now - begin), now);
     388          364 :             self.compact_level0_phase1(phase1_layers_locked, stats, target_file_size, &ctx)
     389          364 :                 .instrument(phase1_span)
     390        42647 :                 .await?
     391              :         };
     392              : 
     393          364 :         if new_layers.is_empty() && deltas_to_compact.is_empty() {
     394              :             // phase 1 produced no new layers and selected no deltas: nothing to do
     395          336 :             return Ok(());
     396           28 :         }
     397           28 : 
     398           28 :         self.finish_compact_batch(&new_layers, &Vec::new(), &deltas_to_compact)
     399            0 :             .await?;
     400           28 :         Ok(())
     401          364 :     }
     402              : 
     403              :     /// Level0 files first phase of compaction, explained in the [`Self::compact_legacy`] comment.
     404          364 :     async fn compact_level0_phase1<'a>(
     405          364 :         self: &'a Arc<Self>,
     406          364 :         guard: tokio::sync::RwLockReadGuard<'a, LayerManager>,
     407          364 :         mut stats: CompactLevel0Phase1StatsBuilder,
     408          364 :         target_file_size: u64,
     409          364 :         ctx: &RequestContext,
     410          364 :     ) -> Result<CompactLevel0Phase1Result, CompactionError> {
     411          364 :         stats.read_lock_held_spawn_blocking_startup_micros =
     412          364 :             stats.read_lock_acquisition_micros.till_now(); // set by caller
     413          364 :         let layers = guard.layer_map();
     414          364 :         let level0_deltas = layers.get_level0_deltas()?;
     415          364 :         let mut level0_deltas = level0_deltas
     416          364 :             .into_iter()
     417         1616 :             .map(|x| guard.get_from_desc(&x))
     418          364 :             .collect_vec();
     419          364 :         stats.level0_deltas_count = Some(level0_deltas.len());
     420          364 : 
     421          364 :         // Only compact if enough layers have accumulated.
     422          364 :         let threshold = self.get_compaction_threshold();
     423          364 :         if level0_deltas.is_empty() || level0_deltas.len() < threshold {
     424          336 :             debug!(
     425            0 :                 level0_deltas = level0_deltas.len(),
     426            0 :                 threshold, "too few deltas to compact"
     427              :             );
     428          336 :             return Ok(CompactLevel0Phase1Result::default());
     429           28 :         }
     430           28 : 
     431           28 :         // Gather the files to compact in this iteration.
     432           28 :         //
     433           28 :         // Start with the oldest Level 0 delta file, and collect any other
     434           28 :         // level 0 files that form a contiguous sequence, such that the end
     435           28 :         // LSN of previous file matches the start LSN of the next file.
     436           28 :         //
     437           28 :         // Note that if the files don't form such a sequence, we might
     438           28 :         // "compact" just a single file. That's a bit pointless, but it allows
     439           28 :         // us to get rid of the level 0 file, and compact the other files on
     440           28 :         // the next iteration. This could probably made smarter, but such
     441           28 :         // "gaps" in the sequence of level 0 files should only happen in case
     442           28 :         // of a crash, partial download from cloud storage, or something like
     443           28 :         // that, so it's not a big deal in practice.
     444          748 :         level0_deltas.sort_by_key(|l| l.layer_desc().lsn_range.start);
     445           28 :         let mut level0_deltas_iter = level0_deltas.iter();
     446           28 : 
     447           28 :         let first_level0_delta = level0_deltas_iter.next().unwrap();
     448           28 :         let mut prev_lsn_end = first_level0_delta.layer_desc().lsn_range.end;
     449           28 :         let mut deltas_to_compact = Vec::with_capacity(level0_deltas.len());
     450           28 : 
     451           28 :         // Accumulate the size of layers in `deltas_to_compact`
     452           28 :         let mut deltas_to_compact_bytes = 0;
     453           28 : 
     454           28 :         // Under normal circumstances, we will accumulate up to compaction_interval L0s of size
     455           28 :         // checkpoint_distance each.  To avoid edge cases using extra system resources, bound our
     456           28 :         // work in this function to only operate on this much delta data at once.
     457           28 :         //
     458           28 :         // Take the max of the configured value & the default, so that tests that configure tiny values
     459           28 :         // can still use a sensible amount of memory, but if a deployed system configures bigger values we
     460           28 :         // still let them compact a full stack of L0s in one go.
     461           28 :         let delta_size_limit = std::cmp::max(
     462           28 :             self.get_compaction_threshold(),
     463           28 :             DEFAULT_COMPACTION_THRESHOLD,
     464           28 :         ) as u64
     465           28 :             * std::cmp::max(self.get_checkpoint_distance(), DEFAULT_CHECKPOINT_DISTANCE);
     466           28 : 
     467           28 :         deltas_to_compact.push(first_level0_delta.download_and_keep_resident().await?);
     468          402 :         for l in level0_deltas_iter {
     469          374 :             let lsn_range = &l.layer_desc().lsn_range;
     470          374 : 
     471          374 :             if lsn_range.start != prev_lsn_end {
     472            0 :                 break;
     473          374 :             }
     474          374 :             deltas_to_compact.push(l.download_and_keep_resident().await?);
     475          374 :             deltas_to_compact_bytes += l.metadata().file_size;
     476          374 :             prev_lsn_end = lsn_range.end;
     477          374 : 
     478          374 :             if deltas_to_compact_bytes >= delta_size_limit {
     479            0 :                 info!(
     480            0 :                     l0_deltas_selected = deltas_to_compact.len(),
     481            0 :                     l0_deltas_total = level0_deltas.len(),
     482            0 :                     "L0 compaction picker hit max delta layer size limit: {}",
     483              :                     delta_size_limit
     484              :                 );
     485              : 
     486              :                 // Proceed with compaction, but only a subset of L0s
     487            0 :                 break;
     488          374 :             }
     489              :         }
     490           28 :         let lsn_range = Range {
     491           28 :             start: deltas_to_compact
     492           28 :                 .first()
     493           28 :                 .unwrap()
     494           28 :                 .layer_desc()
     495           28 :                 .lsn_range
     496           28 :                 .start,
     497           28 :             end: deltas_to_compact.last().unwrap().layer_desc().lsn_range.end,
     498           28 :         };
     499           28 : 
     500           28 :         info!(
     501            0 :             "Starting Level0 compaction in LSN range {}-{} for {} layers ({} deltas in total)",
     502            0 :             lsn_range.start,
     503            0 :             lsn_range.end,
     504            0 :             deltas_to_compact.len(),
     505            0 :             level0_deltas.len()
     506              :         );
     507              : 
     508          402 :         for l in deltas_to_compact.iter() {
     509          402 :             info!("compact includes {l}");
     510              :         }
     511              : 
     512              :         // We don't need the original list of layers anymore. Drop it so that
     513              :         // we don't accidentally use it later in the function.
     514           28 :         drop(level0_deltas);
     515           28 : 
     516           28 :         stats.read_lock_held_prerequisites_micros = stats
     517           28 :             .read_lock_held_spawn_blocking_startup_micros
     518           28 :             .till_now();
     519           28 : 
     520           28 :         // Determine N largest holes where N is number of compacted layers.
     521           28 :         let max_holes = deltas_to_compact.len();
     522           28 :         let last_record_lsn = self.get_last_record_lsn();
     523           28 :         let min_hole_range = (target_file_size / page_cache::PAGE_SZ as u64) as i128;
     524           28 :         let min_hole_coverage_size = 3; // TODO: something more flexible?
     525           28 : 
     526           28 :         // min-heap (reserve space for one more element added before eviction)
     527           28 :         let mut heap: BinaryHeap<Hole> = BinaryHeap::with_capacity(max_holes + 1);
     528           28 :         let mut prev: Option<Key> = None;
     529           28 : 
     530           28 :         let mut all_keys = Vec::new();
     531              : 
     532          402 :         for l in deltas_to_compact.iter() {
     533         2413 :             all_keys.extend(l.load_keys(ctx).await?);
     534              :         }
     535              : 
     536              :         // FIXME: should spawn_blocking the rest of this function
     537              : 
     538              :         // The current stdlib sorting implementation is designed in a way where it is
     539              :         // particularly fast where the slice is made up of sorted sub-ranges.
     540      4431792 :         all_keys.sort_by_key(|DeltaEntry { key, lsn, .. }| (*key, *lsn));
     541           28 : 
     542           28 :         stats.read_lock_held_key_sort_micros = stats.read_lock_held_prerequisites_micros.till_now();
     543              : 
     544      2064038 :         for &DeltaEntry { key: next_key, .. } in all_keys.iter() {
     545      2064038 :             if let Some(prev_key) = prev {
     546              :                 // just first fast filter, do not create hole entries for metadata keys. The last hole in the
     547              :                 // compaction is the gap between data key and metadata keys.
     548      2064010 :                 if next_key.to_i128() - prev_key.to_i128() >= min_hole_range
     549            0 :                     && !Key::is_metadata_key(&prev_key)
     550              :                 {
     551            0 :                     let key_range = prev_key..next_key;
     552            0 :                     // Measuring a hole simply by subtracting the i128 representations of the key range
     553            0 :                     // boundaries makes little sense, because the largest holes would correspond to field1/field2 changes.
     554            0 :                     // What we mostly care about is eliminating holes that cause excessive image layers to be generated.
     555            0 :                     // That is why it is better to measure the size of a hole as the number of covering image layers.
     556            0 :                     let coverage_size = layers.image_coverage(&key_range, last_record_lsn).len();
     557            0 :                     if coverage_size >= min_hole_coverage_size {
     558            0 :                         heap.push(Hole {
     559            0 :                             key_range,
     560            0 :                             coverage_size,
     561            0 :                         });
     562            0 :                         if heap.len() > max_holes {
     563            0 :                             heap.pop(); // remove smallest hole
     564            0 :                         }
     565            0 :                     }
     566      2064010 :                 }
     567           28 :             }
     568      2064038 :             prev = Some(next_key.next());
     569              :         }
     570           28 :         stats.read_lock_held_compute_holes_micros = stats.read_lock_held_key_sort_micros.till_now();
     571           28 :         drop_rlock(guard);
     572           28 :         stats.read_lock_drop_micros = stats.read_lock_held_compute_holes_micros.till_now();
     573           28 :         let mut holes = heap.into_vec();
     574           28 :         holes.sort_unstable_by_key(|hole| hole.key_range.start);
     575           28 :         let mut next_hole = 0; // index of next hole in holes vector
     576           28 : 
     577           28 :         // This iterator walks through all key-value pairs from all the layers
     578           28 :         // we're compacting, in key, LSN order.
     579           28 :         let all_values_iter = all_keys.iter();
     580           28 : 
     581           28 :         // This iterator walks through all keys and is needed to calculate size used by each key
     582           28 :         let mut all_keys_iter = all_keys
     583           28 :             .iter()
     584      2064038 :             .map(|DeltaEntry { key, lsn, size, .. }| (*key, *lsn, *size))
     585      2064010 :             .coalesce(|mut prev, cur| {
     586      2064010 :                 // Coalesce keys that belong to the same key pair.
     587      2064010 :                 // This ensures that compaction doesn't put them
     588      2064010 :                 // into different layer files.
     589      2064010 :                 // Still limit this by the target file size,
     590      2064010 :                 // so that we keep the size of the files in
     591      2064010 :                 // check.
     592      2064010 :                 if prev.0 == cur.0 && prev.2 < target_file_size {
     593        40038 :                     prev.2 += cur.2;
     594        40038 :                     Ok(prev)
     595              :                 } else {
     596      2023972 :                     Err((prev, cur))
     597              :                 }
     598      2064010 :             });
     599           28 : 
     600           28 :         // Merge the contents of all the input delta layers into a new set
     601           28 :         // of delta layers, based on the current partitioning.
     602           28 :         //
     603           28 :         // We split the new delta layers on the key dimension. We iterate through the key space, and for each key, check if including the next key to the current output layer we're building would cause the layer to become too large. If so, dump the current output layer and start new one.
     604           28 :         // It's possible that there is a single key with so many page versions that storing all of them in a single layer file
     605           28 :         // would be too large. In that case, we also split on the LSN dimension.
     606           28 :         //
     607           28 :         // LSN
     608           28 :         //  ^
     609           28 :         //  |
     610           28 :         //  | +-----------+            +--+--+--+--+
     611           28 :         //  | |           |            |  |  |  |  |
     612           28 :         //  | +-----------+            |  |  |  |  |
     613           28 :         //  | |           |            |  |  |  |  |
     614           28 :         //  | +-----------+     ==>    |  |  |  |  |
     615           28 :         //  | |           |            |  |  |  |  |
     616           28 :         //  | +-----------+            |  |  |  |  |
     617           28 :         //  | |           |            |  |  |  |  |
     618           28 :         //  | +-----------+            +--+--+--+--+
     619           28 :         //  |
     620           28 :         //  +--------------> key
     621           28 :         //
     622           28 :         //
     623           28 :         // If one key (X) has a lot of page versions:
     624           28 :         //
     625           28 :         // LSN
     626           28 :         //  ^
     627           28 :         //  |                                 (X)
     628           28 :         //  | +-----------+            +--+--+--+--+
     629           28 :         //  | |           |            |  |  |  |  |
     630           28 :         //  | +-----------+            |  |  +--+  |
     631           28 :         //  | |           |            |  |  |  |  |
     632           28 :         //  | +-----------+     ==>    |  |  |  |  |
     633           28 :         //  | |           |            |  |  +--+  |
     634           28 :         //  | +-----------+            |  |  |  |  |
     635           28 :         //  | |           |            |  |  |  |  |
     636           28 :         //  | +-----------+            +--+--+--+--+
     637           28 :         //  |
     638           28 :         //  +--------------> key
     639           28 :         // TODO: this actually divides the layers into fixed-size chunks, not
     640           28 :         // based on the partitioning.
     641           28 :         //
     642           28 :         // TODO: we should also opportunistically materialize and
     643           28 :         // garbage collect what we can.
     644           28 :         let mut new_layers = Vec::new();
     645           28 :         let mut prev_key: Option<Key> = None;
     646           28 :         let mut writer: Option<DeltaLayerWriter> = None;
     647           28 :         let mut key_values_total_size = 0u64;
     648           28 :         let mut dup_start_lsn: Lsn = Lsn::INVALID; // start LSN of layer containing values of the single key
     649           28 :         let mut dup_end_lsn: Lsn = Lsn::INVALID; // end LSN of layer containing values of the single key
     650              : 
     651              :         for &DeltaEntry {
     652      2064038 :             key, lsn, ref val, ..
     653      2064066 :         } in all_values_iter
     654              :         {
     655      2064038 :             let value = val.load(ctx).await?;
     656      2064038 :             let same_key = prev_key.map_or(false, |prev_key| prev_key == key);
     657      2064038 :             // We need to check key boundaries once we reach next key or end of layer with the same key
     658      2064038 :             if !same_key || lsn == dup_end_lsn {
     659      2024000 :                 let mut next_key_size = 0u64;
     660      2024000 :                 let is_dup_layer = dup_end_lsn.is_valid();
     661      2024000 :                 dup_start_lsn = Lsn::INVALID;
     662      2024000 :                 if !same_key {
     663      2024000 :                     dup_end_lsn = Lsn::INVALID;
     664      2024000 :                 }
     665              :                 // Determine size occupied by this key. We stop at next key or when size becomes larger than target_file_size
     666      2024000 :                 for (next_key, next_lsn, next_size) in all_keys_iter.by_ref() {
     667      2024000 :                     next_key_size = next_size;
     668      2024000 :                     if key != next_key {
     669      2023972 :                         if dup_end_lsn.is_valid() {
     670            0 :                             // We are writing a segment with duplicates:
     671            0 :                             // place all remaining values of this key in a separate segment
     672            0 :                             dup_start_lsn = dup_end_lsn; // new segments starts where old stops
     673            0 :                             dup_end_lsn = lsn_range.end; // there are no more values of this key till end of LSN range
     674      2023972 :                         }
     675      2023972 :                         break;
     676           28 :                     }
     677           28 :                     key_values_total_size += next_size;
     678           28 :                     // Check if it is time to split segment: if total keys size is larger than target file size.
     679           28 :                     // We need to avoid generation of empty segments if next_size > target_file_size.
     680           28 :                     if key_values_total_size > target_file_size && lsn != next_lsn {
     681              :                         // Split key between multiple layers: such layer can contain only single key
     682            0 :                         dup_start_lsn = if dup_end_lsn.is_valid() {
     683            0 :                             dup_end_lsn // new segment with duplicates starts where old one stops
     684              :                         } else {
     685            0 :                             lsn // start with the first LSN for this key
     686              :                         };
     687            0 :                         dup_end_lsn = next_lsn; // upper LSN boundary is exclusive
     688            0 :                         break;
     689           28 :                     }
     690              :                 }
     691              :                 // handle case when loop reaches last key: in this case dup_end is non-zero but dup_start is not set.
     692      2024000 :                 if dup_end_lsn.is_valid() && !dup_start_lsn.is_valid() {
     693            0 :                     dup_start_lsn = dup_end_lsn;
     694            0 :                     dup_end_lsn = lsn_range.end;
     695      2024000 :                 }
     696      2024000 :                 if writer.is_some() {
     697      2023972 :                     let written_size = writer.as_mut().unwrap().size();
     698      2023972 :                     let contains_hole =
     699      2023972 :                         next_hole < holes.len() && key >= holes[next_hole].key_range.end;
     700              :                     // check if key cause layer overflow or contains hole...
     701      2023972 :                     if is_dup_layer
     702      2023972 :                         || dup_end_lsn.is_valid()
     703      2023972 :                         || written_size + key_values_total_size > target_file_size
     704      2023692 :                         || contains_hole
     705              :                     {
     706              :                         // ... if so, flush previous layer and prepare to write new one
     707          280 :                         new_layers.push(
     708          280 :                             writer
     709          280 :                                 .take()
     710          280 :                                 .unwrap()
     711          280 :                                 .finish(prev_key.unwrap().next(), self, ctx)
     712          720 :                                 .await?,
     713              :                         );
     714          280 :                         writer = None;
     715          280 : 
     716          280 :                         if contains_hole {
     717            0 :                             // skip hole
     718            0 :                             next_hole += 1;
     719          280 :                         }
     720      2023692 :                     }
     721           28 :                 }
     722              :                 // Remember size of key value because at next iteration we will access next item
     723      2024000 :                 key_values_total_size = next_key_size;
     724        40038 :             }
     725      2064038 :             fail_point!("delta-layer-writer-fail-before-finish", |_| {
     726            0 :                 Err(CompactionError::Other(anyhow::anyhow!(
     727            0 :                     "failpoint delta-layer-writer-fail-before-finish"
     728            0 :                 )))
     729      2064038 :             });
     730              : 
     731      2064038 :             if !self.shard_identity.is_key_disposable(&key) {
     732      2064038 :                 if writer.is_none() {
     733          308 :                     // Create writer if not initialized yet
     734          308 :                     writer = Some(
     735              :                         DeltaLayerWriter::new(
     736          308 :                             self.conf,
     737          308 :                             self.timeline_id,
     738          308 :                             self.tenant_shard_id,
     739          308 :                             key,
     740          308 :                             if dup_end_lsn.is_valid() {
     741              :                                 // this is a layer containing slice of values of the same key
     742            0 :                                 debug!("Create new dup layer {}..{}", dup_start_lsn, dup_end_lsn);
     743            0 :                                 dup_start_lsn..dup_end_lsn
     744              :                             } else {
     745          308 :                                 debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end);
     746          308 :                                 lsn_range.clone()
     747              :                             },
     748          308 :                             ctx,
     749              :                         )
     750          154 :                         .await?,
     751              :                     );
     752      2063730 :                 }
     753              : 
     754      2064038 :                 writer
     755      2064038 :                     .as_mut()
     756      2064038 :                     .unwrap()
     757      2064038 :                     .put_value(key, lsn, value, ctx)
     758         1566 :                     .await?;
     759              :             } else {
     760            0 :                 debug!(
     761            0 :                     "Dropping key {} during compaction (it belongs on shard {:?})",
     762            0 :                     key,
     763            0 :                     self.shard_identity.get_shard_number(&key)
     764              :                 );
     765              :             }
     766              : 
     767      2064038 :             if !new_layers.is_empty() {
     768        19786 :                 fail_point!("after-timeline-compacted-first-L1");
     769      2044252 :             }
     770              : 
     771      2064038 :             prev_key = Some(key);
     772              :         }
     773           28 :         if let Some(writer) = writer {
     774         1991 :             new_layers.push(writer.finish(prev_key.unwrap().next(), self, ctx).await?);
     775            0 :         }
     776              : 
     777              :         // Sync layers
     778           28 :         if !new_layers.is_empty() {
     779              :             // Print a warning if the created layer is larger than double the target size
     780              :             // Add two pages for potential overhead. This should in theory be already
     781              :             // accounted for in the target calculation, but for very small targets,
     782              :             // we still might easily hit the limit otherwise.
     783           28 :             let warn_limit = target_file_size * 2 + page_cache::PAGE_SZ as u64 * 2;
     784          308 :             for layer in new_layers.iter() {
     785          308 :                 if layer.layer_desc().file_size > warn_limit {
     786            0 :                     warn!(
     787              :                         %layer,
     788            0 :                         "created delta file of size {} larger than double of target of {target_file_size}", layer.layer_desc().file_size
     789              :                     );
     790          308 :                 }
     791              :             }
     792              : 
     793              :             // The writer.finish() above already did the fsync of the inodes.
     794              :             // We just need to fsync the directory in which these inodes are linked,
     795              :             // which we know to be the timeline directory.
     796              :             //
     797              :             // We use fatal_err() below because after writer.finish() returns with success,
     798              :             // the in-memory state of the filesystem already has the layer file in its final place,
     799              :             // and subsequent pageserver code could think it's durable while it really isn't.
     800           28 :             let timeline_dir = VirtualFile::open(
     801           28 :                 &self
     802           28 :                     .conf
     803           28 :                     .timeline_path(&self.tenant_shard_id, &self.timeline_id),
     804           28 :                 ctx,
     805           28 :             )
     806           14 :             .await
     807           28 :             .fatal_err("VirtualFile::open for timeline dir fsync");
     808           28 :             timeline_dir
     809           28 :                 .sync_all()
     810           14 :                 .await
     811           28 :                 .fatal_err("VirtualFile::sync_all timeline dir");
     812            0 :         }
     813              : 
     814           28 :         stats.write_layer_files_micros = stats.read_lock_drop_micros.till_now();
     815           28 :         stats.new_deltas_count = Some(new_layers.len());
     816          308 :         stats.new_deltas_size = Some(new_layers.iter().map(|l| l.layer_desc().file_size).sum());
     817           28 : 
     818           28 :         match TryInto::<CompactLevel0Phase1Stats>::try_into(stats)
     819           28 :             .and_then(|stats| serde_json::to_string(&stats).context("serde_json::to_string"))
     820              :         {
     821           28 :             Ok(stats_json) => {
     822           28 :                 info!(
     823            0 :                     stats_json = stats_json.as_str(),
     824            0 :                     "compact_level0_phase1 stats available"
     825              :                 )
     826              :             }
     827            0 :             Err(e) => {
     828            0 :                 warn!("compact_level0_phase1 stats failed to serialize: {:#}", e);
     829              :             }
     830              :         }
     831              : 
     832           28 :         Ok(CompactLevel0Phase1Result {
     833           28 :             new_layers,
     834           28 :             deltas_to_compact: deltas_to_compact
     835           28 :                 .into_iter()
     836          402 :                 .map(|x| x.drop_eviction_guard())
     837           28 :                 .collect::<Vec<_>>(),
     838           28 :         })
     839          364 :     }
     840              : }
     841              : 
     842              : #[derive(Default)]
     843              : struct CompactLevel0Phase1Result { // Value returned on success by the Level0 compaction code that builds new delta layers.
     844              :     new_layers: Vec<ResidentLayer>, // layers produced by DeltaLayerWriter::finish() while merging the selected inputs
     845              :     deltas_to_compact: Vec<Layer>, // the L0 deltas that were merged, after drop_eviction_guard() was called on each
     846              : }
     847              : 
     848              : #[derive(Default)]
     849              : struct CompactLevel0Phase1StatsBuilder { // Accumulator filled in step by step; converting it to CompactLevel0Phase1Stats fails if any field is still unset.
     850              :     version: Option<u64>,
     851              :     tenant_id: Option<TenantShardId>,
     852              :     timeline_id: Option<TimelineId>,
     853              :     read_lock_acquisition_micros: DurationRecorder,
     854              :     read_lock_held_spawn_blocking_startup_micros: DurationRecorder, // its till_now() seeds read_lock_held_prerequisites_micros
     855              :     read_lock_held_key_sort_micros: DurationRecorder, // set after all_keys is sorted, from read_lock_held_prerequisites_micros.till_now()
     856              :     read_lock_held_prerequisites_micros: DurationRecorder, // set once the input deltas are selected; note: recorded BEFORE key_sort despite the field order
     857              :     read_lock_held_compute_holes_micros: DurationRecorder, // set after the hole-detection loop, from read_lock_held_key_sort_micros.till_now()
     858              :     read_lock_drop_micros: DurationRecorder, // set right after drop_rlock(guard)
     859              :     write_layer_files_micros: DurationRecorder, // set after the new layers are written and the timeline directory is fsynced
     860              :     level0_deltas_count: Option<usize>, // presumably the number of L0 deltas found; assigned outside the code visible here -- TODO confirm
     861              :     new_deltas_count: Option<usize>, // new_layers.len()
     862              :     new_deltas_size: Option<u64>, // sum of layer_desc().file_size over new_layers
     863              : }
     864              : 
     865              : #[derive(serde::Serialize)]
     866              : struct CompactLevel0Phase1Stats { // Fully-populated stats; serialized with serde_json and logged as `stats_json` ("compact_level0_phase1 stats available").
     867              :     version: u64,
     868              :     tenant_id: TenantShardId,
     869              :     timeline_id: TimelineId,
     870              :     read_lock_acquisition_micros: RecordedDuration, // each RecordedDuration here comes from the builder's DurationRecorder via into_recorded()
     871              :     read_lock_held_spawn_blocking_startup_micros: RecordedDuration,
     872              :     read_lock_held_key_sort_micros: RecordedDuration,
     873              :     read_lock_held_prerequisites_micros: RecordedDuration,
     874              :     read_lock_held_compute_holes_micros: RecordedDuration,
     875              :     read_lock_drop_micros: RecordedDuration,
     876              :     write_layer_files_micros: RecordedDuration,
     877              :     level0_deltas_count: usize,
     878              :     new_deltas_count: usize, // number of delta layers created by the compaction
     879              :     new_deltas_size: u64, // combined file_size of those layers
     880              : }
     881              : 
     882              : impl TryFrom<CompactLevel0Phase1StatsBuilder> for CompactLevel0Phase1Stats { // Finalizes a stats builder into its serializable form.
     883              :     type Error = anyhow::Error; // a missing field is reported as anyhow!("<field> not set")
     884              : 
     885           28 :     fn try_from(value: CompactLevel0Phase1StatsBuilder) -> Result<Self, Self::Error> { // consumes the builder; `?` returns at the first unset field, checked in the order written below
     886           28 :         Ok(Self {
     887           28 :             version: value.version.ok_or_else(|| anyhow!("version not set"))?, // plain Option fields: None becomes an error
     888           28 :             tenant_id: value
     889           28 :                 .tenant_id
     890           28 :                 .ok_or_else(|| anyhow!("tenant_id not set"))?,
     891           28 :             timeline_id: value
     892           28 :                 .timeline_id
     893           28 :                 .ok_or_else(|| anyhow!("timeline_id not set"))?,
     894           28 :             read_lock_acquisition_micros: value
     895           28 :                 .read_lock_acquisition_micros
     896           28 :                 .into_recorded() // DurationRecorder fields are first turned into an optional recorded value, then checked the same way
     897           28 :                 .ok_or_else(|| anyhow!("read_lock_acquisition_micros not set"))?,
     898           28 :             read_lock_held_spawn_blocking_startup_micros: value
     899           28 :                 .read_lock_held_spawn_blocking_startup_micros
     900           28 :                 .into_recorded()
     901           28 :                 .ok_or_else(|| anyhow!("read_lock_held_spawn_blocking_startup_micros not set"))?,
     902           28 :             read_lock_held_key_sort_micros: value
     903           28 :                 .read_lock_held_key_sort_micros
     904           28 :                 .into_recorded()
     905           28 :                 .ok_or_else(|| anyhow!("read_lock_held_key_sort_micros not set"))?,
     906           28 :             read_lock_held_prerequisites_micros: value
     907           28 :                 .read_lock_held_prerequisites_micros
     908           28 :                 .into_recorded()
     909           28 :                 .ok_or_else(|| anyhow!("read_lock_held_prerequisites_micros not set"))?,
     910           28 :             read_lock_held_compute_holes_micros: value
     911           28 :                 .read_lock_held_compute_holes_micros
     912           28 :                 .into_recorded()
     913           28 :                 .ok_or_else(|| anyhow!("read_lock_held_compute_holes_micros not set"))?,
     914           28 :             read_lock_drop_micros: value
     915           28 :                 .read_lock_drop_micros
     916           28 :                 .into_recorded()
     917           28 :                 .ok_or_else(|| anyhow!("read_lock_drop_micros not set"))?,
     918           28 :             write_layer_files_micros: value
     919           28 :                 .write_layer_files_micros
     920           28 :                 .into_recorded()
     921           28 :                 .ok_or_else(|| anyhow!("write_layer_files_micros not set"))?,
     922           28 :             level0_deltas_count: value
     923           28 :                 .level0_deltas_count
     924           28 :                 .ok_or_else(|| anyhow!("level0_deltas_count not set"))?,
     925           28 :             new_deltas_count: value
     926           28 :                 .new_deltas_count
     927           28 :                 .ok_or_else(|| anyhow!("new_deltas_count not set"))?,
     928           28 :             new_deltas_size: value
     929           28 :                 .new_deltas_size
     930           28 :                 .ok_or_else(|| anyhow!("new_deltas_size not set"))?,
     931              :         })
     932           28 :     }
     933              : }
     934              : 
     935              : impl Timeline {
     936              :     /// Entry point for new tiered compaction algorithm.
     937              :     ///
     938              :     /// All the real work is in the implementation in the pageserver_compaction
     939              :     /// crate. The code here would apply to any algorithm implemented by the
     940              :     /// same interface, but tiered is the only one at the moment.
     941              :     ///
     942              :     /// TODO: cancellation
     943            0 :     pub(crate) async fn compact_tiered(
     944            0 :         self: &Arc<Self>,
     945            0 :         _cancel: &CancellationToken,
     946            0 :         ctx: &RequestContext,
     947            0 :     ) -> Result<(), CompactionError> {
     948            0 :         let fanout = self.get_compaction_threshold() as u64;
     949            0 :         let target_file_size = self.get_checkpoint_distance();
     950              : 
     951              :         // Find the top of the historical layers
     952            0 :         let end_lsn = {
     953            0 :             let guard = self.layers.read().await;
     954            0 :             let layers = guard.layer_map();
     955              : 
     956            0 :             let l0_deltas = layers.get_level0_deltas()?;
     957            0 :             drop(guard);
     958            0 : 
     959            0 :             // As an optimization, if we find that there are too few L0 layers,
     960            0 :             // bail out early. We know that the compaction algorithm would do
     961            0 :             // nothing in that case.
     962            0 :             if l0_deltas.len() < fanout as usize {
     963              :                 // doesn't need compacting
     964            0 :                 return Ok(());
     965            0 :             }
     966            0 :             l0_deltas.iter().map(|l| l.lsn_range.end).max().unwrap()
     967            0 :         };
     968            0 : 
     969            0 :         // Is the timeline being deleted?
     970            0 :         if self.is_stopping() {
     971            0 :             trace!("Dropping out of compaction on timeline shutdown");
     972            0 :             return Err(CompactionError::ShuttingDown);
     973            0 :         }
     974              : 
     975            0 :         let (dense_ks, _sparse_ks) = self.collect_keyspace(end_lsn, ctx).await?;
     976              :         // TODO(chi): ignore sparse_keyspace for now, compact it in the future.
     977            0 :         let mut adaptor = TimelineAdaptor::new(self, (end_lsn, dense_ks));
     978            0 : 
     979            0 :         pageserver_compaction::compact_tiered::compact_tiered(
     980            0 :             &mut adaptor,
     981            0 :             end_lsn,
     982            0 :             target_file_size,
     983            0 :             fanout,
     984            0 :             ctx,
     985            0 :         )
     986            0 :         .await?;
     987              : 
     988            0 :         adaptor.flush_updates().await?;
     989            0 :         Ok(())
     990            0 :     }
     991              : 
     992              :     /// An experimental compaction building block that combines compaction with garbage collection.
     993              :     ///
     994              :     /// The current implementation picks all delta + image layers that are below or intersecting with
     995              :     /// the GC horizon without considering retain_lsns. Then, it does a full compaction over all these delta
     996              :     /// layers and image layers, which generates image layers on the gc horizon, drop deltas below gc horizon,
     997              :     /// and create delta layers with all deltas >= gc horizon.
     998            4 :     pub(crate) async fn compact_with_gc(
     999            4 :         self: &Arc<Self>,
    1000            4 :         _cancel: &CancellationToken,
    1001            4 :         ctx: &RequestContext,
    1002            4 :     ) -> Result<(), CompactionError> {
    1003            4 :         use crate::tenant::storage_layer::ValueReconstructState;
    1004            4 :         use std::collections::BTreeSet;
    1005            4 : 
    1006            4 :         info!("running enhanced gc bottom-most compaction");
    1007              : 
    1008              :         scopeguard::defer! {
    1009              :             info!("done enhanced gc bottom-most compaction");
    1010              :         };
    1011              : 
    1012              :         // Step 0: pick all delta layers + image layers below/intersect with the GC horizon.
    1013              :         // The layer selection has the following properties:
    1014              :         // 1. If a layer is in the selection, all layers below it are in the selection.
    1015              :         // 2. Inferred from (1), for each key in the layer selection, the value can be reconstructed only with the layers in the layer selection.
    1016            4 :         let (layer_selection, gc_cutoff) = {
    1017            4 :             let guard = self.layers.read().await;
    1018            4 :             let layers = guard.layer_map();
    1019            4 :             let gc_info = self.gc_info.read().unwrap();
    1020            4 :             if !gc_info.retain_lsns.is_empty() || !gc_info.leases.is_empty() {
    1021            0 :                 return Err(CompactionError::Other(anyhow!(
    1022            0 :                     "enhanced legacy compaction currently does not support retain_lsns (branches)"
    1023            0 :                 )));
    1024            4 :             }
    1025            4 :             let gc_cutoff = gc_info.cutoffs.select_min();
    1026            4 :             let mut selected_layers = Vec::new();
    1027            4 :             // TODO: consider retain_lsns
    1028            4 :             drop(gc_info);
    1029           20 :             for desc in layers.iter_historic_layers() {
    1030           20 :                 if desc.get_lsn_range().start <= gc_cutoff {
    1031           16 :                     selected_layers.push(guard.get_from_desc(&desc));
    1032           16 :                 }
    1033              :             }
    1034            4 :             (selected_layers, gc_cutoff)
    1035            4 :         };
    1036            4 :         info!(
    1037            0 :             "picked {} layers for compaction with gc_cutoff={}",
    1038            0 :             layer_selection.len(),
    1039              :             gc_cutoff
    1040              :         );
    1041              :         // Step 1: (In the future) construct a k-merge iterator over all layers. For now, simply collect all keys + LSNs.
    1042              :         // Also, collect the layer information to decide when to split the new delta layers.
    1043            4 :         let mut downloaded_layers = Vec::new();
    1044            4 :         let mut delta_split_points = BTreeSet::new();
    1045           20 :         for layer in &layer_selection {
    1046           16 :             let resident_layer = layer.download_and_keep_resident().await?;
    1047           16 :             downloaded_layers.push(resident_layer);
    1048           16 : 
    1049           16 :             let desc = layer.layer_desc();
    1050           16 :             if desc.is_delta() {
    1051            8 :                 // TODO: is it correct to only record split points for deltas intersecting with the GC horizon? (exclude those below/above the horizon)
    1052            8 :                 // so that we can avoid having too many small delta layers.
    1053            8 :                 let key_range = desc.get_key_range();
    1054            8 :                 delta_split_points.insert(key_range.start);
    1055            8 :                 delta_split_points.insert(key_range.end);
    1056            8 :             }
    1057              :         }
    1058            4 :         let mut delta_layers = Vec::new();
    1059            4 :         let mut image_layers = Vec::new();
    1060           20 :         for resident_layer in &downloaded_layers {
    1061           16 :             if resident_layer.layer_desc().is_delta() {
    1062            8 :                 let layer = resident_layer.get_as_delta(ctx).await?;
    1063            8 :                 delta_layers.push(layer);
    1064            8 :             } else {
    1065            8 :                 let layer = resident_layer.get_as_image(ctx).await?;
    1066            8 :                 image_layers.push(layer);
    1067              :             }
    1068              :         }
    1069            4 :         let mut merge_iter = MergeIterator::create(&delta_layers, &image_layers, ctx);
    1070            4 :         // Step 2: Produce images+deltas. TODO: ensure newly-produced delta does not overlap with other deltas.
    1071            4 :         // Data of the same key.
    1072            4 :         let mut accumulated_values = Vec::new();
    1073            4 :         let mut last_key: Option<Key> = None;
    1074              : 
    1075              :         /// Take a list of images and deltas, produce an image at the GC horizon, and a list of deltas above the GC horizon.
    1076           72 :         async fn flush_accumulated_states(
    1077           72 :             tline: &Arc<Timeline>,
    1078           72 :             key: Key,
    1079           72 :             accumulated_values: &[(Key, Lsn, crate::repository::Value)],
    1080           72 :             horizon: Lsn,
    1081           72 :         ) -> anyhow::Result<(Vec<(Key, Lsn, crate::repository::Value)>, bytes::Bytes)> {
    1082           72 :             let mut base_image = None;
    1083           72 :             let mut keys_above_horizon = Vec::new();
    1084           72 :             let mut delta_above_base_image = Vec::new();
    1085              :             // We have a list of deltas/images. We want to create image layers while collect garbages.
    1086           88 :             for (key, lsn, val) in accumulated_values.iter().rev() {
    1087           88 :                 if *lsn > horizon {
    1088            4 :                     if let Some((_, prev_lsn, _)) = keys_above_horizon.last_mut() {
    1089            0 :                         if *prev_lsn == *lsn {
    1090              :                             // The case that we have an LSN with both data from the delta layer and the image layer. As
    1091              :                             // `ValueWrapper` ensures that an image is ordered before a delta at the same LSN, we simply
    1092              :                             // drop this delta and keep the image.
    1093              :                             //
    1094              :                             // For example, we have delta layer key1@0x10, key1@0x20, and image layer key1@0x10, we will
    1095              :                             // keep the image for key1@0x10 and the delta for key1@0x20. key1@0x10 delta will be simply
    1096              :                             // dropped.
    1097            0 :                             continue;
    1098            0 :                         }
    1099            4 :                     }
    1100            4 :                     keys_above_horizon.push((*key, *lsn, val.clone()));
    1101           84 :                 } else if *lsn <= horizon {
    1102           84 :                     match val {
    1103           72 :                         crate::repository::Value::Image(image) => {
    1104           72 :                             base_image = Some((*lsn, image.clone()));
    1105           72 :                             break;
    1106              :                         }
    1107           12 :                         crate::repository::Value::WalRecord(wal) => {
    1108           12 :                             delta_above_base_image.push((*lsn, wal.clone()));
    1109           12 :                         }
    1110              :                     }
    1111            0 :                 }
    1112              :             }
    1113              :             // do not reverse delta_above_base_image, reconstruct state expects reversely-ordered records
    1114           72 :             keys_above_horizon.reverse();
    1115           72 :             let state = ValueReconstructState {
    1116           72 :                 img: base_image,
    1117           72 :                 records: delta_above_base_image,
    1118           72 :             };
    1119           72 :             let img = tline.reconstruct_value(key, horizon, state).await?;
    1120           72 :             Ok((keys_above_horizon, img))
    1121           72 :         }
    1122              : 
    1123           72 :         async fn flush_deltas(
    1124           72 :             deltas: &mut Vec<(Key, Lsn, crate::repository::Value)>,
    1125           72 :             last_key: Key,
    1126           72 :             delta_split_points: &[Key],
    1127           72 :             current_delta_split_point: &mut usize,
    1128           72 :             tline: &Arc<Timeline>,
    1129           72 :             gc_cutoff: Lsn,
    1130           72 :             ctx: &RequestContext,
    1131           72 :         ) -> anyhow::Result<Option<ResidentLayer>> {
    1132           72 :             // Check if we need to split the delta layer. We split at the original delta layer boundary to avoid
    1133           72 :             // overlapping layers.
    1134           72 :             //
    1135           72 :             // If we have a structure like this:
    1136           72 :             //
    1137           72 :             // | Delta 1 |         | Delta 4 |
    1138           72 :             // |---------| Delta 2 |---------|
    1139           72 :             // | Delta 3 |         | Delta 5 |
    1140           72 :             //
    1141           72 :             // And we choose to compact delta 2+3+5. We will get an overlapping delta layer with delta 1+4.
    1142           72 :             // A simple solution here is to split the delta layers using the original boundary, while this
    1143           72 :             // might produce a lot of small layers. This should be improved and fixed in the future.
    1144           72 :             let mut need_split = false;
    1145           88 :             while *current_delta_split_point < delta_split_points.len()
    1146           76 :                 && last_key >= delta_split_points[*current_delta_split_point]
    1147           16 :             {
    1148           16 :                 *current_delta_split_point += 1;
    1149           16 :                 need_split = true;
    1150           16 :             }
    1151           72 :             if !need_split {
    1152           56 :                 return Ok(None);
    1153           16 :             }
    1154           16 :             let deltas = std::mem::take(deltas);
    1155           16 :             if deltas.is_empty() {
    1156           12 :                 return Ok(None);
    1157            4 :             }
    1158            4 :             let end_lsn = deltas.iter().map(|(_, lsn, _)| lsn).max().copied().unwrap() + 1;
    1159            4 :             let mut delta_layer_writer = DeltaLayerWriter::new(
    1160            4 :                 tline.conf,
    1161            4 :                 tline.timeline_id,
    1162            4 :                 tline.tenant_shard_id,
    1163            4 :                 deltas.first().unwrap().0,
    1164            4 :                 gc_cutoff..end_lsn,
    1165            4 :                 ctx,
    1166            4 :             )
    1167            2 :             .await?;
    1168            4 :             let key_end = deltas.last().unwrap().0.next();
    1169            8 :             for (key, lsn, val) in deltas {
    1170            4 :                 delta_layer_writer.put_value(key, lsn, val, ctx).await?;
    1171              :             }
    1172           10 :             let delta_layer = delta_layer_writer.finish(key_end, tline, ctx).await?;
    1173            4 :             Ok(Some(delta_layer))
    1174           72 :         }
    1175              : 
    1176            4 :         let mut image_layer_writer = ImageLayerWriter::new(
    1177            4 :             self.conf,
    1178            4 :             self.timeline_id,
    1179            4 :             self.tenant_shard_id,
    1180            4 :             &(Key::MIN..Key::MAX), // covers the full key range
    1181            4 :             gc_cutoff,
    1182            4 :             ctx,
    1183            4 :         )
    1184            2 :         .await?;
    1185              : 
    1186            4 :         let mut delta_values = Vec::new();
    1187            4 :         let delta_split_points = delta_split_points.into_iter().collect_vec();
    1188            4 :         let mut current_delta_split_point = 0;
    1189            4 :         let mut delta_layers = Vec::new();
    1190          100 :         while let Some((key, lsn, val)) = merge_iter.next().await? {
    1191           96 :             if last_key.is_none() || last_key.as_ref() == Some(&key) {
    1192           28 :                 if last_key.is_none() {
    1193            4 :                     last_key = Some(key);
    1194           24 :                 }
    1195           28 :                 accumulated_values.push((key, lsn, val));
    1196              :             } else {
    1197           68 :                 let last_key = last_key.as_mut().unwrap();
    1198           68 :                 let (deltas, image) =
    1199           68 :                     flush_accumulated_states(self, *last_key, &accumulated_values, gc_cutoff)
    1200            0 :                         .await?;
    1201              :                 // Put the image into the image layer. Currently we have a single big layer for the compaction.
    1202           69 :                 image_layer_writer.put_image(*last_key, image, ctx).await?;
    1203           68 :                 delta_values.extend(deltas);
    1204           68 :                 delta_layers.extend(
    1205           68 :                     flush_deltas(
    1206           68 :                         &mut delta_values,
    1207           68 :                         *last_key,
    1208           68 :                         &delta_split_points,
    1209           68 :                         &mut current_delta_split_point,
    1210           68 :                         self,
    1211           68 :                         gc_cutoff,
    1212           68 :                         ctx,
    1213           68 :                     )
    1214           12 :                     .await?,
    1215              :                 );
    1216           68 :                 accumulated_values.clear();
    1217           68 :                 *last_key = key;
    1218           68 :                 accumulated_values.push((key, lsn, val));
    1219              :             }
    1220              :         }
    1221              : 
    1222            4 :         let last_key = last_key.expect("no keys produced during compaction");
    1223              :         // TODO: move this part to the loop body
    1224            4 :         let (deltas, image) =
    1225            4 :             flush_accumulated_states(self, last_key, &accumulated_values, gc_cutoff).await?;
    1226              :         // Put the image into the image layer. Currently we have a single big layer for the compaction.
    1227            4 :         image_layer_writer.put_image(last_key, image, ctx).await?;
    1228            4 :         delta_values.extend(deltas);
    1229            4 :         delta_layers.extend(
    1230            4 :             flush_deltas(
    1231            4 :                 &mut delta_values,
    1232            4 :                 last_key,
    1233            4 :                 &delta_split_points,
    1234            4 :                 &mut current_delta_split_point,
    1235            4 :                 self,
    1236            4 :                 gc_cutoff,
    1237            4 :                 ctx,
    1238            4 :             )
    1239            0 :             .await?,
    1240              :         );
    1241              : 
    1242            9 :         let image_layer = image_layer_writer.finish(self, ctx).await?;
    1243            4 :         info!(
    1244            0 :             "produced {} delta layers and {} image layers",
    1245            0 :             delta_layers.len(),
    1246              :             1
    1247              :         );
    1248            4 :         let mut compact_to = Vec::new();
    1249            4 :         compact_to.extend(delta_layers);
    1250            4 :         compact_to.push(image_layer);
    1251              :         // Step 3: Place back to the layer map.
    1252              :         {
    1253            4 :             let mut guard = self.layers.write().await;
    1254            4 :             guard.finish_gc_compaction(&layer_selection, &compact_to, &self.metrics)
    1255            4 :         };
    1256            4 : 
    1257            4 :         self.remote_client
    1258            4 :             .schedule_compaction_update(&layer_selection, &compact_to)?;
    1259            4 :         Ok(())
    1260            4 :     }
    1261              : }
    1262              : 
/// Per-run state that lets a [`Timeline`] act as the `CompactionJobExecutor` for the
/// compaction algorithm. Pending results are buffered here and applied to the timeline
/// by `flush_updates`.
struct TimelineAdaptor {
    /// The timeline this adaptor operates on.
    timeline: Arc<Timeline>,

    /// A key space together with the LSN it was collected at. `get_keyspace` only
    /// answers requests for exactly this LSN.
    keyspace: (Lsn, KeySpace),

    /// Delta layers handed to `Timeline::finish_compact_batch` on the next `flush_updates`.
    new_deltas: Vec<ResidentLayer>,
    /// Image layers handed to `Timeline::finish_compact_batch` and then uploaded on the
    /// next `flush_updates`.
    new_images: Vec<ResidentLayer>,
    /// Descriptors of layers passed to `Timeline::finish_compact_batch` for removal on the
    /// next `flush_updates`.
    layers_to_delete: Vec<Arc<PersistentLayerDesc>>,
}
    1272              : 
    1273              : impl TimelineAdaptor {
    1274            0 :     pub fn new(timeline: &Arc<Timeline>, keyspace: (Lsn, KeySpace)) -> Self {
    1275            0 :         Self {
    1276            0 :             timeline: timeline.clone(),
    1277            0 :             keyspace,
    1278            0 :             new_images: Vec::new(),
    1279            0 :             new_deltas: Vec::new(),
    1280            0 :             layers_to_delete: Vec::new(),
    1281            0 :         }
    1282            0 :     }
    1283              : 
    1284            0 :     pub async fn flush_updates(&mut self) -> anyhow::Result<()> {
    1285            0 :         let layers_to_delete = {
    1286            0 :             let guard = self.timeline.layers.read().await;
    1287            0 :             self.layers_to_delete
    1288            0 :                 .iter()
    1289            0 :                 .map(|x| guard.get_from_desc(x))
    1290            0 :                 .collect::<Vec<Layer>>()
    1291            0 :         };
    1292            0 :         self.timeline
    1293            0 :             .finish_compact_batch(&self.new_deltas, &self.new_images, &layers_to_delete)
    1294            0 :             .await?;
    1295              : 
    1296            0 :         self.timeline
    1297            0 :             .upload_new_image_layers(std::mem::take(&mut self.new_images))?;
    1298              : 
    1299            0 :         self.new_deltas.clear();
    1300            0 :         self.layers_to_delete.clear();
    1301            0 :         Ok(())
    1302            0 :     }
    1303              : }
    1304              : 
/// Newtype around a [`ResidentLayer`]; used as the `DeltaLayer` type of the
/// `CompactionJobExecutor` implementation for `TimelineAdaptor`.
#[derive(Clone)]
struct ResidentDeltaLayer(ResidentLayer);
/// Newtype around a [`ResidentLayer`]; used as the `ImageLayer` type of the
/// `CompactionJobExecutor` implementation for `TimelineAdaptor`.
#[derive(Clone)]
struct ResidentImageLayer(ResidentLayer);
    1309              : 
    1310              : impl CompactionJobExecutor for TimelineAdaptor {
    1311              :     type Key = crate::repository::Key;
    1312              : 
    1313              :     type Layer = OwnArc<PersistentLayerDesc>;
    1314              :     type DeltaLayer = ResidentDeltaLayer;
    1315              :     type ImageLayer = ResidentImageLayer;
    1316              : 
    1317              :     type RequestContext = crate::context::RequestContext;
    1318              : 
    /// Returns the shard identity of the wrapped timeline.
    fn get_shard_identity(&self) -> &ShardIdentity {
        self.timeline.get_shard_identity()
    }
    1322              : 
    1323            0 :     async fn get_layers(
    1324            0 :         &mut self,
    1325            0 :         key_range: &Range<Key>,
    1326            0 :         lsn_range: &Range<Lsn>,
    1327            0 :         _ctx: &RequestContext,
    1328            0 :     ) -> anyhow::Result<Vec<OwnArc<PersistentLayerDesc>>> {
    1329            0 :         self.flush_updates().await?;
    1330              : 
    1331            0 :         let guard = self.timeline.layers.read().await;
    1332            0 :         let layer_map = guard.layer_map();
    1333            0 : 
    1334            0 :         let result = layer_map
    1335            0 :             .iter_historic_layers()
    1336            0 :             .filter(|l| {
    1337            0 :                 overlaps_with(&l.lsn_range, lsn_range) && overlaps_with(&l.key_range, key_range)
    1338            0 :             })
    1339            0 :             .map(OwnArc)
    1340            0 :             .collect();
    1341            0 :         Ok(result)
    1342            0 :     }
    1343              : 
    1344            0 :     async fn get_keyspace(
    1345            0 :         &mut self,
    1346            0 :         key_range: &Range<Key>,
    1347            0 :         lsn: Lsn,
    1348            0 :         _ctx: &RequestContext,
    1349            0 :     ) -> anyhow::Result<Vec<Range<Key>>> {
    1350            0 :         if lsn == self.keyspace.0 {
    1351            0 :             Ok(pageserver_compaction::helpers::intersect_keyspace(
    1352            0 :                 &self.keyspace.1.ranges,
    1353            0 :                 key_range,
    1354            0 :             ))
    1355              :         } else {
    1356              :             // The current compaction implementatin only ever requests the key space
    1357              :             // at the compaction end LSN.
    1358            0 :             anyhow::bail!("keyspace not available for requested lsn");
    1359              :         }
    1360            0 :     }
    1361              : 
    1362            0 :     async fn downcast_delta_layer(
    1363            0 :         &self,
    1364            0 :         layer: &OwnArc<PersistentLayerDesc>,
    1365            0 :     ) -> anyhow::Result<Option<ResidentDeltaLayer>> {
    1366            0 :         // this is a lot more complex than a simple downcast...
    1367            0 :         if layer.is_delta() {
    1368            0 :             let l = {
    1369            0 :                 let guard = self.timeline.layers.read().await;
    1370            0 :                 guard.get_from_desc(layer)
    1371              :             };
    1372            0 :             let result = l.download_and_keep_resident().await?;
    1373              : 
    1374            0 :             Ok(Some(ResidentDeltaLayer(result)))
    1375              :         } else {
    1376            0 :             Ok(None)
    1377              :         }
    1378            0 :     }
    1379              : 
    1380            0 :     async fn create_image(
    1381            0 :         &mut self,
    1382            0 :         lsn: Lsn,
    1383            0 :         key_range: &Range<Key>,
    1384            0 :         ctx: &RequestContext,
    1385            0 :     ) -> anyhow::Result<()> {
    1386            0 :         Ok(self.create_image_impl(lsn, key_range, ctx).await?)
    1387            0 :     }
    1388              : 
    1389            0 :     async fn create_delta(
    1390            0 :         &mut self,
    1391            0 :         lsn_range: &Range<Lsn>,
    1392            0 :         key_range: &Range<Key>,
    1393            0 :         input_layers: &[ResidentDeltaLayer],
    1394            0 :         ctx: &RequestContext,
    1395            0 :     ) -> anyhow::Result<()> {
    1396            0 :         debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end);
    1397              : 
    1398            0 :         let mut all_entries = Vec::new();
    1399            0 :         for dl in input_layers.iter() {
    1400            0 :             all_entries.extend(dl.load_keys(ctx).await?);
    1401              :         }
    1402              : 
    1403              :         // The current stdlib sorting implementation is designed in a way where it is
    1404              :         // particularly fast where the slice is made up of sorted sub-ranges.
    1405            0 :         all_entries.sort_by_key(|DeltaEntry { key, lsn, .. }| (*key, *lsn));
    1406              : 
    1407            0 :         let mut writer = DeltaLayerWriter::new(
    1408            0 :             self.timeline.conf,
    1409            0 :             self.timeline.timeline_id,
    1410            0 :             self.timeline.tenant_shard_id,
    1411            0 :             key_range.start,
    1412            0 :             lsn_range.clone(),
    1413            0 :             ctx,
    1414            0 :         )
    1415            0 :         .await?;
    1416              : 
    1417            0 :         let mut dup_values = 0;
    1418            0 : 
    1419            0 :         // This iterator walks through all key-value pairs from all the layers
    1420            0 :         // we're compacting, in key, LSN order.
    1421            0 :         let mut prev: Option<(Key, Lsn)> = None;
    1422              :         for &DeltaEntry {
    1423            0 :             key, lsn, ref val, ..
    1424            0 :         } in all_entries.iter()
    1425              :         {
    1426            0 :             if prev == Some((key, lsn)) {
    1427              :                 // This is a duplicate. Skip it.
    1428              :                 //
    1429              :                 // It can happen if compaction is interrupted after writing some
    1430              :                 // layers but not all, and we are compacting the range again.
    1431              :                 // The calculations in the algorithm assume that there are no
    1432              :                 // duplicates, so the math on targeted file size is likely off,
    1433              :                 // and we will create smaller files than expected.
    1434            0 :                 dup_values += 1;
    1435            0 :                 continue;
    1436            0 :             }
    1437              : 
    1438            0 :             let value = val.load(ctx).await?;
    1439              : 
    1440            0 :             writer.put_value(key, lsn, value, ctx).await?;
    1441              : 
    1442            0 :             prev = Some((key, lsn));
    1443              :         }
    1444              : 
    1445            0 :         if dup_values > 0 {
    1446            0 :             warn!("delta layer created with {} duplicate values", dup_values);
    1447            0 :         }
    1448              : 
    1449            0 :         fail_point!("delta-layer-writer-fail-before-finish", |_| {
    1450            0 :             Err(anyhow::anyhow!(
    1451            0 :                 "failpoint delta-layer-writer-fail-before-finish"
    1452            0 :             ))
    1453            0 :         });
    1454              : 
    1455            0 :         let new_delta_layer = writer
    1456            0 :             .finish(prev.unwrap().0.next(), &self.timeline, ctx)
    1457            0 :             .await?;
    1458              : 
    1459            0 :         self.new_deltas.push(new_delta_layer);
    1460            0 :         Ok(())
    1461            0 :     }
    1462              : 
    1463            0 :     async fn delete_layer(
    1464            0 :         &mut self,
    1465            0 :         layer: &OwnArc<PersistentLayerDesc>,
    1466            0 :         _ctx: &RequestContext,
    1467            0 :     ) -> anyhow::Result<()> {
    1468            0 :         self.layers_to_delete.push(layer.clone().0);
    1469            0 :         Ok(())
    1470            0 :     }
    1471              : }
    1472              : 
    1473              : impl TimelineAdaptor {
    1474            0 :     async fn create_image_impl(
    1475            0 :         &mut self,
    1476            0 :         lsn: Lsn,
    1477            0 :         key_range: &Range<Key>,
    1478            0 :         ctx: &RequestContext,
    1479            0 :     ) -> Result<(), CreateImageLayersError> {
    1480            0 :         let timer = self.timeline.metrics.create_images_time_histo.start_timer();
    1481              : 
    1482            0 :         let image_layer_writer = ImageLayerWriter::new(
    1483            0 :             self.timeline.conf,
    1484            0 :             self.timeline.timeline_id,
    1485            0 :             self.timeline.tenant_shard_id,
    1486            0 :             key_range,
    1487            0 :             lsn,
    1488            0 :             ctx,
    1489            0 :         )
    1490            0 :         .await?;
    1491              : 
    1492            0 :         fail_point!("image-layer-writer-fail-before-finish", |_| {
    1493            0 :             Err(CreateImageLayersError::Other(anyhow::anyhow!(
    1494            0 :                 "failpoint image-layer-writer-fail-before-finish"
    1495            0 :             )))
    1496            0 :         });
    1497              : 
    1498            0 :         let keyspace = KeySpace {
    1499            0 :             ranges: self.get_keyspace(key_range, lsn, ctx).await?,
    1500              :         };
    1501              :         // TODO set proper (stateful) start. The create_image_layer_for_rel_blocks function mostly
    1502            0 :         let start = Key::MIN;
    1503              :         let ImageLayerCreationOutcome {
    1504            0 :             image,
    1505              :             next_start_key: _,
    1506            0 :         } = self
    1507            0 :             .timeline
    1508            0 :             .create_image_layer_for_rel_blocks(
    1509            0 :                 &keyspace,
    1510            0 :                 image_layer_writer,
    1511            0 :                 lsn,
    1512            0 :                 ctx,
    1513            0 :                 key_range.clone(),
    1514            0 :                 start,
    1515            0 :             )
    1516            0 :             .await?;
    1517              : 
    1518            0 :         if let Some(image_layer) = image {
    1519            0 :             self.new_images.push(image_layer);
    1520            0 :         }
    1521              : 
    1522            0 :         timer.stop_and_record();
    1523            0 : 
    1524            0 :         Ok(())
    1525            0 :     }
    1526              : }
    1527              : 
    1528              : impl CompactionRequestContext for crate::context::RequestContext {}
    1529              : 
    1530              : #[derive(Debug, Clone)]
    1531              : pub struct OwnArc<T>(pub Arc<T>);
    1532              : 
    1533              : impl<T> Deref for OwnArc<T> {
    1534              :     type Target = <Arc<T> as Deref>::Target;
    1535            0 :     fn deref(&self) -> &Self::Target {
    1536            0 :         &self.0
    1537            0 :     }
    1538              : }
    1539              : 
    1540              : impl<T> AsRef<T> for OwnArc<T> {
    1541            0 :     fn as_ref(&self) -> &T {
    1542            0 :         self.0.as_ref()
    1543            0 :     }
    1544              : }
    1545              : 
    1546              : impl CompactionLayer<Key> for OwnArc<PersistentLayerDesc> {
    1547            0 :     fn key_range(&self) -> &Range<Key> {
    1548            0 :         &self.key_range
    1549            0 :     }
    1550            0 :     fn lsn_range(&self) -> &Range<Lsn> {
    1551            0 :         &self.lsn_range
    1552            0 :     }
    1553            0 :     fn file_size(&self) -> u64 {
    1554            0 :         self.file_size
    1555            0 :     }
    1556            0 :     fn short_id(&self) -> std::string::String {
    1557            0 :         self.as_ref().short_id().to_string()
    1558            0 :     }
    1559            0 :     fn is_delta(&self) -> bool {
    1560            0 :         self.as_ref().is_delta()
    1561            0 :     }
    1562              : }
    1563              : 
    1564              : impl CompactionLayer<Key> for OwnArc<DeltaLayer> {
    1565            0 :     fn key_range(&self) -> &Range<Key> {
    1566            0 :         &self.layer_desc().key_range
    1567            0 :     }
    1568            0 :     fn lsn_range(&self) -> &Range<Lsn> {
    1569            0 :         &self.layer_desc().lsn_range
    1570            0 :     }
    1571            0 :     fn file_size(&self) -> u64 {
    1572            0 :         self.layer_desc().file_size
    1573            0 :     }
    1574            0 :     fn short_id(&self) -> std::string::String {
    1575            0 :         self.layer_desc().short_id().to_string()
    1576            0 :     }
    1577            0 :     fn is_delta(&self) -> bool {
    1578            0 :         true
    1579            0 :     }
    1580              : }
    1581              : 
    1582              : use crate::tenant::timeline::DeltaEntry;
    1583              : 
    1584              : impl CompactionLayer<Key> for ResidentDeltaLayer {
    1585            0 :     fn key_range(&self) -> &Range<Key> {
    1586            0 :         &self.0.layer_desc().key_range
    1587            0 :     }
    1588            0 :     fn lsn_range(&self) -> &Range<Lsn> {
    1589            0 :         &self.0.layer_desc().lsn_range
    1590            0 :     }
    1591            0 :     fn file_size(&self) -> u64 {
    1592            0 :         self.0.layer_desc().file_size
    1593            0 :     }
    1594            0 :     fn short_id(&self) -> std::string::String {
    1595            0 :         self.0.layer_desc().short_id().to_string()
    1596            0 :     }
    1597            0 :     fn is_delta(&self) -> bool {
    1598            0 :         true
    1599            0 :     }
    1600              : }
    1601              : 
    1602              : impl CompactionDeltaLayer<TimelineAdaptor> for ResidentDeltaLayer {
    1603              :     type DeltaEntry<'a> = DeltaEntry<'a>;
    1604              : 
    1605            0 :     async fn load_keys<'a>(&self, ctx: &RequestContext) -> anyhow::Result<Vec<DeltaEntry<'_>>> {
    1606            0 :         self.0.load_keys(ctx).await
    1607            0 :     }
    1608              : }
    1609              : 
    1610              : impl CompactionLayer<Key> for ResidentImageLayer {
    1611            0 :     fn key_range(&self) -> &Range<Key> {
    1612            0 :         &self.0.layer_desc().key_range
    1613            0 :     }
    1614            0 :     fn lsn_range(&self) -> &Range<Lsn> {
    1615            0 :         &self.0.layer_desc().lsn_range
    1616            0 :     }
    1617            0 :     fn file_size(&self) -> u64 {
    1618            0 :         self.0.layer_desc().file_size
    1619            0 :     }
    1620            0 :     fn short_id(&self) -> std::string::String {
    1621            0 :         self.0.layer_desc().short_id().to_string()
    1622            0 :     }
    1623            0 :     fn is_delta(&self) -> bool {
    1624            0 :         false
    1625            0 :     }
    1626              : }
    1627              : impl CompactionImageLayer<TimelineAdaptor> for ResidentImageLayer {}
        

Generated by: LCOV version 2.1-beta