LCOV - code coverage report
Current view: top level - pageserver/src/tenant/timeline - compaction.rs (source / functions)
Test: 496e96cdfff2df79370229591d6427cda12fde29.info
Test Date: 2024-05-21 18:28:29
Coverage: Lines: 48.9 % (454 of 929)    Functions: 19.8 % (17 of 86)

            Line data    Source code
       1              : //! New compaction implementation. The algorithm itself is implemented in the
       2              : //! compaction crate. This file implements the callbacks and structs that allow
       3              : //! the algorithm to drive the process.
       4              : //!
       5              : //! The old legacy algorithm is implemented directly in `timeline.rs`.
       6              : 
       7              : use std::collections::BinaryHeap;
       8              : use std::ops::{Deref, Range};
       9              : use std::sync::Arc;
      10              : 
      11              : use super::layer_manager::LayerManager;
      12              : use super::{CompactFlags, DurationRecorder, ImageLayerCreationMode, RecordedDuration, Timeline};
      13              : 
      14              : use anyhow::{anyhow, Context};
      15              : use enumset::EnumSet;
      16              : use fail::fail_point;
      17              : use itertools::Itertools;
      18              : use pageserver_api::keyspace::ShardedRange;
      19              : use pageserver_api::shard::{ShardCount, ShardIdentity, TenantShardId};
      20              : use tokio_util::sync::CancellationToken;
      21              : use tracing::{debug, info, info_span, trace, warn, Instrument};
      22              : use utils::id::TimelineId;
      23              : 
      24              : use crate::context::{AccessStatsBehavior, RequestContext, RequestContextBuilder};
      25              : use crate::tenant::storage_layer::{AsLayerDesc, PersistentLayerDesc};
      26              : use crate::tenant::timeline::{drop_rlock, is_rel_fsm_block_key, is_rel_vm_block_key, Hole};
      27              : use crate::tenant::timeline::{DeltaLayerWriter, ImageLayerWriter};
      28              : use crate::tenant::timeline::{Layer, ResidentLayer};
      29              : use crate::tenant::DeltaLayer;
      30              : use crate::tenant::PageReconstructError;
      31              : use crate::virtual_file::{MaybeFatalIo, VirtualFile};
      32              : use crate::{page_cache, ZERO_PAGE};
      33              : 
      34              : use crate::keyspace::KeySpace;
      35              : use crate::repository::Key;
      36              : 
      37              : use utils::lsn::Lsn;
      38              : 
      39              : use pageserver_compaction::helpers::overlaps_with;
      40              : use pageserver_compaction::interface::*;
      41              : 
      42              : use super::CompactionError;
      43              : 
      44              : impl Timeline {
      45              :     /// TODO: cancellation
      46          366 :     pub(crate) async fn compact_legacy(
      47          366 :         self: &Arc<Self>,
      48          366 :         _cancel: &CancellationToken,
      49          366 :         flags: EnumSet<CompactFlags>,
      50          366 :         ctx: &RequestContext,
      51          366 :     ) -> Result<(), CompactionError> {
      52          366 :         // High level strategy for compaction / image creation:
      53          366 :         //
      54          366 :         // 1. First, calculate the desired "partitioning" of the
      55          366 :         // currently in-use key space. The goal is to partition the
      56          366 :         // key space into roughly fixed-size chunks, but also take into
      57          366 :         // account any existing image layers, and try to align the
      58          366 :         // chunk boundaries with the existing image layers to avoid
      59          366 :         // too much churn. Also try to align chunk boundaries with
      60          366 :         // relation boundaries.  In principle, we don't know about
      61          366 :         // relation boundaries here, we just deal with key-value
      62          366 :         // pairs, and the code in pgdatadir_mapping.rs knows how to
      63          366 :         // map relations into key-value pairs. But in practice we know
      64          366 :         // that 'field6' is the block number, and the fields 1-5
      65          366 :         // identify a relation. This is just an optimization,
      66          366 :         // though.
      67          366 :         //
      68          366 :         // 2. Once we know the partitioning, for each partition,
      69          366 :         // decide if it's time to create a new image layer. The
       70          366 :         // criterion is: has there been too much "churn" since the last
       71          366 :         // image layer? "Churn" is a fuzzy concept: it's a
       72          366 :         // combination of too many delta files, or too much WAL in
       73          366 :         // total in the delta files. Or perhaps: whether creating an image
       74          366 :         // file would allow deleting some older files.
      75          366 :         //
      76          366 :         // 3. After that, we compact all level0 delta files if there
      77          366 :         // are too many of them.  While compacting, we also garbage
      78          366 :         // collect any page versions that are no longer needed because
      79          366 :         // of the new image layers we created in step 2.
      80          366 :         //
      81          366 :         // TODO: This high level strategy hasn't been implemented yet.
      82          366 :         // Below are functions compact_level0() and create_image_layers()
       83          366 :         // but they are a bit ad hoc and don't quite work as explained
       84          366 :         // above. Rewrite it.
      85          366 : 
      86          366 :         // Is the timeline being deleted?
      87          366 :         if self.is_stopping() {
      88            0 :             trace!("Dropping out of compaction on timeline shutdown");
      89            0 :             return Err(CompactionError::ShuttingDown);
      90          366 :         }
      91          366 : 
      92          366 :         let target_file_size = self.get_checkpoint_distance();
      93              : 
      94              :         // Define partitioning schema if needed
      95              : 
      96              :         // FIXME: the match should only cover repartitioning, not the next steps
      97          366 :         let partition_count = match self
      98          366 :             .repartition(
      99          366 :                 self.get_last_record_lsn(),
     100          366 :                 self.get_compaction_target_size(),
     101          366 :                 flags,
     102          366 :                 ctx,
     103          366 :             )
     104        13419 :             .await
     105              :         {
     106          366 :             Ok(((dense_partitioning, sparse_partitioning), lsn)) => {
     107          366 :                 // Disables access_stats updates, so that the files we read remain candidates for eviction after we're done with them
     108          366 :                 let image_ctx = RequestContextBuilder::extend(ctx)
     109          366 :                     .access_stats_behavior(AccessStatsBehavior::Skip)
     110          366 :                     .build();
     111          366 : 
     112          366 :                 // 2. Compact
     113          366 :                 let timer = self.metrics.compact_time_histo.start_timer();
     114        42316 :                 self.compact_level0(target_file_size, ctx).await?;
     115          366 :                 timer.stop_and_record();
     116          366 : 
     117          366 :                 // 3. Create new image layers for partitions that have been modified
     118          366 :                 // "enough".
     119          366 :                 let mut partitioning = dense_partitioning;
     120          366 :                 partitioning
     121          366 :                     .parts
     122          366 :                     .extend(sparse_partitioning.into_dense().parts);
     123          366 :                 let image_layers = self
     124          366 :                     .create_image_layers(
     125          366 :                         &partitioning,
     126          366 :                         lsn,
     127          366 :                         if flags.contains(CompactFlags::ForceImageLayerCreation) {
     128           16 :                             ImageLayerCreationMode::Force
     129              :                         } else {
     130          350 :                             ImageLayerCreationMode::Try
     131              :                         },
     132          366 :                         &image_ctx,
     133              :                     )
     134        14689 :                     .await
     135          366 :                     .map_err(anyhow::Error::from)?;
     136              : 
     137          366 :                 self.upload_new_image_layers(image_layers)?;
     138          366 :                 partitioning.parts.len()
     139              :             }
     140            0 :             Err(err) => {
     141            0 :                 // no partitioning? This is normal, if the timeline was just created
     142            0 :                 // as an empty timeline. Also in unit tests, when we use the timeline
     143            0 :                 // as a simple key-value store, ignoring the datadir layout. Log the
     144            0 :                 // error but continue.
     145            0 :                 //
     146            0 :                 // Suppress error when it's due to cancellation
     147            0 :                 if !self.cancel.is_cancelled() {
     148            0 :                     tracing::error!("could not compact, repartitioning keyspace failed: {err:?}");
     149            0 :                 }
     150            0 :                 1
     151              :             }
     152              :         };
     153              : 
     154          366 :         if self.shard_identity.count >= ShardCount::new(2) {
     155              :             // Limit the number of layer rewrites to the number of partitions: this means its
     156              :             // runtime should be comparable to a full round of image layer creations, rather than
     157              :             // being potentially much longer.
     158            0 :             let rewrite_max = partition_count;
     159            0 : 
     160            0 :             self.compact_shard_ancestors(rewrite_max, ctx).await?;
     161          366 :         }
     162              : 
     163          366 :         Ok(())
     164          366 :     }
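
// A minimal, self-contained sketch of the "churn" criterion described in step 2 of the
// strategy comment above: create a new image layer for a partition once either too many
// delta layers or too much WAL has accumulated since the last image layer. Illustrative
// only; the function name, parameters, and thresholds are hypothetical and not the
// pageserver's actual heuristic.
fn should_create_image_layer(
    deltas_since_last_image: usize,
    wal_bytes_since_last_image: u64,
    delta_count_threshold: usize,
    wal_byte_threshold: u64,
) -> bool {
    // Either kind of churn on its own is enough to justify re-imaging the partition.
    deltas_since_last_image >= delta_count_threshold
        || wal_bytes_since_last_image >= wal_byte_threshold
}
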
     165              : 
      166              :     /// Check for layers that are eligible to be rewritten:
      167              :     /// - Shard splitting: after a shard split, rewrite ancestor layers beyond pitr_interval, so that
      168              :     ///   we don't indefinitely retain keys in this shard that aren't needed.
     169              :     /// - For future use: layers beyond pitr_interval that are in formats we would
     170              :     ///   rather not maintain compatibility with indefinitely.
     171              :     ///
     172              :     /// Note: this phase may read and write many gigabytes of data: use rewrite_max to bound
     173              :     /// how much work it will try to do in each compaction pass.
     174            0 :     async fn compact_shard_ancestors(
     175            0 :         self: &Arc<Self>,
     176            0 :         rewrite_max: usize,
     177            0 :         _ctx: &RequestContext,
     178            0 :     ) -> anyhow::Result<()> {
     179            0 :         let mut drop_layers = Vec::new();
     180            0 :         let layers_to_rewrite: Vec<Layer> = Vec::new();
     181            0 : 
     182            0 :         // We will use the PITR cutoff as a condition for rewriting layers.
     183            0 :         let pitr_cutoff = self.gc_info.read().unwrap().cutoffs.pitr;
     184              : 
     185            0 :         let layers = self.layers.read().await;
     186            0 :         for layer_desc in layers.layer_map().iter_historic_layers() {
     187            0 :             let layer = layers.get_from_desc(&layer_desc);
     188            0 :             if layer.metadata().shard.shard_count == self.shard_identity.count {
     189              :                 // This layer does not belong to a historic ancestor, no need to re-image it.
     190            0 :                 continue;
     191            0 :             }
     192            0 : 
     193            0 :             // This layer was created on an ancestor shard: check if it contains any data for this shard.
     194            0 :             let sharded_range = ShardedRange::new(layer_desc.get_key_range(), &self.shard_identity);
     195            0 :             let layer_local_page_count = sharded_range.page_count();
     196            0 :             let layer_raw_page_count = ShardedRange::raw_size(&layer_desc.get_key_range());
     197            0 :             if layer_local_page_count == 0 {
     198              :                 // This ancestral layer only covers keys that belong to other shards.
     199              :                 // We include the full metadata in the log: if we had some critical bug that caused
     200              :                 // us to incorrectly drop layers, this would simplify manually debugging + reinstating those layers.
     201            0 :                 info!(%layer, old_metadata=?layer.metadata(),
     202            0 :                     "dropping layer after shard split, contains no keys for this shard.",
     203              :                 );
     204              : 
     205            0 :                 if cfg!(debug_assertions) {
     206              :                     // Expensive, exhaustive check of keys in this layer: this guards against ShardedRange's calculations being
     207              :                     // wrong.  If ShardedRange claims the local page count is zero, then no keys in this layer
     208              :                     // should be !is_key_disposable()
     209            0 :                     let range = layer_desc.get_key_range();
     210            0 :                     let mut key = range.start;
     211            0 :                     while key < range.end {
     212            0 :                         debug_assert!(self.shard_identity.is_key_disposable(&key));
     213            0 :                         key = key.next();
     214              :                     }
     215            0 :                 }
     216              : 
     217            0 :                 drop_layers.push(layer);
     218            0 :                 continue;
     219            0 :             } else if layer_local_page_count != u32::MAX
     220            0 :                 && layer_local_page_count == layer_raw_page_count
     221              :             {
     222            0 :                 debug!(%layer,
     223            0 :                     "layer is entirely shard local ({} keys), no need to filter it",
     224              :                     layer_local_page_count
     225              :                 );
     226            0 :                 continue;
     227            0 :             }
     228            0 : 
     229            0 :             // Don't bother re-writing a layer unless it will at least halve its size
     230            0 :             if layer_local_page_count != u32::MAX
     231            0 :                 && layer_local_page_count > layer_raw_page_count / 2
     232              :             {
     233            0 :                 debug!(%layer,
     234            0 :                     "layer is already mostly local ({}/{}), not rewriting",
     235              :                     layer_local_page_count,
     236              :                     layer_raw_page_count
     237              :                 );
     238            0 :             }
     239              : 
     240              :             // Don't bother re-writing a layer if it is within the PITR window: it will age-out eventually
     241              :             // without incurring the I/O cost of a rewrite.
     242            0 :             if layer_desc.get_lsn_range().end >= pitr_cutoff {
     243            0 :                 debug!(%layer, "Skipping rewrite of layer still in PITR window ({} >= {})",
     244            0 :                     layer_desc.get_lsn_range().end, pitr_cutoff);
     245            0 :                 continue;
     246            0 :             }
     247            0 : 
     248            0 :             if layer_desc.is_delta() {
     249              :                 // We do not yet implement rewrite of delta layers
     250            0 :                 debug!(%layer, "Skipping rewrite of delta layer");
     251            0 :                 continue;
     252            0 :             }
     253            0 : 
     254            0 :             // Only rewrite layers if they would have different remote paths: either they belong to this
     255            0 :             // shard but an old generation, or they belonged to another shard.  This also implicitly
     256            0 :             // guarantees that the layer is persistent in remote storage (as only remote persistent
     257            0 :             // layers are carried across shard splits, any local-only layer would be in the current generation)
     258            0 :             if layer.metadata().generation == self.generation
     259            0 :                 && layer.metadata().shard.shard_count == self.shard_identity.count
     260              :             {
     261            0 :                 debug!(%layer, "Skipping rewrite, is not from old generation");
     262            0 :                 continue;
     263            0 :             }
     264            0 : 
     265            0 :             if layers_to_rewrite.len() >= rewrite_max {
     266            0 :                 tracing::info!(%layer, "Will rewrite layer on a future compaction, already rewrote {}",
     267            0 :                     layers_to_rewrite.len()
     268              :                 );
     269            0 :                 continue;
     270            0 :             }
     271            0 : 
     272            0 :             // Fall through: all our conditions for doing a rewrite passed.
     273            0 :             // TODO: implement rewriting
     274            0 :             tracing::debug!(%layer, "Would rewrite layer");
     275              :         }
     276              : 
     277              :         // Drop the layers read lock: we will acquire it for write in [`Self::rewrite_layers`]
     278            0 :         drop(layers);
     279            0 : 
     280            0 :         // TODO: collect layers to rewrite
     281            0 :         let replace_layers = Vec::new();
     282            0 : 
     283            0 :         // Update the LayerMap so that readers will use the new layers, and enqueue it for writing to remote storage
     284            0 :         self.rewrite_layers(replace_layers, drop_layers).await?;
     285              : 
     286              :         // We wait for all uploads to complete before finishing this compaction stage.  This is not
     287              :         // necessary for correctness, but it simplifies testing, and avoids proceeding with another
     288              :         // Timeline's compaction while this timeline's uploads may be generating lots of disk I/O
     289              :         // load.
     290            0 :         self.remote_client.wait_completion().await?;
     291              : 
     292            0 :         Ok(())
     293            0 :     }
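
// A condensed, standalone sketch of the per-layer decision made in compact_shard_ancestors()
// above, following the intent described in its comments: drop layers with no local keys,
// keep layers that are already mostly local, still inside the PITR window, or deltas, and
// otherwise treat the layer as a rewrite candidate. The signature and the `u32::MAX`
// "count not exact" sentinel are simplifications for illustration, not the real types.
fn ancestor_layer_action(
    local_pages: u32,
    raw_pages: u32,
    in_pitr_window: bool,
    is_delta: bool,
) -> &'static str {
    if local_pages == 0 {
        // No keys for this shard: the layer can simply be dropped.
        "drop"
    } else if local_pages != u32::MAX && local_pages > raw_pages / 2 {
        // Rewriting would not even halve the layer, so it is not worth the I/O.
        "keep"
    } else if in_pitr_window || is_delta {
        // Still needed for PITR, or delta rewrite is not implemented yet.
        "keep"
    } else {
        "rewrite"
    }
}
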
     294              : 
      295              :     /// Collect a bunch of Level 0 layer files, and compact and reshuffle them
      296              :     /// as Level 1 files.
     297          366 :     async fn compact_level0(
     298          366 :         self: &Arc<Self>,
     299          366 :         target_file_size: u64,
     300          366 :         ctx: &RequestContext,
     301          366 :     ) -> Result<(), CompactionError> {
     302              :         let CompactLevel0Phase1Result {
     303          366 :             new_layers,
     304          366 :             deltas_to_compact,
     305              :         } = {
     306          366 :             let phase1_span = info_span!("compact_level0_phase1");
     307          366 :             let ctx = ctx.attached_child();
     308          366 :             let mut stats = CompactLevel0Phase1StatsBuilder {
     309          366 :                 version: Some(2),
     310          366 :                 tenant_id: Some(self.tenant_shard_id),
     311          366 :                 timeline_id: Some(self.timeline_id),
     312          366 :                 ..Default::default()
     313          366 :             };
     314          366 : 
     315          366 :             let begin = tokio::time::Instant::now();
     316          366 :             let phase1_layers_locked = Arc::clone(&self.layers).read_owned().await;
     317          366 :             let now = tokio::time::Instant::now();
     318          366 :             stats.read_lock_acquisition_micros =
     319          366 :                 DurationRecorder::Recorded(RecordedDuration(now - begin), now);
     320          366 :             self.compact_level0_phase1(phase1_layers_locked, stats, target_file_size, &ctx)
     321          366 :                 .instrument(phase1_span)
     322        42315 :                 .await?
     323              :         };
     324              : 
     325          366 :         if new_layers.is_empty() && deltas_to_compact.is_empty() {
     326              :             // nothing to do
     327          338 :             return Ok(());
     328           28 :         }
     329           28 : 
     330           28 :         self.finish_compact_batch(&new_layers, &Vec::new(), &deltas_to_compact)
     331            0 :             .await?;
     332           28 :         Ok(())
     333          366 :     }
     334              : 
      335              :     /// First phase of Level 0 compaction, explained in the [`Self::compact_legacy`] comment.
     336          366 :     async fn compact_level0_phase1(
     337          366 :         self: &Arc<Self>,
     338          366 :         guard: tokio::sync::OwnedRwLockReadGuard<LayerManager>,
     339          366 :         mut stats: CompactLevel0Phase1StatsBuilder,
     340          366 :         target_file_size: u64,
     341          366 :         ctx: &RequestContext,
     342          366 :     ) -> Result<CompactLevel0Phase1Result, CompactionError> {
     343          366 :         stats.read_lock_held_spawn_blocking_startup_micros =
     344          366 :             stats.read_lock_acquisition_micros.till_now(); // set by caller
     345          366 :         let layers = guard.layer_map();
     346          366 :         let level0_deltas = layers.get_level0_deltas()?;
     347          366 :         let mut level0_deltas = level0_deltas
     348          366 :             .into_iter()
     349         1542 :             .map(|x| guard.get_from_desc(&x))
     350          366 :             .collect_vec();
     351          366 :         stats.level0_deltas_count = Some(level0_deltas.len());
     352          366 :         // Only compact if enough layers have accumulated.
     353          366 :         let threshold = self.get_compaction_threshold();
     354          366 :         if level0_deltas.is_empty() || level0_deltas.len() < threshold {
     355          338 :             debug!(
     356            0 :                 level0_deltas = level0_deltas.len(),
     357            0 :                 threshold, "too few deltas to compact"
     358              :             );
     359          338 :             return Ok(CompactLevel0Phase1Result::default());
     360           28 :         }
     361           28 : 
      362           28 :         // This failpoint is used together with the `test_duplicate_layers` integration test.
      363           28 :         // It makes the compaction return exactly the same layers that were given as input.
     364           28 :         // We want to ensure that this will not cause any problem when updating the layer map
     365           28 :         // after the compaction is finished.
     366           28 :         //
      367           28 :         // Currently, there are two rare edge cases that will cause duplicated layers to be
      368           28 :         //    inserted.
      369           28 :         // 1. The compaction job is interrupted / did not finish successfully. Assume we have files 1, 2, 3, 4, which
      370           28 :         //    are compacted to 5, but the pageserver is shut down; the next time we start the pageserver we will get a layer
      371           28 :         //    map containing 1, 2, 3, 4, and 5, where 5 has the same content as 4. If we trigger L0 compaction at this
      372           28 :         //    point again, it is likely that we will get a file 6 which has the same content and key range as 5,
      373           28 :         //    and this causes an overwrite. This is acceptable because the content is the same, and we should do a
      374           28 :         //    layer replace instead of the normal remove / upload process.
      375           28 :         // 2. The input workload pattern creates exactly n files that are sorted, non-overlapping, and of target file
      376           28 :         //    size. Compaction will likely create the same set of n files afterwards.
     377           28 :         //
     378           28 :         // This failpoint is a superset of both of the cases.
     379           28 :         if cfg!(feature = "testing") {
     380           28 :             let active = (|| {
     381           28 :                 ::fail::fail_point!("compact-level0-phase1-return-same", |_| true);
     382           28 :                 false
     383           28 :             })();
     384           28 : 
     385           28 :             if active {
     386            0 :                 let mut new_layers = Vec::with_capacity(level0_deltas.len());
     387            0 :                 for delta in &level0_deltas {
     388              :                     // we are just faking these layers as being produced again for this failpoint
     389            0 :                     new_layers.push(
     390            0 :                         delta
     391            0 :                             .download_and_keep_resident()
     392            0 :                             .await
     393            0 :                             .context("download layer for failpoint")?,
     394              :                     );
     395              :                 }
     396            0 :                 tracing::info!("compact-level0-phase1-return-same"); // so that we can check if we hit the failpoint
     397            0 :                 return Ok(CompactLevel0Phase1Result {
     398            0 :                     new_layers,
     399            0 :                     deltas_to_compact: level0_deltas,
     400            0 :                 });
     401           28 :             }
     402            0 :         }
     403              : 
     404              :         // Gather the files to compact in this iteration.
     405              :         //
     406              :         // Start with the oldest Level 0 delta file, and collect any other
     407              :         // level 0 files that form a contiguous sequence, such that the end
      408              :         // LSN of the previous file matches the start LSN of the next file.
     409              :         //
     410              :         // Note that if the files don't form such a sequence, we might
     411              :         // "compact" just a single file. That's a bit pointless, but it allows
     412              :         // us to get rid of the level 0 file, and compact the other files on
      413              :         // the next iteration. This could probably be made smarter, but such
     414              :         // "gaps" in the sequence of level 0 files should only happen in case
     415              :         // of a crash, partial download from cloud storage, or something like
     416              :         // that, so it's not a big deal in practice.
     417          588 :         level0_deltas.sort_by_key(|l| l.layer_desc().lsn_range.start);
     418           28 :         let mut level0_deltas_iter = level0_deltas.iter();
     419           28 : 
     420           28 :         let first_level0_delta = level0_deltas_iter.next().unwrap();
     421           28 :         let mut prev_lsn_end = first_level0_delta.layer_desc().lsn_range.end;
     422           28 :         let mut deltas_to_compact = Vec::with_capacity(level0_deltas.len());
     423           28 : 
     424           28 :         deltas_to_compact.push(first_level0_delta.download_and_keep_resident().await?);
     425          322 :         for l in level0_deltas_iter {
     426          294 :             let lsn_range = &l.layer_desc().lsn_range;
     427          294 : 
     428          294 :             if lsn_range.start != prev_lsn_end {
     429            0 :                 break;
     430          294 :             }
     431          294 :             deltas_to_compact.push(l.download_and_keep_resident().await?);
     432          294 :             prev_lsn_end = lsn_range.end;
     433              :         }
     434           28 :         let lsn_range = Range {
     435           28 :             start: deltas_to_compact
     436           28 :                 .first()
     437           28 :                 .unwrap()
     438           28 :                 .layer_desc()
     439           28 :                 .lsn_range
     440           28 :                 .start,
     441           28 :             end: deltas_to_compact.last().unwrap().layer_desc().lsn_range.end,
     442           28 :         };
     443           28 : 
     444           28 :         info!(
     445            0 :             "Starting Level0 compaction in LSN range {}-{} for {} layers ({} deltas in total)",
     446            0 :             lsn_range.start,
     447            0 :             lsn_range.end,
     448            0 :             deltas_to_compact.len(),
     449            0 :             level0_deltas.len()
     450              :         );
     451              : 
     452          322 :         for l in deltas_to_compact.iter() {
     453          322 :             info!("compact includes {l}");
     454              :         }
     455              : 
     456              :         // We don't need the original list of layers anymore. Drop it so that
     457              :         // we don't accidentally use it later in the function.
     458           28 :         drop(level0_deltas);
     459           28 : 
     460           28 :         stats.read_lock_held_prerequisites_micros = stats
     461           28 :             .read_lock_held_spawn_blocking_startup_micros
     462           28 :             .till_now();
     463           28 : 
      464           28 :         // Determine the N largest holes, where N is the number of compacted layers.
     465           28 :         let max_holes = deltas_to_compact.len();
     466           28 :         let last_record_lsn = self.get_last_record_lsn();
     467           28 :         let min_hole_range = (target_file_size / page_cache::PAGE_SZ as u64) as i128;
     468           28 :         let min_hole_coverage_size = 3; // TODO: something more flexible?
     469           28 : 
     470           28 :         // min-heap (reserve space for one more element added before eviction)
     471           28 :         let mut heap: BinaryHeap<Hole> = BinaryHeap::with_capacity(max_holes + 1);
     472           28 :         let mut prev: Option<Key> = None;
     473           28 : 
     474           28 :         let mut all_keys = Vec::new();
     475              : 
     476          322 :         for l in deltas_to_compact.iter() {
     477         2390 :             all_keys.extend(l.load_keys(ctx).await?);
     478              :         }
     479              : 
     480              :         // FIXME: should spawn_blocking the rest of this function
     481              : 
     482              :         // The current stdlib sorting implementation is designed in a way where it is
     483              :         // particularly fast where the slice is made up of sorted sub-ranges.
     484      4431762 :         all_keys.sort_by_key(|DeltaEntry { key, lsn, .. }| (*key, *lsn));
     485           28 : 
     486           28 :         stats.read_lock_held_key_sort_micros = stats.read_lock_held_prerequisites_micros.till_now();
     487              : 
     488      2064038 :         for &DeltaEntry { key: next_key, .. } in all_keys.iter() {
     489      2064038 :             if let Some(prev_key) = prev {
      490              :                 // Just a first, fast filter: do not create hole entries for metadata keys. The last hole in the
      491              :                 // compaction is the gap between the data keys and the metadata keys.
     492      2064010 :                 if next_key.to_i128() - prev_key.to_i128() >= min_hole_range
     493            0 :                     && !Key::is_metadata_key(&prev_key)
     494              :                 {
     495            0 :                     let key_range = prev_key..next_key;
      496            0 :                     // Measuring a hole by simply subtracting the i128 representations of the key range boundaries
      497            0 :                     // does not make much sense, because the largest holes would correspond to field1/field2 changes.
      498            0 :                     // We are mostly interested in eliminating holes which cause the generation of excessive image layers.
      499            0 :                     // That is why it is better to measure the size of a hole as the number of covering image layers.
     500            0 :                     let coverage_size = layers.image_coverage(&key_range, last_record_lsn).len();
     501            0 :                     if coverage_size >= min_hole_coverage_size {
     502            0 :                         heap.push(Hole {
     503            0 :                             key_range,
     504            0 :                             coverage_size,
     505            0 :                         });
     506            0 :                         if heap.len() > max_holes {
     507            0 :                             heap.pop(); // remove smallest hole
     508            0 :                         }
     509            0 :                     }
     510      2064010 :                 }
     511           28 :             }
     512      2064038 :             prev = Some(next_key.next());
     513              :         }
     514           28 :         stats.read_lock_held_compute_holes_micros = stats.read_lock_held_key_sort_micros.till_now();
     515           28 :         drop_rlock(guard);
     516           28 :         stats.read_lock_drop_micros = stats.read_lock_held_compute_holes_micros.till_now();
     517           28 :         let mut holes = heap.into_vec();
     518           28 :         holes.sort_unstable_by_key(|hole| hole.key_range.start);
     519           28 :         let mut next_hole = 0; // index of next hole in holes vector
     520           28 : 
     521           28 :         // This iterator walks through all key-value pairs from all the layers
     522           28 :         // we're compacting, in key, LSN order.
     523           28 :         let all_values_iter = all_keys.iter();
     524           28 : 
      525           28 :         // This iterator walks through all keys and is needed to calculate the size used by each key
     526           28 :         let mut all_keys_iter = all_keys
     527           28 :             .iter()
     528      2064038 :             .map(|DeltaEntry { key, lsn, size, .. }| (*key, *lsn, *size))
     529      2064010 :             .coalesce(|mut prev, cur| {
      530      2064010 :                 // Coalesce entries that belong to the same key.
     531      2064010 :                 // This ensures that compaction doesn't put them
     532      2064010 :                 // into different layer files.
     533      2064010 :                 // Still limit this by the target file size,
     534      2064010 :                 // so that we keep the size of the files in
     535      2064010 :                 // check.
     536      2064010 :                 if prev.0 == cur.0 && prev.2 < target_file_size {
     537        40038 :                     prev.2 += cur.2;
     538        40038 :                     Ok(prev)
     539              :                 } else {
     540      2023972 :                     Err((prev, cur))
     541              :                 }
     542      2064010 :             });
     543           28 : 
     544           28 :         // Merge the contents of all the input delta layers into a new set
     545           28 :         // of delta layers, based on the current partitioning.
     546           28 :         //
      547           28 :         // We split the new delta layers on the key dimension. We iterate through the key space, and for each key, check if including it in the current output layer we're building would cause the layer to become too large. If so, dump the current output layer and start a new one.
     548           28 :         // It's possible that there is a single key with so many page versions that storing all of them in a single layer file
     549           28 :         // would be too large. In that case, we also split on the LSN dimension.
     550           28 :         //
     551           28 :         // LSN
     552           28 :         //  ^
     553           28 :         //  |
     554           28 :         //  | +-----------+            +--+--+--+--+
     555           28 :         //  | |           |            |  |  |  |  |
     556           28 :         //  | +-----------+            |  |  |  |  |
     557           28 :         //  | |           |            |  |  |  |  |
     558           28 :         //  | +-----------+     ==>    |  |  |  |  |
     559           28 :         //  | |           |            |  |  |  |  |
     560           28 :         //  | +-----------+            |  |  |  |  |
     561           28 :         //  | |           |            |  |  |  |  |
     562           28 :         //  | +-----------+            +--+--+--+--+
     563           28 :         //  |
     564           28 :         //  +--------------> key
     565           28 :         //
     566           28 :         //
     567           28 :         // If one key (X) has a lot of page versions:
     568           28 :         //
     569           28 :         // LSN
     570           28 :         //  ^
     571           28 :         //  |                                 (X)
     572           28 :         //  | +-----------+            +--+--+--+--+
     573           28 :         //  | |           |            |  |  |  |  |
     574           28 :         //  | +-----------+            |  |  +--+  |
     575           28 :         //  | |           |            |  |  |  |  |
     576           28 :         //  | +-----------+     ==>    |  |  |  |  |
     577           28 :         //  | |           |            |  |  +--+  |
     578           28 :         //  | +-----------+            |  |  |  |  |
     579           28 :         //  | |           |            |  |  |  |  |
     580           28 :         //  | +-----------+            +--+--+--+--+
     581           28 :         //  |
     582           28 :         //  +--------------> key
     583           28 :         // TODO: this actually divides the layers into fixed-size chunks, not
     584           28 :         // based on the partitioning.
     585           28 :         //
     586           28 :         // TODO: we should also opportunistically materialize and
     587           28 :         // garbage collect what we can.
     588           28 :         let mut new_layers = Vec::new();
     589           28 :         let mut prev_key: Option<Key> = None;
     590           28 :         let mut writer: Option<DeltaLayerWriter> = None;
     591           28 :         let mut key_values_total_size = 0u64;
     592           28 :         let mut dup_start_lsn: Lsn = Lsn::INVALID; // start LSN of layer containing values of the single key
     593           28 :         let mut dup_end_lsn: Lsn = Lsn::INVALID; // end LSN of layer containing values of the single key
     594              : 
     595              :         for &DeltaEntry {
     596      2064038 :             key, lsn, ref val, ..
     597      2064066 :         } in all_values_iter
     598              :         {
     599      2064038 :             let value = val.load(ctx).await?;
     600      2064038 :             let same_key = prev_key.map_or(false, |prev_key| prev_key == key);
      601      2064038 :             // We need to check key boundaries once we reach the next key or the end of the layer with the same key
     602      2064038 :             if !same_key || lsn == dup_end_lsn {
     603      2024000 :                 let mut next_key_size = 0u64;
     604      2024000 :                 let is_dup_layer = dup_end_lsn.is_valid();
     605      2024000 :                 dup_start_lsn = Lsn::INVALID;
     606      2024000 :                 if !same_key {
     607      2024000 :                     dup_end_lsn = Lsn::INVALID;
     608      2024000 :                 }
      609      2024000 :                 // Determine the size occupied by this key. We stop at the next key or when the size becomes larger than target_file_size
     610      2024000 :                 for (next_key, next_lsn, next_size) in all_keys_iter.by_ref() {
     611      2024000 :                     next_key_size = next_size;
     612      2024000 :                     if key != next_key {
     613      2023972 :                         if dup_end_lsn.is_valid() {
      614            0 :                             // We are writing a segment with duplicates:
      615            0 :                             // place all remaining values of this key in a separate segment
      616            0 :                             dup_start_lsn = dup_end_lsn; // the new segment starts where the old one stops
     617            0 :                             dup_end_lsn = lsn_range.end; // there are no more values of this key till end of LSN range
     618      2023972 :                         }
     619      2023972 :                         break;
     620           28 :                     }
     621           28 :                     key_values_total_size += next_size;
      622           28 :                     // Check if it is time to split the segment: if the total size of this key's values is larger than the target file size.
     623           28 :                     // We need to avoid generation of empty segments if next_size > target_file_size.
     624           28 :                     if key_values_total_size > target_file_size && lsn != next_lsn {
     625              :                         // Split key between multiple layers: such layer can contain only single key
     626            0 :                         dup_start_lsn = if dup_end_lsn.is_valid() {
     627            0 :                             dup_end_lsn // new segment with duplicates starts where old one stops
     628              :                         } else {
     629            0 :                             lsn // start with the first LSN for this key
     630              :                         };
     631            0 :                         dup_end_lsn = next_lsn; // upper LSN boundary is exclusive
     632            0 :                         break;
     633           28 :                     }
     634              :                 }
      635              :                 // handle the case when the loop reaches the last key: in this case dup_end is non-zero but dup_start is not set.
     636      2024000 :                 if dup_end_lsn.is_valid() && !dup_start_lsn.is_valid() {
     637            0 :                     dup_start_lsn = dup_end_lsn;
     638            0 :                     dup_end_lsn = lsn_range.end;
     639      2024000 :                 }
     640      2024000 :                 if writer.is_some() {
     641      2023972 :                     let written_size = writer.as_mut().unwrap().size();
     642      2023972 :                     let contains_hole =
     643      2023972 :                         next_hole < holes.len() && key >= holes[next_hole].key_range.end;
      644              :                     // check if the key causes layer overflow or contains a hole...
     645      2023972 :                     if is_dup_layer
     646      2023972 :                         || dup_end_lsn.is_valid()
     647      2023972 :                         || written_size + key_values_total_size > target_file_size
     648      2023772 :                         || contains_hole
     649              :                     {
      650              :                         // ... if so, flush the previous layer and prepare to write a new one
     651          200 :                         new_layers.push(
     652          200 :                             writer
     653          200 :                                 .take()
     654          200 :                                 .unwrap()
     655          200 :                                 .finish(prev_key.unwrap().next(), self, ctx)
     656          509 :                                 .await?,
     657              :                         );
     658          200 :                         writer = None;
     659          200 : 
     660          200 :                         if contains_hole {
     661            0 :                             // skip hole
     662            0 :                             next_hole += 1;
     663          200 :                         }
     664      2023772 :                     }
     665           28 :                 }
      666              :                 // Remember the size of the key's value because at the next iteration we will access the next item
     667      2024000 :                 key_values_total_size = next_key_size;
     668        40038 :             }
     669      2064038 :             fail_point!("delta-layer-writer-fail-before-finish", |_| {
     670            0 :                 Err(CompactionError::Other(anyhow::anyhow!(
     671            0 :                     "failpoint delta-layer-writer-fail-before-finish"
     672            0 :                 )))
     673      2064038 :             });
     674              : 
     675      2064038 :             if !self.shard_identity.is_key_disposable(&key) {
     676      2064038 :                 if writer.is_none() {
      677          228 :                     // Create writer if not initialized yet
     678          228 :                     writer = Some(
     679              :                         DeltaLayerWriter::new(
     680          228 :                             self.conf,
     681          228 :                             self.timeline_id,
     682          228 :                             self.tenant_shard_id,
     683          228 :                             key,
     684          228 :                             if dup_end_lsn.is_valid() {
     685              :                                 // this is a layer containing slice of values of the same key
     686            0 :                                 debug!("Create new dup layer {}..{}", dup_start_lsn, dup_end_lsn);
     687            0 :                                 dup_start_lsn..dup_end_lsn
     688              :                             } else {
     689          228 :                                 debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end);
     690          228 :                                 lsn_range.clone()
     691              :                             },
     692          228 :                             ctx,
     693              :                         )
     694          114 :                         .await?,
     695              :                     );
     696      2063810 :                 }
     697              : 
     698      2064038 :                 writer
     699      2064038 :                     .as_mut()
     700      2064038 :                     .unwrap()
     701      2064038 :                     .put_value(key, lsn, value, ctx)
     702         1570 :                     .await?;
     703              :             } else {
     704            0 :                 debug!(
     705            0 :                     "Dropping key {} during compaction (it belongs on shard {:?})",
     706            0 :                     key,
     707            0 :                     self.shard_identity.get_shard_number(&key)
     708              :                 );
     709              :             }
     710              : 
     711      2064038 :             if !new_layers.is_empty() {
     712        19786 :                 fail_point!("after-timeline-compacted-first-L1");
     713      2044252 :             }
     714              : 
     715      2064038 :             prev_key = Some(key);
     716              :         }
     717           28 :         if let Some(writer) = writer {
     718         1992 :             new_layers.push(writer.finish(prev_key.unwrap().next(), self, ctx).await?);
     719            0 :         }
     720              : 
     721              :         // Sync layers
     722           28 :         if !new_layers.is_empty() {
     723              :             // Print a warning if the created layer is larger than double the target size
     724              :             // Add two pages for potential overhead. This should in theory be already
     725              :             // accounted for in the target calculation, but for very small targets,
     726              :             // we still might easily hit the limit otherwise.
     727           28 :             let warn_limit = target_file_size * 2 + page_cache::PAGE_SZ as u64 * 2;
     728          228 :             for layer in new_layers.iter() {
     729          228 :                 if layer.layer_desc().file_size > warn_limit {
     730            0 :                     warn!(
     731              :                         %layer,
     732            0 :                         "created delta file of size {} larger than double of target of {target_file_size}", layer.layer_desc().file_size
     733              :                     );
     734          228 :                 }
     735              :             }
     736              : 
     737              :             // The writer.finish() above already did the fsync of the inodes.
     738              :             // We just need to fsync the directory in which these inodes are linked,
     739              :             // which we know to be the timeline directory.
     740              :             //
      741              :             // We use fatal_err() below because after writer.finish() returns with success,
     742              :             // the in-memory state of the filesystem already has the layer file in its final place,
     743              :             // and subsequent pageserver code could think it's durable while it really isn't.
     744           28 :             let timeline_dir = VirtualFile::open(
     745           28 :                 &self
     746           28 :                     .conf
     747           28 :                     .timeline_path(&self.tenant_shard_id, &self.timeline_id),
     748           28 :                 ctx,
     749           28 :             )
     750           14 :             .await
     751           28 :             .fatal_err("VirtualFile::open for timeline dir fsync");
     752           28 :             timeline_dir
     753           28 :                 .sync_all()
     754           14 :                 .await
     755           28 :                 .fatal_err("VirtualFile::sync_all timeline dir");
     756            0 :         }
     757              : 
     758           28 :         stats.write_layer_files_micros = stats.read_lock_drop_micros.till_now();
     759           28 :         stats.new_deltas_count = Some(new_layers.len());
     760          228 :         stats.new_deltas_size = Some(new_layers.iter().map(|l| l.layer_desc().file_size).sum());
     761           28 : 
     762           28 :         match TryInto::<CompactLevel0Phase1Stats>::try_into(stats)
     763           28 :             .and_then(|stats| serde_json::to_string(&stats).context("serde_json::to_string"))
     764              :         {
     765           28 :             Ok(stats_json) => {
     766           28 :                 info!(
     767            0 :                     stats_json = stats_json.as_str(),
     768            0 :                     "compact_level0_phase1 stats available"
     769              :                 )
     770              :             }
     771            0 :             Err(e) => {
     772            0 :                 warn!("compact_level0_phase1 stats failed to serialize: {:#}", e);
     773              :             }
     774              :         }
     775              : 
     776           28 :         Ok(CompactLevel0Phase1Result {
     777           28 :             new_layers,
     778           28 :             deltas_to_compact: deltas_to_compact
     779           28 :                 .into_iter()
     780          322 :                 .map(|x| x.drop_eviction_guard())
     781           28 :                 .collect::<Vec<_>>(),
     782           28 :         })
     783          366 :     }
     784              : }
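
// Standalone sketch of the bounded "top-N largest holes" selection used in
// compact_level0_phase1() above: push every candidate hole into a heap of capacity N + 1
// and evict the smallest element whenever it overflows, so only the N largest survive.
// Here `std::cmp::Reverse` turns the std max-heap into a min-heap keyed on coverage size;
// the real code gets the same effect from `Hole`'s ordering instead. Illustrative only.
fn top_n_largest_holes(coverage_sizes: &[usize], max_holes: usize) -> Vec<usize> {
    use std::cmp::Reverse;
    let mut heap: BinaryHeap<Reverse<usize>> = BinaryHeap::with_capacity(max_holes + 1);
    for &size in coverage_sizes {
        heap.push(Reverse(size));
        if heap.len() > max_holes {
            heap.pop(); // the root is the current smallest, so the largest N remain
        }
    }
    heap.into_iter().map(|Reverse(size)| size).collect()
}
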
     785              : 
     786              : #[derive(Default)]
     787              : struct CompactLevel0Phase1Result {
     788              :     new_layers: Vec<ResidentLayer>,
     789              :     deltas_to_compact: Vec<Layer>,
     790              : }
     791              : 
     792              : #[derive(Default)]
     793              : struct CompactLevel0Phase1StatsBuilder {
     794              :     version: Option<u64>,
     795              :     tenant_id: Option<TenantShardId>,
     796              :     timeline_id: Option<TimelineId>,
     797              :     read_lock_acquisition_micros: DurationRecorder,
     798              :     read_lock_held_spawn_blocking_startup_micros: DurationRecorder,
     799              :     read_lock_held_key_sort_micros: DurationRecorder,
     800              :     read_lock_held_prerequisites_micros: DurationRecorder,
     801              :     read_lock_held_compute_holes_micros: DurationRecorder,
     802              :     read_lock_drop_micros: DurationRecorder,
     803              :     write_layer_files_micros: DurationRecorder,
     804              :     level0_deltas_count: Option<usize>,
     805              :     new_deltas_count: Option<usize>,
     806              :     new_deltas_size: Option<u64>,
     807              : }
     808              : 
     809              : #[derive(serde::Serialize)]
     810              : struct CompactLevel0Phase1Stats {
     811              :     version: u64,
     812              :     tenant_id: TenantShardId,
     813              :     timeline_id: TimelineId,
     814              :     read_lock_acquisition_micros: RecordedDuration,
     815              :     read_lock_held_spawn_blocking_startup_micros: RecordedDuration,
     816              :     read_lock_held_key_sort_micros: RecordedDuration,
     817              :     read_lock_held_prerequisites_micros: RecordedDuration,
     818              :     read_lock_held_compute_holes_micros: RecordedDuration,
     819              :     read_lock_drop_micros: RecordedDuration,
     820              :     write_layer_files_micros: RecordedDuration,
     821              :     level0_deltas_count: usize,
     822              :     new_deltas_count: usize,
     823              :     new_deltas_size: u64,
     824              : }
     825              : 
     826              : impl TryFrom<CompactLevel0Phase1StatsBuilder> for CompactLevel0Phase1Stats {
     827              :     type Error = anyhow::Error;
     828              : 
     829           28 :     fn try_from(value: CompactLevel0Phase1StatsBuilder) -> Result<Self, Self::Error> {
     830           28 :         Ok(Self {
     831           28 :             version: value.version.ok_or_else(|| anyhow!("version not set"))?,
     832           28 :             tenant_id: value
     833           28 :                 .tenant_id
     834           28 :                 .ok_or_else(|| anyhow!("tenant_id not set"))?,
     835           28 :             timeline_id: value
     836           28 :                 .timeline_id
     837           28 :                 .ok_or_else(|| anyhow!("timeline_id not set"))?,
     838           28 :             read_lock_acquisition_micros: value
     839           28 :                 .read_lock_acquisition_micros
     840           28 :                 .into_recorded()
     841           28 :                 .ok_or_else(|| anyhow!("read_lock_acquisition_micros not set"))?,
     842           28 :             read_lock_held_spawn_blocking_startup_micros: value
     843           28 :                 .read_lock_held_spawn_blocking_startup_micros
     844           28 :                 .into_recorded()
     845           28 :                 .ok_or_else(|| anyhow!("read_lock_held_spawn_blocking_startup_micros not set"))?,
     846           28 :             read_lock_held_key_sort_micros: value
     847           28 :                 .read_lock_held_key_sort_micros
     848           28 :                 .into_recorded()
     849           28 :                 .ok_or_else(|| anyhow!("read_lock_held_key_sort_micros not set"))?,
     850           28 :             read_lock_held_prerequisites_micros: value
     851           28 :                 .read_lock_held_prerequisites_micros
     852           28 :                 .into_recorded()
     853           28 :                 .ok_or_else(|| anyhow!("read_lock_held_prerequisites_micros not set"))?,
     854           28 :             read_lock_held_compute_holes_micros: value
     855           28 :                 .read_lock_held_compute_holes_micros
     856           28 :                 .into_recorded()
     857           28 :                 .ok_or_else(|| anyhow!("read_lock_held_compute_holes_micros not set"))?,
     858           28 :             read_lock_drop_micros: value
     859           28 :                 .read_lock_drop_micros
     860           28 :                 .into_recorded()
     861           28 :                 .ok_or_else(|| anyhow!("read_lock_drop_micros not set"))?,
     862           28 :             write_layer_files_micros: value
     863           28 :                 .write_layer_files_micros
     864           28 :                 .into_recorded()
     865           28 :                 .ok_or_else(|| anyhow!("write_layer_files_micros not set"))?,
     866           28 :             level0_deltas_count: value
     867           28 :                 .level0_deltas_count
     868           28 :                 .ok_or_else(|| anyhow!("level0_deltas_count not set"))?,
     869           28 :             new_deltas_count: value
     870           28 :                 .new_deltas_count
     871           28 :                 .ok_or_else(|| anyhow!("new_deltas_count not set"))?,
     872           28 :             new_deltas_size: value
     873           28 :                 .new_deltas_size
     874           28 :                 .ok_or_else(|| anyhow!("new_deltas_size not set"))?,
     875              :         })
     876           28 :     }
     877              : }
     878              : 
     879              : impl Timeline {
     880              :     /// Entry point for new tiered compaction algorithm.
     881              :     ///
     882              :     /// All the real work is in the implementation in the pageserver_compaction
     883              :     /// crate. The code here would apply to any algorithm implementing the
     884              :     /// same interface, but tiered is the only one at the moment.
     885              :     ///
     886              :     /// TODO: cancellation
     887            0 :     pub(crate) async fn compact_tiered(
     888            0 :         self: &Arc<Self>,
     889            0 :         _cancel: &CancellationToken,
     890            0 :         ctx: &RequestContext,
     891            0 :     ) -> Result<(), CompactionError> {
     892            0 :         let fanout = self.get_compaction_threshold() as u64;
     893            0 :         let target_file_size = self.get_checkpoint_distance();
     894              : 
     895              :         // Find the top of the historical layers
     896            0 :         let end_lsn = {
     897            0 :             let guard = self.layers.read().await;
     898            0 :             let layers = guard.layer_map();
     899              : 
     900            0 :             let l0_deltas = layers.get_level0_deltas()?;
     901            0 :             drop(guard);
     902            0 : 
     903            0 :             // As an optimization, if we find that there are too few L0 layers,
     904            0 :             // bail out early. We know that the compaction algorithm would do
     905            0 :             // nothing in that case.
     906            0 :             if l0_deltas.len() < fanout as usize {
     907              :                 // doesn't need compacting
     908            0 :                 return Ok(());
     909            0 :             }
     910            0 :             l0_deltas.iter().map(|l| l.lsn_range.end).max().unwrap()
     911            0 :         };
     912            0 : 
     913            0 :         // Is the timeline being deleted?
     914            0 :         if self.is_stopping() {
     915            0 :             trace!("Dropping out of compaction on timeline shutdown");
     916            0 :             return Err(CompactionError::ShuttingDown);
     917            0 :         }
     918              : 
     919            0 :         let (dense_ks, _sparse_ks) = self.collect_keyspace(end_lsn, ctx).await?;
     920              :         // TODO(chi): ignore sparse_keyspace for now, compact it in the future.
     921            0 :         let mut adaptor = TimelineAdaptor::new(self, (end_lsn, dense_ks));
     922            0 : 
     923            0 :         pageserver_compaction::compact_tiered::compact_tiered(
     924            0 :             &mut adaptor,
     925            0 :             end_lsn,
     926            0 :             target_file_size,
     927            0 :             fanout,
     928            0 :             ctx,
     929            0 :         )
     930            0 :         .await?;
     931              : 
     932            0 :         adaptor.flush_updates().await?;
     933            0 :         Ok(())
     934            0 :     }
     935              : }
     936              : 
     937              : struct TimelineAdaptor {
     938              :     timeline: Arc<Timeline>,
     939              : 
     940              :     keyspace: (Lsn, KeySpace),
     941              : 
     942              :     new_deltas: Vec<ResidentLayer>,
     943              :     new_images: Vec<ResidentLayer>,
     944              :     layers_to_delete: Vec<Arc<PersistentLayerDesc>>,
     945              : }
     946              : 
     947              : impl TimelineAdaptor {
     948            0 :     pub fn new(timeline: &Arc<Timeline>, keyspace: (Lsn, KeySpace)) -> Self {
     949            0 :         Self {
     950            0 :             timeline: timeline.clone(),
     951            0 :             keyspace,
     952            0 :             new_images: Vec::new(),
     953            0 :             new_deltas: Vec::new(),
     954            0 :             layers_to_delete: Vec::new(),
     955            0 :         }
     956            0 :     }
     957              : 
     958            0 :     pub async fn flush_updates(&mut self) -> anyhow::Result<()> {
     959            0 :         let layers_to_delete = {
     960            0 :             let guard = self.timeline.layers.read().await;
     961            0 :             self.layers_to_delete
     962            0 :                 .iter()
     963            0 :                 .map(|x| guard.get_from_desc(x))
     964            0 :                 .collect::<Vec<Layer>>()
     965            0 :         };
     966            0 :         self.timeline
     967            0 :             .finish_compact_batch(&self.new_deltas, &self.new_images, &layers_to_delete)
     968            0 :             .await?;
     969              : 
     970            0 :         self.timeline
     971            0 :             .upload_new_image_layers(std::mem::take(&mut self.new_images))?;
     972              : 
     973            0 :         self.new_deltas.clear();
     974            0 :         self.layers_to_delete.clear();
     975            0 :         Ok(())
     976            0 :     }
     977              : }
     978              : 
     979              : #[derive(Clone)]
     980              : struct ResidentDeltaLayer(ResidentLayer);
     981              : #[derive(Clone)]
     982              : struct ResidentImageLayer(ResidentLayer);
     983              : 
     984              : impl CompactionJobExecutor for TimelineAdaptor {
     985              :     type Key = crate::repository::Key;
     986              : 
     987              :     type Layer = OwnArc<PersistentLayerDesc>;
     988              :     type DeltaLayer = ResidentDeltaLayer;
     989              :     type ImageLayer = ResidentImageLayer;
     990              : 
     991              :     type RequestContext = crate::context::RequestContext;
     992              : 
     993            0 :     fn get_shard_identity(&self) -> &ShardIdentity {
     994            0 :         self.timeline.get_shard_identity()
     995            0 :     }
     996              : 
     997            0 :     async fn get_layers(
     998            0 :         &mut self,
     999            0 :         key_range: &Range<Key>,
    1000            0 :         lsn_range: &Range<Lsn>,
    1001            0 :         _ctx: &RequestContext,
    1002            0 :     ) -> anyhow::Result<Vec<OwnArc<PersistentLayerDesc>>> {
    1003            0 :         self.flush_updates().await?;
    1004              : 
    1005            0 :         let guard = self.timeline.layers.read().await;
    1006            0 :         let layer_map = guard.layer_map();
    1007            0 : 
    1008            0 :         let result = layer_map
    1009            0 :             .iter_historic_layers()
    1010            0 :             .filter(|l| {
    1011            0 :                 overlaps_with(&l.lsn_range, lsn_range) && overlaps_with(&l.key_range, key_range)
    1012            0 :             })
    1013            0 :             .map(OwnArc)
    1014            0 :             .collect();
    1015            0 :         Ok(result)
    1016            0 :     }
    1017              : 
    1018            0 :     async fn get_keyspace(
    1019            0 :         &mut self,
    1020            0 :         key_range: &Range<Key>,
    1021            0 :         lsn: Lsn,
    1022            0 :         _ctx: &RequestContext,
    1023            0 :     ) -> anyhow::Result<Vec<Range<Key>>> {
    1024            0 :         if lsn == self.keyspace.0 {
    1025            0 :             Ok(pageserver_compaction::helpers::intersect_keyspace(
    1026            0 :                 &self.keyspace.1.ranges,
    1027            0 :                 key_range,
    1028            0 :             ))
    1029              :         } else {
    1030              :             // The current compaction implementation only ever requests the key space
    1031              :             // at the compaction end LSN.
    1032            0 :             anyhow::bail!("keyspace not available for requested lsn");
    1033              :         }
    1034            0 :     }
    1035              : 
    1036            0 :     async fn downcast_delta_layer(
    1037            0 :         &self,
    1038            0 :         layer: &OwnArc<PersistentLayerDesc>,
    1039            0 :     ) -> anyhow::Result<Option<ResidentDeltaLayer>> {
    1040            0 :         // this is a lot more complex than a simple downcast...
    1041            0 :         if layer.is_delta() {
    1042            0 :             let l = {
    1043            0 :                 let guard = self.timeline.layers.read().await;
    1044            0 :                 guard.get_from_desc(layer)
    1045              :             };
    1046            0 :             let result = l.download_and_keep_resident().await?;
    1047              : 
    1048            0 :             Ok(Some(ResidentDeltaLayer(result)))
    1049              :         } else {
    1050            0 :             Ok(None)
    1051              :         }
    1052            0 :     }
    1053              : 
    1054            0 :     async fn create_image(
    1055            0 :         &mut self,
    1056            0 :         lsn: Lsn,
    1057            0 :         key_range: &Range<Key>,
    1058            0 :         ctx: &RequestContext,
    1059            0 :     ) -> anyhow::Result<()> {
    1060            0 :         Ok(self.create_image_impl(lsn, key_range, ctx).await?)
    1061            0 :     }
    1062              : 
    1063            0 :     async fn create_delta(
    1064            0 :         &mut self,
    1065            0 :         lsn_range: &Range<Lsn>,
    1066            0 :         key_range: &Range<Key>,
    1067            0 :         input_layers: &[ResidentDeltaLayer],
    1068            0 :         ctx: &RequestContext,
    1069            0 :     ) -> anyhow::Result<()> {
    1070            0 :         debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end);
    1071              : 
    1072            0 :         let mut all_entries = Vec::new();
    1073            0 :         for dl in input_layers.iter() {
    1074            0 :             all_entries.extend(dl.load_keys(ctx).await?);
    1075              :         }
    1076              : 
    1077              :         // The current stdlib sorting implementation is particularly fast when the
    1078              :         // slice is made up of already-sorted sub-ranges.
    1079            0 :         all_entries.sort_by_key(|DeltaEntry { key, lsn, .. }| (*key, *lsn));
    1080              : 
    1081            0 :         let mut writer = DeltaLayerWriter::new(
    1082            0 :             self.timeline.conf,
    1083            0 :             self.timeline.timeline_id,
    1084            0 :             self.timeline.tenant_shard_id,
    1085            0 :             key_range.start,
    1086            0 :             lsn_range.clone(),
    1087            0 :             ctx,
    1088            0 :         )
    1089            0 :         .await?;
    1090              : 
    1091            0 :         let mut dup_values = 0;
    1092            0 : 
    1093            0 :         // This iterator walks through all key-value pairs from all the layers
    1094            0 :         // we're compacting, in key, LSN order.
    1095            0 :         let mut prev: Option<(Key, Lsn)> = None;
    1096              :         for &DeltaEntry {
    1097            0 :             key, lsn, ref val, ..
    1098            0 :         } in all_entries.iter()
    1099              :         {
    1100            0 :             if prev == Some((key, lsn)) {
    1101              :                 // This is a duplicate. Skip it.
    1102              :                 //
    1103              :                 // It can happen if compaction is interrupted after writing some
    1104              :                 // layers but not all, and we are compacting the range again.
    1105              :                 // The calculations in the algorithm assume that there are no
    1106              :                 // duplicates, so the math on targeted file size is likely off,
    1107              :                 // and we will create smaller files than expected.
    1108            0 :                 dup_values += 1;
    1109            0 :                 continue;
    1110            0 :             }
    1111              : 
    1112            0 :             let value = val.load(ctx).await?;
    1113              : 
    1114            0 :             writer.put_value(key, lsn, value, ctx).await?;
    1115              : 
    1116            0 :             prev = Some((key, lsn));
    1117              :         }
    1118              : 
    1119            0 :         if dup_values > 0 {
    1120            0 :             warn!("delta layer created with {} duplicate values", dup_values);
    1121            0 :         }
    1122              : 
    1123            0 :         fail_point!("delta-layer-writer-fail-before-finish", |_| {
    1124            0 :             Err(anyhow::anyhow!(
    1125            0 :                 "failpoint delta-layer-writer-fail-before-finish"
    1126            0 :             ))
    1127            0 :         });
    1128              : 
    1129            0 :         let new_delta_layer = writer
    1130            0 :             .finish(prev.unwrap().0.next(), &self.timeline, ctx)
    1131            0 :             .await?;
    1132              : 
    1133            0 :         self.new_deltas.push(new_delta_layer);
    1134            0 :         Ok(())
    1135            0 :     }
    1136              : 
    1137            0 :     async fn delete_layer(
    1138            0 :         &mut self,
    1139            0 :         layer: &OwnArc<PersistentLayerDesc>,
    1140            0 :         _ctx: &RequestContext,
    1141            0 :     ) -> anyhow::Result<()> {
    1142            0 :         self.layers_to_delete.push(layer.clone().0);
    1143            0 :         Ok(())
    1144            0 :     }
    1145              : }
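// Illustrative sketch (hypothetical, not part of this source file): the
// duplicate-skipping walk performed by `create_delta` above, reduced to plain
// `(u64, u64)` pairs standing in for `(Key, Lsn)`. The input is assumed to be
// sorted by (key, lsn); exact duplicates, which can appear when compaction
// re-processes a range after being interrupted, are dropped.
fn skip_exact_duplicates(sorted_entries: &[(u64, u64)]) -> Vec<(u64, u64)> {
    let mut out = Vec::with_capacity(sorted_entries.len());
    let mut prev: Option<(u64, u64)> = None;
    for &(key, lsn) in sorted_entries {
        if prev == Some((key, lsn)) {
            // Same (key, lsn) as the previous entry: keep only the first copy.
            continue;
        }
        out.push((key, lsn));
        prev = Some((key, lsn));
    }
    out
}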
    1146              : 
    1147              : impl TimelineAdaptor {
    1148            0 :     async fn create_image_impl(
    1149            0 :         &mut self,
    1150            0 :         lsn: Lsn,
    1151            0 :         key_range: &Range<Key>,
    1152            0 :         ctx: &RequestContext,
    1153            0 :     ) -> Result<(), PageReconstructError> {
    1154            0 :         let timer = self.timeline.metrics.create_images_time_histo.start_timer();
    1155              : 
    1156            0 :         let mut image_layer_writer = ImageLayerWriter::new(
    1157            0 :             self.timeline.conf,
    1158            0 :             self.timeline.timeline_id,
    1159            0 :             self.timeline.tenant_shard_id,
    1160            0 :             key_range,
    1161            0 :             lsn,
    1162            0 :             ctx,
    1163            0 :         )
    1164            0 :         .await?;
    1165              : 
    1166            0 :         fail_point!("image-layer-writer-fail-before-finish", |_| {
    1167            0 :             Err(PageReconstructError::Other(anyhow::anyhow!(
    1168            0 :                 "failpoint image-layer-writer-fail-before-finish"
    1169            0 :             )))
    1170            0 :         });
    1171            0 :         let keyspace_ranges = self.get_keyspace(key_range, lsn, ctx).await?;
    1172            0 :         for range in &keyspace_ranges {
    1173            0 :             let mut key = range.start;
    1174            0 :             while key < range.end {
    1175            0 :                 let img = match self.timeline.get(key, lsn, ctx).await {
    1176            0 :                     Ok(img) => img,
    1177            0 :                     Err(err) => {
    1178            0 :                         // If we fail to reconstruct a VM or FSM page, we can zero the
    1179            0 :                         // page without losing any actual user data. That seems better
    1180            0 :                         // than failing repeatedly and getting stuck.
    1181            0 :                         //
    1182            0 :                         // We had a bug at one point, where we truncated the FSM and VM
    1183            0 :                         // in the pageserver, but Postgres didn't know about that
    1184            0 :                         // and continued to generate incremental WAL records for pages
    1185            0 :                         // that didn't exist in the pageserver. Trying to replay those
    1186            0 :                         // WAL records failed to find the previous image of the page.
    1187            0 :                         // This special case allows us to recover from that situation.
    1188            0 :                         // See https://github.com/neondatabase/neon/issues/2601.
    1189            0 :                         //
    1190            0 :                         // Unfortunately we cannot do this for the main fork, or for
    1191            0 :                         // any metadata keys, as that would lead to actual data
    1192            0 :                         // loss.
    1193            0 :                         if is_rel_fsm_block_key(key) || is_rel_vm_block_key(key) {
    1194            0 :                             warn!("could not reconstruct FSM or VM key {key}, filling with zeros: {err:?}");
    1195            0 :                             ZERO_PAGE.clone()
    1196              :                         } else {
    1197            0 :                             return Err(err);
    1198              :                         }
    1199              :                     }
    1200              :                 };
    1201            0 :                 image_layer_writer.put_image(key, img, ctx).await?;
    1202            0 :                 key = key.next();
    1203              :             }
    1204              :         }
    1205            0 :         let image_layer = image_layer_writer.finish(&self.timeline, ctx).await?;
    1206              : 
    1207            0 :         self.new_images.push(image_layer);
    1208            0 : 
    1209            0 :         timer.stop_and_record();
    1210            0 : 
    1211            0 :         Ok(())
    1212            0 :     }
    1213              : }
    1214              : 
    1215              : impl CompactionRequestContext for crate::context::RequestContext {}
    1216              : 
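/// Newtype around `Arc<T>` used as the layer handle for the compaction
/// interface; it derefs to `T`, which is what lets `CompactionLayer` be
/// implemented for `OwnArc<PersistentLayerDesc>` further below.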
    1217              : #[derive(Debug, Clone)]
    1218              : pub struct OwnArc<T>(pub Arc<T>);
    1219              : 
    1220              : impl<T> Deref for OwnArc<T> {
    1221              :     type Target = <Arc<T> as Deref>::Target;
    1222            0 :     fn deref(&self) -> &Self::Target {
    1223            0 :         &self.0
    1224            0 :     }
    1225              : }
    1226              : 
    1227              : impl<T> AsRef<T> for OwnArc<T> {
    1228            0 :     fn as_ref(&self) -> &T {
    1229            0 :         self.0.as_ref()
    1230            0 :     }
    1231              : }
    1232              : 
    1233              : impl CompactionLayer<Key> for OwnArc<PersistentLayerDesc> {
    1234            0 :     fn key_range(&self) -> &Range<Key> {
    1235            0 :         &self.key_range
    1236            0 :     }
    1237            0 :     fn lsn_range(&self) -> &Range<Lsn> {
    1238            0 :         &self.lsn_range
    1239            0 :     }
    1240            0 :     fn file_size(&self) -> u64 {
    1241            0 :         self.file_size
    1242            0 :     }
    1243            0 :     fn short_id(&self) -> std::string::String {
    1244            0 :         self.as_ref().short_id().to_string()
    1245            0 :     }
    1246            0 :     fn is_delta(&self) -> bool {
    1247            0 :         self.as_ref().is_delta()
    1248            0 :     }
    1249              : }
    1250              : 
    1251              : impl CompactionLayer<Key> for OwnArc<DeltaLayer> {
    1252            0 :     fn key_range(&self) -> &Range<Key> {
    1253            0 :         &self.layer_desc().key_range
    1254            0 :     }
    1255            0 :     fn lsn_range(&self) -> &Range<Lsn> {
    1256            0 :         &self.layer_desc().lsn_range
    1257            0 :     }
    1258            0 :     fn file_size(&self) -> u64 {
    1259            0 :         self.layer_desc().file_size
    1260            0 :     }
    1261            0 :     fn short_id(&self) -> std::string::String {
    1262            0 :         self.layer_desc().short_id().to_string()
    1263            0 :     }
    1264            0 :     fn is_delta(&self) -> bool {
    1265            0 :         true
    1266            0 :     }
    1267              : }
    1268              : 
    1269              : use crate::tenant::timeline::DeltaEntry;
    1270              : 
    1271              : impl CompactionLayer<Key> for ResidentDeltaLayer {
    1272            0 :     fn key_range(&self) -> &Range<Key> {
    1273            0 :         &self.0.layer_desc().key_range
    1274            0 :     }
    1275            0 :     fn lsn_range(&self) -> &Range<Lsn> {
    1276            0 :         &self.0.layer_desc().lsn_range
    1277            0 :     }
    1278            0 :     fn file_size(&self) -> u64 {
    1279            0 :         self.0.layer_desc().file_size
    1280            0 :     }
    1281            0 :     fn short_id(&self) -> std::string::String {
    1282            0 :         self.0.layer_desc().short_id().to_string()
    1283            0 :     }
    1284            0 :     fn is_delta(&self) -> bool {
    1285            0 :         true
    1286            0 :     }
    1287              : }
    1288              : 
    1289              : impl CompactionDeltaLayer<TimelineAdaptor> for ResidentDeltaLayer {
    1290              :     type DeltaEntry<'a> = DeltaEntry<'a>;
    1291              : 
    1292            0 :     async fn load_keys<'a>(&self, ctx: &RequestContext) -> anyhow::Result<Vec<DeltaEntry<'_>>> {
    1293            0 :         self.0.load_keys(ctx).await
    1294            0 :     }
    1295              : }
    1296              : 
    1297              : impl CompactionLayer<Key> for ResidentImageLayer {
    1298            0 :     fn key_range(&self) -> &Range<Key> {
    1299            0 :         &self.0.layer_desc().key_range
    1300            0 :     }
    1301            0 :     fn lsn_range(&self) -> &Range<Lsn> {
    1302            0 :         &self.0.layer_desc().lsn_range
    1303            0 :     }
    1304            0 :     fn file_size(&self) -> u64 {
    1305            0 :         self.0.layer_desc().file_size
    1306            0 :     }
    1307            0 :     fn short_id(&self) -> std::string::String {
    1308            0 :         self.0.layer_desc().short_id().to_string()
    1309            0 :     }
    1310            0 :     fn is_delta(&self) -> bool {
    1311            0 :         false
    1312            0 :     }
    1313              : }
    1314              : impl CompactionImageLayer<TimelineAdaptor> for ResidentImageLayer {}
        

Generated by: LCOV version 2.1-beta