LCOV - code coverage report
Current view: top level - pageserver/src/tenant/timeline - compaction.rs (source / functions) Coverage Total Hit
Test: 691a4c28fe7169edd60b367c52d448a0a6605f1f.info Lines: 48.7 % 930 453
Test Date: 2024-05-10 13:18:37 Functions: 19.8 % 86 17

            Line data    Source code
       1              : //! New compaction implementation. The algorithm itself is implemented in the
       2              : //! compaction crate. This file implements the callbacks and structs that allow
       3              : //! the algorithm to drive the process.
       4              : //!
       5              : //! The old legacy algorithm is implemented directly in `timeline.rs`.
       6              : 
       7              : use std::collections::BinaryHeap;
       8              : use std::ops::{Deref, Range};
       9              : use std::sync::Arc;
      10              : 
      11              : use super::layer_manager::LayerManager;
      12              : use super::{CompactFlags, DurationRecorder, ImageLayerCreationMode, RecordedDuration, Timeline};
      13              : 
      14              : use anyhow::{anyhow, Context};
      15              : use enumset::EnumSet;
      16              : use fail::fail_point;
      17              : use itertools::Itertools;
      18              : use pageserver_api::keyspace::ShardedRange;
      19              : use pageserver_api::shard::{ShardCount, ShardIdentity, TenantShardId};
      20              : use tokio_util::sync::CancellationToken;
      21              : use tracing::{debug, info, info_span, trace, warn, Instrument};
      22              : use utils::id::TimelineId;
      23              : 
      24              : use crate::context::{AccessStatsBehavior, RequestContext, RequestContextBuilder};
      25              : use crate::tenant::storage_layer::{AsLayerDesc, PersistentLayerDesc};
      26              : use crate::tenant::timeline::{drop_rlock, is_rel_fsm_block_key, is_rel_vm_block_key, Hole};
      27              : use crate::tenant::timeline::{DeltaLayerWriter, ImageLayerWriter};
      28              : use crate::tenant::timeline::{Layer, ResidentLayer};
      29              : use crate::tenant::DeltaLayer;
      30              : use crate::tenant::PageReconstructError;
      31              : use crate::virtual_file::{MaybeFatalIo, VirtualFile};
      32              : use crate::{page_cache, ZERO_PAGE};
      33              : 
      34              : use crate::keyspace::KeySpace;
      35              : use crate::repository::Key;
      36              : 
      37              : use utils::lsn::Lsn;
      38              : 
      39              : use pageserver_compaction::helpers::overlaps_with;
      40              : use pageserver_compaction::interface::*;
      41              : 
      42              : use super::CompactionError;
      43              : 
      44              : impl Timeline {
      45              :     /// TODO: cancellation
      46          330 :     pub(crate) async fn compact_legacy(
      47          330 :         self: &Arc<Self>,
      48          330 :         _cancel: &CancellationToken,
      49          330 :         flags: EnumSet<CompactFlags>,
      50          330 :         ctx: &RequestContext,
      51          330 :     ) -> Result<(), CompactionError> {
      52          330 :         // High level strategy for compaction / image creation:
      53          330 :         //
      54          330 :         // 1. First, calculate the desired "partitioning" of the
      55          330 :         // currently in-use key space. The goal is to partition the
      56          330 :         // key space into roughly fixed-size chunks, but also take into
      57          330 :         // account any existing image layers, and try to align the
      58          330 :         // chunk boundaries with the existing image layers to avoid
      59          330 :         // too much churn. Also try to align chunk boundaries with
      60          330 :         // relation boundaries.  In principle, we don't know about
      61          330 :         // relation boundaries here, we just deal with key-value
      62          330 :         // pairs, and the code in pgdatadir_mapping.rs knows how to
      63          330 :         // map relations into key-value pairs. But in practice we know
      64          330 :         // that 'field6' is the block number, and the fields 1-5
      65          330 :         // identify a relation. This is just an optimization,
      66          330 :         // though.
      67          330 :         //
      68          330 :         // 2. Once we know the partitioning, for each partition,
      69          330 :         // decide if it's time to create a new image layer. The
      70          330 :         // criterion is: there has been too much "churn" since the last
      71          330 :         // image layer? The "churn" is a fuzzy concept, it's a
      72          330 :         // combination of too many delta files, or too much WAL in
      73          330 :         // total in the delta file. Or perhaps: if creating an image
      74          330 :         // file would allow to delete some older files.
      75          330 :         //
      76          330 :         // 3. After that, we compact all level0 delta files if there
      77          330 :         // are too many of them.  While compacting, we also garbage
      78          330 :         // collect any page versions that are no longer needed because
      79          330 :         // of the new image layers we created in step 2.
      80          330 :         //
      81          330 :         // TODO: This high level strategy hasn't been implemented yet.
      82          330 :         // Below are functions compact_level0() and create_image_layers()
      83          330 :         // but they are a bit ad hoc and don't quite work like it's explained
      84          330 :         // above. Rewrite it.
      85          330 : 
      86          330 :         // Is the timeline being deleted?
      87          330 :         if self.is_stopping() {
      88            0 :             trace!("Dropping out of compaction on timeline shutdown");
      89            0 :             return Err(CompactionError::ShuttingDown);
      90          330 :         }
      91          330 : 
      92          330 :         let target_file_size = self.get_checkpoint_distance();
      93              : 
      94              :         // Define partitioning schema if needed
      95              : 
      96              :         // FIXME: the match should only cover repartitioning, not the next steps
      97          330 :         let partition_count = match self
      98          330 :             .repartition(
      99          330 :                 self.get_last_record_lsn(),
     100          330 :                 self.get_compaction_target_size(),
     101          330 :                 flags,
     102          330 :                 ctx,
     103          330 :             )
     104        13269 :             .await
     105              :         {
     106          330 :             Ok(((dense_partitioning, sparse_partitioning), lsn)) => {
     107          330 :                 // Disables access_stats updates, so that the files we read remain candidates for eviction after we're done with them
     108          330 :                 let image_ctx = RequestContextBuilder::extend(ctx)
     109          330 :                     .access_stats_behavior(AccessStatsBehavior::Skip)
     110          330 :                     .build();
     111          330 : 
     112          330 :                 // 2. Compact
     113          330 :                 let timer = self.metrics.compact_time_histo.start_timer();
     114        41663 :                 self.compact_level0(target_file_size, ctx).await?;
     115          330 :                 timer.stop_and_record();
     116              : 
     117              :                 // 3. Create new image layers for partitions that have been modified
     118              :                 // "enough".
     119          330 :                 let dense_layers = self
     120          330 :                     .create_image_layers(
     121          330 :                         &dense_partitioning,
     122          330 :                         lsn,
     123          330 :                         if flags.contains(CompactFlags::ForceImageLayerCreation) {
     124            0 :                             ImageLayerCreationMode::Force
     125              :                         } else {
     126          330 :                             ImageLayerCreationMode::Try
     127              :                         },
     128          330 :                         &image_ctx,
     129              :                     )
     130            1 :                     .await
     131          330 :                     .map_err(anyhow::Error::from)?;
     132              : 
     133              :                 // For now, nothing will be produced...
     134          330 :                 let sparse_layers = self
     135          330 :                     .create_image_layers(
     136          330 :                         &sparse_partitioning.clone().into_dense(),
     137          330 :                         lsn,
     138          330 :                         if flags.contains(CompactFlags::ForceImageLayerCreation) {
     139            0 :                             ImageLayerCreationMode::Force
     140              :                         } else {
     141          330 :                             ImageLayerCreationMode::Try
     142              :                         },
     143          330 :                         &image_ctx,
     144              :                     )
     145            0 :                     .await
     146          330 :                     .map_err(anyhow::Error::from)?;
     147          330 :                 assert!(sparse_layers.is_empty());
     148              : 
     149          330 :                 self.upload_new_image_layers(dense_layers)?;
     150          330 :                 dense_partitioning.parts.len()
     151              :             }
     152            0 :             Err(err) => {
     153            0 :                 // no partitioning? This is normal, if the timeline was just created
     154            0 :                 // as an empty timeline. Also in unit tests, when we use the timeline
     155            0 :                 // as a simple key-value store, ignoring the datadir layout. Log the
     156            0 :                 // error but continue.
     157            0 :                 //
     158            0 :                 // Suppress error when it's due to cancellation
     159            0 :                 if !self.cancel.is_cancelled() {
     160            0 :                     tracing::error!("could not compact, repartitioning keyspace failed: {err:?}");
     161            0 :                 }
     162            0 :                 1
     163              :             }
     164              :         };
     165              : 
     166          330 :         if self.shard_identity.count >= ShardCount::new(2) {
     167              :             // Limit the number of layer rewrites to the number of partitions: this means its
     168              :             // runtime should be comparable to a full round of image layer creations, rather than
     169              :             // being potentially much longer.
     170            0 :             let rewrite_max = partition_count;
     171            0 : 
     172            0 :             self.compact_shard_ancestors(rewrite_max, ctx).await?;
     173          330 :         }
     174              : 
     175          330 :         Ok(())
     176          330 :     }
     177              : 
     178              :     /// Check for layers that are eligible to be rewritten:
     179              :     /// - Shard splitting: After a shard split, ancestor layers beyond pitr_interval, so that
     180              :     ///   we don't indefinitely retain keys in this shard that aren't needed.
     181              :     /// - For future use: layers beyond pitr_interval that are in formats we would
     182              :     ///   rather not maintain compatibility with indefinitely.
     183              :     ///
     184              :     /// Note: this phase may read and write many gigabytes of data: use rewrite_max to bound
     185              :     /// how much work it will try to do in each compaction pass.
     186            0 :     async fn compact_shard_ancestors(
     187            0 :         self: &Arc<Self>,
     188            0 :         rewrite_max: usize,
     189            0 :         _ctx: &RequestContext,
     190            0 :     ) -> anyhow::Result<()> {
     191            0 :         let mut drop_layers = Vec::new();
     192            0 :         let layers_to_rewrite: Vec<Layer> = Vec::new();
     193            0 : 
     194            0 :         // We will use the PITR cutoff as a condition for rewriting layers.
     195            0 :         let pitr_cutoff = self.gc_info.read().unwrap().cutoffs.pitr;
     196              : 
     197            0 :         let layers = self.layers.read().await;
     198            0 :         for layer_desc in layers.layer_map().iter_historic_layers() {
     199            0 :             let layer = layers.get_from_desc(&layer_desc);
     200            0 :             if layer.metadata().shard.shard_count == self.shard_identity.count {
     201              :                 // This layer does not belong to a historic ancestor, no need to re-image it.
     202            0 :                 continue;
     203            0 :             }
     204            0 : 
     205            0 :             // This layer was created on an ancestor shard: check if it contains any data for this shard.
     206            0 :             let sharded_range = ShardedRange::new(layer_desc.get_key_range(), &self.shard_identity);
     207            0 :             let layer_local_page_count = sharded_range.page_count();
     208            0 :             let layer_raw_page_count = ShardedRange::raw_size(&layer_desc.get_key_range());
     209            0 :             if layer_local_page_count == 0 {
     210              :                 // This ancestral layer only covers keys that belong to other shards.
     211              :                 // We include the full metadata in the log: if we had some critical bug that caused
     212              :                 // us to incorrectly drop layers, this would simplify manually debugging + reinstating those layers.
     213            0 :                 info!(%layer, old_metadata=?layer.metadata(),
     214            0 :                     "dropping layer after shard split, contains no keys for this shard.",
     215              :                 );
     216              : 
     217            0 :                 if cfg!(debug_assertions) {
     218              :                     // Expensive, exhaustive check of keys in this layer: this guards against ShardedRange's calculations being
     219              :                     // wrong.  If ShardedRange claims the local page count is zero, then no keys in this layer
     220              :                     // should be !is_key_disposable()
     221            0 :                     let range = layer_desc.get_key_range();
     222            0 :                     let mut key = range.start;
     223            0 :                     while key < range.end {
     224            0 :                         debug_assert!(self.shard_identity.is_key_disposable(&key));
     225            0 :                         key = key.next();
     226              :                     }
     227            0 :                 }
     228              : 
     229            0 :                 drop_layers.push(layer);
     230            0 :                 continue;
     231            0 :             } else if layer_local_page_count != u32::MAX
     232            0 :                 && layer_local_page_count == layer_raw_page_count
     233              :             {
     234            0 :                 debug!(%layer,
     235            0 :                     "layer is entirely shard local ({} keys), no need to filter it",
     236              :                     layer_local_page_count
     237              :                 );
     238            0 :                 continue;
     239            0 :             }
     240            0 : 
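     241            0 :             // Don't bother re-writing a layer unless it will at least halve its size (NOTE(review): the branch below only logs and has no `continue`, so such layers are not actually skipped here)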
     242            0 :             if layer_local_page_count != u32::MAX
     243            0 :                 && layer_local_page_count > layer_raw_page_count / 2
     244              :             {
     245            0 :                 debug!(%layer,
     246            0 :                     "layer is already mostly local ({}/{}), not rewriting",
     247              :                     layer_local_page_count,
     248              :                     layer_raw_page_count
     249              :                 );
     250            0 :             }
     251              : 
     252              :             // Don't bother re-writing a layer if it is within the PITR window: it will age-out eventually
     253              :             // without incurring the I/O cost of a rewrite.
     254            0 :             if layer_desc.get_lsn_range().end >= pitr_cutoff {
     255            0 :                 debug!(%layer, "Skipping rewrite of layer still in PITR window ({} >= {})",
     256            0 :                     layer_desc.get_lsn_range().end, pitr_cutoff);
     257            0 :                 continue;
     258            0 :             }
     259            0 : 
     260            0 :             if layer_desc.is_delta() {
     261              :                 // We do not yet implement rewrite of delta layers
     262            0 :                 debug!(%layer, "Skipping rewrite of delta layer");
     263            0 :                 continue;
     264            0 :             }
     265            0 : 
     266            0 :             // Only rewrite layers if they would have different remote paths: either they belong to this
     267            0 :             // shard but an old generation, or they belonged to another shard.  This also implicitly
     268            0 :             // guarantees that the layer is persistent in remote storage (as only remote persistent
     269            0 :             // layers are carried across shard splits, any local-only layer would be in the current generation)
     270            0 :             if layer.metadata().generation == self.generation
     271            0 :                 && layer.metadata().shard.shard_count == self.shard_identity.count
     272              :             {
     273            0 :                 debug!(%layer, "Skipping rewrite, is not from old generation");
     274            0 :                 continue;
     275            0 :             }
     276            0 : 
     277            0 :             if layers_to_rewrite.len() >= rewrite_max {
     278            0 :                 tracing::info!(%layer, "Will rewrite layer on a future compaction, already rewrote {}",
     279            0 :                     layers_to_rewrite.len()
     280              :                 );
     281            0 :                 continue;
     282            0 :             }
     283            0 : 
     284            0 :             // Fall through: all our conditions for doing a rewrite passed.
     285            0 :             // TODO: implement rewriting
     286            0 :             tracing::debug!(%layer, "Would rewrite layer");
     287              :         }
     288              : 
     289              :         // Drop the layers read lock: we will acquire it for write in [`Self::rewrite_layers`]
     290            0 :         drop(layers);
     291            0 : 
     292            0 :         // TODO: collect layers to rewrite
     293            0 :         let replace_layers = Vec::new();
     294            0 : 
     295            0 :         // Update the LayerMap so that readers will use the new layers, and enqueue it for writing to remote storage
     296            0 :         self.rewrite_layers(replace_layers, drop_layers).await?;
     297              : 
     298            0 :         if let Some(remote_client) = self.remote_client.as_ref() {
     299              :             // We wait for all uploads to complete before finishing this compaction stage.  This is not
     300              :             // necessary for correctness, but it simplifies testing, and avoids proceeding with another
     301              :             // Timeline's compaction while this timeline's uploads may be generating lots of disk I/O
     302              :             // load.
     303            0 :             remote_client.wait_completion().await?;
     304            0 :         }
     305              : 
     306            0 :         Ok(())
     307            0 :     }
     308              : 
     309              :     /// Collect a bunch of Level 0 layer files, and compact and reshuffle them as
     310              :     /// Level 1 files.
     311          330 :     async fn compact_level0(
     312          330 :         self: &Arc<Self>,
     313          330 :         target_file_size: u64,
     314          330 :         ctx: &RequestContext,
     315          330 :     ) -> Result<(), CompactionError> {
     316              :         let CompactLevel0Phase1Result {
     317          330 :             new_layers,
     318          330 :             deltas_to_compact,
     319              :         } = {
     320          330 :             let phase1_span = info_span!("compact_level0_phase1");
     321          330 :             let ctx = ctx.attached_child();
     322          330 :             let mut stats = CompactLevel0Phase1StatsBuilder {
     323          330 :                 version: Some(2),
     324          330 :                 tenant_id: Some(self.tenant_shard_id),
     325          330 :                 timeline_id: Some(self.timeline_id),
     326          330 :                 ..Default::default()
     327          330 :             };
     328          330 : 
     329          330 :             let begin = tokio::time::Instant::now();
     330          330 :             let phase1_layers_locked = Arc::clone(&self.layers).read_owned().await;
     331          330 :             let now = tokio::time::Instant::now();
     332          330 :             stats.read_lock_acquisition_micros =
     333          330 :                 DurationRecorder::Recorded(RecordedDuration(now - begin), now);
     334          330 :             self.compact_level0_phase1(phase1_layers_locked, stats, target_file_size, &ctx)
     335          330 :                 .instrument(phase1_span)
     336        41663 :                 .await?
     337              :         };
     338              : 
     339          330 :         if new_layers.is_empty() && deltas_to_compact.is_empty() {
     340              :             // nothing to do
     341          306 :             return Ok(());
     342           24 :         }
     343           24 : 
     344           24 :         self.finish_compact_batch(&new_layers, &Vec::new(), &deltas_to_compact)
     345            0 :             .await?;
     346           24 :         Ok(())
     347          330 :     }
     348              : 
     349              :     /// Level0 files first phase of compaction, explained in the [`Self::compact_legacy`] comment.
     350          330 :     async fn compact_level0_phase1(
     351          330 :         self: &Arc<Self>,
     352          330 :         guard: tokio::sync::OwnedRwLockReadGuard<LayerManager>,
     353          330 :         mut stats: CompactLevel0Phase1StatsBuilder,
     354          330 :         target_file_size: u64,
     355          330 :         ctx: &RequestContext,
     356          330 :     ) -> Result<CompactLevel0Phase1Result, CompactionError> {
     357          330 :         stats.read_lock_held_spawn_blocking_startup_micros =
     358          330 :             stats.read_lock_acquisition_micros.till_now(); // set by caller
     359          330 :         let layers = guard.layer_map();
     360          330 :         let level0_deltas = layers.get_level0_deltas()?;
     361          330 :         let mut level0_deltas = level0_deltas
     362          330 :             .into_iter()
     363         1372 :             .map(|x| guard.get_from_desc(&x))
     364          330 :             .collect_vec();
     365          330 :         stats.level0_deltas_count = Some(level0_deltas.len());
     366          330 :         // Only compact if enough layers have accumulated.
     367          330 :         let threshold = self.get_compaction_threshold();
     368          330 :         if level0_deltas.is_empty() || level0_deltas.len() < threshold {
     369          306 :             debug!(
     370            0 :                 level0_deltas = level0_deltas.len(),
     371            0 :                 threshold, "too few deltas to compact"
     372              :             );
     373          306 :             return Ok(CompactLevel0Phase1Result::default());
     374           24 :         }
     375           24 : 
     376           24 :         // This failpoint is used together with `test_duplicate_layers` integration test.
     377           24 :         // It makes compaction return exactly the same layers as were input to compaction.
     378           24 :         // We want to ensure that this will not cause any problem when updating the layer map
     379           24 :         // after the compaction is finished.
     380           24 :         //
     381           24 :         // Currently, there are two rare edge cases that will cause duplicated layers being
     382           24 :         // inserted.
     383           24 :         // 1. The compaction job is interrupted / did not finish successfully. Assume we have file 1, 2, 3, 4, which
     384           24 :         //    is compacted to 5, but the page server is shut down, next time we start page server we will get a layer
     385           24 :         //    map containing 1, 2, 3, 4, and 5, whereas 5 has the same content as 4. If we trigger L0 compaction at this
     386           24 :         //    point again, it is likely that we will get a file 6 which has the same content and the key range as 5,
     387           24 :         //    and this causes an overwrite. This is acceptable because the content is the same, and we should do a
     388           24 :         //    layer replace instead of the normal remove / upload process.
     389           24 :         // 2. The input workload pattern creates exactly n files that are sorted, non-overlapping and of target file
     390           24 :         //    size length. Compaction will likely create the same set of n files afterwards.
     391           24 :         //
     392           24 :         // This failpoint is a superset of both of the cases.
     393           24 :         if cfg!(feature = "testing") {
     394           24 :             let active = (|| {
     395           24 :                 ::fail::fail_point!("compact-level0-phase1-return-same", |_| true);
     396           24 :                 false
     397           24 :             })();
     398           24 : 
     399           24 :             if active {
     400            0 :                 let mut new_layers = Vec::with_capacity(level0_deltas.len());
     401            0 :                 for delta in &level0_deltas {
     402              :                     // we are just faking these layers as being produced again for this failpoint
     403            0 :                     new_layers.push(
     404            0 :                         delta
     405            0 :                             .download_and_keep_resident()
     406            0 :                             .await
     407            0 :                             .context("download layer for failpoint")?,
     408              :                     );
     409              :                 }
     410            0 :                 tracing::info!("compact-level0-phase1-return-same"); // so that we can check if we hit the failpoint
     411            0 :                 return Ok(CompactLevel0Phase1Result {
     412            0 :                     new_layers,
     413            0 :                     deltas_to_compact: level0_deltas,
     414            0 :                 });
     415           24 :             }
     416            0 :         }
     417              : 
     418              :         // Gather the files to compact in this iteration.
     419              :         //
     420              :         // Start with the oldest Level 0 delta file, and collect any other
     421              :         // level 0 files that form a contiguous sequence, such that the end
     422              :         // LSN of previous file matches the start LSN of the next file.
     423              :         //
     424              :         // Note that if the files don't form such a sequence, we might
     425              :         // "compact" just a single file. That's a bit pointless, but it allows
     426              :         // us to get rid of the level 0 file, and compact the other files on
     427              :         // the next iteration. This could probably be made smarter, but such
     428              :         // "gaps" in the sequence of level 0 files should only happen in case
     429              :         // of a crash, partial download from cloud storage, or something like
     430              :         // that, so it's not a big deal in practice.
     431          476 :         level0_deltas.sort_by_key(|l| l.layer_desc().lsn_range.start);
     432           24 :         let mut level0_deltas_iter = level0_deltas.iter();
     433           24 : 
     434           24 :         let first_level0_delta = level0_deltas_iter.next().unwrap();
     435           24 :         let mut prev_lsn_end = first_level0_delta.layer_desc().lsn_range.end;
     436           24 :         let mut deltas_to_compact = Vec::with_capacity(level0_deltas.len());
     437           24 : 
     438           24 :         deltas_to_compact.push(first_level0_delta.download_and_keep_resident().await?);
     439          262 :         for l in level0_deltas_iter {
     440          238 :             let lsn_range = &l.layer_desc().lsn_range;
     441          238 : 
     442          238 :             if lsn_range.start != prev_lsn_end {
     443            0 :                 break;
     444          238 :             }
     445          238 :             deltas_to_compact.push(l.download_and_keep_resident().await?);
     446          238 :             prev_lsn_end = lsn_range.end;
     447              :         }
     448           24 :         let lsn_range = Range {
     449           24 :             start: deltas_to_compact
     450           24 :                 .first()
     451           24 :                 .unwrap()
     452           24 :                 .layer_desc()
     453           24 :                 .lsn_range
     454           24 :                 .start,
     455           24 :             end: deltas_to_compact.last().unwrap().layer_desc().lsn_range.end,
     456           24 :         };
     457           24 : 
     458           24 :         info!(
     459            0 :             "Starting Level0 compaction in LSN range {}-{} for {} layers ({} deltas in total)",
     460            0 :             lsn_range.start,
     461            0 :             lsn_range.end,
     462            0 :             deltas_to_compact.len(),
     463            0 :             level0_deltas.len()
     464              :         );
     465              : 
     466          262 :         for l in deltas_to_compact.iter() {
     467          262 :             info!("compact includes {l}");
     468              :         }
     469              : 
     470              :         // We don't need the original list of layers anymore. Drop it so that
     471              :         // we don't accidentally use it later in the function.
     472           24 :         drop(level0_deltas);
     473           24 : 
     474           24 :         stats.read_lock_held_prerequisites_micros = stats
     475           24 :             .read_lock_held_spawn_blocking_startup_micros
     476           24 :             .till_now();
     477           24 : 
     478           24 :         // Determine N largest holes where N is number of compacted layers.
     479           24 :         let max_holes = deltas_to_compact.len();
     480           24 :         let last_record_lsn = self.get_last_record_lsn();
     481           24 :         let min_hole_range = (target_file_size / page_cache::PAGE_SZ as u64) as i128;
     482           24 :         let min_hole_coverage_size = 3; // TODO: something more flexible?
     483           24 : 
     484           24 :         // min-heap (reserve space for one more element added before eviction)
     485           24 :         let mut heap: BinaryHeap<Hole> = BinaryHeap::with_capacity(max_holes + 1);
     486           24 :         let mut prev: Option<Key> = None;
     487           24 : 
     488           24 :         let mut all_keys = Vec::new();
     489              : 
     490          262 :         for l in deltas_to_compact.iter() {
     491         2276 :             all_keys.extend(l.load_keys(ctx).await?);
     492              :         }
     493              : 
     494              :         // FIXME: should spawn_blocking the rest of this function
     495              : 
     496              :         // The current stdlib sorting implementation is designed in a way where it is
     497              :         // particularly fast where the slice is made up of sorted sub-ranges.
     498      4235848 :         all_keys.sort_by_key(|DeltaEntry { key, lsn, .. }| (*key, *lsn));
     499           24 : 
     500           24 :         stats.read_lock_held_key_sort_micros = stats.read_lock_held_prerequisites_micros.till_now();
     501              : 
     502      2041998 :         for &DeltaEntry { key: next_key, .. } in all_keys.iter() {
     503      2041998 :             if let Some(prev_key) = prev {
     504              :                 // just first fast filter
     505      2041974 :                 if next_key.to_i128() - prev_key.to_i128() >= min_hole_range {
     506            0 :                     let key_range = prev_key..next_key;
     507            0 :                     // Measuring a hole by simply subtracting the i128 representations of the key range boundaries
     508            0 :                     // does not make much sense, because the largest holes would correspond to field1/field2 changes.
     509            0 :                     // But we are mostly interested in eliminating holes that cause generation of excessive image layers.
     510            0 :                     // That is why it is better to measure the size of a hole as the number of covering image layers.
     511            0 :                     let coverage_size = layers.image_coverage(&key_range, last_record_lsn).len();
     512            0 :                     if coverage_size >= min_hole_coverage_size {
     513            0 :                         heap.push(Hole {
     514            0 :                             key_range,
     515            0 :                             coverage_size,
     516            0 :                         });
     517            0 :                         if heap.len() > max_holes {
     518            0 :                             heap.pop(); // remove smallest hole
     519            0 :                         }
     520            0 :                     }
     521      2041974 :                 }
     522           24 :             }
     523      2041998 :             prev = Some(next_key.next());
     524              :         }
     525           24 :         stats.read_lock_held_compute_holes_micros = stats.read_lock_held_key_sort_micros.till_now();
     526           24 :         drop_rlock(guard);
     527           24 :         stats.read_lock_drop_micros = stats.read_lock_held_compute_holes_micros.till_now();
     528           24 :         let mut holes = heap.into_vec();
     529           24 :         holes.sort_unstable_by_key(|hole| hole.key_range.start);
     530           24 :         let mut next_hole = 0; // index of next hole in holes vector
     531           24 : 
     532           24 :         // This iterator walks through all key-value pairs from all the layers
     533           24 :         // we're compacting, in key, LSN order.
     534           24 :         let all_values_iter = all_keys.iter();
     535           24 : 
     536           24 :         // This iterator walks through all keys and is needed to calculate size used by each key
     537           24 :         let mut all_keys_iter = all_keys
     538           24 :             .iter()
     539      2041998 :             .map(|DeltaEntry { key, lsn, size, .. }| (*key, *lsn, *size))
     540      2041974 :             .coalesce(|mut prev, cur| {
     541      2041974 :                 // Coalesce keys that belong to the same key pair.
     542      2041974 :                 // This ensures that compaction doesn't put them
     543      2041974 :                 // into different layer files.
     544      2041974 :                 // Still limit this by the target file size,
     545      2041974 :                 // so that we keep the size of the files in
     546      2041974 :                 // check.
     547      2041974 :                 if prev.0 == cur.0 && prev.2 < target_file_size {
     548        20000 :                     prev.2 += cur.2;
     549        20000 :                     Ok(prev)
     550              :                 } else {
     551      2021974 :                     Err((prev, cur))
     552              :                 }
     553      2041974 :             });
     554           24 : 
     555           24 :         // Merge the contents of all the input delta layers into a new set
     556           24 :         // of delta layers, based on the current partitioning.
     557           24 :         //
     558           24 :         // We split the new delta layers on the key dimension. We iterate through the key space, and for each key, check if including the next key to the current output layer we're building would cause the layer to become too large. If so, dump the current output layer and start new one.
     559           24 :         // It's possible that there is a single key with so many page versions that storing all of them in a single layer file
     560           24 :         // would be too large. In that case, we also split on the LSN dimension.
     561           24 :         //
     562           24 :         // LSN
     563           24 :         //  ^
     564           24 :         //  |
     565           24 :         //  | +-----------+            +--+--+--+--+
     566           24 :         //  | |           |            |  |  |  |  |
     567           24 :         //  | +-----------+            |  |  |  |  |
     568           24 :         //  | |           |            |  |  |  |  |
     569           24 :         //  | +-----------+     ==>    |  |  |  |  |
     570           24 :         //  | |           |            |  |  |  |  |
     571           24 :         //  | +-----------+            |  |  |  |  |
     572           24 :         //  | |           |            |  |  |  |  |
     573           24 :         //  | +-----------+            +--+--+--+--+
     574           24 :         //  |
     575           24 :         //  +--------------> key
     576           24 :         //
     577           24 :         //
     578           24 :         // If one key (X) has a lot of page versions:
     579           24 :         //
     580           24 :         // LSN
     581           24 :         //  ^
     582           24 :         //  |                                 (X)
     583           24 :         //  | +-----------+            +--+--+--+--+
     584           24 :         //  | |           |            |  |  |  |  |
     585           24 :         //  | +-----------+            |  |  +--+  |
     586           24 :         //  | |           |            |  |  |  |  |
     587           24 :         //  | +-----------+     ==>    |  |  |  |  |
     588           24 :         //  | |           |            |  |  +--+  |
     589           24 :         //  | +-----------+            |  |  |  |  |
     590           24 :         //  | |           |            |  |  |  |  |
     591           24 :         //  | +-----------+            +--+--+--+--+
     592           24 :         //  |
     593           24 :         //  +--------------> key
     594           24 :         // TODO: this actually divides the layers into fixed-size chunks, not
     595           24 :         // based on the partitioning.
     596           24 :         //
     597           24 :         // TODO: we should also opportunistically materialize and
     598           24 :         // garbage collect what we can.
     599           24 :         let mut new_layers = Vec::new();
     600           24 :         let mut prev_key: Option<Key> = None;
     601           24 :         let mut writer: Option<DeltaLayerWriter> = None;
     602           24 :         let mut key_values_total_size = 0u64;
     603           24 :         let mut dup_start_lsn: Lsn = Lsn::INVALID; // start LSN of layer containing values of the single key
     604           24 :         let mut dup_end_lsn: Lsn = Lsn::INVALID; // end LSN of layer containing values of the single key
     605              : 
     606              :         for &DeltaEntry {
     607      2041998 :             key, lsn, ref val, ..
     608      2042022 :         } in all_values_iter
     609              :         {
     610      2041998 :             let value = val.load(ctx).await?;
     611      2041998 :             let same_key = prev_key.map_or(false, |prev_key| prev_key == key);
     612      2041998 :             // We need to check key boundaries once we reach next key or end of layer with the same key
     613      2041998 :             if !same_key || lsn == dup_end_lsn {
     614      2021998 :                 let mut next_key_size = 0u64;
     615      2021998 :                 let is_dup_layer = dup_end_lsn.is_valid();
     616      2021998 :                 dup_start_lsn = Lsn::INVALID;
     617      2021998 :                 if !same_key {
     618      2021998 :                     dup_end_lsn = Lsn::INVALID;
     619      2021998 :                 }
     620              :                 // Determine size occupied by this key. We stop at next key or when size becomes larger than target_file_size
     621      2021998 :                 for (next_key, next_lsn, next_size) in all_keys_iter.by_ref() {
     622      2021998 :                     next_key_size = next_size;
     623      2021998 :                     if key != next_key {
     624      2021974 :                         if dup_end_lsn.is_valid() {
     625            0 :                             // We are writing a segment with duplicates:
     626            0 :                             // place all remaining values of this key in a separate segment
     627            0 :                             dup_start_lsn = dup_end_lsn; // new segments starts where old stops
     628            0 :                             dup_end_lsn = lsn_range.end; // there are no more values of this key till end of LSN range
     629      2021974 :                         }
     630      2021974 :                         break;
     631           24 :                     }
     632           24 :                     key_values_total_size += next_size;
     633           24 :                     // Check if it is time to split segment: if total keys size is larger than target file size.
     634           24 :                     // We need to avoid generation of empty segments if next_size > target_file_size.
     635           24 :                     if key_values_total_size > target_file_size && lsn != next_lsn {
     636              :                         // Split key between multiple layers: such layer can contain only single key
     637            0 :                         dup_start_lsn = if dup_end_lsn.is_valid() {
     638            0 :                             dup_end_lsn // new segment with duplicates starts where old one stops
     639              :                         } else {
     640            0 :                             lsn // start with the first LSN for this key
     641              :                         };
     642            0 :                         dup_end_lsn = next_lsn; // upper LSN boundary is exclusive
     643            0 :                         break;
     644           24 :                     }
     645              :                 }
     646              :                 // handle case when loop reaches last key: in this case dup_end is non-zero but dup_start is not set.
     647      2021998 :                 if dup_end_lsn.is_valid() && !dup_start_lsn.is_valid() {
     648            0 :                     dup_start_lsn = dup_end_lsn;
     649            0 :                     dup_end_lsn = lsn_range.end;
     650      2021998 :                 }
     651      2021998 :                 if writer.is_some() {
     652      2021974 :                     let written_size = writer.as_mut().unwrap().size();
     653      2021974 :                     let contains_hole =
     654      2021974 :                         next_hole < holes.len() && key >= holes[next_hole].key_range.end;
     655              :                     // check if key cause layer overflow or contains hole...
     656      2021974 :                     if is_dup_layer
     657      2021974 :                         || dup_end_lsn.is_valid()
     658      2021974 :                         || written_size + key_values_total_size > target_file_size
     659      2021774 :                         || contains_hole
     660              :                     {
     661              :                         // ... if so, flush previous layer and prepare to write new one
     662          200 :                         new_layers.push(
     663          200 :                             writer
     664          200 :                                 .take()
     665          200 :                                 .unwrap()
     666          200 :                                 .finish(prev_key.unwrap().next(), self, ctx)
     667          509 :                                 .await?,
     668              :                         );
     669          200 :                         writer = None;
     670          200 : 
     671          200 :                         if contains_hole {
     672            0 :                             // skip hole
     673            0 :                             next_hole += 1;
     674          200 :                         }
     675      2021774 :                     }
     676           24 :                 }
     677              :                 // Remember size of key value because at next iteration we will access next item
     678      2021998 :                 key_values_total_size = next_key_size;
     679        20000 :             }
     680      2041998 :             fail_point!("delta-layer-writer-fail-before-finish", |_| {
     681            0 :                 Err(CompactionError::Other(anyhow::anyhow!(
     682            0 :                     "failpoint delta-layer-writer-fail-before-finish"
     683            0 :                 )))
     684      2041998 :             });
     685              : 
     686      2041998 :             if !self.shard_identity.is_key_disposable(&key) {
     687      2041998 :                 if writer.is_none() {
     688          224 :                     // Create writer if not initialized yet
     689          224 :                     writer = Some(
     690              :                         DeltaLayerWriter::new(
     691          224 :                             self.conf,
     692          224 :                             self.timeline_id,
     693          224 :                             self.tenant_shard_id,
     694          224 :                             key,
     695          224 :                             if dup_end_lsn.is_valid() {
     696              :                                 // this is a layer containing slice of values of the same key
     697            0 :                                 debug!("Create new dup layer {}..{}", dup_start_lsn, dup_end_lsn);
     698            0 :                                 dup_start_lsn..dup_end_lsn
     699              :                             } else {
     700          224 :                                 debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end);
     701          224 :                                 lsn_range.clone()
     702              :                             },
     703              :                         )
     704          112 :                         .await?,
     705              :                     );
     706      2041774 :                 }
     707              : 
     708      2041998 :                 writer
     709      2041998 :                     .as_mut()
     710      2041998 :                     .unwrap()
     711      2041998 :                     .put_value(key, lsn, value, ctx)
     712         1555 :                     .await?;
     713              :             } else {
     714            0 :                 debug!(
     715            0 :                     "Dropping key {} during compaction (it belongs on shard {:?})",
     716            0 :                     key,
     717            0 :                     self.shard_identity.get_shard_number(&key)
     718              :                 );
     719              :             }
     720              : 
     721      2041998 :             if !new_layers.is_empty() {
     722        19786 :                 fail_point!("after-timeline-compacted-first-L1");
     723      2022212 :             }
     724              : 
     725      2041998 :             prev_key = Some(key);
     726              :         }
     727           24 :         if let Some(writer) = writer {
     728         1957 :             new_layers.push(writer.finish(prev_key.unwrap().next(), self, ctx).await?);
     729            0 :         }
     730              : 
     731              :         // Sync layers
     732           24 :         if !new_layers.is_empty() {
     733              :             // Print a warning if the created layer is larger than double the target size
     734              :             // Add two pages for potential overhead. This should in theory be already
     735              :             // accounted for in the target calculation, but for very small targets,
     736              :             // we still might easily hit the limit otherwise.
     737           24 :             let warn_limit = target_file_size * 2 + page_cache::PAGE_SZ as u64 * 2;
     738          224 :             for layer in new_layers.iter() {
     739          224 :                 if layer.layer_desc().file_size > warn_limit {
     740            0 :                     warn!(
     741              :                         %layer,
     742            0 :                         "created delta file of size {} larger than double of target of {target_file_size}", layer.layer_desc().file_size
     743              :                     );
     744          224 :                 }
     745              :             }
     746              : 
     747              :             // The writer.finish() above already did the fsync of the inodes.
     748              :             // We just need to fsync the directory in which these inodes are linked,
     749              :             // which we know to be the timeline directory.
     750              :             //
     751              :             // We use fatal_err() below because after writer.finish() returns with success,
     752              :             // the in-memory state of the filesystem already has the layer file in its final place,
     753              :             // and subsequent pageserver code could think it's durable while it really isn't.
     754           24 :             let timeline_dir = VirtualFile::open(
     755           24 :                 &self
     756           24 :                     .conf
     757           24 :                     .timeline_path(&self.tenant_shard_id, &self.timeline_id),
     758           24 :             )
     759           12 :             .await
     760           24 :             .fatal_err("VirtualFile::open for timeline dir fsync");
     761           24 :             timeline_dir
     762           24 :                 .sync_all()
     763           12 :                 .await
     764           24 :                 .fatal_err("VirtualFile::sync_all timeline dir");
     765            0 :         }
     766              : 
     767           24 :         stats.write_layer_files_micros = stats.read_lock_drop_micros.till_now();
     768           24 :         stats.new_deltas_count = Some(new_layers.len());
     769          224 :         stats.new_deltas_size = Some(new_layers.iter().map(|l| l.layer_desc().file_size).sum());
     770           24 : 
     771           24 :         match TryInto::<CompactLevel0Phase1Stats>::try_into(stats)
     772           24 :             .and_then(|stats| serde_json::to_string(&stats).context("serde_json::to_string"))
     773              :         {
     774           24 :             Ok(stats_json) => {
     775           24 :                 info!(
     776            0 :                     stats_json = stats_json.as_str(),
     777            0 :                     "compact_level0_phase1 stats available"
     778              :                 )
     779              :             }
     780            0 :             Err(e) => {
     781            0 :                 warn!("compact_level0_phase1 stats failed to serialize: {:#}", e);
     782              :             }
     783              :         }
     784              : 
     785           24 :         Ok(CompactLevel0Phase1Result {
     786           24 :             new_layers,
     787           24 :             deltas_to_compact: deltas_to_compact
     788           24 :                 .into_iter()
     789          262 :                 .map(|x| x.drop_eviction_guard())
     790           24 :                 .collect::<Vec<_>>(),
     791           24 :         })
     792          330 :     }
     793              : }
     794              : 
     795              : #[derive(Default)]
     796              : struct CompactLevel0Phase1Result { // Output of L0 compaction phase 1: the layers it wrote and the input layers it merged.
     797              :     new_layers: Vec<ResidentLayer>, // Delta layers produced by the merge loop; each entry is the result of a `DeltaLayerWriter::finish` call.
     798              :     deltas_to_compact: Vec<Layer>, // The L0 delta layers that were merged, after `drop_eviction_guard()` was called on each.
     799              : }
     800              : 
     801              : #[derive(Default)]
     802              : struct CompactLevel0Phase1StatsBuilder { // Accumulates phase-1 statistics field by field; turned into `CompactLevel0Phase1Stats` via `TryFrom`, which fails if any field is unset.
     803              :     version: Option<u64>,
     804              :     tenant_id: Option<TenantShardId>,
     805              :     timeline_id: Option<TimelineId>,
     806              :     read_lock_acquisition_micros: DurationRecorder,
     807              :     read_lock_held_spawn_blocking_startup_micros: DurationRecorder,
     808              :     read_lock_held_key_sort_micros: DurationRecorder, // Set after the collected keys are sorted, as `read_lock_held_prerequisites_micros.till_now()`.
     809              :     read_lock_held_prerequisites_micros: DurationRecorder, // Set once the input layers are selected, as `read_lock_held_spawn_blocking_startup_micros.till_now()`; note it is declared after key_sort but recorded before it.
     810              :     read_lock_held_compute_holes_micros: DurationRecorder, // Set after the hole search loop, as `read_lock_held_key_sort_micros.till_now()`.
     811              :     read_lock_drop_micros: DurationRecorder, // Set right after `drop_rlock(guard)`, as `read_lock_held_compute_holes_micros.till_now()`.
     812              :     write_layer_files_micros: DurationRecorder, // Set after the new layers are written and the timeline directory is fsynced, as `read_lock_drop_micros.till_now()`.
     813              :     level0_deltas_count: Option<usize>,
     814              :     new_deltas_count: Option<usize>, // Number of newly written delta layers.
     815              :     new_deltas_size: Option<u64>, // Sum of `layer_desc().file_size` over the newly written delta layers.
     816              : }
     817              : 
     818              : #[derive(serde::Serialize)]
     819              : struct CompactLevel0Phase1Stats { // Finalized phase-1 statistics; serialized with `serde_json::to_string` and logged as `stats_json` at the end of phase 1.
     820              :     version: u64,
     821              :     tenant_id: TenantShardId,
     822              :     timeline_id: TimelineId,
     823              :     read_lock_acquisition_micros: RecordedDuration, // Each timing field is the builder's `DurationRecorder` of the same name after `into_recorded()`.
     824              :     read_lock_held_spawn_blocking_startup_micros: RecordedDuration,
     825              :     read_lock_held_key_sort_micros: RecordedDuration,
     826              :     read_lock_held_prerequisites_micros: RecordedDuration,
     827              :     read_lock_held_compute_holes_micros: RecordedDuration,
     828              :     read_lock_drop_micros: RecordedDuration,
     829              :     write_layer_files_micros: RecordedDuration,
     830              :     level0_deltas_count: usize, // The count fields are the builder's `Option`s, unwrapped.
     831              :     new_deltas_count: usize,
     832              :     new_deltas_size: u64,
     833              : }
     834              : 
     835              : impl TryFrom<CompactLevel0Phase1StatsBuilder> for CompactLevel0Phase1Stats {
     836              :     type Error = anyhow::Error;
     837              : 
     838           24 :     fn try_from(value: CompactLevel0Phase1StatsBuilder) -> Result<Self, Self::Error> {
     839           24 :         Ok(Self {
     840           24 :             version: value.version.ok_or_else(|| anyhow!("version not set"))?,
     841           24 :             tenant_id: value
     842           24 :                 .tenant_id
     843           24 :                 .ok_or_else(|| anyhow!("tenant_id not set"))?,
     844           24 :             timeline_id: value
     845           24 :                 .timeline_id
     846           24 :                 .ok_or_else(|| anyhow!("timeline_id not set"))?,
     847           24 :             read_lock_acquisition_micros: value
     848           24 :                 .read_lock_acquisition_micros
     849           24 :                 .into_recorded()
     850           24 :                 .ok_or_else(|| anyhow!("read_lock_acquisition_micros not set"))?,
     851           24 :             read_lock_held_spawn_blocking_startup_micros: value
     852           24 :                 .read_lock_held_spawn_blocking_startup_micros
     853           24 :                 .into_recorded()
     854           24 :                 .ok_or_else(|| anyhow!("read_lock_held_spawn_blocking_startup_micros not set"))?,
     855           24 :             read_lock_held_key_sort_micros: value
     856           24 :                 .read_lock_held_key_sort_micros
     857           24 :                 .into_recorded()
     858           24 :                 .ok_or_else(|| anyhow!("read_lock_held_key_sort_micros not set"))?,
     859           24 :             read_lock_held_prerequisites_micros: value
     860           24 :                 .read_lock_held_prerequisites_micros
     861           24 :                 .into_recorded()
     862           24 :                 .ok_or_else(|| anyhow!("read_lock_held_prerequisites_micros not set"))?,
     863           24 :             read_lock_held_compute_holes_micros: value
     864           24 :                 .read_lock_held_compute_holes_micros
     865           24 :                 .into_recorded()
     866           24 :                 .ok_or_else(|| anyhow!("read_lock_held_compute_holes_micros not set"))?,
     867           24 :             read_lock_drop_micros: value
     868           24 :                 .read_lock_drop_micros
     869           24 :                 .into_recorded()
     870           24 :                 .ok_or_else(|| anyhow!("read_lock_drop_micros not set"))?,
     871           24 :             write_layer_files_micros: value
     872           24 :                 .write_layer_files_micros
     873           24 :                 .into_recorded()
     874           24 :                 .ok_or_else(|| anyhow!("write_layer_files_micros not set"))?,
     875           24 :             level0_deltas_count: value
     876           24 :                 .level0_deltas_count
     877           24 :                 .ok_or_else(|| anyhow!("level0_deltas_count not set"))?,
     878           24 :             new_deltas_count: value
     879           24 :                 .new_deltas_count
     880           24 :                 .ok_or_else(|| anyhow!("new_deltas_count not set"))?,
     881           24 :             new_deltas_size: value
     882           24 :                 .new_deltas_size
     883           24 :                 .ok_or_else(|| anyhow!("new_deltas_size not set"))?,
     884              :         })
     885           24 :     }
     886              : }
     887              : 
     888              : impl Timeline {
     889              :     /// Entry point for new tiered compaction algorithm.
     890              :     ///
     891              :     /// All the real work is in the implementation in the pageserver_compaction
     892              :     /// crate. The code here would apply to any algorithm implemented by the
     893              :     /// same interface, but tiered is the only one at the moment.
     894              :     ///
                            /// What this function does, in order:
                            /// 1. Reads the level-0 delta layers. If there are fewer of them than
                            ///    `get_compaction_threshold()`, returns `Ok(())` without doing anything.
                            /// 2. Takes the highest `lsn_range.end` among those layers as the
                            ///    compaction end LSN.
                            /// 3. Returns `CompactionError::ShuttingDown` if `is_stopping()` is true.
                            /// 4. Collects the keyspace at the end LSN (only the dense part is used),
                            ///    runs `pageserver_compaction::compact_tiered::compact_tiered` through
                            ///    a `TimelineAdaptor`, and finally flushes the adaptor's pending updates.
                            ///
                            /// Errors from any of the `?`-propagated calls are converted into
                            /// `CompactionError`. `_cancel` is not consulted yet.
                            ///
                            /// NOTE(review): if the compaction threshold can be 0 and there are no L0
                            /// layers, the early-return check does not fire and `.max().unwrap()`
                            /// panics on the empty list -- confirm whether a threshold of 0 is possible.
                            ///
     895              :     /// TODO: cancellation
     896            0 :     pub(crate) async fn compact_tiered(
     897            0 :         self: &Arc<Self>,
     898            0 :         _cancel: &CancellationToken,
     899            0 :         ctx: &RequestContext,
     900            0 :     ) -> Result<(), CompactionError> {
                                // The compaction threshold doubles as the fanout handed to the
                                // algorithm, and the checkpoint distance as its target file size.
     901            0 :         let fanout = self.get_compaction_threshold() as u64;
     902            0 :         let target_file_size = self.get_checkpoint_distance();
     903              : 
     904              :         // Find the top of the historical layers
     905            0 :         let end_lsn = {
     906            0 :             let guard = self.layers.read().await;
     907            0 :             let layers = guard.layer_map();
     908              : 
     909            0 :             let l0_deltas = layers.get_level0_deltas()?;
                                    // Release the layer-map read lock; only the collected L0 list
                                    // is used from here on.
     910            0 :             drop(guard);
     911            0 : 
     912            0 :             // As an optimization, if we find that there are too few L0 layers,
     913            0 :             // bail out early. We know that the compaction algorithm would do
     914            0 :             // nothing in that case.
     915            0 :             if l0_deltas.len() < fanout as usize {
     916              :                 // doesn't need compacting
     917            0 :                 return Ok(());
     918            0 :             }
     919            0 :             l0_deltas.iter().map(|l| l.lsn_range.end).max().unwrap()
     920            0 :         };
     921            0 : 
     922            0 :         // Is the timeline being deleted?
     923            0 :         if self.is_stopping() {
     924            0 :             trace!("Dropping out of compaction on timeline shutdown");
     925            0 :             return Err(CompactionError::ShuttingDown);
     926            0 :         }
     927              : 
     928            0 :         let (dense_ks, _sparse_ks) = self.collect_keyspace(end_lsn, ctx).await?;
     929              :         // TODO(chi): ignore sparse_keyspace for now, compact it in the future.
     930            0 :         let mut adaptor = TimelineAdaptor::new(self, (end_lsn, dense_ks));
     931            0 : 
     932            0 :         pageserver_compaction::compact_tiered::compact_tiered(
     933            0 :             &mut adaptor,
     934            0 :             end_lsn,
     935            0 :             target_file_size,
     936            0 :             fanout,
     937            0 :             ctx,
     938            0 :         )
     939            0 :         .await?;
     940              : 
                                // Apply whatever layer changes the algorithm left buffered in the adaptor.
     941            0 :         adaptor.flush_updates().await?;
     942            0 :         Ok(())
     943            0 :     }
     944              : }
     945              : 
                        /// Glue between a `Timeline` and the compaction algorithm: it implements
                        /// `CompactionJobExecutor` and buffers the layer changes requested through
                        /// that interface until `flush_updates` applies them to the timeline.
     946              : struct TimelineAdaptor {
     947              :     timeline: Arc<Timeline>,
     948              : 
                            /// A keyspace together with the LSN it belongs to. `get_keyspace` only
                            /// answers requests for exactly this LSN.
     949              :     keyspace: (Lsn, KeySpace),
     950              : 
                            /// Delta layers produced by `create_delta` and not yet flushed.
     951              :     new_deltas: Vec<ResidentLayer>,
                            /// Image layers produced by `create_image_impl` and not yet flushed.
     952              :     new_images: Vec<ResidentLayer>,
                            /// Descriptors queued by `delete_layer`; resolved to layers in `flush_updates`.
     953              :     layers_to_delete: Vec<Arc<PersistentLayerDesc>>,
     954              : }
     955              : 
     956              : impl TimelineAdaptor {
                            /// Builds an adaptor for `timeline` (the `Arc` is cloned) holding the given
                            /// `(lsn, keyspace)` pair, with all pending-change lists empty.
     957            0 :     pub fn new(timeline: &Arc<Timeline>, keyspace: (Lsn, KeySpace)) -> Self {
     958            0 :         Self {
     959            0 :             timeline: timeline.clone(),
     960            0 :             keyspace,
     961            0 :             new_images: Vec::new(),
     962            0 :             new_deltas: Vec::new(),
     963            0 :             layers_to_delete: Vec::new(),
     964            0 :         }
     965            0 :     }
     966              : 
                            /// Applies the buffered changes to the timeline and empties the buffers.
                            ///
                            /// The queued descriptors are resolved to `Layer`s, then
                            /// `finish_compact_batch` is called with the new deltas, the new images and
                            /// the layers to delete, and the new images are handed to
                            /// `upload_new_image_layers`.
                            ///
                            /// If `finish_compact_batch` fails, the function returns early and all
                            /// three buffers keep their contents. If `upload_new_image_layers` fails,
                            /// `new_images` has already been emptied by `std::mem::take`, while
                            /// `new_deltas` and `layers_to_delete` are left as they were.
     967            0 :     pub async fn flush_updates(&mut self) -> anyhow::Result<()> {
                                // The read guard lives only inside this block, so it is released
                                // before `finish_compact_batch` is awaited.
     968            0 :         let layers_to_delete = {
     969            0 :             let guard = self.timeline.layers.read().await;
     970            0 :             self.layers_to_delete
     971            0 :                 .iter()
     972            0 :                 .map(|x| guard.get_from_desc(x))
     973            0 :                 .collect::<Vec<Layer>>()
     974            0 :         };
     975            0 :         self.timeline
     976            0 :             .finish_compact_batch(&self.new_deltas, &self.new_images, &layers_to_delete)
     977            0 :             .await?;
     978              : 
     979            0 :         self.timeline
     980            0 :             .upload_new_image_layers(std::mem::take(&mut self.new_images))?;
     981              : 
     982            0 :         self.new_deltas.clear();
     983            0 :         self.layers_to_delete.clear();
     984            0 :         Ok(())
     985            0 :     }
     986              : }
     987              : 
                        /// Newtype over `ResidentLayer` used as `CompactionJobExecutor::DeltaLayer`.
                        /// Values are produced by `downcast_delta_layer` after the layer has been
                        /// downloaded and kept resident.
     988              : #[derive(Clone)]
     989              : struct ResidentDeltaLayer(ResidentLayer);
                        /// Newtype over `ResidentLayer` used as `CompactionJobExecutor::ImageLayer`.
                        /// It is not constructed anywhere in the code visible in this part of the file.
     990              : #[derive(Clone)]
     991              : struct ResidentImageLayer(ResidentLayer);
     992              : 
     993              : impl CompactionJobExecutor for TimelineAdaptor {
                            // Concrete types plugged into the executor interface.
     994              :     type Key = crate::repository::Key;
     995              : 
     996              :     type Layer = OwnArc<PersistentLayerDesc>;
     997              :     type DeltaLayer = ResidentDeltaLayer;
     998              :     type ImageLayer = ResidentImageLayer;
     999              : 
    1000              :     type RequestContext = crate::context::RequestContext;
    1001              : 
                            /// Returns the shard identity of the underlying timeline.
    1002            0 :     fn get_shard_identity(&self) -> &ShardIdentity {
    1003            0 :         self.timeline.get_shard_identity()
    1004            0 :     }
    1005              : 
                            /// Returns the descriptors of all historic layers whose LSN range overlaps
                            /// `lsn_range` and whose key range overlaps `key_range`.
                            ///
                            /// Pending updates are flushed first, so layers created or deleted through
                            /// this adaptor have been handed to the timeline before the layer map is read.
    1006            0 :     async fn get_layers(
    1007            0 :         &mut self,
    1008            0 :         key_range: &Range<Key>,
    1009            0 :         lsn_range: &Range<Lsn>,
    1010            0 :         _ctx: &RequestContext,
    1011            0 :     ) -> anyhow::Result<Vec<OwnArc<PersistentLayerDesc>>> {
    1012            0 :         self.flush_updates().await?;
    1013              : 
    1014            0 :         let guard = self.timeline.layers.read().await;
    1015            0 :         let layer_map = guard.layer_map();
    1016            0 : 
    1017            0 :         let result = layer_map
    1018            0 :             .iter_historic_layers()
    1019            0 :             .filter(|l| {
    1020            0 :                 overlaps_with(&l.lsn_range, lsn_range) && overlaps_with(&l.key_range, key_range)
    1021            0 :             })
    1022            0 :             .map(OwnArc)
    1023            0 :             .collect();
    1024            0 :         Ok(result)
    1025            0 :     }
    1026              : 
                            /// Returns the parts of the stored keyspace that intersect `key_range`.
                            ///
                            /// Only the LSN stored alongside the keyspace is supported; any other `lsn`
                            /// yields an error.
    1027            0 :     async fn get_keyspace(
    1028            0 :         &mut self,
    1029            0 :         key_range: &Range<Key>,
    1030            0 :         lsn: Lsn,
    1031            0 :         _ctx: &RequestContext,
    1032            0 :     ) -> anyhow::Result<Vec<Range<Key>>> {
    1033            0 :         if lsn == self.keyspace.0 {
    1034            0 :             Ok(pageserver_compaction::helpers::intersect_keyspace(
    1035            0 :                 &self.keyspace.1.ranges,
    1036            0 :                 key_range,
    1037            0 :             ))
    1038              :         } else {
    1039              :             // The current compaction implementation only ever requests the key space
    1040              :             // at the compaction end LSN.
    1041            0 :             anyhow::bail!("keyspace not available for requested lsn");
    1042              :         }
    1043            0 :     }
    1044              : 
                            /// Turns a layer descriptor into a resident delta layer.
                            ///
                            /// Returns `Ok(None)` when the descriptor is not a delta layer. Otherwise
                            /// the layer is looked up by descriptor, `download_and_keep_resident` is
                            /// awaited, and the result is wrapped in `ResidentDeltaLayer`.
    1045            0 :     async fn downcast_delta_layer(
    1046            0 :         &self,
    1047            0 :         layer: &OwnArc<PersistentLayerDesc>,
    1048            0 :     ) -> anyhow::Result<Option<ResidentDeltaLayer>> {
    1049            0 :         // this is a lot more complex than a simple downcast...
    1050            0 :         if layer.is_delta() {
                                    // The read guard is scoped to this block, so it is released
                                    // before the download is awaited.
    1051            0 :             let l = {
    1052            0 :                 let guard = self.timeline.layers.read().await;
    1053            0 :                 guard.get_from_desc(layer)
    1054              :             };
    1055            0 :             let result = l.download_and_keep_resident().await?;
    1056              : 
    1057            0 :             Ok(Some(ResidentDeltaLayer(result)))
    1058              :         } else {
    1059            0 :             Ok(None)
    1060              :         }
    1061            0 :     }
    1062              : 
                            /// Creates an image layer for `key_range` at `lsn` by delegating to
                            /// `create_image_impl`; its error is converted into `anyhow::Error` by `?`.
    1063            0 :     async fn create_image(
    1064            0 :         &mut self,
    1065            0 :         lsn: Lsn,
    1066            0 :         key_range: &Range<Key>,
    1067            0 :         ctx: &RequestContext,
    1068            0 :     ) -> anyhow::Result<()> {
    1069            0 :         Ok(self.create_image_impl(lsn, key_range, ctx).await?)
    1070            0 :     }
    1071              : 
                            /// Merges the entries of `input_layers` into one new delta layer and queues
                            /// it in `new_deltas`.
                            ///
                            /// All entries of all input layers are loaded, sorted by `(key, lsn)`,
                            /// and written in that order; an entry whose `(key, lsn)` equals that of
                            /// the previously written entry is skipped and counted as a duplicate.
                            ///
                            /// Of `key_range` only `start` is used (as the writer's start key); the
                            /// end key passed to `finish` is the successor of the last written key.
                            /// Entries are not filtered against `key_range` or `lsn_range` here.
                            ///
                            /// NOTE(review): if the input layers contain no entries at all, `prev`
                            /// stays `None` and `prev.unwrap()` panics -- presumably the algorithm
                            /// never requests an empty delta; confirm.
    1072            0 :     async fn create_delta(
    1073            0 :         &mut self,
    1074            0 :         lsn_range: &Range<Lsn>,
    1075            0 :         key_range: &Range<Key>,
    1076            0 :         input_layers: &[ResidentDeltaLayer],
    1077            0 :         ctx: &RequestContext,
    1078            0 :     ) -> anyhow::Result<()> {
    1079            0 :         debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end);
    1080              : 
    1081            0 :         let mut all_entries = Vec::new();
    1082            0 :         for dl in input_layers.iter() {
    1083            0 :             all_entries.extend(dl.load_keys(ctx).await?);
    1084              :         }
    1085              : 
    1086              :         // The current stdlib sorting implementation is designed in a way where it is
    1087              :         // particularly fast where the slice is made up of sorted sub-ranges.
    1088            0 :         all_entries.sort_by_key(|DeltaEntry { key, lsn, .. }| (*key, *lsn));
    1089              : 
    1090            0 :         let mut writer = DeltaLayerWriter::new(
    1091            0 :             self.timeline.conf,
    1092            0 :             self.timeline.timeline_id,
    1093            0 :             self.timeline.tenant_shard_id,
    1094            0 :             key_range.start,
    1095            0 :             lsn_range.clone(),
    1096            0 :         )
    1097            0 :         .await?;
    1098              : 
    1099            0 :         let mut dup_values = 0;
    1100            0 : 
    1101            0 :         // This iterator walks through all key-value pairs from all the layers
    1102            0 :         // we're compacting, in key, LSN order.
    1103            0 :         let mut prev: Option<(Key, Lsn)> = None;
    1104              :         for &DeltaEntry {
    1105            0 :             key, lsn, ref val, ..
    1106            0 :         } in all_entries.iter()
    1107              :         {
    1108            0 :             if prev == Some((key, lsn)) {
    1109              :                 // This is a duplicate. Skip it.
    1110              :                 //
    1111              :                 // It can happen if compaction is interrupted after writing some
    1112              :                 // layers but not all, and we are compacting the range again.
    1113              :                 // The calculations in the algorithm assume that there are no
    1114              :                 // duplicates, so the math on targeted file size is likely off,
    1115              :                 // and we will create smaller files than expected.
    1116            0 :                 dup_values += 1;
    1117            0 :                 continue;
    1118            0 :             }
    1119              : 
    1120            0 :             let value = val.load(ctx).await?;
    1121              : 
    1122            0 :             writer.put_value(key, lsn, value, ctx).await?;
    1123              : 
    1124            0 :             prev = Some((key, lsn));
    1125              :         }
    1126              : 
    1127            0 :         if dup_values > 0 {
    1128            0 :             warn!("delta layer created with {} duplicate values", dup_values);
    1129            0 :         }
    1130              : 
    1131            0 :         fail_point!("delta-layer-writer-fail-before-finish", |_| {
    1132            0 :             Err(anyhow::anyhow!(
    1133            0 :                 "failpoint delta-layer-writer-fail-before-finish"
    1134            0 :             ))
    1135            0 :         });
    1136              : 
                                // The end key handed to `finish` is the key following the last one written.
    1137            0 :         let new_delta_layer = writer
    1138            0 :             .finish(prev.unwrap().0.next(), &self.timeline, ctx)
    1139            0 :             .await?;
    1140              : 
    1141            0 :         self.new_deltas.push(new_delta_layer);
    1142            0 :         Ok(())
    1143            0 :     }
    1144              : 
                            /// Queues `layer` for deletion by pushing its descriptor onto
                            /// `layers_to_delete`; nothing else happens here, and the queue is consumed
                            /// by `flush_updates`.
    1145            0 :     async fn delete_layer(
    1146            0 :         &mut self,
    1147            0 :         layer: &OwnArc<PersistentLayerDesc>,
    1148            0 :         _ctx: &RequestContext,
    1149            0 :     ) -> anyhow::Result<()> {
    1150            0 :         self.layers_to_delete.push(layer.clone().0);
    1151            0 :         Ok(())
    1152            0 :     }
    1153              : }
    1154              : 
    1155              : impl TimelineAdaptor {
                            /// Writes one image layer for `key_range` at `lsn` and queues it in
                            /// `new_images`.
                            ///
                            /// The keys written are those of the stored keyspace that fall inside
                            /// `key_range` (obtained via `get_keyspace`, which only accepts the LSN the
                            /// keyspace was stored with). Each page is reconstructed with
                            /// `Timeline::get`. A reconstruction error on an FSM or VM block key is
                            /// replaced by a zero page plus a warning; an error on any other key
                            /// aborts the function.
                            ///
                            /// `timer.stop_and_record()` is called explicitly only on the success path.
    1156            0 :     async fn create_image_impl(
    1157            0 :         &mut self,
    1158            0 :         lsn: Lsn,
    1159            0 :         key_range: &Range<Key>,
    1160            0 :         ctx: &RequestContext,
    1161            0 :     ) -> Result<(), PageReconstructError> {
    1162            0 :         let timer = self.timeline.metrics.create_images_time_histo.start_timer();
    1163              : 
    1164            0 :         let mut image_layer_writer = ImageLayerWriter::new(
    1165            0 :             self.timeline.conf,
    1166            0 :             self.timeline.timeline_id,
    1167            0 :             self.timeline.tenant_shard_id,
    1168            0 :             key_range,
    1169            0 :             lsn,
    1170            0 :         )
    1171            0 :         .await?;
    1172              : 
                                // The failpoint sits after the writer has been created but before
                                // any image has been put into it.
    1173            0 :         fail_point!("image-layer-writer-fail-before-finish", |_| {
    1174            0 :             Err(PageReconstructError::Other(anyhow::anyhow!(
    1175            0 :                 "failpoint image-layer-writer-fail-before-finish"
    1176            0 :             )))
    1177            0 :         });
    1178            0 :         let keyspace_ranges = self.get_keyspace(key_range, lsn, ctx).await?;
    1179            0 :         for range in &keyspace_ranges {
                                    // Visit every key of the half-open range, one `next()` step at a time.
    1180            0 :             let mut key = range.start;
    1181            0 :             while key < range.end {
    1182            0 :                 let img = match self.timeline.get(key, lsn, ctx).await {
    1183            0 :                     Ok(img) => img,
    1184            0 :                     Err(err) => {
    1185            0 :                         // If we fail to reconstruct a VM or FSM page, we can zero the
    1186            0 :                         // page without losing any actual user data. That seems better
    1187            0 :                         // than failing repeatedly and getting stuck.
    1188            0 :                         //
    1189            0 :                         // We had a bug at one point, where we truncated the FSM and VM
    1190            0 :                         // in the pageserver, but the Postgres didn't know about that
    1191            0 :                         // and continued to generate incremental WAL records for pages
    1192            0 :                         // that didn't exist in the pageserver. Trying to replay those
    1193            0 :                         // WAL records failed to find the previous image of the page.
    1194            0 :                         // This special case allows us to recover from that situation.
    1195            0 :                         // See https://github.com/neondatabase/neon/issues/2601.
    1196            0 :                         //
    1197            0 :                         // Unfortunately we cannot do this for the main fork, or for
    1198            0 :                         // any metadata keys, as that would lead to actual data
    1199            0 :                         // loss.
    1200            0 :                         if is_rel_fsm_block_key(key) || is_rel_vm_block_key(key) {
    1201            0 :                             warn!("could not reconstruct FSM or VM key {key}, filling with zeros: {err:?}");
    1202            0 :                             ZERO_PAGE.clone()
    1203              :                         } else {
    1204            0 :                             return Err(err);
    1205              :                         }
    1206              :                     }
    1207              :                 };
    1208            0 :                 image_layer_writer.put_image(key, img, ctx).await?;
    1209            0 :                 key = key.next();
    1210              :             }
    1211              :         }
    1212            0 :         let image_layer = image_layer_writer.finish(&self.timeline, ctx).await?;
    1213              : 
    1214            0 :         self.new_images.push(image_layer);
    1215            0 : 
    1216            0 :         timer.stop_and_record();
    1217            0 : 
    1218            0 :         Ok(())
    1219            0 :     }
    1220              : }
    1221              : 
                        /// Empty impl that marks the pageserver `RequestContext` as a
                        /// `CompactionRequestContext`; it is the type chosen for
                        /// `CompactionJobExecutor::RequestContext` in this file.
    1222              : impl CompactionRequestContext for crate::context::RequestContext {}
    1223              : 
                        /// Newtype wrapper around `Arc<T>` with a public inner field.
                        ///
                        /// NOTE(review): presumably exists so that the compaction layer traits can
                        /// be implemented in this crate for `Arc`-held layer types -- confirm.
    1224              : #[derive(Debug, Clone)]
    1225              : pub struct OwnArc<T>(pub Arc<T>);
    1226              : 
    1227              : impl<T> Deref for OwnArc<T> {
                            // Same target as the wrapped `Arc<T>`, so fields and methods of `T`
                            // are reachable directly through an `OwnArc<T>`.
    1228              :     type Target = <Arc<T> as Deref>::Target;
                            /// Dereferences through the inner `Arc`.
    1229            0 :     fn deref(&self) -> &Self::Target {
    1230            0 :         &self.0
    1231            0 :     }
    1232              : }
    1233              : 
    1234              : impl<T> AsRef<T> for OwnArc<T> {
                            /// Borrows the wrapped value by forwarding to the inner `Arc`'s `as_ref`.
    1235            0 :     fn as_ref(&self) -> &T {
    1236            0 :         self.0.as_ref()
    1237            0 :     }
    1238              : }
    1239              : 
    1240              : impl CompactionLayer<Key> for OwnArc<PersistentLayerDesc> {
                            // `key_range`, `lsn_range` and `file_size` read the descriptor's fields
                            // of the same name (reached through `Deref`).
    1241            0 :     fn key_range(&self) -> &Range<Key> {
    1242            0 :         &self.key_range
    1243            0 :     }
    1244            0 :     fn lsn_range(&self) -> &Range<Lsn> {
    1245            0 :         &self.lsn_range
    1246            0 :     }
    1247            0 :     fn file_size(&self) -> u64 {
    1248            0 :         self.file_size
    1249            0 :     }
                            // `short_id` and `is_delta` go through `as_ref()` to the descriptor.
                            // NOTE(review): presumably this avoids resolving to the trait methods
                            // of the same name on `OwnArc` itself -- confirm.
    1250            0 :     fn short_id(&self) -> std::string::String {
    1251            0 :         self.as_ref().short_id().to_string()
    1252            0 :     }
    1253            0 :     fn is_delta(&self) -> bool {
    1254            0 :         self.as_ref().is_delta()
    1255            0 :     }
    1256              : }
    1257              : 
    1258              : impl CompactionLayer<Key> for OwnArc<DeltaLayer> {
                            // Every accessor reads from the layer's descriptor (`layer_desc()`).
    1259            0 :     fn key_range(&self) -> &Range<Key> {
    1260            0 :         &self.layer_desc().key_range
    1261            0 :     }
    1262            0 :     fn lsn_range(&self) -> &Range<Lsn> {
    1263            0 :         &self.layer_desc().lsn_range
    1264            0 :     }
    1265            0 :     fn file_size(&self) -> u64 {
    1266            0 :         self.layer_desc().file_size
    1267            0 :     }
    1268            0 :     fn short_id(&self) -> std::string::String {
    1269            0 :         self.layer_desc().short_id().to_string()
    1270            0 :     }
                            // A `DeltaLayer` is always reported as a delta layer.
    1271            0 :     fn is_delta(&self) -> bool {
    1272            0 :         true
    1273            0 :     }
    1274              : }
    1275              : 
    1276              : use crate::tenant::timeline::DeltaEntry;
    1277              : 
    1278              : impl CompactionLayer<Key> for ResidentDeltaLayer {
                            // Every accessor reads from the wrapped layer's descriptor (`self.0.layer_desc()`).
    1279            0 :     fn key_range(&self) -> &Range<Key> {
    1280            0 :         &self.0.layer_desc().key_range
    1281            0 :     }
    1282            0 :     fn lsn_range(&self) -> &Range<Lsn> {
    1283            0 :         &self.0.layer_desc().lsn_range
    1284            0 :     }
    1285            0 :     fn file_size(&self) -> u64 {
    1286            0 :         self.0.layer_desc().file_size
    1287            0 :     }
    1288            0 :     fn short_id(&self) -> std::string::String {
    1289            0 :         self.0.layer_desc().short_id().to_string()
    1290            0 :     }
                            // Always `true` for this wrapper type.
    1291            0 :     fn is_delta(&self) -> bool {
    1292            0 :         true
    1293            0 :     }
    1294              : }
    1295              : 
    1296              : impl CompactionDeltaLayer<TimelineAdaptor> for ResidentDeltaLayer {
    1297              :     type DeltaEntry<'a> = DeltaEntry<'a>;
    1298              : 
                            /// Loads the layer's entries by forwarding to the wrapped layer's `load_keys`.
                            ///
                            /// NOTE(review): the lifetime parameter `'a` is declared but not used in
                            /// the signature; the returned entries use the elided lifetime `'_`.
    1299            0 :     async fn load_keys<'a>(&self, ctx: &RequestContext) -> anyhow::Result<Vec<DeltaEntry<'_>>> {
    1300            0 :         self.0.load_keys(ctx).await
    1301            0 :     }
    1302              : }
    1303              : 
    1304              : impl CompactionLayer<Key> for ResidentImageLayer {
                            // Every accessor reads from the wrapped layer's descriptor (`self.0.layer_desc()`).
    1305            0 :     fn key_range(&self) -> &Range<Key> {
    1306            0 :         &self.0.layer_desc().key_range
    1307            0 :     }
    1308            0 :     fn lsn_range(&self) -> &Range<Lsn> {
    1309            0 :         &self.0.layer_desc().lsn_range
    1310            0 :     }
    1311            0 :     fn file_size(&self) -> u64 {
    1312            0 :         self.0.layer_desc().file_size
    1313            0 :     }
    1314            0 :     fn short_id(&self) -> std::string::String {
    1315            0 :         self.0.layer_desc().short_id().to_string()
    1316            0 :     }
                            // Always `false` for this wrapper type.
    1317            0 :     fn is_delta(&self) -> bool {
    1318            0 :         false
    1319            0 :     }
    1320              : }
    1321              : impl CompactionImageLayer<TimelineAdaptor> for ResidentImageLayer {} // empty impl: no items are provided here
        

Generated by: LCOV version 2.1-beta