LCOV - code coverage report
Current view: top level - pageserver/src/tenant/timeline - compaction.rs (source / functions) Coverage Total Hit
Test: fabb29a6339542ee130cd1d32b534fafdc0be240.info Lines: 55.4 % 1123 622
Test Date: 2024-06-25 13:20:00 Functions: 25.3 % 95 24

            Line data    Source code
       1              : //! New compaction implementation. The algorithm itself is implemented in the
       2              : //! compaction crate. This file implements the callbacks and structs that allow
       3              : //! the algorithm to drive the process.
       4              : //!
       5              : //! The old legacy algorithm is implemented directly in `timeline.rs`.
       6              : 
       7              : use std::collections::BinaryHeap;
       8              : use std::ops::{Deref, Range};
       9              : use std::sync::Arc;
      10              : 
      11              : use super::layer_manager::LayerManager;
      12              : use super::{
      13              :     CompactFlags, CreateImageLayersError, DurationRecorder, ImageLayerCreationMode,
      14              :     RecordedDuration, Timeline,
      15              : };
      16              : 
      17              : use anyhow::{anyhow, Context};
      18              : use enumset::EnumSet;
      19              : use fail::fail_point;
      20              : use itertools::Itertools;
      21              : use pageserver_api::keyspace::ShardedRange;
      22              : use pageserver_api::shard::{ShardCount, ShardIdentity, TenantShardId};
      23              : use tokio_util::sync::CancellationToken;
      24              : use tracing::{debug, info, info_span, trace, warn, Instrument};
      25              : use utils::id::TimelineId;
      26              : 
      27              : use crate::context::{AccessStatsBehavior, RequestContext, RequestContextBuilder};
      28              : use crate::page_cache;
      29              : use crate::tenant::storage_layer::{AsLayerDesc, PersistentLayerDesc};
      30              : use crate::tenant::timeline::{drop_rlock, Hole, ImageLayerCreationOutcome};
      31              : use crate::tenant::timeline::{DeltaLayerWriter, ImageLayerWriter};
      32              : use crate::tenant::timeline::{Layer, ResidentLayer};
      33              : use crate::tenant::DeltaLayer;
      34              : use crate::virtual_file::{MaybeFatalIo, VirtualFile};
      35              : 
      36              : use crate::keyspace::KeySpace;
      37              : use crate::repository::Key;
      38              : 
      39              : use utils::lsn::Lsn;
      40              : 
      41              : use pageserver_compaction::helpers::overlaps_with;
      42              : use pageserver_compaction::interface::*;
      43              : 
      44              : use super::CompactionError;
      45              : 
      46              : impl Timeline {
      47              :     /// TODO: cancellation
      48          364 :     pub(crate) async fn compact_legacy(
      49          364 :         self: &Arc<Self>,
      50          364 :         _cancel: &CancellationToken,
      51          364 :         flags: EnumSet<CompactFlags>,
      52          364 :         ctx: &RequestContext,
      53          364 :     ) -> Result<(), CompactionError> {
      54          364 :         // High level strategy for compaction / image creation:
      55          364 :         //
      56          364 :         // 1. First, calculate the desired "partitioning" of the
      57          364 :         // currently in-use key space. The goal is to partition the
      58          364 :         // key space into roughly fixed-size chunks, but also take into
      59          364 :         // account any existing image layers, and try to align the
      60          364 :         // chunk boundaries with the existing image layers to avoid
      61          364 :         // too much churn. Also try to align chunk boundaries with
      62          364 :         // relation boundaries.  In principle, we don't know about
      63          364 :         // relation boundaries here, we just deal with key-value
      64          364 :         // pairs, and the code in pgdatadir_mapping.rs knows how to
      65          364 :         // map relations into key-value pairs. But in practice we know
      66          364 :         // that 'field6' is the block number, and the fields 1-5
      67          364 :         // identify a relation. This is just an optimization,
      68          364 :         // though.
      69          364 :         //
      70          364 :         // 2. Once we know the partitioning, for each partition,
      71          364 :         // decide if it's time to create a new image layer. The
      72          364 :         // criteria is: there has been too much "churn" since the last
      73          364 :         // image layer? The "churn" is a fuzzy concept, it's a
      74          364 :         // combination of too many delta files, or too much WAL in
      75          364 :         // total in the delta file. Or perhaps: if creating an image
      76          364 :         // file would allow to delete some older files.
      77          364 :         //
      78          364 :         // 3. After that, we compact all level0 delta files if there
      79          364 :         // are too many of them.  While compacting, we also garbage
      80          364 :         // collect any page versions that are no longer needed because
      81          364 :         // of the new image layers we created in step 2.
      82          364 :         //
      83          364 :         // TODO: This high level strategy hasn't been implemented yet.
      84          364 :         // Below are functions compact_level0() and create_image_layers()
      85          364 :         // but they are a bit ad hoc and don't quite work like it's explained
      86          364 :         // above. Rewrite it.
      87          364 : 
      88          364 :         // Is the timeline being deleted?
      89          364 :         if self.is_stopping() {
      90            0 :             trace!("Dropping out of compaction on timeline shutdown");
      91            0 :             return Err(CompactionError::ShuttingDown);
      92          364 :         }
      93          364 : 
      94          364 :         let target_file_size = self.get_checkpoint_distance();
      95              : 
      96              :         // Define partitioning schema if needed
      97              : 
      98              :         // FIXME: the match should only cover repartitioning, not the next steps
      99          364 :         let partition_count = match self
     100          364 :             .repartition(
     101          364 :                 self.get_last_record_lsn(),
     102          364 :                 self.get_compaction_target_size(),
     103          364 :                 flags,
     104          364 :                 ctx,
     105          364 :             )
     106        14166 :             .await
     107              :         {
     108          364 :             Ok(((dense_partitioning, sparse_partitioning), lsn)) => {
     109          364 :                 // Disables access_stats updates, so that the files we read remain candidates for eviction after we're done with them
     110          364 :                 let image_ctx = RequestContextBuilder::extend(ctx)
     111          364 :                     .access_stats_behavior(AccessStatsBehavior::Skip)
     112          364 :                     .build();
     113          364 : 
     114          364 :                 // 2. Compact
     115          364 :                 let timer = self.metrics.compact_time_histo.start_timer();
     116        42648 :                 self.compact_level0(target_file_size, ctx).await?;
     117          364 :                 timer.stop_and_record();
     118          364 : 
     119          364 :                 // 3. Create new image layers for partitions that have been modified
     120          364 :                 // "enough".
     121          364 :                 let mut partitioning = dense_partitioning;
     122          364 :                 partitioning
     123          364 :                     .parts
     124          364 :                     .extend(sparse_partitioning.into_dense().parts);
     125          364 :                 let image_layers = self
     126          364 :                     .create_image_layers(
     127          364 :                         &partitioning,
     128          364 :                         lsn,
     129          364 :                         if flags.contains(CompactFlags::ForceImageLayerCreation) {
     130           14 :                             ImageLayerCreationMode::Force
     131              :                         } else {
     132          350 :                             ImageLayerCreationMode::Try
     133              :                         },
     134          364 :                         &image_ctx,
     135              :                     )
     136        14421 :                     .await?;
     137              : 
     138          364 :                 self.upload_new_image_layers(image_layers)?;
     139          364 :                 partitioning.parts.len()
     140              :             }
     141            0 :             Err(err) => {
     142            0 :                 // no partitioning? This is normal, if the timeline was just created
     143            0 :                 // as an empty timeline. Also in unit tests, when we use the timeline
     144            0 :                 // as a simple key-value store, ignoring the datadir layout. Log the
     145            0 :                 // error but continue.
     146            0 :                 //
     147            0 :                 // Suppress error when it's due to cancellation
     148            0 :                 if !self.cancel.is_cancelled() {
     149            0 :                     tracing::error!("could not compact, repartitioning keyspace failed: {err:?}");
     150            0 :                 }
     151            0 :                 1
     152              :             }
     153              :         };
     154              : 
     155          364 :         if self.shard_identity.count >= ShardCount::new(2) {
     156              :             // Limit the number of layer rewrites to the number of partitions: this means its
     157              :             // runtime should be comparable to a full round of image layer creations, rather than
     158              :             // being potentially much longer.
     159            0 :             let rewrite_max = partition_count;
     160            0 : 
     161            0 :             self.compact_shard_ancestors(rewrite_max, ctx).await?;
     162          364 :         }
     163              : 
     164          364 :         Ok(())
     165          364 :     }
     166              : 
     167              :     /// Check for layers that are eligible to be rewritten:
     168              :     /// - Shard splitting: After a shard split, ancestor layers beyond pitr_interval are rewritten, so that
     169              :     ///   we don't indefinitely retain keys in this shard that aren't needed.
     170              :     /// - For future use: layers beyond pitr_interval that are in formats we would
     171              :     ///   rather not maintain compatibility with indefinitely.
     172              :     ///
     173              :     /// Note: this phase may read and write many gigabytes of data: use rewrite_max to bound
     174              :     /// how much work it will try to do in each compaction pass.
     175            0 :     async fn compact_shard_ancestors(
     176            0 :         self: &Arc<Self>,
     177            0 :         rewrite_max: usize,
     178            0 :         ctx: &RequestContext,
     179            0 :     ) -> anyhow::Result<()> {
     180            0 :         let mut drop_layers = Vec::new();
     181            0 :         let mut layers_to_rewrite: Vec<Layer> = Vec::new();
     182            0 : 
     183            0 :         // We will use the Lsn cutoff of the last GC as a threshold for rewriting layers: if a
     184            0 :         // layer is behind this Lsn, it indicates that the layer is being retained beyond the
     185            0 :         // pitr_interval, for example because a branchpoint references it.
     186            0 :         //
     187            0 :         // Holding this read guard also blocks [`Self::gc_timeline`] from entering while we
     188            0 :         // are rewriting layers.
     189            0 :         let latest_gc_cutoff = self.get_latest_gc_cutoff_lsn();
     190            0 : 
     191            0 :         tracing::info!(
     192            0 :             "latest_gc_cutoff: {}, pitr cutoff {}",
     193            0 :             *latest_gc_cutoff,
     194            0 :             self.gc_info.read().unwrap().cutoffs.pitr
     195              :         );
     196              : 
     197            0 :         let layers = self.layers.read().await;
     198            0 :         for layer_desc in layers.layer_map().iter_historic_layers() {
     199            0 :             let layer = layers.get_from_desc(&layer_desc);
     200            0 :             if layer.metadata().shard.shard_count == self.shard_identity.count {
     201              :                 // This layer does not belong to a historic ancestor, no need to re-image it.
     202            0 :                 continue;
     203            0 :             }
     204            0 : 
     205            0 :             // This layer was created on an ancestor shard: check if it contains any data for this shard.
     206            0 :             let sharded_range = ShardedRange::new(layer_desc.get_key_range(), &self.shard_identity);
     207            0 :             let layer_local_page_count = sharded_range.page_count();
     208            0 :             let layer_raw_page_count = ShardedRange::raw_size(&layer_desc.get_key_range());
     209            0 :             if layer_local_page_count == 0 {
     210              :                 // This ancestral layer only covers keys that belong to other shards.
     211              :                 // We include the full metadata in the log: if we had some critical bug that caused
     212              :                 // us to incorrectly drop layers, this would simplify manually debugging + reinstating those layers.
     213            0 :                 info!(%layer, old_metadata=?layer.metadata(),
     214            0 :                     "dropping layer after shard split, contains no keys for this shard.",
     215              :                 );
     216              : 
     217            0 :                 if cfg!(debug_assertions) {
     218              :                     // Expensive, exhaustive check of keys in this layer: this guards against ShardedRange's calculations being
     219              :                     // wrong.  If ShardedRange claims the local page count is zero, then no keys in this layer
     220              :                     // should be !is_key_disposable()
     221            0 :                     let range = layer_desc.get_key_range();
     222            0 :                     let mut key = range.start;
     223            0 :                     while key < range.end {
     224            0 :                         debug_assert!(self.shard_identity.is_key_disposable(&key));
     225            0 :                         key = key.next();
     226              :                     }
     227            0 :                 }
     228              : 
     229            0 :                 drop_layers.push(layer);
     230            0 :                 continue;
     231            0 :             } else if layer_local_page_count != u32::MAX
     232            0 :                 && layer_local_page_count == layer_raw_page_count
     233              :             {
     234            0 :                 debug!(%layer,
     235            0 :                     "layer is entirely shard local ({} keys), no need to filter it",
     236              :                     layer_local_page_count
     237              :                 );
     238            0 :                 continue;
     239            0 :             }
     240            0 : 
     241            0 :             // Don't bother re-writing a layer unless it will at least halve its size
     242            0 :             if layer_local_page_count != u32::MAX
     243            0 :                 && layer_local_page_count > layer_raw_page_count / 2
     244              :             {
     245            0 :                 debug!(%layer,
     246            0 :                     "layer is already mostly local ({}/{}), not rewriting",
     247              :                     layer_local_page_count,
     248              :                     layer_raw_page_count
     249              :                 );
     250            0 :             }
     251              : 
     252              :             // Don't bother re-writing a layer if it is within the PITR window: it will age-out eventually
     253              :             // without incurring the I/O cost of a rewrite.
     254            0 :             if layer_desc.get_lsn_range().end >= *latest_gc_cutoff {
     255            0 :                 debug!(%layer, "Skipping rewrite of layer still in GC window ({} >= {})",
     256            0 :                     layer_desc.get_lsn_range().end, *latest_gc_cutoff);
     257            0 :                 continue;
     258            0 :             }
     259            0 : 
     260            0 :             if layer_desc.is_delta() {
     261              :                 // We do not yet implement rewrite of delta layers
     262            0 :                 debug!(%layer, "Skipping rewrite of delta layer");
     263            0 :                 continue;
     264            0 :             }
     265            0 : 
     266            0 :             // Only rewrite layers if their generations differ.  This guarantees:
     267            0 :             //  - that local rewrite is safe, as local layer paths will differ between existing layer and rewritten one
     268            0 :             //  - that the layer is persistent in remote storage, as we only see old-generation'd layer via loading from remote storage
     269            0 :             if layer.metadata().generation == self.generation {
     270            0 :                 debug!(%layer, "Skipping rewrite, is not from old generation");
     271            0 :                 continue;
     272            0 :             }
     273            0 : 
     274            0 :             if layers_to_rewrite.len() >= rewrite_max {
     275            0 :                 tracing::info!(%layer, "Will rewrite layer on a future compaction, already rewrote {}",
     276            0 :                     layers_to_rewrite.len()
     277              :                 );
     278            0 :                 continue;
     279            0 :             }
     280            0 : 
     281            0 :             // Fall through: all our conditions for doing a rewrite passed.
     282            0 :             layers_to_rewrite.push(layer);
     283              :         }
     284              : 
     285              :         // Drop read lock on layer map before we start doing time-consuming I/O
     286            0 :         drop(layers);
     287            0 : 
     288            0 :         let mut replace_image_layers = Vec::new();
     289              : 
     290            0 :         for layer in layers_to_rewrite {
     291            0 :             tracing::info!(layer=%layer, "Rewriting layer after shard split...");
     292            0 :             let mut image_layer_writer = ImageLayerWriter::new(
     293            0 :                 self.conf,
     294            0 :                 self.timeline_id,
     295            0 :                 self.tenant_shard_id,
     296            0 :                 &layer.layer_desc().key_range,
     297            0 :                 layer.layer_desc().image_layer_lsn(),
     298            0 :                 ctx,
     299            0 :             )
     300            0 :             .await?;
     301              : 
     302              :             // Safety of layer rewrites:
     303              :             // - We are writing to a different local file path than we are reading from, so the old Layer
     304              :             //   cannot interfere with the new one.
     305              :             // - In the page cache, contents for a particular VirtualFile are stored with a file_id that
     306              :             //   is different for two layers with the same name (in `ImageLayerInner::new` we always
     307              :             //   acquire a fresh id from [`crate::page_cache::next_file_id`]).  So readers do not risk
     308              :             //   reading the index from one layer file, and then data blocks from the rewritten layer file.
     309              :             // - Any readers that have a reference to the old layer will keep it alive until they are done
     310              :             //   with it. If they are trying to promote from remote storage, that will fail, but this is the same
     311              :             //   as for compaction generally: compaction is allowed to delete layers that readers might be trying to use.
     312              :             // - We do not run concurrently with other kinds of compaction, so the only layer map writes we race with are:
     313              :             //    - GC, which at worst witnesses us "undelete" a layer that they just deleted.
     314              :             //    - ingestion, which only inserts layers, therefore cannot collide with us.
     315            0 :             let resident = layer.download_and_keep_resident().await?;
     316              : 
     317            0 :             let keys_written = resident
     318            0 :                 .filter(&self.shard_identity, &mut image_layer_writer, ctx)
     319            0 :                 .await?;
     320              : 
     321            0 :             if keys_written > 0 {
     322            0 :                 let new_layer = image_layer_writer.finish(self, ctx).await?;
     323            0 :                 tracing::info!(layer=%new_layer, "Rewrote layer, {} -> {} bytes",
     324            0 :                     layer.metadata().file_size,
     325            0 :                     new_layer.metadata().file_size);
     326              : 
     327            0 :                 replace_image_layers.push((layer, new_layer));
     328            0 :             } else {
     329            0 :                 // Drop the old layer.  Usually the ShardedRange check above would already have noticed
     330            0 :                 // that the layer has no data for us, but handle it here too if filtering wrote no keys.
     331            0 :                 drop_layers.push(layer);
     332            0 :             }
     333              :         }
     334              : 
     335              :         // At this point, we have replaced local layer files with their rewritten form, but not yet uploaded
     336              :         // metadata to reflect that. If we restart here, the replaced layer files will look invalid (size mismatch
     337              :         // to remote index) and be removed. This is inefficient but safe.
     338              :         fail::fail_point!("compact-shard-ancestors-localonly");
     339              : 
     340              :         // Update the LayerMap so that readers will use the new layers, and enqueue it for writing to remote storage
     341            0 :         self.rewrite_layers(replace_image_layers, drop_layers)
     342            0 :             .await?;
     343              : 
     344              :         fail::fail_point!("compact-shard-ancestors-enqueued");
     345              : 
     346              :         // We wait for all uploads to complete before finishing this compaction stage.  This is not
     347              :         // necessary for correctness, but it simplifies testing, and avoids proceeding with another
     348              :         // Timeline's compaction while this timeline's uploads may be generating lots of disk I/O
     349              :         // load.
     350            0 :         self.remote_client.wait_completion().await?;
     351              : 
     352              :         fail::fail_point!("compact-shard-ancestors-persistent");
     353              : 
     354            0 :         Ok(())
     355            0 :     }
     356              : 
     357              :     /// Collect a bunch of Level 0 layer files, and compact and reshuffle them as
     358              :     /// Level 1 files.
     359          364 :     async fn compact_level0(
     360          364 :         self: &Arc<Self>,
     361          364 :         target_file_size: u64,
     362          364 :         ctx: &RequestContext,
     363          364 :     ) -> Result<(), CompactionError> {
     364              :         let CompactLevel0Phase1Result {
     365          364 :             new_layers,
     366          364 :             deltas_to_compact,
     367              :         } = {
     368          364 :             let phase1_span = info_span!("compact_level0_phase1");
     369          364 :             let ctx = ctx.attached_child();
     370          364 :             let mut stats = CompactLevel0Phase1StatsBuilder {
     371          364 :                 version: Some(2),
     372          364 :                 tenant_id: Some(self.tenant_shard_id),
     373          364 :                 timeline_id: Some(self.timeline_id),
     374          364 :                 ..Default::default()
     375          364 :             };
     376          364 : 
     377          364 :             let begin = tokio::time::Instant::now();
     378          364 :             let phase1_layers_locked = Arc::clone(&self.layers).read_owned().await;
     379          364 :             let now = tokio::time::Instant::now();
     380          364 :             stats.read_lock_acquisition_micros =
     381          364 :                 DurationRecorder::Recorded(RecordedDuration(now - begin), now);
     382          364 :             self.compact_level0_phase1(phase1_layers_locked, stats, target_file_size, &ctx)
     383          364 :                 .instrument(phase1_span)
     384        42648 :                 .await?
     385              :         };
     386              : 
     387          364 :         if new_layers.is_empty() && deltas_to_compact.is_empty() {
     388              :             // nothing to do
     389          336 :             return Ok(());
     390           28 :         }
     391           28 : 
     392           28 :         self.finish_compact_batch(&new_layers, &Vec::new(), &deltas_to_compact)
     393            0 :             .await?;
     394           28 :         Ok(())
     395          364 :     }
     396              : 
     397              :     /// Level0 files first phase of compaction, explained in the [`Self::compact_legacy`] comment.
     398          364 :     async fn compact_level0_phase1(
     399          364 :         self: &Arc<Self>,
     400          364 :         guard: tokio::sync::OwnedRwLockReadGuard<LayerManager>,
     401          364 :         mut stats: CompactLevel0Phase1StatsBuilder,
     402          364 :         target_file_size: u64,
     403          364 :         ctx: &RequestContext,
     404          364 :     ) -> Result<CompactLevel0Phase1Result, CompactionError> {
     405          364 :         stats.read_lock_held_spawn_blocking_startup_micros =
     406          364 :             stats.read_lock_acquisition_micros.till_now(); // set by caller
     407          364 :         let layers = guard.layer_map();
     408          364 :         let level0_deltas = layers.get_level0_deltas()?;
     409          364 :         let mut level0_deltas = level0_deltas
     410          364 :             .into_iter()
     411         1616 :             .map(|x| guard.get_from_desc(&x))
     412          364 :             .collect_vec();
     413          364 :         stats.level0_deltas_count = Some(level0_deltas.len());
     414          364 :         // Only compact if enough layers have accumulated.
     415          364 :         let threshold = self.get_compaction_threshold();
     416          364 :         if level0_deltas.is_empty() || level0_deltas.len() < threshold {
     417          336 :             debug!(
     418            0 :                 level0_deltas = level0_deltas.len(),
     419            0 :                 threshold, "too few deltas to compact"
     420              :             );
     421          336 :             return Ok(CompactLevel0Phase1Result::default());
     422           28 :         }
     423           28 : 
     424           28 :         // Gather the files to compact in this iteration.
     425           28 :         //
     426           28 :         // Start with the oldest Level 0 delta file, and collect any other
     427           28 :         // level 0 files that form a contiguous sequence, such that the end
     428           28 :         // LSN of previous file matches the start LSN of the next file.
     429           28 :         //
     430           28 :         // Note that if the files don't form such a sequence, we might
     431           28 :         // "compact" just a single file. That's a bit pointless, but it allows
     432           28 :         // us to get rid of the level 0 file, and compact the other files on
     433           28 :         // the next iteration. This could probably be made smarter, but such
     434           28 :         // "gaps" in the sequence of level 0 files should only happen in case
     435           28 :         // of a crash, partial download from cloud storage, or something like
     436           28 :         // that, so it's not a big deal in practice.
     437          748 :         level0_deltas.sort_by_key(|l| l.layer_desc().lsn_range.start);
     438           28 :         let mut level0_deltas_iter = level0_deltas.iter();
     439           28 : 
     440           28 :         let first_level0_delta = level0_deltas_iter.next().unwrap();
     441           28 :         let mut prev_lsn_end = first_level0_delta.layer_desc().lsn_range.end;
     442           28 :         let mut deltas_to_compact = Vec::with_capacity(level0_deltas.len());
     443           28 : 
     444           28 :         deltas_to_compact.push(first_level0_delta.download_and_keep_resident().await?);
     445          402 :         for l in level0_deltas_iter {
     446          374 :             let lsn_range = &l.layer_desc().lsn_range;
     447          374 : 
     448          374 :             if lsn_range.start != prev_lsn_end {
     449            0 :                 break;
     450          374 :             }
     451          374 :             deltas_to_compact.push(l.download_and_keep_resident().await?);
     452          374 :             prev_lsn_end = lsn_range.end;
     453              :         }
     454           28 :         let lsn_range = Range {
     455           28 :             start: deltas_to_compact
     456           28 :                 .first()
     457           28 :                 .unwrap()
     458           28 :                 .layer_desc()
     459           28 :                 .lsn_range
     460           28 :                 .start,
     461           28 :             end: deltas_to_compact.last().unwrap().layer_desc().lsn_range.end,
     462           28 :         };
     463           28 : 
     464           28 :         info!(
     465            0 :             "Starting Level0 compaction in LSN range {}-{} for {} layers ({} deltas in total)",
     466            0 :             lsn_range.start,
     467            0 :             lsn_range.end,
     468            0 :             deltas_to_compact.len(),
     469            0 :             level0_deltas.len()
     470              :         );
     471              : 
     472          402 :         for l in deltas_to_compact.iter() {
     473          402 :             info!("compact includes {l}");
     474              :         }
     475              : 
     476              :         // We don't need the original list of layers anymore. Drop it so that
     477              :         // we don't accidentally use it later in the function.
     478           28 :         drop(level0_deltas);
     479           28 : 
     480           28 :         stats.read_lock_held_prerequisites_micros = stats
     481           28 :             .read_lock_held_spawn_blocking_startup_micros
     482           28 :             .till_now();
     483           28 : 
     484           28 :         // Determine N largest holes where N is number of compacted layers.
     485           28 :         let max_holes = deltas_to_compact.len();
     486           28 :         let last_record_lsn = self.get_last_record_lsn();
     487           28 :         let min_hole_range = (target_file_size / page_cache::PAGE_SZ as u64) as i128;
     488           28 :         let min_hole_coverage_size = 3; // TODO: something more flexible?
     489           28 : 
     490           28 :         // min-heap (reserve space for one more element added before eviction)
     491           28 :         let mut heap: BinaryHeap<Hole> = BinaryHeap::with_capacity(max_holes + 1);
     492           28 :         let mut prev: Option<Key> = None;
     493           28 : 
     494           28 :         let mut all_keys = Vec::new();
     495              : 
     496          402 :         for l in deltas_to_compact.iter() {
     497         2413 :             all_keys.extend(l.load_keys(ctx).await?);
     498              :         }
     499              : 
     500              :         // FIXME: should spawn_blocking the rest of this function
     501              : 
     502              :         // The current stdlib sorting implementation is designed in a way where it is
     503              :         // particularly fast where the slice is made up of sorted sub-ranges.
     504      4431806 :         all_keys.sort_by_key(|DeltaEntry { key, lsn, .. }| (*key, *lsn));
     505           28 : 
     506           28 :         stats.read_lock_held_key_sort_micros = stats.read_lock_held_prerequisites_micros.till_now();
     507              : 
     508      2064038 :         for &DeltaEntry { key: next_key, .. } in all_keys.iter() {
     509      2064038 :             if let Some(prev_key) = prev {
     510              :                 // just first fast filter, do not create hole entries for metadata keys. The last hole in the
     511              :                 // compaction is the gap between data key and metadata keys.
     512      2064010 :                 if next_key.to_i128() - prev_key.to_i128() >= min_hole_range
     513            0 :                     && !Key::is_metadata_key(&prev_key)
     514              :                 {
     515            0 :                     let key_range = prev_key..next_key;
     516            0 :                     // Measuring a hole by simply subtracting the i128 representations of the key range boundaries
     517            0 :                     // does not make much sense, because the largest holes would correspond to field1/field2 changes.
     518            0 :                     // We are mostly interested in eliminating holes that cause generation of excessive image layers.
     519            0 :                     // That is why it is better to measure the size of a hole as the number of covering image layers.
     520            0 :                     let coverage_size = layers.image_coverage(&key_range, last_record_lsn).len();
     521            0 :                     if coverage_size >= min_hole_coverage_size {
     522            0 :                         heap.push(Hole {
     523            0 :                             key_range,
     524            0 :                             coverage_size,
     525            0 :                         });
     526            0 :                         if heap.len() > max_holes {
     527            0 :                             heap.pop(); // remove smallest hole
     528            0 :                         }
     529            0 :                     }
     530      2064010 :                 }
     531           28 :             }
     532      2064038 :             prev = Some(next_key.next());
     533              :         }
     534           28 :         stats.read_lock_held_compute_holes_micros = stats.read_lock_held_key_sort_micros.till_now();
     535           28 :         drop_rlock(guard);
     536           28 :         stats.read_lock_drop_micros = stats.read_lock_held_compute_holes_micros.till_now();
     537           28 :         let mut holes = heap.into_vec();
     538           28 :         holes.sort_unstable_by_key(|hole| hole.key_range.start);
     539           28 :         let mut next_hole = 0; // index of next hole in holes vector
     540           28 : 
     541           28 :         // This iterator walks through all key-value pairs from all the layers
     542           28 :         // we're compacting, in key, LSN order.
     543           28 :         let all_values_iter = all_keys.iter();
     544           28 : 
     545           28 :         // This iterator walks through all keys and is needed to calculate size used by each key
     546           28 :         let mut all_keys_iter = all_keys
     547           28 :             .iter()
     548      2064038 :             .map(|DeltaEntry { key, lsn, size, .. }| (*key, *lsn, *size))
     549      2064010 :             .coalesce(|mut prev, cur| {
     550      2064010 :                 // Coalesce keys that belong to the same key pair.
     551      2064010 :                 // This ensures that compaction doesn't put them
     552      2064010 :                 // into different layer files.
     553      2064010 :                 // Still limit this by the target file size,
     554      2064010 :                 // so that we keep the size of the files in
     555      2064010 :                 // check.
     556      2064010 :                 if prev.0 == cur.0 && prev.2 < target_file_size {
     557        40038 :                     prev.2 += cur.2;
     558        40038 :                     Ok(prev)
     559              :                 } else {
     560      2023972 :                     Err((prev, cur))
     561              :                 }
     562      2064010 :             });
     563           28 : 
     564           28 :         // Merge the contents of all the input delta layers into a new set
     565           28 :         // of delta layers, based on the current partitioning.
     566           28 :         //
     567           28 :         // We split the new delta layers on the key dimension. We iterate through the key space, and for each key, check if including the next key to the current output layer we're building would cause the layer to become too large. If so, dump the current output layer and start new one.
     568           28 :         // It's possible that there is a single key with so many page versions that storing all of them in a single layer file
     569           28 :         // would be too large. In that case, we also split on the LSN dimension.
     570           28 :         //
     571           28 :         // LSN
     572           28 :         //  ^
     573           28 :         //  |
     574           28 :         //  | +-----------+            +--+--+--+--+
     575           28 :         //  | |           |            |  |  |  |  |
     576           28 :         //  | +-----------+            |  |  |  |  |
     577           28 :         //  | |           |            |  |  |  |  |
     578           28 :         //  | +-----------+     ==>    |  |  |  |  |
     579           28 :         //  | |           |            |  |  |  |  |
     580           28 :         //  | +-----------+            |  |  |  |  |
     581           28 :         //  | |           |            |  |  |  |  |
     582           28 :         //  | +-----------+            +--+--+--+--+
     583           28 :         //  |
     584           28 :         //  +--------------> key
     585           28 :         //
     586           28 :         //
     587           28 :         // If one key (X) has a lot of page versions:
     588           28 :         //
     589           28 :         // LSN
     590           28 :         //  ^
     591           28 :         //  |                                 (X)
     592           28 :         //  | +-----------+            +--+--+--+--+
     593           28 :         //  | |           |            |  |  |  |  |
     594           28 :         //  | +-----------+            |  |  +--+  |
     595           28 :         //  | |           |            |  |  |  |  |
     596           28 :         //  | +-----------+     ==>    |  |  |  |  |
     597           28 :         //  | |           |            |  |  +--+  |
     598           28 :         //  | +-----------+            |  |  |  |  |
     599           28 :         //  | |           |            |  |  |  |  |
     600           28 :         //  | +-----------+            +--+--+--+--+
     601           28 :         //  |
     602           28 :         //  +--------------> key
     603           28 :         // TODO: this actually divides the layers into fixed-size chunks, not
     604           28 :         // based on the partitioning.
     605           28 :         //
     606           28 :         // TODO: we should also opportunistically materialize and
     607           28 :         // garbage collect what we can.
     608           28 :         let mut new_layers = Vec::new();
     609           28 :         let mut prev_key: Option<Key> = None;
     610           28 :         let mut writer: Option<DeltaLayerWriter> = None;
     611           28 :         let mut key_values_total_size = 0u64;
     612           28 :         let mut dup_start_lsn: Lsn = Lsn::INVALID; // start LSN of layer containing values of the single key
     613           28 :         let mut dup_end_lsn: Lsn = Lsn::INVALID; // end LSN of layer containing values of the single key
     614              : 
     615              :         for &DeltaEntry {
     616      2064038 :             key, lsn, ref val, ..
     617      2064066 :         } in all_values_iter
     618              :         {
     619      2064038 :             let value = val.load(ctx).await?;
     620      2064038 :             let same_key = prev_key.map_or(false, |prev_key| prev_key == key);
     621      2064038 :             // We need to check key boundaries once we reach next key or end of layer with the same key
     622      2064038 :             if !same_key || lsn == dup_end_lsn {
     623      2024000 :                 let mut next_key_size = 0u64;
     624      2024000 :                 let is_dup_layer = dup_end_lsn.is_valid();
     625      2024000 :                 dup_start_lsn = Lsn::INVALID;
     626      2024000 :                 if !same_key {
     627      2024000 :                     dup_end_lsn = Lsn::INVALID;
     628      2024000 :                 }
     629              :                 // Determine size occupied by this key. We stop at next key or when size becomes larger than target_file_size
     630      2024000 :                 for (next_key, next_lsn, next_size) in all_keys_iter.by_ref() {
     631      2024000 :                     next_key_size = next_size;
     632      2024000 :                     if key != next_key {
     633      2023972 :                         if dup_end_lsn.is_valid() {
     634            0 :                             // We are writing a segment with duplicates:
     635            0 :                             // place all remaining values of this key in separate segment
     636            0 :                             dup_start_lsn = dup_end_lsn; // new segments starts where old stops
     637            0 :                             dup_end_lsn = lsn_range.end; // there are no more values of this key till end of LSN range
     638      2023972 :                         }
     639      2023972 :                         break;
     640           28 :                     }
     641           28 :                     key_values_total_size += next_size;
     642           28 :                     // Check if it is time to split segment: if total keys size is larger than target file size.
     643           28 :                     // We need to avoid generation of empty segments if next_size > target_file_size.
     644           28 :                     if key_values_total_size > target_file_size && lsn != next_lsn {
     645              :                         // Split key between multiple layers: such layer can contain only single key
     646            0 :                         dup_start_lsn = if dup_end_lsn.is_valid() {
     647            0 :                             dup_end_lsn // new segment with duplicates starts where old one stops
     648              :                         } else {
     649            0 :                             lsn // start with the first LSN for this key
     650              :                         };
     651            0 :                         dup_end_lsn = next_lsn; // upper LSN boundary is exclusive
     652            0 :                         break;
     653           28 :                     }
     654              :                 }
     655              :                 // handle case when loop reaches last key: in this case dup_end is non-zero but dup_start is not set.
     656      2024000 :                 if dup_end_lsn.is_valid() && !dup_start_lsn.is_valid() {
     657            0 :                     dup_start_lsn = dup_end_lsn;
     658            0 :                     dup_end_lsn = lsn_range.end;
     659      2024000 :                 }
     660      2024000 :                 if writer.is_some() {
     661      2023972 :                     let written_size = writer.as_mut().unwrap().size();
     662      2023972 :                     let contains_hole =
     663      2023972 :                         next_hole < holes.len() && key >= holes[next_hole].key_range.end;
     664              :                     // check if key cause layer overflow or contains hole...
     665      2023972 :                     if is_dup_layer
     666      2023972 :                         || dup_end_lsn.is_valid()
     667      2023972 :                         || written_size + key_values_total_size > target_file_size
     668      2023692 :                         || contains_hole
     669              :                     {
     670              :                         // ... if so, flush previous layer and prepare to write new one
     671          280 :                         new_layers.push(
     672          280 :                             writer
     673          280 :                                 .take()
     674          280 :                                 .unwrap()
     675          280 :                                 .finish(prev_key.unwrap().next(), self, ctx)
     676          720 :                                 .await?,
     677              :                         );
     678          280 :                         writer = None;
     679          280 : 
     680          280 :                         if contains_hole {
     681            0 :                             // skip hole
     682            0 :                             next_hole += 1;
     683          280 :                         }
     684      2023692 :                     }
     685           28 :                 }
     686              :                 // Remember size of key value because at next iteration we will access next item
     687      2024000 :                 key_values_total_size = next_key_size;
     688        40038 :             }
     689      2064038 :             fail_point!("delta-layer-writer-fail-before-finish", |_| {
     690            0 :                 Err(CompactionError::Other(anyhow::anyhow!(
     691            0 :                     "failpoint delta-layer-writer-fail-before-finish"
     692            0 :                 )))
     693      2064038 :             });
     694              : 
     695      2064038 :             if !self.shard_identity.is_key_disposable(&key) {
     696      2064038 :                 if writer.is_none() {
     697          308 :                     // Create writer if not initialized yet
     698          308 :                     writer = Some(
     699              :                         DeltaLayerWriter::new(
     700          308 :                             self.conf,
     701          308 :                             self.timeline_id,
     702          308 :                             self.tenant_shard_id,
     703          308 :                             key,
     704          308 :                             if dup_end_lsn.is_valid() {
     705              :                                 // this is a layer containing slice of values of the same key
     706            0 :                                 debug!("Create new dup layer {}..{}", dup_start_lsn, dup_end_lsn);
     707            0 :                                 dup_start_lsn..dup_end_lsn
     708              :                             } else {
     709          308 :                                 debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end);
     710          308 :                                 lsn_range.clone()
     711              :                             },
     712          308 :                             ctx,
     713              :                         )
     714          154 :                         .await?,
     715              :                     );
     716      2063730 :                 }
     717              : 
     718      2064038 :                 writer
     719      2064038 :                     .as_mut()
     720      2064038 :                     .unwrap()
     721      2064038 :                     .put_value(key, lsn, value, ctx)
     722         1565 :                     .await?;
     723              :             } else {
     724            0 :                 debug!(
     725            0 :                     "Dropping key {} during compaction (it belongs on shard {:?})",
     726            0 :                     key,
     727            0 :                     self.shard_identity.get_shard_number(&key)
     728              :                 );
     729              :             }
     730              : 
     731      2064038 :             if !new_layers.is_empty() {
     732        19786 :                 fail_point!("after-timeline-compacted-first-L1");
     733      2044252 :             }
     734              : 
     735      2064038 :             prev_key = Some(key);
     736              :         }
     737           28 :         if let Some(writer) = writer {
     738         1990 :             new_layers.push(writer.finish(prev_key.unwrap().next(), self, ctx).await?);
     739            0 :         }
     740              : 
     741              :         // Sync layers
     742           28 :         if !new_layers.is_empty() {
     743              :             // Print a warning if the created layer is larger than double the target size
     744              :             // Add two pages for potential overhead. This should in theory be already
     745              :             // accounted for in the target calculation, but for very small targets,
     746              :             // we still might easily hit the limit otherwise.
     747           28 :             let warn_limit = target_file_size * 2 + page_cache::PAGE_SZ as u64 * 2;
     748          308 :             for layer in new_layers.iter() {
     749          308 :                 if layer.layer_desc().file_size > warn_limit {
     750            0 :                     warn!(
     751              :                         %layer,
     752            0 :                         "created delta file of size {} larger than double of target of {target_file_size}", layer.layer_desc().file_size
     753              :                     );
     754          308 :                 }
     755              :             }
     756              : 
     757              :             // The writer.finish() above already did the fsync of the inodes.
     758              :             // We just need to fsync the directory in which these inodes are linked,
     759              :             // which we know to be the timeline directory.
     760              :             //
     761              :             // We use fatal_err() below because after writer.finish() returns with success,
     762              :             // the in-memory state of the filesystem already has the layer file in its final place,
     763              :             // and subsequent pageserver code could think it's durable while it really isn't.
     764           28 :             let timeline_dir = VirtualFile::open(
     765           28 :                 &self
     766           28 :                     .conf
     767           28 :                     .timeline_path(&self.tenant_shard_id, &self.timeline_id),
     768           28 :                 ctx,
     769           28 :             )
     770           14 :             .await
     771           28 :             .fatal_err("VirtualFile::open for timeline dir fsync");
     772           28 :             timeline_dir
     773           28 :                 .sync_all()
     774           15 :                 .await
     775           28 :                 .fatal_err("VirtualFile::sync_all timeline dir");
     776            0 :         }
     777              : 
     778           28 :         stats.write_layer_files_micros = stats.read_lock_drop_micros.till_now();
     779           28 :         stats.new_deltas_count = Some(new_layers.len());
     780          308 :         stats.new_deltas_size = Some(new_layers.iter().map(|l| l.layer_desc().file_size).sum());
     781           28 : 
     782           28 :         match TryInto::<CompactLevel0Phase1Stats>::try_into(stats)
     783           28 :             .and_then(|stats| serde_json::to_string(&stats).context("serde_json::to_string"))
     784              :         {
     785           28 :             Ok(stats_json) => {
     786           28 :                 info!(
     787            0 :                     stats_json = stats_json.as_str(),
     788            0 :                     "compact_level0_phase1 stats available"
     789              :                 )
     790              :             }
     791            0 :             Err(e) => {
     792            0 :                 warn!("compact_level0_phase1 stats failed to serialize: {:#}", e);
     793              :             }
     794              :         }
     795              : 
     796           28 :         Ok(CompactLevel0Phase1Result {
     797           28 :             new_layers,
     798           28 :             deltas_to_compact: deltas_to_compact
     799           28 :                 .into_iter()
     800          402 :                 .map(|x| x.drop_eviction_guard())
     801           28 :                 .collect::<Vec<_>>(),
     802           28 :         })
     803          364 :     }
     804              : }
     805              : 
     806              : #[derive(Default)]
     807              : struct CompactLevel0Phase1Result { // What phase 1 of level-0 compaction returns on success.
     808              :     new_layers: Vec<ResidentLayer>, // Delta layers produced by this phase, one per finished `DeltaLayerWriter`.
     809              :     deltas_to_compact: Vec<Layer>, // The input L0 deltas that were merged, returned after `drop_eviction_guard()`.
     810              : }
     811              : 
     812              : #[derive(Default)]
     813              : struct CompactLevel0Phase1StatsBuilder { // Phase-1 statistics, filled in step by step; converting to `CompactLevel0Phase1Stats` fails if any field is still unset.
     814              :     version: Option<u64>,
     815              :     tenant_id: Option<TenantShardId>, // Despite the name, this holds a `TenantShardId`.
     816              :     timeline_id: Option<TimelineId>,
     817              :     read_lock_acquisition_micros: DurationRecorder,
     818              :     read_lock_held_spawn_blocking_startup_micros: DurationRecorder, // Base recorder from which the prerequisites timer is chained via `till_now()`.
     819              :     read_lock_held_key_sort_micros: DurationRecorder, // Chained from the prerequisites timer once all keys are sorted (declared before it, measured after it).
     820              :     read_lock_held_prerequisites_micros: DurationRecorder, // Chained from the spawn_blocking_startup timer once the input layers are selected.
     821              :     read_lock_held_compute_holes_micros: DurationRecorder, // Chained from the key-sort timer once the hole-detection loop is done.
     822              :     read_lock_drop_micros: DurationRecorder, // Chained from the compute-holes timer right after the layer-map read guard is dropped.
     823              :     write_layer_files_micros: DurationRecorder, // Chained from the read-lock-drop timer after the new layers are written and the timeline dir is fsynced.
     824              :     level0_deltas_count: Option<usize>,
     825              :     new_deltas_count: Option<usize>, // Number of newly written delta layers (`new_layers.len()`).
     826              :     new_deltas_size: Option<u64>, // Sum of `file_size` over the newly written delta layers.
     827              : }
     828              : 
     829              : #[derive(serde::Serialize)]
     830              : struct CompactLevel0Phase1Stats { // Finalized form of `CompactLevel0Phase1StatsBuilder`: same fields, none optional. Phase 1 serializes it to JSON and logs it at info level.
     831              :     version: u64,
     832              :     tenant_id: TenantShardId, // Despite the name, this holds a `TenantShardId`.
     833              :     timeline_id: TimelineId,
     834              :     read_lock_acquisition_micros: RecordedDuration,
     835              :     read_lock_held_spawn_blocking_startup_micros: RecordedDuration,
     836              :     read_lock_held_key_sort_micros: RecordedDuration, // Declared before the prerequisites timer, but measured after it.
     837              :     read_lock_held_prerequisites_micros: RecordedDuration,
     838              :     read_lock_held_compute_holes_micros: RecordedDuration,
     839              :     read_lock_drop_micros: RecordedDuration,
     840              :     write_layer_files_micros: RecordedDuration,
     841              :     level0_deltas_count: usize,
     842              :     new_deltas_count: usize, // Number of delta layers written by phase 1.
     843              :     new_deltas_size: u64, // Sum of `file_size` over those new delta layers.
     844              : }
     845              : 
     846              : impl TryFrom<CompactLevel0Phase1StatsBuilder> for CompactLevel0Phase1Stats { // Finalizes a stats builder; succeeds only when every field has been filled in.
     847              :     type Error = anyhow::Error; // The error text is "<field> not set", naming the missing field.
     848              : 
     849           28 :     fn try_from(value: CompactLevel0Phase1StatsBuilder) -> Result<Self, Self::Error> { // Consumes the builder. Each `?` returns early, so only the first unset field (in the order below) is reported.
     850           28 :         Ok(Self {
     851           28 :             version: value.version.ok_or_else(|| anyhow!("version not set"))?,
     852           28 :             tenant_id: value
     853           28 :                 .tenant_id
     854           28 :                 .ok_or_else(|| anyhow!("tenant_id not set"))?,
     855           28 :             timeline_id: value
     856           28 :                 .timeline_id
     857           28 :                 .ok_or_else(|| anyhow!("timeline_id not set"))?,
     858           28 :             read_lock_acquisition_micros: value // Timers: `into_recorded()` yielding `None` is mapped to the "not set" error.
     859           28 :                 .read_lock_acquisition_micros
     860           28 :                 .into_recorded()
     861           28 :                 .ok_or_else(|| anyhow!("read_lock_acquisition_micros not set"))?,
     862           28 :             read_lock_held_spawn_blocking_startup_micros: value
     863           28 :                 .read_lock_held_spawn_blocking_startup_micros
     864           28 :                 .into_recorded()
     865           28 :                 .ok_or_else(|| anyhow!("read_lock_held_spawn_blocking_startup_micros not set"))?,
     866           28 :             read_lock_held_key_sort_micros: value
     867           28 :                 .read_lock_held_key_sort_micros
     868           28 :                 .into_recorded()
     869           28 :                 .ok_or_else(|| anyhow!("read_lock_held_key_sort_micros not set"))?,
     870           28 :             read_lock_held_prerequisites_micros: value
     871           28 :                 .read_lock_held_prerequisites_micros
     872           28 :                 .into_recorded()
     873           28 :                 .ok_or_else(|| anyhow!("read_lock_held_prerequisites_micros not set"))?,
     874           28 :             read_lock_held_compute_holes_micros: value
     875           28 :                 .read_lock_held_compute_holes_micros
     876           28 :                 .into_recorded()
     877           28 :                 .ok_or_else(|| anyhow!("read_lock_held_compute_holes_micros not set"))?,
     878           28 :             read_lock_drop_micros: value
     879           28 :                 .read_lock_drop_micros
     880           28 :                 .into_recorded()
     881           28 :                 .ok_or_else(|| anyhow!("read_lock_drop_micros not set"))?,
     882           28 :             write_layer_files_micros: value
     883           28 :                 .write_layer_files_micros
     884           28 :                 .into_recorded()
     885           28 :                 .ok_or_else(|| anyhow!("write_layer_files_micros not set"))?,
     886           28 :             level0_deltas_count: value // Counters: plain `Option`s, unwrapped the same way as `version` above.
     887           28 :                 .level0_deltas_count
     888           28 :                 .ok_or_else(|| anyhow!("level0_deltas_count not set"))?,
     889           28 :             new_deltas_count: value
     890           28 :                 .new_deltas_count
     891           28 :                 .ok_or_else(|| anyhow!("new_deltas_count not set"))?,
     892           28 :             new_deltas_size: value
     893           28 :                 .new_deltas_size
     894           28 :                 .ok_or_else(|| anyhow!("new_deltas_size not set"))?,
     895              :         })
     896           28 :     }
     897              : }
     898              : 
     899              : impl Timeline {
     900              :     /// Entry point for the new tiered compaction algorithm.
     901              :     ///
     902              :     /// Gathers the inputs (fanout, target file size, end LSN and dense keyspace), runs the algorithm through a `TimelineAdaptor` and flushes the adaptor's buffered layer changes. Returns `Ok(())` without compacting when there are fewer L0 deltas than the fanout, and `CompactionError::ShuttingDown` when the timeline is stopping.
     903              :     ///
     904              :     /// All the real work is in the implementation in the pageserver_compaction crate. The code here would apply to any algorithm implemented by the same interface, but tiered is the only one at the moment.
     905              :     ///
     906              :     /// TODO: cancellation (`_cancel` is not used yet)
     907            0 :     pub(crate) async fn compact_tiered(
     908            0 :         self: &Arc<Self>,
     909            0 :         _cancel: &CancellationToken,
     910            0 :         ctx: &RequestContext,
     911            0 :     ) -> Result<(), CompactionError> {
     912            0 :         let fanout = self.get_compaction_threshold() as u64;
     913            0 :         let target_file_size = self.get_checkpoint_distance();
     914              : 
     915              :         // Find the top of the historical layers: the highest end LSN among the L0 deltas.
     916            0 :         let end_lsn = {
     917            0 :             let guard = self.layers.read().await;
     918            0 :             let layers = guard.layer_map();
     919              :             // Fetch the L0 deltas, then release the read guard before looking at them.
     920            0 :             let l0_deltas = layers.get_level0_deltas()?;
     921            0 :             drop(guard);
     922            0 : 
     923            0 :             // As an optimization, if we find that there are too few L0 layers,
     924            0 :             // bail out early. We know that the compaction algorithm would do
     925            0 :             // nothing in that case.
     926            0 :             if l0_deltas.len() < fanout as usize {
     927              :                 // doesn't need compacting
     928            0 :                 return Ok(());
     929            0 :             }
     930            0 :             l0_deltas.iter().map(|l| l.lsn_range.end).max().unwrap() // NOTE(review): panics if `l0_deltas` is empty, which the check above only rules out when fanout >= 1 — confirm the threshold can never be 0
     931            0 :         };
     932            0 : 
     933            0 :         // Is the timeline being deleted?
     934            0 :         if self.is_stopping() {
     935            0 :             trace!("Dropping out of compaction on timeline shutdown");
     936            0 :             return Err(CompactionError::ShuttingDown);
     937            0 :         }
     938              :         // Only the dense keyspace collected at `end_lsn` is handed to the adaptor.
     939            0 :         let (dense_ks, _sparse_ks) = self.collect_keyspace(end_lsn, ctx).await?;
     940              :         // TODO(chi): ignore sparse_keyspace for now, compact it in the future.
     941            0 :         let mut adaptor = TimelineAdaptor::new(self, (end_lsn, dense_ks));
     942            0 : 
     943            0 :         pageserver_compaction::compact_tiered::compact_tiered(
     944            0 :             &mut adaptor,
     945            0 :             end_lsn,
     946            0 :             target_file_size,
     947            0 :             fanout,
     948            0 :             ctx,
     949            0 :         )
     950            0 :         .await?;
     951              :         // Apply whatever layer changes the adaptor still has buffered.
     952            0 :         adaptor.flush_updates().await?;
     953            0 :         Ok(())
     954            0 :     }
     955              : 
     956              :     /// An experimental compaction building block that combines compaction with garbage collection.
     957              :     ///
     958              :     /// The current implementation picks all delta + image layers that are below or intersecting with
     959              :     /// the GC horizon without considering retain_lsns. Then, it does a full compaction over all these delta
     960              :     /// layers and image layers, which generates image layers on the gc horizon, drops deltas at or below the gc horizon,
     961              :     /// and creates delta layers with all deltas above the gc horizon.
     962              :     #[cfg(test)]
     963            4 :     pub(crate) async fn compact_with_gc(
     964            4 :         self: &Arc<Self>,
     965            4 :         _cancel: &CancellationToken,
     966            4 :         ctx: &RequestContext,
     967            4 :     ) -> Result<(), CompactionError> {
     968              :         use std::collections::BTreeSet;
     969              : 
     970              :         use crate::tenant::storage_layer::ValueReconstructState;
     971              :         // Step 0: pick all delta layers + image layers below/intersect with the GC horizon.
     972              :         // The layer selection has the following properties:
     973              :         // 1. If a layer is in the selection, all layers below it are in the selection.
     974              :         // 2. Inferred from (1), for each key in the layer selection, the value can be reconstructed only with the layers in the layer selection.
     975            4 :         let (layer_selection, gc_cutoff) = {
     976            4 :             let guard = self.layers.read().await;
     977            4 :             let layers = guard.layer_map();
     978            4 :             let gc_info = self.gc_info.read().unwrap();
     979            4 :             let gc_cutoff = Lsn::min(gc_info.cutoffs.horizon, gc_info.cutoffs.pitr);
     980            4 :             let mut selected_layers = Vec::new();
     981            4 :             // TODO: consider retain_lsns
     982            4 :             drop(gc_info);
     983           20 :             for desc in layers.iter_historic_layers() {
     984           20 :                 if desc.get_lsn_range().start <= gc_cutoff {
     985           16 :                     selected_layers.push(guard.get_from_desc(&desc));
     986           16 :                 }
     987              :             }
     988            4 :             (selected_layers, gc_cutoff)
     989            4 :         };
     990            4 :         // Step 1: (In the future) construct a k-merge iterator over all layers. For now, simply collect all keys + LSNs.
     991            4 :         // Also, collect the layer information to decide when to split the new delta layers.
     992            4 :         let mut all_key_values = Vec::new();
     993            4 :         let mut delta_split_points = BTreeSet::new();
     994           20 :         for layer in &layer_selection {
     995           16 :             all_key_values.extend(layer.load_key_values(ctx).await?);
     996           16 :             let desc = layer.layer_desc();
     997           16 :             if desc.is_delta() {
     998            8 :                 // TODO: is it correct to only record split points for deltas intersecting with the GC horizon? (exclude those below/above the horizon)
     999            8 :                 // so that we can avoid having too many small delta layers.
    1000            8 :                 let key_range = desc.get_key_range();
    1001            8 :                 delta_split_points.insert(key_range.start);
    1002            8 :                 delta_split_points.insert(key_range.end);
    1003            8 :             }
    1004              :         }
    1005              :         // Key small to large, LSN low to high, if the same LSN has both image and delta due to the merge of delta layers and
    1006              :         // image layers, make the image appear before the delta.
    1007              :         struct ValueWrapper<'a>(&'a crate::repository::Value);
    1008              :         impl Ord for ValueWrapper<'_> {
    1009            0 :             fn cmp(&self, other: &Self) -> std::cmp::Ordering {
    1010            0 :                 use crate::repository::Value;
    1011            0 :                 use std::cmp::Ordering;
    1012            0 :                 match (self.0, other.0) {
    1013            0 :                     (Value::Image(_), Value::WalRecord(_)) => Ordering::Less,
    1014            0 :                     (Value::WalRecord(_), Value::Image(_)) => Ordering::Greater,
    1015            0 :                     _ => Ordering::Equal, // both sides are the same kind of value
    1016              :                 }
    1017            0 :             }
    1018              :         }
    1019              :         impl PartialOrd for ValueWrapper<'_> {
    1020            0 :             fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
    1021            0 :                 Some(self.cmp(other))
    1022            0 :             }
    1023              :         }
    1024              :         impl PartialEq for ValueWrapper<'_> {
    1025            0 :             fn eq(&self, other: &Self) -> bool {
    1026            0 :                 self.cmp(other) == std::cmp::Ordering::Equal
    1027            0 :             }
    1028              :         }
    1029              :         impl Eq for ValueWrapper<'_> {}
    1030          164 :         all_key_values.sort_by(|(k1, l1, v1), (k2, l2, v2)| {
    1031          164 :             (k1, l1, ValueWrapper(v1)).cmp(&(k2, l2, ValueWrapper(v2)))
    1032          164 :         });
    1033            4 :         // Step 2: Produce images+deltas. TODO: ensure newly-produced delta does not overlap with other deltas.
    1034            4 :         // Data of the same key.
    1035            4 :         let mut accumulated_values = Vec::new();
    1036            4 :         let mut last_key = all_key_values.first().unwrap().0; // TODO: assert all_key_values not empty (this unwrap panics when it is)
    1037              :         /// Take a list of images and deltas, produce an image at the GC horizon, and a list of deltas above the GC horizon.
    1038              :         /// The values are walked from the last element to the first: entries above `horizon` are returned unchanged, WAL records at or below it are collected until the first image at or below it is met, and both are handed to `reconstruct_value` to build the image at `horizon`.
    1039           72 :         async fn flush_accumulated_states(
    1040           72 :             tline: &Arc<Timeline>,
    1041           72 :             key: Key,
    1042           72 :             accumulated_values: &[&(Key, Lsn, crate::repository::Value)],
    1043           72 :             horizon: Lsn,
    1044           72 :         ) -> anyhow::Result<(Vec<(Key, Lsn, crate::repository::Value)>, bytes::Bytes)> {
    1045           72 :             let mut base_image = None;
    1046           72 :             let mut keys_above_horizon = Vec::new();
    1047           72 :             let mut delta_above_base_image = Vec::new();
    1048              :             // We have a list of deltas/images. We want to create image layers while collecting garbage.
    1049           84 :             for (key, lsn, val) in accumulated_values.iter().rev() {
    1050           84 :                 if *lsn > horizon {
    1051            4 :                     if let Some((_, prev_lsn, _)) = keys_above_horizon.last_mut() {
    1052            0 :                         if *prev_lsn == *lsn {
    1053              :                             // The case that we have an LSN with both data from the delta layer and the image layer. As
    1054              :                             // `ValueWrapper` ensures that an image is ordered before a delta at the same LSN, we simply
    1055              :                             // drop this delta and keep the image.
    1056              :                             //
    1057              :                             // For example, we have delta layer key1@0x10, key1@0x20, and image layer key1@0x10, we will
    1058              :                             // keep the image for key1@0x10 and the delta for key1@0x20. key1@0x10 delta will be simply
    1059              :                             // dropped. NOTE(review): this loop walks the values in reverse, so at an equal LSN the delta is visited first and the entry skipped here appears to be the image — confirm which of the two is meant to survive.
    1060            0 :                             continue;
    1061            0 :                         }
    1062            4 :                     }
    1063            4 :                     keys_above_horizon.push((*key, *lsn, val.clone()));
    1064           80 :                 } else if *lsn <= horizon { // always true here: the `if` above already took `*lsn > horizon`
    1065           80 :                     match val {
    1066           72 :                         crate::repository::Value::Image(image) => {
    1067           72 :                             if lsn <= &horizon { // redundant: implied by the enclosing branch
    1068           72 :                                 base_image = Some((*lsn, image.clone()));
    1069           72 :                                 break;
    1070            0 :                             }
    1071              :                         }
    1072            8 :                         crate::repository::Value::WalRecord(wal) => {
    1073            8 :                             delta_above_base_image.push((*lsn, wal.clone()));
    1074            8 :                         }
    1075              :                     }
    1076            0 :                 }
    1077              :             }
    1078           72 :             delta_above_base_image.reverse();
    1079           72 :             keys_above_horizon.reverse();
    1080           72 :             let state = ValueReconstructState {
    1081           72 :                 img: base_image,
    1082           72 :                 records: delta_above_base_image,
    1083           72 :             };
    1084           72 :             let img = tline.reconstruct_value(key, horizon, state).await?;
    1085           72 :             Ok((keys_above_horizon, img))
    1086           72 :         }
    1087              :         /// Writes the buffered `deltas` out as one delta layer once `last_key` has reached the next entry of `delta_split_points`. Returns `Ok(None)` while no split point has been reached, or when a split point was passed but nothing was buffered.
    1088           72 :         async fn flush_deltas(
    1089           72 :             deltas: &mut Vec<(Key, Lsn, crate::repository::Value)>,
    1090           72 :             last_key: Key,
    1091           72 :             delta_split_points: &[Key],
    1092           72 :             current_delta_split_point: &mut usize,
    1093           72 :             tline: &Arc<Timeline>,
    1094           72 :             gc_cutoff: Lsn,
    1095           72 :             ctx: &RequestContext,
    1096           72 :         ) -> anyhow::Result<Option<ResidentLayer>> {
    1097           72 :             // Check if we need to split the delta layer. We split at the original delta layer boundary to avoid
    1098           72 :             // overlapping layers.
    1099           72 :             //
    1100           72 :             // If we have a structure like this:
    1101           72 :             //
    1102           72 :             // | Delta 1 |         | Delta 4 |
    1103           72 :             // |---------| Delta 2 |---------|
    1104           72 :             // | Delta 3 |         | Delta 5 |
    1105           72 :             //
    1106           72 :             // And we choose to compact delta 2+3+5. We will get an overlapping delta layer with delta 1+4.
    1107           72 :             // A simple solution here is to split the delta layers using the original boundary, while this
    1108           72 :             // might produce a lot of small layers. This should be improved and fixed in the future.
    1109           72 :             let mut need_split = false;
    1110           88 :             while *current_delta_split_point < delta_split_points.len()
    1111           76 :                 && last_key >= delta_split_points[*current_delta_split_point]
    1112           16 :             {
    1113           16 :                 *current_delta_split_point += 1;
    1114           16 :                 need_split = true;
    1115           16 :             }
    1116           72 :             if !need_split {
    1117           56 :                 return Ok(None);
    1118           16 :             }
    1119           16 :             let deltas = std::mem::take(deltas);
    1120           16 :             if deltas.is_empty() {
    1121           12 :                 return Ok(None);
    1122            4 :             }
    1123            4 :             let end_lsn = deltas.iter().map(|(_, lsn, _)| lsn).max().copied().unwrap() + 1; // one past the highest buffered LSN
    1124            4 :             let mut delta_layer_writer = DeltaLayerWriter::new(
    1125            4 :                 tline.conf,
    1126            4 :                 tline.timeline_id,
    1127            4 :                 tline.tenant_shard_id,
    1128            4 :                 deltas.first().unwrap().0,
    1129            4 :                 gc_cutoff..end_lsn,
    1130            4 :                 ctx,
    1131            4 :             )
    1132            2 :             .await?;
    1133            4 :             let key_end = deltas.last().unwrap().0.next();
    1134            8 :             for (key, lsn, val) in deltas {
    1135            4 :                 delta_layer_writer.put_value(key, lsn, val, ctx).await?;
    1136              :             }
    1137           11 :             let delta_layer = delta_layer_writer.finish(key_end, tline, ctx).await?;
    1138            4 :             Ok(Some(delta_layer))
    1139           72 :         }
    1140              :         // One image layer at `gc_cutoff`, spanning from the first to just past the last collected key, receives every reconstructed image.
    1141            4 :         let mut image_layer_writer = ImageLayerWriter::new(
    1142            4 :             self.conf,
    1143            4 :             self.timeline_id,
    1144            4 :             self.tenant_shard_id,
    1145            4 :             &(all_key_values.first().unwrap().0..all_key_values.last().unwrap().0.next()),
    1146            4 :             gc_cutoff,
    1147            4 :             ctx,
    1148            4 :         )
    1149            2 :         .await?;
    1150              :         // Walk the sorted values, grouping them by key; whenever the key changes, the previous key's values are flushed.
    1151            4 :         let mut delta_values = Vec::new();
    1152            4 :         let delta_split_points = delta_split_points.into_iter().collect_vec();
    1153            4 :         let mut current_delta_split_point = 0;
    1154            4 :         let mut delta_layers = Vec::new();
    1155           96 :         for item @ (key, _, _) in &all_key_values {
    1156           92 :             if &last_key == key {
    1157           24 :                 accumulated_values.push(item);
    1158           24 :             } else {
    1159           68 :                 let (deltas, image) =
    1160           68 :                     flush_accumulated_states(self, last_key, &accumulated_values, gc_cutoff)
    1161            0 :                         .await?;
    1162              :                 // Put the image into the image layer. Currently we have a single big layer for the compaction.
    1163           69 :                 image_layer_writer.put_image(last_key, image, ctx).await?;
    1164           68 :                 delta_values.extend(deltas);
    1165           68 :                 delta_layers.extend(
    1166           68 :                     flush_deltas(
    1167           68 :                         &mut delta_values,
    1168           68 :                         last_key,
    1169           68 :                         &delta_split_points,
    1170           68 :                         &mut current_delta_split_point,
    1171           68 :                         self,
    1172           68 :                         gc_cutoff,
    1173           68 :                         ctx,
    1174           68 :                     )
    1175           13 :                     .await?,
    1176              :                 );
    1177           68 :                 accumulated_values.clear();
    1178           68 :                 accumulated_values.push(item);
    1179           68 :                 last_key = *key;
    1180              :             }
    1181              :         }
    1182              : 
    1183              :         // Flush the last key, which the loop above leaves pending. TODO: move this part to the loop body
    1184            4 :         let (deltas, image) =
    1185            4 :             flush_accumulated_states(self, last_key, &accumulated_values, gc_cutoff).await?;
    1186              :         // Put the image into the image layer. Currently we have a single big layer for the compaction.
    1187            4 :         image_layer_writer.put_image(last_key, image, ctx).await?;
    1188            4 :         delta_values.extend(deltas);
    1189            4 :         delta_layers.extend(
    1190            4 :             flush_deltas(
    1191            4 :                 &mut delta_values,
    1192            4 :                 last_key,
    1193            4 :                 &delta_split_points,
    1194            4 :                 &mut current_delta_split_point,
    1195            4 :                 self,
    1196            4 :                 gc_cutoff,
    1197            4 :                 ctx,
    1198            4 :             )
    1199            0 :             .await?,
    1200              :         );
    1201              : 
    1202            8 :         let image_layer = image_layer_writer.finish(self, ctx).await?;
    1203            4 :         let mut compact_to = Vec::new();
    1204            4 :         compact_to.extend(delta_layers);
    1205            4 :         compact_to.push(image_layer);
    1206              :         // Step 3: Place the new layers back into the layer map.
    1207              :         {
    1208            4 :             let mut guard = self.layers.write().await;
    1209            4 :             guard.finish_gc_compaction(&layer_selection, &compact_to, &self.metrics)
    1210            4 :         };
    1211            4 :         Ok(())
    1212            4 :     }
    1213              : }
    1214              : /// Glue between a `Timeline` and the compaction algorithm: it implements `CompactionJobExecutor` and buffers the resulting layer changes until `flush_updates` applies them.
    1215              : struct TimelineAdaptor {
    1216              :     timeline: Arc<Timeline>,
    1217              :     /// Keyspace served by `get_keyspace`, together with the only LSN it may be requested at.
    1218              :     keyspace: (Lsn, KeySpace),
    1219              :     // Layer changes buffered until `flush_updates` applies them and empties these lists.
    1220              :     new_deltas: Vec<ResidentLayer>,
    1221              :     new_images: Vec<ResidentLayer>,
    1222              :     layers_to_delete: Vec<Arc<PersistentLayerDesc>>,
    1223              : }
    1224              : 
    1225              : impl TimelineAdaptor {
    1226            0 :     pub fn new(timeline: &Arc<Timeline>, keyspace: (Lsn, KeySpace)) -> Self { // starts with no buffered layer changes
    1227            0 :         Self {
    1228            0 :             timeline: timeline.clone(),
    1229            0 :             keyspace,
    1230            0 :             new_images: Vec::new(),
    1231            0 :             new_deltas: Vec::new(),
    1232            0 :             layers_to_delete: Vec::new(),
    1233            0 :         }
    1234            0 :     }
    1235              :     /// Applies the buffered layer changes: passes the new deltas, new images and layers to delete to `finish_compact_batch`, hands the new image layers to `upload_new_image_layers`, then empties the buffers.
    1236            0 :     pub async fn flush_updates(&mut self) -> anyhow::Result<()> {
    1237            0 :         let layers_to_delete = { // scoped so the read guard is released before `finish_compact_batch` is awaited
    1238            0 :             let guard = self.timeline.layers.read().await;
    1239            0 :             self.layers_to_delete
    1240            0 :                 .iter()
    1241            0 :                 .map(|x| guard.get_from_desc(x))
    1242            0 :                 .collect::<Vec<Layer>>()
    1243            0 :         };
    1244            0 :         self.timeline
    1245            0 :             .finish_compact_batch(&self.new_deltas, &self.new_images, &layers_to_delete)
    1246            0 :             .await?;
    1247              :         // `mem::take` leaves `new_images` empty, so only the other two buffers need clearing afterwards.
    1248            0 :         self.timeline
    1249            0 :             .upload_new_image_layers(std::mem::take(&mut self.new_images))?;
    1250              : 
    1251            0 :         self.new_deltas.clear();
    1252            0 :         self.layers_to_delete.clear();
    1253            0 :         Ok(())
    1254            0 :     }
    1255              : }
    1256              : 
    1257              : #[derive(Clone)]
    1258              : struct ResidentDeltaLayer(ResidentLayer); // newtype used as `CompactionJobExecutor::DeltaLayer` by `TimelineAdaptor`
    1259              : #[derive(Clone)]
    1260              : struct ResidentImageLayer(ResidentLayer); // newtype used as `CompactionJobExecutor::ImageLayer` by `TimelineAdaptor`
    1261              : 
    1262              : impl CompactionJobExecutor for TimelineAdaptor {
    1263              :     type Key = crate::repository::Key;
    1264              : 
    1265              :     type Layer = OwnArc<PersistentLayerDesc>;
    1266              :     type DeltaLayer = ResidentDeltaLayer;
    1267              :     type ImageLayer = ResidentImageLayer;
    1268              : 
    1269              :     type RequestContext = crate::context::RequestContext;
    1270              :     /// Returns the shard identity of the wrapped timeline.
    1271            0 :     fn get_shard_identity(&self) -> &ShardIdentity {
    1272            0 :         self.timeline.get_shard_identity()
    1273            0 :     }
    1274              :     /// Lists the historic layers whose LSN range overlaps `lsn_range` and whose key range overlaps `key_range`. Buffered layer changes are flushed first, before the layer map is read.
    1275            0 :     async fn get_layers(
    1276            0 :         &mut self,
    1277            0 :         key_range: &Range<Key>,
    1278            0 :         lsn_range: &Range<Lsn>,
    1279            0 :         _ctx: &RequestContext,
    1280            0 :     ) -> anyhow::Result<Vec<OwnArc<PersistentLayerDesc>>> {
    1281            0 :         self.flush_updates().await?;
    1282              : 
    1283            0 :         let guard = self.timeline.layers.read().await;
    1284            0 :         let layer_map = guard.layer_map();
    1285            0 : 
    1286            0 :         let result = layer_map
    1287            0 :             .iter_historic_layers()
    1288            0 :             .filter(|l| {
    1289            0 :                 overlaps_with(&l.lsn_range, lsn_range) && overlaps_with(&l.key_range, key_range)
    1290            0 :             })
    1291            0 :             .map(OwnArc)
    1292            0 :             .collect();
    1293            0 :         Ok(result)
    1294            0 :     }
    1295              :     /// Returns the parts of the stored keyspace that intersect `key_range`. Fails unless `lsn` equals the LSN stored alongside the keyspace.
    1296            0 :     async fn get_keyspace(
    1297            0 :         &mut self,
    1298            0 :         key_range: &Range<Key>,
    1299            0 :         lsn: Lsn,
    1300            0 :         _ctx: &RequestContext,
    1301            0 :     ) -> anyhow::Result<Vec<Range<Key>>> {
    1302            0 :         if lsn == self.keyspace.0 {
    1303            0 :             Ok(pageserver_compaction::helpers::intersect_keyspace(
    1304            0 :                 &self.keyspace.1.ranges,
    1305            0 :                 key_range,
    1306            0 :             ))
    1307              :         } else {
    1308              :             // The current compaction implementation only ever requests the key space
    1309              :             // at the compaction end LSN.
    1310            0 :             anyhow::bail!("keyspace not available for requested lsn");
    1311              :         }
    1312            0 :     }
    1313              :     /// Returns the resident form of `layer`, obtained through `download_and_keep_resident`, or `Ok(None)` when `layer` is not a delta layer.
    1314            0 :     async fn downcast_delta_layer(
    1315            0 :         &self,
    1316            0 :         layer: &OwnArc<PersistentLayerDesc>,
    1317            0 :     ) -> anyhow::Result<Option<ResidentDeltaLayer>> {
    1318            0 :         // this is a lot more complex than a simple downcast...
    1319            0 :         if layer.is_delta() {
    1320            0 :             let l = {
    1321            0 :                 let guard = self.timeline.layers.read().await;
    1322            0 :                 guard.get_from_desc(layer)
    1323              :             };
    1324            0 :             let result = l.download_and_keep_resident().await?;
    1325              : 
    1326            0 :             Ok(Some(ResidentDeltaLayer(result)))
    1327              :         } else {
    1328            0 :             Ok(None)
    1329              :         }
    1330            0 :     }
    1331              :     /// Trait entry point for image layer creation: delegates to `create_image_impl` and converts its error into `anyhow::Error`.
    1332            0 :     async fn create_image(
    1333            0 :         &mut self,
    1334            0 :         lsn: Lsn,
    1335            0 :         key_range: &Range<Key>,
    1336            0 :         ctx: &RequestContext,
    1337            0 :     ) -> anyhow::Result<()> {
    1338            0 :         Ok(self.create_image_impl(lsn, key_range, ctx).await?)
    1339            0 :     }
    1340              :     /// Merges the entries of `input_layers` into one new delta layer for `lsn_range`, written in (key, LSN) order with duplicate (key, LSN) pairs skipped, and buffers the new layer in `new_deltas`.
    1341            0 :     async fn create_delta(
    1342            0 :         &mut self,
    1343            0 :         lsn_range: &Range<Lsn>,
    1344            0 :         key_range: &Range<Key>,
    1345            0 :         input_layers: &[ResidentDeltaLayer],
    1346            0 :         ctx: &RequestContext,
    1347            0 :     ) -> anyhow::Result<()> {
    1348            0 :         debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end);
    1349              :         // Collect the entries of all input layers; the values themselves are loaded one by one in the loop below.
    1350            0 :         let mut all_entries = Vec::new();
    1351            0 :         for dl in input_layers.iter() {
    1352            0 :             all_entries.extend(dl.load_keys(ctx).await?);
    1353              :         }
    1354              : 
    1355              :         // The current stdlib sorting implementation is designed in a way where it is
    1356              :         // particularly fast where the slice is made up of sorted sub-ranges.
    1357            0 :         all_entries.sort_by_key(|DeltaEntry { key, lsn, .. }| (*key, *lsn));
    1358              : 
    1359            0 :         let mut writer = DeltaLayerWriter::new(
    1360            0 :             self.timeline.conf,
    1361            0 :             self.timeline.timeline_id,
    1362            0 :             self.timeline.tenant_shard_id,
    1363            0 :             key_range.start,
    1364            0 :             lsn_range.clone(),
    1365            0 :             ctx,
    1366            0 :         )
    1367            0 :         .await?;
    1368              : 
    1369            0 :         let mut dup_values = 0;
    1370            0 : 
    1371            0 :         // This iterator walks through all key-value pairs from all the layers
    1372            0 :         // we're compacting, in key, LSN order.
    1373            0 :         let mut prev: Option<(Key, Lsn)> = None;
    1374              :         for &DeltaEntry {
    1375            0 :             key, lsn, ref val, ..
    1376            0 :         } in all_entries.iter()
    1377              :         {
    1378            0 :             if prev == Some((key, lsn)) {
    1379              :                 // This is a duplicate. Skip it.
    1380              :                 //
    1381              :                 // It can happen if compaction is interrupted after writing some
    1382              :                 // layers but not all, and we are compacting the range again.
    1383              :                 // The calculations in the algorithm assume that there are no
    1384              :                 // duplicates, so the math on targeted file size is likely off,
    1385              :                 // and we will create smaller files than expected.
    1386            0 :                 dup_values += 1;
    1387            0 :                 continue;
    1388            0 :             }
    1389              :             // Load the value only now, so that skipped duplicates are never read.
    1390            0 :             let value = val.load(ctx).await?;
    1391              : 
    1392            0 :             writer.put_value(key, lsn, value, ctx).await?;
    1393              : 
    1394            0 :             prev = Some((key, lsn));
    1395              :         }
    1396              : 
    1397            0 :         if dup_values > 0 {
    1398            0 :             warn!("delta layer created with {} duplicate values", dup_values);
    1399            0 :         }
    1400              : 
    1401            0 :         fail_point!("delta-layer-writer-fail-before-finish", |_| {
    1402            0 :             Err(anyhow::anyhow!(
    1403            0 :                 "failpoint delta-layer-writer-fail-before-finish"
    1404            0 :             ))
    1405            0 :         });
    1406              :         // The new layer's key range ends right after the last key written. NOTE(review): `prev` is still `None` when no entry was written, in which case the unwrap below panics — confirm the algorithm never passes empty input.
    1407            0 :         let new_delta_layer = writer
    1408            0 :             .finish(prev.unwrap().0.next(), &self.timeline, ctx)
    1409            0 :             .await?;
    1410              : 
    1411            0 :         self.new_deltas.push(new_delta_layer);
    1412            0 :         Ok(())
    1413            0 :     }
    1414              :     /// Only records `layer` in `layers_to_delete`; the recorded layers are passed on by the next `flush_updates`.
    1415            0 :     async fn delete_layer(
    1416            0 :         &mut self,
    1417            0 :         layer: &OwnArc<PersistentLayerDesc>,
    1418            0 :         _ctx: &RequestContext,
    1419            0 :     ) -> anyhow::Result<()> {
    1420            0 :         self.layers_to_delete.push(layer.clone().0);
    1421            0 :         Ok(())
    1422            0 :     }
    1423              : }
    1424              : // `create_image_impl`: builds at most one image layer for `key_range` at `lsn` and, if one is produced, appends it to `self.new_images`.
    1425              : impl TimelineAdaptor {
    1426            0 :     async fn create_image_impl(
    1427            0 :         &mut self,
    1428            0 :         lsn: Lsn,
    1429            0 :         key_range: &Range<Key>,
    1430            0 :         ctx: &RequestContext,
    1431            0 :     ) -> Result<(), CreateImageLayersError> {
    1432            0 :         let timer = self.timeline.metrics.create_images_time_histo.start_timer(); // stopped explicitly at the end of the success path
    1433              : 
    1434            0 :         let image_layer_writer = ImageLayerWriter::new(
    1435            0 :             self.timeline.conf,
    1436            0 :             self.timeline.timeline_id,
    1437            0 :             self.timeline.tenant_shard_id,
    1438            0 :             key_range,
    1439            0 :             lsn,
    1440            0 :             ctx,
    1441            0 :         )
    1442            0 :         .await?;
    1443              : // Failpoint hook (`fail` crate): when triggered, the closure's `CreateImageLayersError::Other` is what this function returns.
    1444            0 :         fail_point!("image-layer-writer-fail-before-finish", |_| {
    1445            0 :             Err(CreateImageLayersError::Other(anyhow::anyhow!(
    1446            0 :                 "failpoint image-layer-writer-fail-before-finish"
    1447            0 :             )))
    1448            0 :         });
    1449              : // Key ranges handed to the image-layer builder below, as returned by `self.get_keyspace(key_range, lsn, ctx)`.
    1450            0 :         let keyspace = KeySpace {
    1451            0 :             ranges: self.get_keyspace(key_range, lsn, ctx).await?,
    1452              :         };
    1453              :         // TODO set proper (stateful) start; for now every call begins at `Key::MIN`. NOTE(review): the rest of the original note ("The create_image_layer_for_rel_blocks function mostly ...") was cut off mid-sentence; recover the intent.
    1454            0 :         let start = Key::MIN;
    1455              :         let ImageLayerCreationOutcome {
    1456            0 :             image,
    1457              :             next_start_key: _, // the returned next start key is discarded
    1458            0 :         } = self
    1459            0 :             .timeline
    1460            0 :             .create_image_layer_for_rel_blocks(
    1461            0 :                 &keyspace,
    1462            0 :                 image_layer_writer,
    1463            0 :                 lsn,
    1464            0 :                 ctx,
    1465            0 :                 key_range.clone(),
    1466            0 :                 start,
    1467            0 :             )
    1468            0 :             .await?;
    1469              : // `image` is an `Option`: nothing is pushed when no layer came back.
    1470            0 :         if let Some(image_layer) = image {
    1471            0 :             self.new_images.push(image_layer);
    1472            0 :         }
    1473              : 
    1474            0 :         timer.stop_and_record();
    1475            0 : 
    1476            0 :         Ok(())
    1477            0 :     }
    1478              : }
    1479              : // Empty impl: `RequestContext` implements `CompactionRequestContext` without overriding anything.
    1480              : impl CompactionRequestContext for crate::context::RequestContext {}
    1481              : /// Newtype around `Arc<T>`; the layer-trait impls below target this wrapper. NOTE(review): presumably exists so foreign traits can be implemented for an `Arc` (orphan rule) — confirm.
    1482              : #[derive(Debug, Clone)]
    1483              : pub struct OwnArc<T>(pub Arc<T>);
    1484              : // Derefs to the same target as `Arc<T>`, so fields and methods of `T` are reachable directly on an `OwnArc<T>`.
    1485              : impl<T> Deref for OwnArc<T> {
    1486              :     type Target = <Arc<T> as Deref>::Target;
    1487            0 :     fn deref(&self) -> &Self::Target {
    1488            0 :         &self.0 // `&Arc<T>` coerces to `&T` here
    1489            0 :     }
    1490              : }
    1491              : // Borrows the wrapped value as `&T` by delegating to the inner `Arc`'s `as_ref`.
    1492              : impl<T> AsRef<T> for OwnArc<T> {
    1493            0 :     fn as_ref(&self) -> &T {
    1494            0 :         self.0.as_ref()
    1495            0 :     }
    1496              : }
    1497              : // Layer metadata for a bare descriptor: fields are reached through `Deref`, methods through `as_ref()`.
    1498              : impl CompactionLayer<Key> for OwnArc<PersistentLayerDesc> {
    1499            0 :     fn key_range(&self) -> &Range<Key> {
    1500            0 :         &self.key_range
    1501            0 :     }
    1502            0 :     fn lsn_range(&self) -> &Range<Lsn> {
    1503            0 :         &self.lsn_range
    1504            0 :     }
    1505            0 :     fn file_size(&self) -> u64 {
    1506            0 :         self.file_size
    1507            0 :     }
    1508            0 :     fn short_id(&self) -> std::string::String {
    1509            0 :         self.as_ref().short_id().to_string() // stringified form of the descriptor's short id
    1510            0 :     }
    1511            0 :     fn is_delta(&self) -> bool {
    1512            0 :         self.as_ref().is_delta() // delegated to the descriptor
    1513            0 :     }
    1514              : }
    1515              : // Same accessors for a wrapped `DeltaLayer`, answered from its `layer_desc()`; `is_delta` is the constant `true`.
    1516              : impl CompactionLayer<Key> for OwnArc<DeltaLayer> {
    1517            0 :     fn key_range(&self) -> &Range<Key> {
    1518            0 :         &self.layer_desc().key_range
    1519            0 :     }
    1520            0 :     fn lsn_range(&self) -> &Range<Lsn> {
    1521            0 :         &self.layer_desc().lsn_range
    1522            0 :     }
    1523            0 :     fn file_size(&self) -> u64 {
    1524            0 :         self.layer_desc().file_size
    1525            0 :     }
    1526            0 :     fn short_id(&self) -> std::string::String {
    1527            0 :         self.layer_desc().short_id().to_string()
    1528            0 :     }
    1529            0 :     fn is_delta(&self) -> bool {
    1530            0 :         true // hard-coded for this impl rather than read from the descriptor
    1531            0 :     }
    1532              : }
    1533              : 
    1534              : use crate::tenant::timeline::DeltaEntry;
    1535              : // `ResidentDeltaLayer` reports the metadata of the layer held in its field `.0`, via that layer's `layer_desc()`.
    1536              : impl CompactionLayer<Key> for ResidentDeltaLayer {
    1537            0 :     fn key_range(&self) -> &Range<Key> {
    1538            0 :         &self.0.layer_desc().key_range
    1539            0 :     }
    1540            0 :     fn lsn_range(&self) -> &Range<Lsn> {
    1541            0 :         &self.0.layer_desc().lsn_range
    1542            0 :     }
    1543            0 :     fn file_size(&self) -> u64 {
    1544            0 :         self.0.layer_desc().file_size
    1545            0 :     }
    1546            0 :     fn short_id(&self) -> std::string::String {
    1547            0 :         self.0.layer_desc().short_id().to_string()
    1548            0 :     }
    1549            0 :     fn is_delta(&self) -> bool {
    1550            0 :         true // hard-coded for this impl
    1551            0 :     }
    1552              : }
    1553              : // Key loading is forwarded to the wrapped layer; entries are exposed as the `DeltaEntry` type imported above.
    1554              : impl CompactionDeltaLayer<TimelineAdaptor> for ResidentDeltaLayer {
    1555              :     type DeltaEntry<'a> = DeltaEntry<'a>;
    1556              : 
    1557            0 :     async fn load_keys<'a>(&self, ctx: &RequestContext) -> anyhow::Result<Vec<DeltaEntry<'_>>> { // NOTE(review): `'a` is unused in this signature; presumably kept to mirror the trait declaration — confirm
    1558            0 :         self.0.load_keys(ctx).await
    1559            0 :     }
    1560              : }
    1561              : // `ResidentImageLayer` reports the metadata of the layer held in its field `.0`; `is_delta` is the constant `false`.
    1562              : impl CompactionLayer<Key> for ResidentImageLayer {
    1563            0 :     fn key_range(&self) -> &Range<Key> {
    1564            0 :         &self.0.layer_desc().key_range
    1565            0 :     }
    1566            0 :     fn lsn_range(&self) -> &Range<Lsn> {
    1567            0 :         &self.0.layer_desc().lsn_range
    1568            0 :     }
    1569            0 :     fn file_size(&self) -> u64 {
    1570            0 :         self.0.layer_desc().file_size
    1571            0 :     }
    1572            0 :     fn short_id(&self) -> std::string::String {
    1573            0 :         self.0.layer_desc().short_id().to_string()
    1574            0 :     }
    1575            0 :     fn is_delta(&self) -> bool {
    1576            0 :         false // hard-coded for this impl
    1577            0 :     }
    1578              : }
    1579              : impl CompactionImageLayer<TimelineAdaptor> for ResidentImageLayer {} // empty impl: no items are defined or overridden here
        

Generated by: LCOV version 2.1-beta