LCOV - code coverage report
Current view: top level - pageserver/src/tenant/timeline - compaction.rs (source / functions)
Test:      02e8c57acd6e2b986849f552ca30280d54699b79.info
Test Date: 2024-06-26 17:13:54

              Coverage    Total    Hit
Lines:          55.4 %     1140    632
Functions:      25.3 %       95     24

            Line data    Source code
       1              : //! New compaction implementation. The algorithm itself is implemented in the
       2              : //! compaction crate. This file implements the callbacks and structs that allow
       3              : //! the algorithm to drive the process.
       4              : //!
       5              : //! The old legacy algorithm is implemented directly in `timeline.rs`.
       6              : 
       7              : use std::collections::BinaryHeap;
       8              : use std::ops::{Deref, Range};
       9              : use std::sync::Arc;
      10              : 
      11              : use super::layer_manager::LayerManager;
      12              : use super::{
      13              :     CompactFlags, CreateImageLayersError, DurationRecorder, ImageLayerCreationMode,
      14              :     RecordedDuration, Timeline,
      15              : };
      16              : 
      17              : use anyhow::{anyhow, Context};
      18              : use enumset::EnumSet;
      19              : use fail::fail_point;
      20              : use itertools::Itertools;
      21              : use pageserver_api::keyspace::ShardedRange;
      22              : use pageserver_api::shard::{ShardCount, ShardIdentity, TenantShardId};
      23              : use tokio_util::sync::CancellationToken;
      24              : use tracing::{debug, info, info_span, trace, warn, Instrument};
      25              : use utils::id::TimelineId;
      26              : 
      27              : use crate::context::{AccessStatsBehavior, RequestContext, RequestContextBuilder};
      28              : use crate::page_cache;
      29              : use crate::tenant::storage_layer::{AsLayerDesc, PersistentLayerDesc};
      30              : use crate::tenant::timeline::{drop_rlock, Hole, ImageLayerCreationOutcome};
      31              : use crate::tenant::timeline::{DeltaLayerWriter, ImageLayerWriter};
      32              : use crate::tenant::timeline::{Layer, ResidentLayer};
      33              : use crate::tenant::DeltaLayer;
      34              : use crate::virtual_file::{MaybeFatalIo, VirtualFile};
      35              : 
      36              : use crate::keyspace::KeySpace;
      37              : use crate::repository::Key;
      38              : 
      39              : use utils::lsn::Lsn;
      40              : 
      41              : use pageserver_compaction::helpers::overlaps_with;
      42              : use pageserver_compaction::interface::*;
      43              : 
      44              : use super::CompactionError;
      45              : 
      46              : impl Timeline {
      47              :     /// TODO: cancellation
      48          364 :     pub(crate) async fn compact_legacy(
      49          364 :         self: &Arc<Self>,
      50          364 :         cancel: &CancellationToken,
      51          364 :         flags: EnumSet<CompactFlags>,
      52          364 :         ctx: &RequestContext,
      53          364 :     ) -> Result<(), CompactionError> {
      54          364 :         if flags.contains(CompactFlags::EnhancedGcBottomMostCompaction) {
      55            0 :             return self.compact_with_gc(cancel, ctx).await;
      56          364 :         }
      57          364 : 
      58          364 :         // High level strategy for compaction / image creation:
      59          364 :         //
      60          364 :         // 1. First, calculate the desired "partitioning" of the
      61          364 :         // currently in-use key space. The goal is to partition the
      62          364 :         // key space into roughly fixed-size chunks, but also take into
      63          364 :         // account any existing image layers, and try to align the
      64          364 :         // chunk boundaries with the existing image layers to avoid
      65          364 :         // too much churn. Also try to align chunk boundaries with
      66          364 :         // relation boundaries.  In principle, we don't know about
      67          364 :         // relation boundaries here, we just deal with key-value
      68          364 :         // pairs, and the code in pgdatadir_mapping.rs knows how to
      69          364 :         // map relations into key-value pairs. But in practice we know
      70          364 :         // that 'field6' is the block number, and the fields 1-5
      71          364 :         // identify a relation. This is just an optimization,
      72          364 :         // though.
      73          364 :         //
      74          364 :         // 2. Once we know the partitioning, for each partition,
       75          364 :         // decide if it's time to create a new image layer. The
       76          364 :         // criterion is: has there been too much "churn" since the last
       77          364 :         // image layer? "Churn" is a fuzzy concept; it's a
       78          364 :         // combination of too many delta files, or too much WAL in
       79          364 :         // total in the delta files. Or perhaps: creating an image
       80          364 :         // file would allow us to delete some older files.
      81          364 :         //
      82          364 :         // 3. After that, we compact all level0 delta files if there
      83          364 :         // are too many of them.  While compacting, we also garbage
      84          364 :         // collect any page versions that are no longer needed because
      85          364 :         // of the new image layers we created in step 2.
      86          364 :         //
      87          364 :         // TODO: This high level strategy hasn't been implemented yet.
      88          364 :         // Below are functions compact_level0() and create_image_layers()
      89          364 :         // but they are a bit ad hoc and don't quite work like it's explained
      90          364 :         // above. Rewrite it.
      91          364 : 
      92          364 :         // Is the timeline being deleted?
      93          364 :         if self.is_stopping() {
      94            0 :             trace!("Dropping out of compaction on timeline shutdown");
       95          364 :         // but they are a bit ad hoc and don't quite work as explained
      96          364 :         }
      97          364 : 
      98          364 :         let target_file_size = self.get_checkpoint_distance();
      99              : 
     100              :         // Define partitioning schema if needed
     101              : 
     102              :         // FIXME: the match should only cover repartitioning, not the next steps
     103          364 :         let partition_count = match self
     104          364 :             .repartition(
     105          364 :                 self.get_last_record_lsn(),
     106          364 :                 self.get_compaction_target_size(),
     107          364 :                 flags,
     108          364 :                 ctx,
     109          364 :             )
     110        14036 :             .await
     111              :         {
     112          364 :             Ok(((dense_partitioning, sparse_partitioning), lsn)) => {
     113          364 :                 // Disables access_stats updates, so that the files we read remain candidates for eviction after we're done with them
     114          364 :                 let image_ctx = RequestContextBuilder::extend(ctx)
     115          364 :                     .access_stats_behavior(AccessStatsBehavior::Skip)
     116          364 :                     .build();
     117          364 : 
     118          364 :                 // 2. Compact
     119          364 :                 let timer = self.metrics.compact_time_histo.start_timer();
     120        42655 :                 self.compact_level0(target_file_size, ctx).await?;
     121          364 :                 timer.stop_and_record();
     122          364 : 
     123          364 :                 // 3. Create new image layers for partitions that have been modified
     124          364 :                 // "enough".
     125          364 :                 let mut partitioning = dense_partitioning;
     126          364 :                 partitioning
     127          364 :                     .parts
     128          364 :                     .extend(sparse_partitioning.into_dense().parts);
     129          364 :                 let image_layers = self
     130          364 :                     .create_image_layers(
     131          364 :                         &partitioning,
     132          364 :                         lsn,
     133          364 :                         if flags.contains(CompactFlags::ForceImageLayerCreation) {
     134           14 :                             ImageLayerCreationMode::Force
     135              :                         } else {
     136          350 :                             ImageLayerCreationMode::Try
     137              :                         },
     138          364 :                         &image_ctx,
     139              :                     )
     140        14414 :                     .await?;
     141              : 
     142          364 :                 self.upload_new_image_layers(image_layers)?;
     143          364 :                 partitioning.parts.len()
     144              :             }
     145            0 :             Err(err) => {
     146            0 :                 // no partitioning? This is normal, if the timeline was just created
     147            0 :                 // as an empty timeline. Also in unit tests, when we use the timeline
     148            0 :                 // as a simple key-value store, ignoring the datadir layout. Log the
     149            0 :                 // error but continue.
     150            0 :                 //
     151            0 :                 // Suppress error when it's due to cancellation
     152            0 :                 if !self.cancel.is_cancelled() {
     153            0 :                     tracing::error!("could not compact, repartitioning keyspace failed: {err:?}");
     154            0 :                 }
     155            0 :                 1
     156              :             }
     157              :         };
     158              : 
     159          364 :         if self.shard_identity.count >= ShardCount::new(2) {
     160              :             // Limit the number of layer rewrites to the number of partitions: this means its
     161              :             // runtime should be comparable to a full round of image layer creations, rather than
     162              :             // being potentially much longer.
     163            0 :             let rewrite_max = partition_count;
     164            0 : 
     165            0 :             self.compact_shard_ancestors(rewrite_max, ctx).await?;
     166          364 :         }
     167              : 
     168          364 :         Ok(())
     169          364 :     }
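A sketch of how this entry point is driven by a hypothetical caller (the names `timeline`, `cancel` and `ctx` are assumptions; only `CompactFlags` and `compact_legacy` come from this file): the flags are an `EnumSet<CompactFlags>`, where `EnhancedGcBottomMostCompaction` short-circuits into `compact_with_gc` and `ForceImageLayerCreation` forces image layer creation (step 2) regardless of churn.

    // Hypothetical caller; assumes `timeline: Arc<Timeline>`, a CancellationToken
    // and a RequestContext are already in scope.
    use enumset::EnumSet;

    let mut flags: EnumSet<CompactFlags> = EnumSet::empty();
    flags.insert(CompactFlags::ForceImageLayerCreation); // force step 2 (image creation)
    timeline.compact_legacy(&cancel, flags, &ctx).await?;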
     170              : 
      171              :     /// Check for layers that are eligible to be rewritten:
      172              :     /// - Shard splitting: after a shard split, ancestor layers beyond pitr_interval are rewritten, so that
     173              :     ///   we don't indefinitely retain keys in this shard that aren't needed.
     174              :     /// - For future use: layers beyond pitr_interval that are in formats we would
     175              :     ///   rather not maintain compatibility with indefinitely.
     176              :     ///
     177              :     /// Note: this phase may read and write many gigabytes of data: use rewrite_max to bound
     178              :     /// how much work it will try to do in each compaction pass.
     179            0 :     async fn compact_shard_ancestors(
     180            0 :         self: &Arc<Self>,
     181            0 :         rewrite_max: usize,
     182            0 :         ctx: &RequestContext,
     183            0 :     ) -> anyhow::Result<()> {
     184            0 :         let mut drop_layers = Vec::new();
     185            0 :         let mut layers_to_rewrite: Vec<Layer> = Vec::new();
     186            0 : 
     187            0 :         // We will use the Lsn cutoff of the last GC as a threshold for rewriting layers: if a
     188            0 :         // layer is behind this Lsn, it indicates that the layer is being retained beyond the
     189            0 :         // pitr_interval, for example because a branchpoint references it.
     190            0 :         //
     191            0 :         // Holding this read guard also blocks [`Self::gc_timeline`] from entering while we
     192            0 :         // are rewriting layers.
     193            0 :         let latest_gc_cutoff = self.get_latest_gc_cutoff_lsn();
     194            0 : 
     195            0 :         tracing::info!(
     196            0 :             "latest_gc_cutoff: {}, pitr cutoff {}",
     197            0 :             *latest_gc_cutoff,
     198            0 :             self.gc_info.read().unwrap().cutoffs.pitr
     199              :         );
     200              : 
     201            0 :         let layers = self.layers.read().await;
     202            0 :         for layer_desc in layers.layer_map().iter_historic_layers() {
     203            0 :             let layer = layers.get_from_desc(&layer_desc);
     204            0 :             if layer.metadata().shard.shard_count == self.shard_identity.count {
     205              :                 // This layer does not belong to a historic ancestor, no need to re-image it.
     206            0 :                 continue;
     207            0 :             }
     208            0 : 
     209            0 :             // This layer was created on an ancestor shard: check if it contains any data for this shard.
     210            0 :             let sharded_range = ShardedRange::new(layer_desc.get_key_range(), &self.shard_identity);
     211            0 :             let layer_local_page_count = sharded_range.page_count();
     212            0 :             let layer_raw_page_count = ShardedRange::raw_size(&layer_desc.get_key_range());
     213            0 :             if layer_local_page_count == 0 {
     214              :                 // This ancestral layer only covers keys that belong to other shards.
     215              :                 // We include the full metadata in the log: if we had some critical bug that caused
     216              :                 // us to incorrectly drop layers, this would simplify manually debugging + reinstating those layers.
     217            0 :                 info!(%layer, old_metadata=?layer.metadata(),
     218            0 :                     "dropping layer after shard split, contains no keys for this shard.",
     219              :                 );
     220              : 
     221            0 :                 if cfg!(debug_assertions) {
     222              :                     // Expensive, exhaustive check of keys in this layer: this guards against ShardedRange's calculations being
     223              :                     // wrong.  If ShardedRange claims the local page count is zero, then no keys in this layer
     224              :                     // should be !is_key_disposable()
     225            0 :                     let range = layer_desc.get_key_range();
     226            0 :                     let mut key = range.start;
     227            0 :                     while key < range.end {
     228            0 :                         debug_assert!(self.shard_identity.is_key_disposable(&key));
     229            0 :                         key = key.next();
     230              :                     }
     231            0 :                 }
     232              : 
     233            0 :                 drop_layers.push(layer);
     234            0 :                 continue;
     235            0 :             } else if layer_local_page_count != u32::MAX
     236            0 :                 && layer_local_page_count == layer_raw_page_count
     237              :             {
     238            0 :                 debug!(%layer,
     239            0 :                     "layer is entirely shard local ({} keys), no need to filter it",
     240              :                     layer_local_page_count
     241              :                 );
     242            0 :                 continue;
     243            0 :             }
     244            0 : 
     245            0 :             // Don't bother re-writing a layer unless it will at least halve its size
     246            0 :             if layer_local_page_count != u32::MAX
     247            0 :                 && layer_local_page_count > layer_raw_page_count / 2
     248              :             {
     249            0 :                 debug!(%layer,
     250            0 :                     "layer is already mostly local ({}/{}), not rewriting",
     251              :                     layer_local_page_count,
     252              :                     layer_raw_page_count
     253              :                 );
     254            0 :             }
     255              : 
     256              :             // Don't bother re-writing a layer if it is within the PITR window: it will age-out eventually
     257              :             // without incurring the I/O cost of a rewrite.
     258            0 :             if layer_desc.get_lsn_range().end >= *latest_gc_cutoff {
     259            0 :                 debug!(%layer, "Skipping rewrite of layer still in GC window ({} >= {})",
     260            0 :                     layer_desc.get_lsn_range().end, *latest_gc_cutoff);
     261            0 :                 continue;
     262            0 :             }
     263            0 : 
     264            0 :             if layer_desc.is_delta() {
     265              :                 // We do not yet implement rewrite of delta layers
     266            0 :                 debug!(%layer, "Skipping rewrite of delta layer");
     267            0 :                 continue;
     268            0 :             }
     269            0 : 
     270            0 :             // Only rewrite layers if their generations differ.  This guarantees:
     271            0 :             //  - that local rewrite is safe, as local layer paths will differ between existing layer and rewritten one
     272            0 :             //  - that the layer is persistent in remote storage, as we only see old-generation'd layer via loading from remote storage
     273            0 :             if layer.metadata().generation == self.generation {
     274            0 :                 debug!(%layer, "Skipping rewrite, is not from old generation");
     275            0 :                 continue;
     276            0 :             }
     277            0 : 
     278            0 :             if layers_to_rewrite.len() >= rewrite_max {
     279            0 :                 tracing::info!(%layer, "Will rewrite layer on a future compaction, already rewrote {}",
     280            0 :                     layers_to_rewrite.len()
     281              :                 );
     282            0 :                 continue;
     283            0 :             }
     284            0 : 
     285            0 :             // Fall through: all our conditions for doing a rewrite passed.
     286            0 :             layers_to_rewrite.push(layer);
     287              :         }
     288              : 
     289              :         // Drop read lock on layer map before we start doing time-consuming I/O
     290            0 :         drop(layers);
     291            0 : 
     292            0 :         let mut replace_image_layers = Vec::new();
     293              : 
     294            0 :         for layer in layers_to_rewrite {
     295            0 :             tracing::info!(layer=%layer, "Rewriting layer after shard split...");
     296            0 :             let mut image_layer_writer = ImageLayerWriter::new(
     297            0 :                 self.conf,
     298            0 :                 self.timeline_id,
     299            0 :                 self.tenant_shard_id,
     300            0 :                 &layer.layer_desc().key_range,
     301            0 :                 layer.layer_desc().image_layer_lsn(),
     302            0 :                 ctx,
     303            0 :             )
     304            0 :             .await?;
     305              : 
     306              :             // Safety of layer rewrites:
     307              :             // - We are writing to a different local file path than we are reading from, so the old Layer
     308              :             //   cannot interfere with the new one.
     309              :             // - In the page cache, contents for a particular VirtualFile are stored with a file_id that
     310              :             //   is different for two layers with the same name (in `ImageLayerInner::new` we always
      311              :             //   acquire a fresh id from [`crate::page_cache::next_file_id`]).  So readers do not risk
     312              :             //   reading the index from one layer file, and then data blocks from the rewritten layer file.
     313              :             // - Any readers that have a reference to the old layer will keep it alive until they are done
     314              :             //   with it. If they are trying to promote from remote storage, that will fail, but this is the same
     315              :             //   as for compaction generally: compaction is allowed to delete layers that readers might be trying to use.
     316              :             // - We do not run concurrently with other kinds of compaction, so the only layer map writes we race with are:
     317              :             //    - GC, which at worst witnesses us "undelete" a layer that they just deleted.
     318              :             //    - ingestion, which only inserts layers, therefore cannot collide with us.
     319            0 :             let resident = layer.download_and_keep_resident().await?;
     320              : 
     321            0 :             let keys_written = resident
     322            0 :                 .filter(&self.shard_identity, &mut image_layer_writer, ctx)
     323            0 :                 .await?;
     324              : 
     325            0 :             if keys_written > 0 {
     326            0 :                 let new_layer = image_layer_writer.finish(self, ctx).await?;
     327            0 :                 tracing::info!(layer=%new_layer, "Rewrote layer, {} -> {} bytes",
     328            0 :                     layer.metadata().file_size,
     329            0 :                     new_layer.metadata().file_size);
     330              : 
     331            0 :                 replace_image_layers.push((layer, new_layer));
     332            0 :             } else {
     333            0 :                 // Drop the old layer.  Usually for this case we would already have noticed that
     334            0 :                 // the layer has no data for us with the ShardedRange check above, but
      335            0 :                 // the layer has no data for us with the ShardedRange check above, but this path covers any layers that slip through that check.
     336            0 :             }
     337              :         }
     338              : 
     339              :         // At this point, we have replaced local layer files with their rewritten form, but not yet uploaded
     340              :         // metadata to reflect that. If we restart here, the replaced layer files will look invalid (size mismatch
     341              :         // to remote index) and be removed. This is inefficient but safe.
     342              :         fail::fail_point!("compact-shard-ancestors-localonly");
     343              : 
     344              :         // Update the LayerMap so that readers will use the new layers, and enqueue it for writing to remote storage
     345            0 :         self.rewrite_layers(replace_image_layers, drop_layers)
     346            0 :             .await?;
     347              : 
     348              :         fail::fail_point!("compact-shard-ancestors-enqueued");
     349              : 
     350              :         // We wait for all uploads to complete before finishing this compaction stage.  This is not
     351              :         // necessary for correctness, but it simplifies testing, and avoids proceeding with another
     352              :         // Timeline's compaction while this timeline's uploads may be generating lots of disk I/O
     353              :         // load.
     354            0 :         self.remote_client.wait_completion().await?;
     355              : 
     356              :         fail::fail_point!("compact-shard-ancestors-persistent");
     357              : 
     358            0 :         Ok(())
     359            0 :     }
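The size-based part of the eligibility check above can be condensed into the following sketch (a hypothetical helper, not part of this file; the real logic is inline in `compact_shard_ancestors` and additionally skips layers inside the GC window, delta layers, and layers from the current generation):

    // Hypothetical helper using the same ShardedRange API as above
    // (`new`, `page_count`, `raw_size`).
    fn rewrite_would_halve(key_range: std::ops::Range<Key>, shard_identity: &ShardIdentity) -> bool {
        let local_pages = ShardedRange::new(key_range.clone(), shard_identity).page_count();
        let raw_pages = ShardedRange::raw_size(&key_range);
        // u32::MAX means the count is unknown; otherwise only rewrite when dropping
        // other shards' keys would at least halve the layer.
        local_pages == u32::MAX || local_pages <= raw_pages / 2
    }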
     360              : 
      361              :     /// Collect a bunch of Level 0 layer files, and compact and reshuffle them
      362              :     /// as Level 1 files.
     363          364 :     async fn compact_level0(
     364          364 :         self: &Arc<Self>,
     365          364 :         target_file_size: u64,
     366          364 :         ctx: &RequestContext,
     367          364 :     ) -> Result<(), CompactionError> {
     368              :         let CompactLevel0Phase1Result {
     369          364 :             new_layers,
     370          364 :             deltas_to_compact,
     371              :         } = {
     372          364 :             let phase1_span = info_span!("compact_level0_phase1");
     373          364 :             let ctx = ctx.attached_child();
     374          364 :             let mut stats = CompactLevel0Phase1StatsBuilder {
     375          364 :                 version: Some(2),
     376          364 :                 tenant_id: Some(self.tenant_shard_id),
     377          364 :                 timeline_id: Some(self.timeline_id),
     378          364 :                 ..Default::default()
     379          364 :             };
     380          364 : 
     381          364 :             let begin = tokio::time::Instant::now();
     382          364 :             let phase1_layers_locked = Arc::clone(&self.layers).read_owned().await;
     383          364 :             let now = tokio::time::Instant::now();
     384          364 :             stats.read_lock_acquisition_micros =
     385          364 :                 DurationRecorder::Recorded(RecordedDuration(now - begin), now);
     386          364 :             self.compact_level0_phase1(phase1_layers_locked, stats, target_file_size, &ctx)
     387          364 :                 .instrument(phase1_span)
     388        42655 :                 .await?
     389              :         };
     390              : 
     391          364 :         if new_layers.is_empty() && deltas_to_compact.is_empty() {
     392              :             // nothing to do
     393          336 :             return Ok(());
     394           28 :         }
     395           28 : 
     396           28 :         self.finish_compact_batch(&new_layers, &Vec::new(), &deltas_to_compact)
     397            0 :             .await?;
     398           28 :         Ok(())
     399          364 :     }
     400              : 
     401              :     /// Level0 files first phase of compaction, explained in the [`Self::compact_legacy`] comment.
     402          364 :     async fn compact_level0_phase1(
     403          364 :         self: &Arc<Self>,
     404          364 :         guard: tokio::sync::OwnedRwLockReadGuard<LayerManager>,
     405          364 :         mut stats: CompactLevel0Phase1StatsBuilder,
     406          364 :         target_file_size: u64,
     407          364 :         ctx: &RequestContext,
     408          364 :     ) -> Result<CompactLevel0Phase1Result, CompactionError> {
     409          364 :         stats.read_lock_held_spawn_blocking_startup_micros =
     410          364 :             stats.read_lock_acquisition_micros.till_now(); // set by caller
     411          364 :         let layers = guard.layer_map();
     412          364 :         let level0_deltas = layers.get_level0_deltas()?;
     413          364 :         let mut level0_deltas = level0_deltas
     414          364 :             .into_iter()
     415         1616 :             .map(|x| guard.get_from_desc(&x))
     416          364 :             .collect_vec();
     417          364 :         stats.level0_deltas_count = Some(level0_deltas.len());
     418          364 :         // Only compact if enough layers have accumulated.
     419          364 :         let threshold = self.get_compaction_threshold();
     420          364 :         if level0_deltas.is_empty() || level0_deltas.len() < threshold {
     421          336 :             debug!(
     422            0 :                 level0_deltas = level0_deltas.len(),
     423            0 :                 threshold, "too few deltas to compact"
     424              :             );
     425          336 :             return Ok(CompactLevel0Phase1Result::default());
     426           28 :         }
     427           28 : 
     428           28 :         // Gather the files to compact in this iteration.
     429           28 :         //
     430           28 :         // Start with the oldest Level 0 delta file, and collect any other
     431           28 :         // level 0 files that form a contiguous sequence, such that the end
     432           28 :         // LSN of previous file matches the start LSN of the next file.
     433           28 :         //
     434           28 :         // Note that if the files don't form such a sequence, we might
     435           28 :         // "compact" just a single file. That's a bit pointless, but it allows
     436           28 :         // us to get rid of the level 0 file, and compact the other files on
      437           28 :         // the next iteration. This could probably be made smarter, but such
     438           28 :         // "gaps" in the sequence of level 0 files should only happen in case
     439           28 :         // of a crash, partial download from cloud storage, or something like
     440           28 :         // that, so it's not a big deal in practice.
     441          748 :         level0_deltas.sort_by_key(|l| l.layer_desc().lsn_range.start);
     442           28 :         let mut level0_deltas_iter = level0_deltas.iter();
     443           28 : 
     444           28 :         let first_level0_delta = level0_deltas_iter.next().unwrap();
     445           28 :         let mut prev_lsn_end = first_level0_delta.layer_desc().lsn_range.end;
     446           28 :         let mut deltas_to_compact = Vec::with_capacity(level0_deltas.len());
     447           28 : 
     448           28 :         deltas_to_compact.push(first_level0_delta.download_and_keep_resident().await?);
     449          402 :         for l in level0_deltas_iter {
     450          374 :             let lsn_range = &l.layer_desc().lsn_range;
     451          374 : 
     452          374 :             if lsn_range.start != prev_lsn_end {
     453            0 :                 break;
     454          374 :             }
     455          374 :             deltas_to_compact.push(l.download_and_keep_resident().await?);
     456          374 :             prev_lsn_end = lsn_range.end;
     457              :         }
     458           28 :         let lsn_range = Range {
     459           28 :             start: deltas_to_compact
     460           28 :                 .first()
     461           28 :                 .unwrap()
     462           28 :                 .layer_desc()
     463           28 :                 .lsn_range
     464           28 :                 .start,
     465           28 :             end: deltas_to_compact.last().unwrap().layer_desc().lsn_range.end,
     466           28 :         };
     467           28 : 
     468           28 :         info!(
     469            0 :             "Starting Level0 compaction in LSN range {}-{} for {} layers ({} deltas in total)",
     470            0 :             lsn_range.start,
     471            0 :             lsn_range.end,
     472            0 :             deltas_to_compact.len(),
     473            0 :             level0_deltas.len()
     474              :         );
     475              : 
     476          402 :         for l in deltas_to_compact.iter() {
     477          402 :             info!("compact includes {l}");
     478              :         }
     479              : 
     480              :         // We don't need the original list of layers anymore. Drop it so that
     481              :         // we don't accidentally use it later in the function.
     482           28 :         drop(level0_deltas);
     483           28 : 
     484           28 :         stats.read_lock_held_prerequisites_micros = stats
     485           28 :             .read_lock_held_spawn_blocking_startup_micros
     486           28 :             .till_now();
     487           28 : 
     488           28 :         // Determine N largest holes where N is number of compacted layers.
     489           28 :         let max_holes = deltas_to_compact.len();
     490           28 :         let last_record_lsn = self.get_last_record_lsn();
     491           28 :         let min_hole_range = (target_file_size / page_cache::PAGE_SZ as u64) as i128;
     492           28 :         let min_hole_coverage_size = 3; // TODO: something more flexible?
     493           28 : 
     494           28 :         // min-heap (reserve space for one more element added before eviction)
     495           28 :         let mut heap: BinaryHeap<Hole> = BinaryHeap::with_capacity(max_holes + 1);
     496           28 :         let mut prev: Option<Key> = None;
     497           28 : 
     498           28 :         let mut all_keys = Vec::new();
     499              : 
     500          402 :         for l in deltas_to_compact.iter() {
     501         2413 :             all_keys.extend(l.load_keys(ctx).await?);
     502              :         }
     503              : 
     504              :         // FIXME: should spawn_blocking the rest of this function
     505              : 
     506              :         // The current stdlib sorting implementation is designed in a way where it is
     507              :         // particularly fast where the slice is made up of sorted sub-ranges.
     508      4431770 :         all_keys.sort_by_key(|DeltaEntry { key, lsn, .. }| (*key, *lsn));
     509           28 : 
     510           28 :         stats.read_lock_held_key_sort_micros = stats.read_lock_held_prerequisites_micros.till_now();
     511              : 
     512      2064038 :         for &DeltaEntry { key: next_key, .. } in all_keys.iter() {
     513      2064038 :             if let Some(prev_key) = prev {
      514              :                 // Just a first, fast filter: do not create hole entries for metadata keys. The last hole in the
      515              :                 // compaction is the gap between data keys and metadata keys.
     516      2064010 :                 if next_key.to_i128() - prev_key.to_i128() >= min_hole_range
     517            0 :                     && !Key::is_metadata_key(&prev_key)
     518              :                 {
     519            0 :                     let key_range = prev_key..next_key;
      520            0 :                     // Measuring a hole by just subtracting the i128 representations of the key range boundaries
      521            0 :                     // doesn't make much sense, because the largest holes will correspond to field1/field2 changes.
      522            0 :                     // But we are mostly interested in eliminating holes which cause generation of excessive image layers.
      523            0 :                     // That is why it is better to measure the size of a hole as the number of covering image layers.
     524            0 :                     let coverage_size = layers.image_coverage(&key_range, last_record_lsn).len();
     525            0 :                     if coverage_size >= min_hole_coverage_size {
     526            0 :                         heap.push(Hole {
     527            0 :                             key_range,
     528            0 :                             coverage_size,
     529            0 :                         });
     530            0 :                         if heap.len() > max_holes {
     531            0 :                             heap.pop(); // remove smallest hole
     532            0 :                         }
     533            0 :                     }
     534      2064010 :                 }
     535           28 :             }
     536      2064038 :             prev = Some(next_key.next());
     537              :         }
     538           28 :         stats.read_lock_held_compute_holes_micros = stats.read_lock_held_key_sort_micros.till_now();
     539           28 :         drop_rlock(guard);
     540           28 :         stats.read_lock_drop_micros = stats.read_lock_held_compute_holes_micros.till_now();
     541           28 :         let mut holes = heap.into_vec();
     542           28 :         holes.sort_unstable_by_key(|hole| hole.key_range.start);
     543           28 :         let mut next_hole = 0; // index of next hole in holes vector
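The heap above keeps only the `max_holes` largest holes: its top is the smallest retained hole, and once the heap grows past the limit that smallest element is popped. A standalone sketch of the same technique, using `std::cmp::Reverse` over plain sizes instead of the crate's `Hole` ordering (which is assumed here to sort smallest-first):

    use std::cmp::Reverse;
    use std::collections::BinaryHeap;

    /// Keep the `n` largest values seen, in O(len * log n) time and O(n) space.
    fn n_largest(values: impl IntoIterator<Item = u64>, n: usize) -> Vec<u64> {
        let mut heap: BinaryHeap<Reverse<u64>> = BinaryHeap::with_capacity(n + 1);
        for v in values {
            heap.push(Reverse(v));
            if heap.len() > n {
                heap.pop(); // evict the smallest retained value
            }
        }
        heap.into_iter().map(|Reverse(v)| v).collect()
    }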
     544           28 : 
     545           28 :         // This iterator walks through all key-value pairs from all the layers
     546           28 :         // we're compacting, in key, LSN order.
     547           28 :         let all_values_iter = all_keys.iter();
     548           28 : 
     549           28 :         // This iterator walks through all keys and is needed to calculate size used by each key
     550           28 :         let mut all_keys_iter = all_keys
     551           28 :             .iter()
     552      2064038 :             .map(|DeltaEntry { key, lsn, size, .. }| (*key, *lsn, *size))
     553      2064010 :             .coalesce(|mut prev, cur| {
      554      2064010 :                 // Coalesce entries that belong to the same key.
     555      2064010 :                 // This ensures that compaction doesn't put them
     556      2064010 :                 // into different layer files.
     557      2064010 :                 // Still limit this by the target file size,
     558      2064010 :                 // so that we keep the size of the files in
     559      2064010 :                 // check.
     560      2064010 :                 if prev.0 == cur.0 && prev.2 < target_file_size {
     561        40038 :                     prev.2 += cur.2;
     562        40038 :                     Ok(prev)
     563              :                 } else {
     564      2023972 :                     Err((prev, cur))
     565              :                 }
     566      2064010 :             });
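A minimal standalone illustration of the `coalesce` step, using hypothetical `(key, size)` tuples in place of `DeltaEntry`: consecutive entries for the same key are merged until the accumulated size reaches the target, so a key's versions tend to end up in the same output layer.

    use itertools::Itertools;

    let target_file_size = 100u64;
    let entries = vec![(1u64, 40u64), (1, 40), (1, 40), (2, 10)];
    let coalesced: Vec<(u64, u64)> = entries
        .into_iter()
        .coalesce(|mut prev, cur| {
            if prev.0 == cur.0 && prev.1 < target_file_size {
                prev.1 += cur.1; // same key, still under target: merge
                Ok(prev)
            } else {
                Err((prev, cur)) // emit prev as-is, continue from cur
            }
        })
        .collect();
    assert_eq!(coalesced, vec![(1, 120), (2, 10)]);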
     567           28 : 
     568           28 :         // Merge the contents of all the input delta layers into a new set
     569           28 :         // of delta layers, based on the current partitioning.
     570           28 :         //
      571           28 :         // We split the new delta layers on the key dimension. We iterate through the key space, and for each key, check if adding the next key to the current output layer we're building would cause the layer to become too large. If so, flush the current output layer and start a new one.
      572           28 :         // It's possible that there is a single key with so many page versions that storing all of them in a single layer file
      573           28 :         // would make that file too large. In that case, we also split on the LSN dimension.
     574           28 :         //
     575           28 :         // LSN
     576           28 :         //  ^
     577           28 :         //  |
     578           28 :         //  | +-----------+            +--+--+--+--+
     579           28 :         //  | |           |            |  |  |  |  |
     580           28 :         //  | +-----------+            |  |  |  |  |
     581           28 :         //  | |           |            |  |  |  |  |
     582           28 :         //  | +-----------+     ==>    |  |  |  |  |
     583           28 :         //  | |           |            |  |  |  |  |
     584           28 :         //  | +-----------+            |  |  |  |  |
     585           28 :         //  | |           |            |  |  |  |  |
     586           28 :         //  | +-----------+            +--+--+--+--+
     587           28 :         //  |
     588           28 :         //  +--------------> key
     589           28 :         //
     590           28 :         //
     591           28 :         // If one key (X) has a lot of page versions:
     592           28 :         //
     593           28 :         // LSN
     594           28 :         //  ^
     595           28 :         //  |                                 (X)
     596           28 :         //  | +-----------+            +--+--+--+--+
     597           28 :         //  | |           |            |  |  |  |  |
     598           28 :         //  | +-----------+            |  |  +--+  |
     599           28 :         //  | |           |            |  |  |  |  |
     600           28 :         //  | +-----------+     ==>    |  |  |  |  |
     601           28 :         //  | |           |            |  |  +--+  |
     602           28 :         //  | +-----------+            |  |  |  |  |
     603           28 :         //  | |           |            |  |  |  |  |
     604           28 :         //  | +-----------+            +--+--+--+--+
     605           28 :         //  |
     606           28 :         //  +--------------> key
     607           28 :         // TODO: this actually divides the layers into fixed-size chunks, not
     608           28 :         // based on the partitioning.
     609           28 :         //
     610           28 :         // TODO: we should also opportunistically materialize and
     611           28 :         // garbage collect what we can.
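In essence, the key-dimension split finishes the current output layer whenever adding the next key's accumulated size would push it past `target_file_size`. A simplified sketch of just that rule over hypothetical `(key, size)` entries (the real loop below additionally handles holes and falls back to the LSN-dimension split for oversized keys):

    /// Return the indices at which a new output layer should start.
    fn key_split_points(entries: &[(u64, u64)], target_file_size: u64) -> Vec<usize> {
        let mut cuts = Vec::new();
        let mut written = 0u64;
        for (i, &(_key, size)) in entries.iter().enumerate() {
            if written > 0 && written + size > target_file_size {
                cuts.push(i); // flush the current layer and start a new one at this key
                written = 0;
            }
            written += size;
        }
        cuts
    }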
     612           28 :         let mut new_layers = Vec::new();
     613           28 :         let mut prev_key: Option<Key> = None;
     614           28 :         let mut writer: Option<DeltaLayerWriter> = None;
     615           28 :         let mut key_values_total_size = 0u64;
     616           28 :         let mut dup_start_lsn: Lsn = Lsn::INVALID; // start LSN of layer containing values of the single key
     617           28 :         let mut dup_end_lsn: Lsn = Lsn::INVALID; // end LSN of layer containing values of the single key
     618              : 
     619              :         for &DeltaEntry {
     620      2064038 :             key, lsn, ref val, ..
     621      2064066 :         } in all_values_iter
     622              :         {
     623      2064038 :             let value = val.load(ctx).await?;
     624      2064038 :             let same_key = prev_key.map_or(false, |prev_key| prev_key == key);
     625      2064038 :             // We need to check key boundaries once we reach next key or end of layer with the same key
     626      2064038 :             if !same_key || lsn == dup_end_lsn {
     627      2024000 :                 let mut next_key_size = 0u64;
     628      2024000 :                 let is_dup_layer = dup_end_lsn.is_valid();
     629      2024000 :                 dup_start_lsn = Lsn::INVALID;
     630      2024000 :                 if !same_key {
     631      2024000 :                     dup_end_lsn = Lsn::INVALID;
     632      2024000 :                 }
     633              :                 // Determine size occupied by this key. We stop at next key or when size becomes larger than target_file_size
     634      2024000 :                 for (next_key, next_lsn, next_size) in all_keys_iter.by_ref() {
     635      2024000 :                     next_key_size = next_size;
     636      2024000 :                     if key != next_key {
     637      2023972 :                         if dup_end_lsn.is_valid() {
      638            0 :                             // We are writing a segment with duplicates:
      639            0 :                             // place all remaining values of this key in a separate segment
      640            0 :                             dup_start_lsn = dup_end_lsn; // new segment starts where the old one stops
      641            0 :                             dup_end_lsn = lsn_range.end; // there are no more values of this key till the end of the LSN range
     642      2023972 :                         }
     643      2023972 :                         break;
     644           28 :                     }
     645           28 :                     key_values_total_size += next_size;
      646           28 :                     // Check if it is time to split the segment: if the total size of the keys is larger than the target file size.
      647           28 :                     // We need to avoid generating empty segments if next_size > target_file_size.
     648           28 :                     if key_values_total_size > target_file_size && lsn != next_lsn {
     649              :                         // Split key between multiple layers: such layer can contain only single key
     650            0 :                         dup_start_lsn = if dup_end_lsn.is_valid() {
     651            0 :                             dup_end_lsn // new segment with duplicates starts where old one stops
     652              :                         } else {
     653            0 :                             lsn // start with the first LSN for this key
     654              :                         };
     655            0 :                         dup_end_lsn = next_lsn; // upper LSN boundary is exclusive
     656            0 :                         break;
     657           28 :                     }
     658              :                 }
     659              :                 // handle case when loop reaches last key: in this case dup_end is non-zero but dup_start is not set.
     660      2024000 :                 if dup_end_lsn.is_valid() && !dup_start_lsn.is_valid() {
     661            0 :                     dup_start_lsn = dup_end_lsn;
     662            0 :                     dup_end_lsn = lsn_range.end;
     663      2024000 :                 }
     664      2024000 :                 if writer.is_some() {
     665      2023972 :                     let written_size = writer.as_mut().unwrap().size();
     666      2023972 :                     let contains_hole =
     667      2023972 :                         next_hole < holes.len() && key >= holes[next_hole].key_range.end;
      668              :                     // check if the key causes layer overflow or contains a hole...
     669      2023972 :                     if is_dup_layer
     670      2023972 :                         || dup_end_lsn.is_valid()
     671      2023972 :                         || written_size + key_values_total_size > target_file_size
     672      2023692 :                         || contains_hole
     673              :                     {
     674              :                         // ... if so, flush previous layer and prepare to write new one
     675          280 :                         new_layers.push(
     676          280 :                             writer
     677          280 :                                 .take()
     678          280 :                                 .unwrap()
     679          280 :                                 .finish(prev_key.unwrap().next(), self, ctx)
     680          720 :                                 .await?,
     681              :                         );
     682          280 :                         writer = None;
     683          280 : 
     684          280 :                         if contains_hole {
     685            0 :                             // skip hole
     686            0 :                             next_hole += 1;
     687          280 :                         }
     688      2023692 :                     }
     689           28 :                 }
      690              :                 // Remember the size of the key value, because at the next iteration we will access the next item
     691      2024000 :                 key_values_total_size = next_key_size;
     692        40038 :             }
     693      2064038 :             fail_point!("delta-layer-writer-fail-before-finish", |_| {
     694            0 :                 Err(CompactionError::Other(anyhow::anyhow!(
     695            0 :                     "failpoint delta-layer-writer-fail-before-finish"
     696            0 :                 )))
     697      2064038 :             });
     698              : 
     699      2064038 :             if !self.shard_identity.is_key_disposable(&key) {
     700      2064038 :                 if writer.is_none() {
      701          308 :                     // Create writer if not initialized yet
     702          308 :                     writer = Some(
     703              :                         DeltaLayerWriter::new(
     704          308 :                             self.conf,
     705          308 :                             self.timeline_id,
     706          308 :                             self.tenant_shard_id,
     707          308 :                             key,
     708          308 :                             if dup_end_lsn.is_valid() {
     709              :                                 // this is a layer containing slice of values of the same key
     710            0 :                                 debug!("Create new dup layer {}..{}", dup_start_lsn, dup_end_lsn);
     711            0 :                                 dup_start_lsn..dup_end_lsn
     712              :                             } else {
     713          308 :                                 debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end);
     714          308 :                                 lsn_range.clone()
     715              :                             },
     716          308 :                             ctx,
     717              :                         )
     718          154 :                         .await?,
     719              :                     );
     720      2063730 :                 }
     721              : 
     722      2064038 :                 writer
     723      2064038 :                     .as_mut()
     724      2064038 :                     .unwrap()
     725      2064038 :                     .put_value(key, lsn, value, ctx)
     726         1566 :                     .await?;
     727              :             } else {
     728            0 :                 debug!(
     729            0 :                     "Dropping key {} during compaction (it belongs on shard {:?})",
     730            0 :                     key,
     731            0 :                     self.shard_identity.get_shard_number(&key)
     732              :                 );
     733              :             }
     734              : 
     735      2064038 :             if !new_layers.is_empty() {
     736        19786 :                 fail_point!("after-timeline-compacted-first-L1");
     737      2044252 :             }
     738              : 
     739      2064038 :             prev_key = Some(key);
     740              :         }
     741           28 :         if let Some(writer) = writer {
     742         1991 :             new_layers.push(writer.finish(prev_key.unwrap().next(), self, ctx).await?);
     743            0 :         }
     744              : 
     745              :         // Sync layers
     746           28 :         if !new_layers.is_empty() {
     747              :             // Print a warning if the created layer is larger than double the target size
     748              :             // Add two pages for potential overhead. This should in theory be already
     749              :             // accounted for in the target calculation, but for very small targets,
     750              :             // we still might easily hit the limit otherwise.
     751           28 :             let warn_limit = target_file_size * 2 + page_cache::PAGE_SZ as u64 * 2;
     752          308 :             for layer in new_layers.iter() {
     753          308 :                 if layer.layer_desc().file_size > warn_limit {
     754            0 :                     warn!(
     755              :                         %layer,
     756            0 :                         "created delta file of size {} larger than double of target of {target_file_size}", layer.layer_desc().file_size
     757              :                     );
     758          308 :                 }
     759              :             }
     760              : 
     761              :             // The writer.finish() above already did the fsync of the inodes.
     762              :             // We just need to fsync the directory in which these inodes are linked,
     763              :             // which we know to be the timeline directory.
     764              :             //
      765              :             // We use fatal_err() below because after writer.finish() returns with success,
     766              :             // the in-memory state of the filesystem already has the layer file in its final place,
     767              :             // and subsequent pageserver code could think it's durable while it really isn't.
     768           28 :             let timeline_dir = VirtualFile::open(
     769           28 :                 &self
     770           28 :                     .conf
     771           28 :                     .timeline_path(&self.tenant_shard_id, &self.timeline_id),
     772           28 :                 ctx,
     773           28 :             )
     774           14 :             .await
     775           28 :             .fatal_err("VirtualFile::open for timeline dir fsync");
     776           28 :             timeline_dir
     777           28 :                 .sync_all()
     778           14 :                 .await
     779           28 :                 .fatal_err("VirtualFile::sync_all timeline dir");
     780            0 :         }
     781              : 
     782           28 :         stats.write_layer_files_micros = stats.read_lock_drop_micros.till_now();
     783           28 :         stats.new_deltas_count = Some(new_layers.len());
     784          308 :         stats.new_deltas_size = Some(new_layers.iter().map(|l| l.layer_desc().file_size).sum());
     785           28 : 
     786           28 :         match TryInto::<CompactLevel0Phase1Stats>::try_into(stats)
     787           28 :             .and_then(|stats| serde_json::to_string(&stats).context("serde_json::to_string"))
     788              :         {
     789           28 :             Ok(stats_json) => {
     790           28 :                 info!(
     791            0 :                     stats_json = stats_json.as_str(),
     792            0 :                     "compact_level0_phase1 stats available"
     793              :                 )
     794              :             }
     795            0 :             Err(e) => {
     796            0 :                 warn!("compact_level0_phase1 stats failed to serialize: {:#}", e);
     797              :             }
     798              :         }
     799              : 
     800           28 :         Ok(CompactLevel0Phase1Result {
     801           28 :             new_layers,
     802           28 :             deltas_to_compact: deltas_to_compact
     803           28 :                 .into_iter()
     804          402 :                 .map(|x| x.drop_eviction_guard())
     805           28 :                 .collect::<Vec<_>>(),
     806           28 :         })
     807          364 :     }
     808              : }
     809              : 
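                      : 
                      : // A minimal, standalone sketch of the durability pattern used above: fsync the
                      : // newly written file, then fsync the directory that links it, so the new directory
                      : // entry itself becomes durable. This is not drawn from this file; it uses plain
                      : // std::fs rather than the pageserver's VirtualFile/fatal_err machinery, and the
                      : // function and parameter names are illustrative only.
                      : use std::fs::{File, OpenOptions};
                      : use std::io::Write;
                      : use std::path::Path;
                      : 
                      : fn write_durably(dir: &Path, name: &str, data: &[u8]) -> std::io::Result<()> {
                      :     let path = dir.join(name);
                      :     let mut f = OpenOptions::new().create(true).write(true).open(&path)?;
                      :     f.write_all(data)?;
                      :     // Flush the file's own data and metadata.
                      :     f.sync_all()?;
                      :     // The new directory entry is only durable once the directory itself is synced.
                      :     File::open(dir)?.sync_all()?;
                      :     Ok(())
                      : }
                      : 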
     810              : #[derive(Default)]
     811              : struct CompactLevel0Phase1Result {
     812              :     new_layers: Vec<ResidentLayer>,
     813              :     deltas_to_compact: Vec<Layer>,
     814              : }
     815              : 
     816              : #[derive(Default)]
     817              : struct CompactLevel0Phase1StatsBuilder {
     818              :     version: Option<u64>,
     819              :     tenant_id: Option<TenantShardId>,
     820              :     timeline_id: Option<TimelineId>,
     821              :     read_lock_acquisition_micros: DurationRecorder,
     822              :     read_lock_held_spawn_blocking_startup_micros: DurationRecorder,
     823              :     read_lock_held_key_sort_micros: DurationRecorder,
     824              :     read_lock_held_prerequisites_micros: DurationRecorder,
     825              :     read_lock_held_compute_holes_micros: DurationRecorder,
     826              :     read_lock_drop_micros: DurationRecorder,
     827              :     write_layer_files_micros: DurationRecorder,
     828              :     level0_deltas_count: Option<usize>,
     829              :     new_deltas_count: Option<usize>,
     830              :     new_deltas_size: Option<u64>,
     831              : }
     832              : 
     833              : #[derive(serde::Serialize)]
     834              : struct CompactLevel0Phase1Stats {
     835              :     version: u64,
     836              :     tenant_id: TenantShardId,
     837              :     timeline_id: TimelineId,
     838              :     read_lock_acquisition_micros: RecordedDuration,
     839              :     read_lock_held_spawn_blocking_startup_micros: RecordedDuration,
     840              :     read_lock_held_key_sort_micros: RecordedDuration,
     841              :     read_lock_held_prerequisites_micros: RecordedDuration,
     842              :     read_lock_held_compute_holes_micros: RecordedDuration,
     843              :     read_lock_drop_micros: RecordedDuration,
     844              :     write_layer_files_micros: RecordedDuration,
     845              :     level0_deltas_count: usize,
     846              :     new_deltas_count: usize,
     847              :     new_deltas_size: u64,
     848              : }
     849              : 
     850              : impl TryFrom<CompactLevel0Phase1StatsBuilder> for CompactLevel0Phase1Stats {
     851              :     type Error = anyhow::Error;
     852              : 
     853           28 :     fn try_from(value: CompactLevel0Phase1StatsBuilder) -> Result<Self, Self::Error> {
     854           28 :         Ok(Self {
     855           28 :             version: value.version.ok_or_else(|| anyhow!("version not set"))?,
     856           28 :             tenant_id: value
     857           28 :                 .tenant_id
     858           28 :                 .ok_or_else(|| anyhow!("tenant_id not set"))?,
     859           28 :             timeline_id: value
     860           28 :                 .timeline_id
     861           28 :                 .ok_or_else(|| anyhow!("timeline_id not set"))?,
     862           28 :             read_lock_acquisition_micros: value
     863           28 :                 .read_lock_acquisition_micros
     864           28 :                 .into_recorded()
     865           28 :                 .ok_or_else(|| anyhow!("read_lock_acquisition_micros not set"))?,
     866           28 :             read_lock_held_spawn_blocking_startup_micros: value
     867           28 :                 .read_lock_held_spawn_blocking_startup_micros
     868           28 :                 .into_recorded()
     869           28 :                 .ok_or_else(|| anyhow!("read_lock_held_spawn_blocking_startup_micros not set"))?,
     870           28 :             read_lock_held_key_sort_micros: value
     871           28 :                 .read_lock_held_key_sort_micros
     872           28 :                 .into_recorded()
     873           28 :                 .ok_or_else(|| anyhow!("read_lock_held_key_sort_micros not set"))?,
     874           28 :             read_lock_held_prerequisites_micros: value
     875           28 :                 .read_lock_held_prerequisites_micros
     876           28 :                 .into_recorded()
     877           28 :                 .ok_or_else(|| anyhow!("read_lock_held_prerequisites_micros not set"))?,
     878           28 :             read_lock_held_compute_holes_micros: value
     879           28 :                 .read_lock_held_compute_holes_micros
     880           28 :                 .into_recorded()
     881           28 :                 .ok_or_else(|| anyhow!("read_lock_held_compute_holes_micros not set"))?,
     882           28 :             read_lock_drop_micros: value
     883           28 :                 .read_lock_drop_micros
     884           28 :                 .into_recorded()
     885           28 :                 .ok_or_else(|| anyhow!("read_lock_drop_micros not set"))?,
     886           28 :             write_layer_files_micros: value
     887           28 :                 .write_layer_files_micros
     888           28 :                 .into_recorded()
     889           28 :                 .ok_or_else(|| anyhow!("write_layer_files_micros not set"))?,
     890           28 :             level0_deltas_count: value
     891           28 :                 .level0_deltas_count
     892           28 :                 .ok_or_else(|| anyhow!("level0_deltas_count not set"))?,
     893           28 :             new_deltas_count: value
     894           28 :                 .new_deltas_count
     895           28 :                 .ok_or_else(|| anyhow!("new_deltas_count not set"))?,
     896           28 :             new_deltas_size: value
     897           28 :                 .new_deltas_size
     898           28 :                 .ok_or_else(|| anyhow!("new_deltas_size not set"))?,
     899              :         })
     900           28 :     }
     901              : }
     902              : 
     903              : impl Timeline {
     904              :     /// Entry point for new tiered compaction algorithm.
     905              :     ///
     906              :     /// All the real work is in the implementation in the pageserver_compaction
     907              :     /// crate. The code here would apply to any algorithm implementing the
     908              :     /// same interface, but tiered is the only one at the moment.
     909              :     ///
     910              :     /// TODO: cancellation
     911            0 :     pub(crate) async fn compact_tiered(
     912            0 :         self: &Arc<Self>,
     913            0 :         _cancel: &CancellationToken,
     914            0 :         ctx: &RequestContext,
     915            0 :     ) -> Result<(), CompactionError> {
     916            0 :         let fanout = self.get_compaction_threshold() as u64;
     917            0 :         let target_file_size = self.get_checkpoint_distance();
     918              : 
     919              :         // Find the top of the historical layers
     920            0 :         let end_lsn = {
     921            0 :             let guard = self.layers.read().await;
     922            0 :             let layers = guard.layer_map();
     923              : 
     924            0 :             let l0_deltas = layers.get_level0_deltas()?;
     925            0 :             drop(guard);
     926            0 : 
     927            0 :             // As an optimization, if we find that there are too few L0 layers,
     928            0 :             // bail out early. We know that the compaction algorithm would do
     929            0 :             // nothing in that case.
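                      :             // For example, with a compaction threshold (fanout) of 10 and only 7 L0
                      :             // delta layers present, tiered compaction would have nothing to merge, so
                      :             // we return immediately. (Illustrative numbers only.)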
     930            0 :             if l0_deltas.len() < fanout as usize {
     931              :                 // doesn't need compacting
     932            0 :                 return Ok(());
     933            0 :             }
     934            0 :             l0_deltas.iter().map(|l| l.lsn_range.end).max().unwrap()
     935            0 :         };
     936            0 : 
     937            0 :         // Is the timeline being deleted?
     938            0 :         if self.is_stopping() {
     939            0 :             trace!("Dropping out of compaction on timeline shutdown");
     940            0 :             return Err(CompactionError::ShuttingDown);
     941            0 :         }
     942              : 
     943            0 :         let (dense_ks, _sparse_ks) = self.collect_keyspace(end_lsn, ctx).await?;
     944              :         // TODO(chi): ignore sparse_keyspace for now, compact it in the future.
     945            0 :         let mut adaptor = TimelineAdaptor::new(self, (end_lsn, dense_ks));
     946            0 : 
     947            0 :         pageserver_compaction::compact_tiered::compact_tiered(
     948            0 :             &mut adaptor,
     949            0 :             end_lsn,
     950            0 :             target_file_size,
     951            0 :             fanout,
     952            0 :             ctx,
     953            0 :         )
     954            0 :         .await?;
     955              : 
     956            0 :         adaptor.flush_updates().await?;
     957            0 :         Ok(())
     958            0 :     }
     959              : 
     960              :     /// An experimental compaction building block that combines compaction with garbage collection.
     961              :     ///
     962              :     /// The current implementation picks all delta + image layers that are below or intersecting with
     963              :     /// the GC horizon without considering retain_lsns. Then, it does a full compaction over all these delta
     964              :     /// layers and image layers, which generates image layers at the gc horizon, drops deltas below the gc horizon,
     965              :     /// and creates delta layers with all deltas >= the gc horizon.
     966            4 :     pub(crate) async fn compact_with_gc(
     967            4 :         self: &Arc<Self>,
     968            4 :         _cancel: &CancellationToken,
     969            4 :         ctx: &RequestContext,
     970            4 :     ) -> Result<(), CompactionError> {
     971            4 :         use crate::tenant::storage_layer::ValueReconstructState;
     972            4 :         use std::collections::BTreeSet;
     973            4 : 
     974            4 :         info!("running enhanced gc bottom-most compaction");
     975              : 
     976              :         scopeguard::defer! {
     977              :             info!("done enhanced gc bottom-most compaction");
     978              :         };
     979              : 
     980              :         // Step 0: pick all delta + image layers that are below or intersect the GC horizon.
     981              :         // The layer selection has the following properties:
     982              :         // 1. If a layer is in the selection, all layers below it are in the selection.
     983              :         // 2. It follows from (1) that, for each key in the selection, the value can be reconstructed using only layers within the selection.
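                      :         // For example, with gc_cutoff = 0x40 and delta layers covering LSN ranges
                      :         // 0x10..0x30, 0x30..0x50 and 0x50..0x70, the first two are selected (their
                      :         // lsn_range.start is <= 0x40) while the third is left untouched.
                      :         // (Illustrative LSNs only.)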
     984            4 :         let (layer_selection, gc_cutoff) = {
     985            4 :             let guard = self.layers.read().await;
     986            4 :             let layers = guard.layer_map();
     987            4 :             let gc_info = self.gc_info.read().unwrap();
     988            4 :             if !gc_info.retain_lsns.is_empty() || !gc_info.leases.is_empty() {
     989            0 :                 return Err(CompactionError::Other(anyhow!(
     990            0 :                     "enhanced legacy compaction currently does not support retain_lsns (branches)"
     991            0 :                 )));
     992            4 :             }
     993            4 :             let gc_cutoff = Lsn::min(gc_info.cutoffs.horizon, gc_info.cutoffs.pitr);
     994            4 :             let mut selected_layers = Vec::new();
     995            4 :             // TODO: consider retain_lsns
     996            4 :             drop(gc_info);
     997           20 :             for desc in layers.iter_historic_layers() {
     998           20 :                 if desc.get_lsn_range().start <= gc_cutoff {
     999           16 :                     selected_layers.push(guard.get_from_desc(&desc));
    1000           16 :                 }
    1001              :             }
    1002            4 :             (selected_layers, gc_cutoff)
    1003            4 :         };
    1004            4 :         info!(
    1005            0 :             "picked {} layers for compaction with gc_cutoff={}",
    1006            0 :             layer_selection.len(),
    1007              :             gc_cutoff
    1008              :         );
    1009              :         // Step 1: (In the future) construct a k-merge iterator over all layers. For now, simply collect all keys + LSNs.
    1010              :         // Also, collect the layer information to decide when to split the new delta layers.
    1011            4 :         let mut all_key_values = Vec::new();
    1012            4 :         let mut delta_split_points = BTreeSet::new();
    1013           20 :         for layer in &layer_selection {
    1014           16 :             all_key_values.extend(layer.load_key_values(ctx).await?);
    1015           16 :             let desc = layer.layer_desc();
    1016           16 :             if desc.is_delta() {
    1017            8 :                 // TODO: is it correct to only record split points for deltas intersecting with the GC horizon? (exclude those below/above the horizon)
    1018            8 :                 // so that we can avoid having too many small delta layers.
    1019            8 :                 let key_range = desc.get_key_range();
    1020            8 :                 delta_split_points.insert(key_range.start);
    1021            8 :                 delta_split_points.insert(key_range.end);
    1022            8 :             }
    1023              :         }
    1024              :         // Sort by key (small to large) and LSN (low to high). If the same LSN has both an image and a delta (because delta
    1025              :         // layers and image layers are merged here), make the image appear before the delta.
    1026              :         struct ValueWrapper<'a>(&'a crate::repository::Value);
    1027              :         impl Ord for ValueWrapper<'_> {
    1028            0 :             fn cmp(&self, other: &Self) -> std::cmp::Ordering {
    1029            0 :                 use crate::repository::Value;
    1030            0 :                 use std::cmp::Ordering;
    1031            0 :                 match (self.0, other.0) {
    1032            0 :                     (Value::Image(_), Value::WalRecord(_)) => Ordering::Less,
    1033            0 :                     (Value::WalRecord(_), Value::Image(_)) => Ordering::Greater,
    1034            0 :                     _ => Ordering::Equal,
    1035              :                 }
    1036            0 :             }
    1037              :         }
    1038              :         impl PartialOrd for ValueWrapper<'_> {
    1039            0 :             fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
    1040            0 :                 Some(self.cmp(other))
    1041            0 :             }
    1042              :         }
    1043              :         impl PartialEq for ValueWrapper<'_> {
    1044            0 :             fn eq(&self, other: &Self) -> bool {
    1045            0 :                 self.cmp(other) == std::cmp::Ordering::Equal
    1046            0 :             }
    1047              :         }
    1048              :         impl Eq for ValueWrapper<'_> {}
    1049          180 :         all_key_values.sort_by(|(k1, l1, v1), (k2, l2, v2)| {
    1050          180 :             (k1, l1, ValueWrapper(v1)).cmp(&(k2, l2, ValueWrapper(v2)))
    1051          180 :         });
    1052            4 :         // Step 2: Produce images+deltas. TODO: ensure newly-produced delta does not overlap with other deltas.
    1053            4 :         // Data of the same key.
    1054            4 :         let mut accumulated_values = Vec::new();
    1055            4 :         let mut last_key = all_key_values.first().unwrap().0; // TODO: assert all_key_values not empty
    1056              : 
    1057              :         /// Take a list of images and deltas, produce an image at the GC horizon, and a list of deltas above the GC horizon.
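                      :         ///
                      :         /// For example, with horizon = 0x40 and accumulated values key@0x10 (image),
                      :         /// key@0x30 (delta) and key@0x50 (delta), the image at the horizon is
                      :         /// reconstructed from the 0x10 image plus the 0x30 delta, and the 0x50 delta
                      :         /// is returned unchanged. (Illustrative LSNs only.)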
    1058           72 :         async fn flush_accumulated_states(
    1059           72 :             tline: &Arc<Timeline>,
    1060           72 :             key: Key,
    1061           72 :             accumulated_values: &[&(Key, Lsn, crate::repository::Value)],
    1062           72 :             horizon: Lsn,
    1063           72 :         ) -> anyhow::Result<(Vec<(Key, Lsn, crate::repository::Value)>, bytes::Bytes)> {
    1064           72 :             let mut base_image = None;
    1065           72 :             let mut keys_above_horizon = Vec::new();
    1066           72 :             let mut delta_above_base_image = Vec::new();
    1067              :             // We have a list of deltas/images. We want to create image layers while collecting garbage.
    1068           88 :             for (key, lsn, val) in accumulated_values.iter().rev() {
    1069           88 :                 if *lsn > horizon {
    1070            4 :                     if let Some((_, prev_lsn, _)) = keys_above_horizon.last_mut() {
    1071            0 :                         if *prev_lsn == *lsn {
    1072              :                             // The case that we have an LSN with both data from the delta layer and the image layer. As
    1073              :                             // `ValueWrapper` ensures that an image is ordered before a delta at the same LSN, we simply
    1074              :                             // drop this delta and keep the image.
    1075              :                             //
    1076              :                             // For example, if we have delta entries key1@0x10 and key1@0x20 plus an image entry key1@0x10, we
    1077              :                             // keep the image for key1@0x10 and the delta for key1@0x20; the key1@0x10 delta is simply
    1078              :                             // dropped.
    1079            0 :                             continue;
    1080            0 :                         }
    1081            4 :                     }
    1082            4 :                     keys_above_horizon.push((*key, *lsn, val.clone()));
    1083           84 :                 } else if *lsn <= horizon {
    1084           84 :                     match val {
    1085           72 :                         crate::repository::Value::Image(image) => {
    1086           72 :                             base_image = Some((*lsn, image.clone()));
    1087           72 :                             break;
    1088              :                         }
    1089           12 :                         crate::repository::Value::WalRecord(wal) => {
    1090           12 :                             delta_above_base_image.push((*lsn, wal.clone()));
    1091           12 :                         }
    1092              :                     }
    1093            0 :                 }
    1094              :             }
    1095              :             // do not reverse delta_above_base_image; the reconstruct state expects records in reverse LSN order
    1096           72 :             keys_above_horizon.reverse();
    1097           72 :             let state = ValueReconstructState {
    1098           72 :                 img: base_image,
    1099           72 :                 records: delta_above_base_image,
    1100           72 :             };
    1101           72 :             let img = tline.reconstruct_value(key, horizon, state).await?;
    1102           72 :             Ok((keys_above_horizon, img))
    1103           72 :         }
    1104              : 
    1105           72 :         async fn flush_deltas(
    1106           72 :             deltas: &mut Vec<(Key, Lsn, crate::repository::Value)>,
    1107           72 :             last_key: Key,
    1108           72 :             delta_split_points: &[Key],
    1109           72 :             current_delta_split_point: &mut usize,
    1110           72 :             tline: &Arc<Timeline>,
    1111           72 :             gc_cutoff: Lsn,
    1112           72 :             ctx: &RequestContext,
    1113           72 :         ) -> anyhow::Result<Option<ResidentLayer>> {
    1114           72 :             // Check if we need to split the delta layer. We split at the original delta layer boundary to avoid
    1115           72 :             // overlapping layers.
    1116           72 :             //
    1117           72 :             // If we have a structure like this:
    1118           72 :             //
    1119           72 :             // | Delta 1 |         | Delta 4 |
    1120           72 :             // |---------| Delta 2 |---------|
    1121           72 :             // | Delta 3 |         | Delta 5 |
    1122           72 :             //
    1123           72 :             // And we choose to compact delta 2+3+5. We will get an overlapping delta layer with delta 1+4.
    1124           72 :             // A simple solution here is to split the delta layers along the original boundaries, though this
    1125           72 :             // might produce a lot of small layers. This should be improved in the future.
    1126           72 :             let mut need_split = false;
    1127           88 :             while *current_delta_split_point < delta_split_points.len()
    1128           76 :                 && last_key >= delta_split_points[*current_delta_split_point]
    1129           16 :             {
    1130           16 :                 *current_delta_split_point += 1;
    1131           16 :                 need_split = true;
    1132           16 :             }
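                      :             // At this point `need_split` is true iff `last_key` has passed at least one
                      :             // original layer boundary since the previous flush; only then are the deltas
                      :             // accumulated so far flushed into their own layer, keeping new delta layers
                      :             // aligned to the original boundaries.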
    1133           72 :             if !need_split {
    1134           56 :                 return Ok(None);
    1135           16 :             }
    1136           16 :             let deltas = std::mem::take(deltas);
    1137           16 :             if deltas.is_empty() {
    1138           12 :                 return Ok(None);
    1139            4 :             }
    1140            4 :             let end_lsn = deltas.iter().map(|(_, lsn, _)| lsn).max().copied().unwrap() + 1;
    1141            4 :             let mut delta_layer_writer = DeltaLayerWriter::new(
    1142            4 :                 tline.conf,
    1143            4 :                 tline.timeline_id,
    1144            4 :                 tline.tenant_shard_id,
    1145            4 :                 deltas.first().unwrap().0,
    1146            4 :                 gc_cutoff..end_lsn,
    1147            4 :                 ctx,
    1148            4 :             )
    1149            2 :             .await?;
    1150            4 :             let key_end = deltas.last().unwrap().0.next();
    1151            8 :             for (key, lsn, val) in deltas {
    1152            4 :                 delta_layer_writer.put_value(key, lsn, val, ctx).await?;
    1153              :             }
    1154           10 :             let delta_layer = delta_layer_writer.finish(key_end, tline, ctx).await?;
    1155            4 :             Ok(Some(delta_layer))
    1156           72 :         }
    1157              : 
    1158            4 :         let mut image_layer_writer = ImageLayerWriter::new(
    1159            4 :             self.conf,
    1160            4 :             self.timeline_id,
    1161            4 :             self.tenant_shard_id,
    1162            4 :             &(all_key_values.first().unwrap().0..all_key_values.last().unwrap().0.next()),
    1163            4 :             gc_cutoff,
    1164            4 :             ctx,
    1165            4 :         )
    1166            2 :         .await?;
    1167              : 
    1168            4 :         let mut delta_values = Vec::new();
    1169            4 :         let delta_split_points = delta_split_points.into_iter().collect_vec();
    1170            4 :         let mut current_delta_split_point = 0;
    1171            4 :         let mut delta_layers = Vec::new();
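                      :         // Walk the sorted (key, lsn, value) triples, accumulating all entries of the
                      :         // current key; whenever the key changes, flush the accumulated entries: the
                      :         // image at gc_cutoff goes into the image layer, and the entries above gc_cutoff
                      :         // are carried over into delta layers via flush_deltas().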
    1172          100 :         for item @ (key, _, _) in &all_key_values {
    1173           96 :             if &last_key == key {
    1174           28 :                 accumulated_values.push(item);
    1175           28 :             } else {
    1176           68 :                 let (deltas, image) =
    1177           68 :                     flush_accumulated_states(self, last_key, &accumulated_values, gc_cutoff)
    1178            0 :                         .await?;
    1179              :                 // Put the image into the image layer. Currently we have a single big layer for the compaction.
    1180           70 :                 image_layer_writer.put_image(last_key, image, ctx).await?;
    1181           68 :                 delta_values.extend(deltas);
    1182           68 :                 delta_layers.extend(
    1183           68 :                     flush_deltas(
    1184           68 :                         &mut delta_values,
    1185           68 :                         last_key,
    1186           68 :                         &delta_split_points,
    1187           68 :                         &mut current_delta_split_point,
    1188           68 :                         self,
    1189           68 :                         gc_cutoff,
    1190           68 :                         ctx,
    1191           68 :                     )
    1192           12 :                     .await?,
    1193              :                 );
    1194           68 :                 accumulated_values.clear();
    1195           68 :                 accumulated_values.push(item);
    1196           68 :                 last_key = *key;
    1197              :             }
    1198              :         }
    1199              : 
    1200              :         // TODO: move this part to the loop body
    1201            4 :         let (deltas, image) =
    1202            4 :             flush_accumulated_states(self, last_key, &accumulated_values, gc_cutoff).await?;
    1203              :         // Put the image into the image layer. Currently we have a single big layer for the compaction.
    1204            4 :         image_layer_writer.put_image(last_key, image, ctx).await?;
    1205            4 :         delta_values.extend(deltas);
    1206            4 :         delta_layers.extend(
    1207            4 :             flush_deltas(
    1208            4 :                 &mut delta_values,
    1209            4 :                 last_key,
    1210            4 :                 &delta_split_points,
    1211            4 :                 &mut current_delta_split_point,
    1212            4 :                 self,
    1213            4 :                 gc_cutoff,
    1214            4 :                 ctx,
    1215            4 :             )
    1216            0 :             .await?,
    1217              :         );
    1218              : 
    1219            8 :         let image_layer = image_layer_writer.finish(self, ctx).await?;
    1220            4 :         info!(
    1221            0 :             "produced {} delta layers and {} image layers",
    1222            0 :             delta_layers.len(),
    1223              :             1
    1224              :         );
    1225            4 :         let mut compact_to = Vec::new();
    1226            4 :         compact_to.extend(delta_layers);
    1227            4 :         compact_to.push(image_layer);
    1228              :         // Step 3: Place back to the layer map.
    1229              :         {
    1230            4 :             let mut guard = self.layers.write().await;
    1231            4 :             guard.finish_gc_compaction(&layer_selection, &compact_to, &self.metrics)
    1232            4 :         };
    1233            4 : 
    1234            4 :         self.remote_client
    1235            4 :             .schedule_compaction_update(&layer_selection, &compact_to)?;
    1236            4 :         Ok(())
    1237            4 :     }
    1238              : }
    1239              : 
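                      : // A minimal, standalone sketch of the ordering that compact_with_gc() relies on:
                      : // entries are sorted by (key, LSN), and when an image and a delta share the same
                      : // key and LSN the image sorts first. `Val` is a stand-in enum, not the pageserver's
                      : // Value type, and the function name is illustrative only.
                      : #[derive(Debug, Clone, PartialEq, Eq)]
                      : enum Val {
                      :     Image(&'static str),
                      :     Delta(&'static str),
                      : }
                      : 
                      : fn sort_key_lsn_image_first(entries: &mut [(u64, u64, Val)]) {
                      :     entries.sort_by(|(k1, l1, v1), (k2, l2, v2)| {
                      :         // Images rank before deltas so that, at equal (key, LSN), the later
                      :         // deduplication step can keep the image and drop the delta.
                      :         let rank = |v: &Val| match v {
                      :             Val::Image(_) => 0,
                      :             Val::Delta(_) => 1,
                      :         };
                      :         (k1, l1, rank(v1)).cmp(&(k2, l2, rank(v2)))
                      :     });
                      : }
                      : 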
    1240              : struct TimelineAdaptor {
    1241              :     timeline: Arc<Timeline>,
    1242              : 
    1243              :     keyspace: (Lsn, KeySpace),
    1244              : 
    1245              :     new_deltas: Vec<ResidentLayer>,
    1246              :     new_images: Vec<ResidentLayer>,
    1247              :     layers_to_delete: Vec<Arc<PersistentLayerDesc>>,
    1248              : }
    1249              : 
    1250              : impl TimelineAdaptor {
    1251            0 :     pub fn new(timeline: &Arc<Timeline>, keyspace: (Lsn, KeySpace)) -> Self {
    1252            0 :         Self {
    1253            0 :             timeline: timeline.clone(),
    1254            0 :             keyspace,
    1255            0 :             new_images: Vec::new(),
    1256            0 :             new_deltas: Vec::new(),
    1257            0 :             layers_to_delete: Vec::new(),
    1258            0 :         }
    1259            0 :     }
    1260              : 
    1261            0 :     pub async fn flush_updates(&mut self) -> anyhow::Result<()> {
    1262            0 :         let layers_to_delete = {
    1263            0 :             let guard = self.timeline.layers.read().await;
    1264            0 :             self.layers_to_delete
    1265            0 :                 .iter()
    1266            0 :                 .map(|x| guard.get_from_desc(x))
    1267            0 :                 .collect::<Vec<Layer>>()
    1268            0 :         };
    1269            0 :         self.timeline
    1270            0 :             .finish_compact_batch(&self.new_deltas, &self.new_images, &layers_to_delete)
    1271            0 :             .await?;
    1272              : 
    1273            0 :         self.timeline
    1274            0 :             .upload_new_image_layers(std::mem::take(&mut self.new_images))?;
    1275              : 
    1276            0 :         self.new_deltas.clear();
    1277            0 :         self.layers_to_delete.clear();
    1278            0 :         Ok(())
    1279            0 :     }
    1280              : }
    1281              : 
    1282              : #[derive(Clone)]
    1283              : struct ResidentDeltaLayer(ResidentLayer);
    1284              : #[derive(Clone)]
    1285              : struct ResidentImageLayer(ResidentLayer);
    1286              : 
    1287              : impl CompactionJobExecutor for TimelineAdaptor {
    1288              :     type Key = crate::repository::Key;
    1289              : 
    1290              :     type Layer = OwnArc<PersistentLayerDesc>;
    1291              :     type DeltaLayer = ResidentDeltaLayer;
    1292              :     type ImageLayer = ResidentImageLayer;
    1293              : 
    1294              :     type RequestContext = crate::context::RequestContext;
    1295              : 
    1296            0 :     fn get_shard_identity(&self) -> &ShardIdentity {
    1297            0 :         self.timeline.get_shard_identity()
    1298            0 :     }
    1299              : 
    1300            0 :     async fn get_layers(
    1301            0 :         &mut self,
    1302            0 :         key_range: &Range<Key>,
    1303            0 :         lsn_range: &Range<Lsn>,
    1304            0 :         _ctx: &RequestContext,
    1305            0 :     ) -> anyhow::Result<Vec<OwnArc<PersistentLayerDesc>>> {
    1306            0 :         self.flush_updates().await?;
    1307              : 
    1308            0 :         let guard = self.timeline.layers.read().await;
    1309            0 :         let layer_map = guard.layer_map();
    1310            0 : 
    1311            0 :         let result = layer_map
    1312            0 :             .iter_historic_layers()
    1313            0 :             .filter(|l| {
    1314            0 :                 overlaps_with(&l.lsn_range, lsn_range) && overlaps_with(&l.key_range, key_range)
    1315            0 :             })
    1316            0 :             .map(OwnArc)
    1317            0 :             .collect();
    1318            0 :         Ok(result)
    1319            0 :     }
    1320              : 
    1321            0 :     async fn get_keyspace(
    1322            0 :         &mut self,
    1323            0 :         key_range: &Range<Key>,
    1324            0 :         lsn: Lsn,
    1325            0 :         _ctx: &RequestContext,
    1326            0 :     ) -> anyhow::Result<Vec<Range<Key>>> {
    1327            0 :         if lsn == self.keyspace.0 {
    1328            0 :             Ok(pageserver_compaction::helpers::intersect_keyspace(
    1329            0 :                 &self.keyspace.1.ranges,
    1330            0 :                 key_range,
    1331            0 :             ))
    1332              :         } else {
    1333              :             // The current compaction implementation only ever requests the key space
    1334              :             // at the compaction end LSN.
    1335            0 :             anyhow::bail!("keyspace not available for requested lsn");
    1336              :         }
    1337            0 :     }
    1338              : 
    1339            0 :     async fn downcast_delta_layer(
    1340            0 :         &self,
    1341            0 :         layer: &OwnArc<PersistentLayerDesc>,
    1342            0 :     ) -> anyhow::Result<Option<ResidentDeltaLayer>> {
    1343            0 :         // this is a lot more complex than a simple downcast...
    1344            0 :         if layer.is_delta() {
    1345            0 :             let l = {
    1346            0 :                 let guard = self.timeline.layers.read().await;
    1347            0 :                 guard.get_from_desc(layer)
    1348              :             };
    1349            0 :             let result = l.download_and_keep_resident().await?;
    1350              : 
    1351            0 :             Ok(Some(ResidentDeltaLayer(result)))
    1352              :         } else {
    1353            0 :             Ok(None)
    1354              :         }
    1355            0 :     }
    1356              : 
    1357            0 :     async fn create_image(
    1358            0 :         &mut self,
    1359            0 :         lsn: Lsn,
    1360            0 :         key_range: &Range<Key>,
    1361            0 :         ctx: &RequestContext,
    1362            0 :     ) -> anyhow::Result<()> {
    1363            0 :         Ok(self.create_image_impl(lsn, key_range, ctx).await?)
    1364            0 :     }
    1365              : 
    1366            0 :     async fn create_delta(
    1367            0 :         &mut self,
    1368            0 :         lsn_range: &Range<Lsn>,
    1369            0 :         key_range: &Range<Key>,
    1370            0 :         input_layers: &[ResidentDeltaLayer],
    1371            0 :         ctx: &RequestContext,
    1372            0 :     ) -> anyhow::Result<()> {
    1373            0 :         debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end);
    1374              : 
    1375            0 :         let mut all_entries = Vec::new();
    1376            0 :         for dl in input_layers.iter() {
    1377            0 :             all_entries.extend(dl.load_keys(ctx).await?);
    1378              :         }
    1379              : 
    1380              :         // The current stdlib sorting implementation is particularly fast when the
    1381              :         // slice is made up of already-sorted sub-ranges.
    1382            0 :         all_entries.sort_by_key(|DeltaEntry { key, lsn, .. }| (*key, *lsn));
    1383              : 
    1384            0 :         let mut writer = DeltaLayerWriter::new(
    1385            0 :             self.timeline.conf,
    1386            0 :             self.timeline.timeline_id,
    1387            0 :             self.timeline.tenant_shard_id,
    1388            0 :             key_range.start,
    1389            0 :             lsn_range.clone(),
    1390            0 :             ctx,
    1391            0 :         )
    1392            0 :         .await?;
    1393              : 
    1394            0 :         let mut dup_values = 0;
    1395            0 : 
    1396            0 :         // This iterator walks through all key-value pairs from all the layers
    1397            0 :         // we're compacting, in key, LSN order.
    1398            0 :         let mut prev: Option<(Key, Lsn)> = None;
    1399              :         for &DeltaEntry {
    1400            0 :             key, lsn, ref val, ..
    1401            0 :         } in all_entries.iter()
    1402              :         {
    1403            0 :             if prev == Some((key, lsn)) {
    1404              :                 // This is a duplicate. Skip it.
    1405              :                 //
    1406              :                 // It can happen if compaction is interrupted after writing some
    1407              :                 // layers but not all, and we are compacting the range again.
    1408              :                 // The calculations in the algorithm assume that there are no
    1409              :                 // duplicates, so the math on targeted file size is likely off,
    1410              :                 // and we will create smaller files than expected.
    1411            0 :                 dup_values += 1;
    1412            0 :                 continue;
    1413            0 :             }
    1414              : 
    1415            0 :             let value = val.load(ctx).await?;
    1416              : 
    1417            0 :             writer.put_value(key, lsn, value, ctx).await?;
    1418              : 
    1419            0 :             prev = Some((key, lsn));
    1420              :         }
    1421              : 
    1422            0 :         if dup_values > 0 {
    1423            0 :             warn!("delta layer created with {} duplicate values", dup_values);
    1424            0 :         }
    1425              : 
    1426            0 :         fail_point!("delta-layer-writer-fail-before-finish", |_| {
    1427            0 :             Err(anyhow::anyhow!(
    1428            0 :                 "failpoint delta-layer-writer-fail-before-finish"
    1429            0 :             ))
    1430            0 :         });
    1431              : 
    1432            0 :         let new_delta_layer = writer
    1433            0 :             .finish(prev.unwrap().0.next(), &self.timeline, ctx)
    1434            0 :             .await?;
    1435              : 
    1436            0 :         self.new_deltas.push(new_delta_layer);
    1437            0 :         Ok(())
    1438            0 :     }
    1439              : 
    1440            0 :     async fn delete_layer(
    1441            0 :         &mut self,
    1442            0 :         layer: &OwnArc<PersistentLayerDesc>,
    1443            0 :         _ctx: &RequestContext,
    1444            0 :     ) -> anyhow::Result<()> {
    1445            0 :         self.layers_to_delete.push(layer.clone().0);
    1446            0 :         Ok(())
    1447            0 :     }
    1448              : }
    1449              : 
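                      : // A minimal, standalone sketch of the sort-then-skip-duplicates pattern used in
                      : // create_delta() above, with plain tuples standing in for DeltaEntry; the function
                      : // name and element types are illustrative only.
                      : fn dedup_sorted(mut entries: Vec<(u64, u64, &'static str)>) -> Vec<(u64, u64, &'static str)> {
                      :     // Sorting is cheap when the input is a concatenation of already-sorted runs,
                      :     // which is exactly what merging several delta layers produces.
                      :     entries.sort_by_key(|&(key, lsn, _)| (key, lsn));
                      :     let mut out = Vec::new();
                      :     let mut prev: Option<(u64, u64)> = None;
                      :     for (key, lsn, val) in entries {
                      :         if prev == Some((key, lsn)) {
                      :             // Duplicate (key, LSN) pairs can appear when an interrupted compaction
                      :             // left some of its output layers behind and the range is compacted again.
                      :             continue;
                      :         }
                      :         out.push((key, lsn, val));
                      :         prev = Some((key, lsn));
                      :     }
                      :     out
                      : }
                      : 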
    1450              : impl TimelineAdaptor {
    1451            0 :     async fn create_image_impl(
    1452            0 :         &mut self,
    1453            0 :         lsn: Lsn,
    1454            0 :         key_range: &Range<Key>,
    1455            0 :         ctx: &RequestContext,
    1456            0 :     ) -> Result<(), CreateImageLayersError> {
    1457            0 :         let timer = self.timeline.metrics.create_images_time_histo.start_timer();
    1458              : 
    1459            0 :         let image_layer_writer = ImageLayerWriter::new(
    1460            0 :             self.timeline.conf,
    1461            0 :             self.timeline.timeline_id,
    1462            0 :             self.timeline.tenant_shard_id,
    1463            0 :             key_range,
    1464            0 :             lsn,
    1465            0 :             ctx,
    1466            0 :         )
    1467            0 :         .await?;
    1468              : 
    1469            0 :         fail_point!("image-layer-writer-fail-before-finish", |_| {
    1470            0 :             Err(CreateImageLayersError::Other(anyhow::anyhow!(
    1471            0 :                 "failpoint image-layer-writer-fail-before-finish"
    1472            0 :             )))
    1473            0 :         });
    1474              : 
    1475            0 :         let keyspace = KeySpace {
    1476            0 :             ranges: self.get_keyspace(key_range, lsn, ctx).await?,
    1477              :         };
    1478              :         // TODO set proper (stateful) start. The create_image_layer_for_rel_blocks function mostly
    1479            0 :         let start = Key::MIN;
    1480              :         let ImageLayerCreationOutcome {
    1481            0 :             image,
    1482              :             next_start_key: _,
    1483            0 :         } = self
    1484            0 :             .timeline
    1485            0 :             .create_image_layer_for_rel_blocks(
    1486            0 :                 &keyspace,
    1487            0 :                 image_layer_writer,
    1488            0 :                 lsn,
    1489            0 :                 ctx,
    1490            0 :                 key_range.clone(),
    1491            0 :                 start,
    1492            0 :             )
    1493            0 :             .await?;
    1494              : 
    1495            0 :         if let Some(image_layer) = image {
    1496            0 :             self.new_images.push(image_layer);
    1497            0 :         }
    1498              : 
    1499            0 :         timer.stop_and_record();
    1500            0 : 
    1501            0 :         Ok(())
    1502            0 :     }
    1503              : }
    1504              : 
    1505              : impl CompactionRequestContext for crate::context::RequestContext {}
    1506              : 
    1507              : #[derive(Debug, Clone)]
    1508              : pub struct OwnArc<T>(pub Arc<T>);
    1509              : 
    1510              : impl<T> Deref for OwnArc<T> {
    1511              :     type Target = <Arc<T> as Deref>::Target;
    1512            0 :     fn deref(&self) -> &Self::Target {
    1513            0 :         &self.0
    1514            0 :     }
    1515              : }
    1516              : 
    1517              : impl<T> AsRef<T> for OwnArc<T> {
    1518            0 :     fn as_ref(&self) -> &T {
    1519            0 :         self.0.as_ref()
    1520            0 :     }
    1521              : }
    1522              : 
    1523              : impl CompactionLayer<Key> for OwnArc<PersistentLayerDesc> {
    1524            0 :     fn key_range(&self) -> &Range<Key> {
    1525            0 :         &self.key_range
    1526            0 :     }
    1527            0 :     fn lsn_range(&self) -> &Range<Lsn> {
    1528            0 :         &self.lsn_range
    1529            0 :     }
    1530            0 :     fn file_size(&self) -> u64 {
    1531            0 :         self.file_size
    1532            0 :     }
    1533            0 :     fn short_id(&self) -> std::string::String {
    1534            0 :         self.as_ref().short_id().to_string()
    1535            0 :     }
    1536            0 :     fn is_delta(&self) -> bool {
    1537            0 :         self.as_ref().is_delta()
    1538            0 :     }
    1539              : }
    1540              : 
    1541              : impl CompactionLayer<Key> for OwnArc<DeltaLayer> {
    1542            0 :     fn key_range(&self) -> &Range<Key> {
    1543            0 :         &self.layer_desc().key_range
    1544            0 :     }
    1545            0 :     fn lsn_range(&self) -> &Range<Lsn> {
    1546            0 :         &self.layer_desc().lsn_range
    1547            0 :     }
    1548            0 :     fn file_size(&self) -> u64 {
    1549            0 :         self.layer_desc().file_size
    1550            0 :     }
    1551            0 :     fn short_id(&self) -> std::string::String {
    1552            0 :         self.layer_desc().short_id().to_string()
    1553            0 :     }
    1554            0 :     fn is_delta(&self) -> bool {
    1555            0 :         true
    1556            0 :     }
    1557              : }
    1558              : 
    1559              : use crate::tenant::timeline::DeltaEntry;
    1560              : 
    1561              : impl CompactionLayer<Key> for ResidentDeltaLayer {
    1562            0 :     fn key_range(&self) -> &Range<Key> {
    1563            0 :         &self.0.layer_desc().key_range
    1564            0 :     }
    1565            0 :     fn lsn_range(&self) -> &Range<Lsn> {
    1566            0 :         &self.0.layer_desc().lsn_range
    1567            0 :     }
    1568            0 :     fn file_size(&self) -> u64 {
    1569            0 :         self.0.layer_desc().file_size
    1570            0 :     }
    1571            0 :     fn short_id(&self) -> std::string::String {
    1572            0 :         self.0.layer_desc().short_id().to_string()
    1573            0 :     }
    1574            0 :     fn is_delta(&self) -> bool {
    1575            0 :         true
    1576            0 :     }
    1577              : }
    1578              : 
    1579              : impl CompactionDeltaLayer<TimelineAdaptor> for ResidentDeltaLayer {
    1580              :     type DeltaEntry<'a> = DeltaEntry<'a>;
    1581              : 
    1582            0 :     async fn load_keys<'a>(&self, ctx: &RequestContext) -> anyhow::Result<Vec<DeltaEntry<'_>>> {
    1583            0 :         self.0.load_keys(ctx).await
    1584            0 :     }
    1585              : }
    1586              : 
    1587              : impl CompactionLayer<Key> for ResidentImageLayer {
    1588            0 :     fn key_range(&self) -> &Range<Key> {
    1589            0 :         &self.0.layer_desc().key_range
    1590            0 :     }
    1591            0 :     fn lsn_range(&self) -> &Range<Lsn> {
    1592            0 :         &self.0.layer_desc().lsn_range
    1593            0 :     }
    1594            0 :     fn file_size(&self) -> u64 {
    1595            0 :         self.0.layer_desc().file_size
    1596            0 :     }
    1597            0 :     fn short_id(&self) -> std::string::String {
    1598            0 :         self.0.layer_desc().short_id().to_string()
    1599            0 :     }
    1600            0 :     fn is_delta(&self) -> bool {
    1601            0 :         false
    1602            0 :     }
    1603              : }
    1604              : impl CompactionImageLayer<TimelineAdaptor> for ResidentImageLayer {}
        

Generated by: LCOV version 2.1-beta