LCOV - code coverage report
Current view: top level - pageserver/src/tenant/timeline - compaction.rs (source / functions) Coverage Total Hit
Test: 36bb8dd7c7efcb53483d1a7d9f7cb33e8406dcf0.info Lines: 54.4 % 826 449
Test Date: 2024-04-08 10:22:05 Functions: 20.0 % 100 20

            Line data    Source code
       1              : //! New compaction implementation. The algorithm itself is implemented in the
       2              : //! compaction crate. This file implements the callbacks and structs that allow
       3              : //! the algorithm to drive the process.
       4              : //!
       5              : //! The old legacy algorithm is implemented directly in `timeline.rs`.
       6              : 
       7              : use std::collections::BinaryHeap;
       8              : use std::ops::{Deref, Range};
       9              : use std::sync::Arc;
      10              : 
      11              : use super::layer_manager::LayerManager;
      12              : use super::{CompactFlags, DurationRecorder, RecordedDuration, Timeline};
      13              : 
      14              : use anyhow::{anyhow, Context};
      15              : use async_trait::async_trait;
      16              : use enumset::EnumSet;
      17              : use fail::fail_point;
      18              : use itertools::Itertools;
      19              : use pageserver_api::shard::TenantShardId;
      20              : use tokio_util::sync::CancellationToken;
      21              : use tracing::{debug, info, info_span, trace, warn, Instrument};
      22              : use utils::id::TimelineId;
      23              : 
      24              : use crate::context::{AccessStatsBehavior, RequestContext, RequestContextBuilder};
      25              : use crate::tenant::storage_layer::{AsLayerDesc, PersistentLayerDesc};
      26              : use crate::tenant::timeline::{drop_rlock, is_rel_fsm_block_key, is_rel_vm_block_key, Hole};
      27              : use crate::tenant::timeline::{DeltaLayerWriter, ImageLayerWriter};
      28              : use crate::tenant::timeline::{Layer, ResidentLayer};
      29              : use crate::tenant::DeltaLayer;
      30              : use crate::tenant::PageReconstructError;
      31              : use crate::virtual_file::{MaybeFatalIo, VirtualFile};
      32              : use crate::{page_cache, ZERO_PAGE};
      33              : 
      34              : use crate::keyspace::KeySpace;
      35              : use crate::repository::Key;
      36              : 
      37              : use utils::lsn::Lsn;
      38              : 
      39              : use pageserver_compaction::helpers::overlaps_with;
      40              : use pageserver_compaction::interface::*;
      41              : 
      42              : use super::CompactionError;
      43              : 
      44              : impl Timeline {
      45              :     /// TODO: cancellation
      46          510 :     pub(crate) async fn compact_legacy(
      47          510 :         self: &Arc<Self>,
      48          510 :         _cancel: &CancellationToken,
      49          510 :         flags: EnumSet<CompactFlags>,
      50          510 :         ctx: &RequestContext,
      51          510 :     ) -> Result<(), CompactionError> {
      52          510 :         // High level strategy for compaction / image creation:
      53          510 :         //
      54          510 :         // 1. First, calculate the desired "partitioning" of the
      55          510 :         // currently in-use key space. The goal is to partition the
      56          510 :         // key space into roughly fixed-size chunks, but also take into
      57          510 :         // account any existing image layers, and try to align the
      58          510 :         // chunk boundaries with the existing image layers to avoid
      59          510 :         // too much churn. Also try to align chunk boundaries with
      60          510 :         // relation boundaries.  In principle, we don't know about
      61          510 :         // relation boundaries here, we just deal with key-value
      62          510 :         // pairs, and the code in pgdatadir_mapping.rs knows how to
      63          510 :         // map relations into key-value pairs. But in practice we know
      64          510 :         // that 'field6' is the block number, and the fields 1-5
      65          510 :         // identify a relation. This is just an optimization,
      66          510 :         // though.
      67          510 :         //
      68          510 :         // 2. Once we know the partitioning, for each partition,
      69          510 :         // decide if it's time to create a new image layer. The
      70          510 :         // criteria is: there has been too much "churn" since the last
      71          510 :         // image layer? The "churn" is fuzzy concept, it's a
      72          510 :         // combination of too many delta files, or too much WAL in
      73          510 :         // total in the delta file. Or perhaps: if creating an image
      74          510 :         // file would allow to delete some older files.
      75          510 :         //
      76          510 :         // 3. After that, we compact all level0 delta files if there
      77          510 :         // are too many of them.  While compacting, we also garbage
      78          510 :         // collect any page versions that are no longer needed because
      79          510 :         // of the new image layers we created in step 2.
      80          510 :         //
      81          510 :         // TODO: This high level strategy hasn't been implemented yet.
      82          510 :         // Below are functions compact_level0() and create_image_layers()
      83          510 :         // but they are a bit ad hoc and don't quite work like it's explained
      84          510 :         // above. Rewrite it.
      85          510 : 
      86          510 :         // Is the timeline being deleted?
      87          510 :         if self.is_stopping() {
      88            0 :             trace!("Dropping out of compaction on timeline shutdown");
      89            0 :             return Err(CompactionError::ShuttingDown);
      90          510 :         }
      91          510 : 
      92          510 :         let target_file_size = self.get_checkpoint_distance();
      93          510 : 
      94          510 :         // Define partitioning schema if needed
      95          510 : 
      96          510 :         // FIXME: the match should only cover repartitioning, not the next steps
      97          510 :         match self
      98          510 :             .repartition(
      99          510 :                 self.get_last_record_lsn(),
     100          510 :                 self.get_compaction_target_size(),
     101          510 :                 flags,
     102          510 :                 ctx,
     103          510 :             )
     104        13330 :             .await
     105              :         {
     106          510 :             Ok((partitioning, lsn)) => {
     107          510 :                 // Disables access_stats updates, so that the files we read remain candidates for eviction after we're done with them
     108          510 :                 let image_ctx = RequestContextBuilder::extend(ctx)
     109          510 :                     .access_stats_behavior(AccessStatsBehavior::Skip)
     110          510 :                     .build();
     111          510 : 
     112          510 :                 // 2. Compact
     113          510 :                 let timer = self.metrics.compact_time_histo.start_timer();
     114        76376 :                 self.compact_level0(target_file_size, ctx).await?;
     115          510 :                 timer.stop_and_record();
     116              : 
     117              :                 // 3. Create new image layers for partitions that have been modified
     118              :                 // "enough".
     119          510 :                 let layers = self
     120          510 :                     .create_image_layers(
     121          510 :                         &partitioning,
     122          510 :                         lsn,
     123          510 :                         flags.contains(CompactFlags::ForceImageLayerCreation),
     124          510 :                         &image_ctx,
     125          510 :                     )
     126            0 :                     .await
     127          510 :                     .map_err(anyhow::Error::from)?;
     128              : 
     129          510 :                 self.upload_new_image_layers(layers)?;
     130              :             }
     131            0 :             Err(err) => {
     132            0 :                 // no partitioning? This is normal, if the timeline was just created
     133            0 :                 // as an empty timeline. Also in unit tests, when we use the timeline
     134            0 :                 // as a simple key-value store, ignoring the datadir layout. Log the
     135            0 :                 // error but continue.
     136            0 :                 //
     137            0 :                 // Suppress error when it's due to cancellation
     138            0 :                 if !self.cancel.is_cancelled() {
     139            0 :                     tracing::error!("could not compact, repartitioning keyspace failed: {err:?}");
     140            0 :                 }
     141              :             }
     142              :         };
     143              : 
     144          510 :         Ok(())
     145          510 :     }
     146              : 
     147              :     /// Collect a bunch of Level 0 layer files, and compact and reshuffle them as
     148              :     /// as Level 1 files.
     149          510 :     async fn compact_level0(
     150          510 :         self: &Arc<Self>,
     151          510 :         target_file_size: u64,
     152          510 :         ctx: &RequestContext,
     153          510 :     ) -> Result<(), CompactionError> {
     154              :         let CompactLevel0Phase1Result {
     155          510 :             new_layers,
     156          510 :             deltas_to_compact,
     157              :         } = {
     158          510 :             let phase1_span = info_span!("compact_level0_phase1");
     159          510 :             let ctx = ctx.attached_child();
     160          510 :             let mut stats = CompactLevel0Phase1StatsBuilder {
     161          510 :                 version: Some(2),
     162          510 :                 tenant_id: Some(self.tenant_shard_id),
     163          510 :                 timeline_id: Some(self.timeline_id),
     164          510 :                 ..Default::default()
     165          510 :             };
     166          510 : 
     167          510 :             let begin = tokio::time::Instant::now();
     168          510 :             let phase1_layers_locked = Arc::clone(&self.layers).read_owned().await;
     169          510 :             let now = tokio::time::Instant::now();
     170          510 :             stats.read_lock_acquisition_micros =
     171          510 :                 DurationRecorder::Recorded(RecordedDuration(now - begin), now);
     172          510 :             self.compact_level0_phase1(phase1_layers_locked, stats, target_file_size, &ctx)
     173          510 :                 .instrument(phase1_span)
     174        76376 :                 .await?
     175              :         };
     176              : 
     177          510 :         if new_layers.is_empty() && deltas_to_compact.is_empty() {
     178              :             // nothing to do
     179          468 :             return Ok(());
     180           42 :         }
     181           42 : 
     182           42 :         self.finish_compact_batch(&new_layers, &Vec::new(), &deltas_to_compact)
     183            0 :             .await?;
     184           42 :         Ok(())
     185          510 :     }
     186              : 
     187              :     /// Level0 files first phase of compaction, explained in the [`Self::compact_legacy`] comment.
     188          510 :     async fn compact_level0_phase1(
     189          510 :         self: &Arc<Self>,
     190          510 :         guard: tokio::sync::OwnedRwLockReadGuard<LayerManager>,
     191          510 :         mut stats: CompactLevel0Phase1StatsBuilder,
     192          510 :         target_file_size: u64,
     193          510 :         ctx: &RequestContext,
     194          510 :     ) -> Result<CompactLevel0Phase1Result, CompactionError> {
     195          510 :         stats.read_lock_held_spawn_blocking_startup_micros =
     196          510 :             stats.read_lock_acquisition_micros.till_now(); // set by caller
     197          510 :         let layers = guard.layer_map();
     198          510 :         let level0_deltas = layers.get_level0_deltas()?;
     199          510 :         let mut level0_deltas = level0_deltas
     200          510 :             .into_iter()
     201         2362 :             .map(|x| guard.get_from_desc(&x))
     202          510 :             .collect_vec();
     203          510 :         stats.level0_deltas_count = Some(level0_deltas.len());
     204          510 :         // Only compact if enough layers have accumulated.
     205          510 :         let threshold = self.get_compaction_threshold();
     206          510 :         if level0_deltas.is_empty() || level0_deltas.len() < threshold {
     207          468 :             debug!(
     208            0 :                 level0_deltas = level0_deltas.len(),
     209            0 :                 threshold, "too few deltas to compact"
     210            0 :             );
     211          468 :             return Ok(CompactLevel0Phase1Result::default());
     212           42 :         }
     213           42 : 
     214           42 :         // This failpoint is used together with `test_duplicate_layers` integration test.
     215           42 :         // It returns the compaction result exactly the same layers as input to compaction.
     216           42 :         // We want to ensure that this will not cause any problem when updating the layer map
     217           42 :         // after the compaction is finished.
     218           42 :         //
     219           42 :         // Currently, there are two rare edge cases that will cause duplicated layers being
     220           42 :         // inserted.
     221           42 :         // 1. The compaction job is inturrupted / did not finish successfully. Assume we have file 1, 2, 3, 4, which
     222           42 :         //    is compacted to 5, but the page server is shut down, next time we start page server we will get a layer
     223           42 :         //    map containing 1, 2, 3, 4, and 5, whereas 5 has the same content as 4. If we trigger L0 compation at this
     224           42 :         //    point again, it is likely that we will get a file 6 which has the same content and the key range as 5,
     225           42 :         //    and this causes an overwrite. This is acceptable because the content is the same, and we should do a
     226           42 :         //    layer replace instead of the normal remove / upload process.
     227           42 :         // 2. The input workload pattern creates exactly n files that are sorted, non-overlapping and is of target file
     228           42 :         //    size length. Compaction will likely create the same set of n files afterwards.
     229           42 :         //
     230           42 :         // This failpoint is a superset of both of the cases.
     231           42 :         if cfg!(feature = "testing") {
     232           42 :             let active = (|| {
     233           42 :                 ::fail::fail_point!("compact-level0-phase1-return-same", |_| true);
     234           42 :                 false
     235           42 :             })();
     236           42 : 
     237           42 :             if active {
     238            0 :                 let mut new_layers = Vec::with_capacity(level0_deltas.len());
     239            0 :                 for delta in &level0_deltas {
     240              :                     // we are just faking these layers as being produced again for this failpoint
     241            0 :                     new_layers.push(
     242            0 :                         delta
     243            0 :                             .download_and_keep_resident()
     244            0 :                             .await
     245            0 :                             .context("download layer for failpoint")?,
     246              :                     );
     247              :                 }
     248            0 :                 tracing::info!("compact-level0-phase1-return-same"); // so that we can check if we hit the failpoint
     249            0 :                 return Ok(CompactLevel0Phase1Result {
     250            0 :                     new_layers,
     251            0 :                     deltas_to_compact: level0_deltas,
     252            0 :                 });
     253           42 :             }
     254            0 :         }
     255              : 
     256              :         // Gather the files to compact in this iteration.
     257              :         //
     258              :         // Start with the oldest Level 0 delta file, and collect any other
     259              :         // level 0 files that form a contiguous sequence, such that the end
     260              :         // LSN of previous file matches the start LSN of the next file.
     261              :         //
     262              :         // Note that if the files don't form such a sequence, we might
     263              :         // "compact" just a single file. That's a bit pointless, but it allows
     264              :         // us to get rid of the level 0 file, and compact the other files on
     265              :         // the next iteration. This could probably made smarter, but such
     266              :         // "gaps" in the sequence of level 0 files should only happen in case
     267              :         // of a crash, partial download from cloud storage, or something like
     268              :         // that, so it's not a big deal in practice.
     269          800 :         level0_deltas.sort_by_key(|l| l.layer_desc().lsn_range.start);
     270           42 :         let mut level0_deltas_iter = level0_deltas.iter();
     271           42 : 
     272           42 :         let first_level0_delta = level0_deltas_iter.next().unwrap();
     273           42 :         let mut prev_lsn_end = first_level0_delta.layer_desc().lsn_range.end;
     274           42 :         let mut deltas_to_compact = Vec::with_capacity(level0_deltas.len());
     275           42 : 
     276           42 :         deltas_to_compact.push(first_level0_delta.download_and_keep_resident().await?);
     277          442 :         for l in level0_deltas_iter {
     278          400 :             let lsn_range = &l.layer_desc().lsn_range;
     279          400 : 
     280          400 :             if lsn_range.start != prev_lsn_end {
     281            0 :                 break;
     282          400 :             }
     283          400 :             deltas_to_compact.push(l.download_and_keep_resident().await?);
     284          400 :             prev_lsn_end = lsn_range.end;
     285              :         }
     286           42 :         let lsn_range = Range {
     287           42 :             start: deltas_to_compact
     288           42 :                 .first()
     289           42 :                 .unwrap()
     290           42 :                 .layer_desc()
     291           42 :                 .lsn_range
     292           42 :                 .start,
     293           42 :             end: deltas_to_compact.last().unwrap().layer_desc().lsn_range.end,
     294           42 :         };
     295           42 : 
     296           42 :         info!(
     297           42 :             "Starting Level0 compaction in LSN range {}-{} for {} layers ({} deltas in total)",
     298           42 :             lsn_range.start,
     299           42 :             lsn_range.end,
     300           42 :             deltas_to_compact.len(),
     301           42 :             level0_deltas.len()
     302           42 :         );
     303              : 
     304          442 :         for l in deltas_to_compact.iter() {
     305          442 :             info!("compact includes {l}");
     306              :         }
     307              : 
     308              :         // We don't need the original list of layers anymore. Drop it so that
     309              :         // we don't accidentally use it later in the function.
     310           42 :         drop(level0_deltas);
     311           42 : 
     312           42 :         stats.read_lock_held_prerequisites_micros = stats
     313           42 :             .read_lock_held_spawn_blocking_startup_micros
     314           42 :             .till_now();
     315           42 : 
     316           42 :         // Determine N largest holes where N is number of compacted layers.
     317           42 :         let max_holes = deltas_to_compact.len();
     318           42 :         let last_record_lsn = self.get_last_record_lsn();
     319           42 :         let min_hole_range = (target_file_size / page_cache::PAGE_SZ as u64) as i128;
     320           42 :         let min_hole_coverage_size = 3; // TODO: something more flexible?
     321           42 : 
     322           42 :         // min-heap (reserve space for one more element added before eviction)
     323           42 :         let mut heap: BinaryHeap<Hole> = BinaryHeap::with_capacity(max_holes + 1);
     324           42 :         let mut prev: Option<Key> = None;
     325           42 : 
     326           42 :         let mut all_keys = Vec::new();
     327              : 
     328          442 :         for l in deltas_to_compact.iter() {
     329         3554 :             all_keys.extend(l.load_keys(ctx).await?);
     330              :         }
     331              : 
     332              :         // FIXME: should spawn_blocking the rest of this function
     333              : 
     334              :         // The current stdlib sorting implementation is designed in a way where it is
     335              :         // particularly fast where the slice is made up of sorted sub-ranges.
     336      6971288 :         all_keys.sort_by_key(|DeltaEntry { key, lsn, .. }| (*key, *lsn));
     337           42 : 
     338           42 :         stats.read_lock_held_key_sort_micros = stats.read_lock_held_prerequisites_micros.till_now();
     339              : 
     340      3121998 :         for &DeltaEntry { key: next_key, .. } in all_keys.iter() {
     341      3121998 :             if let Some(prev_key) = prev {
     342              :                 // just first fast filter
     343      3121956 :                 if next_key.to_i128() - prev_key.to_i128() >= min_hole_range {
     344            0 :                     let key_range = prev_key..next_key;
     345            0 :                     // Measuring hole by just subtraction of i128 representation of key range boundaries
     346            0 :                     // has not so much sense, because largest holes will corresponds field1/field2 changes.
     347            0 :                     // But we are mostly interested to eliminate holes which cause generation of excessive image layers.
     348            0 :                     // That is why it is better to measure size of hole as number of covering image layers.
     349            0 :                     let coverage_size = layers.image_coverage(&key_range, last_record_lsn).len();
     350            0 :                     if coverage_size >= min_hole_coverage_size {
     351            0 :                         heap.push(Hole {
     352            0 :                             key_range,
     353            0 :                             coverage_size,
     354            0 :                         });
     355            0 :                         if heap.len() > max_holes {
     356            0 :                             heap.pop(); // remove smallest hole
     357            0 :                         }
     358            0 :                     }
     359      3121956 :                 }
     360           42 :             }
     361      3121998 :             prev = Some(next_key.next());
     362              :         }
     363           42 :         stats.read_lock_held_compute_holes_micros = stats.read_lock_held_key_sort_micros.till_now();
     364           42 :         drop_rlock(guard);
     365           42 :         stats.read_lock_drop_micros = stats.read_lock_held_compute_holes_micros.till_now();
     366           42 :         let mut holes = heap.into_vec();
     367           42 :         holes.sort_unstable_by_key(|hole| hole.key_range.start);
     368           42 :         let mut next_hole = 0; // index of next hole in holes vector
     369           42 : 
     370           42 :         // This iterator walks through all key-value pairs from all the layers
     371           42 :         // we're compacting, in key, LSN order.
     372           42 :         let all_values_iter = all_keys.iter();
     373           42 : 
     374           42 :         // This iterator walks through all keys and is needed to calculate size used by each key
     375           42 :         let mut all_keys_iter = all_keys
     376           42 :             .iter()
     377      3121998 :             .map(|DeltaEntry { key, lsn, size, .. }| (*key, *lsn, *size))
     378      3121956 :             .coalesce(|mut prev, cur| {
     379      3121956 :                 // Coalesce keys that belong to the same key pair.
     380      3121956 :                 // This ensures that compaction doesn't put them
     381      3121956 :                 // into different layer files.
     382      3121956 :                 // Still limit this by the target file size,
     383      3121956 :                 // so that we keep the size of the files in
     384      3121956 :                 // check.
     385      3121956 :                 if prev.0 == cur.0 && prev.2 < target_file_size {
     386        92000 :                     prev.2 += cur.2;
     387        92000 :                     Ok(prev)
     388              :                 } else {
     389      3029956 :                     Err((prev, cur))
     390              :                 }
     391      3121956 :             });
     392           42 : 
     393           42 :         // Merge the contents of all the input delta layers into a new set
     394           42 :         // of delta layers, based on the current partitioning.
     395           42 :         //
     396           42 :         // We split the new delta layers on the key dimension. We iterate through the key space, and for each key, check if including the next key to the current output layer we're building would cause the layer to become too large. If so, dump the current output layer and start new one.
     397           42 :         // It's possible that there is a single key with so many page versions that storing all of them in a single layer file
     398           42 :         // would be too large. In that case, we also split on the LSN dimension.
     399           42 :         //
     400           42 :         // LSN
     401           42 :         //  ^
     402           42 :         //  |
     403           42 :         //  | +-----------+            +--+--+--+--+
     404           42 :         //  | |           |            |  |  |  |  |
     405           42 :         //  | +-----------+            |  |  |  |  |
     406           42 :         //  | |           |            |  |  |  |  |
     407           42 :         //  | +-----------+     ==>    |  |  |  |  |
     408           42 :         //  | |           |            |  |  |  |  |
     409           42 :         //  | +-----------+            |  |  |  |  |
     410           42 :         //  | |           |            |  |  |  |  |
     411           42 :         //  | +-----------+            +--+--+--+--+
     412           42 :         //  |
     413           42 :         //  +--------------> key
     414           42 :         //
     415           42 :         //
     416           42 :         // If one key (X) has a lot of page versions:
     417           42 :         //
     418           42 :         // LSN
     419           42 :         //  ^
     420           42 :         //  |                                 (X)
     421           42 :         //  | +-----------+            +--+--+--+--+
     422           42 :         //  | |           |            |  |  |  |  |
     423           42 :         //  | +-----------+            |  |  +--+  |
     424           42 :         //  | |           |            |  |  |  |  |
     425           42 :         //  | +-----------+     ==>    |  |  |  |  |
     426           42 :         //  | |           |            |  |  +--+  |
     427           42 :         //  | +-----------+            |  |  |  |  |
     428           42 :         //  | |           |            |  |  |  |  |
     429           42 :         //  | +-----------+            +--+--+--+--+
     430           42 :         //  |
     431           42 :         //  +--------------> key
     432           42 :         // TODO: this actually divides the layers into fixed-size chunks, not
     433           42 :         // based on the partitioning.
     434           42 :         //
     435           42 :         // TODO: we should also opportunistically materialize and
     436           42 :         // garbage collect what we can.
     437           42 :         let mut new_layers = Vec::new();
     438           42 :         let mut prev_key: Option<Key> = None;
     439           42 :         let mut writer: Option<DeltaLayerWriter> = None;
     440           42 :         let mut key_values_total_size = 0u64;
     441           42 :         let mut dup_start_lsn: Lsn = Lsn::INVALID; // start LSN of layer containing values of the single key
     442           42 :         let mut dup_end_lsn: Lsn = Lsn::INVALID; // end LSN of layer containing values of the single key
     443              : 
     444              :         for &DeltaEntry {
     445      3121998 :             key, lsn, ref val, ..
     446      3122040 :         } in all_values_iter
     447              :         {
     448      3121998 :             let value = val.load(ctx).await?;
     449      3121998 :             let same_key = prev_key.map_or(false, |prev_key| prev_key == key);
     450      3121998 :             // We need to check key boundaries once we reach next key or end of layer with the same key
     451      3121998 :             if !same_key || lsn == dup_end_lsn {
     452      3029998 :                 let mut next_key_size = 0u64;
     453      3029998 :                 let is_dup_layer = dup_end_lsn.is_valid();
     454      3029998 :                 dup_start_lsn = Lsn::INVALID;
     455      3029998 :                 if !same_key {
     456      3029998 :                     dup_end_lsn = Lsn::INVALID;
     457      3029998 :                 }
     458              :                 // Determine size occupied by this key. We stop at next key or when size becomes larger than target_file_size
     459      3029998 :                 for (next_key, next_lsn, next_size) in all_keys_iter.by_ref() {
     460      3029998 :                     next_key_size = next_size;
     461      3029998 :                     if key != next_key {
     462      3029956 :                         if dup_end_lsn.is_valid() {
     463            0 :                             // We are writting segment with duplicates:
     464            0 :                             // place all remaining values of this key in separate segment
     465            0 :                             dup_start_lsn = dup_end_lsn; // new segments starts where old stops
     466            0 :                             dup_end_lsn = lsn_range.end; // there are no more values of this key till end of LSN range
     467      3029956 :                         }
     468      3029956 :                         break;
     469           42 :                     }
     470           42 :                     key_values_total_size += next_size;
     471           42 :                     // Check if it is time to split segment: if total keys size is larger than target file size.
     472           42 :                     // We need to avoid generation of empty segments if next_size > target_file_size.
     473           42 :                     if key_values_total_size > target_file_size && lsn != next_lsn {
     474              :                         // Split key between multiple layers: such layer can contain only single key
     475            0 :                         dup_start_lsn = if dup_end_lsn.is_valid() {
     476            0 :                             dup_end_lsn // new segment with duplicates starts where old one stops
     477              :                         } else {
     478            0 :                             lsn // start with the first LSN for this key
     479              :                         };
     480            0 :                         dup_end_lsn = next_lsn; // upper LSN boundary is exclusive
     481            0 :                         break;
     482           42 :                     }
     483              :                 }
     484              :                 // handle case when loop reaches last key: in this case dup_end is non-zero but dup_start is not set.
     485      3029998 :                 if dup_end_lsn.is_valid() && !dup_start_lsn.is_valid() {
     486            0 :                     dup_start_lsn = dup_end_lsn;
     487            0 :                     dup_end_lsn = lsn_range.end;
     488      3029998 :                 }
     489      3029998 :                 if writer.is_some() {
     490      3029956 :                     let written_size = writer.as_mut().unwrap().size();
     491      3029956 :                     let contains_hole =
     492      3029956 :                         next_hole < holes.len() && key >= holes[next_hole].key_range.end;
     493              :                     // check if key cause layer overflow or contains hole...
     494      3029956 :                     if is_dup_layer
     495      3029956 :                         || dup_end_lsn.is_valid()
     496      3029956 :                         || written_size + key_values_total_size > target_file_size
     497      3029756 :                         || contains_hole
     498              :                     {
     499              :                         // ... if so, flush previous layer and prepare to write new one
     500          200 :                         new_layers.push(
     501          200 :                             writer
     502          200 :                                 .take()
     503          200 :                                 .unwrap()
     504          200 :                                 .finish(prev_key.unwrap().next(), self)
     505          508 :                                 .await?,
     506              :                         );
     507          200 :                         writer = None;
     508          200 : 
     509          200 :                         if contains_hole {
     510            0 :                             // skip hole
     511            0 :                             next_hole += 1;
     512          200 :                         }
     513      3029756 :                     }
     514           42 :                 }
     515              :                 // Remember size of key value because at next iteration we will access next item
     516      3029998 :                 key_values_total_size = next_key_size;
     517        92000 :             }
     518      3121998 :             fail_point!("delta-layer-writer-fail-before-finish", |_| {
     519            0 :                 Err(CompactionError::Other(anyhow::anyhow!(
     520            0 :                     "failpoint delta-layer-writer-fail-before-finish"
     521            0 :                 )))
     522      3121998 :             });
     523              : 
     524      3121998 :             if !self.shard_identity.is_key_disposable(&key) {
     525      3121998 :                 if writer.is_none() {
     526          242 :                     // Create writer if not initiaized yet
     527          242 :                     writer = Some(
     528              :                         DeltaLayerWriter::new(
     529          242 :                             self.conf,
     530          242 :                             self.timeline_id,
     531          242 :                             self.tenant_shard_id,
     532          242 :                             key,
     533          242 :                             if dup_end_lsn.is_valid() {
     534              :                                 // this is a layer containing slice of values of the same key
     535            0 :                                 debug!("Create new dup layer {}..{}", dup_start_lsn, dup_end_lsn);
     536            0 :                                 dup_start_lsn..dup_end_lsn
     537              :                             } else {
     538          242 :                                 debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end);
     539          242 :                                 lsn_range.clone()
     540              :                             },
     541              :                         )
     542          121 :                         .await?,
     543              :                     );
     544      3121756 :                 }
     545              : 
     546      3121998 :                 writer.as_mut().unwrap().put_value(key, lsn, value).await?;
     547              :             } else {
     548            0 :                 debug!(
     549            0 :                     "Dropping key {} during compaction (it belongs on shard {:?})",
     550            0 :                     key,
     551            0 :                     self.shard_identity.get_shard_number(&key)
     552            0 :                 );
     553              :             }
     554              : 
     555      3121998 :             if !new_layers.is_empty() {
     556        19786 :                 fail_point!("after-timeline-compacted-first-L1");
     557      3102212 :             }
     558              : 
     559      3121998 :             prev_key = Some(key);
     560              :         }
     561           42 :         if let Some(writer) = writer {
     562         3009 :             new_layers.push(writer.finish(prev_key.unwrap().next(), self).await?);
     563            0 :         }
     564              : 
     565              :         // Sync layers
     566           42 :         if !new_layers.is_empty() {
     567              :             // Print a warning if the created layer is larger than double the target size
     568              :             // Add two pages for potential overhead. This should in theory be already
     569              :             // accounted for in the target calculation, but for very small targets,
     570              :             // we still might easily hit the limit otherwise.
     571           42 :             let warn_limit = target_file_size * 2 + page_cache::PAGE_SZ as u64 * 2;
     572          242 :             for layer in new_layers.iter() {
     573          242 :                 if layer.layer_desc().file_size > warn_limit {
     574            0 :                     warn!(
     575            0 :                         %layer,
     576            0 :                         "created delta file of size {} larger than double of target of {target_file_size}", layer.layer_desc().file_size
     577            0 :                     );
     578          242 :                 }
     579              :             }
     580              : 
     581              :             // The writer.finish() above already did the fsync of the inodes.
     582              :             // We just need to fsync the directory in which these inodes are linked,
     583              :             // which we know to be the timeline directory.
     584              :             //
     585              :             // We use fatal_err() below because the after writer.finish() returns with success,
     586              :             // the in-memory state of the filesystem already has the layer file in its final place,
     587              :             // and subsequent pageserver code could think it's durable while it really isn't.
     588           42 :             let timeline_dir = VirtualFile::open(
     589           42 :                 &self
     590           42 :                     .conf
     591           42 :                     .timeline_path(&self.tenant_shard_id, &self.timeline_id),
     592           42 :             )
     593           21 :             .await
     594           42 :             .fatal_err("VirtualFile::open for timeline dir fsync");
     595           42 :             timeline_dir
     596           42 :                 .sync_all()
     597           21 :                 .await
     598           42 :                 .fatal_err("VirtualFile::sync_all timeline dir");
     599            0 :         }
     600              : 
     601           42 :         stats.write_layer_files_micros = stats.read_lock_drop_micros.till_now();
     602           42 :         stats.new_deltas_count = Some(new_layers.len());
     603          242 :         stats.new_deltas_size = Some(new_layers.iter().map(|l| l.layer_desc().file_size).sum());
     604           42 : 
     605           42 :         match TryInto::<CompactLevel0Phase1Stats>::try_into(stats)
     606           42 :             .and_then(|stats| serde_json::to_string(&stats).context("serde_json::to_string"))
     607              :         {
     608           42 :             Ok(stats_json) => {
     609           42 :                 info!(
     610           42 :                     stats_json = stats_json.as_str(),
     611           42 :                     "compact_level0_phase1 stats available"
     612           42 :                 )
     613              :             }
     614            0 :             Err(e) => {
     615            0 :                 warn!("compact_level0_phase1 stats failed to serialize: {:#}", e);
     616              :             }
     617              :         }
     618              : 
     619           42 :         Ok(CompactLevel0Phase1Result {
     620           42 :             new_layers,
     621           42 :             deltas_to_compact: deltas_to_compact
     622           42 :                 .into_iter()
     623          442 :                 .map(|x| x.drop_eviction_guard())
     624           42 :                 .collect::<Vec<_>>(),
     625           42 :         })
     626          510 :     }
     627              : }
     628              : 
     629              : #[derive(Default)]
     630              : struct CompactLevel0Phase1Result {
     631              :     new_layers: Vec<ResidentLayer>,
     632              :     deltas_to_compact: Vec<Layer>,
     633              : }
     634              : 
     635              : #[derive(Default)]
     636              : struct CompactLevel0Phase1StatsBuilder {
     637              :     version: Option<u64>,
     638              :     tenant_id: Option<TenantShardId>,
     639              :     timeline_id: Option<TimelineId>,
     640              :     read_lock_acquisition_micros: DurationRecorder,
     641              :     read_lock_held_spawn_blocking_startup_micros: DurationRecorder,
     642              :     read_lock_held_key_sort_micros: DurationRecorder,
     643              :     read_lock_held_prerequisites_micros: DurationRecorder,
     644              :     read_lock_held_compute_holes_micros: DurationRecorder,
     645              :     read_lock_drop_micros: DurationRecorder,
     646              :     write_layer_files_micros: DurationRecorder,
     647              :     level0_deltas_count: Option<usize>,
     648              :     new_deltas_count: Option<usize>,
     649              :     new_deltas_size: Option<u64>,
     650              : }
     651              : 
     652              : #[derive(serde::Serialize)]
     653              : struct CompactLevel0Phase1Stats {
     654              :     version: u64,
     655              :     tenant_id: TenantShardId,
     656              :     timeline_id: TimelineId,
     657              :     read_lock_acquisition_micros: RecordedDuration,
     658              :     read_lock_held_spawn_blocking_startup_micros: RecordedDuration,
     659              :     read_lock_held_key_sort_micros: RecordedDuration,
     660              :     read_lock_held_prerequisites_micros: RecordedDuration,
     661              :     read_lock_held_compute_holes_micros: RecordedDuration,
     662              :     read_lock_drop_micros: RecordedDuration,
     663              :     write_layer_files_micros: RecordedDuration,
     664              :     level0_deltas_count: usize,
     665              :     new_deltas_count: usize,
     666              :     new_deltas_size: u64,
     667              : }
     668              : 
     669              : impl TryFrom<CompactLevel0Phase1StatsBuilder> for CompactLevel0Phase1Stats {
     670              :     type Error = anyhow::Error;
     671              : 
     672           42 :     fn try_from(value: CompactLevel0Phase1StatsBuilder) -> Result<Self, Self::Error> {
     673           42 :         Ok(Self {
     674           42 :             version: value.version.ok_or_else(|| anyhow!("version not set"))?,
     675           42 :             tenant_id: value
     676           42 :                 .tenant_id
     677           42 :                 .ok_or_else(|| anyhow!("tenant_id not set"))?,
     678           42 :             timeline_id: value
     679           42 :                 .timeline_id
     680           42 :                 .ok_or_else(|| anyhow!("timeline_id not set"))?,
     681           42 :             read_lock_acquisition_micros: value
     682           42 :                 .read_lock_acquisition_micros
     683           42 :                 .into_recorded()
     684           42 :                 .ok_or_else(|| anyhow!("read_lock_acquisition_micros not set"))?,
     685           42 :             read_lock_held_spawn_blocking_startup_micros: value
     686           42 :                 .read_lock_held_spawn_blocking_startup_micros
     687           42 :                 .into_recorded()
     688           42 :                 .ok_or_else(|| anyhow!("read_lock_held_spawn_blocking_startup_micros not set"))?,
     689           42 :             read_lock_held_key_sort_micros: value
     690           42 :                 .read_lock_held_key_sort_micros
     691           42 :                 .into_recorded()
     692           42 :                 .ok_or_else(|| anyhow!("read_lock_held_key_sort_micros not set"))?,
     693           42 :             read_lock_held_prerequisites_micros: value
     694           42 :                 .read_lock_held_prerequisites_micros
     695           42 :                 .into_recorded()
     696           42 :                 .ok_or_else(|| anyhow!("read_lock_held_prerequisites_micros not set"))?,
     697           42 :             read_lock_held_compute_holes_micros: value
     698           42 :                 .read_lock_held_compute_holes_micros
     699           42 :                 .into_recorded()
     700           42 :                 .ok_or_else(|| anyhow!("read_lock_held_compute_holes_micros not set"))?,
     701           42 :             read_lock_drop_micros: value
     702           42 :                 .read_lock_drop_micros
     703           42 :                 .into_recorded()
     704           42 :                 .ok_or_else(|| anyhow!("read_lock_drop_micros not set"))?,
     705           42 :             write_layer_files_micros: value
     706           42 :                 .write_layer_files_micros
     707           42 :                 .into_recorded()
     708           42 :                 .ok_or_else(|| anyhow!("write_layer_files_micros not set"))?,
     709           42 :             level0_deltas_count: value
     710           42 :                 .level0_deltas_count
     711           42 :                 .ok_or_else(|| anyhow!("level0_deltas_count not set"))?,
     712           42 :             new_deltas_count: value
     713           42 :                 .new_deltas_count
     714           42 :                 .ok_or_else(|| anyhow!("new_deltas_count not set"))?,
     715           42 :             new_deltas_size: value
     716           42 :                 .new_deltas_size
     717           42 :                 .ok_or_else(|| anyhow!("new_deltas_size not set"))?,
     718              :         })
     719           42 :     }
     720              : }
     721              : 
     722              : impl Timeline {
     723              :     /// Entry point for new tiered compaction algorithm.
     724              :     ///
     725              :     /// All the real work is in the implementation in the pageserver_compaction
     726              :     /// crate. The code here would apply to any algorithm implemented by the
     727              :     /// same interface, but tiered is the only one at the moment.
     728              :     ///
     729              :     /// TODO: cancellation
     730            0 :     pub(crate) async fn compact_tiered(
     731            0 :         self: &Arc<Self>,
     732            0 :         _cancel: &CancellationToken,
     733            0 :         ctx: &RequestContext,
     734            0 :     ) -> Result<(), CompactionError> {
     735            0 :         let fanout = self.get_compaction_threshold() as u64;
     736            0 :         let target_file_size = self.get_checkpoint_distance();
     737              : 
     738              :         // Find the top of the historical layers
     739            0 :         let end_lsn = {
     740            0 :             let guard = self.layers.read().await;
     741            0 :             let layers = guard.layer_map();
     742              : 
     743            0 :             let l0_deltas = layers.get_level0_deltas()?;
     744            0 :             drop(guard);
     745            0 : 
     746            0 :             // As an optimization, if we find that there are too few L0 layers,
     747            0 :             // bail out early. We know that the compaction algorithm would do
     748            0 :             // nothing in that case.
     749            0 :             if l0_deltas.len() < fanout as usize {
     750              :                 // doesn't need compacting
     751            0 :                 return Ok(());
     752            0 :             }
     753            0 :             l0_deltas.iter().map(|l| l.lsn_range.end).max().unwrap()
     754            0 :         };
     755            0 : 
     756            0 :         // Is the timeline being deleted?
     757            0 :         if self.is_stopping() {
     758            0 :             trace!("Dropping out of compaction on timeline shutdown");
     759            0 :             return Err(CompactionError::ShuttingDown);
     760            0 :         }
     761              : 
     762            0 :         let keyspace = self.collect_keyspace(end_lsn, ctx).await?;
     763            0 :         let mut adaptor = TimelineAdaptor::new(self, (end_lsn, keyspace));
     764            0 : 
     765            0 :         pageserver_compaction::compact_tiered::compact_tiered(
     766            0 :             &mut adaptor,
     767            0 :             end_lsn,
     768            0 :             target_file_size,
     769            0 :             fanout,
     770            0 :             ctx,
     771            0 :         )
     772            0 :         .await?;
     773              : 
     774            0 :         adaptor.flush_updates().await?;
     775            0 :         Ok(())
     776            0 :     }
     777              : }
     778              : 
     779              : struct TimelineAdaptor {
     780              :     timeline: Arc<Timeline>,
     781              : 
     782              :     keyspace: (Lsn, KeySpace),
     783              : 
     784              :     new_deltas: Vec<ResidentLayer>,
     785              :     new_images: Vec<ResidentLayer>,
     786              :     layers_to_delete: Vec<Arc<PersistentLayerDesc>>,
     787              : }
     788              : 
     789              : impl TimelineAdaptor {
     790            0 :     pub fn new(timeline: &Arc<Timeline>, keyspace: (Lsn, KeySpace)) -> Self {
     791            0 :         Self {
     792            0 :             timeline: timeline.clone(),
     793            0 :             keyspace,
     794            0 :             new_images: Vec::new(),
     795            0 :             new_deltas: Vec::new(),
     796            0 :             layers_to_delete: Vec::new(),
     797            0 :         }
     798            0 :     }
     799              : 
     800            0 :     pub async fn flush_updates(&mut self) -> anyhow::Result<()> {
     801            0 :         let layers_to_delete = {
     802            0 :             let guard = self.timeline.layers.read().await;
     803            0 :             self.layers_to_delete
     804            0 :                 .iter()
     805            0 :                 .map(|x| guard.get_from_desc(x))
     806            0 :                 .collect::<Vec<Layer>>()
     807            0 :         };
     808            0 :         self.timeline
     809            0 :             .finish_compact_batch(&self.new_deltas, &self.new_images, &layers_to_delete)
     810            0 :             .await?;
     811              : 
     812            0 :         self.timeline
     813            0 :             .upload_new_image_layers(std::mem::take(&mut self.new_images))?;
     814              : 
     815            0 :         self.new_deltas.clear();
     816            0 :         self.layers_to_delete.clear();
     817            0 :         Ok(())
     818            0 :     }
     819              : }
     820              : 
     821              : #[derive(Clone)]
     822              : struct ResidentDeltaLayer(ResidentLayer);
     823              : #[derive(Clone)]
     824              : struct ResidentImageLayer(ResidentLayer);
     825              : 
     826              : impl CompactionJobExecutor for TimelineAdaptor {
     827              :     type Key = crate::repository::Key;
     828              : 
     829              :     type Layer = OwnArc<PersistentLayerDesc>;
     830              :     type DeltaLayer = ResidentDeltaLayer;
     831              :     type ImageLayer = ResidentImageLayer;
     832              : 
     833              :     type RequestContext = crate::context::RequestContext;
     834              : 
     835            0 :     async fn get_layers(
     836            0 :         &mut self,
     837            0 :         key_range: &Range<Key>,
     838            0 :         lsn_range: &Range<Lsn>,
     839            0 :         _ctx: &RequestContext,
     840            0 :     ) -> anyhow::Result<Vec<OwnArc<PersistentLayerDesc>>> {
     841            0 :         self.flush_updates().await?;
     842              : 
     843            0 :         let guard = self.timeline.layers.read().await;
     844            0 :         let layer_map = guard.layer_map();
     845            0 : 
     846            0 :         let result = layer_map
     847            0 :             .iter_historic_layers()
     848            0 :             .filter(|l| {
     849            0 :                 overlaps_with(&l.lsn_range, lsn_range) && overlaps_with(&l.key_range, key_range)
     850            0 :             })
     851            0 :             .map(OwnArc)
     852            0 :             .collect();
     853            0 :         Ok(result)
     854            0 :     }
     855              : 
     856            0 :     async fn get_keyspace(
     857            0 :         &mut self,
     858            0 :         key_range: &Range<Key>,
     859            0 :         lsn: Lsn,
     860            0 :         _ctx: &RequestContext,
     861            0 :     ) -> anyhow::Result<Vec<Range<Key>>> {
     862            0 :         if lsn == self.keyspace.0 {
     863            0 :             Ok(pageserver_compaction::helpers::intersect_keyspace(
     864            0 :                 &self.keyspace.1.ranges,
     865            0 :                 key_range,
     866            0 :             ))
     867              :         } else {
     868              :             // The current compaction implementatin only ever requests the key space
     869              :             // at the compaction end LSN.
     870            0 :             anyhow::bail!("keyspace not available for requested lsn");
     871              :         }
     872            0 :     }
     873              : 
     874            0 :     async fn downcast_delta_layer(
     875            0 :         &self,
     876            0 :         layer: &OwnArc<PersistentLayerDesc>,
     877            0 :     ) -> anyhow::Result<Option<ResidentDeltaLayer>> {
     878            0 :         // this is a lot more complex than a simple downcast...
     879            0 :         if layer.is_delta() {
     880            0 :             let l = {
     881            0 :                 let guard = self.timeline.layers.read().await;
     882            0 :                 guard.get_from_desc(layer)
     883              :             };
     884            0 :             let result = l.download_and_keep_resident().await?;
     885              : 
     886            0 :             Ok(Some(ResidentDeltaLayer(result)))
     887              :         } else {
     888            0 :             Ok(None)
     889              :         }
     890            0 :     }
     891              : 
     892            0 :     async fn create_image(
     893            0 :         &mut self,
     894            0 :         lsn: Lsn,
     895            0 :         key_range: &Range<Key>,
     896            0 :         ctx: &RequestContext,
     897            0 :     ) -> anyhow::Result<()> {
     898            0 :         Ok(self.create_image_impl(lsn, key_range, ctx).await?)
     899            0 :     }
     900              : 
     901            0 :     async fn create_delta(
     902            0 :         &mut self,
     903            0 :         lsn_range: &Range<Lsn>,
     904            0 :         key_range: &Range<Key>,
     905            0 :         input_layers: &[ResidentDeltaLayer],
     906            0 :         ctx: &RequestContext,
     907            0 :     ) -> anyhow::Result<()> {
     908            0 :         debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end);
     909              : 
     910            0 :         let mut all_entries = Vec::new();
     911            0 :         for dl in input_layers.iter() {
     912            0 :             all_entries.extend(dl.load_keys(ctx).await?);
     913              :         }
     914              : 
     915              :         // The current stdlib sorting implementation is designed in a way where it is
     916              :         // particularly fast where the slice is made up of sorted sub-ranges.
     917            0 :         all_entries.sort_by_key(|DeltaEntry { key, lsn, .. }| (*key, *lsn));
     918              : 
     919            0 :         let mut writer = DeltaLayerWriter::new(
     920            0 :             self.timeline.conf,
     921            0 :             self.timeline.timeline_id,
     922            0 :             self.timeline.tenant_shard_id,
     923            0 :             key_range.start,
     924            0 :             lsn_range.clone(),
     925            0 :         )
     926            0 :         .await?;
     927              : 
     928            0 :         let mut dup_values = 0;
     929            0 : 
     930            0 :         // This iterator walks through all key-value pairs from all the layers
     931            0 :         // we're compacting, in key, LSN order.
     932            0 :         let mut prev: Option<(Key, Lsn)> = None;
     933              :         for &DeltaEntry {
     934            0 :             key, lsn, ref val, ..
     935            0 :         } in all_entries.iter()
     936              :         {
     937            0 :             if prev == Some((key, lsn)) {
     938              :                 // This is a duplicate. Skip it.
     939              :                 //
     940              :                 // It can happen if compaction is interrupted after writing some
     941              :                 // layers but not all, and we are compacting the range again.
     942              :                 // The calculations in the algorithm assume that there are no
     943              :                 // duplicates, so the math on targeted file size is likely off,
     944              :                 // and we will create smaller files than expected.
     945            0 :                 dup_values += 1;
     946            0 :                 continue;
     947            0 :             }
     948              : 
     949            0 :             let value = val.load(ctx).await?;
     950              : 
     951            0 :             writer.put_value(key, lsn, value).await?;
     952              : 
     953            0 :             prev = Some((key, lsn));
     954              :         }
     955              : 
     956            0 :         if dup_values > 0 {
     957            0 :             warn!("delta layer created with {} duplicate values", dup_values);
     958            0 :         }
     959              : 
     960            0 :         fail_point!("delta-layer-writer-fail-before-finish", |_| {
     961            0 :             Err(anyhow::anyhow!(
     962            0 :                 "failpoint delta-layer-writer-fail-before-finish"
     963            0 :             ))
     964            0 :         });
     965              : 
     966            0 :         let new_delta_layer = writer
     967            0 :             .finish(prev.unwrap().0.next(), &self.timeline)
     968            0 :             .await?;
     969              : 
     970            0 :         self.new_deltas.push(new_delta_layer);
     971            0 :         Ok(())
     972            0 :     }
     973              : 
     974            0 :     async fn delete_layer(
     975            0 :         &mut self,
     976            0 :         layer: &OwnArc<PersistentLayerDesc>,
     977            0 :         _ctx: &RequestContext,
     978            0 :     ) -> anyhow::Result<()> {
     979            0 :         self.layers_to_delete.push(layer.clone().0);
     980            0 :         Ok(())
     981            0 :     }
     982              : }
     983              : 
     984              : impl TimelineAdaptor {
     985            0 :     async fn create_image_impl(
     986            0 :         &mut self,
     987            0 :         lsn: Lsn,
     988            0 :         key_range: &Range<Key>,
     989            0 :         ctx: &RequestContext,
     990            0 :     ) -> Result<(), PageReconstructError> {
     991            0 :         let timer = self.timeline.metrics.create_images_time_histo.start_timer();
     992              : 
     993            0 :         let mut image_layer_writer = ImageLayerWriter::new(
     994            0 :             self.timeline.conf,
     995            0 :             self.timeline.timeline_id,
     996            0 :             self.timeline.tenant_shard_id,
     997            0 :             key_range,
     998            0 :             lsn,
     999            0 :         )
    1000            0 :         .await?;
    1001              : 
    1002            0 :         fail_point!("image-layer-writer-fail-before-finish", |_| {
    1003            0 :             Err(PageReconstructError::Other(anyhow::anyhow!(
    1004            0 :                 "failpoint image-layer-writer-fail-before-finish"
    1005            0 :             )))
    1006            0 :         });
    1007            0 :         let keyspace_ranges = self.get_keyspace(key_range, lsn, ctx).await?;
    1008            0 :         for range in &keyspace_ranges {
    1009            0 :             let mut key = range.start;
    1010            0 :             while key < range.end {
    1011            0 :                 let img = match self.timeline.get(key, lsn, ctx).await {
    1012            0 :                     Ok(img) => img,
    1013            0 :                     Err(err) => {
    1014            0 :                         // If we fail to reconstruct a VM or FSM page, we can zero the
    1015            0 :                         // page without losing any actual user data. That seems better
    1016            0 :                         // than failing repeatedly and getting stuck.
    1017            0 :                         //
    1018            0 :                         // We had a bug at one point, where we truncated the FSM and VM
    1019            0 :                         // in the pageserver, but the Postgres didn't know about that
    1020            0 :                         // and continued to generate incremental WAL records for pages
    1021            0 :                         // that didn't exist in the pageserver. Trying to replay those
    1022            0 :                         // WAL records failed to find the previous image of the page.
    1023            0 :                         // This special case allows us to recover from that situation.
    1024            0 :                         // See https://github.com/neondatabase/neon/issues/2601.
    1025            0 :                         //
    1026            0 :                         // Unfortunately we cannot do this for the main fork, or for
    1027            0 :                         // any metadata keys, keys, as that would lead to actual data
    1028            0 :                         // loss.
    1029            0 :                         if is_rel_fsm_block_key(key) || is_rel_vm_block_key(key) {
    1030            0 :                             warn!("could not reconstruct FSM or VM key {key}, filling with zeros: {err:?}");
    1031            0 :                             ZERO_PAGE.clone()
    1032              :                         } else {
    1033            0 :                             return Err(err);
    1034              :                         }
    1035              :                     }
    1036              :                 };
    1037            0 :                 image_layer_writer.put_image(key, img).await?;
    1038            0 :                 key = key.next();
    1039              :             }
    1040              :         }
    1041            0 :         let image_layer = image_layer_writer.finish(&self.timeline).await?;
    1042              : 
    1043            0 :         self.new_images.push(image_layer);
    1044            0 : 
    1045            0 :         timer.stop_and_record();
    1046            0 : 
    1047            0 :         Ok(())
    1048            0 :     }
    1049              : }
    1050              : 
    1051              : impl CompactionRequestContext for crate::context::RequestContext {}
    1052              : 
    1053              : #[derive(Debug, Clone)]
    1054              : pub struct OwnArc<T>(pub Arc<T>);
    1055              : 
    1056              : impl<T> Deref for OwnArc<T> {
    1057              :     type Target = <Arc<T> as Deref>::Target;
    1058            0 :     fn deref(&self) -> &Self::Target {
    1059            0 :         &self.0
    1060            0 :     }
    1061              : }
    1062              : 
    1063              : impl<T> AsRef<T> for OwnArc<T> {
    1064            0 :     fn as_ref(&self) -> &T {
    1065            0 :         self.0.as_ref()
    1066            0 :     }
    1067              : }
    1068              : 
    1069              : impl CompactionLayer<Key> for OwnArc<PersistentLayerDesc> {
    1070            0 :     fn key_range(&self) -> &Range<Key> {
    1071            0 :         &self.key_range
    1072            0 :     }
    1073            0 :     fn lsn_range(&self) -> &Range<Lsn> {
    1074            0 :         &self.lsn_range
    1075            0 :     }
    1076            0 :     fn file_size(&self) -> u64 {
    1077            0 :         self.file_size
    1078            0 :     }
    1079            0 :     fn short_id(&self) -> std::string::String {
    1080            0 :         self.as_ref().short_id().to_string()
    1081            0 :     }
    1082            0 :     fn is_delta(&self) -> bool {
    1083            0 :         self.as_ref().is_delta()
    1084            0 :     }
    1085              : }
    1086              : 
    1087              : impl CompactionLayer<Key> for OwnArc<DeltaLayer> {
    1088            0 :     fn key_range(&self) -> &Range<Key> {
    1089            0 :         &self.layer_desc().key_range
    1090            0 :     }
    1091            0 :     fn lsn_range(&self) -> &Range<Lsn> {
    1092            0 :         &self.layer_desc().lsn_range
    1093            0 :     }
    1094            0 :     fn file_size(&self) -> u64 {
    1095            0 :         self.layer_desc().file_size
    1096            0 :     }
    1097            0 :     fn short_id(&self) -> std::string::String {
    1098            0 :         self.layer_desc().short_id().to_string()
    1099            0 :     }
    1100            0 :     fn is_delta(&self) -> bool {
    1101            0 :         true
    1102            0 :     }
    1103              : }
    1104              : 
    1105              : use crate::tenant::timeline::DeltaEntry;
    1106              : 
    1107              : impl CompactionLayer<Key> for ResidentDeltaLayer {
    1108            0 :     fn key_range(&self) -> &Range<Key> {
    1109            0 :         &self.0.layer_desc().key_range
    1110            0 :     }
    1111            0 :     fn lsn_range(&self) -> &Range<Lsn> {
    1112            0 :         &self.0.layer_desc().lsn_range
    1113            0 :     }
    1114            0 :     fn file_size(&self) -> u64 {
    1115            0 :         self.0.layer_desc().file_size
    1116            0 :     }
    1117            0 :     fn short_id(&self) -> std::string::String {
    1118            0 :         self.0.layer_desc().short_id().to_string()
    1119            0 :     }
    1120            0 :     fn is_delta(&self) -> bool {
    1121            0 :         true
    1122            0 :     }
    1123              : }
    1124              : 
    1125              : #[async_trait]
    1126              : impl CompactionDeltaLayer<TimelineAdaptor> for ResidentDeltaLayer {
    1127              :     type DeltaEntry<'a> = DeltaEntry<'a>;
    1128              : 
    1129            0 :     async fn load_keys<'a>(&self, ctx: &RequestContext) -> anyhow::Result<Vec<DeltaEntry<'_>>> {
    1130            0 :         self.0.load_keys(ctx).await
    1131            0 :     }
    1132              : }
    1133              : 
    1134              : impl CompactionLayer<Key> for ResidentImageLayer {
    1135            0 :     fn key_range(&self) -> &Range<Key> {
    1136            0 :         &self.0.layer_desc().key_range
    1137            0 :     }
    1138            0 :     fn lsn_range(&self) -> &Range<Lsn> {
    1139            0 :         &self.0.layer_desc().lsn_range
    1140            0 :     }
    1141            0 :     fn file_size(&self) -> u64 {
    1142            0 :         self.0.layer_desc().file_size
    1143            0 :     }
    1144            0 :     fn short_id(&self) -> std::string::String {
    1145            0 :         self.0.layer_desc().short_id().to_string()
    1146            0 :     }
    1147            0 :     fn is_delta(&self) -> bool {
    1148            0 :         false
    1149            0 :     }
    1150              : }
    1151              : impl CompactionImageLayer<TimelineAdaptor> for ResidentImageLayer {}
        

Generated by: LCOV version 2.1-beta