Line data Source code
1 : //! New compaction implementation. The algorithm itself is implemented in the
2 : //! compaction crate. This file implements the callbacks and structs that allow
3 : //! the algorithm to drive the process.
4 : //!
5 : //! The old legacy algorithm is implemented directly in `timeline.rs`.
6 :
7 : use std::collections::{BinaryHeap, HashMap, HashSet};
8 : use std::ops::{Deref, Range};
9 : use std::sync::Arc;
10 :
11 : use super::layer_manager::LayerManager;
12 : use super::{
13 : CompactFlags, CompactOptions, CreateImageLayersError, DurationRecorder, ImageLayerCreationMode,
14 : RecordedDuration, Timeline,
15 : };
16 :
17 : use anyhow::{anyhow, bail, Context};
18 : use bytes::Bytes;
19 : use fail::fail_point;
20 : use itertools::Itertools;
21 : use pageserver_api::key::KEY_SIZE;
22 : use pageserver_api::keyspace::ShardedRange;
23 : use pageserver_api::shard::{ShardCount, ShardIdentity, TenantShardId};
24 : use serde::Serialize;
25 : use tokio_util::sync::CancellationToken;
26 : use tracing::{debug, info, info_span, trace, warn, Instrument};
27 : use utils::id::TimelineId;
28 :
29 : use crate::context::{AccessStatsBehavior, RequestContext, RequestContextBuilder};
30 : use crate::page_cache;
31 : use crate::statvfs::Statvfs;
32 : use crate::tenant::checks::check_valid_layermap;
33 : use crate::tenant::remote_timeline_client::WaitCompletionError;
34 : use crate::tenant::storage_layer::batch_split_writer::{
35 : BatchWriterResult, SplitDeltaLayerWriter, SplitImageLayerWriter,
36 : };
37 : use crate::tenant::storage_layer::filter_iterator::FilterIterator;
38 : use crate::tenant::storage_layer::merge_iterator::MergeIterator;
39 : use crate::tenant::storage_layer::{
40 : AsLayerDesc, PersistentLayerDesc, PersistentLayerKey, ValueReconstructState,
41 : };
42 : use crate::tenant::timeline::ImageLayerCreationOutcome;
43 : use crate::tenant::timeline::{drop_rlock, DeltaLayerWriter, ImageLayerWriter};
44 : use crate::tenant::timeline::{Layer, ResidentLayer};
45 : use crate::tenant::{gc_block, DeltaLayer, MaybeOffloaded};
46 : use crate::virtual_file::{MaybeFatalIo, VirtualFile};
47 : use pageserver_api::config::tenant_conf_defaults::{
48 : DEFAULT_CHECKPOINT_DISTANCE, DEFAULT_COMPACTION_THRESHOLD,
49 : };
50 :
51 : use pageserver_api::key::Key;
52 : use pageserver_api::keyspace::KeySpace;
53 : use pageserver_api::record::NeonWalRecord;
54 : use pageserver_api::value::Value;
55 :
56 : use utils::lsn::Lsn;
57 :
58 : use pageserver_compaction::helpers::{fully_contains, overlaps_with};
59 : use pageserver_compaction::interface::*;
60 :
61 : use super::CompactionError;
62 :
63 : /// Maximum number of deltas before generating an image layer in bottom-most compaction.
64 : const COMPACTION_DELTA_THRESHOLD: usize = 5;
65 :
66 : /// A scheduled compaction task.
67 : pub(crate) struct ScheduledCompactionTask {
68 : /// It's unfortunate that we need to store a compact options struct here, because the only outer
69 : /// API we can call is `compact_with_options`, which does a few setup calls before starting the
70 : /// actual compaction job... We should refactor this to store `GcCompactionJob` in the future.
71 : pub options: CompactOptions,
72 : /// The channel to send the compaction result. If this is a subcompaction, the last compaction job holds the sender.
73 : pub result_tx: Option<tokio::sync::oneshot::Sender<()>>,
74 : /// Hold the GC block. If this is a subcompaction, the last compaction job holds the gc block guard.
75 : pub gc_block: Option<gc_block::Guard>,
76 : }
77 :
78 : /// A job description for the gc-compaction job. This structure describes the rectangle range that the job will
79 : /// process. The exact layers that need to be compacted/rewritten will be generated when `compact_with_gc` gets
80 : /// called.
81 : #[derive(Debug, Clone)]
82 : pub(crate) struct GcCompactJob {
83 : pub dry_run: bool,
84 : /// The key range to be compacted. The compaction algorithm will only regenerate key-value pairs within this range
85 : /// [left inclusive, right exclusive), and other pairs will be rewritten into new files if necessary.
86 : pub compact_key_range: Range<Key>,
87 : /// The LSN range to be compacted. The compaction algorithm will use this range to determine the layers to be
88 : /// selected for the compaction, and it does not guarantee the generated layers will have exactly the same LSN range
89 : /// as specified here. The true range being compacted is `min_lsn/max_lsn` in [`GcCompactionJobDescription`].
90 : /// min_lsn will always be <= the lower bound specified here, and max_lsn will always be >= the upper bound specified here.
91 : pub compact_lsn_range: Range<Lsn>,
92 : }
93 :
94 : impl GcCompactJob {
95 54 : pub fn from_compact_options(options: CompactOptions) -> Self {
96 54 : GcCompactJob {
97 54 : dry_run: options.flags.contains(CompactFlags::DryRun),
98 54 : compact_key_range: options
99 54 : .compact_key_range
100 54 : .map(|x| x.into())
101 54 : .unwrap_or(Key::MIN..Key::MAX),
102 54 : compact_lsn_range: options
103 54 : .compact_lsn_range
104 54 : .map(|x| x.into())
105 54 : .unwrap_or(Lsn::INVALID..Lsn::MAX),
106 54 : }
107 54 : }
108 : }
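// A minimal, self-contained sketch of the defaulting pattern used by
// `from_compact_options` above: an optional, caller-supplied range falls back
// to the widest possible range. `resolve_range` is a hypothetical helper named
// only for this illustration; the real code inlines the same
// `.map(|x| x.into()).unwrap_or(default)` idiom with `Key` and `Lsn` bounds.
fn resolve_range<T, R: Into<std::ops::Range<T>>>(
    requested: Option<R>,
    full: std::ops::Range<T>,
) -> std::ops::Range<T> {
    requested.map(|r| r.into()).unwrap_or(full)
}

// Example with plain integers standing in for `Key`/`Lsn`:
// `resolve_range(None::<std::ops::Range<u64>>, 0..u64::MAX)` yields `0..u64::MAX`,
// while `resolve_range(Some(10u64..20), 0..u64::MAX)` yields `10..20`.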
109 :
110 : /// A job description for the gc-compaction job. This structure is generated when `compact_with_gc` is called
111 : /// and contains the exact layers we want to compact.
112 : pub struct GcCompactionJobDescription {
113 : /// All layers to read in the compaction job
114 : selected_layers: Vec<Layer>,
115 : /// GC cutoff of the job. This is the lowest LSN that will be accessed by the read/GC path and we need to
116 : /// keep all deltas <= this LSN or generate an image == this LSN.
117 : gc_cutoff: Lsn,
118 : /// LSNs to retain for the job. Read path will use this LSN so we need to keep deltas <= this LSN or
119 : /// generate an image == this LSN.
120 : retain_lsns_below_horizon: Vec<Lsn>,
121 : /// Maximum layer LSN processed in this compaction, that is max(end_lsn of layers). Exclusive. All data
122 : /// \>= this LSN will be kept and will not be rewritten.
123 : max_layer_lsn: Lsn,
124 : /// Minimum layer LSN processed in this compaction, that is min(start_lsn of layers). Inclusive.
125 : /// All accesses below (strictly lower than, `<`) this LSN will be routed through the normal read path instead of
126 : /// k-merge within gc-compaction.
127 : min_layer_lsn: Lsn,
128 : /// Only compact layers overlapping with this range.
129 : compaction_key_range: Range<Key>,
130 : /// When partial compaction is enabled, these layers need to be rewritten to ensure no overlap.
131 : /// This field is here solely for debugging. The field will not be read once the compaction
132 : /// description is generated.
133 : rewrite_layers: Vec<Arc<PersistentLayerDesc>>,
134 : }
135 :
136 : /// The result of bottom-most compaction for a single key at each LSN.
137 : #[derive(Debug)]
138 : #[cfg_attr(test, derive(PartialEq))]
139 : pub struct KeyLogAtLsn(pub Vec<(Lsn, Value)>);
140 :
141 : /// The result of bottom-most compaction.
142 : #[derive(Debug)]
143 : #[cfg_attr(test, derive(PartialEq))]
144 : pub(crate) struct KeyHistoryRetention {
145 : /// Stores logs to reconstruct the value at the given LSN, that is to say, logs <= LSN or image == LSN.
146 : pub(crate) below_horizon: Vec<(Lsn, KeyLogAtLsn)>,
147 : /// Stores logs to reconstruct the value at any LSN above the horizon, that is to say, log > LSN.
148 : pub(crate) above_horizon: KeyLogAtLsn,
149 : }
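// A minimal sketch of the shape that `KeyHistoryRetention` captures for one
// key: records at or below each retention horizon are grouped per horizon, and
// everything newer goes into the "above horizon" bucket. Plain `u64` LSNs and
// `String` values stand in for `Lsn` and `Value`; the real retention logic
// (collapsing a prefix of the history into an image, etc.) is not reproduced
// here.
fn bucket_history(
    history: Vec<(u64, String)>, // (lsn, value), sorted by LSN ascending
    horizons: &[u64],            // retain LSNs plus the GC cutoff, sorted ascending
) -> (Vec<(u64, Vec<(u64, String)>)>, Vec<(u64, String)>) {
    let mut below: Vec<(u64, Vec<(u64, String)>)> =
        horizons.iter().map(|&h| (h, Vec::new())).collect();
    let mut above = Vec::new();
    for (lsn, value) in history {
        // A record belongs to the lowest horizon that still covers it.
        match below.iter_mut().find(|(horizon, _)| lsn <= *horizon) {
            Some((_, bucket)) => bucket.push((lsn, value)),
            None => above.push((lsn, value)),
        }
    }
    (below, above)
}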
150 :
151 : impl KeyHistoryRetention {
152 : /// Hack: skip the delta layer if we need to produce a layer with the same key-lsn range.
153 : ///
154 : /// This can happen if we have removed some deltas in "the middle" of some existing layer's key-lsn-range.
155 : /// For example, consider the case where a single delta with range [0x10,0x50) exists.
156 : /// And we have branches at LSN 0x10, 0x20, 0x30.
157 : /// Then we delete branch @ 0x20.
158 : /// Bottom-most compaction may now delete the delta [0x20,0x30).
159 : /// And that wouldn't change the shape of the layer.
160 : ///
161 : /// Note that bottom-most-gc-compaction never _adds_ new data in that case, only removes.
162 : ///
163 : /// `discard_key` will only be called when the writer reaches its target (instead of for every key), so it's fine to grab a lock inside.
164 74 : async fn discard_key(key: &PersistentLayerKey, tline: &Arc<Timeline>, dry_run: bool) -> bool {
165 74 : if dry_run {
166 0 : return true;
167 74 : }
168 74 : let guard = tline.layers.read().await;
169 74 : if !guard.contains_key(key) {
170 52 : return false;
171 22 : }
172 22 : let layer_generation = guard.get_from_key(key).metadata().generation;
173 22 : drop(guard);
174 22 : if layer_generation == tline.generation {
175 22 : info!(
176 : key=%key,
177 : ?layer_generation,
178 0 : "discard layer due to duplicated layer key in the same generation",
179 : );
180 22 : true
181 : } else {
182 0 : false
183 : }
184 74 : }
185 :
186 : /// Pipe a history of a single key to the writers.
187 : ///
188 : /// If `image_writer` is none, the images will be placed into the delta layers.
189 : /// The delta writer will contain all images and deltas (below and above the horizon) except the bottom-most images.
190 : #[allow(clippy::too_many_arguments)]
191 622 : async fn pipe_to(
192 622 : self,
193 622 : key: Key,
194 622 : delta_writer: &mut SplitDeltaLayerWriter,
195 622 : mut image_writer: Option<&mut SplitImageLayerWriter>,
196 622 : stat: &mut CompactionStatistics,
197 622 : ctx: &RequestContext,
198 622 : ) -> anyhow::Result<()> {
199 622 : let mut first_batch = true;
200 2012 : for (cutoff_lsn, KeyLogAtLsn(logs)) in self.below_horizon {
201 1390 : if first_batch {
202 622 : if logs.len() == 1 && logs[0].1.is_image() {
203 584 : let Value::Image(img) = &logs[0].1 else {
204 0 : unreachable!()
205 : };
206 584 : stat.produce_image_key(img);
207 584 : if let Some(image_writer) = image_writer.as_mut() {
208 584 : image_writer.put_image(key, img.clone(), ctx).await?;
209 : } else {
210 0 : delta_writer
211 0 : .put_value(key, cutoff_lsn, Value::Image(img.clone()), ctx)
212 0 : .await?;
213 : }
214 : } else {
215 66 : for (lsn, val) in logs {
216 28 : stat.produce_key(&val);
217 28 : delta_writer.put_value(key, lsn, val, ctx).await?;
218 : }
219 : }
220 622 : first_batch = false;
221 : } else {
222 884 : for (lsn, val) in logs {
223 116 : stat.produce_key(&val);
224 116 : delta_writer.put_value(key, lsn, val, ctx).await?;
225 : }
226 : }
227 : }
228 622 : let KeyLogAtLsn(above_horizon_logs) = self.above_horizon;
229 680 : for (lsn, val) in above_horizon_logs {
230 58 : stat.produce_key(&val);
231 58 : delta_writer.put_value(key, lsn, val, ctx).await?;
232 : }
233 622 : Ok(())
234 622 : }
235 : }
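// A condensed sketch of the routing decision that `pipe_to` makes above: if
// the first below-horizon batch of a key is exactly one image, it is emitted
// through the image writer (it becomes part of the bottom-most image layer);
// every other record is emitted through the delta writer. `SimpleValue` and
// the two callbacks are hypothetical stand-ins for `Value`,
// `SplitImageLayerWriter` and `SplitDeltaLayerWriter`.
enum SimpleValue {
    Image(Vec<u8>),
    Wal(Vec<u8>),
}

fn route_first_batch(
    logs: Vec<(u64, SimpleValue)>, // (lsn, value) records of the first batch
    mut put_image: impl FnMut(Vec<u8>),
    mut put_delta: impl FnMut(u64, SimpleValue),
) {
    let is_single_image = matches!(logs.as_slice(), [(_, SimpleValue::Image(_))]);
    if is_single_image {
        if let Some((_, SimpleValue::Image(img))) = logs.into_iter().next() {
            put_image(img);
        }
    } else {
        for (lsn, value) in logs {
            put_delta(lsn, value);
        }
    }
}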
236 :
237 : #[derive(Debug, Serialize, Default)]
238 : struct CompactionStatisticsNumSize {
239 : num: u64,
240 : size: u64,
241 : }
242 :
243 : #[derive(Debug, Serialize, Default)]
244 : pub struct CompactionStatistics {
245 : delta_layer_visited: CompactionStatisticsNumSize,
246 : image_layer_visited: CompactionStatisticsNumSize,
247 : delta_layer_produced: CompactionStatisticsNumSize,
248 : image_layer_produced: CompactionStatisticsNumSize,
249 : num_delta_layer_discarded: usize,
250 : num_image_layer_discarded: usize,
251 : num_unique_keys_visited: usize,
252 : wal_keys_visited: CompactionStatisticsNumSize,
253 : image_keys_visited: CompactionStatisticsNumSize,
254 : wal_produced: CompactionStatisticsNumSize,
255 : image_produced: CompactionStatisticsNumSize,
256 : }
257 :
258 : impl CompactionStatistics {
259 1042 : fn estimated_size_of_value(val: &Value) -> usize {
260 432 : match val {
261 610 : Value::Image(img) => img.len(),
262 0 : Value::WalRecord(NeonWalRecord::Postgres { rec, .. }) => rec.len(),
263 432 : _ => std::mem::size_of::<NeonWalRecord>(),
264 : }
265 1042 : }
266 1636 : fn estimated_size_of_key() -> usize {
267 1636 : KEY_SIZE // TODO: distinguish image layer and delta layer (count LSN in delta layer)
268 1636 : }
269 86 : fn visit_delta_layer(&mut self, size: u64) {
270 86 : self.delta_layer_visited.num += 1;
271 86 : self.delta_layer_visited.size += size;
272 86 : }
273 66 : fn visit_image_layer(&mut self, size: u64) {
274 66 : self.image_layer_visited.num += 1;
275 66 : self.image_layer_visited.size += size;
276 66 : }
277 622 : fn on_unique_key_visited(&mut self) {
278 622 : self.num_unique_keys_visited += 1;
279 622 : }
280 240 : fn visit_wal_key(&mut self, val: &Value) {
281 240 : self.wal_keys_visited.num += 1;
282 240 : self.wal_keys_visited.size +=
283 240 : Self::estimated_size_of_value(val) as u64 + Self::estimated_size_of_key() as u64;
284 240 : }
285 610 : fn visit_image_key(&mut self, val: &Value) {
286 610 : self.image_keys_visited.num += 1;
287 610 : self.image_keys_visited.size +=
288 610 : Self::estimated_size_of_value(val) as u64 + Self::estimated_size_of_key() as u64;
289 610 : }
290 202 : fn produce_key(&mut self, val: &Value) {
291 202 : match val {
292 10 : Value::Image(img) => self.produce_image_key(img),
293 192 : Value::WalRecord(_) => self.produce_wal_key(val),
294 : }
295 202 : }
296 192 : fn produce_wal_key(&mut self, val: &Value) {
297 192 : self.wal_produced.num += 1;
298 192 : self.wal_produced.size +=
299 192 : Self::estimated_size_of_value(val) as u64 + Self::estimated_size_of_key() as u64;
300 192 : }
301 594 : fn produce_image_key(&mut self, val: &Bytes) {
302 594 : self.image_produced.num += 1;
303 594 : self.image_produced.size += val.len() as u64 + Self::estimated_size_of_key() as u64;
304 594 : }
305 14 : fn discard_delta_layer(&mut self) {
306 14 : self.num_delta_layer_discarded += 1;
307 14 : }
308 8 : fn discard_image_layer(&mut self) {
309 8 : self.num_image_layer_discarded += 1;
310 8 : }
311 22 : fn produce_delta_layer(&mut self, size: u64) {
312 22 : self.delta_layer_produced.num += 1;
313 22 : self.delta_layer_produced.size += size;
314 22 : }
315 30 : fn produce_image_layer(&mut self, size: u64) {
316 30 : self.image_layer_produced.num += 1;
317 30 : self.image_layer_produced.size += size;
318 30 : }
319 : }
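// A small sketch of the counter pattern used by `CompactionStatistics`: each
// class of object gets a (num, size) pair that is bumped as layers and keys
// are visited or produced, and the whole struct can be dumped as JSON for a
// compaction log line. `ExampleStats` is a stand-in; only the shape mirrors
// the struct above.
#[derive(Default, serde::Serialize)]
struct ExampleNumSize {
    num: u64,
    size: u64,
}

impl ExampleNumSize {
    fn record(&mut self, size: u64) {
        self.num += 1;
        self.size += size;
    }
}

#[derive(Default, serde::Serialize)]
struct ExampleStats {
    delta_layer_visited: ExampleNumSize,
    image_layer_visited: ExampleNumSize,
}

fn example_stats_json() -> serde_json::Result<String> {
    let mut stats = ExampleStats::default();
    stats.delta_layer_visited.record(8 * 1024 * 1024); // one 8 MiB delta layer
    stats.image_layer_visited.record(128 * 1024 * 1024); // one 128 MiB image layer
    serde_json::to_string(&stats)
}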
320 :
321 : impl Timeline {
322 : /// TODO: cancellation
323 : ///
324 : /// Returns whether the compaction has pending tasks.
325 364 : pub(crate) async fn compact_legacy(
326 364 : self: &Arc<Self>,
327 364 : cancel: &CancellationToken,
328 364 : options: CompactOptions,
329 364 : ctx: &RequestContext,
330 364 : ) -> Result<bool, CompactionError> {
331 364 : if options
332 364 : .flags
333 364 : .contains(CompactFlags::EnhancedGcBottomMostCompaction)
334 : {
335 0 : self.compact_with_gc(cancel, options, ctx)
336 0 : .await
337 0 : .map_err(CompactionError::Other)?;
338 0 : return Ok(false);
339 364 : }
340 364 :
341 364 : if options.flags.contains(CompactFlags::DryRun) {
342 0 : return Err(CompactionError::Other(anyhow!(
343 0 : "dry-run mode is not supported for legacy compaction for now"
344 0 : )));
345 364 : }
346 364 :
347 364 : if options.compact_key_range.is_some() || options.compact_lsn_range.is_some() {
348 : // maybe useful in the future? could implement this at some point
349 0 : return Err(CompactionError::Other(anyhow!(
350 0 : "compaction range is not supported for legacy compaction for now"
351 0 : )));
352 364 : }
353 364 :
354 364 : // High level strategy for compaction / image creation:
355 364 : //
356 364 : // 1. First, calculate the desired "partitioning" of the
357 364 : // currently in-use key space. The goal is to partition the
358 364 : // key space into roughly fixed-size chunks, but also take into
359 364 : // account any existing image layers, and try to align the
360 364 : // chunk boundaries with the existing image layers to avoid
361 364 : // too much churn. Also try to align chunk boundaries with
362 364 : // relation boundaries. In principle, we don't know about
363 364 : // relation boundaries here, we just deal with key-value
364 364 : // pairs, and the code in pgdatadir_mapping.rs knows how to
365 364 : // map relations into key-value pairs. But in practice we know
366 364 : // that 'field6' is the block number, and the fields 1-5
367 364 : // identify a relation. This is just an optimization,
368 364 : // though.
369 364 : //
370 364 : // 2. Once we know the partitioning, for each partition,
371 364 : // decide if it's time to create a new image layer. The
372 364 : // criterion is: has there been too much "churn" since the last
373 364 : // image layer? "Churn" is a fuzzy concept; it's a
374 364 : // combination of too many delta files, or too much WAL in
375 364 : // total in the delta files. Or perhaps: whether creating an image
376 364 : // file would allow us to delete some older files.
377 364 : //
378 364 : // 3. After that, we compact all level0 delta files if there
379 364 : // are too many of them. While compacting, we also garbage
380 364 : // collect any page versions that are no longer needed because
381 364 : // of the new image layers we created in step 2.
382 364 : //
383 364 : // TODO: This high level strategy hasn't been implemented yet.
384 364 : // Below are functions compact_level0() and create_image_layers()
385 364 : // but they are a bit ad hoc and don't quite work like it's explained
386 364 : // above. Rewrite it.
387 364 :
388 364 : // Is the timeline being deleted?
389 364 : if self.is_stopping() {
390 0 : trace!("Dropping out of compaction on timeline shutdown");
391 0 : return Err(CompactionError::ShuttingDown);
392 364 : }
393 364 :
394 364 : let target_file_size = self.get_checkpoint_distance();
395 :
396 : // Define partitioning schema if needed
397 :
398 : // FIXME: the match should only cover repartitioning, not the next steps
399 364 : let (partition_count, has_pending_tasks) = match self
400 364 : .repartition(
401 364 : self.get_last_record_lsn(),
402 364 : self.get_compaction_target_size(),
403 364 : options.flags,
404 364 : ctx,
405 364 : )
406 364 : .await
407 : {
408 364 : Ok(((dense_partitioning, sparse_partitioning), lsn)) => {
409 364 : // Disables access_stats updates, so that the files we read remain candidates for eviction after we're done with them
410 364 : let image_ctx = RequestContextBuilder::extend(ctx)
411 364 : .access_stats_behavior(AccessStatsBehavior::Skip)
412 364 : .build();
413 364 :
414 364 : // 2. Compact
415 364 : let timer = self.metrics.compact_time_histo.start_timer();
416 364 : let fully_compacted = self
417 364 : .compact_level0(
418 364 : target_file_size,
419 364 : options.flags.contains(CompactFlags::ForceL0Compaction),
420 364 : ctx,
421 364 : )
422 364 : .await?;
423 364 : timer.stop_and_record();
424 364 :
425 364 : let mut partitioning = dense_partitioning;
426 364 : partitioning
427 364 : .parts
428 364 : .extend(sparse_partitioning.into_dense().parts);
429 364 :
430 364 : // 3. Create new image layers for partitions that have been modified
431 364 : // "enough". Skip image layer creation if L0 compaction cannot keep up.
432 364 : if fully_compacted {
433 364 : let image_layers = self
434 364 : .create_image_layers(
435 364 : &partitioning,
436 364 : lsn,
437 364 : if options
438 364 : .flags
439 364 : .contains(CompactFlags::ForceImageLayerCreation)
440 : {
441 14 : ImageLayerCreationMode::Force
442 : } else {
443 350 : ImageLayerCreationMode::Try
444 : },
445 364 : &image_ctx,
446 364 : )
447 364 : .await?;
448 :
449 364 : self.upload_new_image_layers(image_layers)?;
450 : } else {
451 0 : info!("skipping image layer generation due to L0 compaction did not include all layers.");
452 : }
453 364 : (partitioning.parts.len(), !fully_compacted)
454 : }
455 0 : Err(err) => {
456 0 : // no partitioning? This is normal, if the timeline was just created
457 0 : // as an empty timeline. Also in unit tests, when we use the timeline
458 0 : // as a simple key-value store, ignoring the datadir layout. Log the
459 0 : // error but continue.
460 0 : //
461 0 : // Suppress error when it's due to cancellation
462 0 : if !self.cancel.is_cancelled() && !err.is_cancelled() {
463 0 : tracing::error!("could not compact, repartitioning keyspace failed: {err:?}");
464 0 : }
465 0 : (1, false)
466 : }
467 : };
468 :
469 364 : if self.shard_identity.count >= ShardCount::new(2) {
470 : // Limit the number of layer rewrites to the number of partitions: this means its
471 : // runtime should be comparable to a full round of image layer creations, rather than
472 : // being potentially much longer.
473 0 : let rewrite_max = partition_count;
474 0 :
475 0 : self.compact_shard_ancestors(rewrite_max, ctx).await?;
476 364 : }
477 :
478 364 : Ok(has_pending_tasks)
479 364 : }
480 :
481 : /// Check for layers that are eligible to be rewritten:
482 : /// - Shard splitting: after a shard split, rewrite ancestor layers beyond pitr_interval, so that
483 : /// we don't indefinitely retain keys in this shard that aren't needed.
484 : /// - For future use: layers beyond pitr_interval that are in formats we would
485 : /// rather not maintain compatibility with indefinitely.
486 : ///
487 : /// Note: this phase may read and write many gigabytes of data: use rewrite_max to bound
488 : /// how much work it will try to do in each compaction pass.
489 0 : async fn compact_shard_ancestors(
490 0 : self: &Arc<Self>,
491 0 : rewrite_max: usize,
492 0 : ctx: &RequestContext,
493 0 : ) -> Result<(), CompactionError> {
494 0 : let mut drop_layers = Vec::new();
495 0 : let mut layers_to_rewrite: Vec<Layer> = Vec::new();
496 0 :
497 0 : // We will use the Lsn cutoff of the last GC as a threshold for rewriting layers: if a
498 0 : // layer is behind this Lsn, it indicates that the layer is being retained beyond the
499 0 : // pitr_interval, for example because a branchpoint references it.
500 0 : //
501 0 : // Holding this read guard also blocks [`Self::gc_timeline`] from entering while we
502 0 : // are rewriting layers.
503 0 : let latest_gc_cutoff = self.get_latest_gc_cutoff_lsn();
504 0 :
505 0 : tracing::info!(
506 0 : "latest_gc_cutoff: {}, pitr cutoff {}",
507 0 : *latest_gc_cutoff,
508 0 : self.gc_info.read().unwrap().cutoffs.time
509 : );
510 :
511 0 : let layers = self.layers.read().await;
512 0 : for layer_desc in layers.layer_map()?.iter_historic_layers() {
513 0 : let layer = layers.get_from_desc(&layer_desc);
514 0 : if layer.metadata().shard.shard_count == self.shard_identity.count {
515 : // This layer does not belong to a historic ancestor, no need to re-image it.
516 0 : continue;
517 0 : }
518 0 :
519 0 : // This layer was created on an ancestor shard: check if it contains any data for this shard.
520 0 : let sharded_range = ShardedRange::new(layer_desc.get_key_range(), &self.shard_identity);
521 0 : let layer_local_page_count = sharded_range.page_count();
522 0 : let layer_raw_page_count = ShardedRange::raw_size(&layer_desc.get_key_range());
523 0 : if layer_local_page_count == 0 {
524 : // This ancestral layer only covers keys that belong to other shards.
525 : // We include the full metadata in the log: if we had some critical bug that caused
526 : // us to incorrectly drop layers, this would simplify manually debugging + reinstating those layers.
527 0 : info!(%layer, old_metadata=?layer.metadata(),
528 0 : "dropping layer after shard split, contains no keys for this shard.",
529 : );
530 :
531 0 : if cfg!(debug_assertions) {
532 : // Expensive, exhaustive check of keys in this layer: this guards against ShardedRange's calculations being
533 : // wrong. If ShardedRange claims the local page count is zero, then no keys in this layer
534 : // should be !is_key_disposable()
535 0 : let range = layer_desc.get_key_range();
536 0 : let mut key = range.start;
537 0 : while key < range.end {
538 0 : debug_assert!(self.shard_identity.is_key_disposable(&key));
539 0 : key = key.next();
540 : }
541 0 : }
542 :
543 0 : drop_layers.push(layer);
544 0 : continue;
545 0 : } else if layer_local_page_count != u32::MAX
546 0 : && layer_local_page_count == layer_raw_page_count
547 : {
548 0 : debug!(%layer,
549 0 : "layer is entirely shard local ({} keys), no need to filter it",
550 : layer_local_page_count
551 : );
552 0 : continue;
553 0 : }
554 0 :
555 0 : // Don't bother re-writing a layer unless it will at least halve its size
556 0 : if layer_local_page_count != u32::MAX
557 0 : && layer_local_page_count > layer_raw_page_count / 2
558 : {
559 0 : debug!(%layer,
560 0 : "layer is already mostly local ({}/{}), not rewriting",
561 : layer_local_page_count,
562 : layer_raw_page_count
563 : );
564 0 : }
565 :
566 : // Don't bother re-writing a layer if it is within the PITR window: it will age-out eventually
567 : // without incurring the I/O cost of a rewrite.
568 0 : if layer_desc.get_lsn_range().end >= *latest_gc_cutoff {
569 0 : debug!(%layer, "Skipping rewrite of layer still in GC window ({} >= {})",
570 0 : layer_desc.get_lsn_range().end, *latest_gc_cutoff);
571 0 : continue;
572 0 : }
573 0 :
574 0 : if layer_desc.is_delta() {
575 : // We do not yet implement rewrite of delta layers
576 0 : debug!(%layer, "Skipping rewrite of delta layer");
577 0 : continue;
578 0 : }
579 0 :
580 0 : // Only rewrite layers if their generations differ. This guarantees:
581 0 : // - that local rewrite is safe, as local layer paths will differ between existing layer and rewritten one
582 0 : // - that the layer is persistent in remote storage, as we only see old-generation'd layer via loading from remote storage
583 0 : if layer.metadata().generation == self.generation {
584 0 : debug!(%layer, "Skipping rewrite, is not from old generation");
585 0 : continue;
586 0 : }
587 0 :
588 0 : if layers_to_rewrite.len() >= rewrite_max {
589 0 : tracing::info!(%layer, "Will rewrite layer on a future compaction, already rewrote {}",
590 0 : layers_to_rewrite.len()
591 : );
592 0 : continue;
593 0 : }
594 0 :
595 0 : // Fall through: all our conditions for doing a rewrite passed.
596 0 : layers_to_rewrite.push(layer);
597 : }
598 :
599 : // Drop read lock on layer map before we start doing time-consuming I/O
600 0 : drop(layers);
601 0 :
602 0 : let mut replace_image_layers = Vec::new();
603 :
604 0 : for layer in layers_to_rewrite {
605 0 : tracing::info!(layer=%layer, "Rewriting layer after shard split...");
606 0 : let mut image_layer_writer = ImageLayerWriter::new(
607 0 : self.conf,
608 0 : self.timeline_id,
609 0 : self.tenant_shard_id,
610 0 : &layer.layer_desc().key_range,
611 0 : layer.layer_desc().image_layer_lsn(),
612 0 : ctx,
613 0 : )
614 0 : .await
615 0 : .map_err(CompactionError::Other)?;
616 :
617 : // Safety of layer rewrites:
618 : // - We are writing to a different local file path than we are reading from, so the old Layer
619 : // cannot interfere with the new one.
620 : // - In the page cache, contents for a particular VirtualFile are stored with a file_id that
621 : // is different for two layers with the same name (in `ImageLayerInner::new` we always
622 : // acquire a fresh id from [`crate::page_cache::next_file_id`]). So readers do not risk
623 : // reading the index from one layer file, and then data blocks from the rewritten layer file.
624 : // - Any readers that have a reference to the old layer will keep it alive until they are done
625 : // with it. If they are trying to promote from remote storage, that will fail, but this is the same
626 : // as for compaction generally: compaction is allowed to delete layers that readers might be trying to use.
627 : // - We do not run concurrently with other kinds of compaction, so the only layer map writes we race with are:
628 : // - GC, which at worst witnesses us "undelete" a layer that they just deleted.
629 : // - ingestion, which only inserts layers, therefore cannot collide with us.
630 0 : let resident = layer.download_and_keep_resident().await?;
631 :
632 0 : let keys_written = resident
633 0 : .filter(&self.shard_identity, &mut image_layer_writer, ctx)
634 0 : .await?;
635 :
636 0 : if keys_written > 0 {
637 0 : let (desc, path) = image_layer_writer
638 0 : .finish(ctx)
639 0 : .await
640 0 : .map_err(CompactionError::Other)?;
641 0 : let new_layer = Layer::finish_creating(self.conf, self, desc, &path)
642 0 : .map_err(CompactionError::Other)?;
643 0 : tracing::info!(layer=%new_layer, "Rewrote layer, {} -> {} bytes",
644 0 : layer.metadata().file_size,
645 0 : new_layer.metadata().file_size);
646 :
647 0 : replace_image_layers.push((layer, new_layer));
648 0 : } else {
649 0 : // Drop the old layer. Usually for this case we would already have noticed that
650 0 : // the layer has no data for us with the ShardedRange check above, but
651 0 : drop_layers.push(layer);
652 0 : }
653 : }
654 :
655 : // At this point, we have replaced local layer files with their rewritten form, but not yet uploaded
656 : // metadata to reflect that. If we restart here, the replaced layer files will look invalid (size mismatch
657 : // to remote index) and be removed. This is inefficient but safe.
658 0 : fail::fail_point!("compact-shard-ancestors-localonly");
659 0 :
660 0 : // Update the LayerMap so that readers will use the new layers, and enqueue it for writing to remote storage
661 0 : self.rewrite_layers(replace_image_layers, drop_layers)
662 0 : .await?;
663 :
664 0 : fail::fail_point!("compact-shard-ancestors-enqueued");
665 0 :
666 0 : // We wait for all uploads to complete before finishing this compaction stage. This is not
667 0 : // necessary for correctness, but it simplifies testing, and avoids proceeding with another
668 0 : // Timeline's compaction while this timeline's uploads may be generating lots of disk I/O
669 0 : // load.
670 0 : match self.remote_client.wait_completion().await {
671 0 : Ok(()) => (),
672 0 : Err(WaitCompletionError::NotInitialized(ni)) => return Err(CompactionError::from(ni)),
673 : Err(WaitCompletionError::UploadQueueShutDownOrStopped) => {
674 0 : return Err(CompactionError::ShuttingDown)
675 : }
676 : }
677 :
678 0 : fail::fail_point!("compact-shard-ancestors-persistent");
679 0 :
680 0 : Ok(())
681 0 : }
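// A condensed sketch of the per-layer decision made by
// `compact_shard_ancestors` above. The enum and the plain inputs are
// hypothetical stand-ins: the real code works on `Layer`/`ShardedRange` and
// additionally skips delta layers, layers from the current generation, and
// layers beyond the per-pass `rewrite_max` budget, all elided here.
#[derive(Debug, PartialEq)]
enum AncestorLayerAction {
    Keep,
    Drop,
    Rewrite,
}

fn ancestor_layer_action(
    created_on_ancestor_shard: bool, // layer's shard count differs from ours
    local_page_count: u32,           // pages in the layer that belong to this shard
    raw_page_count: u32,             // total pages covered by the layer's key range
    end_lsn_in_pitr_window: bool,    // layer's end LSN is still >= the GC cutoff
) -> AncestorLayerAction {
    if !created_on_ancestor_shard {
        // Already written by this shard: nothing to re-image.
        return AncestorLayerAction::Keep;
    }
    if local_page_count == 0 {
        // The layer holds no keys for this shard at all: safe to drop outright.
        return AncestorLayerAction::Drop;
    }
    if local_page_count == raw_page_count || end_lsn_in_pitr_window {
        // Entirely local already, or still inside the PITR window where it
        // will age out on its own: not worth the I/O of a rewrite.
        return AncestorLayerAction::Keep;
    }
    AncestorLayerAction::Rewrite
}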
682 :
683 : /// Update the LayerVisibilityHint of layers covered by image layers, based on whether there is
684 : /// an image layer between them and the most recent readable LSN (branch point or tip of timeline). The
685 : /// purpose of the visibility hint is to record which layers need to be available to service reads.
686 : ///
687 : /// The result may be used as an input to eviction and secondary downloads to de-prioritize layers
688 : /// that we know won't be needed for reads.
689 202 : pub(super) async fn update_layer_visibility(
690 202 : &self,
691 202 : ) -> Result<(), super::layer_manager::Shutdown> {
692 202 : let head_lsn = self.get_last_record_lsn();
693 :
694 : // We will sweep through layers in reverse-LSN order. We only do historic layers. L0 deltas
695 : // are implicitly left visible, because LayerVisibilityHint's default is Visible, and we never modify it here.
696 : // Note that L0 deltas _can_ be covered by image layers, but we consider them 'visible' because we anticipate that
697 : // they will be subject to L0->L1 compaction in the near future.
698 202 : let layer_manager = self.layers.read().await;
699 202 : let layer_map = layer_manager.layer_map()?;
700 :
701 202 : let readable_points = {
702 202 : let children = self.gc_info.read().unwrap().retain_lsns.clone();
703 202 :
704 202 : let mut readable_points = Vec::with_capacity(children.len() + 1);
705 202 : for (child_lsn, _child_timeline_id, is_offloaded) in &children {
706 0 : if *is_offloaded == MaybeOffloaded::Yes {
707 0 : continue;
708 0 : }
709 0 : readable_points.push(*child_lsn);
710 : }
711 202 : readable_points.push(head_lsn);
712 202 : readable_points
713 202 : };
714 202 :
715 202 : let (layer_visibility, covered) = layer_map.get_visibility(readable_points);
716 524 : for (layer_desc, visibility) in layer_visibility {
717 322 : // FIXME: a more efficient bulk zip() through the layers rather than an N log N lookup of each one
718 322 : let layer = layer_manager.get_from_desc(&layer_desc);
719 322 : layer.set_visibility(visibility);
720 322 : }
721 :
722 : // TODO: publish our covered KeySpace to our parent, so that when they update their visibility, they can
723 : // avoid assuming that everything at a branch point is visible.
724 202 : drop(covered);
725 202 : Ok(())
726 202 : }
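// A minimal sketch of how the readable points above are assembled: every
// non-offloaded child branch point plus the tip of the timeline. Plain u64
// LSNs stand in for `Lsn`, and the boolean stands in for `MaybeOffloaded`.
fn collect_readable_points(children: &[(u64, bool)], head_lsn: u64) -> Vec<u64> {
    let mut points = Vec::with_capacity(children.len() + 1);
    for &(child_lsn, is_offloaded) in children {
        if is_offloaded {
            // Offloaded children do not serve reads, so they don't pin visibility.
            continue;
        }
        points.push(child_lsn);
    }
    points.push(head_lsn);
    points
}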
727 :
728 : /// Collect a bunch of Level 0 layer files, and compact and reshuffle them as
729 : /// Level 1 files. Returns whether the L0 layers are fully compacted.
730 364 : async fn compact_level0(
731 364 : self: &Arc<Self>,
732 364 : target_file_size: u64,
733 364 : force_compaction_ignore_threshold: bool,
734 364 : ctx: &RequestContext,
735 364 : ) -> Result<bool, CompactionError> {
736 : let CompactLevel0Phase1Result {
737 364 : new_layers,
738 364 : deltas_to_compact,
739 364 : fully_compacted,
740 : } = {
741 364 : let phase1_span = info_span!("compact_level0_phase1");
742 364 : let ctx = ctx.attached_child();
743 364 : let mut stats = CompactLevel0Phase1StatsBuilder {
744 364 : version: Some(2),
745 364 : tenant_id: Some(self.tenant_shard_id),
746 364 : timeline_id: Some(self.timeline_id),
747 364 : ..Default::default()
748 364 : };
749 364 :
750 364 : let begin = tokio::time::Instant::now();
751 364 : let phase1_layers_locked = self.layers.read().await;
752 364 : let now = tokio::time::Instant::now();
753 364 : stats.read_lock_acquisition_micros =
754 364 : DurationRecorder::Recorded(RecordedDuration(now - begin), now);
755 364 : self.compact_level0_phase1(
756 364 : phase1_layers_locked,
757 364 : stats,
758 364 : target_file_size,
759 364 : force_compaction_ignore_threshold,
760 364 : &ctx,
761 364 : )
762 364 : .instrument(phase1_span)
763 364 : .await?
764 : };
765 :
766 364 : if new_layers.is_empty() && deltas_to_compact.is_empty() {
767 : // nothing to do
768 336 : return Ok(true);
769 28 : }
770 28 :
771 28 : self.finish_compact_batch(&new_layers, &Vec::new(), &deltas_to_compact)
772 28 : .await?;
773 28 : Ok(fully_compacted)
774 364 : }
775 :
776 : /// Level0 files first phase of compaction, explained in the [`Self::compact_legacy`] comment.
777 364 : async fn compact_level0_phase1<'a>(
778 364 : self: &'a Arc<Self>,
779 364 : guard: tokio::sync::RwLockReadGuard<'a, LayerManager>,
780 364 : mut stats: CompactLevel0Phase1StatsBuilder,
781 364 : target_file_size: u64,
782 364 : force_compaction_ignore_threshold: bool,
783 364 : ctx: &RequestContext,
784 364 : ) -> Result<CompactLevel0Phase1Result, CompactionError> {
785 364 : stats.read_lock_held_spawn_blocking_startup_micros =
786 364 : stats.read_lock_acquisition_micros.till_now(); // set by caller
787 364 : let layers = guard.layer_map()?;
788 364 : let level0_deltas = layers.level0_deltas();
789 364 : stats.level0_deltas_count = Some(level0_deltas.len());
790 364 :
791 364 : // Only compact if enough layers have accumulated.
792 364 : let threshold = self.get_compaction_threshold();
793 364 : if level0_deltas.is_empty() || level0_deltas.len() < threshold {
794 336 : if force_compaction_ignore_threshold {
795 0 : if !level0_deltas.is_empty() {
796 0 : info!(
797 0 : level0_deltas = level0_deltas.len(),
798 0 : threshold, "too few deltas to compact, but forcing compaction"
799 : );
800 : } else {
801 0 : info!(
802 0 : level0_deltas = level0_deltas.len(),
803 0 : threshold, "too few deltas to compact, cannot force compaction"
804 : );
805 0 : return Ok(CompactLevel0Phase1Result::default());
806 : }
807 : } else {
808 336 : debug!(
809 0 : level0_deltas = level0_deltas.len(),
810 0 : threshold, "too few deltas to compact"
811 : );
812 336 : return Ok(CompactLevel0Phase1Result::default());
813 : }
814 28 : }
815 :
816 28 : let mut level0_deltas = level0_deltas
817 28 : .iter()
818 402 : .map(|x| guard.get_from_desc(x))
819 28 : .collect::<Vec<_>>();
820 28 :
821 28 : // Gather the files to compact in this iteration.
822 28 : //
823 28 : // Start with the oldest Level 0 delta file, and collect any other
824 28 : // level 0 files that form a contiguous sequence, such that the end
825 28 : // LSN of previous file matches the start LSN of the next file.
826 28 : //
827 28 : // Note that if the files don't form such a sequence, we might
828 28 : // "compact" just a single file. That's a bit pointless, but it allows
829 28 : // us to get rid of the level 0 file, and compact the other files on
830 28 : // the next iteration. This could probably be made smarter, but such
831 28 : // "gaps" in the sequence of level 0 files should only happen in case
832 28 : // of a crash, partial download from cloud storage, or something like
833 28 : // that, so it's not a big deal in practice. (A standalone sketch of this selection loop follows after this impl block.)
834 748 : level0_deltas.sort_by_key(|l| l.layer_desc().lsn_range.start);
835 28 : let mut level0_deltas_iter = level0_deltas.iter();
836 28 :
837 28 : let first_level0_delta = level0_deltas_iter.next().unwrap();
838 28 : let mut prev_lsn_end = first_level0_delta.layer_desc().lsn_range.end;
839 28 : let mut deltas_to_compact = Vec::with_capacity(level0_deltas.len());
840 28 :
841 28 : // Accumulate the size of layers in `deltas_to_compact`
842 28 : let mut deltas_to_compact_bytes = 0;
843 28 :
844 28 : // Under normal circumstances, we will accumulate up to compaction_interval L0s of size
845 28 : // checkpoint_distance each. To avoid edge cases using extra system resources, bound our
846 28 : // work in this function to only operate on this much delta data at once.
847 28 : //
848 28 : // Take the max of the configured value & the default, so that tests that configure tiny values
849 28 : // can still use a sensible amount of memory, but if a deployed system configures bigger values we
850 28 : // still let them compact a full stack of L0s in one go.
851 28 : let delta_size_limit = std::cmp::max(
852 28 : self.get_compaction_threshold(),
853 28 : DEFAULT_COMPACTION_THRESHOLD,
854 28 : ) as u64
855 28 : * std::cmp::max(self.get_checkpoint_distance(), DEFAULT_CHECKPOINT_DISTANCE);
856 28 :
857 28 : let mut fully_compacted = true;
858 28 :
859 28 : deltas_to_compact.push(first_level0_delta.download_and_keep_resident().await?);
860 402 : for l in level0_deltas_iter {
861 374 : let lsn_range = &l.layer_desc().lsn_range;
862 374 :
863 374 : if lsn_range.start != prev_lsn_end {
864 0 : break;
865 374 : }
866 374 : deltas_to_compact.push(l.download_and_keep_resident().await?);
867 374 : deltas_to_compact_bytes += l.metadata().file_size;
868 374 : prev_lsn_end = lsn_range.end;
869 374 :
870 374 : if deltas_to_compact_bytes >= delta_size_limit {
871 0 : info!(
872 0 : l0_deltas_selected = deltas_to_compact.len(),
873 0 : l0_deltas_total = level0_deltas.len(),
874 0 : "L0 compaction picker hit max delta layer size limit: {}",
875 : delta_size_limit
876 : );
877 0 : fully_compacted = false;
878 0 :
879 0 : // Proceed with compaction, but only a subset of L0s
880 0 : break;
881 374 : }
882 : }
883 28 : let lsn_range = Range {
884 28 : start: deltas_to_compact
885 28 : .first()
886 28 : .unwrap()
887 28 : .layer_desc()
888 28 : .lsn_range
889 28 : .start,
890 28 : end: deltas_to_compact.last().unwrap().layer_desc().lsn_range.end,
891 28 : };
892 28 :
893 28 : info!(
894 0 : "Starting Level0 compaction in LSN range {}-{} for {} layers ({} deltas in total)",
895 0 : lsn_range.start,
896 0 : lsn_range.end,
897 0 : deltas_to_compact.len(),
898 0 : level0_deltas.len()
899 : );
900 :
901 402 : for l in deltas_to_compact.iter() {
902 402 : info!("compact includes {l}");
903 : }
904 :
905 : // We don't need the original list of layers anymore. Drop it so that
906 : // we don't accidentally use it later in the function.
907 28 : drop(level0_deltas);
908 28 :
909 28 : stats.read_lock_held_prerequisites_micros = stats
910 28 : .read_lock_held_spawn_blocking_startup_micros
911 28 : .till_now();
912 :
913 : // TODO: replace with streaming k-merge
914 28 : let all_keys = {
915 28 : let mut all_keys = Vec::new();
916 402 : for l in deltas_to_compact.iter() {
917 402 : if self.cancel.is_cancelled() {
918 0 : return Err(CompactionError::ShuttingDown);
919 402 : }
920 402 : let delta = l.get_as_delta(ctx).await.map_err(CompactionError::Other)?;
921 402 : let keys = delta
922 402 : .index_entries(ctx)
923 402 : .await
924 402 : .map_err(CompactionError::Other)?;
925 402 : all_keys.extend(keys);
926 : }
927 : // The current stdlib sorting implementation is designed in a way where it is
928 : // particularly fast when the slice is made up of sorted sub-ranges.
929 4423784 : all_keys.sort_by_key(|DeltaEntry { key, lsn, .. }| (*key, *lsn));
930 28 : all_keys
931 28 : };
932 28 :
933 28 : stats.read_lock_held_key_sort_micros = stats.read_lock_held_prerequisites_micros.till_now();
934 :
935 : // Determine N largest holes where N is number of compacted layers. The vec is sorted by key range start.
936 : //
937 : // A hole is a key range for which this compaction doesn't have any WAL records.
938 : // Our goal in this compaction iteration is to avoid creating L1s that, in terms of their key range,
939 : // cover the hole, but actually don't contain any WAL records for that key range.
940 : // The reason is that the mere stack of L1s (`count_deltas`) triggers image layer creation (`create_image_layers`).
941 : // That image layer creation would be useless for a hole range covered by L1s that don't contain any WAL records.
942 : //
943 : // The algorithm chooses holes as follows.
944 : // - Slide a two-key window over the keys in key order to get the hole range (= distance between two adjacent keys).
945 : // - Filter: apply a minimum threshold on the range length.
946 : // - Rank: by coverage size (= number of image layers required to reconstruct each key in the range for which we have any data).
947 : // A standalone sketch of this hole selection follows after this function.
948 : // For more details, intuition, and some ASCII art see https://github.com/neondatabase/neon/pull/3597#discussion_r1112704451
949 : #[derive(PartialEq, Eq)]
950 : struct Hole {
951 : key_range: Range<Key>,
952 : coverage_size: usize,
953 : }
954 28 : let holes: Vec<Hole> = {
955 : use std::cmp::Ordering;
956 : impl Ord for Hole {
957 0 : fn cmp(&self, other: &Self) -> Ordering {
958 0 : self.coverage_size.cmp(&other.coverage_size).reverse()
959 0 : }
960 : }
961 : impl PartialOrd for Hole {
962 0 : fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
963 0 : Some(self.cmp(other))
964 0 : }
965 : }
966 28 : let max_holes = deltas_to_compact.len();
967 28 : let last_record_lsn = self.get_last_record_lsn();
968 28 : let min_hole_range = (target_file_size / page_cache::PAGE_SZ as u64) as i128;
969 28 : let min_hole_coverage_size = 3; // TODO: something more flexible?
970 28 : // min-heap (reserve space for one more element added before eviction)
971 28 : let mut heap: BinaryHeap<Hole> = BinaryHeap::with_capacity(max_holes + 1);
972 28 : let mut prev: Option<Key> = None;
973 :
974 2064038 : for &DeltaEntry { key: next_key, .. } in all_keys.iter() {
975 2064038 : if let Some(prev_key) = prev {
976 : // Just a first fast filter: do not create hole entries for metadata keys. The last hole in the
977 : // compaction is the gap between the data keys and the metadata keys.
978 2064010 : if next_key.to_i128() - prev_key.to_i128() >= min_hole_range
979 0 : && !Key::is_metadata_key(&prev_key)
980 : {
981 0 : let key_range = prev_key..next_key;
982 0 : // Measuring a hole by just subtracting the i128 representations of the key range boundaries
983 0 : // does not make much sense, because the largest holes would correspond to field1/field2 changes.
984 0 : // But we are mostly interested in eliminating holes which cause generation of excessive image layers.
985 0 : // That is why it is better to measure the size of a hole as the number of covering image layers.
986 0 : let coverage_size =
987 0 : layers.image_coverage(&key_range, last_record_lsn).len();
988 0 : if coverage_size >= min_hole_coverage_size {
989 0 : heap.push(Hole {
990 0 : key_range,
991 0 : coverage_size,
992 0 : });
993 0 : if heap.len() > max_holes {
994 0 : heap.pop(); // remove smallest hole
995 0 : }
996 0 : }
997 2064010 : }
998 28 : }
999 2064038 : prev = Some(next_key.next());
1000 : }
1001 28 : let mut holes = heap.into_vec();
1002 28 : holes.sort_unstable_by_key(|hole| hole.key_range.start);
1003 28 : holes
1004 28 : };
1005 28 : stats.read_lock_held_compute_holes_micros = stats.read_lock_held_key_sort_micros.till_now();
1006 28 : drop_rlock(guard);
1007 28 :
1008 28 : if self.cancel.is_cancelled() {
1009 0 : return Err(CompactionError::ShuttingDown);
1010 28 : }
1011 28 :
1012 28 : stats.read_lock_drop_micros = stats.read_lock_held_compute_holes_micros.till_now();
1013 :
1014 : // This iterator walks through all key-value pairs from all the layers
1015 : // we're compacting, in key, LSN order.
1016 : // If there's both a Value::Image and Value::WalRecord for the same (key,lsn),
1017 : // then the Value::Image is ordered before Value::WalRecord.
1018 28 : let mut all_values_iter = {
1019 28 : let mut deltas = Vec::with_capacity(deltas_to_compact.len());
1020 402 : for l in deltas_to_compact.iter() {
1021 402 : let l = l.get_as_delta(ctx).await.map_err(CompactionError::Other)?;
1022 402 : deltas.push(l);
1023 : }
1024 28 : MergeIterator::create(&deltas, &[], ctx)
1025 28 : };
1026 28 :
1027 28 : // This iterator walks through all keys and is needed to calculate size used by each key
1028 28 : let mut all_keys_iter = all_keys
1029 28 : .iter()
1030 2064038 : .map(|DeltaEntry { key, lsn, size, .. }| (*key, *lsn, *size))
1031 2064010 : .coalesce(|mut prev, cur| {
1032 2064010 : // Coalesce entries that belong to the same key.
1033 2064010 : // This ensures that compaction doesn't put them
1034 2064010 : // into different layer files.
1035 2064010 : // Still limit this by the target file size,
1036 2064010 : // so that we keep the size of the files in
1037 2064010 : // check.
1038 2064010 : if prev.0 == cur.0 && prev.2 < target_file_size {
1039 40038 : prev.2 += cur.2;
1040 40038 : Ok(prev)
1041 : } else {
1042 2023972 : Err((prev, cur))
1043 : }
1044 2064010 : });
1045 28 :
1046 28 : // Merge the contents of all the input delta layers into a new set
1047 28 : // of delta layers, based on the current partitioning.
1048 28 : //
1049 28 : // We split the new delta layers on the key dimension. We iterate through the key space, and for each key, check if including the next key to the current output layer we're building would cause the layer to become too large. If so, dump the current output layer and start new one.
1050 28 : // We split the new delta layers on the key dimension. We iterate through the key space and, for each key, check whether including the next key in the current output layer would cause the layer to become too large. If so, we flush the current output layer and start a new one.
1051 28 : // would be too large. In that case, we also split on the LSN dimension.
1052 28 : //
1053 28 : // LSN
1054 28 : // ^
1055 28 : // |
1056 28 : // | +-----------+ +--+--+--+--+
1057 28 : // | | | | | | | |
1058 28 : // | +-----------+ | | | | |
1059 28 : // | | | | | | | |
1060 28 : // | +-----------+ ==> | | | | |
1061 28 : // | | | | | | | |
1062 28 : // | +-----------+ | | | | |
1063 28 : // | | | | | | | |
1064 28 : // | +-----------+ +--+--+--+--+
1065 28 : // |
1066 28 : // +--------------> key
1067 28 : //
1068 28 : //
1069 28 : // If one key (X) has a lot of page versions:
1070 28 : //
1071 28 : // LSN
1072 28 : // ^
1073 28 : // | (X)
1074 28 : // | +-----------+ +--+--+--+--+
1075 28 : // | | | | | | | |
1076 28 : // | +-----------+ | | +--+ |
1077 28 : // | | | | | | | |
1078 28 : // | +-----------+ ==> | | | | |
1079 28 : // | | | | | +--+ |
1080 28 : // | +-----------+ | | | | |
1081 28 : // | | | | | | | |
1082 28 : // | +-----------+ +--+--+--+--+
1083 28 : // |
1084 28 : // +--------------> key
1085 28 : // TODO: this actually divides the layers into fixed-size chunks, not
1086 28 : // based on the partitioning.
1087 28 : //
1088 28 : // TODO: we should also opportunistically materialize and
1089 28 : // garbage collect what we can.
1090 28 : let mut new_layers = Vec::new();
1091 28 : let mut prev_key: Option<Key> = None;
1092 28 : let mut writer: Option<DeltaLayerWriter> = None;
1093 28 : let mut key_values_total_size = 0u64;
1094 28 : let mut dup_start_lsn: Lsn = Lsn::INVALID; // start LSN of layer containing values of the single key
1095 28 : let mut dup_end_lsn: Lsn = Lsn::INVALID; // end LSN of layer containing values of the single key
1096 28 : let mut next_hole = 0; // index of next hole in holes vector
1097 28 :
1098 28 : let mut keys = 0;
1099 :
1100 2064066 : while let Some((key, lsn, value)) = all_values_iter
1101 2064066 : .next()
1102 2064066 : .await
1103 2064066 : .map_err(CompactionError::Other)?
1104 : {
1105 2064038 : keys += 1;
1106 2064038 :
1107 2064038 : if keys % 32_768 == 0 && self.cancel.is_cancelled() {
1108 : // avoid hitting the cancellation token on every key. In benches, we end up
1109 : // shuffling on the order of a million keys per layer, which means we'll check it
1110 : // around tens of times per layer.
1111 0 : return Err(CompactionError::ShuttingDown);
1112 2064038 : }
1113 2064038 :
1114 2064038 : let same_key = prev_key == Some(key);
1115 2064038 : // We need to check key boundaries once we reach next key or end of layer with the same key
1116 2064038 : if !same_key || lsn == dup_end_lsn {
1117 2024000 : let mut next_key_size = 0u64;
1118 2024000 : let is_dup_layer = dup_end_lsn.is_valid();
1119 2024000 : dup_start_lsn = Lsn::INVALID;
1120 2024000 : if !same_key {
1121 2024000 : dup_end_lsn = Lsn::INVALID;
1122 2024000 : }
1123 : // Determine size occupied by this key. We stop at next key or when size becomes larger than target_file_size
1124 2024000 : for (next_key, next_lsn, next_size) in all_keys_iter.by_ref() {
1125 2024000 : next_key_size = next_size;
1126 2024000 : if key != next_key {
1127 2023972 : if dup_end_lsn.is_valid() {
1128 0 : // We are writing a segment with duplicates:
1129 0 : // place all remaining values of this key in a separate segment
1130 0 : dup_start_lsn = dup_end_lsn; // new segment starts where the old one stops
1131 0 : dup_end_lsn = lsn_range.end; // there are no more values of this key till end of LSN range
1132 2023972 : }
1133 2023972 : break;
1134 28 : }
1135 28 : key_values_total_size += next_size;
1136 28 : // Check if it is time to split segment: if total keys size is larger than target file size.
1137 28 : // We need to avoid generation of empty segments if next_size > target_file_size.
1138 28 : if key_values_total_size > target_file_size && lsn != next_lsn {
1139 : // Split key between multiple layers: such layer can contain only single key
1140 0 : dup_start_lsn = if dup_end_lsn.is_valid() {
1141 0 : dup_end_lsn // new segment with duplicates starts where old one stops
1142 : } else {
1143 0 : lsn // start with the first LSN for this key
1144 : };
1145 0 : dup_end_lsn = next_lsn; // upper LSN boundary is exclusive
1146 0 : break;
1147 28 : }
1148 : }
1149 : // handle case when loop reaches last key: in this case dup_end is non-zero but dup_start is not set.
1150 2024000 : if dup_end_lsn.is_valid() && !dup_start_lsn.is_valid() {
1151 0 : dup_start_lsn = dup_end_lsn;
1152 0 : dup_end_lsn = lsn_range.end;
1153 2024000 : }
1154 2024000 : if writer.is_some() {
1155 2023972 : let written_size = writer.as_mut().unwrap().size();
1156 2023972 : let contains_hole =
1157 2023972 : next_hole < holes.len() && key >= holes[next_hole].key_range.end;
1158 : // check if the key causes layer overflow or contains a hole...
1159 2023972 : if is_dup_layer
1160 2023972 : || dup_end_lsn.is_valid()
1161 2023972 : || written_size + key_values_total_size > target_file_size
1162 2023692 : || contains_hole
1163 : {
1164 : // ... if so, flush previous layer and prepare to write new one
1165 280 : let (desc, path) = writer
1166 280 : .take()
1167 280 : .unwrap()
1168 280 : .finish(prev_key.unwrap().next(), ctx)
1169 280 : .await
1170 280 : .map_err(CompactionError::Other)?;
1171 280 : let new_delta = Layer::finish_creating(self.conf, self, desc, &path)
1172 280 : .map_err(CompactionError::Other)?;
1173 :
1174 280 : new_layers.push(new_delta);
1175 280 : writer = None;
1176 280 :
1177 280 : if contains_hole {
1178 0 : // skip hole
1179 0 : next_hole += 1;
1180 280 : }
1181 2023692 : }
1182 28 : }
1183 : // Remember size of key value because at next iteration we will access next item
1184 2024000 : key_values_total_size = next_key_size;
1185 40038 : }
1186 2064038 : fail_point!("delta-layer-writer-fail-before-finish", |_| {
1187 0 : Err(CompactionError::Other(anyhow::anyhow!(
1188 0 : "failpoint delta-layer-writer-fail-before-finish"
1189 0 : )))
1190 2064038 : });
1191 :
1192 2064038 : if !self.shard_identity.is_key_disposable(&key) {
1193 2064038 : if writer.is_none() {
1194 308 : if self.cancel.is_cancelled() {
1195 : // to be somewhat responsive to cancellation, check for each new layer
1196 0 : return Err(CompactionError::ShuttingDown);
1197 308 : }
1198 : // Create writer if not initiaized yet
1199 308 : writer = Some(
1200 : DeltaLayerWriter::new(
1201 308 : self.conf,
1202 308 : self.timeline_id,
1203 308 : self.tenant_shard_id,
1204 308 : key,
1205 308 : if dup_end_lsn.is_valid() {
1206 : // this is a layer containing a slice of values of the same key
1207 0 : debug!("Create new dup layer {}..{}", dup_start_lsn, dup_end_lsn);
1208 0 : dup_start_lsn..dup_end_lsn
1209 : } else {
1210 308 : debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end);
1211 308 : lsn_range.clone()
1212 : },
1213 308 : ctx,
1214 308 : )
1215 308 : .await
1216 308 : .map_err(CompactionError::Other)?,
1217 : );
1218 :
1219 308 : keys = 0;
1220 2063730 : }
1221 :
1222 2064038 : writer
1223 2064038 : .as_mut()
1224 2064038 : .unwrap()
1225 2064038 : .put_value(key, lsn, value, ctx)
1226 2064038 : .await
1227 2064038 : .map_err(CompactionError::Other)?;
1228 : } else {
1229 0 : let shard = self.shard_identity.shard_index();
1230 0 : let owner = self.shard_identity.get_shard_number(&key);
1231 0 : if cfg!(debug_assertions) {
1232 0 : panic!("key {key} does not belong on shard {shard}, owned by {owner}");
1233 0 : }
1234 0 : debug!("dropping key {key} during compaction (it belongs on shard {owner})");
1235 : }
1236 :
1237 2064038 : if !new_layers.is_empty() {
1238 19786 : fail_point!("after-timeline-compacted-first-L1");
1239 2044252 : }
1240 :
1241 2064038 : prev_key = Some(key);
1242 : }
1243 28 : if let Some(writer) = writer {
1244 28 : let (desc, path) = writer
1245 28 : .finish(prev_key.unwrap().next(), ctx)
1246 28 : .await
1247 28 : .map_err(CompactionError::Other)?;
1248 28 : let new_delta = Layer::finish_creating(self.conf, self, desc, &path)
1249 28 : .map_err(CompactionError::Other)?;
1250 28 : new_layers.push(new_delta);
1251 0 : }
1252 :
1253 : // Sync layers
1254 28 : if !new_layers.is_empty() {
1255 : // Print a warning if the created layer is larger than double the target size
1256 : // Add two pages for potential overhead. This should in theory be already
1257 : // accounted for in the target calculation, but for very small targets,
1258 : // we still might easily hit the limit otherwise.
1259 28 : let warn_limit = target_file_size * 2 + page_cache::PAGE_SZ as u64 * 2;
1260 308 : for layer in new_layers.iter() {
1261 308 : if layer.layer_desc().file_size > warn_limit {
1262 0 : warn!(
1263 : %layer,
1264 0 : "created delta file of size {} larger than double of target of {target_file_size}", layer.layer_desc().file_size
1265 : );
1266 308 : }
1267 : }
1268 :
1269 : // The writer.finish() above already did the fsync of the inodes.
1270 : // We just need to fsync the directory in which these inodes are linked,
1271 : // which we know to be the timeline directory.
1272 : //
1273 : // We use fatal_err() below because after writer.finish() returns with success,
1274 : // the in-memory state of the filesystem already has the layer file in its final place,
1275 : // and subsequent pageserver code could think it's durable while it really isn't.
1276 28 : let timeline_dir = VirtualFile::open(
1277 28 : &self
1278 28 : .conf
1279 28 : .timeline_path(&self.tenant_shard_id, &self.timeline_id),
1280 28 : ctx,
1281 28 : )
1282 28 : .await
1283 28 : .fatal_err("VirtualFile::open for timeline dir fsync");
1284 28 : timeline_dir
1285 28 : .sync_all()
1286 28 : .await
1287 28 : .fatal_err("VirtualFile::sync_all timeline dir");
1288 0 : }
1289 :
1290 28 : stats.write_layer_files_micros = stats.read_lock_drop_micros.till_now();
1291 28 : stats.new_deltas_count = Some(new_layers.len());
1292 308 : stats.new_deltas_size = Some(new_layers.iter().map(|l| l.layer_desc().file_size).sum());
1293 28 :
1294 28 : match TryInto::<CompactLevel0Phase1Stats>::try_into(stats)
1295 28 : .and_then(|stats| serde_json::to_string(&stats).context("serde_json::to_string"))
1296 : {
1297 28 : Ok(stats_json) => {
1298 28 : info!(
1299 0 : stats_json = stats_json.as_str(),
1300 0 : "compact_level0_phase1 stats available"
1301 : )
1302 : }
1303 0 : Err(e) => {
1304 0 : warn!("compact_level0_phase1 stats failed to serialize: {:#}", e);
1305 : }
1306 : }
1307 :
1308 : // Without this, rustc complains about deltas_to_compact still
1309 : // being borrowed when we `.into_iter()` below.
1310 28 : drop(all_values_iter);
1311 28 :
1312 28 : Ok(CompactLevel0Phase1Result {
1313 28 : new_layers,
1314 28 : deltas_to_compact: deltas_to_compact
1315 28 : .into_iter()
1316 402 : .map(|x| x.drop_eviction_guard())
1317 28 : .collect::<Vec<_>>(),
1318 28 : fully_compacted,
1319 28 : })
1320 364 : }
1321 : }
1322 :
1323 : #[derive(Default)]
1324 : struct CompactLevel0Phase1Result {
1325 : new_layers: Vec<ResidentLayer>,
1326 : deltas_to_compact: Vec<Layer>,
1327 : // Whether we have included all L0 layers, or selected only part of them due to the
1328 : // L0 compaction size limit.
1329 : fully_compacted: bool,
1330 : }
1331 :
1332 : #[derive(Default)]
1333 : struct CompactLevel0Phase1StatsBuilder {
1334 : version: Option<u64>,
1335 : tenant_id: Option<TenantShardId>,
1336 : timeline_id: Option<TimelineId>,
1337 : read_lock_acquisition_micros: DurationRecorder,
1338 : read_lock_held_spawn_blocking_startup_micros: DurationRecorder,
1339 : read_lock_held_key_sort_micros: DurationRecorder,
1340 : read_lock_held_prerequisites_micros: DurationRecorder,
1341 : read_lock_held_compute_holes_micros: DurationRecorder,
1342 : read_lock_drop_micros: DurationRecorder,
1343 : write_layer_files_micros: DurationRecorder,
1344 : level0_deltas_count: Option<usize>,
1345 : new_deltas_count: Option<usize>,
1346 : new_deltas_size: Option<u64>,
1347 : }
1348 :
1349 : #[derive(serde::Serialize)]
1350 : struct CompactLevel0Phase1Stats {
1351 : version: u64,
1352 : tenant_id: TenantShardId,
1353 : timeline_id: TimelineId,
1354 : read_lock_acquisition_micros: RecordedDuration,
1355 : read_lock_held_spawn_blocking_startup_micros: RecordedDuration,
1356 : read_lock_held_key_sort_micros: RecordedDuration,
1357 : read_lock_held_prerequisites_micros: RecordedDuration,
1358 : read_lock_held_compute_holes_micros: RecordedDuration,
1359 : read_lock_drop_micros: RecordedDuration,
1360 : write_layer_files_micros: RecordedDuration,
1361 : level0_deltas_count: usize,
1362 : new_deltas_count: usize,
1363 : new_deltas_size: u64,
1364 : }
1365 :
1366 : impl TryFrom<CompactLevel0Phase1StatsBuilder> for CompactLevel0Phase1Stats {
1367 : type Error = anyhow::Error;
1368 :
1369 28 : fn try_from(value: CompactLevel0Phase1StatsBuilder) -> Result<Self, Self::Error> {
1370 28 : Ok(Self {
1371 28 : version: value.version.ok_or_else(|| anyhow!("version not set"))?,
1372 28 : tenant_id: value
1373 28 : .tenant_id
1374 28 : .ok_or_else(|| anyhow!("tenant_id not set"))?,
1375 28 : timeline_id: value
1376 28 : .timeline_id
1377 28 : .ok_or_else(|| anyhow!("timeline_id not set"))?,
1378 28 : read_lock_acquisition_micros: value
1379 28 : .read_lock_acquisition_micros
1380 28 : .into_recorded()
1381 28 : .ok_or_else(|| anyhow!("read_lock_acquisition_micros not set"))?,
1382 28 : read_lock_held_spawn_blocking_startup_micros: value
1383 28 : .read_lock_held_spawn_blocking_startup_micros
1384 28 : .into_recorded()
1385 28 : .ok_or_else(|| anyhow!("read_lock_held_spawn_blocking_startup_micros not set"))?,
1386 28 : read_lock_held_key_sort_micros: value
1387 28 : .read_lock_held_key_sort_micros
1388 28 : .into_recorded()
1389 28 : .ok_or_else(|| anyhow!("read_lock_held_key_sort_micros not set"))?,
1390 28 : read_lock_held_prerequisites_micros: value
1391 28 : .read_lock_held_prerequisites_micros
1392 28 : .into_recorded()
1393 28 : .ok_or_else(|| anyhow!("read_lock_held_prerequisites_micros not set"))?,
1394 28 : read_lock_held_compute_holes_micros: value
1395 28 : .read_lock_held_compute_holes_micros
1396 28 : .into_recorded()
1397 28 : .ok_or_else(|| anyhow!("read_lock_held_compute_holes_micros not set"))?,
1398 28 : read_lock_drop_micros: value
1399 28 : .read_lock_drop_micros
1400 28 : .into_recorded()
1401 28 : .ok_or_else(|| anyhow!("read_lock_drop_micros not set"))?,
1402 28 : write_layer_files_micros: value
1403 28 : .write_layer_files_micros
1404 28 : .into_recorded()
1405 28 : .ok_or_else(|| anyhow!("write_layer_files_micros not set"))?,
1406 28 : level0_deltas_count: value
1407 28 : .level0_deltas_count
1408 28 : .ok_or_else(|| anyhow!("level0_deltas_count not set"))?,
1409 28 : new_deltas_count: value
1410 28 : .new_deltas_count
1411 28 : .ok_or_else(|| anyhow!("new_deltas_count not set"))?,
1412 28 : new_deltas_size: value
1413 28 : .new_deltas_size
1414 28 : .ok_or_else(|| anyhow!("new_deltas_size not set"))?,
1415 : })
1416 28 : }
1417 : }
1418 :
1419 : impl Timeline {
1420 : /// Entry point for new tiered compaction algorithm.
1421 : ///
1422 : /// All the real work is in the implementation in the pageserver_compaction
1423 : /// crate. The code here would apply to any algorithm implementing the
1424 : /// same interface, but tiered is the only one at the moment.
1425 : ///
1426 : /// TODO: cancellation
1427 0 : pub(crate) async fn compact_tiered(
1428 0 : self: &Arc<Self>,
1429 0 : _cancel: &CancellationToken,
1430 0 : ctx: &RequestContext,
1431 0 : ) -> Result<(), CompactionError> {
1432 0 : let fanout = self.get_compaction_threshold() as u64;
1433 0 : let target_file_size = self.get_checkpoint_distance();
1434 :
1435 : // Find the top of the historical layers
1436 0 : let end_lsn = {
1437 0 : let guard = self.layers.read().await;
1438 0 : let layers = guard.layer_map()?;
1439 :
1440 0 : let l0_deltas = layers.level0_deltas();
1441 0 :
1442 0 : // As an optimization, if we find that there are too few L0 layers,
1443 0 : // bail out early. We know that the compaction algorithm would do
1444 0 : // nothing in that case.
1445 0 : if l0_deltas.len() < fanout as usize {
1446 : // doesn't need compacting
1447 0 : return Ok(());
1448 0 : }
1449 0 : l0_deltas.iter().map(|l| l.lsn_range.end).max().unwrap()
1450 0 : };
1451 0 :
1452 0 : // Is the timeline being deleted?
1453 0 : if self.is_stopping() {
1454 0 : trace!("Dropping out of compaction on timeline shutdown");
1455 0 : return Err(CompactionError::ShuttingDown);
1456 0 : }
1457 :
1458 0 : let (dense_ks, _sparse_ks) = self.collect_keyspace(end_lsn, ctx).await?;
1459 : // TODO(chi): ignore sparse_keyspace for now, compact it in the future.
1460 0 : let mut adaptor = TimelineAdaptor::new(self, (end_lsn, dense_ks));
1461 0 :
1462 0 : pageserver_compaction::compact_tiered::compact_tiered(
1463 0 : &mut adaptor,
1464 0 : end_lsn,
1465 0 : target_file_size,
1466 0 : fanout,
1467 0 : ctx,
1468 0 : )
1469 0 : .await
1470 : // TODO: compact_tiered needs to return CompactionError
1471 0 : .map_err(CompactionError::Other)?;
1472 :
1473 0 : adaptor.flush_updates().await?;
1474 0 : Ok(())
1475 0 : }
1476 :
1477 : /// Take a list of images and deltas, produce images and deltas according to GC horizon and retain_lsns.
1478 : ///
1479 : /// It takes a key, the values of the key within the compaction process, a GC horizon, and all retain_lsns below the horizon.
1480 : /// For now, it requires the `accumulated_values` contains the full history of the key (i.e., the key with the lowest LSN is
1481 : /// an image or a WAL not requiring a base image). This restriction will be removed once we implement gc-compaction on branch.
1482 : ///
1483 : /// The function returns the deltas and the base image that need to be placed at each of the retain LSN. For example, we have:
1484 : ///
1485 : /// A@0x10, +B@0x20, +C@0x30, +D@0x40, +E@0x50, +F@0x60
1486 : /// horizon = 0x50, retain_lsn = 0x20, 0x40, delta_threshold=3
1487 : ///
1488 : /// The function will produce:
1489 : ///
1490 : /// ```plain
1491 : /// 0x20(retain_lsn) -> img=AB@0x20 always produce a single image below the lowest retain LSN
1492 : /// 0x40(retain_lsn) -> deltas=[+C@0x30, +D@0x40] two deltas since the last base image, keeping the deltas
1493 : /// 0x50(horizon) -> deltas=[ABCDE@0x50] three deltas since the last base image, generate an image but put it in the delta
1494 : /// above_horizon -> deltas=[+F@0x60] full history above the horizon
1495 : /// ```
1496 : ///
1497 : /// Note that `accumulated_values` must be sorted by LSN and should belong to a single key.
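: ///
: /// As a minimal, self-contained illustration of the bucketing performed in Step 1 below (a
: /// simplified sketch added for clarity, not part of the pageserver API):
: ///
: /// ```ignore
: /// /// Distribute a key's history into buckets delimited by the retain LSNs and the horizon,
: /// /// so that bucket `i` holds records with `split_points[i-1] < lsn <= split_points[i]`.
: /// fn split_by_lsn(history: &[(u64, &str)], retain_lsns: &[u64], horizon: u64) -> Vec<Vec<(u64, String)>> {
: ///     let mut split_points: Vec<u64> = retain_lsns.to_vec();
: ///     split_points.push(horizon);
: ///     let mut buckets: Vec<Vec<(u64, String)>> = vec![Vec::new(); split_points.len() + 1];
: ///     let mut idx = 0;
: ///     for &(lsn, rec) in history {
: ///         while idx < split_points.len() && lsn > split_points[idx] {
: ///             idx += 1;
: ///         }
: ///         buckets[idx].push((lsn, rec.to_string()));
: ///     }
: ///     buckets
: /// }
: ///
: /// // With the example above (horizon=0x50, retain_lsn=0x20,0x40) this yields
: /// // [[A@0x10, +B@0x20], [+C@0x30, +D@0x40], [+E@0x50], [+F@0x60]].
: /// ```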
1498 630 : pub(crate) async fn generate_key_retention(
1499 630 : self: &Arc<Timeline>,
1500 630 : key: Key,
1501 630 : full_history: &[(Key, Lsn, Value)],
1502 630 : horizon: Lsn,
1503 630 : retain_lsn_below_horizon: &[Lsn],
1504 630 : delta_threshold_cnt: usize,
1505 630 : base_img_from_ancestor: Option<(Key, Lsn, Bytes)>,
1506 630 : ) -> anyhow::Result<KeyHistoryRetention> {
1507 630 : // Pre-checks for the invariants
1508 630 : if cfg!(debug_assertions) {
1509 1530 : for (log_key, _, _) in full_history {
1510 900 : assert_eq!(log_key, &key, "mismatched key");
1511 : }
1512 630 : for i in 1..full_history.len() {
1513 270 : assert!(full_history[i - 1].1 <= full_history[i].1, "unordered LSN");
1514 270 : if full_history[i - 1].1 == full_history[i].1 {
1515 0 : assert!(
1516 0 : matches!(full_history[i - 1].2, Value::Image(_)),
1517 0 : "unordered delta/image, or duplicated delta"
1518 : );
1519 270 : }
1520 : }
1521 : // There used to be an assertion, for the no-base-image case, that checked whether the first
1522 : // record in the history is `will_init`, but it was removed.
1523 : // This is explained in the test cases for generate_key_retention.
1524 : // Search "incomplete history" for more information.
1525 1410 : for lsn in retain_lsn_below_horizon {
1526 780 : assert!(lsn < &horizon, "retain lsn must be below horizon")
1527 : }
1528 630 : for i in 1..retain_lsn_below_horizon.len() {
1529 356 : assert!(
1530 356 : retain_lsn_below_horizon[i - 1] <= retain_lsn_below_horizon[i],
1531 0 : "unordered LSN"
1532 : );
1533 : }
1534 0 : }
1535 630 : let has_ancestor = base_img_from_ancestor.is_some();
1536 : // Step 1: split history into len(retain_lsn_below_horizon) + 2 buckets, where the last bucket is for all deltas above the horizon,
1537 : // and the second-to-last bucket is for the horizon. Each bucket contains lsn_last_bucket < deltas <= lsn_this_bucket.
1538 630 : let (mut split_history, lsn_split_points) = {
1539 630 : let mut split_history = Vec::new();
1540 630 : split_history.resize_with(retain_lsn_below_horizon.len() + 2, Vec::new);
1541 630 : let mut lsn_split_points = Vec::with_capacity(retain_lsn_below_horizon.len() + 1);
1542 1410 : for lsn in retain_lsn_below_horizon {
1543 780 : lsn_split_points.push(*lsn);
1544 780 : }
1545 630 : lsn_split_points.push(horizon);
1546 630 : let mut current_idx = 0;
1547 1530 : for item @ (_, lsn, _) in full_history {
1548 1144 : while current_idx < lsn_split_points.len() && *lsn > lsn_split_points[current_idx] {
1549 244 : current_idx += 1;
1550 244 : }
1551 900 : split_history[current_idx].push(item);
1552 : }
1553 630 : (split_history, lsn_split_points)
1554 : };
1555 : // Step 2: filter out duplicated records due to the k-merge of image/delta layers
1556 2670 : for split_for_lsn in &mut split_history {
1557 2040 : let mut prev_lsn = None;
1558 2040 : let mut new_split_for_lsn = Vec::with_capacity(split_for_lsn.len());
1559 2040 : for record @ (_, lsn, _) in std::mem::take(split_for_lsn) {
1560 900 : if let Some(prev_lsn) = &prev_lsn {
1561 118 : if *prev_lsn == lsn {
1562 : // This is the case where the same LSN has data from both the delta layer and the image layer. As
1563 : // `ValueWrapper` ensures that an image is ordered before a delta at the same LSN, we simply
1564 : // drop this delta and keep the image.
1565 : //
1566 : // For example, we have delta layer key1@0x10, key1@0x20, and image layer key1@0x10, we will
1567 : // keep the image for key1@0x10 and the delta for key1@0x20. key1@0x10 delta will be simply
1568 : // dropped.
1569 : //
1570 : // TODO: in case we have both delta + images for a given LSN and it does not exceed the delta
1571 : // threshold, we could have kept delta instead to save space. This is an optimization for the future.
1572 0 : continue;
1573 118 : }
1574 782 : }
1575 900 : prev_lsn = Some(lsn);
1576 900 : new_split_for_lsn.push(record);
1577 : }
1578 2040 : *split_for_lsn = new_split_for_lsn;
1579 : }
1580 : // Step 3: generate images when necessary
1581 630 : let mut retention = Vec::with_capacity(split_history.len());
1582 630 : let mut records_since_last_image = 0;
1583 630 : let batch_cnt = split_history.len();
1584 630 : assert!(
1585 630 : batch_cnt >= 2,
1586 0 : "should have at least below + above horizon batches"
1587 : );
1588 630 : let mut replay_history: Vec<(Key, Lsn, Value)> = Vec::new();
1589 630 : if let Some((key, lsn, img)) = base_img_from_ancestor {
1590 42 : replay_history.push((key, lsn, Value::Image(img)));
1591 588 : }
1592 :
1593 : /// Generate debug information for the replay history
1594 0 : fn generate_history_trace(replay_history: &[(Key, Lsn, Value)]) -> String {
1595 : use std::fmt::Write;
1596 0 : let mut output = String::new();
1597 0 : if let Some((key, _, _)) = replay_history.first() {
1598 0 : write!(output, "key={} ", key).unwrap();
1599 0 : let mut cnt = 0;
1600 0 : for (_, lsn, val) in replay_history {
1601 0 : if val.is_image() {
1602 0 : write!(output, "i@{} ", lsn).unwrap();
1603 0 : } else if val.will_init() {
1604 0 : write!(output, "di@{} ", lsn).unwrap();
1605 0 : } else {
1606 0 : write!(output, "d@{} ", lsn).unwrap();
1607 0 : }
1608 0 : cnt += 1;
1609 0 : if cnt >= 128 {
1610 0 : write!(output, "... and more").unwrap();
1611 0 : break;
1612 0 : }
1613 : }
1614 0 : } else {
1615 0 : write!(output, "<no history>").unwrap();
1616 0 : }
1617 0 : output
1618 0 : }
1619 :
1620 0 : fn generate_debug_trace(
1621 0 : replay_history: Option<&[(Key, Lsn, Value)]>,
1622 0 : full_history: &[(Key, Lsn, Value)],
1623 0 : lsns: &[Lsn],
1624 0 : horizon: Lsn,
1625 0 : ) -> String {
1626 : use std::fmt::Write;
1627 0 : let mut output = String::new();
1628 0 : if let Some(replay_history) = replay_history {
1629 0 : writeln!(
1630 0 : output,
1631 0 : "replay_history: {}",
1632 0 : generate_history_trace(replay_history)
1633 0 : )
1634 0 : .unwrap();
1635 0 : } else {
1636 0 : writeln!(output, "replay_history: <disabled>",).unwrap();
1637 0 : }
1638 0 : writeln!(
1639 0 : output,
1640 0 : "full_history: {}",
1641 0 : generate_history_trace(full_history)
1642 0 : )
1643 0 : .unwrap();
1644 0 : writeln!(
1645 0 : output,
1646 0 : "when processing: [{}] horizon={}",
1647 0 : lsns.iter().map(|l| format!("{l}")).join(","),
1648 0 : horizon
1649 0 : )
1650 0 : .unwrap();
1651 0 : output
1652 0 : }
1653 :
1654 2040 : for (i, split_for_lsn) in split_history.into_iter().enumerate() {
1655 : // TODO: there could be image keys inside the splits, and we can compute records_since_last_image accordingly.
1656 2040 : records_since_last_image += split_for_lsn.len();
1657 2040 : let generate_image = if i == 0 && !has_ancestor {
1658 : // We always generate images for the first batch (below horizon / lowest retain_lsn)
1659 588 : true
1660 1452 : } else if i == batch_cnt - 1 {
1661 : // Do not generate images for the last batch (above horizon)
1662 630 : false
1663 822 : } else if records_since_last_image >= delta_threshold_cnt {
1664 : // Generate images when there are too many records
1665 6 : true
1666 : } else {
1667 816 : false
1668 : };
1669 2040 : replay_history.extend(split_for_lsn.iter().map(|x| (*x).clone()));
1670 : // Only retain the items after the last image record
1671 2514 : for idx in (0..replay_history.len()).rev() {
1672 2514 : if replay_history[idx].2.will_init() {
1673 2040 : replay_history = replay_history[idx..].to_vec();
1674 2040 : break;
1675 474 : }
1676 : }
1677 2040 : if let Some((_, _, val)) = replay_history.first() {
1678 2040 : if !val.will_init() {
1679 0 : return Err(anyhow::anyhow!("invalid history, no base image")).with_context(
1680 0 : || {
1681 0 : generate_debug_trace(
1682 0 : Some(&replay_history),
1683 0 : full_history,
1684 0 : retain_lsn_below_horizon,
1685 0 : horizon,
1686 0 : )
1687 0 : },
1688 0 : );
1689 2040 : }
1690 0 : }
1691 2040 : if generate_image && records_since_last_image > 0 {
1692 594 : records_since_last_image = 0;
1693 594 : let replay_history_for_debug = if cfg!(debug_assertions) {
1694 594 : Some(replay_history.clone())
1695 : } else {
1696 0 : None
1697 : };
1698 594 : let replay_history_for_debug_ref = replay_history_for_debug.as_deref();
1699 594 : let history = std::mem::take(&mut replay_history);
1700 594 : let mut img = None;
1701 594 : let mut records = Vec::with_capacity(history.len());
1702 594 : if let (_, lsn, Value::Image(val)) = history.first().as_ref().unwrap() {
1703 572 : img = Some((*lsn, val.clone()));
1704 572 : for (_, lsn, val) in history.into_iter().skip(1) {
1705 34 : let Value::WalRecord(rec) = val else {
1706 0 : return Err(anyhow::anyhow!(
1707 0 : "invalid record, first record is image, expect walrecords"
1708 0 : ))
1709 0 : .with_context(|| {
1710 0 : generate_debug_trace(
1711 0 : replay_history_for_debug_ref,
1712 0 : full_history,
1713 0 : retain_lsn_below_horizon,
1714 0 : horizon,
1715 0 : )
1716 0 : });
1717 : };
1718 34 : records.push((lsn, rec));
1719 : }
1720 : } else {
1721 36 : for (_, lsn, val) in history.into_iter() {
1722 36 : let Value::WalRecord(rec) = val else {
1723 0 : return Err(anyhow::anyhow!("invalid record, first record is walrecord, expect rest are walrecord"))
1724 0 : .with_context(|| generate_debug_trace(
1725 0 : replay_history_for_debug_ref,
1726 0 : full_history,
1727 0 : retain_lsn_below_horizon,
1728 0 : horizon,
1729 0 : ));
1730 : };
1731 36 : records.push((lsn, rec));
1732 : }
1733 : }
1734 594 : records.reverse();
1735 594 : let state = ValueReconstructState { img, records };
1736 594 : let request_lsn = lsn_split_points[i]; // last batch does not generate image so i is always in range
1737 594 : let img = self.reconstruct_value(key, request_lsn, state).await?;
1738 594 : replay_history.push((key, request_lsn, Value::Image(img.clone())));
1739 594 : retention.push(vec![(request_lsn, Value::Image(img))]);
1740 1446 : } else {
1741 1446 : let deltas = split_for_lsn
1742 1446 : .iter()
1743 1446 : .map(|(_, lsn, value)| (*lsn, value.clone()))
1744 1446 : .collect_vec();
1745 1446 : retention.push(deltas);
1746 1446 : }
1747 : }
1748 630 : let mut result = Vec::with_capacity(retention.len());
1749 630 : assert_eq!(retention.len(), lsn_split_points.len() + 1);
1750 2040 : for (idx, logs) in retention.into_iter().enumerate() {
1751 2040 : if idx == lsn_split_points.len() {
1752 630 : return Ok(KeyHistoryRetention {
1753 630 : below_horizon: result,
1754 630 : above_horizon: KeyLogAtLsn(logs),
1755 630 : });
1756 1410 : } else {
1757 1410 : result.push((lsn_split_points[idx], KeyLogAtLsn(logs)));
1758 1410 : }
1759 : }
1760 0 : unreachable!("key retention is empty")
1761 630 : }
1762 :
1763 : /// Check how much space is left on the disk
1764 52 : async fn check_available_space(self: &Arc<Self>) -> anyhow::Result<u64> {
1765 52 : let tenants_dir = self.conf.tenants_path();
1766 :
1767 52 : let stat = Statvfs::get(&tenants_dir, None)
1768 52 : .context("statvfs failed, presumably directory got unlinked")?;
1769 :
1770 52 : let (avail_bytes, _) = stat.get_avail_total_bytes();
1771 52 :
1772 52 : Ok(avail_bytes)
1773 52 : }
1774 :
1775 : /// Check if the compaction can proceed safely without running out of space. We assume the size
1776 : /// upper bound of the files produced by a compaction job is the same as the total size of all layers involved in
1777 : /// the compaction. Therefore, we need at least `2 * layers_to_be_compacted_size` of free space to do a
1778 : /// compaction.
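: ///
: /// Illustrative arithmetic only (mirrors the check below, with made-up numbers):
: ///
: /// ```ignore
: /// let available_space: u64 = 100 << 30;                        // 100 GiB free on disk
: /// let allocated_space = (available_space as f64 * 0.8) as u64; // keep 20% headroom for other tasks
: /// let all_layer_size: u64 = 30 << 30;                          // size of the layers being compacted
: /// let remote_layer_size: u64 = 10 << 30;                       // portion that still needs downloading
: /// assert!(all_layer_size + remote_layer_size <= allocated_space); // compaction may proceed
: /// ```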
1779 52 : async fn check_compaction_space(
1780 52 : self: &Arc<Self>,
1781 52 : layer_selection: &[Layer],
1782 52 : ) -> anyhow::Result<()> {
1783 52 : let available_space = self.check_available_space().await?;
1784 52 : let mut remote_layer_size = 0;
1785 52 : let mut all_layer_size = 0;
1786 204 : for layer in layer_selection {
1787 152 : let needs_download = layer.needs_download().await?;
1788 152 : if needs_download.is_some() {
1789 0 : remote_layer_size += layer.layer_desc().file_size;
1790 152 : }
1791 152 : all_layer_size += layer.layer_desc().file_size;
1792 : }
1793 52 : let allocated_space = (available_space as f64 * 0.8) as u64; /* reserve 20% space for other tasks */
1794 52 : if all_layer_size /* space needed for newly-generated file */ + remote_layer_size /* space for downloading layers */ > allocated_space
1795 : {
1796 0 : return Err(anyhow!("not enough space for compaction: available_space={}, allocated_space={}, all_layer_size={}, remote_layer_size={}, required_space={}",
1797 0 : available_space, allocated_space, all_layer_size, remote_layer_size, all_layer_size + remote_layer_size));
1798 52 : }
1799 52 : Ok(())
1800 52 : }
1801 :
1802 : /// Get a watermark for gc-compaction, that is the lowest LSN that we can use as the `gc_horizon` for
1803 : /// the compaction algorithm. It is min(space_cutoff, time_cutoff, latest_gc_cutoff, standby_horizon).
1804 : /// Leases and retain_lsns are considered in the gc-compaction job itself so we don't need to account for them
1805 : /// here.
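: ///
: /// A tiny worked example of the min() computation (illustrative numbers only; standby_horizon
: /// omitted here, see the TODO in the body):
: ///
: /// ```ignore
: /// let (space_cutoff, time_cutoff, latest_gc_cutoff) = (0x40_u64, 0x50_u64, 0x30_u64);
: /// let watermark = space_cutoff.min(time_cutoff).min(latest_gc_cutoff);
: /// assert_eq!(watermark, 0x30); // the most conservative (lowest) cutoff wins
: /// ```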
1806 54 : pub(crate) fn get_gc_compaction_watermark(self: &Arc<Self>) -> Lsn {
1807 54 : let gc_cutoff_lsn = {
1808 54 : let gc_info = self.gc_info.read().unwrap();
1809 54 : gc_info.min_cutoff()
1810 54 : };
1811 54 :
1812 54 : // TODO: standby horizon should use leases so we don't really need to consider it here.
1813 54 : // let watermark = watermark.min(self.standby_horizon.load());
1814 54 :
1815 54 : // TODO: ensure the child branches will not use anything below the watermark, or consider
1816 54 : // them when computing the watermark.
1817 54 : gc_cutoff_lsn.min(*self.get_latest_gc_cutoff_lsn())
1818 54 : }
1819 :
1820 : /// Split a gc-compaction job into multiple compaction jobs. The split is based on the key range and the estimated size of the compaction job.
1821 : /// The function returns a list of compaction jobs that can be executed separately. If the upper bound of the compact LSN
1822 : /// range is not specified, we will use the latest gc_cutoff as the upper bound, so that all jobs in the jobset acts
1823 : /// like a full compaction of the specified keyspace.
1824 0 : pub(crate) async fn gc_compaction_split_jobs(
1825 0 : self: &Arc<Self>,
1826 0 : job: GcCompactJob,
1827 0 : sub_compaction_max_job_size_mb: Option<u64>,
1828 0 : ) -> anyhow::Result<Vec<GcCompactJob>> {
1829 0 : let compact_below_lsn = if job.compact_lsn_range.end != Lsn::MAX {
1830 0 : job.compact_lsn_range.end
1831 : } else {
1832 0 : self.get_gc_compaction_watermark()
1833 : };
1834 :
1835 : // Split the compaction job into sub-jobs of about 4GB each.
1836 : const GC_COMPACT_MAX_SIZE_MB: u64 = 4 * 1024;
1837 0 : let sub_compaction_max_job_size_mb =
1838 0 : sub_compaction_max_job_size_mb.unwrap_or(GC_COMPACT_MAX_SIZE_MB);
1839 0 :
1840 0 : let mut compact_jobs = Vec::new();
1841 : // For now, we simply use the key partitioning information; we should do a more fine-grained partitioning
1842 : // by estimating the number of files read for a compaction job. We should also partition on LSN.
1843 0 : let ((dense_ks, sparse_ks), _) = {
1844 0 : let Ok(partition) = self.partitioning.try_lock() else {
1845 0 : bail!("failed to acquire partition lock during gc-compaction");
1846 : };
1847 0 : partition.clone()
1848 : };
1849 : // Truncate the key range to be within user specified compaction range.
1850 0 : fn truncate_to(
1851 0 : source_start: &Key,
1852 0 : source_end: &Key,
1853 0 : target_start: &Key,
1854 0 : target_end: &Key,
1855 0 : ) -> Option<(Key, Key)> {
1856 0 : let start = source_start.max(target_start);
1857 0 : let end = source_end.min(target_end);
1858 0 : if start < end {
1859 0 : Some((*start, *end))
1860 : } else {
1861 0 : None
1862 : }
1863 0 : }
1864 0 : let mut split_key_ranges = Vec::new();
1865 0 : let ranges = dense_ks
1866 0 : .parts
1867 0 : .iter()
1868 0 : .map(|partition| partition.ranges.iter())
1869 0 : .chain(sparse_ks.parts.iter().map(|x| x.0.ranges.iter()))
1870 0 : .flatten()
1871 0 : .cloned()
1872 0 : .collect_vec();
1873 0 : for range in ranges.iter() {
1874 0 : let Some((start, end)) = truncate_to(
1875 0 : &range.start,
1876 0 : &range.end,
1877 0 : &job.compact_key_range.start,
1878 0 : &job.compact_key_range.end,
1879 0 : ) else {
1880 0 : continue;
1881 : };
1882 0 : split_key_ranges.push((start, end));
1883 : }
1884 0 : split_key_ranges.sort();
1885 0 : let guard = self.layers.read().await;
1886 0 : let layer_map = guard.layer_map()?;
1887 0 : let mut current_start = None;
1888 0 : let ranges_num = split_key_ranges.len();
1889 0 : for (idx, (start, end)) in split_key_ranges.into_iter().enumerate() {
1890 0 : if current_start.is_none() {
1891 0 : current_start = Some(start);
1892 0 : }
1893 0 : let start = current_start.unwrap();
1894 0 : if start >= end {
1895 : // We have already processed this partition.
1896 0 : continue;
1897 0 : }
1898 0 : let res = layer_map.range_search(start..end, compact_below_lsn);
1899 0 : let total_size = res.found.keys().map(|x| x.layer.file_size()).sum::<u64>();
1900 0 : if total_size > sub_compaction_max_job_size_mb * 1024 * 1024 || ranges_num == idx + 1 {
1901 : // Try to extend the compaction range so that we include at least one full layer file.
1902 0 : let extended_end = res
1903 0 : .found
1904 0 : .keys()
1905 0 : .map(|layer| layer.layer.key_range.end)
1906 0 : .min();
1907 : // It is possible that the search range does not contain any layer files when we reach the end of the loop.
1908 : // In this case, we simply use the specified key range end.
1909 0 : let end = if let Some(extended_end) = extended_end {
1910 0 : extended_end.max(end)
1911 : } else {
1912 0 : end
1913 : };
1914 0 : info!(
1915 0 : "splitting compaction job: {}..{}, estimated_size={}",
1916 : start, end, total_size
1917 : );
1918 0 : compact_jobs.push(GcCompactJob {
1919 0 : dry_run: job.dry_run,
1920 0 : compact_key_range: start..end,
1921 0 : compact_lsn_range: job.compact_lsn_range.start..compact_below_lsn,
1922 0 : });
1923 0 : current_start = Some(end);
1924 0 : }
1925 : }
1926 0 : drop(guard);
1927 0 : Ok(compact_jobs)
1928 0 : }
1929 :
1930 : /// An experimental compaction building block that combines compaction with garbage collection.
1931 : ///
1932 : /// The current implementation picks all delta + image layers that are below or intersecting with
1933 : /// the GC horizon without considering retain_lsns. Then, it does a full compaction over all these delta
1934 : /// layers and image layers, which generates image layers on the gc horizon, drops deltas below the gc horizon,
1935 : /// and creates delta layers with all deltas >= the gc horizon.
1936 : ///
1937 : /// If `options.compact_range` is provided, it will only compact the keys within the range, aka partial compaction.
1938 : /// Partial compaction will read and process all layers overlapping with the key range, even if it might
1939 : /// contain extra keys. After the gc-compaction phase completes, delta layers that are not fully contained
1940 : /// within the key range will be rewritten to ensure they do not overlap with the delta layers. Providing
1941 : /// Key::MIN..Key..MAX to the function indicates a full compaction, though technically, `Key::MAX` is not
1942 : /// part of the range.
1943 : ///
1944 : /// If `options.compact_lsn_range.end` is provided, the compaction will only compact layers below or intersect with
1945 : /// the LSN. Otherwise, it will use the gc cutoff by default.
1946 54 : pub(crate) async fn compact_with_gc(
1947 54 : self: &Arc<Self>,
1948 54 : cancel: &CancellationToken,
1949 54 : options: CompactOptions,
1950 54 : ctx: &RequestContext,
1951 54 : ) -> anyhow::Result<()> {
1952 54 : let sub_compaction = options.sub_compaction;
1953 54 : let job = GcCompactJob::from_compact_options(options.clone());
1954 54 : if sub_compaction {
1955 0 : info!("running enhanced gc bottom-most compaction with sub-compaction, splitting compaction jobs");
1956 0 : let jobs = self
1957 0 : .gc_compaction_split_jobs(job, options.sub_compaction_max_job_size_mb)
1958 0 : .await?;
1959 0 : let jobs_len = jobs.len();
1960 0 : for (idx, job) in jobs.into_iter().enumerate() {
1961 0 : info!(
1962 0 : "running enhanced gc bottom-most compaction, sub-compaction {}/{}",
1963 0 : idx + 1,
1964 : jobs_len
1965 : );
1966 0 : self.compact_with_gc_inner(cancel, job, ctx).await?;
1967 : }
1968 0 : if jobs_len == 0 {
1969 0 : info!("no jobs to run, skipping gc bottom-most compaction");
1970 0 : }
1971 0 : return Ok(());
1972 54 : }
1973 54 : self.compact_with_gc_inner(cancel, job, ctx).await
1974 54 : }
1975 :
1976 54 : async fn compact_with_gc_inner(
1977 54 : self: &Arc<Self>,
1978 54 : cancel: &CancellationToken,
1979 54 : job: GcCompactJob,
1980 54 : ctx: &RequestContext,
1981 54 : ) -> anyhow::Result<()> {
1982 54 : // Block other compaction/GC tasks from running for now. GC-compaction could run along
1983 54 : // with legacy compaction tasks in the future. Always ensure the lock order is compaction -> gc.
1984 54 : // Note that we already acquired the compaction lock when the outer `compact` function gets called.
1985 54 :
1986 54 : let gc_lock = async {
1987 54 : tokio::select! {
1988 54 : guard = self.gc_lock.lock() => Ok(guard),
1989 : // TODO: refactor to CompactionError to correctly pass cancelled error
1990 54 : _ = cancel.cancelled() => Err(anyhow!("cancelled")),
1991 : }
1992 54 : };
1993 :
1994 54 : let gc_lock = crate::timed(
1995 54 : gc_lock,
1996 54 : "acquires gc lock",
1997 54 : std::time::Duration::from_secs(5),
1998 54 : )
1999 54 : .await?;
2000 :
2001 54 : let dry_run = job.dry_run;
2002 54 : let compact_key_range = job.compact_key_range;
2003 54 : let compact_lsn_range = job.compact_lsn_range;
2004 54 :
2005 54 : info!("running enhanced gc bottom-most compaction, dry_run={dry_run}, compact_key_range={}..{}, compact_lsn_range={}..{}", compact_key_range.start, compact_key_range.end, compact_lsn_range.start, compact_lsn_range.end);
2006 :
2007 54 : scopeguard::defer! {
2008 54 : info!("done enhanced gc bottom-most compaction");
2009 54 : };
2010 54 :
2011 54 : let mut stat = CompactionStatistics::default();
2012 :
2013 : // Step 0: pick all delta layers + image layers below/intersect with the GC horizon.
2014 : // The layer selection has the following properties:
2015 : // 1. If a layer is in the selection, all layers below it are in the selection.
2016 : // 2. Inferred from (1), for each key in the layer selection, the value can be reconstructed only with the layers in the layer selection.
2017 52 : let job_desc = {
2018 54 : let guard = self.layers.read().await;
2019 54 : let layers = guard.layer_map()?;
2020 54 : let gc_info = self.gc_info.read().unwrap();
2021 54 : let mut retain_lsns_below_horizon = Vec::new();
2022 54 : let gc_cutoff = {
2023 : // Currently, gc-compaction only kicks in after the legacy gc has updated the gc_cutoff.
2024 : // Therefore, it can only clean up data that cannot be cleaned up with legacy gc, instead of
2025 : // cleaning everything that theoritically it could. In the future, it should use `self.gc_info`
2026 : // to get the truth data.
2027 54 : let real_gc_cutoff = self.get_gc_compaction_watermark();
2028 : // The compaction algorithm will keep all keys above the gc_cutoff while keeping only necessary keys below the gc_cutoff for
2029 : // each of the retain_lsn. Therefore, if the user-provided `compact_lsn_range.end` is larger than the real gc cutoff, we will use
2030 : // the real cutoff.
2031 54 : let mut gc_cutoff = if compact_lsn_range.end == Lsn::MAX {
2032 48 : real_gc_cutoff
2033 : } else {
2034 6 : compact_lsn_range.end
2035 : };
2036 54 : if gc_cutoff > real_gc_cutoff {
2037 4 : warn!("provided compact_lsn_range.end={} is larger than the real_gc_cutoff={}, using the real gc cutoff", gc_cutoff, real_gc_cutoff);
2038 4 : gc_cutoff = real_gc_cutoff;
2039 50 : }
2040 54 : gc_cutoff
2041 : };
2042 70 : for (lsn, _timeline_id, _is_offloaded) in &gc_info.retain_lsns {
2043 70 : if lsn < &gc_cutoff {
2044 70 : retain_lsns_below_horizon.push(*lsn);
2045 70 : }
2046 : }
2047 54 : for lsn in gc_info.leases.keys() {
2048 0 : if lsn < &gc_cutoff {
2049 0 : retain_lsns_below_horizon.push(*lsn);
2050 0 : }
2051 : }
2052 54 : let mut selected_layers: Vec<Layer> = Vec::new();
2053 54 : drop(gc_info);
2054 : // Firstly, pick all the layers intersect or below the gc_cutoff, get the largest LSN in the selected layers.
2055 54 : let Some(max_layer_lsn) = layers
2056 54 : .iter_historic_layers()
2057 244 : .filter(|desc| desc.get_lsn_range().start <= gc_cutoff)
2058 208 : .map(|desc| desc.get_lsn_range().end)
2059 54 : .max()
2060 : else {
2061 0 : info!("no layers to compact with gc: no historic layers below gc_cutoff, gc_cutoff={}", gc_cutoff);
2062 0 : return Ok(());
2063 : };
2064 : // Next, if the user specifies compact_lsn_range.start, we need to filter some layers out. All the layers (strictly) below
2065 : // the min_layer_lsn computed as below will be filtered out and the data will be accessed using the normal read path, as if
2066 : // it is a branch.
2067 54 : let Some(min_layer_lsn) = layers
2068 54 : .iter_historic_layers()
2069 244 : .filter(|desc| {
2070 244 : if compact_lsn_range.start == Lsn::INVALID {
2071 198 : true // select all layers below if start == Lsn(0)
2072 : } else {
2073 46 : desc.get_lsn_range().end > compact_lsn_range.start // strictly larger than compact_above_lsn
2074 : }
2075 244 : })
2076 226 : .map(|desc| desc.get_lsn_range().start)
2077 54 : .min()
2078 : else {
2079 0 : info!("no layers to compact with gc: no historic layers above compact_above_lsn, compact_above_lsn={}", compact_lsn_range.end);
2080 0 : return Ok(());
2081 : };
2082 : // Then, pick all the layers that are below the max_layer_lsn. This is to ensure we can pick all single-key
2083 : // layers to compact.
2084 54 : let mut rewrite_layers = Vec::new();
2085 244 : for desc in layers.iter_historic_layers() {
2086 244 : if desc.get_lsn_range().end <= max_layer_lsn
2087 208 : && desc.get_lsn_range().start >= min_layer_lsn
2088 190 : && overlaps_with(&desc.get_key_range(), &compact_key_range)
2089 : {
2090 : // If the layer overlaps with the compaction key range, we need to read it to obtain all keys within the range,
2091 : // even if it might contain extra keys
2092 152 : selected_layers.push(guard.get_from_desc(&desc));
2093 152 : // If the layer is not fully contained within the key range, we need to rewrite it if it's a delta layer (it's fine
2094 152 : // to overlap image layers)
2095 152 : if desc.is_delta() && !fully_contains(&compact_key_range, &desc.get_key_range())
2096 2 : {
2097 2 : rewrite_layers.push(desc);
2098 150 : }
2099 92 : }
2100 : }
2101 54 : if selected_layers.is_empty() {
2102 2 : info!("no layers to compact with gc: no layers within the key range, gc_cutoff={}, key_range={}..{}", gc_cutoff, compact_key_range.start, compact_key_range.end);
2103 2 : return Ok(());
2104 52 : }
2105 52 : retain_lsns_below_horizon.sort();
2106 52 : GcCompactionJobDescription {
2107 52 : selected_layers,
2108 52 : gc_cutoff,
2109 52 : retain_lsns_below_horizon,
2110 52 : min_layer_lsn,
2111 52 : max_layer_lsn,
2112 52 : compaction_key_range: compact_key_range,
2113 52 : rewrite_layers,
2114 52 : }
2115 : };
2116 52 : let (has_data_below, lowest_retain_lsn) = if compact_lsn_range.start != Lsn::INVALID {
2117 : // If we only compact above some LSN, we should get the history from the current branch below the specified LSN.
2118 : // We use job_desc.min_layer_lsn as if it's the lowest branch point.
2119 8 : (true, job_desc.min_layer_lsn)
2120 44 : } else if self.ancestor_timeline.is_some() {
2121 : // In theory, we can also use min_layer_lsn here, but using ancestor LSN makes sure the delta layers cover the
2122 : // LSN ranges all the way to the ancestor timeline.
2123 2 : (true, self.ancestor_lsn)
2124 : } else {
2125 42 : let res = job_desc
2126 42 : .retain_lsns_below_horizon
2127 42 : .first()
2128 42 : .copied()
2129 42 : .unwrap_or(job_desc.gc_cutoff);
2130 42 : if cfg!(debug_assertions) {
2131 42 : assert_eq!(
2132 42 : res,
2133 42 : job_desc
2134 42 : .retain_lsns_below_horizon
2135 42 : .iter()
2136 42 : .min()
2137 42 : .copied()
2138 42 : .unwrap_or(job_desc.gc_cutoff)
2139 42 : );
2140 0 : }
2141 42 : (false, res)
2142 : };
2143 52 : info!(
2144 0 : "picked {} layers for compaction ({} layers need rewriting) with max_layer_lsn={} min_layer_lsn={} gc_cutoff={} lowest_retain_lsn={}, key_range={}..{}, has_data_below={}",
2145 0 : job_desc.selected_layers.len(),
2146 0 : job_desc.rewrite_layers.len(),
2147 : job_desc.max_layer_lsn,
2148 : job_desc.min_layer_lsn,
2149 : job_desc.gc_cutoff,
2150 : lowest_retain_lsn,
2151 : job_desc.compaction_key_range.start,
2152 : job_desc.compaction_key_range.end,
2153 : has_data_below,
2154 : );
2155 :
2156 204 : for layer in &job_desc.selected_layers {
2157 152 : debug!("read layer: {}", layer.layer_desc().key());
2158 : }
2159 54 : for layer in &job_desc.rewrite_layers {
2160 2 : debug!("rewrite layer: {}", layer.key());
2161 : }
2162 :
2163 52 : self.check_compaction_space(&job_desc.selected_layers)
2164 52 : .await?;
2165 :
2166 : // Generate statistics for the compaction
2167 204 : for layer in &job_desc.selected_layers {
2168 152 : let desc = layer.layer_desc();
2169 152 : if desc.is_delta() {
2170 86 : stat.visit_delta_layer(desc.file_size());
2171 86 : } else {
2172 66 : stat.visit_image_layer(desc.file_size());
2173 66 : }
2174 : }
2175 :
2176 : // Step 1: construct a k-merge iterator over all layers.
2177 : // Also, verify if the layer map can be split by drawing a horizontal line at every LSN start/end split point.
2178 52 : let layer_names = job_desc
2179 52 : .selected_layers
2180 52 : .iter()
2181 152 : .map(|layer| layer.layer_desc().layer_name())
2182 52 : .collect_vec();
2183 52 : if let Some(err) = check_valid_layermap(&layer_names) {
2184 0 : bail!("gc-compaction layer map check failed because {}, cannot proceed with compaction due to potential data loss", err);
2185 52 : }
2186 52 : // The maximum LSN we are processing in this compaction loop
2187 52 : let end_lsn = job_desc
2188 52 : .selected_layers
2189 52 : .iter()
2190 152 : .map(|l| l.layer_desc().lsn_range.end)
2191 52 : .max()
2192 52 : .unwrap();
2193 52 : let mut delta_layers = Vec::new();
2194 52 : let mut image_layers = Vec::new();
2195 52 : let mut downloaded_layers = Vec::new();
2196 52 : let mut total_downloaded_size = 0;
2197 52 : let mut total_layer_size = 0;
2198 204 : for layer in &job_desc.selected_layers {
2199 152 : if layer.needs_download().await?.is_some() {
2200 0 : total_downloaded_size += layer.layer_desc().file_size;
2201 152 : }
2202 152 : total_layer_size += layer.layer_desc().file_size;
2203 152 : let resident_layer = layer.download_and_keep_resident().await?;
2204 152 : downloaded_layers.push(resident_layer);
2205 : }
2206 52 : info!(
2207 0 : "finish downloading layers, downloaded={}, total={}, ratio={:.2}",
2208 0 : total_downloaded_size,
2209 0 : total_layer_size,
2210 0 : total_downloaded_size as f64 / total_layer_size as f64
2211 : );
2212 204 : for resident_layer in &downloaded_layers {
2213 152 : if resident_layer.layer_desc().is_delta() {
2214 86 : let layer = resident_layer.get_as_delta(ctx).await?;
2215 86 : delta_layers.push(layer);
2216 : } else {
2217 66 : let layer = resident_layer.get_as_image(ctx).await?;
2218 66 : image_layers.push(layer);
2219 : }
2220 : }
2221 52 : let (dense_ks, sparse_ks) = self.collect_gc_compaction_keyspace().await?;
2222 52 : let mut merge_iter = FilterIterator::create(
2223 52 : MergeIterator::create(&delta_layers, &image_layers, ctx),
2224 52 : dense_ks,
2225 52 : sparse_ks,
2226 52 : )?;
2227 :
2228 : // Step 2: Produce images+deltas.
2229 52 : let mut accumulated_values = Vec::new();
2230 52 : let mut last_key: Option<Key> = None;
2231 :
2232 : // Only create image layers when there are no ancestor branches. TODO: create a covering image layer
2233 : // when certain conditions are met.
2234 52 : let mut image_layer_writer = if !has_data_below {
2235 : Some(
2236 42 : SplitImageLayerWriter::new(
2237 42 : self.conf,
2238 42 : self.timeline_id,
2239 42 : self.tenant_shard_id,
2240 42 : job_desc.compaction_key_range.start,
2241 42 : lowest_retain_lsn,
2242 42 : self.get_compaction_target_size(),
2243 42 : ctx,
2244 42 : )
2245 42 : .await?,
2246 : )
2247 : } else {
2248 10 : None
2249 : };
2250 :
2251 52 : let mut delta_layer_writer = SplitDeltaLayerWriter::new(
2252 52 : self.conf,
2253 52 : self.timeline_id,
2254 52 : self.tenant_shard_id,
2255 52 : lowest_retain_lsn..end_lsn,
2256 52 : self.get_compaction_target_size(),
2257 52 : )
2258 52 : .await?;
2259 :
2260 : #[derive(Default)]
2261 : struct RewritingLayers {
2262 : before: Option<DeltaLayerWriter>,
2263 : after: Option<DeltaLayerWriter>,
2264 : }
2265 52 : let mut delta_layer_rewriters = HashMap::<Arc<PersistentLayerKey>, RewritingLayers>::new();
2266 :
2267 : /// When compacting not at a bottom range (=`[0,X)`) of the root branch, we "have data below" (`has_data_below=true`).
2268 : /// The two cases are compaction in ancestor branches and when `compact_lsn_range.start` is set.
2269 : /// In those cases, we need to pull up data from below the LSN range we're compacting.
2270 : ///
2271 : /// This function unifies the cases so that later code doesn't have to think about it.
2272 : ///
2273 : /// Currently, we always get the ancestor image for each key in the child branch no matter whether the image
2274 : /// is needed for reconstruction. This should be fixed in the future.
2275 : ///
2276 : /// Furthermore, we should do vectored get instead of a single get, or better, use k-merge for ancestor
2277 : /// images.
2278 622 : async fn get_ancestor_image(
2279 622 : this_tline: &Arc<Timeline>,
2280 622 : key: Key,
2281 622 : ctx: &RequestContext,
2282 622 : has_data_below: bool,
2283 622 : history_lsn_point: Lsn,
2284 622 : ) -> anyhow::Result<Option<(Key, Lsn, Bytes)>> {
2285 622 : if !has_data_below {
2286 584 : return Ok(None);
2287 38 : };
2288 : // This function is implemented as a get of the current timeline at ancestor LSN, therefore reusing
2289 : // as much existing code as possible.
2290 38 : let img = this_tline.get(key, history_lsn_point, ctx).await?;
2291 38 : Ok(Some((key, history_lsn_point, img)))
2292 622 : }
2293 :
2294 : // Actually, we can decide not to write to the image layer at all at this point because
2295 : // the key and LSN range are determined. However, to keep things simple here, we still
2296 : // create this writer, and discard the writer in the end.
2297 :
2298 966 : while let Some(((key, lsn, val), desc)) = merge_iter.next_with_trace().await? {
2299 914 : if cancel.is_cancelled() {
2300 0 : return Err(anyhow!("cancelled")); // TODO: refactor to CompactionError and pass cancel error
2301 914 : }
2302 914 : if self.shard_identity.is_key_disposable(&key) {
2303 : // If this shard does not need to store this key, simply skip it.
2304 : //
2305 : // This is not handled in the filter iterator because shard ownership is determined by key hash.
2306 : // Therefore, unlike when handling key spaces (ranges), we would get no performance benefit
2307 : // from things like skipping a whole layer file.
2308 0 : if cfg!(debug_assertions) {
2309 0 : let shard = self.shard_identity.shard_index();
2310 0 : let owner = self.shard_identity.get_shard_number(&key);
2311 0 : panic!("key {key} does not belong on shard {shard}, owned by {owner}");
2312 0 : }
2313 0 : continue;
2314 914 : }
2315 914 : if !job_desc.compaction_key_range.contains(&key) {
2316 64 : if !desc.is_delta {
2317 60 : continue;
2318 4 : }
2319 4 : let rewriter = delta_layer_rewriters.entry(desc.clone()).or_default();
2320 4 : let rewriter = if key < job_desc.compaction_key_range.start {
2321 0 : if rewriter.before.is_none() {
2322 0 : rewriter.before = Some(
2323 0 : DeltaLayerWriter::new(
2324 0 : self.conf,
2325 0 : self.timeline_id,
2326 0 : self.tenant_shard_id,
2327 0 : desc.key_range.start,
2328 0 : desc.lsn_range.clone(),
2329 0 : ctx,
2330 0 : )
2331 0 : .await?,
2332 : );
2333 0 : }
2334 0 : rewriter.before.as_mut().unwrap()
2335 4 : } else if key >= job_desc.compaction_key_range.end {
2336 4 : if rewriter.after.is_none() {
2337 2 : rewriter.after = Some(
2338 2 : DeltaLayerWriter::new(
2339 2 : self.conf,
2340 2 : self.timeline_id,
2341 2 : self.tenant_shard_id,
2342 2 : job_desc.compaction_key_range.end,
2343 2 : desc.lsn_range.clone(),
2344 2 : ctx,
2345 2 : )
2346 2 : .await?,
2347 : );
2348 2 : }
2349 4 : rewriter.after.as_mut().unwrap()
2350 : } else {
2351 0 : unreachable!()
2352 : };
2353 4 : rewriter.put_value(key, lsn, val, ctx).await?;
2354 4 : continue;
2355 850 : }
2356 850 : match val {
2357 610 : Value::Image(_) => stat.visit_image_key(&val),
2358 240 : Value::WalRecord(_) => stat.visit_wal_key(&val),
2359 : }
2360 850 : if last_key.is_none() || last_key.as_ref() == Some(&key) {
2361 280 : if last_key.is_none() {
2362 52 : last_key = Some(key);
2363 228 : }
2364 280 : accumulated_values.push((key, lsn, val));
2365 : } else {
2366 570 : let last_key: &mut Key = last_key.as_mut().unwrap();
2367 570 : stat.on_unique_key_visited(); // TODO: adjust statistics for partial compaction
2368 570 : let retention = self
2369 570 : .generate_key_retention(
2370 570 : *last_key,
2371 570 : &accumulated_values,
2372 570 : job_desc.gc_cutoff,
2373 570 : &job_desc.retain_lsns_below_horizon,
2374 570 : COMPACTION_DELTA_THRESHOLD,
2375 570 : get_ancestor_image(self, *last_key, ctx, has_data_below, lowest_retain_lsn)
2376 570 : .await?,
2377 : )
2378 570 : .await?;
2379 570 : retention
2380 570 : .pipe_to(
2381 570 : *last_key,
2382 570 : &mut delta_layer_writer,
2383 570 : image_layer_writer.as_mut(),
2384 570 : &mut stat,
2385 570 : ctx,
2386 570 : )
2387 570 : .await?;
2388 570 : accumulated_values.clear();
2389 570 : *last_key = key;
2390 570 : accumulated_values.push((key, lsn, val));
2391 : }
2392 : }
2393 :
2394 : // TODO: move the below part to the loop body
2395 52 : let last_key = last_key.expect("no keys produced during compaction");
2396 52 : stat.on_unique_key_visited();
2397 :
2398 52 : let retention = self
2399 52 : .generate_key_retention(
2400 52 : last_key,
2401 52 : &accumulated_values,
2402 52 : job_desc.gc_cutoff,
2403 52 : &job_desc.retain_lsns_below_horizon,
2404 52 : COMPACTION_DELTA_THRESHOLD,
2405 52 : get_ancestor_image(self, last_key, ctx, has_data_below, lowest_retain_lsn).await?,
2406 : )
2407 52 : .await?;
2408 52 : retention
2409 52 : .pipe_to(
2410 52 : last_key,
2411 52 : &mut delta_layer_writer,
2412 52 : image_layer_writer.as_mut(),
2413 52 : &mut stat,
2414 52 : ctx,
2415 52 : )
2416 52 : .await?;
2417 : // end: move the above part to the loop body
2418 :
2419 52 : let mut rewrote_delta_layers = Vec::new();
2420 54 : for (key, writers) in delta_layer_rewriters {
2421 2 : if let Some(delta_writer_before) = writers.before {
2422 0 : let (desc, path) = delta_writer_before
2423 0 : .finish(job_desc.compaction_key_range.start, ctx)
2424 0 : .await?;
2425 0 : let layer = Layer::finish_creating(self.conf, self, desc, &path)?;
2426 0 : rewrote_delta_layers.push(layer);
2427 2 : }
2428 2 : if let Some(delta_writer_after) = writers.after {
2429 2 : let (desc, path) = delta_writer_after.finish(key.key_range.end, ctx).await?;
2430 2 : let layer = Layer::finish_creating(self.conf, self, desc, &path)?;
2431 2 : rewrote_delta_layers.push(layer);
2432 0 : }
2433 : }
2434 :
2435 74 : let discard = |key: &PersistentLayerKey| {
2436 74 : let key = key.clone();
2437 74 : async move { KeyHistoryRetention::discard_key(&key, self, dry_run).await }
2438 74 : };
2439 :
2440 52 : let produced_image_layers = if let Some(writer) = image_layer_writer {
2441 42 : if !dry_run {
2442 38 : let end_key = job_desc.compaction_key_range.end;
2443 38 : writer
2444 38 : .finish_with_discard_fn(self, ctx, end_key, discard)
2445 38 : .await?
2446 : } else {
2447 4 : drop(writer);
2448 4 : Vec::new()
2449 : }
2450 : } else {
2451 10 : Vec::new()
2452 : };
2453 :
2454 52 : let produced_delta_layers = if !dry_run {
2455 48 : delta_layer_writer
2456 48 : .finish_with_discard_fn(self, ctx, discard)
2457 48 : .await?
2458 : } else {
2459 4 : drop(delta_layer_writer);
2460 4 : Vec::new()
2461 : };
2462 :
2463 : // TODO: make image/delta/rewrote_delta layers generation atomic. At this point, we already generated resident layers, and if
2464 : // compaction is cancelled at this point, we might have some layers that are not cleaned up.
2465 52 : let mut compact_to = Vec::new();
2466 52 : let mut keep_layers = HashSet::new();
2467 52 : let produced_delta_layers_len = produced_delta_layers.len();
2468 52 : let produced_image_layers_len = produced_image_layers.len();
2469 88 : for action in produced_delta_layers {
2470 36 : match action {
2471 22 : BatchWriterResult::Produced(layer) => {
2472 22 : if cfg!(debug_assertions) {
2473 22 : info!("produced delta layer: {}", layer.layer_desc().key());
2474 0 : }
2475 22 : stat.produce_delta_layer(layer.layer_desc().file_size());
2476 22 : compact_to.push(layer);
2477 : }
2478 14 : BatchWriterResult::Discarded(l) => {
2479 14 : if cfg!(debug_assertions) {
2480 14 : info!("discarded delta layer: {}", l);
2481 0 : }
2482 14 : keep_layers.insert(l);
2483 14 : stat.discard_delta_layer();
2484 : }
2485 : }
2486 : }
2487 54 : for layer in &rewrote_delta_layers {
2488 2 : debug!(
2489 0 : "produced rewritten delta layer: {}",
2490 0 : layer.layer_desc().key()
2491 : );
2492 : }
2493 52 : compact_to.extend(rewrote_delta_layers);
2494 90 : for action in produced_image_layers {
2495 38 : match action {
2496 30 : BatchWriterResult::Produced(layer) => {
2497 30 : debug!("produced image layer: {}", layer.layer_desc().key());
2498 30 : stat.produce_image_layer(layer.layer_desc().file_size());
2499 30 : compact_to.push(layer);
2500 : }
2501 8 : BatchWriterResult::Discarded(l) => {
2502 8 : debug!("discarded image layer: {}", l);
2503 8 : keep_layers.insert(l);
2504 8 : stat.discard_image_layer();
2505 : }
2506 : }
2507 : }
2508 :
2509 52 : let mut layer_selection = job_desc.selected_layers;
2510 :
2511 : // Partial compaction might select more data than it processes, e.g., if
2512 : // the compaction_key_range only partially overlaps:
2513 : //
2514 : // [---compaction_key_range---]
2515 : // [---A----][----B----][----C----][----D----]
2516 : //
2517 : // For delta layers, we will rewrite the layers so that it is cut exactly at
2518 : // the compaction key range, so we can always discard them. However, for image
2519 : // layers, as we do not rewrite them for now, we need to handle them differently.
2520 : // Assume image layers A, B, C, D are all in the `layer_selection`.
2521 : //
2522 : // The created image layers contain whatever is needed from B, C, and from
2523 : // `----]` of A, and from `[---` of D.
2524 : //
2525 : // In contrast, `[---A` and `D----]` have not been processed, so, we must
2526 : // keep that data.
2527 : //
2528 : // The solution for now is to keep A and D completely if they are image layers.
2529 : // (layer_selection is what we'll remove from the layer map, so, retain what
2530 : // is _not_ fully covered by compaction_key_range).
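: //
: // A numeric illustration of the retain rule below (keys stand in as integers; an illustrative
: // sketch added for clarity, inlining the overlap/containment conditions):
: //
: //     let compaction_key_range = 10..30;  // what this job was asked to compact
: //     let image_layer_range = 5..15;      // an image layer in the selection
: //     let overlaps = image_layer_range.start < compaction_key_range.end
: //         && compaction_key_range.start < image_layer_range.end;
: //     let fully_contained = compaction_key_range.start <= image_layer_range.start
: //         && image_layer_range.end <= compaction_key_range.end;
: //     assert!(overlaps && !fully_contained); // -> keep this image layer in the layer map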
2531 204 : for layer in &layer_selection {
2532 152 : if !layer.layer_desc().is_delta() {
2533 66 : if !overlaps_with(
2534 66 : &layer.layer_desc().key_range,
2535 66 : &job_desc.compaction_key_range,
2536 66 : ) {
2537 0 : bail!("violated constraint: image layer outside of compaction key range");
2538 66 : }
2539 66 : if !fully_contains(
2540 66 : &job_desc.compaction_key_range,
2541 66 : &layer.layer_desc().key_range,
2542 66 : ) {
2543 8 : keep_layers.insert(layer.layer_desc().key());
2544 58 : }
2545 86 : }
2546 : }
2547 :
2548 152 : layer_selection.retain(|x| !keep_layers.contains(&x.layer_desc().key()));
2549 52 :
2550 52 : info!(
2551 0 : "gc-compaction statistics: {}",
2552 0 : serde_json::to_string(&stat)?
2553 : );
2554 :
2555 52 : if dry_run {
2556 4 : return Ok(());
2557 48 : }
2558 48 :
2559 48 : info!(
2560 0 : "produced {} delta layers and {} image layers, {} layers are kept",
2561 0 : produced_delta_layers_len,
2562 0 : produced_image_layers_len,
2563 0 : layer_selection.len()
2564 : );
2565 :
2566 : // Step 3: Place back to the layer map.
2567 :
2568 : // First, do a sanity check to ensure the newly-created layer map does not contain overlaps.
2569 48 : let all_layers = {
2570 48 : let guard = self.layers.read().await;
2571 48 : let layer_map = guard.layer_map()?;
2572 48 : layer_map.iter_historic_layers().collect_vec()
2573 48 : };
2574 48 :
2575 48 : let mut final_layers = all_layers
2576 48 : .iter()
2577 214 : .map(|layer| layer.layer_name())
2578 48 : .collect::<HashSet<_>>();
2579 152 : for layer in &layer_selection {
2580 104 : final_layers.remove(&layer.layer_desc().layer_name());
2581 104 : }
2582 102 : for layer in &compact_to {
2583 54 : final_layers.insert(layer.layer_desc().layer_name());
2584 54 : }
2585 48 : let final_layers = final_layers.into_iter().collect_vec();
2586 :
2587 : // TODO: move this check before we call `finish` on image layer writers. However, this will require us to get the layer name before we finish
2588 : // the writer, so potentially, we will need a function like `ImageLayerBatchWriter::get_all_pending_layer_keys` to get all the keys that are
2589 : // in the writer before finalizing the persistent layers. As it stands, we would leave some dangling layers on disk if the check fails.
2590 48 : if let Some(err) = check_valid_layermap(&final_layers) {
2591 0 : bail!("gc-compaction layer map check failed after compaction because {}, compaction result not applied to the layer map due to potential data loss", err);
2592 48 : }
2593 :
2594 : // Between the sanity check and this compaction update, there could be new layers being flushed, but it should be fine because we only
2595 : // operate on L1 layers.
2596 : {
2597 48 : let mut guard = self.layers.write().await;
2598 48 : guard
2599 48 : .open_mut()?
2600 48 : .finish_gc_compaction(&layer_selection, &compact_to, &self.metrics)
2601 48 : };
2602 48 :
2603 48 : // Schedule an index-only upload to update the `latest_gc_cutoff` in the index_part.json.
2604 48 : // Otherwise, after restart, the index_part only contains the old `latest_gc_cutoff` and
2605 48 : // find_gc_cutoffs will try accessing things below the cutoff. TODO: ideally, this should
2606 48 : // be batched into `schedule_compaction_update`.
2607 48 : let disk_consistent_lsn = self.disk_consistent_lsn.load();
2608 48 : self.schedule_uploads(disk_consistent_lsn, None)?;
2609 48 : self.remote_client
2610 48 : .schedule_compaction_update(&layer_selection, &compact_to)?;
2611 :
2612 48 : drop(gc_lock);
2613 48 :
2614 48 : Ok(())
2615 54 : }
2616 : }
2617 :
2618 : struct TimelineAdaptor {
2619 : timeline: Arc<Timeline>,
2620 :
2621 : keyspace: (Lsn, KeySpace),
2622 :
2623 : new_deltas: Vec<ResidentLayer>,
2624 : new_images: Vec<ResidentLayer>,
2625 : layers_to_delete: Vec<Arc<PersistentLayerDesc>>,
2626 : }
2627 :
2628 : impl TimelineAdaptor {
2629 0 : pub fn new(timeline: &Arc<Timeline>, keyspace: (Lsn, KeySpace)) -> Self {
2630 0 : Self {
2631 0 : timeline: timeline.clone(),
2632 0 : keyspace,
2633 0 : new_images: Vec::new(),
2634 0 : new_deltas: Vec::new(),
2635 0 : layers_to_delete: Vec::new(),
2636 0 : }
2637 0 : }
2638 :
2639 0 : pub async fn flush_updates(&mut self) -> Result<(), CompactionError> {
2640 0 : let layers_to_delete = {
2641 0 : let guard = self.timeline.layers.read().await;
2642 0 : self.layers_to_delete
2643 0 : .iter()
2644 0 : .map(|x| guard.get_from_desc(x))
2645 0 : .collect::<Vec<Layer>>()
2646 0 : };
2647 0 : self.timeline
2648 0 : .finish_compact_batch(&self.new_deltas, &self.new_images, &layers_to_delete)
2649 0 : .await?;
2650 :
2651 0 : self.timeline
2652 0 : .upload_new_image_layers(std::mem::take(&mut self.new_images))?;
2653 :
2654 0 : self.new_deltas.clear();
2655 0 : self.layers_to_delete.clear();
2656 0 : Ok(())
2657 0 : }
2658 : }
2659 :
2660 : #[derive(Clone)]
2661 : struct ResidentDeltaLayer(ResidentLayer);
2662 : #[derive(Clone)]
2663 : struct ResidentImageLayer(ResidentLayer);
2664 :
2665 : impl CompactionJobExecutor for TimelineAdaptor {
2666 : type Key = pageserver_api::key::Key;
2667 :
2668 : type Layer = OwnArc<PersistentLayerDesc>;
2669 : type DeltaLayer = ResidentDeltaLayer;
2670 : type ImageLayer = ResidentImageLayer;
2671 :
2672 : type RequestContext = crate::context::RequestContext;
2673 :
2674 0 : fn get_shard_identity(&self) -> &ShardIdentity {
2675 0 : self.timeline.get_shard_identity()
2676 0 : }
2677 :
2678 0 : async fn get_layers(
2679 0 : &mut self,
2680 0 : key_range: &Range<Key>,
2681 0 : lsn_range: &Range<Lsn>,
2682 0 : _ctx: &RequestContext,
2683 0 : ) -> anyhow::Result<Vec<OwnArc<PersistentLayerDesc>>> {
2684 0 : self.flush_updates().await?;
2685 :
2686 0 : let guard = self.timeline.layers.read().await;
2687 0 : let layer_map = guard.layer_map()?;
2688 :
2689 0 : let result = layer_map
2690 0 : .iter_historic_layers()
2691 0 : .filter(|l| {
2692 0 : overlaps_with(&l.lsn_range, lsn_range) && overlaps_with(&l.key_range, key_range)
2693 0 : })
2694 0 : .map(OwnArc)
2695 0 : .collect();
2696 0 : Ok(result)
2697 0 : }
2698 :
2699 0 : async fn get_keyspace(
2700 0 : &mut self,
2701 0 : key_range: &Range<Key>,
2702 0 : lsn: Lsn,
2703 0 : _ctx: &RequestContext,
2704 0 : ) -> anyhow::Result<Vec<Range<Key>>> {
2705 0 : if lsn == self.keyspace.0 {
2706 0 : Ok(pageserver_compaction::helpers::intersect_keyspace(
2707 0 : &self.keyspace.1.ranges,
2708 0 : key_range,
2709 0 : ))
2710 : } else {
2711 : // The current compaction implementation only ever requests the key space
2712 : // at the compaction end LSN.
2713 0 : anyhow::bail!("keyspace not available for requested lsn");
2714 : }
2715 0 : }
2716 :
2717 0 : async fn downcast_delta_layer(
2718 0 : &self,
2719 0 : layer: &OwnArc<PersistentLayerDesc>,
2720 0 : ) -> anyhow::Result<Option<ResidentDeltaLayer>> {
2721 0 : // This is more involved than a simple downcast: the layer must be downloaded and kept resident before its contents can be read.
2722 0 : if layer.is_delta() {
2723 0 : let l = {
2724 0 : let guard = self.timeline.layers.read().await;
2725 0 : guard.get_from_desc(layer)
2726 : };
2727 0 : let result = l.download_and_keep_resident().await?;
2728 :
2729 0 : Ok(Some(ResidentDeltaLayer(result)))
2730 : } else {
2731 0 : Ok(None)
2732 : }
2733 0 : }
2734 :
2735 0 : async fn create_image(
2736 0 : &mut self,
2737 0 : lsn: Lsn,
2738 0 : key_range: &Range<Key>,
2739 0 : ctx: &RequestContext,
2740 0 : ) -> anyhow::Result<()> {
2741 0 : Ok(self.create_image_impl(lsn, key_range, ctx).await?)
2742 0 : }
2743 :
2744 0 : async fn create_delta(
2745 0 : &mut self,
2746 0 : lsn_range: &Range<Lsn>,
2747 0 : key_range: &Range<Key>,
2748 0 : input_layers: &[ResidentDeltaLayer],
2749 0 : ctx: &RequestContext,
2750 0 : ) -> anyhow::Result<()> {
2751 0 : debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end);
2752 :
2753 0 : let mut all_entries = Vec::new();
2754 0 : for dl in input_layers.iter() {
2755 0 : all_entries.extend(dl.load_keys(ctx).await?);
2756 : }
2757 :
2758 : // The current stdlib sorting implementation is particularly fast when the slice
2759 : // is made up of sorted sub-ranges.
2760 0 : all_entries.sort_by_key(|DeltaEntry { key, lsn, .. }| (*key, *lsn));
2761 :
2762 0 : let mut writer = DeltaLayerWriter::new(
2763 0 : self.timeline.conf,
2764 0 : self.timeline.timeline_id,
2765 0 : self.timeline.tenant_shard_id,
2766 0 : key_range.start,
2767 0 : lsn_range.clone(),
2768 0 : ctx,
2769 0 : )
2770 0 : .await?;
2771 :
2772 0 : let mut dup_values = 0;
2773 0 :
2774 0 : // Walk through all key-value pairs from all the layers we're compacting,
2775 0 : // in (key, LSN) order.
2776 0 : let mut prev: Option<(Key, Lsn)> = None;
2777 : for &DeltaEntry {
2778 0 : key, lsn, ref val, ..
2779 0 : } in all_entries.iter()
2780 : {
2781 0 : if prev == Some((key, lsn)) {
2782 : // This is a duplicate. Skip it.
2783 : //
2784 : // It can happen if compaction is interrupted after writing some
2785 : // layers but not all, and we are compacting the range again.
2786 : // The calculations in the algorithm assume that there are no
2787 : // duplicates, so the math on targeted file size is likely off,
2788 : // and we will create smaller files than expected.
2789 0 : dup_values += 1;
2790 0 : continue;
2791 0 : }
2792 :
2793 0 : let value = val.load(ctx).await?;
2794 :
2795 0 : writer.put_value(key, lsn, value, ctx).await?;
2796 :
2797 0 : prev = Some((key, lsn));
2798 : }
2799 :
2800 0 : if dup_values > 0 {
2801 0 : warn!("delta layer created with {} duplicate values", dup_values);
2802 0 : }
2803 :
2804 0 : fail_point!("delta-layer-writer-fail-before-finish", |_| {
2805 0 : Err(anyhow::anyhow!(
2806 0 : "failpoint delta-layer-writer-fail-before-finish"
2807 0 : ))
2808 0 : });
2809 :
2810 0 : let (desc, path) = writer.finish(prev.unwrap().0.next(), ctx).await?;
2811 0 : let new_delta_layer =
2812 0 : Layer::finish_creating(self.timeline.conf, &self.timeline, desc, &path)?;
2813 :
2814 0 : self.new_deltas.push(new_delta_layer);
2815 0 : Ok(())
2816 0 : }
2817 :
2818 0 : async fn delete_layer(
2819 0 : &mut self,
2820 0 : layer: &OwnArc<PersistentLayerDesc>,
2821 0 : _ctx: &RequestContext,
2822 0 : ) -> anyhow::Result<()> {
2823 0 : self.layers_to_delete.push(layer.clone().0);
2824 0 : Ok(())
2825 0 : }
2826 : }
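     : // `create_delta` above concatenates the index entries of all input delta layers, sorts them by
     : // (key, lsn), and skips exact duplicates while writing the merged layer. The function below is a
     : // minimal, self-contained sketch of that sort-and-deduplicate step using plain integers instead
     : // of the real Key/Lsn types; the name `sort_and_dedup_sketch` is illustrative only and does not
     : // exist in the pageserver code.
     : fn sort_and_dedup_sketch(mut entries: Vec<(u64, u64, &'static str)>) -> Vec<(u64, u64, &'static str)> {
     :     // Sort by (key, lsn); the entries coming from each input layer already form sorted
     :     // sub-ranges, which the stdlib sort handles efficiently.
     :     entries.sort_by_key(|&(key, lsn, _)| (key, lsn));
     :     let mut out = Vec::new();
     :     let mut prev: Option<(u64, u64)> = None;
     :     for (key, lsn, val) in entries {
     :         // Skip exact (key, lsn) duplicates, which can appear if an earlier compaction attempt
     :         // was interrupted after writing only some of its output layers.
     :         if prev == Some((key, lsn)) {
     :             continue;
     :         }
     :         out.push((key, lsn, val));
     :         prev = Some((key, lsn));
     :     }
     :     out
     : }
     : // For example, [(1, 10, "a"), (1, 10, "a"), (2, 5, "b")] collapses to two entries.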
2827 :
2828 : impl TimelineAdaptor {
2829 0 : async fn create_image_impl(
2830 0 : &mut self,
2831 0 : lsn: Lsn,
2832 0 : key_range: &Range<Key>,
2833 0 : ctx: &RequestContext,
2834 0 : ) -> Result<(), CreateImageLayersError> {
2835 0 : let timer = self.timeline.metrics.create_images_time_histo.start_timer();
2836 :
2837 0 : let image_layer_writer = ImageLayerWriter::new(
2838 0 : self.timeline.conf,
2839 0 : self.timeline.timeline_id,
2840 0 : self.timeline.tenant_shard_id,
2841 0 : key_range,
2842 0 : lsn,
2843 0 : ctx,
2844 0 : )
2845 0 : .await?;
2846 :
2847 0 : fail_point!("image-layer-writer-fail-before-finish", |_| {
2848 0 : Err(CreateImageLayersError::Other(anyhow::anyhow!(
2849 0 : "failpoint image-layer-writer-fail-before-finish"
2850 0 : )))
2851 0 : });
2852 :
2853 0 : let keyspace = KeySpace {
2854 0 : ranges: self.get_keyspace(key_range, lsn, ctx).await?,
2855 : };
2856 : // TODO: set a proper (stateful) start key; create_image_layer_for_rel_blocks is currently called with Key::MIN and its returned next_start_key is ignored.
2857 0 : let start = Key::MIN;
2858 : let ImageLayerCreationOutcome {
2859 0 : image,
2860 : next_start_key: _,
2861 0 : } = self
2862 0 : .timeline
2863 0 : .create_image_layer_for_rel_blocks(
2864 0 : &keyspace,
2865 0 : image_layer_writer,
2866 0 : lsn,
2867 0 : ctx,
2868 0 : key_range.clone(),
2869 0 : start,
2870 0 : )
2871 0 : .await?;
2872 :
2873 0 : if let Some(image_layer) = image {
2874 0 : self.new_images.push(image_layer);
2875 0 : }
2876 :
2877 0 : timer.stop_and_record();
2878 0 :
2879 0 : Ok(())
2880 0 : }
2881 : }
2882 :
2883 : impl CompactionRequestContext for crate::context::RequestContext {}
2884 :
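     : /// Owned wrapper around `Arc<T>`. A local newtype is needed so that the compaction crate's
     : /// traits (such as `CompactionLayer`) can be implemented for `Arc`-wrapped layer descriptors;
     : /// Rust's orphan rule prevents implementing a foreign trait directly on `Arc<_>`.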
2885 : #[derive(Debug, Clone)]
2886 : pub struct OwnArc<T>(pub Arc<T>);
2887 :
2888 : impl<T> Deref for OwnArc<T> {
2889 : type Target = <Arc<T> as Deref>::Target;
2890 0 : fn deref(&self) -> &Self::Target {
2891 0 : &self.0
2892 0 : }
2893 : }
2894 :
2895 : impl<T> AsRef<T> for OwnArc<T> {
2896 0 : fn as_ref(&self) -> &T {
2897 0 : self.0.as_ref()
2898 0 : }
2899 : }
2900 :
2901 : impl CompactionLayer<Key> for OwnArc<PersistentLayerDesc> {
2902 0 : fn key_range(&self) -> &Range<Key> {
2903 0 : &self.key_range
2904 0 : }
2905 0 : fn lsn_range(&self) -> &Range<Lsn> {
2906 0 : &self.lsn_range
2907 0 : }
2908 0 : fn file_size(&self) -> u64 {
2909 0 : self.file_size
2910 0 : }
2911 0 : fn short_id(&self) -> std::string::String {
2912 0 : self.as_ref().short_id().to_string()
2913 0 : }
2914 0 : fn is_delta(&self) -> bool {
2915 0 : self.as_ref().is_delta()
2916 0 : }
2917 : }
2918 :
2919 : impl CompactionLayer<Key> for OwnArc<DeltaLayer> {
2920 0 : fn key_range(&self) -> &Range<Key> {
2921 0 : &self.layer_desc().key_range
2922 0 : }
2923 0 : fn lsn_range(&self) -> &Range<Lsn> {
2924 0 : &self.layer_desc().lsn_range
2925 0 : }
2926 0 : fn file_size(&self) -> u64 {
2927 0 : self.layer_desc().file_size
2928 0 : }
2929 0 : fn short_id(&self) -> std::string::String {
2930 0 : self.layer_desc().short_id().to_string()
2931 0 : }
2932 0 : fn is_delta(&self) -> bool {
2933 0 : true
2934 0 : }
2935 : }
2936 :
2937 : use crate::tenant::timeline::DeltaEntry;
2938 :
2939 : impl CompactionLayer<Key> for ResidentDeltaLayer {
2940 0 : fn key_range(&self) -> &Range<Key> {
2941 0 : &self.0.layer_desc().key_range
2942 0 : }
2943 0 : fn lsn_range(&self) -> &Range<Lsn> {
2944 0 : &self.0.layer_desc().lsn_range
2945 0 : }
2946 0 : fn file_size(&self) -> u64 {
2947 0 : self.0.layer_desc().file_size
2948 0 : }
2949 0 : fn short_id(&self) -> std::string::String {
2950 0 : self.0.layer_desc().short_id().to_string()
2951 0 : }
2952 0 : fn is_delta(&self) -> bool {
2953 0 : true
2954 0 : }
2955 : }
2956 :
2957 : impl CompactionDeltaLayer<TimelineAdaptor> for ResidentDeltaLayer {
2958 : type DeltaEntry<'a> = DeltaEntry<'a>;
2959 :
2960 0 : async fn load_keys(&self, ctx: &RequestContext) -> anyhow::Result<Vec<DeltaEntry<'_>>> {
2961 0 : self.0.get_as_delta(ctx).await?.index_entries(ctx).await
2962 0 : }
2963 : }
2964 :
2965 : impl CompactionLayer<Key> for ResidentImageLayer {
2966 0 : fn key_range(&self) -> &Range<Key> {
2967 0 : &self.0.layer_desc().key_range
2968 0 : }
2969 0 : fn lsn_range(&self) -> &Range<Lsn> {
2970 0 : &self.0.layer_desc().lsn_range
2971 0 : }
2972 0 : fn file_size(&self) -> u64 {
2973 0 : self.0.layer_desc().file_size
2974 0 : }
2975 0 : fn short_id(&self) -> std::string::String {
2976 0 : self.0.layer_desc().short_id().to_string()
2977 0 : }
2978 0 : fn is_delta(&self) -> bool {
2979 0 : false
2980 0 : }
2981 : }
2982 : impl CompactionImageLayer<TimelineAdaptor> for ResidentImageLayer {}