Line data Source code
1 : //! New compaction implementation. The algorithm itself is implemented in the
2 : //! compaction crate. This file implements the callbacks and structs that allow
3 : //! the algorithm to drive the process.
4 : //!
5 : //! The old legacy algorithm is implemented directly in `timeline.rs`.
6 :
7 : use std::collections::{BinaryHeap, HashSet};
8 : use std::ops::{Deref, Range};
9 : use std::sync::Arc;
10 :
11 : use super::layer_manager::LayerManager;
12 : use super::{
13 : CompactFlags, CreateImageLayersError, DurationRecorder, ImageLayerCreationMode,
14 : RecordedDuration, Timeline,
15 : };
16 :
17 : use anyhow::{anyhow, bail, Context};
18 : use bytes::Bytes;
19 : use enumset::EnumSet;
20 : use fail::fail_point;
21 : use itertools::Itertools;
22 : use pageserver_api::key::KEY_SIZE;
23 : use pageserver_api::keyspace::ShardedRange;
24 : use pageserver_api::shard::{ShardCount, ShardIdentity, TenantShardId};
25 : use serde::Serialize;
26 : use tokio_util::sync::CancellationToken;
27 : use tracing::{debug, info, info_span, trace, warn, Instrument};
28 : use utils::id::TimelineId;
29 :
30 : use crate::context::{AccessStatsBehavior, RequestContext, RequestContextBuilder};
31 : use crate::page_cache;
32 : use crate::tenant::checks::check_valid_layermap;
33 : use crate::tenant::remote_timeline_client::WaitCompletionError;
34 : use crate::tenant::storage_layer::merge_iterator::MergeIterator;
35 : use crate::tenant::storage_layer::split_writer::{
36 : SplitDeltaLayerWriter, SplitImageLayerWriter, SplitWriterResult,
37 : };
38 : use crate::tenant::storage_layer::{
39 : AsLayerDesc, PersistentLayerDesc, PersistentLayerKey, ValueReconstructState,
40 : };
41 : use crate::tenant::timeline::ImageLayerCreationOutcome;
42 : use crate::tenant::timeline::{drop_rlock, DeltaLayerWriter, ImageLayerWriter};
43 : use crate::tenant::timeline::{Layer, ResidentLayer};
44 : use crate::tenant::DeltaLayer;
45 : use crate::virtual_file::{MaybeFatalIo, VirtualFile};
46 : use pageserver_api::config::tenant_conf_defaults::{
47 : DEFAULT_CHECKPOINT_DISTANCE, DEFAULT_COMPACTION_THRESHOLD,
48 : };
49 :
50 : use crate::keyspace::KeySpace;
51 : use crate::repository::{Key, Value};
52 : use crate::walrecord::NeonWalRecord;
53 :
54 : use utils::lsn::Lsn;
55 :
56 : use pageserver_compaction::helpers::overlaps_with;
57 : use pageserver_compaction::interface::*;
58 :
59 : use super::CompactionError;
60 :
61 : /// Maximum number of deltas before generating an image layer in bottom-most compaction.
62 : const COMPACTION_DELTA_THRESHOLD: usize = 5;
63 :
64 : /// The result of bottom-most compaction for a single key at each LSN.
65 : #[derive(Debug)]
66 : #[cfg_attr(test, derive(PartialEq))]
67 : pub struct KeyLogAtLsn(pub Vec<(Lsn, Value)>);
68 :
69 : /// The result of bottom-most compaction.
70 : #[derive(Debug)]
71 : #[cfg_attr(test, derive(PartialEq))]
72 : pub(crate) struct KeyHistoryRetention {
73 : /// Stores logs to reconstruct the value at the given LSN, that is to say, logs <= LSN or image == LSN.
74 : pub(crate) below_horizon: Vec<(Lsn, KeyLogAtLsn)>,
75 : /// Stores logs to reconstruct the value at any LSN above the horizon, that is to say, log > LSN.
76 : pub(crate) above_horizon: KeyLogAtLsn,
77 : }
78 :
79 : impl KeyHistoryRetention {
80 : /// Hack: skip the delta layer if we need to produce a layer with the same key-lsn.
81 : ///
82 : /// This can happen if we have removed some deltas in "the middle" of some existing layer's key-lsn-range.
83 : /// For example, consider the case where a single delta with range [0x10,0x50) exists.
84 : /// And we have branches at LSN 0x10, 0x20, 0x30.
85 : /// Then we delete branch @ 0x20.
86 : /// Bottom-most compaction may now delete the delta [0x20,0x30).
87 : /// And that wouldn't change the shape of the layer.
88 : ///
89 : /// Note that bottom-most-gc-compaction never _adds_ new data in that case, only removes.
90 : ///
91 : /// `discard_key` will only be called when the writer reaches its target (instead of for every key), so it's fine to grab a lock inside.
92 114 : async fn discard_key(key: &PersistentLayerKey, tline: &Arc<Timeline>, dry_run: bool) -> bool {
93 114 : if dry_run {
94 0 : return true;
95 114 : }
96 114 : let guard = tline.layers.read().await;
97 114 : if !guard.contains_key(key) {
98 66 : return false;
99 48 : }
100 48 : let layer_generation = guard.get_from_key(key).metadata().generation;
101 48 : drop(guard);
102 48 : if layer_generation == tline.generation {
103 48 : info!(
104 : key=%key,
105 : ?layer_generation,
106 0 : "discard layer due to duplicated layer key in the same generation",
107 : );
108 48 : true
109 : } else {
110 0 : false
111 : }
112 114 : }
113 :
114 : /// Pipe a history of a single key to the writers.
115 : ///
116 : /// If `image_writer` is `None`, the images will be placed into the delta layers.
117 : /// The delta writer will contain all images and deltas (below and above the horizon) except the bottom-most images.
118 : #[allow(clippy::too_many_arguments)]
119 1266 : async fn pipe_to(
120 1266 : self,
121 1266 : key: Key,
122 1266 : tline: &Arc<Timeline>,
123 1266 : delta_writer: &mut SplitDeltaLayerWriter,
124 1266 : mut image_writer: Option<&mut SplitImageLayerWriter>,
125 1266 : stat: &mut CompactionStatistics,
126 1266 : dry_run: bool,
127 1266 : ctx: &RequestContext,
128 1266 : ) -> anyhow::Result<()> {
129 1266 : let mut first_batch = true;
130 1266 : let discard = |key: &PersistentLayerKey| {
131 0 : let key = key.clone();
132 0 : async move { Self::discard_key(&key, tline, dry_run).await }
133 0 : };
134 4206 : for (cutoff_lsn, KeyLogAtLsn(logs)) in self.below_horizon {
135 2940 : if first_batch {
136 1266 : if logs.len() == 1 && logs[0].1.is_image() {
137 1224 : let Value::Image(img) = &logs[0].1 else {
138 0 : unreachable!()
139 : };
140 1224 : stat.produce_image_key(img);
141 1224 : if let Some(image_writer) = image_writer.as_mut() {
142 1224 : image_writer
143 1224 : .put_image_with_discard_fn(key, img.clone(), tline, ctx, discard)
144 1242 : .await?;
145 : } else {
146 0 : delta_writer
147 0 : .put_value_with_discard_fn(
148 0 : key,
149 0 : cutoff_lsn,
150 0 : Value::Image(img.clone()),
151 0 : tline,
152 0 : ctx,
153 0 : discard,
154 0 : )
155 0 : .await?;
156 : }
157 : } else {
158 84 : for (lsn, val) in logs {
159 42 : stat.produce_key(&val);
160 42 : delta_writer
161 42 : .put_value_with_discard_fn(key, lsn, val, tline, ctx, discard)
162 3 : .await?;
163 : }
164 : }
165 1266 : first_batch = false;
166 : } else {
167 1920 : for (lsn, val) in logs {
168 246 : stat.produce_key(&val);
169 246 : delta_writer
170 246 : .put_value_with_discard_fn(key, lsn, val, tline, ctx, discard)
171 24 : .await?;
172 : }
173 : }
174 : }
175 1266 : let KeyLogAtLsn(above_horizon_logs) = self.above_horizon;
176 1362 : for (lsn, val) in above_horizon_logs {
177 96 : stat.produce_key(&val);
178 96 : delta_writer
179 96 : .put_value_with_discard_fn(key, lsn, val, tline, ctx, discard)
180 6 : .await?;
181 : }
182 1266 : Ok(())
183 1266 : }
184 : }
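
For orientation, here is an editorial sketch (not part of the source) of what a `KeyHistoryRetention` could look like for a single key whose history is an image at 0x10 followed by deltas at 0x30 and 0x50, with the GC horizon at 0x40 and one retain LSN at 0x20; the WAL-record payloads are left as placeholders:

```rust
// Sketch only, using the module's own types (Lsn, Value, Bytes are already imported above).
let retention = KeyHistoryRetention {
    below_horizon: vec![
        // To read at the retain LSN 0x20, the image at 0x10 suffices.
        (
            Lsn(0x20),
            KeyLogAtLsn(vec![(Lsn(0x10), Value::Image(Bytes::from_static(b"img@0x10")))]),
        ),
        // Between 0x20 and the horizon 0x40, only the delta at 0x30 is needed.
        (
            Lsn(0x40),
            KeyLogAtLsn(vec![(Lsn(0x30), Value::WalRecord(todo!("placeholder delta")))]),
        ),
    ],
    // Everything newer than the horizon is retained verbatim.
    above_horizon: KeyLogAtLsn(vec![(Lsn(0x50), Value::WalRecord(todo!("placeholder delta")))]),
};
```

`pipe_to` would then send the bottom-most image to the image writer (if one is provided) and the remaining deltas, below and above the horizon, to the delta writer.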
185 :
186 : #[derive(Debug, Serialize, Default)]
187 : struct CompactionStatisticsNumSize {
188 : num: u64,
189 : size: u64,
190 : }
191 :
192 : #[derive(Debug, Serialize, Default)]
193 : pub struct CompactionStatistics {
194 : delta_layer_visited: CompactionStatisticsNumSize,
195 : image_layer_visited: CompactionStatisticsNumSize,
196 : delta_layer_produced: CompactionStatisticsNumSize,
197 : image_layer_produced: CompactionStatisticsNumSize,
198 : num_delta_layer_discarded: usize,
199 : num_image_layer_discarded: usize,
200 : num_unique_keys_visited: usize,
201 : wal_keys_visited: CompactionStatisticsNumSize,
202 : image_keys_visited: CompactionStatisticsNumSize,
203 : wal_produced: CompactionStatisticsNumSize,
204 : image_produced: CompactionStatisticsNumSize,
205 : }
206 :
207 : impl CompactionStatistics {
208 2058 : fn estimated_size_of_value(val: &Value) -> usize {
209 798 : match val {
210 1260 : Value::Image(img) => img.len(),
211 0 : Value::WalRecord(NeonWalRecord::Postgres { rec, .. }) => rec.len(),
212 798 : _ => std::mem::size_of::<NeonWalRecord>(),
213 : }
214 2058 : }
215 3288 : fn estimated_size_of_key() -> usize {
216 3288 : KEY_SIZE // TODO: distinguish image layer and delta layer (count LSN in delta layer)
217 3288 : }
218 138 : fn visit_delta_layer(&mut self, size: u64) {
219 138 : self.delta_layer_visited.num += 1;
220 138 : self.delta_layer_visited.size += size;
221 138 : }
222 108 : fn visit_image_layer(&mut self, size: u64) {
223 108 : self.image_layer_visited.num += 1;
224 108 : self.image_layer_visited.size += size;
225 108 : }
226 1266 : fn on_unique_key_visited(&mut self) {
227 1266 : self.num_unique_keys_visited += 1;
228 1266 : }
229 420 : fn visit_wal_key(&mut self, val: &Value) {
230 420 : self.wal_keys_visited.num += 1;
231 420 : self.wal_keys_visited.size +=
232 420 : Self::estimated_size_of_value(val) as u64 + Self::estimated_size_of_key() as u64;
233 420 : }
234 1260 : fn visit_image_key(&mut self, val: &Value) {
235 1260 : self.image_keys_visited.num += 1;
236 1260 : self.image_keys_visited.size +=
237 1260 : Self::estimated_size_of_value(val) as u64 + Self::estimated_size_of_key() as u64;
238 1260 : }
239 384 : fn produce_key(&mut self, val: &Value) {
240 384 : match val {
241 6 : Value::Image(img) => self.produce_image_key(img),
242 378 : Value::WalRecord(_) => self.produce_wal_key(val),
243 : }
244 384 : }
245 378 : fn produce_wal_key(&mut self, val: &Value) {
246 378 : self.wal_produced.num += 1;
247 378 : self.wal_produced.size +=
248 378 : Self::estimated_size_of_value(val) as u64 + Self::estimated_size_of_key() as u64;
249 378 : }
250 1230 : fn produce_image_key(&mut self, val: &Bytes) {
251 1230 : self.image_produced.num += 1;
252 1230 : self.image_produced.size += val.len() as u64 + Self::estimated_size_of_key() as u64;
253 1230 : }
254 24 : fn discard_delta_layer(&mut self) {
255 24 : self.num_delta_layer_discarded += 1;
256 24 : }
257 24 : fn discard_image_layer(&mut self) {
258 24 : self.num_image_layer_discarded += 1;
259 24 : }
260 30 : fn produce_delta_layer(&mut self, size: u64) {
261 30 : self.delta_layer_produced.num += 1;
262 30 : self.delta_layer_produced.size += size;
263 30 : }
264 36 : fn produce_image_layer(&mut self, size: u64) {
265 36 : self.image_layer_produced.num += 1;
266 36 : self.image_layer_produced.size += size;
267 36 : }
268 : }
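
As a rough, editorial usage sketch (module-internal; the real call sites live in the gc-compaction driver), the counters are bumped once per visited layer, once per visited key/value, and once per produced key/value, so the `size` fields follow `estimated_size_of_value` plus `KEY_SIZE`:

```rust
// Sketch only; `Value` and `Bytes` are already imported at the top of this module.
let mut stat = CompactionStatistics::default();

// Input side: one delta layer and one image layer were read.
stat.visit_delta_layer(8 * 1024);
stat.visit_image_layer(4 * 1024);

// Per-key accounting while merging the history of a single key.
let img = Bytes::from_static(b"page image");
stat.on_unique_key_visited();
stat.visit_image_key(&Value::Image(img.clone())); // adds img.len() + KEY_SIZE
stat.produce_image_key(&img);                     // the key survives as an image

// Output side: the image went into a new image layer.
stat.produce_image_layer(4 * 1024);
```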
269 :
270 : impl Timeline {
271 : /// TODO: cancellation
272 : ///
273 : /// Returns whether the compaction has pending tasks.
274 1092 : pub(crate) async fn compact_legacy(
275 1092 : self: &Arc<Self>,
276 1092 : cancel: &CancellationToken,
277 1092 : flags: EnumSet<CompactFlags>,
278 1092 : ctx: &RequestContext,
279 1092 : ) -> Result<bool, CompactionError> {
280 1092 : if flags.contains(CompactFlags::EnhancedGcBottomMostCompaction) {
281 0 : self.compact_with_gc(cancel, flags, ctx)
282 0 : .await
283 0 : .map_err(CompactionError::Other)?;
284 0 : return Ok(false);
285 1092 : }
286 1092 :
287 1092 : if flags.contains(CompactFlags::DryRun) {
288 0 : return Err(CompactionError::Other(anyhow!(
289 0 : "dry-run mode is not supported for legacy compaction for now"
290 0 : )));
291 1092 : }
292 1092 :
293 1092 : // High level strategy for compaction / image creation:
294 1092 : //
295 1092 : // 1. First, calculate the desired "partitioning" of the
296 1092 : // currently in-use key space. The goal is to partition the
297 1092 : // key space into roughly fixed-size chunks, but also take into
298 1092 : // account any existing image layers, and try to align the
299 1092 : // chunk boundaries with the existing image layers to avoid
300 1092 : // too much churn. Also try to align chunk boundaries with
301 1092 : // relation boundaries. In principle, we don't know about
302 1092 : // relation boundaries here, we just deal with key-value
303 1092 : // pairs, and the code in pgdatadir_mapping.rs knows how to
304 1092 : // map relations into key-value pairs. But in practice we know
305 1092 : // that 'field6' is the block number, and the fields 1-5
306 1092 : // identify a relation. This is just an optimization,
307 1092 : // though.
308 1092 : //
309 1092 : // 2. Once we know the partitioning, for each partition,
310 1092 : // decide if it's time to create a new image layer. The
311 1092 : // criteria is: there has been too much "churn" since the last
312 1092 : // image layer? The "churn" is fuzzy concept, it's a
313 1092 : // combination of too many delta files, or too much WAL in
314 1092 : // total in the delta file. Or perhaps: if creating an image
315 1092 : // file would allow to delete some older files.
316 1092 : //
317 1092 : // 3. After that, we compact all level0 delta files if there
318 1092 : // are too many of them. While compacting, we also garbage
319 1092 : // collect any page versions that are no longer needed because
320 1092 : // of the new image layers we created in step 2.
321 1092 : //
322 1092 : // TODO: This high level strategy hasn't been implemented yet.
323 1092 : // Below are functions compact_level0() and create_image_layers()
324 1092 : // but they are a bit ad hoc and don't quite work like it's explained
325 1092 : // above. Rewrite it.
326 1092 :
327 1092 : // Is the timeline being deleted?
328 1092 : if self.is_stopping() {
329 0 : trace!("Dropping out of compaction on timeline shutdown");
330 0 : return Err(CompactionError::ShuttingDown);
331 1092 : }
332 1092 :
333 1092 : let target_file_size = self.get_checkpoint_distance();
334 :
335 : // Define partitioning schema if needed
336 :
337 : // FIXME: the match should only cover repartitioning, not the next steps
338 1092 : let (partition_count, has_pending_tasks) = match self
339 1092 : .repartition(
340 1092 : self.get_last_record_lsn(),
341 1092 : self.get_compaction_target_size(),
342 1092 : flags,
343 1092 : ctx,
344 1092 : )
345 48003 : .await
346 : {
347 1092 : Ok(((dense_partitioning, sparse_partitioning), lsn)) => {
348 1092 : // Disables access_stats updates, so that the files we read remain candidates for eviction after we're done with them
349 1092 : let image_ctx = RequestContextBuilder::extend(ctx)
350 1092 : .access_stats_behavior(AccessStatsBehavior::Skip)
351 1092 : .build();
352 1092 :
353 1092 : // 2. Compact
354 1092 : let timer = self.metrics.compact_time_histo.start_timer();
355 29656 : let fully_compacted = self.compact_level0(target_file_size, ctx).await?;
356 1092 : timer.stop_and_record();
357 1092 :
358 1092 : let mut partitioning = dense_partitioning;
359 1092 : partitioning
360 1092 : .parts
361 1092 : .extend(sparse_partitioning.into_dense().parts);
362 1092 :
363 1092 : // 3. Create new image layers for partitions that have been modified
364 1092 : // "enough". Skip image layer creation if L0 compaction cannot keep up.
365 1092 : if fully_compacted {
366 1092 : let image_layers = self
367 1092 : .create_image_layers(
368 1092 : &partitioning,
369 1092 : lsn,
370 1092 : if flags.contains(CompactFlags::ForceImageLayerCreation) {
371 42 : ImageLayerCreationMode::Force
372 : } else {
373 1050 : ImageLayerCreationMode::Try
374 : },
375 1092 : &image_ctx,
376 : )
377 40891 : .await?;
378 :
379 1092 : self.upload_new_image_layers(image_layers)?;
380 : } else {
381 0 : info!("skipping image layer generation due to L0 compaction did not include all layers.");
382 : }
383 1092 : (partitioning.parts.len(), !fully_compacted)
384 : }
385 0 : Err(err) => {
386 0 : // no partitioning? This is normal, if the timeline was just created
387 0 : // as an empty timeline. Also in unit tests, when we use the timeline
388 0 : // as a simple key-value store, ignoring the datadir layout. Log the
389 0 : // error but continue.
390 0 : //
391 0 : // Suppress error when it's due to cancellation
392 0 : if !self.cancel.is_cancelled() {
393 0 : tracing::error!("could not compact, repartitioning keyspace failed: {err:?}");
394 0 : }
395 0 : (1, false)
396 : }
397 : };
398 :
399 1092 : if self.shard_identity.count >= ShardCount::new(2) {
400 : // Limit the number of layer rewrites to the number of partitions: this means its
401 : // runtime should be comparable to a full round of image layer creations, rather than
402 : // being potentially much longer.
403 0 : let rewrite_max = partition_count;
404 0 :
405 0 : self.compact_shard_ancestors(rewrite_max, ctx).await?;
406 1092 : }
407 :
408 1092 : Ok(has_pending_tasks)
409 1092 : }
410 :
411 : /// Check for layers that are elegible to be rewritten:
412 : /// - Shard splitting: After a shard split, ancestor layers beyond pitr_interval, so that
413 : /// we don't indefinitely retain keys in this shard that aren't needed.
414 : /// - For future use: layers beyond pitr_interval that are in formats we would
415 : /// rather not maintain compatibility with indefinitely.
416 : ///
417 : /// Note: this phase may read and write many gigabytes of data: use rewrite_max to bound
418 : /// how much work it will try to do in each compaction pass.
419 0 : async fn compact_shard_ancestors(
420 0 : self: &Arc<Self>,
421 0 : rewrite_max: usize,
422 0 : ctx: &RequestContext,
423 0 : ) -> Result<(), CompactionError> {
424 0 : let mut drop_layers = Vec::new();
425 0 : let mut layers_to_rewrite: Vec<Layer> = Vec::new();
426 0 :
427 0 : // We will use the Lsn cutoff of the last GC as a threshold for rewriting layers: if a
428 0 : // layer is behind this Lsn, it indicates that the layer is being retained beyond the
429 0 : // pitr_interval, for example because a branchpoint references it.
430 0 : //
431 0 : // Holding this read guard also blocks [`Self::gc_timeline`] from entering while we
432 0 : // are rewriting layers.
433 0 : let latest_gc_cutoff = self.get_latest_gc_cutoff_lsn();
434 0 :
435 0 : tracing::info!(
436 0 : "latest_gc_cutoff: {}, pitr cutoff {}",
437 0 : *latest_gc_cutoff,
438 0 : self.gc_info.read().unwrap().cutoffs.time
439 : );
440 :
441 0 : let layers = self.layers.read().await;
442 0 : for layer_desc in layers.layer_map()?.iter_historic_layers() {
443 0 : let layer = layers.get_from_desc(&layer_desc);
444 0 : if layer.metadata().shard.shard_count == self.shard_identity.count {
445 : // This layer does not belong to a historic ancestor, no need to re-image it.
446 0 : continue;
447 0 : }
448 0 :
449 0 : // This layer was created on an ancestor shard: check if it contains any data for this shard.
450 0 : let sharded_range = ShardedRange::new(layer_desc.get_key_range(), &self.shard_identity);
451 0 : let layer_local_page_count = sharded_range.page_count();
452 0 : let layer_raw_page_count = ShardedRange::raw_size(&layer_desc.get_key_range());
453 0 : if layer_local_page_count == 0 {
454 : // This ancestral layer only covers keys that belong to other shards.
455 : // We include the full metadata in the log: if we had some critical bug that caused
456 : // us to incorrectly drop layers, this would simplify manually debugging + reinstating those layers.
457 0 : info!(%layer, old_metadata=?layer.metadata(),
458 0 : "dropping layer after shard split, contains no keys for this shard.",
459 : );
460 :
461 0 : if cfg!(debug_assertions) {
462 : // Expensive, exhaustive check of keys in this layer: this guards against ShardedRange's calculations being
463 : // wrong. If ShardedRange claims the local page count is zero, then no keys in this layer
464 : // should be !is_key_disposable()
465 0 : let range = layer_desc.get_key_range();
466 0 : let mut key = range.start;
467 0 : while key < range.end {
468 0 : debug_assert!(self.shard_identity.is_key_disposable(&key));
469 0 : key = key.next();
470 : }
471 0 : }
472 :
473 0 : drop_layers.push(layer);
474 0 : continue;
475 0 : } else if layer_local_page_count != u32::MAX
476 0 : && layer_local_page_count == layer_raw_page_count
477 : {
478 0 : debug!(%layer,
479 0 : "layer is entirely shard local ({} keys), no need to filter it",
480 : layer_local_page_count
481 : );
482 0 : continue;
483 0 : }
484 0 :
485 0 : // Don't bother re-writing a layer unless it will at least halve its size
486 0 : if layer_local_page_count != u32::MAX
487 0 : && layer_local_page_count > layer_raw_page_count / 2
488 : {
489 0 : debug!(%layer,
490 0 : "layer is already mostly local ({}/{}), not rewriting",
491 : layer_local_page_count,
492 : layer_raw_page_count
493 : );
494 0 : }
495 :
496 : // Don't bother re-writing a layer if it is within the PITR window: it will age-out eventually
497 : // without incurring the I/O cost of a rewrite.
498 0 : if layer_desc.get_lsn_range().end >= *latest_gc_cutoff {
499 0 : debug!(%layer, "Skipping rewrite of layer still in GC window ({} >= {})",
500 0 : layer_desc.get_lsn_range().end, *latest_gc_cutoff);
501 0 : continue;
502 0 : }
503 0 :
504 0 : if layer_desc.is_delta() {
505 : // We do not yet implement rewrite of delta layers
506 0 : debug!(%layer, "Skipping rewrite of delta layer");
507 0 : continue;
508 0 : }
509 0 :
510 0 : // Only rewrite layers if their generations differ. This guarantees:
511 0 : // - that local rewrite is safe, as local layer paths will differ between existing layer and rewritten one
512 0 : // - that the layer is persistent in remote storage, as we only see old-generation'd layers via loading from remote storage
513 0 : if layer.metadata().generation == self.generation {
514 0 : debug!(%layer, "Skipping rewrite, is not from old generation");
515 0 : continue;
516 0 : }
517 0 :
518 0 : if layers_to_rewrite.len() >= rewrite_max {
519 0 : tracing::info!(%layer, "Will rewrite layer on a future compaction, already rewrote {}",
520 0 : layers_to_rewrite.len()
521 : );
522 0 : continue;
523 0 : }
524 0 :
525 0 : // Fall through: all our conditions for doing a rewrite passed.
526 0 : layers_to_rewrite.push(layer);
527 : }
528 :
529 : // Drop read lock on layer map before we start doing time-consuming I/O
530 0 : drop(layers);
531 0 :
532 0 : let mut replace_image_layers = Vec::new();
533 :
534 0 : for layer in layers_to_rewrite {
535 0 : tracing::info!(layer=%layer, "Rewriting layer after shard split...");
536 0 : let mut image_layer_writer = ImageLayerWriter::new(
537 0 : self.conf,
538 0 : self.timeline_id,
539 0 : self.tenant_shard_id,
540 0 : &layer.layer_desc().key_range,
541 0 : layer.layer_desc().image_layer_lsn(),
542 0 : ctx,
543 0 : )
544 0 : .await
545 0 : .map_err(CompactionError::Other)?;
546 :
547 : // Safety of layer rewrites:
548 : // - We are writing to a different local file path than we are reading from, so the old Layer
549 : // cannot interfere with the new one.
550 : // - In the page cache, contents for a particular VirtualFile are stored with a file_id that
551 : // is different for two layers with the same name (in `ImageLayerInner::new` we always
552 : // acquire a fresh id from [`crate::page_cache::next_file_id`]. So readers do not risk
553 : // reading the index from one layer file, and then data blocks from the rewritten layer file.
554 : // - Any readers that have a reference to the old layer will keep it alive until they are done
555 : // with it. If they are trying to promote from remote storage, that will fail, but this is the same
556 : // as for compaction generally: compaction is allowed to delete layers that readers might be trying to use.
557 : // - We do not run concurrently with other kinds of compaction, so the only layer map writes we race with are:
558 : // - GC, which at worst witnesses us "undelete" a layer that they just deleted.
559 : // - ingestion, which only inserts layers, therefore cannot collide with us.
560 0 : let resident = layer.download_and_keep_resident().await?;
561 :
562 0 : let keys_written = resident
563 0 : .filter(&self.shard_identity, &mut image_layer_writer, ctx)
564 0 : .await?;
565 :
566 0 : if keys_written > 0 {
567 0 : let (desc, path) = image_layer_writer
568 0 : .finish(ctx)
569 0 : .await
570 0 : .map_err(CompactionError::Other)?;
571 0 : let new_layer = Layer::finish_creating(self.conf, self, desc, &path)
572 0 : .map_err(CompactionError::Other)?;
573 0 : tracing::info!(layer=%new_layer, "Rewrote layer, {} -> {} bytes",
574 0 : layer.metadata().file_size,
575 0 : new_layer.metadata().file_size);
576 :
577 0 : replace_image_layers.push((layer, new_layer));
578 0 : } else {
579 0 : // Drop the old layer. Usually for this case we would already have noticed that
580 0 : // the layer has no data for us with the ShardedRange check above, but
581 0 : drop_layers.push(layer);
582 0 : }
583 : }
584 :
585 : // At this point, we have replaced local layer files with their rewritten form, but not yet uploaded
586 : // metadata to reflect that. If we restart here, the replaced layer files will look invalid (size mismatch
587 : // to remote index) and be removed. This is inefficient but safe.
588 0 : fail::fail_point!("compact-shard-ancestors-localonly");
589 0 :
590 0 : // Update the LayerMap so that readers will use the new layers, and enqueue it for writing to remote storage
591 0 : self.rewrite_layers(replace_image_layers, drop_layers)
592 0 : .await?;
593 :
594 0 : fail::fail_point!("compact-shard-ancestors-enqueued");
595 0 :
596 0 : // We wait for all uploads to complete before finishing this compaction stage. This is not
597 0 : // necessary for correctness, but it simplifies testing, and avoids proceeding with another
598 0 : // Timeline's compaction while this timeline's uploads may be generating lots of disk I/O
599 0 : // load.
600 0 : match self.remote_client.wait_completion().await {
601 0 : Ok(()) => (),
602 0 : Err(WaitCompletionError::NotInitialized(ni)) => return Err(CompactionError::from(ni)),
603 : Err(WaitCompletionError::UploadQueueShutDownOrStopped) => {
604 0 : return Err(CompactionError::ShuttingDown)
605 : }
606 : }
607 :
608 0 : fail::fail_point!("compact-shard-ancestors-persistent");
609 0 :
610 0 : Ok(())
611 0 : }
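
The per-layer branching above reduces to a few page-count and metadata checks. The following condensed helper is a hypothetical summary (names invented for illustration; the "already mostly local" heuristic and the `rewrite_max` cap are omitted), not part of the source:

```rust
/// Editorial sketch of the decision taken for each ancestor layer in
/// compact_shard_ancestors.
enum AncestorLayerAction {
    Keep,    // leave the layer alone for now
    Drop,    // push onto drop_layers
    Rewrite, // push onto layers_to_rewrite
}

fn classify_ancestor_layer(
    local_page_count: u32, // pages in the layer that belong to this shard
    raw_page_count: u32,   // total pages covered by the layer's key range
    is_delta: bool,
    within_pitr_window: bool,
    same_generation: bool,
) -> AncestorLayerAction {
    if local_page_count == 0 {
        return AncestorLayerAction::Drop; // no keys for this shard at all
    }
    if local_page_count != u32::MAX && local_page_count == raw_page_count {
        return AncestorLayerAction::Keep; // already entirely shard-local
    }
    if within_pitr_window || is_delta || same_generation {
        // Will age out on its own, is not rewritable yet, or is already from our generation.
        return AncestorLayerAction::Keep;
    }
    AncestorLayerAction::Rewrite
}
```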
612 :
613 : /// Update the LayerVisibilityHint of layers covered by image layers, based on whether there is
614 : /// an image layer between them and the most recent readable LSN (branch point or tip of timeline). The
615 : /// purpose of the visibility hint is to record which layers need to be available to service reads.
616 : ///
617 : /// The result may be used as an input to eviction and secondary downloads to de-prioritize layers
618 : /// that we know won't be needed for reads.
619 588 : pub(super) async fn update_layer_visibility(
620 588 : &self,
621 588 : ) -> Result<(), super::layer_manager::Shutdown> {
622 588 : let head_lsn = self.get_last_record_lsn();
623 :
624 : // We will sweep through layers in reverse-LSN order. We only do historic layers. L0 deltas
625 : // are implicitly left visible, because LayerVisibilityHint's default is Visible, and we never modify it here.
626 : // Note that L0 deltas _can_ be covered by image layers, but we consider them 'visible' because we anticipate that
627 : // they will be subject to L0->L1 compaction in the near future.
628 588 : let layer_manager = self.layers.read().await;
629 588 : let layer_map = layer_manager.layer_map()?;
630 :
631 588 : let readable_points = {
632 588 : let children = self.gc_info.read().unwrap().retain_lsns.clone();
633 588 :
634 588 : let mut readable_points = Vec::with_capacity(children.len() + 1);
635 588 : for (child_lsn, _child_timeline_id) in &children {
636 0 : readable_points.push(*child_lsn);
637 0 : }
638 588 : readable_points.push(head_lsn);
639 588 : readable_points
640 588 : };
641 588 :
642 588 : let (layer_visibility, covered) = layer_map.get_visibility(readable_points);
643 1512 : for (layer_desc, visibility) in layer_visibility {
644 924 : // FIXME: do a more efficient bulk zip() through the layers rather than NlogN getting each one
645 924 : let layer = layer_manager.get_from_desc(&layer_desc);
646 924 : layer.set_visibility(visibility);
647 924 : }
648 :
649 : // TODO: publish our covered KeySpace to our parent, so that when they update their visibility, they can
650 : // avoid assuming that everything at a branch point is visible.
651 588 : drop(covered);
652 588 : Ok(())
653 588 : }
654 :
655 : /// Collect a bunch of Level 0 layer files, and compact and reshuffle them as
656 : /// as Level 1 files. Returns whether the L0 layers are fully compacted.
657 1092 : async fn compact_level0(
658 1092 : self: &Arc<Self>,
659 1092 : target_file_size: u64,
660 1092 : ctx: &RequestContext,
661 1092 : ) -> Result<bool, CompactionError> {
662 : let CompactLevel0Phase1Result {
663 1092 : new_layers,
664 1092 : deltas_to_compact,
665 1092 : fully_compacted,
666 : } = {
667 1092 : let phase1_span = info_span!("compact_level0_phase1");
668 1092 : let ctx = ctx.attached_child();
669 1092 : let mut stats = CompactLevel0Phase1StatsBuilder {
670 1092 : version: Some(2),
671 1092 : tenant_id: Some(self.tenant_shard_id),
672 1092 : timeline_id: Some(self.timeline_id),
673 1092 : ..Default::default()
674 1092 : };
675 1092 :
676 1092 : let begin = tokio::time::Instant::now();
677 1092 : let phase1_layers_locked = self.layers.read().await;
678 1092 : let now = tokio::time::Instant::now();
679 1092 : stats.read_lock_acquisition_micros =
680 1092 : DurationRecorder::Recorded(RecordedDuration(now - begin), now);
681 1092 : self.compact_level0_phase1(phase1_layers_locked, stats, target_file_size, &ctx)
682 1092 : .instrument(phase1_span)
683 29652 : .await?
684 : };
685 :
686 1092 : if new_layers.is_empty() && deltas_to_compact.is_empty() {
687 : // nothing to do
688 1008 : return Ok(true);
689 84 : }
690 84 :
691 84 : self.finish_compact_batch(&new_layers, &Vec::new(), &deltas_to_compact)
692 3 : .await?;
693 84 : Ok(fully_compacted)
694 1092 : }
695 :
696 : /// First phase of Level 0 file compaction, explained in the [`Self::compact_legacy`] comment.
697 1092 : async fn compact_level0_phase1<'a>(
698 1092 : self: &'a Arc<Self>,
699 1092 : guard: tokio::sync::RwLockReadGuard<'a, LayerManager>,
700 1092 : mut stats: CompactLevel0Phase1StatsBuilder,
701 1092 : target_file_size: u64,
702 1092 : ctx: &RequestContext,
703 1092 : ) -> Result<CompactLevel0Phase1Result, CompactionError> {
704 1092 : stats.read_lock_held_spawn_blocking_startup_micros =
705 1092 : stats.read_lock_acquisition_micros.till_now(); // set by caller
706 1092 : let layers = guard.layer_map()?;
707 1092 : let level0_deltas = layers.level0_deltas();
708 1092 : stats.level0_deltas_count = Some(level0_deltas.len());
709 1092 :
710 1092 : // Only compact if enough layers have accumulated.
711 1092 : let threshold = self.get_compaction_threshold();
712 1092 : if level0_deltas.is_empty() || level0_deltas.len() < threshold {
713 1008 : debug!(
714 0 : level0_deltas = level0_deltas.len(),
715 0 : threshold, "too few deltas to compact"
716 : );
717 1008 : return Ok(CompactLevel0Phase1Result::default());
718 84 : }
719 84 :
720 84 : let mut level0_deltas = level0_deltas
721 84 : .iter()
722 1206 : .map(|x| guard.get_from_desc(x))
723 84 : .collect::<Vec<_>>();
724 84 :
725 84 : // Gather the files to compact in this iteration.
726 84 : //
727 84 : // Start with the oldest Level 0 delta file, and collect any other
728 84 : // level 0 files that form a contiguous sequence, such that the end
729 84 : // LSN of previous file matches the start LSN of the next file.
730 84 : //
731 84 : // Note that if the files don't form such a sequence, we might
732 84 : // "compact" just a single file. That's a bit pointless, but it allows
733 84 : // us to get rid of the level 0 file, and compact the other files on
734 84 : // the next iteration. This could probably be made smarter, but such
735 84 : // "gaps" in the sequence of level 0 files should only happen in case
736 84 : // of a crash, partial download from cloud storage, or something like
737 84 : // that, so it's not a big deal in practice.
738 2244 : level0_deltas.sort_by_key(|l| l.layer_desc().lsn_range.start);
739 84 : let mut level0_deltas_iter = level0_deltas.iter();
740 84 :
741 84 : let first_level0_delta = level0_deltas_iter.next().unwrap();
742 84 : let mut prev_lsn_end = first_level0_delta.layer_desc().lsn_range.end;
743 84 : let mut deltas_to_compact = Vec::with_capacity(level0_deltas.len());
744 84 :
745 84 : // Accumulate the size of layers in `deltas_to_compact`
746 84 : let mut deltas_to_compact_bytes = 0;
747 84 :
748 84 : // Under normal circumstances, we will accumulate up to compaction_interval L0s of size
749 84 : // checkpoint_distance each. To avoid edge cases using extra system resources, bound our
750 84 : // work in this function to only operate on this much delta data at once.
751 84 : //
752 84 : // Take the max of the configured value & the default, so that tests that configure tiny values
753 84 : // can still use a sensible amount of memory, but if a deployed system configures bigger values we
754 84 : // still let them compact a full stack of L0s in one go.
755 84 : let delta_size_limit = std::cmp::max(
756 84 : self.get_compaction_threshold(),
757 84 : DEFAULT_COMPACTION_THRESHOLD,
758 84 : ) as u64
759 84 : * std::cmp::max(self.get_checkpoint_distance(), DEFAULT_CHECKPOINT_DISTANCE);
760 84 :
761 84 : let mut fully_compacted = true;
762 84 :
763 84 : deltas_to_compact.push(first_level0_delta.download_and_keep_resident().await?);
764 1206 : for l in level0_deltas_iter {
765 1122 : let lsn_range = &l.layer_desc().lsn_range;
766 1122 :
767 1122 : if lsn_range.start != prev_lsn_end {
768 0 : break;
769 1122 : }
770 1122 : deltas_to_compact.push(l.download_and_keep_resident().await?);
771 1122 : deltas_to_compact_bytes += l.metadata().file_size;
772 1122 : prev_lsn_end = lsn_range.end;
773 1122 :
774 1122 : if deltas_to_compact_bytes >= delta_size_limit {
775 0 : info!(
776 0 : l0_deltas_selected = deltas_to_compact.len(),
777 0 : l0_deltas_total = level0_deltas.len(),
778 0 : "L0 compaction picker hit max delta layer size limit: {}",
779 : delta_size_limit
780 : );
781 0 : fully_compacted = false;
782 0 :
783 0 : // Proceed with compaction, but only a subset of L0s
784 0 : break;
785 1122 : }
786 : }
787 84 : let lsn_range = Range {
788 84 : start: deltas_to_compact
789 84 : .first()
790 84 : .unwrap()
791 84 : .layer_desc()
792 84 : .lsn_range
793 84 : .start,
794 84 : end: deltas_to_compact.last().unwrap().layer_desc().lsn_range.end,
795 84 : };
796 84 :
797 84 : info!(
798 0 : "Starting Level0 compaction in LSN range {}-{} for {} layers ({} deltas in total)",
799 0 : lsn_range.start,
800 0 : lsn_range.end,
801 0 : deltas_to_compact.len(),
802 0 : level0_deltas.len()
803 : );
804 :
805 1206 : for l in deltas_to_compact.iter() {
806 1206 : info!("compact includes {l}");
807 : }
808 :
809 : // We don't need the original list of layers anymore. Drop it so that
810 : // we don't accidentally use it later in the function.
811 84 : drop(level0_deltas);
812 84 :
813 84 : stats.read_lock_held_prerequisites_micros = stats
814 84 : .read_lock_held_spawn_blocking_startup_micros
815 84 : .till_now();
816 :
817 : // TODO: replace with streaming k-merge
818 84 : let all_keys = {
819 84 : let mut all_keys = Vec::new();
820 1206 : for l in deltas_to_compact.iter() {
821 1206 : if self.cancel.is_cancelled() {
822 0 : return Err(CompactionError::ShuttingDown);
823 1206 : }
824 7083 : all_keys.extend(l.load_keys(ctx).await.map_err(CompactionError::Other)?);
825 : }
826 : // The current stdlib sorting implementation is designed in a way where it is
827 : // particularly fast when the slice is made up of sorted sub-ranges.
828 13271396 : all_keys.sort_by_key(|DeltaEntry { key, lsn, .. }| (*key, *lsn));
829 84 : all_keys
830 84 : };
831 84 :
832 84 : stats.read_lock_held_key_sort_micros = stats.read_lock_held_prerequisites_micros.till_now();
833 :
834 : // Determine N largest holes where N is number of compacted layers. The vec is sorted by key range start.
835 : //
836 : // A hole is a key range for which this compaction doesn't have any WAL records.
837 : // Our goal in this compaction iteration is to avoid creating L1s that, in terms of their key range,
838 : // cover the hole, but actually don't contain any WAL records for that key range.
839 : // The reason is that the mere stack of L1s (`count_deltas`) triggers image layer creation (`create_image_layers`).
840 : // That image layer creation would be useless for a hole range covered by L1s that don't contain any WAL records.
841 : //
842 : // The algorithm chooses holes as follows.
843 : // - Slide a two-key window over the keys in key order to get the hole range (=distance between two keys); a standalone sketch of this selection appears after this impl block.
844 : // - Filter: min threshold on range length
845 : // - Rank: by coverage size (=number of image layers required to reconstruct each key in the range for which we have any data)
846 : //
847 : // For more details, intuition, and some ASCII art see https://github.com/neondatabase/neon/pull/3597#discussion_r1112704451
848 : #[derive(PartialEq, Eq)]
849 : struct Hole {
850 : key_range: Range<Key>,
851 : coverage_size: usize,
852 : }
853 84 : let holes: Vec<Hole> = {
854 : use std::cmp::Ordering;
855 : impl Ord for Hole {
856 0 : fn cmp(&self, other: &Self) -> Ordering {
857 0 : self.coverage_size.cmp(&other.coverage_size).reverse()
858 0 : }
859 : }
860 : impl PartialOrd for Hole {
861 0 : fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
862 0 : Some(self.cmp(other))
863 0 : }
864 : }
865 84 : let max_holes = deltas_to_compact.len();
866 84 : let last_record_lsn = self.get_last_record_lsn();
867 84 : let min_hole_range = (target_file_size / page_cache::PAGE_SZ as u64) as i128;
868 84 : let min_hole_coverage_size = 3; // TODO: something more flexible?
869 84 : // min-heap (reserve space for one more element added before eviction)
870 84 : let mut heap: BinaryHeap<Hole> = BinaryHeap::with_capacity(max_holes + 1);
871 84 : let mut prev: Option<Key> = None;
872 :
873 6192114 : for &DeltaEntry { key: next_key, .. } in all_keys.iter() {
874 6192114 : if let Some(prev_key) = prev {
875 : // just a first fast filter; do not create hole entries for metadata keys. The last hole in the
876 : // compaction is the gap between the data keys and the metadata keys.
877 6192030 : if next_key.to_i128() - prev_key.to_i128() >= min_hole_range
878 0 : && !Key::is_metadata_key(&prev_key)
879 : {
880 0 : let key_range = prev_key..next_key;
881 0 : // Measuring a hole by simple subtraction of the i128 representations of the key range boundaries
882 0 : // does not make much sense, because the largest holes would correspond to field1/field2 changes.
883 0 : // But we are mostly interested in eliminating holes which cause generation of excessive image layers.
884 0 : // That is why it is better to measure the size of a hole as the number of covering image layers.
885 0 : let coverage_size =
886 0 : layers.image_coverage(&key_range, last_record_lsn).len();
887 0 : if coverage_size >= min_hole_coverage_size {
888 0 : heap.push(Hole {
889 0 : key_range,
890 0 : coverage_size,
891 0 : });
892 0 : if heap.len() > max_holes {
893 0 : heap.pop(); // remove smallest hole
894 0 : }
895 0 : }
896 6192030 : }
897 84 : }
898 6192114 : prev = Some(next_key.next());
899 : }
900 84 : let mut holes = heap.into_vec();
901 84 : holes.sort_unstable_by_key(|hole| hole.key_range.start);
902 84 : holes
903 84 : };
904 84 : stats.read_lock_held_compute_holes_micros = stats.read_lock_held_key_sort_micros.till_now();
905 84 : drop_rlock(guard);
906 84 :
907 84 : if self.cancel.is_cancelled() {
908 0 : return Err(CompactionError::ShuttingDown);
909 84 : }
910 84 :
911 84 : stats.read_lock_drop_micros = stats.read_lock_held_compute_holes_micros.till_now();
912 :
913 : // This iterator walks through all key-value pairs from all the layers
914 : // we're compacting, in key, LSN order.
915 : // If there's both a Value::Image and Value::WalRecord for the same (key,lsn),
916 : // then the Value::Image is ordered before Value::WalRecord.
917 84 : let mut all_values_iter = {
918 84 : let mut deltas = Vec::with_capacity(deltas_to_compact.len());
919 1206 : for l in deltas_to_compact.iter() {
920 1206 : let l = l.get_as_delta(ctx).await.map_err(CompactionError::Other)?;
921 1206 : deltas.push(l);
922 : }
923 84 : MergeIterator::create(&deltas, &[], ctx)
924 84 : };
925 84 :
926 84 : // This iterator walks through all keys and is needed to calculate size used by each key
927 84 : let mut all_keys_iter = all_keys
928 84 : .iter()
929 6192114 : .map(|DeltaEntry { key, lsn, size, .. }| (*key, *lsn, *size))
930 6192030 : .coalesce(|mut prev, cur| {
931 6192030 : // Coalesce keys that belong to the same key pair.
932 6192030 : // This ensures that compaction doesn't put them
933 6192030 : // into different layer files.
934 6192030 : // Still limit this by the target file size,
935 6192030 : // so that we keep the size of the files in
936 6192030 : // check.
937 6192030 : if prev.0 == cur.0 && prev.2 < target_file_size {
938 120114 : prev.2 += cur.2;
939 120114 : Ok(prev)
940 : } else {
941 6071916 : Err((prev, cur))
942 : }
943 6192030 : });
944 84 :
945 84 : // Merge the contents of all the input delta layers into a new set
946 84 : // of delta layers, based on the current partitioning.
947 84 : //
948 84 : // We split the new delta layers on the key dimension. We iterate through the key space, and for each key, check whether adding the next key to the current output layer we're building would cause the layer to become too large. If so, dump the current output layer and start a new one.
949 84 : // It's possible that there is a single key with so many page versions that storing all of them in a single layer file
950 84 : // would be too large. In that case, we also split on the LSN dimension.
951 84 : //
952 84 : // LSN
953 84 : // ^
954 84 : // |
955 84 : // | +-----------+ +--+--+--+--+
956 84 : // | | | | | | | |
957 84 : // | +-----------+ | | | | |
958 84 : // | | | | | | | |
959 84 : // | +-----------+ ==> | | | | |
960 84 : // | | | | | | | |
961 84 : // | +-----------+ | | | | |
962 84 : // | | | | | | | |
963 84 : // | +-----------+ +--+--+--+--+
964 84 : // |
965 84 : // +--------------> key
966 84 : //
967 84 : //
968 84 : // If one key (X) has a lot of page versions:
969 84 : //
970 84 : // LSN
971 84 : // ^
972 84 : // | (X)
973 84 : // | +-----------+ +--+--+--+--+
974 84 : // | | | | | | | |
975 84 : // | +-----------+ | | +--+ |
976 84 : // | | | | | | | |
977 84 : // | +-----------+ ==> | | | | |
978 84 : // | | | | | +--+ |
979 84 : // | +-----------+ | | | | |
980 84 : // | | | | | | | |
981 84 : // | +-----------+ +--+--+--+--+
982 84 : // |
983 84 : // +--------------> key
984 84 : // TODO: this actually divides the layers into fixed-size chunks, not
985 84 : // based on the partitioning.
986 84 : //
987 84 : // TODO: we should also opportunistically materialize and
988 84 : // garbage collect what we can.
989 84 : let mut new_layers = Vec::new();
990 84 : let mut prev_key: Option<Key> = None;
991 84 : let mut writer: Option<DeltaLayerWriter> = None;
992 84 : let mut key_values_total_size = 0u64;
993 84 : let mut dup_start_lsn: Lsn = Lsn::INVALID; // start LSN of layer containing values of the single key
994 84 : let mut dup_end_lsn: Lsn = Lsn::INVALID; // end LSN of layer containing values of the single key
995 84 : let mut next_hole = 0; // index of next hole in holes vector
996 84 :
997 84 : let mut keys = 0;
998 :
999 6192198 : while let Some((key, lsn, value)) = all_values_iter
1000 6192198 : .next()
1001 10248 : .await
1002 6192198 : .map_err(CompactionError::Other)?
1003 : {
1004 6192114 : keys += 1;
1005 6192114 :
1006 6192114 : if keys % 32_768 == 0 && self.cancel.is_cancelled() {
1007 : // avoid hitting the cancellation token on every key. In benches, we end up
1008 : // shuffling on the order of a million keys per layer, so we'll check it
1009 : // around tens of times per layer.
1010 0 : return Err(CompactionError::ShuttingDown);
1011 6192114 : }
1012 6192114 :
1013 6192114 : let same_key = prev_key.map_or(false, |prev_key| prev_key == key);
1014 6192114 : // We need to check key boundaries once we reach next key or end of layer with the same key
1015 6192114 : if !same_key || lsn == dup_end_lsn {
1016 6072000 : let mut next_key_size = 0u64;
1017 6072000 : let is_dup_layer = dup_end_lsn.is_valid();
1018 6072000 : dup_start_lsn = Lsn::INVALID;
1019 6072000 : if !same_key {
1020 6072000 : dup_end_lsn = Lsn::INVALID;
1021 6072000 : }
1022 : // Determine size occupied by this key. We stop at next key or when size becomes larger than target_file_size
1023 6072000 : for (next_key, next_lsn, next_size) in all_keys_iter.by_ref() {
1024 6072000 : next_key_size = next_size;
1025 6072000 : if key != next_key {
1026 6071916 : if dup_end_lsn.is_valid() {
1027 0 : // We are writting segment with duplicates:
1028 0 : // place all remaining values of this key in separate segment
1029 0 : dup_start_lsn = dup_end_lsn; // the new segment starts where the old one stops
1030 0 : dup_end_lsn = lsn_range.end; // there are no more values of this key till end of LSN range
1031 6071916 : }
1032 6071916 : break;
1033 84 : }
1034 84 : key_values_total_size += next_size;
1035 84 : // Check if it is time to split the segment: is the total key size larger than the target file size?
1036 84 : // We need to avoid generating empty segments if next_size > target_file_size.
1037 84 : if key_values_total_size > target_file_size && lsn != next_lsn {
1038 : // Split key between multiple layers: such layer can contain only single key
1039 0 : dup_start_lsn = if dup_end_lsn.is_valid() {
1040 0 : dup_end_lsn // new segment with duplicates starts where old one stops
1041 : } else {
1042 0 : lsn // start with the first LSN for this key
1043 : };
1044 0 : dup_end_lsn = next_lsn; // upper LSN boundary is exclusive
1045 0 : break;
1046 84 : }
1047 : }
1048 : // handle case when loop reaches last key: in this case dup_end is non-zero but dup_start is not set.
1049 6072000 : if dup_end_lsn.is_valid() && !dup_start_lsn.is_valid() {
1050 0 : dup_start_lsn = dup_end_lsn;
1051 0 : dup_end_lsn = lsn_range.end;
1052 6072000 : }
1053 6072000 : if writer.is_some() {
1054 6071916 : let written_size = writer.as_mut().unwrap().size();
1055 6071916 : let contains_hole =
1056 6071916 : next_hole < holes.len() && key >= holes[next_hole].key_range.end;
1057 : // check if the key causes layer overflow or the layer would contain a hole...
1058 6071916 : if is_dup_layer
1059 6071916 : || dup_end_lsn.is_valid()
1060 6071916 : || written_size + key_values_total_size > target_file_size
1061 6071076 : || contains_hole
1062 : {
1063 : // ... if so, flush previous layer and prepare to write new one
1064 840 : let (desc, path) = writer
1065 840 : .take()
1066 840 : .unwrap()
1067 840 : .finish(prev_key.unwrap().next(), ctx)
1068 2130 : .await
1069 840 : .map_err(CompactionError::Other)?;
1070 840 : let new_delta = Layer::finish_creating(self.conf, self, desc, &path)
1071 840 : .map_err(CompactionError::Other)?;
1072 :
1073 840 : new_layers.push(new_delta);
1074 840 : writer = None;
1075 840 :
1076 840 : if contains_hole {
1077 0 : // skip hole
1078 0 : next_hole += 1;
1079 840 : }
1080 6071076 : }
1081 84 : }
1082 : // Remember the size of the key value because at the next iteration we will access the next item
1083 6072000 : key_values_total_size = next_key_size;
1084 120114 : }
1085 6192114 : fail_point!("delta-layer-writer-fail-before-finish", |_| {
1086 0 : Err(CompactionError::Other(anyhow::anyhow!(
1087 0 : "failpoint delta-layer-writer-fail-before-finish"
1088 0 : )))
1089 6192114 : });
1090 :
1091 6192114 : if !self.shard_identity.is_key_disposable(&key) {
1092 6192114 : if writer.is_none() {
1093 924 : if self.cancel.is_cancelled() {
1094 : // to be somewhat responsive to cancellation, check for each new layer
1095 0 : return Err(CompactionError::ShuttingDown);
1096 924 : }
1097 : // Create the writer if it is not initialized yet
1098 924 : writer = Some(
1099 : DeltaLayerWriter::new(
1100 924 : self.conf,
1101 924 : self.timeline_id,
1102 924 : self.tenant_shard_id,
1103 924 : key,
1104 924 : if dup_end_lsn.is_valid() {
1105 : // this is a layer containing a slice of values of the same key
1106 0 : debug!("Create new dup layer {}..{}", dup_start_lsn, dup_end_lsn);
1107 0 : dup_start_lsn..dup_end_lsn
1108 : } else {
1109 924 : debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end);
1110 924 : lsn_range.clone()
1111 : },
1112 924 : ctx,
1113 : )
1114 462 : .await
1115 924 : .map_err(CompactionError::Other)?,
1116 : );
1117 :
1118 924 : keys = 0;
1119 6191190 : }
1120 :
1121 6192114 : writer
1122 6192114 : .as_mut()
1123 6192114 : .unwrap()
1124 6192114 : .put_value(key, lsn, value, ctx)
1125 3676 : .await
1126 6192114 : .map_err(CompactionError::Other)?;
1127 : } else {
1128 0 : debug!(
1129 0 : "Dropping key {} during compaction (it belongs on shard {:?})",
1130 0 : key,
1131 0 : self.shard_identity.get_shard_number(&key)
1132 : );
1133 : }
1134 :
1135 6192114 : if !new_layers.is_empty() {
1136 59358 : fail_point!("after-timeline-compacted-first-L1");
1137 6132756 : }
1138 :
1139 6192114 : prev_key = Some(key);
1140 : }
1141 84 : if let Some(writer) = writer {
1142 84 : let (desc, path) = writer
1143 84 : .finish(prev_key.unwrap().next(), ctx)
1144 5969 : .await
1145 84 : .map_err(CompactionError::Other)?;
1146 84 : let new_delta = Layer::finish_creating(self.conf, self, desc, &path)
1147 84 : .map_err(CompactionError::Other)?;
1148 84 : new_layers.push(new_delta);
1149 0 : }
1150 :
1151 : // Sync layers
1152 84 : if !new_layers.is_empty() {
1153 : // Print a warning if the created layer is larger than double the target size
1154 : // Add two pages for potential overhead. This should in theory be already
1155 : // accounted for in the target calculation, but for very small targets,
1156 : // we still might easily hit the limit otherwise.
1157 84 : let warn_limit = target_file_size * 2 + page_cache::PAGE_SZ as u64 * 2;
1158 924 : for layer in new_layers.iter() {
1159 924 : if layer.layer_desc().file_size > warn_limit {
1160 0 : warn!(
1161 : %layer,
1162 0 : "created delta file of size {} larger than double of target of {target_file_size}", layer.layer_desc().file_size
1163 : );
1164 924 : }
1165 : }
1166 :
1167 : // The writer.finish() above already did the fsync of the inodes.
1168 : // We just need to fsync the directory in which these inodes are linked,
1169 : // which we know to be the timeline directory.
1170 : //
1171 : // We use fatal_err() below because after writer.finish() returns with success,
1172 : // the in-memory state of the filesystem already has the layer file in its final place,
1173 : // and subsequent pageserver code could think it's durable while it really isn't.
1174 84 : let timeline_dir = VirtualFile::open(
1175 84 : &self
1176 84 : .conf
1177 84 : .timeline_path(&self.tenant_shard_id, &self.timeline_id),
1178 84 : ctx,
1179 84 : )
1180 42 : .await
1181 84 : .fatal_err("VirtualFile::open for timeline dir fsync");
1182 84 : timeline_dir
1183 84 : .sync_all()
1184 42 : .await
1185 84 : .fatal_err("VirtualFile::sync_all timeline dir");
1186 0 : }
1187 :
1188 84 : stats.write_layer_files_micros = stats.read_lock_drop_micros.till_now();
1189 84 : stats.new_deltas_count = Some(new_layers.len());
1190 924 : stats.new_deltas_size = Some(new_layers.iter().map(|l| l.layer_desc().file_size).sum());
1191 84 :
1192 84 : match TryInto::<CompactLevel0Phase1Stats>::try_into(stats)
1193 84 : .and_then(|stats| serde_json::to_string(&stats).context("serde_json::to_string"))
1194 : {
1195 84 : Ok(stats_json) => {
1196 84 : info!(
1197 0 : stats_json = stats_json.as_str(),
1198 0 : "compact_level0_phase1 stats available"
1199 : )
1200 : }
1201 0 : Err(e) => {
1202 0 : warn!("compact_level0_phase1 stats failed to serialize: {:#}", e);
1203 : }
1204 : }
1205 :
1206 : // Without this, rustc complains about deltas_to_compact still
1207 : // being borrowed when we `.into_iter()` below.
1208 84 : drop(all_values_iter);
1209 84 :
1210 84 : Ok(CompactLevel0Phase1Result {
1211 84 : new_layers,
1212 84 : deltas_to_compact: deltas_to_compact
1213 84 : .into_iter()
1214 1206 : .map(|x| x.drop_eviction_guard())
1215 84 : .collect::<Vec<_>>(),
1216 84 : fully_compacted,
1217 84 : })
1218 1092 : }
1219 : }
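
The hole selection in `compact_level0_phase1` (the sliding window plus the bounded min-heap built from a reversed `Ord`) can be exercised in isolation. The self-contained sketch below ranks holes by their key-range length rather than by image-layer coverage, which is the only simplification compared to the code above:

```rust
use std::cmp::Ordering;
use std::collections::BinaryHeap;
use std::ops::Range;

/// A gap between two consecutive keys, ranked by a weight
/// (the real code uses the number of covering image layers).
#[derive(PartialEq, Eq)]
struct Hole {
    key_range: Range<u64>,
    weight: usize,
}

// Reverse the ordering so that BinaryHeap (a max-heap) behaves as a min-heap:
// the smallest hole sits on top and is the first to be evicted.
impl Ord for Hole {
    fn cmp(&self, other: &Self) -> Ordering {
        self.weight.cmp(&other.weight).reverse()
    }
}
impl PartialOrd for Hole {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

/// Keep the `max_holes` widest gaps between consecutive sorted keys,
/// returned in key order.
fn find_holes(sorted_keys: &[u64], min_len: u64, max_holes: usize) -> Vec<Hole> {
    let mut heap: BinaryHeap<Hole> = BinaryHeap::with_capacity(max_holes + 1);
    for window in sorted_keys.windows(2) {
        let (prev, next) = (window[0], window[1]);
        if next - prev >= min_len {
            heap.push(Hole {
                key_range: prev..next,
                weight: (next - prev) as usize,
            });
            if heap.len() > max_holes {
                heap.pop(); // evict the currently smallest hole
            }
        }
    }
    let mut holes = heap.into_vec();
    holes.sort_unstable_by_key(|hole| hole.key_range.start);
    holes
}

fn main() {
    let keys = [1u64, 2, 3, 100, 101, 500, 501, 502, 900];
    for hole in find_holes(&keys, 10, 2) {
        println!("hole {:?} (weight {})", hole.key_range, hole.weight);
    }
}
```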
1220 :
1221 : #[derive(Default)]
1222 : struct CompactLevel0Phase1Result {
1223 : new_layers: Vec<ResidentLayer>,
1224 : deltas_to_compact: Vec<Layer>,
1225 : // Whether we have included all L0 layers, or selected only part of them due to the
1226 : // L0 compaction size limit.
1227 : fully_compacted: bool,
1228 : }
1229 :
1230 : #[derive(Default)]
1231 : struct CompactLevel0Phase1StatsBuilder {
1232 : version: Option<u64>,
1233 : tenant_id: Option<TenantShardId>,
1234 : timeline_id: Option<TimelineId>,
1235 : read_lock_acquisition_micros: DurationRecorder,
1236 : read_lock_held_spawn_blocking_startup_micros: DurationRecorder,
1237 : read_lock_held_key_sort_micros: DurationRecorder,
1238 : read_lock_held_prerequisites_micros: DurationRecorder,
1239 : read_lock_held_compute_holes_micros: DurationRecorder,
1240 : read_lock_drop_micros: DurationRecorder,
1241 : write_layer_files_micros: DurationRecorder,
1242 : level0_deltas_count: Option<usize>,
1243 : new_deltas_count: Option<usize>,
1244 : new_deltas_size: Option<u64>,
1245 : }
1246 :
1247 : #[derive(serde::Serialize)]
1248 : struct CompactLevel0Phase1Stats {
1249 : version: u64,
1250 : tenant_id: TenantShardId,
1251 : timeline_id: TimelineId,
1252 : read_lock_acquisition_micros: RecordedDuration,
1253 : read_lock_held_spawn_blocking_startup_micros: RecordedDuration,
1254 : read_lock_held_key_sort_micros: RecordedDuration,
1255 : read_lock_held_prerequisites_micros: RecordedDuration,
1256 : read_lock_held_compute_holes_micros: RecordedDuration,
1257 : read_lock_drop_micros: RecordedDuration,
1258 : write_layer_files_micros: RecordedDuration,
1259 : level0_deltas_count: usize,
1260 : new_deltas_count: usize,
1261 : new_deltas_size: u64,
1262 : }
1263 :
1264 : impl TryFrom<CompactLevel0Phase1StatsBuilder> for CompactLevel0Phase1Stats {
1265 : type Error = anyhow::Error;
1266 :
1267 84 : fn try_from(value: CompactLevel0Phase1StatsBuilder) -> Result<Self, Self::Error> {
1268 84 : Ok(Self {
1269 84 : version: value.version.ok_or_else(|| anyhow!("version not set"))?,
1270 84 : tenant_id: value
1271 84 : .tenant_id
1272 84 : .ok_or_else(|| anyhow!("tenant_id not set"))?,
1273 84 : timeline_id: value
1274 84 : .timeline_id
1275 84 : .ok_or_else(|| anyhow!("timeline_id not set"))?,
1276 84 : read_lock_acquisition_micros: value
1277 84 : .read_lock_acquisition_micros
1278 84 : .into_recorded()
1279 84 : .ok_or_else(|| anyhow!("read_lock_acquisition_micros not set"))?,
1280 84 : read_lock_held_spawn_blocking_startup_micros: value
1281 84 : .read_lock_held_spawn_blocking_startup_micros
1282 84 : .into_recorded()
1283 84 : .ok_or_else(|| anyhow!("read_lock_held_spawn_blocking_startup_micros not set"))?,
1284 84 : read_lock_held_key_sort_micros: value
1285 84 : .read_lock_held_key_sort_micros
1286 84 : .into_recorded()
1287 84 : .ok_or_else(|| anyhow!("read_lock_held_key_sort_micros not set"))?,
1288 84 : read_lock_held_prerequisites_micros: value
1289 84 : .read_lock_held_prerequisites_micros
1290 84 : .into_recorded()
1291 84 : .ok_or_else(|| anyhow!("read_lock_held_prerequisites_micros not set"))?,
1292 84 : read_lock_held_compute_holes_micros: value
1293 84 : .read_lock_held_compute_holes_micros
1294 84 : .into_recorded()
1295 84 : .ok_or_else(|| anyhow!("read_lock_held_compute_holes_micros not set"))?,
1296 84 : read_lock_drop_micros: value
1297 84 : .read_lock_drop_micros
1298 84 : .into_recorded()
1299 84 : .ok_or_else(|| anyhow!("read_lock_drop_micros not set"))?,
1300 84 : write_layer_files_micros: value
1301 84 : .write_layer_files_micros
1302 84 : .into_recorded()
1303 84 : .ok_or_else(|| anyhow!("write_layer_files_micros not set"))?,
1304 84 : level0_deltas_count: value
1305 84 : .level0_deltas_count
1306 84 : .ok_or_else(|| anyhow!("level0_deltas_count not set"))?,
1307 84 : new_deltas_count: value
1308 84 : .new_deltas_count
1309 84 : .ok_or_else(|| anyhow!("new_deltas_count not set"))?,
1310 84 : new_deltas_size: value
1311 84 : .new_deltas_size
1312 84 : .ok_or_else(|| anyhow!("new_deltas_size not set"))?,
1313 : })
1314 84 : }
1315 : }
1316 :
1317 : impl Timeline {
1318 : /// Entry point for new tiered compaction algorithm.
1319 : ///
1320 : /// All the real work is in the implementation in the pageserver_compaction
1321 : /// crate. The code here would apply to any algorithm implemented by the
1322 : /// same interface, but tiered is the only one at the moment.
1323 : ///
1324 : /// TODO: cancellation
1325 0 : pub(crate) async fn compact_tiered(
1326 0 : self: &Arc<Self>,
1327 0 : _cancel: &CancellationToken,
1328 0 : ctx: &RequestContext,
1329 0 : ) -> Result<(), CompactionError> {
1330 0 : let fanout = self.get_compaction_threshold() as u64;
1331 0 : let target_file_size = self.get_checkpoint_distance();
1332 :
1333 : // Find the top of the historical layers
1334 0 : let end_lsn = {
1335 0 : let guard = self.layers.read().await;
1336 0 : let layers = guard.layer_map()?;
1337 :
1338 0 : let l0_deltas = layers.level0_deltas();
1339 0 :
1340 0 : // As an optimization, if we find that there are too few L0 layers,
1341 0 : // bail out early. We know that the compaction algorithm would do
1342 0 : // nothing in that case.
1343 0 : if l0_deltas.len() < fanout as usize {
1344 : // doesn't need compacting
1345 0 : return Ok(());
1346 0 : }
1347 0 : l0_deltas.iter().map(|l| l.lsn_range.end).max().unwrap()
1348 0 : };
1349 0 :
1350 0 : // Is the timeline being deleted?
1351 0 : if self.is_stopping() {
1352 0 : trace!("Dropping out of compaction on timeline shutdown");
1353 0 : return Err(CompactionError::ShuttingDown);
1354 0 : }
1355 :
1356 0 : let (dense_ks, _sparse_ks) = self.collect_keyspace(end_lsn, ctx).await?;
1357 : // TODO(chi): ignore sparse_keyspace for now, compact it in the future.
1358 0 : let mut adaptor = TimelineAdaptor::new(self, (end_lsn, dense_ks));
1359 0 :
1360 0 : pageserver_compaction::compact_tiered::compact_tiered(
1361 0 : &mut adaptor,
1362 0 : end_lsn,
1363 0 : target_file_size,
1364 0 : fanout,
1365 0 : ctx,
1366 0 : )
1367 0 : .await
1368 : // TODO: compact_tiered needs to return CompactionError
1369 0 : .map_err(CompactionError::Other)?;
1370 :
1371 0 : adaptor.flush_updates().await?;
1372 0 : Ok(())
1373 0 : }
1374 :
1375 : /// Take a list of images and deltas, produce images and deltas according to GC horizon and retain_lsns.
1376 : ///
1377 : /// It takes a key, the values of the key within the compaction process, a GC horizon, and all retain_lsns below the horizon.
1378 : /// For now, it requires that `accumulated_values` contain the full history of the key (i.e., the record with the lowest LSN is
1379 : /// an image or a WAL record that does not require a base image). This restriction will be removed once we implement gc-compaction on branches.
1380 : ///
1381 : /// The function returns the deltas and the base image that need to be placed at each of the retain LSNs. For example, given:
1382 : ///
1383 : /// A@0x10, +B@0x20, +C@0x30, +D@0x40, +E@0x50, +F@0x60
1384 : /// horizon = 0x50, retain_lsn = 0x20, 0x40, delta_threshold=3
1385 : ///
1386 : /// The function will produce:
1387 : ///
1388 : /// ```plain
1389 : /// 0x20(retain_lsn) -> img=AB@0x20 always produce a single image below the lowest retain LSN
1390 : /// 0x40(retain_lsn) -> deltas=[+C@0x30, +D@0x40] two deltas since the last base image, keeping the deltas
1391 : /// 0x50(horizon) -> deltas=[ABCDE@0x50] three deltas since the last base image, generate an image but put it in the delta
1392 : /// above_horizon -> deltas=[+F@0x60] full history above the horizon
1393 : /// ```
1394 : ///
1395 : /// Note that `accumulated_values` must be sorted by LSN and should belong to a single key.
1396 1290 : pub(crate) async fn generate_key_retention(
1397 1290 : self: &Arc<Timeline>,
1398 1290 : key: Key,
1399 1290 : full_history: &[(Key, Lsn, Value)],
1400 1290 : horizon: Lsn,
1401 1290 : retain_lsn_below_horizon: &[Lsn],
1402 1290 : delta_threshold_cnt: usize,
1403 1290 : base_img_from_ancestor: Option<(Key, Lsn, Bytes)>,
1404 1290 : ) -> anyhow::Result<KeyHistoryRetention> {
1405 1290 : // Pre-checks for the invariants
1406 1290 : if cfg!(debug_assertions) {
1407 3120 : for (log_key, _, _) in full_history {
1408 1830 : assert_eq!(log_key, &key, "mismatched key");
1409 : }
1410 1290 : for i in 1..full_history.len() {
1411 540 : assert!(full_history[i - 1].1 <= full_history[i].1, "unordered LSN");
1412 540 : if full_history[i - 1].1 == full_history[i].1 {
1413 0 : assert!(
1414 0 : matches!(full_history[i - 1].2, Value::Image(_)),
1415 0 : "unordered delta/image, or duplicated delta"
1416 : );
1417 540 : }
1418 : }
1419 : // There used to be an assertion that, when there is no base image, the first
1420 : // record in the history must be `will_init`, but it was removed.
1421 : // This is explained in the test cases for generate_key_retention.
1422 : // Search "incomplete history" for more information.
1423 3000 : for lsn in retain_lsn_below_horizon {
1424 1710 : assert!(lsn < &horizon, "retain lsn must be below horizon")
1425 : }
1426 1290 : for i in 1..retain_lsn_below_horizon.len() {
1427 834 : assert!(
1428 834 : retain_lsn_below_horizon[i - 1] <= retain_lsn_below_horizon[i],
1429 0 : "unordered LSN"
1430 : );
1431 : }
1432 0 : }
1433 1290 : let has_ancestor = base_img_from_ancestor.is_some();
1434 : // Step 1: split history into len(retain_lsn_below_horizon) + 2 buckets, where the last bucket is for all deltas above the horizon,
1435 : // and the second-to-last bucket is for the horizon. Each bucket contains lsn_last_bucket < deltas <= lsn_this_bucket.
1436 1290 : let (mut split_history, lsn_split_points) = {
1437 1290 : let mut split_history = Vec::new();
1438 1290 : split_history.resize_with(retain_lsn_below_horizon.len() + 2, Vec::new);
1439 1290 : let mut lsn_split_points = Vec::with_capacity(retain_lsn_below_horizon.len() + 1);
1440 3000 : for lsn in retain_lsn_below_horizon {
1441 1710 : lsn_split_points.push(*lsn);
1442 1710 : }
1443 1290 : lsn_split_points.push(horizon);
1444 1290 : let mut current_idx = 0;
1445 3120 : for item @ (_, lsn, _) in full_history {
1446 2316 : while current_idx < lsn_split_points.len() && *lsn > lsn_split_points[current_idx] {
1447 486 : current_idx += 1;
1448 486 : }
1449 1830 : split_history[current_idx].push(item);
1450 : }
1451 1290 : (split_history, lsn_split_points)
1452 : };
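     : // Illustrative walkthrough of the split above, using the example from the
     : // doc comment (A@0x10, +B@0x20, +C@0x30, +D@0x40, +E@0x50, +F@0x60 with
     : // retain_lsns 0x20/0x40 and horizon 0x50): lsn_split_points becomes
     : // [0x20, 0x40, 0x50], and the four buckets end up as
     : //   bucket 0 (lsn <= 0x20):         A@0x10, +B@0x20
     : //   bucket 1 (0x20 < lsn <= 0x40):  +C@0x30, +D@0x40
     : //   bucket 2 (0x40 < lsn <= 0x50):  +E@0x50
     : //   bucket 3 (lsn > horizon):       +F@0x60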
1453 : // Step 2: filter out duplicated records due to the k-merge of image/delta layers
1454 5580 : for split_for_lsn in &mut split_history {
1455 4290 : let mut prev_lsn = None;
1456 4290 : let mut new_split_for_lsn = Vec::with_capacity(split_for_lsn.len());
1457 4290 : for record @ (_, lsn, _) in std::mem::take(split_for_lsn) {
1458 1830 : if let Some(prev_lsn) = &prev_lsn {
1459 198 : if *prev_lsn == lsn {
1460 : // This is the case where we have an LSN with data from both the delta layer and the image layer. As
1461 : // `ValueWrapper` ensures that an image is ordered before a delta at the same LSN, we simply
1462 : // drop this delta and keep the image.
1463 : //
1464 : // For example, if we have delta layer records key1@0x10 and key1@0x20 and an image layer record
1465 : // key1@0x10, we keep the image for key1@0x10 and the delta for key1@0x20; the key1@0x10 delta
1466 : // is simply dropped.
1467 : //
1468 : // TODO: if we have both a delta and an image for a given LSN and the delta threshold is not
1469 : // exceeded, we could keep the delta instead to save space. This is an optimization for the future.
1470 0 : continue;
1471 198 : }
1472 1632 : }
1473 1830 : prev_lsn = Some(lsn);
1474 1830 : new_split_for_lsn.push(record);
1475 : }
1476 4290 : *split_for_lsn = new_split_for_lsn;
1477 : }
1478 : // Step 3: generate images when necessary
1479 1290 : let mut retention = Vec::with_capacity(split_history.len());
1480 1290 : let mut records_since_last_image = 0;
1481 1290 : let batch_cnt = split_history.len();
1482 1290 : assert!(
1483 1290 : batch_cnt >= 2,
1484 0 : "should have at least below + above horizon batches"
1485 : );
1486 1290 : let mut replay_history: Vec<(Key, Lsn, Value)> = Vec::new();
1487 1290 : if let Some((key, lsn, img)) = base_img_from_ancestor {
1488 54 : replay_history.push((key, lsn, Value::Image(img)));
1489 1236 : }
1490 :
1491 : /// Generate debug information for the replay history
1492 0 : fn generate_history_trace(replay_history: &[(Key, Lsn, Value)]) -> String {
1493 : use std::fmt::Write;
1494 0 : let mut output = String::new();
1495 0 : if let Some((key, _, _)) = replay_history.first() {
1496 0 : write!(output, "key={} ", key).unwrap();
1497 0 : let mut cnt = 0;
1498 0 : for (_, lsn, val) in replay_history {
1499 0 : if val.is_image() {
1500 0 : write!(output, "i@{} ", lsn).unwrap();
1501 0 : } else if val.will_init() {
1502 0 : write!(output, "di@{} ", lsn).unwrap();
1503 0 : } else {
1504 0 : write!(output, "d@{} ", lsn).unwrap();
1505 0 : }
1506 0 : cnt += 1;
1507 0 : if cnt >= 128 {
1508 0 : write!(output, "... and more").unwrap();
1509 0 : break;
1510 0 : }
1511 : }
1512 0 : } else {
1513 0 : write!(output, "<no history>").unwrap();
1514 0 : }
1515 0 : output
1516 0 : }
1517 :
1518 0 : fn generate_debug_trace(
1519 0 : replay_history: Option<&[(Key, Lsn, Value)]>,
1520 0 : full_history: &[(Key, Lsn, Value)],
1521 0 : lsns: &[Lsn],
1522 0 : horizon: Lsn,
1523 0 : ) -> String {
1524 : use std::fmt::Write;
1525 0 : let mut output = String::new();
1526 0 : if let Some(replay_history) = replay_history {
1527 0 : writeln!(
1528 0 : output,
1529 0 : "replay_history: {}",
1530 0 : generate_history_trace(replay_history)
1531 0 : )
1532 0 : .unwrap();
1533 0 : } else {
1534 0 : writeln!(output, "replay_history: <disabled>",).unwrap();
1535 0 : }
1536 0 : writeln!(
1537 0 : output,
1538 0 : "full_history: {}",
1539 0 : generate_history_trace(full_history)
1540 0 : )
1541 0 : .unwrap();
1542 0 : writeln!(
1543 0 : output,
1544 0 : "when processing: [{}] horizon={}",
1545 0 : lsns.iter().map(|l| format!("{l}")).join(","),
1546 0 : horizon
1547 0 : )
1548 0 : .unwrap();
1549 0 : output
1550 0 : }
1551 :
1552 4290 : for (i, split_for_lsn) in split_history.into_iter().enumerate() {
1553 : // TODO: there could be image keys inside the splits, and we can compute records_since_last_image accordingly.
1554 4290 : records_since_last_image += split_for_lsn.len();
1555 4290 : let generate_image = if i == 0 && !has_ancestor {
1556 : // We always generate images for the first batch (below horizon / lowest retain_lsn)
1557 1236 : true
1558 3054 : } else if i == batch_cnt - 1 {
1559 : // Do not generate images for the last batch (above horizon)
1560 1290 : false
1561 1764 : } else if records_since_last_image >= delta_threshold_cnt {
1562 : // Generate images when there are too many records
1563 18 : true
1564 : } else {
1565 1746 : false
1566 : };
1567 4290 : replay_history.extend(split_for_lsn.iter().map(|x| (*x).clone()));
1568 : // Only retain the items after the last image record
1569 5274 : for idx in (0..replay_history.len()).rev() {
1570 5274 : if replay_history[idx].2.will_init() {
1571 4290 : replay_history = replay_history[idx..].to_vec();
1572 4290 : break;
1573 984 : }
1574 : }
1575 4290 : if let Some((_, _, val)) = replay_history.first() {
1576 4290 : if !val.will_init() {
1577 0 : return Err(anyhow::anyhow!("invalid history, no base image")).with_context(
1578 0 : || {
1579 0 : generate_debug_trace(
1580 0 : Some(&replay_history),
1581 0 : full_history,
1582 0 : retain_lsn_below_horizon,
1583 0 : horizon,
1584 0 : )
1585 0 : },
1586 0 : );
1587 4290 : }
1588 0 : }
1589 4290 : if generate_image && records_since_last_image > 0 {
1590 1254 : records_since_last_image = 0;
1591 1254 : let replay_history_for_debug = if cfg!(debug_assertions) {
1592 1254 : Some(replay_history.clone())
1593 : } else {
1594 0 : None
1595 : };
1596 1254 : let replay_history_for_debug_ref = replay_history_for_debug.as_deref();
1597 1254 : let history = std::mem::take(&mut replay_history);
1598 1254 : let mut img = None;
1599 1254 : let mut records = Vec::with_capacity(history.len());
1600 1254 : if let (_, lsn, Value::Image(val)) = history.first().as_ref().unwrap() {
1601 1254 : img = Some((*lsn, val.clone()));
1602 1254 : for (_, lsn, val) in history.into_iter().skip(1) {
1603 102 : let Value::WalRecord(rec) = val else {
1604 0 : return Err(anyhow::anyhow!(
1605 0 : "invalid record, first record is image, expect walrecords"
1606 0 : ))
1607 0 : .with_context(|| {
1608 0 : generate_debug_trace(
1609 0 : replay_history_for_debug_ref,
1610 0 : full_history,
1611 0 : retain_lsn_below_horizon,
1612 0 : horizon,
1613 0 : )
1614 0 : });
1615 : };
1616 102 : records.push((lsn, rec));
1617 : }
1618 : } else {
1619 0 : for (_, lsn, val) in history.into_iter() {
1620 0 : let Value::WalRecord(rec) = val else {
1621 0 : return Err(anyhow::anyhow!("invalid record, first record is walrecord, expect rest are walrecord"))
1622 0 : .with_context(|| generate_debug_trace(
1623 0 : replay_history_for_debug_ref,
1624 0 : full_history,
1625 0 : retain_lsn_below_horizon,
1626 0 : horizon,
1627 0 : ));
1628 : };
1629 0 : records.push((lsn, rec));
1630 : }
1631 : }
1632 1254 : records.reverse();
1633 1254 : let state = ValueReconstructState { img, records };
1634 1254 : let request_lsn = lsn_split_points[i]; // last batch does not generate image so i is always in range
1635 1254 : let img = self.reconstruct_value(key, request_lsn, state).await?;
1636 1254 : replay_history.push((key, request_lsn, Value::Image(img.clone())));
1637 1254 : retention.push(vec![(request_lsn, Value::Image(img))]);
1638 3036 : } else {
1639 3036 : let deltas = split_for_lsn
1640 3036 : .iter()
1641 3036 : .map(|(_, lsn, value)| (*lsn, value.clone()))
1642 3036 : .collect_vec();
1643 3036 : retention.push(deltas);
1644 3036 : }
1645 : }
1646 1290 : let mut result = Vec::with_capacity(retention.len());
1647 1290 : assert_eq!(retention.len(), lsn_split_points.len() + 1);
1648 4290 : for (idx, logs) in retention.into_iter().enumerate() {
1649 4290 : if idx == lsn_split_points.len() {
1650 1290 : return Ok(KeyHistoryRetention {
1651 1290 : below_horizon: result,
1652 1290 : above_horizon: KeyLogAtLsn(logs),
1653 1290 : });
1654 3000 : } else {
1655 3000 : result.push((lsn_split_points[idx], KeyLogAtLsn(logs)));
1656 3000 : }
1657 : }
1658 0 : unreachable!("key retention is empty")
1659 1290 : }
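     : // Continuing the doc-comment example above (delta_threshold = 3), the returned
     : // KeyHistoryRetention would look roughly like:
     : //   below_horizon[0] = (0x20, [img AB@0x20])        image: first batch, no ancestor
     : //   below_horizon[1] = (0x40, [+C@0x30, +D@0x40])   only 2 records since the last image
     : //   below_horizon[2] = (0x50, [img ABCDE@0x50])     record count reaches the delta threshold
     : //   above_horizon    = [+F@0x60]                    history above the horizon is kept as-is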
1660 :
1661 : /// An experimental compaction building block that combines compaction with garbage collection.
1662 : ///
1663 : /// The current implementation picks all delta + image layers that are below or intersecting with
1664 : /// the GC horizon without considering retain_lsns. Then, it does a full compaction over all these delta
1665 : /// layers and image layers, which generates image layers at the gc horizon, drops deltas below the gc horizon,
1666 : /// and creates delta layers with all deltas >= gc horizon.
1667 78 : pub(crate) async fn compact_with_gc(
1668 78 : self: &Arc<Self>,
1669 78 : cancel: &CancellationToken,
1670 78 : flags: EnumSet<CompactFlags>,
1671 78 : ctx: &RequestContext,
1672 78 : ) -> anyhow::Result<()> {
1673 : use std::collections::BTreeSet;
1674 :
1675 : // Block other compaction/GC tasks from running for now. GC-compaction could run along
1676 : // with legacy compaction tasks in the future. Always ensure the lock order is compaction -> gc.
1677 : // Note that we already acquired the compaction lock when the outer `compact` function gets called.
1678 :
1679 78 : let gc_lock = async {
1680 78 : tokio::select! {
1681 78 : guard = self.gc_lock.lock() => Ok(guard),
1682 : // TODO: refactor to CompactionError to correctly pass cancelled error
1683 78 : _ = cancel.cancelled() => Err(anyhow!("cancelled")),
1684 : }
1685 78 : };
1686 :
1687 78 : let gc_lock = crate::timed(
1688 78 : gc_lock,
1689 78 : "acquires gc lock",
1690 78 : std::time::Duration::from_secs(5),
1691 78 : )
1692 3 : .await?;
1693 :
1694 78 : let dry_run = flags.contains(CompactFlags::DryRun);
1695 78 :
1696 78 : info!("running enhanced gc bottom-most compaction, dry_run={dry_run}");
1697 :
1698 78 : scopeguard::defer! {
1699 78 : info!("done enhanced gc bottom-most compaction");
1700 78 : };
1701 78 :
1702 78 : let mut stat = CompactionStatistics::default();
1703 :
1704 : // Step 0: pick all delta layers + image layers that are below or intersect with the GC horizon.
1705 : // The layer selection has the following properties:
1706 : // 1. If a layer is in the selection, all layers below it are in the selection.
1707 : // 2. Inferred from (1), for each key in the layer selection, the value can be reconstructed only with the layers in the layer selection.
1708 78 : let (layer_selection, gc_cutoff, retain_lsns_below_horizon) = {
1709 78 : let guard = self.layers.read().await;
1710 78 : let layers = guard.layer_map()?;
1711 78 : let gc_info = self.gc_info.read().unwrap();
1712 78 : let mut retain_lsns_below_horizon = Vec::new();
1713 78 : let gc_cutoff = gc_info.cutoffs.select_min();
1714 102 : for (lsn, _timeline_id) in &gc_info.retain_lsns {
1715 102 : if lsn < &gc_cutoff {
1716 102 : retain_lsns_below_horizon.push(*lsn);
1717 102 : }
1718 : }
1719 78 : for lsn in gc_info.leases.keys() {
1720 0 : if lsn < &gc_cutoff {
1721 0 : retain_lsns_below_horizon.push(*lsn);
1722 0 : }
1723 : }
1724 78 : let mut selected_layers = Vec::new();
1725 78 : drop(gc_info);
1726 : // Pick all the layers that intersect or are below the gc_cutoff, and get the largest LSN among the selected layers.
1727 78 : let Some(max_layer_lsn) = layers
1728 78 : .iter_historic_layers()
1729 300 : .filter(|desc| desc.get_lsn_range().start <= gc_cutoff)
1730 246 : .map(|desc| desc.get_lsn_range().end)
1731 78 : .max()
1732 : else {
1733 0 : info!("no layers to compact with gc");
1734 0 : return Ok(());
1735 : };
1736 : // Then, pick all the layers whose end LSN is at or below max_layer_lsn. This is to ensure we can pick all single-key
1737 : // layers to compact.
1738 300 : for desc in layers.iter_historic_layers() {
1739 300 : if desc.get_lsn_range().end <= max_layer_lsn {
1740 246 : selected_layers.push(guard.get_from_desc(&desc));
1741 246 : }
1742 : }
1743 78 : if selected_layers.is_empty() {
1744 0 : info!("no layers to compact with gc");
1745 0 : return Ok(());
1746 78 : }
1747 78 : retain_lsns_below_horizon.sort();
1748 78 : (selected_layers, gc_cutoff, retain_lsns_below_horizon)
1749 : };
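     : // For example (hypothetical layer set): with gc_cutoff = 0x30 and layers covering
     : // LSN ranges 0x10..0x20, 0x20..0x30, 0x28..0x40 and 0x38..0x50, only the first three
     : // have lsn_range.start <= gc_cutoff, so max_layer_lsn = 0x40; the selection then keeps
     : // every layer with lsn_range.end <= 0x40, i.e. the first three, and leaves 0x38..0x50 alone.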
1750 78 : let lowest_retain_lsn = if self.ancestor_timeline.is_some() {
1751 6 : Lsn(self.ancestor_lsn.0 + 1)
1752 : } else {
1753 72 : let res = retain_lsns_below_horizon
1754 72 : .first()
1755 72 : .copied()
1756 72 : .unwrap_or(gc_cutoff);
1757 72 : if cfg!(debug_assertions) {
1758 72 : assert_eq!(
1759 72 : res,
1760 72 : retain_lsns_below_horizon
1761 72 : .iter()
1762 72 : .min()
1763 72 : .copied()
1764 72 : .unwrap_or(gc_cutoff)
1765 72 : );
1766 0 : }
1767 72 : res
1768 : };
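     : // For example, with retain_lsns_below_horizon = [0x20, 0x40] and gc_cutoff = 0x50,
     : // lowest_retain_lsn is 0x20; with no retain_lsns below the horizon it falls back to
     : // the gc_cutoff itself, and on a child branch it is ancestor_lsn + 1.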
1769 78 : info!(
1770 0 : "picked {} layers for compaction with gc_cutoff={} lowest_retain_lsn={}",
1771 0 : layer_selection.len(),
1772 : gc_cutoff,
1773 : lowest_retain_lsn
1774 : );
1775 : // Step 1: (In the future) construct a k-merge iterator over all layers. For now, simply collect all keys + LSNs.
1776 : // Also, verify if the layer map can be split by drawing a horizontal line at every LSN start/end split point.
1777 78 : let mut lsn_split_point = BTreeSet::new(); // TODO: use a better data structure (range tree / range set?)
1778 324 : for layer in &layer_selection {
1779 246 : let desc = layer.layer_desc();
1780 246 : if desc.is_delta() {
1781 : // ignore single-key layer files
1782 138 : if desc.key_range.start.next() != desc.key_range.end {
1783 102 : let lsn_range = &desc.lsn_range;
1784 102 : lsn_split_point.insert(lsn_range.start);
1785 102 : lsn_split_point.insert(lsn_range.end);
1786 102 : }
1787 138 : stat.visit_delta_layer(desc.file_size());
1788 108 : } else {
1789 108 : stat.visit_image_layer(desc.file_size());
1790 108 : }
1791 : }
1792 78 : let layer_names: Vec<crate::tenant::storage_layer::LayerName> = layer_selection
1793 78 : .iter()
1794 246 : .map(|layer| layer.layer_desc().layer_name())
1795 78 : .collect_vec();
1796 78 : if let Some(err) = check_valid_layermap(&layer_names) {
1797 0 : bail!("cannot run gc-compaction because {}", err);
1798 78 : }
1799 78 : // The maximum LSN we are processing in this compaction loop
1800 78 : let end_lsn = layer_selection
1801 78 : .iter()
1802 246 : .map(|l| l.layer_desc().lsn_range.end)
1803 78 : .max()
1804 78 : .unwrap();
1805 78 : // We don't want any of the produced layers to cover the full key range (i.e., MIN..MAX) b/c it will then be recognized
1806 78 : // as an L0 layer.
1807 78 : let mut delta_layers = Vec::new();
1808 78 : let mut image_layers = Vec::new();
1809 78 : let mut downloaded_layers = Vec::new();
1810 324 : for layer in &layer_selection {
1811 246 : let resident_layer = layer.download_and_keep_resident().await?;
1812 246 : downloaded_layers.push(resident_layer);
1813 : }
1814 324 : for resident_layer in &downloaded_layers {
1815 246 : if resident_layer.layer_desc().is_delta() {
1816 138 : let layer = resident_layer.get_as_delta(ctx).await?;
1817 138 : delta_layers.push(layer);
1818 : } else {
1819 108 : let layer = resident_layer.get_as_image(ctx).await?;
1820 108 : image_layers.push(layer);
1821 : }
1822 : }
1823 78 : let mut merge_iter = MergeIterator::create(&delta_layers, &image_layers, ctx);
1824 78 : // Step 2: Produce images+deltas. TODO: ensure newly-produced delta does not overlap with other deltas.
1825 78 : // Data of the same key.
1826 78 : let mut accumulated_values = Vec::new();
1827 78 : let mut last_key: Option<Key> = None;
1828 :
1829 : // Only create image layers when there are no ancestor branches. TODO: create a covering image layer
1830 : // when certain conditions are met.
1831 78 : let mut image_layer_writer = if self.ancestor_timeline.is_none() {
1832 : Some(
1833 72 : SplitImageLayerWriter::new(
1834 72 : self.conf,
1835 72 : self.timeline_id,
1836 72 : self.tenant_shard_id,
1837 72 : Key::MIN,
1838 72 : lowest_retain_lsn,
1839 72 : self.get_compaction_target_size(),
1840 72 : ctx,
1841 72 : )
1842 36 : .await?,
1843 : )
1844 : } else {
1845 6 : None
1846 : };
1847 :
1848 78 : let mut delta_layer_writer = SplitDeltaLayerWriter::new(
1849 78 : self.conf,
1850 78 : self.timeline_id,
1851 78 : self.tenant_shard_id,
1852 78 : lowest_retain_lsn..end_lsn,
1853 78 : self.get_compaction_target_size(),
1854 78 : )
1855 0 : .await?;
1856 :
1857 : /// Returns None if there is no ancestor branch. Throws an error when the key is not found.
1858 : ///
1859 : /// Currently, we always get the ancestor image for each key in the child branch, regardless of whether the image
1860 : /// is needed for reconstruction. This should be fixed in the future.
1861 : ///
1862 : /// Furthermore, we should do a vectored get instead of a single get, or better, use k-merge for ancestor
1863 : /// images.
1864 1266 : async fn get_ancestor_image(
1865 1266 : tline: &Arc<Timeline>,
1866 1266 : key: Key,
1867 1266 : ctx: &RequestContext,
1868 1266 : ) -> anyhow::Result<Option<(Key, Lsn, Bytes)>> {
1869 1266 : if tline.ancestor_timeline.is_none() {
1870 1224 : return Ok(None);
1871 42 : };
1872 : // This function is implemented as a get of the current timeline at the ancestor LSN, thereby reusing
1873 : // as much existing code as possible.
1874 42 : let img = tline.get(key, tline.ancestor_lsn, ctx).await?;
1875 42 : Ok(Some((key, tline.ancestor_lsn, img)))
1876 1266 : }
1877 :
1878 : // Actually, we can decide not to write to the image layer at all at this point because
1879 : // the key and LSN range are determined. However, to keep things simple here, we still
1880 : // create this writer and discard it at the end.
1881 :
1882 1758 : while let Some((key, lsn, val)) = merge_iter.next().await? {
1883 1680 : if cancel.is_cancelled() {
1884 0 : return Err(anyhow!("cancelled")); // TODO: refactor to CompactionError and pass cancel error
1885 1680 : }
1886 1680 : match val {
1887 1260 : Value::Image(_) => stat.visit_image_key(&val),
1888 420 : Value::WalRecord(_) => stat.visit_wal_key(&val),
1889 : }
1890 1680 : if last_key.is_none() || last_key.as_ref() == Some(&key) {
1891 492 : if last_key.is_none() {
1892 78 : last_key = Some(key);
1893 414 : }
1894 492 : accumulated_values.push((key, lsn, val));
1895 : } else {
1896 1188 : let last_key = last_key.as_mut().unwrap();
1897 1188 : stat.on_unique_key_visited();
1898 1188 : let retention = self
1899 1188 : .generate_key_retention(
1900 1188 : *last_key,
1901 1188 : &accumulated_values,
1902 1188 : gc_cutoff,
1903 1188 : &retain_lsns_below_horizon,
1904 1188 : COMPACTION_DELTA_THRESHOLD,
1905 1188 : get_ancestor_image(self, *last_key, ctx).await?,
1906 : )
1907 0 : .await?;
1908 : // Put the image into the image layer. Currently we have a single big layer for the compaction.
1909 1188 : retention
1910 1188 : .pipe_to(
1911 1188 : *last_key,
1912 1188 : self,
1913 1188 : &mut delta_layer_writer,
1914 1188 : image_layer_writer.as_mut(),
1915 1188 : &mut stat,
1916 1188 : dry_run,
1917 1188 : ctx,
1918 1188 : )
1919 1203 : .await?;
1920 1188 : accumulated_values.clear();
1921 1188 : *last_key = key;
1922 1188 : accumulated_values.push((key, lsn, val));
1923 : }
1924 : }
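     : // The loop above flushes a key's accumulated history only when the *next* key shows
     : // up, so for an input stream like a@0x10, a@0x20, b@0x10 the values for `a` are
     : // processed when `b` is first seen, and `b` itself is handled by the tail flush below.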
1925 :
1926 78 : let last_key = last_key.expect("no keys produced during compaction");
1927 78 : // TODO: move this part to the loop body
1928 78 : stat.on_unique_key_visited();
1929 78 : let retention = self
1930 78 : .generate_key_retention(
1931 78 : last_key,
1932 78 : &accumulated_values,
1933 78 : gc_cutoff,
1934 78 : &retain_lsns_below_horizon,
1935 78 : COMPACTION_DELTA_THRESHOLD,
1936 78 : get_ancestor_image(self, last_key, ctx).await?,
1937 : )
1938 0 : .await?;
1939 : // Put the image into the image layer. Currently we have a single big layer for the compaction.
1940 78 : retention
1941 78 : .pipe_to(
1942 78 : last_key,
1943 78 : self,
1944 78 : &mut delta_layer_writer,
1945 78 : image_layer_writer.as_mut(),
1946 78 : &mut stat,
1947 78 : dry_run,
1948 78 : ctx,
1949 78 : )
1950 72 : .await?;
1951 :
1952 114 : let discard = |key: &PersistentLayerKey| {
1953 114 : let key = key.clone();
1954 114 : async move { KeyHistoryRetention::discard_key(&key, self, dry_run).await }
1955 114 : };
1956 :
1957 78 : let produced_image_layers = if let Some(writer) = image_layer_writer {
1958 72 : if !dry_run {
1959 60 : writer
1960 60 : .finish_with_discard_fn(self, ctx, Key::MAX, discard)
1961 72 : .await?
1962 : } else {
1963 12 : let (layers, _) = writer.take()?;
1964 12 : assert!(layers.is_empty(), "image layers produced in dry run mode?");
1965 12 : Vec::new()
1966 : }
1967 : } else {
1968 6 : Vec::new()
1969 : };
1970 :
1971 78 : let produced_delta_layers = if !dry_run {
1972 66 : delta_layer_writer
1973 66 : .finish_with_discard_fn(self, ctx, discard)
1974 78 : .await?
1975 : } else {
1976 12 : let (layers, _) = delta_layer_writer.take()?;
1977 12 : assert!(layers.is_empty(), "delta layers produced in dry run mode?");
1978 12 : Vec::new()
1979 : };
1980 :
1981 78 : let mut compact_to = Vec::new();
1982 78 : let mut keep_layers = HashSet::new();
1983 78 : let produced_delta_layers_len = produced_delta_layers.len();
1984 78 : let produced_image_layers_len = produced_image_layers.len();
1985 132 : for action in produced_delta_layers {
1986 54 : match action {
1987 30 : SplitWriterResult::Produced(layer) => {
1988 30 : stat.produce_delta_layer(layer.layer_desc().file_size());
1989 30 : compact_to.push(layer);
1990 30 : }
1991 24 : SplitWriterResult::Discarded(l) => {
1992 24 : keep_layers.insert(l);
1993 24 : stat.discard_delta_layer();
1994 24 : }
1995 : }
1996 : }
1997 138 : for action in produced_image_layers {
1998 60 : match action {
1999 36 : SplitWriterResult::Produced(layer) => {
2000 36 : stat.produce_image_layer(layer.layer_desc().file_size());
2001 36 : compact_to.push(layer);
2002 36 : }
2003 24 : SplitWriterResult::Discarded(l) => {
2004 24 : keep_layers.insert(l);
2005 24 : stat.discard_image_layer();
2006 24 : }
2007 : }
2008 : }
2009 78 : let mut layer_selection = layer_selection;
2010 246 : layer_selection.retain(|x| !keep_layers.contains(&x.layer_desc().key()));
2011 78 :
2012 78 : info!(
2013 0 : "gc-compaction statistics: {}",
2014 0 : serde_json::to_string(&stat)?
2015 : );
2016 :
2017 78 : if dry_run {
2018 12 : return Ok(());
2019 66 : }
2020 66 :
2021 66 : info!(
2022 0 : "produced {} delta layers and {} image layers, {} layers are kept",
2023 0 : produced_delta_layers_len,
2024 0 : produced_image_layers_len,
2025 0 : layer_selection.len()
2026 : );
2027 :
2028 : // Step 3: Place the produced layers back into the layer map.
2029 : {
2030 66 : let mut guard = self.layers.write().await;
2031 66 : guard
2032 66 : .open_mut()?
2033 66 : .finish_gc_compaction(&layer_selection, &compact_to, &self.metrics)
2034 66 : };
2035 66 : self.remote_client
2036 66 : .schedule_compaction_update(&layer_selection, &compact_to)?;
2037 :
2038 66 : drop(gc_lock);
2039 66 :
2040 66 : Ok(())
2041 78 : }
2042 : }
2043 :
2044 : struct TimelineAdaptor {
2045 : timeline: Arc<Timeline>,
2046 :
2047 : keyspace: (Lsn, KeySpace),
2048 :
2049 : new_deltas: Vec<ResidentLayer>,
2050 : new_images: Vec<ResidentLayer>,
2051 : layers_to_delete: Vec<Arc<PersistentLayerDesc>>,
2052 : }
2053 :
2054 : impl TimelineAdaptor {
2055 0 : pub fn new(timeline: &Arc<Timeline>, keyspace: (Lsn, KeySpace)) -> Self {
2056 0 : Self {
2057 0 : timeline: timeline.clone(),
2058 0 : keyspace,
2059 0 : new_images: Vec::new(),
2060 0 : new_deltas: Vec::new(),
2061 0 : layers_to_delete: Vec::new(),
2062 0 : }
2063 0 : }
2064 :
2065 0 : pub async fn flush_updates(&mut self) -> Result<(), CompactionError> {
2066 0 : let layers_to_delete = {
2067 0 : let guard = self.timeline.layers.read().await;
2068 0 : self.layers_to_delete
2069 0 : .iter()
2070 0 : .map(|x| guard.get_from_desc(x))
2071 0 : .collect::<Vec<Layer>>()
2072 0 : };
2073 0 : self.timeline
2074 0 : .finish_compact_batch(&self.new_deltas, &self.new_images, &layers_to_delete)
2075 0 : .await?;
2076 :
2077 0 : self.timeline
2078 0 : .upload_new_image_layers(std::mem::take(&mut self.new_images))?;
2079 :
2080 0 : self.new_deltas.clear();
2081 0 : self.layers_to_delete.clear();
2082 0 : Ok(())
2083 0 : }
2084 : }
2085 :
2086 : #[derive(Clone)]
2087 : struct ResidentDeltaLayer(ResidentLayer);
2088 : #[derive(Clone)]
2089 : struct ResidentImageLayer(ResidentLayer);
2090 :
2091 : impl CompactionJobExecutor for TimelineAdaptor {
2092 : type Key = crate::repository::Key;
2093 :
2094 : type Layer = OwnArc<PersistentLayerDesc>;
2095 : type DeltaLayer = ResidentDeltaLayer;
2096 : type ImageLayer = ResidentImageLayer;
2097 :
2098 : type RequestContext = crate::context::RequestContext;
2099 :
2100 0 : fn get_shard_identity(&self) -> &ShardIdentity {
2101 0 : self.timeline.get_shard_identity()
2102 0 : }
2103 :
2104 0 : async fn get_layers(
2105 0 : &mut self,
2106 0 : key_range: &Range<Key>,
2107 0 : lsn_range: &Range<Lsn>,
2108 0 : _ctx: &RequestContext,
2109 0 : ) -> anyhow::Result<Vec<OwnArc<PersistentLayerDesc>>> {
2110 0 : self.flush_updates().await?;
2111 :
2112 0 : let guard = self.timeline.layers.read().await;
2113 0 : let layer_map = guard.layer_map()?;
2114 :
2115 0 : let result = layer_map
2116 0 : .iter_historic_layers()
2117 0 : .filter(|l| {
2118 0 : overlaps_with(&l.lsn_range, lsn_range) && overlaps_with(&l.key_range, key_range)
2119 0 : })
2120 0 : .map(OwnArc)
2121 0 : .collect();
2122 0 : Ok(result)
2123 0 : }
2124 :
2125 0 : async fn get_keyspace(
2126 0 : &mut self,
2127 0 : key_range: &Range<Key>,
2128 0 : lsn: Lsn,
2129 0 : _ctx: &RequestContext,
2130 0 : ) -> anyhow::Result<Vec<Range<Key>>> {
2131 0 : if lsn == self.keyspace.0 {
2132 0 : Ok(pageserver_compaction::helpers::intersect_keyspace(
2133 0 : &self.keyspace.1.ranges,
2134 0 : key_range,
2135 0 : ))
2136 : } else {
2137 : // The current compaction implementation only ever requests the key space
2138 : // at the compaction end LSN.
2139 0 : anyhow::bail!("keyspace not available for requested lsn");
2140 : }
2141 0 : }
2142 :
2143 0 : async fn downcast_delta_layer(
2144 0 : &self,
2145 0 : layer: &OwnArc<PersistentLayerDesc>,
2146 0 : ) -> anyhow::Result<Option<ResidentDeltaLayer>> {
2147 0 : // this is a lot more complex than a simple downcast...
2148 0 : if layer.is_delta() {
2149 0 : let l = {
2150 0 : let guard = self.timeline.layers.read().await;
2151 0 : guard.get_from_desc(layer)
2152 : };
2153 0 : let result = l.download_and_keep_resident().await?;
2154 :
2155 0 : Ok(Some(ResidentDeltaLayer(result)))
2156 : } else {
2157 0 : Ok(None)
2158 : }
2159 0 : }
2160 :
2161 0 : async fn create_image(
2162 0 : &mut self,
2163 0 : lsn: Lsn,
2164 0 : key_range: &Range<Key>,
2165 0 : ctx: &RequestContext,
2166 0 : ) -> anyhow::Result<()> {
2167 0 : Ok(self.create_image_impl(lsn, key_range, ctx).await?)
2168 0 : }
2169 :
2170 0 : async fn create_delta(
2171 0 : &mut self,
2172 0 : lsn_range: &Range<Lsn>,
2173 0 : key_range: &Range<Key>,
2174 0 : input_layers: &[ResidentDeltaLayer],
2175 0 : ctx: &RequestContext,
2176 0 : ) -> anyhow::Result<()> {
2177 0 : debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end);
2178 :
2179 0 : let mut all_entries = Vec::new();
2180 0 : for dl in input_layers.iter() {
2181 0 : all_entries.extend(dl.load_keys(ctx).await?);
2182 : }
2183 :
2184 : // The current stdlib sorting implementation is designed in a way that makes it
2185 : // particularly fast when the slice is made up of sorted sub-ranges.
2186 0 : all_entries.sort_by_key(|DeltaEntry { key, lsn, .. }| (*key, *lsn));
2187 :
2188 0 : let mut writer = DeltaLayerWriter::new(
2189 0 : self.timeline.conf,
2190 0 : self.timeline.timeline_id,
2191 0 : self.timeline.tenant_shard_id,
2192 0 : key_range.start,
2193 0 : lsn_range.clone(),
2194 0 : ctx,
2195 0 : )
2196 0 : .await?;
2197 :
2198 0 : let mut dup_values = 0;
2199 0 :
2200 0 : // This iterator walks through all key-value pairs from all the layers
2201 0 : // we're compacting, in key, LSN order.
2202 0 : let mut prev: Option<(Key, Lsn)> = None;
2203 : for &DeltaEntry {
2204 0 : key, lsn, ref val, ..
2205 0 : } in all_entries.iter()
2206 : {
2207 0 : if prev == Some((key, lsn)) {
2208 : // This is a duplicate. Skip it.
2209 : //
2210 : // It can happen if compaction is interrupted after writing some
2211 : // layers but not all, and we are compacting the range again.
2212 : // The calculations in the algorithm assume that there are no
2213 : // duplicates, so the math on targeted file size is likely off,
2214 : // and we will create smaller files than expected.
2215 0 : dup_values += 1;
2216 0 : continue;
2217 0 : }
2218 :
2219 0 : let value = val.load(ctx).await?;
2220 :
2221 0 : writer.put_value(key, lsn, value, ctx).await?;
2222 :
2223 0 : prev = Some((key, lsn));
2224 : }
2225 :
2226 0 : if dup_values > 0 {
2227 0 : warn!("delta layer created with {} duplicate values", dup_values);
2228 0 : }
2229 :
2230 0 : fail_point!("delta-layer-writer-fail-before-finish", |_| {
2231 0 : Err(anyhow::anyhow!(
2232 0 : "failpoint delta-layer-writer-fail-before-finish"
2233 0 : ))
2234 0 : });
2235 :
2236 0 : let (desc, path) = writer.finish(prev.unwrap().0.next(), ctx).await?;
2237 0 : let new_delta_layer =
2238 0 : Layer::finish_creating(self.timeline.conf, &self.timeline, desc, &path)?;
2239 :
2240 0 : self.new_deltas.push(new_delta_layer);
2241 0 : Ok(())
2242 0 : }
2243 :
2244 0 : async fn delete_layer(
2245 0 : &mut self,
2246 0 : layer: &OwnArc<PersistentLayerDesc>,
2247 0 : _ctx: &RequestContext,
2248 0 : ) -> anyhow::Result<()> {
2249 0 : self.layers_to_delete.push(layer.clone().0);
2250 0 : Ok(())
2251 0 : }
2252 : }
2253 :
2254 : impl TimelineAdaptor {
2255 0 : async fn create_image_impl(
2256 0 : &mut self,
2257 0 : lsn: Lsn,
2258 0 : key_range: &Range<Key>,
2259 0 : ctx: &RequestContext,
2260 0 : ) -> Result<(), CreateImageLayersError> {
2261 0 : let timer = self.timeline.metrics.create_images_time_histo.start_timer();
2262 :
2263 0 : let image_layer_writer = ImageLayerWriter::new(
2264 0 : self.timeline.conf,
2265 0 : self.timeline.timeline_id,
2266 0 : self.timeline.tenant_shard_id,
2267 0 : key_range,
2268 0 : lsn,
2269 0 : ctx,
2270 0 : )
2271 0 : .await?;
2272 :
2273 0 : fail_point!("image-layer-writer-fail-before-finish", |_| {
2274 0 : Err(CreateImageLayersError::Other(anyhow::anyhow!(
2275 0 : "failpoint image-layer-writer-fail-before-finish"
2276 0 : )))
2277 0 : });
2278 :
2279 0 : let keyspace = KeySpace {
2280 0 : ranges: self.get_keyspace(key_range, lsn, ctx).await?,
2281 : };
2282 : // TODO set proper (stateful) start. The create_image_layer_for_rel_blocks function mostly
2283 0 : let start = Key::MIN;
2284 : let ImageLayerCreationOutcome {
2285 0 : image,
2286 : next_start_key: _,
2287 0 : } = self
2288 0 : .timeline
2289 0 : .create_image_layer_for_rel_blocks(
2290 0 : &keyspace,
2291 0 : image_layer_writer,
2292 0 : lsn,
2293 0 : ctx,
2294 0 : key_range.clone(),
2295 0 : start,
2296 0 : )
2297 0 : .await?;
2298 :
2299 0 : if let Some(image_layer) = image {
2300 0 : self.new_images.push(image_layer);
2301 0 : }
2302 :
2303 0 : timer.stop_and_record();
2304 0 :
2305 0 : Ok(())
2306 0 : }
2307 : }
2308 :
2309 : impl CompactionRequestContext for crate::context::RequestContext {}
2310 :
2311 : #[derive(Debug, Clone)]
2312 : pub struct OwnArc<T>(pub Arc<T>);
2313 :
2314 : impl<T> Deref for OwnArc<T> {
2315 : type Target = <Arc<T> as Deref>::Target;
2316 0 : fn deref(&self) -> &Self::Target {
2317 0 : &self.0
2318 0 : }
2319 : }
2320 :
2321 : impl<T> AsRef<T> for OwnArc<T> {
2322 0 : fn as_ref(&self) -> &T {
2323 0 : self.0.as_ref()
2324 0 : }
2325 : }
2326 :
2327 : impl CompactionLayer<Key> for OwnArc<PersistentLayerDesc> {
2328 0 : fn key_range(&self) -> &Range<Key> {
2329 0 : &self.key_range
2330 0 : }
2331 0 : fn lsn_range(&self) -> &Range<Lsn> {
2332 0 : &self.lsn_range
2333 0 : }
2334 0 : fn file_size(&self) -> u64 {
2335 0 : self.file_size
2336 0 : }
2337 0 : fn short_id(&self) -> std::string::String {
2338 0 : self.as_ref().short_id().to_string()
2339 0 : }
2340 0 : fn is_delta(&self) -> bool {
2341 0 : self.as_ref().is_delta()
2342 0 : }
2343 : }
2344 :
2345 : impl CompactionLayer<Key> for OwnArc<DeltaLayer> {
2346 0 : fn key_range(&self) -> &Range<Key> {
2347 0 : &self.layer_desc().key_range
2348 0 : }
2349 0 : fn lsn_range(&self) -> &Range<Lsn> {
2350 0 : &self.layer_desc().lsn_range
2351 0 : }
2352 0 : fn file_size(&self) -> u64 {
2353 0 : self.layer_desc().file_size
2354 0 : }
2355 0 : fn short_id(&self) -> std::string::String {
2356 0 : self.layer_desc().short_id().to_string()
2357 0 : }
2358 0 : fn is_delta(&self) -> bool {
2359 0 : true
2360 0 : }
2361 : }
2362 :
2363 : use crate::tenant::timeline::DeltaEntry;
2364 :
2365 : impl CompactionLayer<Key> for ResidentDeltaLayer {
2366 0 : fn key_range(&self) -> &Range<Key> {
2367 0 : &self.0.layer_desc().key_range
2368 0 : }
2369 0 : fn lsn_range(&self) -> &Range<Lsn> {
2370 0 : &self.0.layer_desc().lsn_range
2371 0 : }
2372 0 : fn file_size(&self) -> u64 {
2373 0 : self.0.layer_desc().file_size
2374 0 : }
2375 0 : fn short_id(&self) -> std::string::String {
2376 0 : self.0.layer_desc().short_id().to_string()
2377 0 : }
2378 0 : fn is_delta(&self) -> bool {
2379 0 : true
2380 0 : }
2381 : }
2382 :
2383 : impl CompactionDeltaLayer<TimelineAdaptor> for ResidentDeltaLayer {
2384 : type DeltaEntry<'a> = DeltaEntry<'a>;
2385 :
2386 0 : async fn load_keys<'a>(&self, ctx: &RequestContext) -> anyhow::Result<Vec<DeltaEntry<'_>>> {
2387 0 : self.0.load_keys(ctx).await
2388 0 : }
2389 : }
2390 :
2391 : impl CompactionLayer<Key> for ResidentImageLayer {
2392 0 : fn key_range(&self) -> &Range<Key> {
2393 0 : &self.0.layer_desc().key_range
2394 0 : }
2395 0 : fn lsn_range(&self) -> &Range<Lsn> {
2396 0 : &self.0.layer_desc().lsn_range
2397 0 : }
2398 0 : fn file_size(&self) -> u64 {
2399 0 : self.0.layer_desc().file_size
2400 0 : }
2401 0 : fn short_id(&self) -> std::string::String {
2402 0 : self.0.layer_desc().short_id().to_string()
2403 0 : }
2404 0 : fn is_delta(&self) -> bool {
2405 0 : false
2406 0 : }
2407 : }
2408 : impl CompactionImageLayer<TimelineAdaptor> for ResidentImageLayer {}
|