Line data Source code
1 : //! New compaction implementation. The algorithm itself is implemented in the
2 : //! compaction crate. This file implements the callbacks and structs that allow
3 : //! the algorithm to drive the process.
4 : //!
5 : //! The old legacy algorithm is implemented directly in `timeline.rs`.
6 :
7 : use std::collections::{BinaryHeap, HashMap, HashSet};
8 : use std::ops::{Deref, Range};
9 : use std::sync::Arc;
10 :
11 : use super::layer_manager::LayerManager;
12 : use super::{
13 : CompactFlags, CompactOptions, CreateImageLayersError, DurationRecorder, ImageLayerCreationMode,
14 : RecordedDuration, Timeline,
15 : };
16 :
17 : use anyhow::{anyhow, bail, Context};
18 : use bytes::Bytes;
19 : use fail::fail_point;
20 : use itertools::Itertools;
21 : use pageserver_api::key::KEY_SIZE;
22 : use pageserver_api::keyspace::ShardedRange;
23 : use pageserver_api::shard::{ShardCount, ShardIdentity, TenantShardId};
24 : use serde::Serialize;
25 : use tokio_util::sync::CancellationToken;
26 : use tracing::{debug, info, info_span, trace, warn, Instrument};
27 : use utils::id::TimelineId;
28 :
29 : use crate::context::{AccessStatsBehavior, RequestContext, RequestContextBuilder};
30 : use crate::page_cache;
31 : use crate::statvfs::Statvfs;
32 : use crate::tenant::remote_timeline_client::WaitCompletionError;
33 : use crate::tenant::storage_layer::batch_split_writer::{
34 : BatchWriterResult, SplitDeltaLayerWriter, SplitImageLayerWriter,
35 : };
36 : use crate::tenant::storage_layer::filter_iterator::FilterIterator;
37 : use crate::tenant::storage_layer::merge_iterator::MergeIterator;
38 : use crate::tenant::storage_layer::{
39 : AsLayerDesc, PersistentLayerDesc, PersistentLayerKey, ValueReconstructState,
40 : };
41 : use crate::tenant::timeline::ImageLayerCreationOutcome;
42 : use crate::tenant::timeline::{drop_rlock, DeltaLayerWriter, ImageLayerWriter};
43 : use crate::tenant::timeline::{Layer, ResidentLayer};
44 : use crate::tenant::{gc_block, DeltaLayer, MaybeOffloaded};
45 : use crate::virtual_file::{MaybeFatalIo, VirtualFile};
46 : use pageserver_api::config::tenant_conf_defaults::{
47 : DEFAULT_CHECKPOINT_DISTANCE, DEFAULT_COMPACTION_THRESHOLD,
48 : };
49 :
50 : use pageserver_api::key::Key;
51 : use pageserver_api::keyspace::KeySpace;
52 : use pageserver_api::record::NeonWalRecord;
53 : use pageserver_api::value::Value;
54 :
55 : use utils::lsn::Lsn;
56 :
57 : use pageserver_compaction::helpers::{fully_contains, overlaps_with};
58 : use pageserver_compaction::interface::*;
59 :
60 : use super::CompactionError;
61 :
62 : /// Maximum number of deltas before generating an image layer in bottom-most compaction.
63 : const COMPACTION_DELTA_THRESHOLD: usize = 5;
64 :
65 : /// A scheduled compaction task.
66 : pub(crate) struct ScheduledCompactionTask {
67 : /// It's unfortunate that we need to store a compact options struct here because the only outer
68 : /// API we can call here is `compact_with_options` which does a few setup calls before starting the
69 : /// actual compaction job... We should refactor this to store `GcCompactionJob` in the future.
70 : pub options: CompactOptions,
71 : /// The channel to send the compaction result. If this is a subcompaction, the last compaction job holds the sender.
72 : pub result_tx: Option<tokio::sync::oneshot::Sender<()>>,
73 : /// Hold the GC block. If this is a subcompaction, the last compaction job holds the gc block guard.
74 : pub gc_block: Option<gc_block::Guard>,
75 : }
76 :
77 : /// A job description for the gc-compaction job. This structure describes the rectangle range that the job will
78 : /// process. The exact layers that need to be compacted/rewritten will be generated when `compact_with_gc` gets
79 : /// called.
80 : #[derive(Debug, Clone)]
81 : pub(crate) struct GcCompactJob {
82 : pub dry_run: bool,
83 : /// The key range to be compacted. The compaction algorithm will only regenerate key-value pairs within this range
84 : /// [left inclusive, right exclusive), and other pairs will be rewritten into new files if necessary.
85 : pub compact_key_range: Range<Key>,
86 : /// The LSN range to be compacted. The compaction algorithm will use this range to determine the layers to be
87 : /// selected for the compaction, and it does not guarantee the generated layers will have exactly the same LSN range
88 : /// as specified here. The true range being compacted is `min_lsn/max_lsn` in [`GcCompactionJobDescription`].
89 : /// min_lsn will always be <= the lower bound specified here, and max_lsn will always be >= the upper bound specified here.
90 : pub compact_lsn_range: Range<Lsn>,
91 : }
92 :
93 : impl GcCompactJob {
94 54 : pub fn from_compact_options(options: CompactOptions) -> Self {
95 54 : GcCompactJob {
96 54 : dry_run: options.flags.contains(CompactFlags::DryRun),
97 54 : compact_key_range: options
98 54 : .compact_key_range
99 54 : .map(|x| x.into())
100 54 : .unwrap_or(Key::MIN..Key::MAX),
101 54 : compact_lsn_range: options
102 54 : .compact_lsn_range
103 54 : .map(|x| x.into())
104 54 : .unwrap_or(Lsn::INVALID..Lsn::MAX),
105 54 : }
106 54 : }
107 : }
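// Illustrative sketch only (hypothetical helper, not used by the code above): the
// same "optional range with open-ended defaults" pattern that `from_compact_options`
// applies to the key and LSN ranges, shown with plain u64 stand-ins for `Key`/`Lsn`.
#[allow(dead_code)]
fn open_ended_default_sketch(requested: Option<Range<u64>>) -> Range<u64> {
    // No explicit range means "compact everything": [MIN, MAX).
    requested.unwrap_or(u64::MIN..u64::MAX)
}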
108 :
109 : /// A job description for the gc-compaction job. This structure is generated when `compact_with_gc` is called
110 : /// and contains the exact layers we want to compact.
111 : pub struct GcCompactionJobDescription {
112 : /// All layers to read in the compaction job
113 : selected_layers: Vec<Layer>,
114 : /// GC cutoff of the job. This is the lowest LSN that will be accessed by the read/GC path and we need to
115 : /// keep all deltas <= this LSN or generate an image == this LSN.
116 : gc_cutoff: Lsn,
117 : /// LSNs to retain for the job. The read path will use these LSNs, so for each of them we need to keep
118 : /// deltas <= that LSN or generate an image == that LSN.
119 : retain_lsns_below_horizon: Vec<Lsn>,
120 : /// Maximum layer LSN processed in this compaction, that is max(end_lsn of layers). Exclusive. All data
121 : /// \>= this LSN will be kept and will not be rewritten.
122 : max_layer_lsn: Lsn,
123 : /// Minimum layer LSN processed in this compaction, that is min(start_lsn of layers). Inclusive.
124 : /// All access below (strict lower than `<`) this LSN will be routed through the normal read path instead of
125 : /// k-merge within gc-compaction.
126 : min_layer_lsn: Lsn,
127 : /// Only compact layers overlapping with this range.
128 : compaction_key_range: Range<Key>,
129 : /// When partial compaction is enabled, these layers need to be rewritten to ensure no overlap.
130 : /// This field is here solely for debugging. The field will not be read once the compaction
131 : /// description is generated.
132 : rewrite_layers: Vec<Arc<PersistentLayerDesc>>,
133 : }
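// Illustrative sketch only (hypothetical helper): how `retain_lsns_below_horizon`
// and `gc_cutoff` slice one key's history. Each retention point must remain
// reconstructible, so the history is split into one batch per retention LSN plus
// the gc_cutoff batch; anything newer than the cutoff stays above the horizon.
// Plain u64s stand in for `Lsn`.
#[allow(dead_code)]
fn split_history_sketch(
    history_lsns: &[u64],              // sorted LSNs at which this key has a record
    retain_lsns_below_horizon: &[u64], // sorted, all <= gc_cutoff
    gc_cutoff: u64,
) -> (Vec<Vec<u64>>, Vec<u64>) {
    let mut below_horizon = Vec::new();
    let mut rest: Vec<u64> = history_lsns.to_vec();
    for cutoff in retain_lsns_below_horizon.iter().chain(std::iter::once(&gc_cutoff)) {
        // Everything at or below this retention point forms one batch.
        let batch: Vec<u64> = rest.iter().copied().filter(|lsn| lsn <= cutoff).collect();
        rest.retain(|lsn| lsn > cutoff);
        below_horizon.push(batch);
    }
    (below_horizon, rest) // (per-retention-point batches, above-horizon tail)
}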
134 :
135 : /// The result of bottom-most compaction for a single key at each LSN.
136 : #[derive(Debug)]
137 : #[cfg_attr(test, derive(PartialEq))]
138 : pub struct KeyLogAtLsn(pub Vec<(Lsn, Value)>);
139 :
140 : /// The result of bottom-most compaction.
141 : #[derive(Debug)]
142 : #[cfg_attr(test, derive(PartialEq))]
143 : pub(crate) struct KeyHistoryRetention {
144 : /// Stores logs to reconstruct the value at the given LSN, that is to say, logs <= LSN or image == LSN.
145 : pub(crate) below_horizon: Vec<(Lsn, KeyLogAtLsn)>,
146 : /// Stores logs to reconstruct the value at any LSN above the horizon, that is to say, log > LSN.
147 : pub(crate) above_horizon: KeyLogAtLsn,
148 : }
149 :
150 : impl KeyHistoryRetention {
151 : /// Hack: skip delta layer if we need to produce a layer of the same key-lsn.
152 : ///
153 : /// This can happen if we have removed some deltas in "the middle" of some existing layer's key-lsn-range.
154 : /// For example, consider the case where a single delta with range [0x10,0x50) exists.
155 : /// And we have branches at LSN 0x10, 0x20, 0x30.
156 : /// Then we delete branch @ 0x20.
157 : /// Bottom-most compaction may now delete the delta [0x20,0x30).
158 : /// And that wouldn't change the shape of the layer.
159 : ///
160 : /// Note that bottom-most-gc-compaction never _adds_ new data in that case, only removes.
161 : ///
162 : /// `discard_key` will only be called when the writer reaches its target (instead of for every key), so it's fine to grab a lock inside.
163 74 : async fn discard_key(key: &PersistentLayerKey, tline: &Arc<Timeline>, dry_run: bool) -> bool {
164 74 : if dry_run {
165 0 : return true;
166 74 : }
167 74 : let guard = tline.layers.read().await;
168 74 : if !guard.contains_key(key) {
169 52 : return false;
170 22 : }
171 22 : let layer_generation = guard.get_from_key(key).metadata().generation;
172 22 : drop(guard);
173 22 : if layer_generation == tline.generation {
174 22 : info!(
175 : key=%key,
176 : ?layer_generation,
177 0 : "discard layer due to duplicated layer key in the same generation",
178 : );
179 22 : true
180 : } else {
181 0 : false
182 : }
183 74 : }
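    // Illustrative sketch only (hypothetical associated fn, not used anywhere):
    // the discard rule above, with a plain HashMap standing in for the layer map.
    // A freshly produced layer is discarded iff a layer with the same key already
    // exists and was written by the current generation; a dry run discards all.
    #[allow(dead_code)]
    fn discard_rule_sketch(
        existing: &HashMap<String, u32>, // layer key -> generation that wrote it
        key: &str,
        current_generation: u32,
        dry_run: bool,
    ) -> bool {
        if dry_run {
            return true;
        }
        match existing.get(key) {
            Some(generation) => *generation == current_generation,
            None => false,
        }
    }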
184 :
185 : /// Pipe a history of a single key to the writers.
186 : ///
187 : /// If `image_writer` is none, the images will be placed into the delta layers.
188 : /// The delta writer will contain all images and deltas (below and above the horizon) except the bottom-most images.
189 : #[allow(clippy::too_many_arguments)]
190 622 : async fn pipe_to(
191 622 : self,
192 622 : key: Key,
193 622 : delta_writer: &mut SplitDeltaLayerWriter,
194 622 : mut image_writer: Option<&mut SplitImageLayerWriter>,
195 622 : stat: &mut CompactionStatistics,
196 622 : ctx: &RequestContext,
197 622 : ) -> anyhow::Result<()> {
198 622 : let mut first_batch = true;
199 2012 : for (cutoff_lsn, KeyLogAtLsn(logs)) in self.below_horizon {
200 1390 : if first_batch {
201 622 : if logs.len() == 1 && logs[0].1.is_image() {
202 584 : let Value::Image(img) = &logs[0].1 else {
203 0 : unreachable!()
204 : };
205 584 : stat.produce_image_key(img);
206 584 : if let Some(image_writer) = image_writer.as_mut() {
207 584 : image_writer.put_image(key, img.clone(), ctx).await?;
208 : } else {
209 0 : delta_writer
210 0 : .put_value(key, cutoff_lsn, Value::Image(img.clone()), ctx)
211 0 : .await?;
212 : }
213 : } else {
214 66 : for (lsn, val) in logs {
215 28 : stat.produce_key(&val);
216 28 : delta_writer.put_value(key, lsn, val, ctx).await?;
217 : }
218 : }
219 622 : first_batch = false;
220 : } else {
221 884 : for (lsn, val) in logs {
222 116 : stat.produce_key(&val);
223 116 : delta_writer.put_value(key, lsn, val, ctx).await?;
224 : }
225 : }
226 : }
227 622 : let KeyLogAtLsn(above_horizon_logs) = self.above_horizon;
228 680 : for (lsn, val) in above_horizon_logs {
229 58 : stat.produce_key(&val);
230 58 : delta_writer.put_value(key, lsn, val, ctx).await?;
231 : }
232 622 : Ok(())
233 622 : }
234 : }
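// Illustrative sketch only (hypothetical helper): where `pipe_to` sends each value.
// The bottom-most batch goes to the image writer when it has collapsed into a single
// image and an image writer exists; every other value, below or above the horizon,
// becomes a delta. `(lsn, is_image)` pairs stand in for the real `(Lsn, Value)` logs.
#[allow(dead_code)]
fn pipe_to_routing_sketch(
    below_horizon: Vec<(u64, Vec<(u64, bool)>)>, // (cutoff_lsn, logs)
    above_horizon: Vec<(u64, bool)>,
    has_image_writer: bool,
) -> (usize, usize) {
    let (mut to_image, mut to_delta) = (0usize, 0usize);
    let mut first_batch = true;
    for (_cutoff_lsn, logs) in below_horizon {
        let single_image = logs.len() == 1 && logs[0].1;
        if first_batch && single_image && has_image_writer {
            to_image += 1; // bottom-most image goes into the image layer
        } else {
            to_delta += logs.len(); // everything else is written as deltas
        }
        first_batch = false;
    }
    to_delta += above_horizon.len();
    (to_image, to_delta) // (values sent to the image writer, values sent to the delta writer)
}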
235 :
236 : #[derive(Debug, Serialize, Default)]
237 : struct CompactionStatisticsNumSize {
238 : num: u64,
239 : size: u64,
240 : }
241 :
242 : #[derive(Debug, Serialize, Default)]
243 : pub struct CompactionStatistics {
244 : delta_layer_visited: CompactionStatisticsNumSize,
245 : image_layer_visited: CompactionStatisticsNumSize,
246 : delta_layer_produced: CompactionStatisticsNumSize,
247 : image_layer_produced: CompactionStatisticsNumSize,
248 : num_delta_layer_discarded: usize,
249 : num_image_layer_discarded: usize,
250 : num_unique_keys_visited: usize,
251 : wal_keys_visited: CompactionStatisticsNumSize,
252 : image_keys_visited: CompactionStatisticsNumSize,
253 : wal_produced: CompactionStatisticsNumSize,
254 : image_produced: CompactionStatisticsNumSize,
255 : }
256 :
257 : impl CompactionStatistics {
258 1042 : fn estimated_size_of_value(val: &Value) -> usize {
259 432 : match val {
260 610 : Value::Image(img) => img.len(),
261 0 : Value::WalRecord(NeonWalRecord::Postgres { rec, .. }) => rec.len(),
262 432 : _ => std::mem::size_of::<NeonWalRecord>(),
263 : }
264 1042 : }
265 1636 : fn estimated_size_of_key() -> usize {
266 1636 : KEY_SIZE // TODO: distinguish image layer and delta layer (count LSN in delta layer)
267 1636 : }
268 86 : fn visit_delta_layer(&mut self, size: u64) {
269 86 : self.delta_layer_visited.num += 1;
270 86 : self.delta_layer_visited.size += size;
271 86 : }
272 66 : fn visit_image_layer(&mut self, size: u64) {
273 66 : self.image_layer_visited.num += 1;
274 66 : self.image_layer_visited.size += size;
275 66 : }
276 622 : fn on_unique_key_visited(&mut self) {
277 622 : self.num_unique_keys_visited += 1;
278 622 : }
279 240 : fn visit_wal_key(&mut self, val: &Value) {
280 240 : self.wal_keys_visited.num += 1;
281 240 : self.wal_keys_visited.size +=
282 240 : Self::estimated_size_of_value(val) as u64 + Self::estimated_size_of_key() as u64;
283 240 : }
284 610 : fn visit_image_key(&mut self, val: &Value) {
285 610 : self.image_keys_visited.num += 1;
286 610 : self.image_keys_visited.size +=
287 610 : Self::estimated_size_of_value(val) as u64 + Self::estimated_size_of_key() as u64;
288 610 : }
289 202 : fn produce_key(&mut self, val: &Value) {
290 202 : match val {
291 10 : Value::Image(img) => self.produce_image_key(img),
292 192 : Value::WalRecord(_) => self.produce_wal_key(val),
293 : }
294 202 : }
295 192 : fn produce_wal_key(&mut self, val: &Value) {
296 192 : self.wal_produced.num += 1;
297 192 : self.wal_produced.size +=
298 192 : Self::estimated_size_of_value(val) as u64 + Self::estimated_size_of_key() as u64;
299 192 : }
300 594 : fn produce_image_key(&mut self, val: &Bytes) {
301 594 : self.image_produced.num += 1;
302 594 : self.image_produced.size += val.len() as u64 + Self::estimated_size_of_key() as u64;
303 594 : }
304 14 : fn discard_delta_layer(&mut self) {
305 14 : self.num_delta_layer_discarded += 1;
306 14 : }
307 8 : fn discard_image_layer(&mut self) {
308 8 : self.num_image_layer_discarded += 1;
309 8 : }
310 22 : fn produce_delta_layer(&mut self, size: u64) {
311 22 : self.delta_layer_produced.num += 1;
312 22 : self.delta_layer_produced.size += size;
313 22 : }
314 30 : fn produce_image_layer(&mut self, size: u64) {
315 30 : self.image_layer_produced.num += 1;
316 30 : self.image_layer_produced.size += size;
317 30 : }
318 : }
319 :
320 : impl Timeline {
321 : /// TODO: cancellation
322 : ///
323 : /// Returns whether the compaction has pending tasks.
324 364 : pub(crate) async fn compact_legacy(
325 364 : self: &Arc<Self>,
326 364 : cancel: &CancellationToken,
327 364 : options: CompactOptions,
328 364 : ctx: &RequestContext,
329 364 : ) -> Result<bool, CompactionError> {
330 364 : if options
331 364 : .flags
332 364 : .contains(CompactFlags::EnhancedGcBottomMostCompaction)
333 : {
334 0 : self.compact_with_gc(cancel, options, ctx)
335 0 : .await
336 0 : .map_err(CompactionError::Other)?;
337 0 : return Ok(false);
338 364 : }
339 364 :
340 364 : if options.flags.contains(CompactFlags::DryRun) {
341 0 : return Err(CompactionError::Other(anyhow!(
342 0 : "dry-run mode is not supported for legacy compaction for now"
343 0 : )));
344 364 : }
345 364 :
346 364 : if options.compact_key_range.is_some() || options.compact_lsn_range.is_some() {
347 : // maybe useful in the future? could implement this at some point
348 0 : return Err(CompactionError::Other(anyhow!(
349 0 : "compaction range is not supported for legacy compaction for now"
350 0 : )));
351 364 : }
352 364 :
353 364 : // High level strategy for compaction / image creation:
354 364 : //
355 364 : // 1. First, calculate the desired "partitioning" of the
356 364 : // currently in-use key space. The goal is to partition the
357 364 : // key space into roughly fixed-size chunks, but also take into
358 364 : // account any existing image layers, and try to align the
359 364 : // chunk boundaries with the existing image layers to avoid
360 364 : // too much churn. Also try to align chunk boundaries with
361 364 : // relation boundaries. In principle, we don't know about
362 364 : // relation boundaries here, we just deal with key-value
363 364 : // pairs, and the code in pgdatadir_mapping.rs knows how to
364 364 : // map relations into key-value pairs. But in practice we know
365 364 : // that 'field6' is the block number, and the fields 1-5
366 364 : // identify a relation. This is just an optimization,
367 364 : // though.
368 364 : //
369 364 : // 2. Once we know the partitioning, for each partition,
370 364 : // decide if it's time to create a new image layer. The
371 364 : // criterion is: has there been too much "churn" since the last
372 364 : // image layer? The "churn" is a fuzzy concept; it's a
373 364 : // combination of too many delta files, or too much WAL in
374 364 : // total in the delta files. Or perhaps: whether creating an image
375 364 : // file would allow us to delete some older files.
376 364 : //
377 364 : // 3. After that, we compact all level0 delta files if there
378 364 : // are too many of them. While compacting, we also garbage
379 364 : // collect any page versions that are no longer needed because
380 364 : // of the new image layers we created in step 2.
381 364 : //
382 364 : // TODO: This high level strategy hasn't been implemented yet.
383 364 : // Below are functions compact_level0() and create_image_layers()
384 364 : // but they are a bit ad hoc and don't quite work like it's explained
385 364 : // above. Rewrite it.
386 364 :
387 364 : // Is the timeline being deleted?
388 364 : if self.is_stopping() {
389 0 : trace!("Dropping out of compaction on timeline shutdown");
390 0 : return Err(CompactionError::ShuttingDown);
391 364 : }
392 364 :
393 364 : let target_file_size = self.get_checkpoint_distance();
394 :
395 : // Define partitioning schema if needed
396 :
397 : // FIXME: the match should only cover repartitioning, not the next steps
398 364 : let (partition_count, has_pending_tasks) = match self
399 364 : .repartition(
400 364 : self.get_last_record_lsn(),
401 364 : self.get_compaction_target_size(),
402 364 : options.flags,
403 364 : ctx,
404 364 : )
405 364 : .await
406 : {
407 364 : Ok(((dense_partitioning, sparse_partitioning), lsn)) => {
408 364 : // Disables access_stats updates, so that the files we read remain candidates for eviction after we're done with them
409 364 : let image_ctx = RequestContextBuilder::extend(ctx)
410 364 : .access_stats_behavior(AccessStatsBehavior::Skip)
411 364 : .build();
412 364 :
413 364 : // 2. Compact
414 364 : let timer = self.metrics.compact_time_histo.start_timer();
415 364 : let fully_compacted = self
416 364 : .compact_level0(
417 364 : target_file_size,
418 364 : options.flags.contains(CompactFlags::ForceL0Compaction),
419 364 : ctx,
420 364 : )
421 364 : .await?;
422 364 : timer.stop_and_record();
423 364 :
424 364 : let mut partitioning = dense_partitioning;
425 364 : partitioning
426 364 : .parts
427 364 : .extend(sparse_partitioning.into_dense().parts);
428 364 :
429 364 : // 3. Create new image layers for partitions that have been modified
430 364 : // "enough". Skip image layer creation if L0 compaction cannot keep up.
431 364 : if fully_compacted {
432 364 : let image_layers = self
433 364 : .create_image_layers(
434 364 : &partitioning,
435 364 : lsn,
436 364 : if options
437 364 : .flags
438 364 : .contains(CompactFlags::ForceImageLayerCreation)
439 : {
440 14 : ImageLayerCreationMode::Force
441 : } else {
442 350 : ImageLayerCreationMode::Try
443 : },
444 364 : &image_ctx,
445 364 : )
446 364 : .await?;
447 :
448 364 : self.upload_new_image_layers(image_layers)?;
449 : } else {
450 0 : info!("skipping image layer generation due to L0 compaction did not include all layers.");
451 : }
452 364 : (partitioning.parts.len(), !fully_compacted)
453 : }
454 0 : Err(err) => {
455 0 : // no partitioning? This is normal, if the timeline was just created
456 0 : // as an empty timeline. Also in unit tests, when we use the timeline
457 0 : // as a simple key-value store, ignoring the datadir layout. Log the
458 0 : // error but continue.
459 0 : //
460 0 : // Suppress error when it's due to cancellation
461 0 : if !self.cancel.is_cancelled() && !err.is_cancelled() {
462 0 : tracing::error!("could not compact, repartitioning keyspace failed: {err:?}");
463 0 : }
464 0 : (1, false)
465 : }
466 : };
467 :
468 364 : if self.shard_identity.count >= ShardCount::new(2) {
469 : // Limit the number of layer rewrites to the number of partitions: this means its
470 : // runtime should be comparable to a full round of image layer creations, rather than
471 : // being potentially much longer.
472 0 : let rewrite_max = partition_count;
473 0 :
474 0 : self.compact_shard_ancestors(rewrite_max, ctx).await?;
475 364 : }
476 :
477 364 : Ok(has_pending_tasks)
478 364 : }
479 :
480 : /// Check for layers that are eligible to be rewritten:
481 : /// - Shard splitting: after a shard split, rewrite ancestor layers beyond pitr_interval, so that
482 : /// we don't indefinitely retain keys in this shard that aren't needed.
483 : /// - For future use: layers beyond pitr_interval that are in formats we would
484 : /// rather not maintain compatibility with indefinitely.
485 : ///
486 : /// Note: this phase may read and write many gigabytes of data: use rewrite_max to bound
487 : /// how much work it will try to do in each compaction pass.
488 0 : async fn compact_shard_ancestors(
489 0 : self: &Arc<Self>,
490 0 : rewrite_max: usize,
491 0 : ctx: &RequestContext,
492 0 : ) -> Result<(), CompactionError> {
493 0 : let mut drop_layers = Vec::new();
494 0 : let mut layers_to_rewrite: Vec<Layer> = Vec::new();
495 0 :
496 0 : // We will use the Lsn cutoff of the last GC as a threshold for rewriting layers: if a
497 0 : // layer is behind this Lsn, it indicates that the layer is being retained beyond the
498 0 : // pitr_interval, for example because a branchpoint references it.
499 0 : //
500 0 : // Holding this read guard also blocks [`Self::gc_timeline`] from entering while we
501 0 : // are rewriting layers.
502 0 : let latest_gc_cutoff = self.get_latest_gc_cutoff_lsn();
503 0 :
504 0 : tracing::info!(
505 0 : "latest_gc_cutoff: {}, pitr cutoff {}",
506 0 : *latest_gc_cutoff,
507 0 : self.gc_info.read().unwrap().cutoffs.time
508 : );
509 :
510 0 : let layers = self.layers.read().await;
511 0 : for layer_desc in layers.layer_map()?.iter_historic_layers() {
512 0 : let layer = layers.get_from_desc(&layer_desc);
513 0 : if layer.metadata().shard.shard_count == self.shard_identity.count {
514 : // This layer does not belong to a historic ancestor, no need to re-image it.
515 0 : continue;
516 0 : }
517 0 :
518 0 : // This layer was created on an ancestor shard: check if it contains any data for this shard.
519 0 : let sharded_range = ShardedRange::new(layer_desc.get_key_range(), &self.shard_identity);
520 0 : let layer_local_page_count = sharded_range.page_count();
521 0 : let layer_raw_page_count = ShardedRange::raw_size(&layer_desc.get_key_range());
522 0 : if layer_local_page_count == 0 {
523 : // This ancestral layer only covers keys that belong to other shards.
524 : // We include the full metadata in the log: if we had some critical bug that caused
525 : // us to incorrectly drop layers, this would simplify manual debugging + reinstating those layers.
526 0 : info!(%layer, old_metadata=?layer.metadata(),
527 0 : "dropping layer after shard split, contains no keys for this shard.",
528 : );
529 :
530 0 : if cfg!(debug_assertions) {
531 : // Expensive, exhaustive check of keys in this layer: this guards against ShardedRange's calculations being
532 : // wrong. If ShardedRange claims the local page count is zero, then no keys in this layer
533 : // should be !is_key_disposable()
534 0 : let range = layer_desc.get_key_range();
535 0 : let mut key = range.start;
536 0 : while key < range.end {
537 0 : debug_assert!(self.shard_identity.is_key_disposable(&key));
538 0 : key = key.next();
539 : }
540 0 : }
541 :
542 0 : drop_layers.push(layer);
543 0 : continue;
544 0 : } else if layer_local_page_count != u32::MAX
545 0 : && layer_local_page_count == layer_raw_page_count
546 : {
547 0 : debug!(%layer,
548 0 : "layer is entirely shard local ({} keys), no need to filter it",
549 : layer_local_page_count
550 : );
551 0 : continue;
552 0 : }
553 0 :
554 0 : // Don't bother re-writing a layer unless it will at least halve its size
555 0 : if layer_local_page_count != u32::MAX
556 0 : && layer_local_page_count > layer_raw_page_count / 2
557 : {
558 0 : debug!(%layer,
559 0 : "layer is already mostly local ({}/{}), not rewriting",
560 : layer_local_page_count,
561 : layer_raw_page_count
562 : );
563 0 : }
564 :
565 : // Don't bother re-writing a layer if it is within the PITR window: it will age-out eventually
566 : // without incurring the I/O cost of a rewrite.
567 0 : if layer_desc.get_lsn_range().end >= *latest_gc_cutoff {
568 0 : debug!(%layer, "Skipping rewrite of layer still in GC window ({} >= {})",
569 0 : layer_desc.get_lsn_range().end, *latest_gc_cutoff);
570 0 : continue;
571 0 : }
572 0 :
573 0 : if layer_desc.is_delta() {
574 : // We do not yet implement rewrite of delta layers
575 0 : debug!(%layer, "Skipping rewrite of delta layer");
576 0 : continue;
577 0 : }
578 0 :
579 0 : // Only rewrite layers if their generations differ. This guarantees:
580 0 : // - that local rewrite is safe, as local layer paths will differ between existing layer and rewritten one
581 0 : // - that the layer is persistent in remote storage, as we only see old-generation'd layer via loading from remote storage
582 0 : if layer.metadata().generation == self.generation {
583 0 : debug!(%layer, "Skipping rewrite, is not from old generation");
584 0 : continue;
585 0 : }
586 0 :
587 0 : if layers_to_rewrite.len() >= rewrite_max {
588 0 : tracing::info!(%layer, "Will rewrite layer on a future compaction, already rewrote {}",
589 0 : layers_to_rewrite.len()
590 : );
591 0 : continue;
592 0 : }
593 0 :
594 0 : // Fall through: all our conditions for doing a rewrite passed.
595 0 : layers_to_rewrite.push(layer);
596 : }
597 :
598 : // Drop read lock on layer map before we start doing time-consuming I/O
599 0 : drop(layers);
600 0 :
601 0 : let mut replace_image_layers = Vec::new();
602 :
603 0 : for layer in layers_to_rewrite {
604 0 : tracing::info!(layer=%layer, "Rewriting layer after shard split...");
605 0 : let mut image_layer_writer = ImageLayerWriter::new(
606 0 : self.conf,
607 0 : self.timeline_id,
608 0 : self.tenant_shard_id,
609 0 : &layer.layer_desc().key_range,
610 0 : layer.layer_desc().image_layer_lsn(),
611 0 : ctx,
612 0 : )
613 0 : .await
614 0 : .map_err(CompactionError::Other)?;
615 :
616 : // Safety of layer rewrites:
617 : // - We are writing to a different local file path than we are reading from, so the old Layer
618 : // cannot interfere with the new one.
619 : // - In the page cache, contents for a particular VirtualFile are stored with a file_id that
620 : // is different for two layers with the same name (in `ImageLayerInner::new` we always
621 : // acquire a fresh id from [`crate::page_cache::next_file_id`]. So readers do not risk
622 : // reading the index from one layer file, and then data blocks from the rewritten layer file.
623 : // - Any readers that have a reference to the old layer will keep it alive until they are done
624 : // with it. If they are trying to promote from remote storage, that will fail, but this is the same
625 : // as for compaction generally: compaction is allowed to delete layers that readers might be trying to use.
626 : // - We do not run concurrently with other kinds of compaction, so the only layer map writes we race with are:
627 : // - GC, which at worst witnesses us "undelete" a layer that they just deleted.
628 : // - ingestion, which only inserts layers, therefore cannot collide with us.
629 0 : let resident = layer.download_and_keep_resident().await?;
630 :
631 0 : let keys_written = resident
632 0 : .filter(&self.shard_identity, &mut image_layer_writer, ctx)
633 0 : .await?;
634 :
635 0 : if keys_written > 0 {
636 0 : let (desc, path) = image_layer_writer
637 0 : .finish(ctx)
638 0 : .await
639 0 : .map_err(CompactionError::Other)?;
640 0 : let new_layer = Layer::finish_creating(self.conf, self, desc, &path)
641 0 : .map_err(CompactionError::Other)?;
642 0 : tracing::info!(layer=%new_layer, "Rewrote layer, {} -> {} bytes",
643 0 : layer.metadata().file_size,
644 0 : new_layer.metadata().file_size);
645 :
646 0 : replace_image_layers.push((layer, new_layer));
647 0 : } else {
648 0 : // Drop the old layer. Usually for this case we would already have noticed that
649 0 : // the layer has no data for us with the ShardedRange check above, but if the rewrite wrote no keys we still drop the layer here.
650 0 : drop_layers.push(layer);
651 0 : }
652 : }
653 :
654 : // At this point, we have replaced local layer files with their rewritten form, but not yet uploaded
655 : // metadata to reflect that. If we restart here, the replaced layer files will look invalid (size mismatch
656 : // to remote index) and be removed. This is inefficient but safe.
657 0 : fail::fail_point!("compact-shard-ancestors-localonly");
658 0 :
659 0 : // Update the LayerMap so that readers will use the new layers, and enqueue it for writing to remote storage
660 0 : self.rewrite_layers(replace_image_layers, drop_layers)
661 0 : .await?;
662 :
663 0 : fail::fail_point!("compact-shard-ancestors-enqueued");
664 0 :
665 0 : // We wait for all uploads to complete before finishing this compaction stage. This is not
666 0 : // necessary for correctness, but it simplifies testing, and avoids proceeding with another
667 0 : // Timeline's compaction while this timeline's uploads may be generating lots of disk I/O
668 0 : // load.
669 0 : match self.remote_client.wait_completion().await {
670 0 : Ok(()) => (),
671 0 : Err(WaitCompletionError::NotInitialized(ni)) => return Err(CompactionError::from(ni)),
672 : Err(WaitCompletionError::UploadQueueShutDownOrStopped) => {
673 0 : return Err(CompactionError::ShuttingDown)
674 : }
675 : }
676 :
677 0 : fail::fail_point!("compact-shard-ancestors-persistent");
678 0 :
679 0 : Ok(())
680 0 : }
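    /// Illustrative sketch only: the per-layer decision made by the loop in
    /// `compact_shard_ancestors` above, written out as a hypothetical pure
    /// function over the main facts that loop consults. Not part of the real
    /// compaction path.
    #[allow(dead_code)]
    fn shard_ancestor_decision_sketch(
        local_page_count: u32,
        raw_page_count: u32,
        within_pitr_window: bool,
        is_delta: bool,
        same_generation: bool,
        rewrite_budget_exhausted: bool,
    ) -> &'static str {
        if local_page_count == 0 {
            "drop" // no keys in this layer belong to this shard
        } else if local_page_count != u32::MAX && local_page_count == raw_page_count {
            "keep" // already entirely shard-local, nothing to filter out
        } else if within_pitr_window {
            "keep" // will age out on its own, skip the rewrite I/O
        } else if is_delta {
            "keep" // delta layer rewrite is not implemented yet
        } else if same_generation {
            "keep" // rewrites are only safe across generations
        } else if rewrite_budget_exhausted {
            "defer" // rewrite_max reached, revisit on a later compaction pass
        } else {
            "rewrite"
        }
    }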
681 :
682 : /// Update the LayerVisibilityHint of layers covered by image layers, based on whether there is
683 : /// an image layer between them and the most recent readable LSN (branch point or tip of timeline). The
684 : /// purpose of the visibility hint is to record which layers need to be available to service reads.
685 : ///
686 : /// The result may be used as an input to eviction and secondary downloads to de-prioritize layers
687 : /// that we know won't be needed for reads.
688 202 : pub(super) async fn update_layer_visibility(
689 202 : &self,
690 202 : ) -> Result<(), super::layer_manager::Shutdown> {
691 202 : let head_lsn = self.get_last_record_lsn();
692 :
693 : // We will sweep through layers in reverse-LSN order. We only do historic layers. L0 deltas
694 : // are implicitly left visible, because LayerVisibilityHint's default is Visible, and we never modify it here.
695 : // Note that L0 deltas _can_ be covered by image layers, but we consider them 'visible' because we anticipate that
696 : // they will be subject to L0->L1 compaction in the near future.
697 202 : let layer_manager = self.layers.read().await;
698 202 : let layer_map = layer_manager.layer_map()?;
699 :
700 202 : let readable_points = {
701 202 : let children = self.gc_info.read().unwrap().retain_lsns.clone();
702 202 :
703 202 : let mut readable_points = Vec::with_capacity(children.len() + 1);
704 202 : for (child_lsn, _child_timeline_id, is_offloaded) in &children {
705 0 : if *is_offloaded == MaybeOffloaded::Yes {
706 0 : continue;
707 0 : }
708 0 : readable_points.push(*child_lsn);
709 : }
710 202 : readable_points.push(head_lsn);
711 202 : readable_points
712 202 : };
713 202 :
714 202 : let (layer_visibility, covered) = layer_map.get_visibility(readable_points);
715 524 : for (layer_desc, visibility) in layer_visibility {
716 322 : // FIXME: a more efficiency bulk zip() through the layers rather than NlogN getting each one
717 322 : let layer = layer_manager.get_from_desc(&layer_desc);
718 322 : layer.set_visibility(visibility);
719 322 : }
720 :
721 : // TODO: publish our covered KeySpace to our parent, so that when they update their visibility, they can
722 : // avoid assuming that everything at a branch point is visible.
723 202 : drop(covered);
724 202 : Ok(())
725 202 : }
726 :
727 : /// Collect a bunch of Level 0 layer files, and compact and reshuffle them
728 : /// as Level 1 files. Returns whether the L0 layers are fully compacted.
729 364 : async fn compact_level0(
730 364 : self: &Arc<Self>,
731 364 : target_file_size: u64,
732 364 : force_compaction_ignore_threshold: bool,
733 364 : ctx: &RequestContext,
734 364 : ) -> Result<bool, CompactionError> {
735 : let CompactLevel0Phase1Result {
736 364 : new_layers,
737 364 : deltas_to_compact,
738 364 : fully_compacted,
739 : } = {
740 364 : let phase1_span = info_span!("compact_level0_phase1");
741 364 : let ctx = ctx.attached_child();
742 364 : let mut stats = CompactLevel0Phase1StatsBuilder {
743 364 : version: Some(2),
744 364 : tenant_id: Some(self.tenant_shard_id),
745 364 : timeline_id: Some(self.timeline_id),
746 364 : ..Default::default()
747 364 : };
748 364 :
749 364 : let begin = tokio::time::Instant::now();
750 364 : let phase1_layers_locked = self.layers.read().await;
751 364 : let now = tokio::time::Instant::now();
752 364 : stats.read_lock_acquisition_micros =
753 364 : DurationRecorder::Recorded(RecordedDuration(now - begin), now);
754 364 : self.compact_level0_phase1(
755 364 : phase1_layers_locked,
756 364 : stats,
757 364 : target_file_size,
758 364 : force_compaction_ignore_threshold,
759 364 : &ctx,
760 364 : )
761 364 : .instrument(phase1_span)
762 364 : .await?
763 : };
764 :
765 364 : if new_layers.is_empty() && deltas_to_compact.is_empty() {
766 : // nothing to do
767 336 : return Ok(true);
768 28 : }
769 28 :
770 28 : self.finish_compact_batch(&new_layers, &Vec::new(), &deltas_to_compact)
771 28 : .await?;
772 28 : Ok(fully_compacted)
773 364 : }
774 :
775 : /// Level0 files first phase of compaction, explained in the [`Self::compact_legacy`] comment.
776 364 : async fn compact_level0_phase1<'a>(
777 364 : self: &'a Arc<Self>,
778 364 : guard: tokio::sync::RwLockReadGuard<'a, LayerManager>,
779 364 : mut stats: CompactLevel0Phase1StatsBuilder,
780 364 : target_file_size: u64,
781 364 : force_compaction_ignore_threshold: bool,
782 364 : ctx: &RequestContext,
783 364 : ) -> Result<CompactLevel0Phase1Result, CompactionError> {
784 364 : stats.read_lock_held_spawn_blocking_startup_micros =
785 364 : stats.read_lock_acquisition_micros.till_now(); // set by caller
786 364 : let layers = guard.layer_map()?;
787 364 : let level0_deltas = layers.level0_deltas();
788 364 : stats.level0_deltas_count = Some(level0_deltas.len());
789 364 :
790 364 : // Only compact if enough layers have accumulated.
791 364 : let threshold = self.get_compaction_threshold();
792 364 : if level0_deltas.is_empty() || level0_deltas.len() < threshold {
793 336 : if force_compaction_ignore_threshold {
794 0 : if !level0_deltas.is_empty() {
795 0 : info!(
796 0 : level0_deltas = level0_deltas.len(),
797 0 : threshold, "too few deltas to compact, but forcing compaction"
798 : );
799 : } else {
800 0 : info!(
801 0 : level0_deltas = level0_deltas.len(),
802 0 : threshold, "too few deltas to compact, cannot force compaction"
803 : );
804 0 : return Ok(CompactLevel0Phase1Result::default());
805 : }
806 : } else {
807 336 : debug!(
808 0 : level0_deltas = level0_deltas.len(),
809 0 : threshold, "too few deltas to compact"
810 : );
811 336 : return Ok(CompactLevel0Phase1Result::default());
812 : }
813 28 : }
814 :
815 28 : let mut level0_deltas = level0_deltas
816 28 : .iter()
817 402 : .map(|x| guard.get_from_desc(x))
818 28 : .collect::<Vec<_>>();
819 28 :
820 28 : // Gather the files to compact in this iteration.
821 28 : //
822 28 : // Start with the oldest Level 0 delta file, and collect any other
823 28 : // level 0 files that form a contiguous sequence, such that the end
824 28 : // LSN of previous file matches the start LSN of the next file.
825 28 : //
826 28 : // Note that if the files don't form such a sequence, we might
827 28 : // "compact" just a single file. That's a bit pointless, but it allows
828 28 : // us to get rid of the level 0 file, and compact the other files on
829 28 : // the next iteration. This could probably be made smarter, but such
830 28 : // "gaps" in the sequence of level 0 files should only happen in case
831 28 : // of a crash, partial download from cloud storage, or something like
832 28 : // that, so it's not a big deal in practice.
833 748 : level0_deltas.sort_by_key(|l| l.layer_desc().lsn_range.start);
834 28 : let mut level0_deltas_iter = level0_deltas.iter();
835 28 :
836 28 : let first_level0_delta = level0_deltas_iter.next().unwrap();
837 28 : let mut prev_lsn_end = first_level0_delta.layer_desc().lsn_range.end;
838 28 : let mut deltas_to_compact = Vec::with_capacity(level0_deltas.len());
839 28 :
840 28 : // Accumulate the size of layers in `deltas_to_compact`
841 28 : let mut deltas_to_compact_bytes = 0;
842 28 :
843 28 : // Under normal circumstances, we will accumulate up to compaction_interval L0s of size
844 28 : // checkpoint_distance each. To avoid edge cases using extra system resources, bound our
845 28 : // work in this function to only operate on this much delta data at once.
846 28 : //
847 28 : // Take the max of the configured value & the default, so that tests that configure tiny values
848 28 : // can still use a sensible amount of memory, but if a deployed system configures bigger values we
849 28 : // still let them compact a full stack of L0s in one go.
850 28 : let delta_size_limit = std::cmp::max(
851 28 : self.get_compaction_threshold(),
852 28 : DEFAULT_COMPACTION_THRESHOLD,
853 28 : ) as u64
854 28 : * std::cmp::max(self.get_checkpoint_distance(), DEFAULT_CHECKPOINT_DISTANCE);
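        // Worked example (assumed numbers, not read from any real config): if the
        // effective compaction threshold were 10 L0 layers and the checkpoint
        // distance 256 MiB, then delta_size_limit = 10 * 256 MiB = 2.5 GiB of L0
        // delta data per compaction pass.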
855 28 :
856 28 : let mut fully_compacted = true;
857 28 :
858 28 : deltas_to_compact.push(first_level0_delta.download_and_keep_resident().await?);
859 402 : for l in level0_deltas_iter {
860 374 : let lsn_range = &l.layer_desc().lsn_range;
861 374 :
862 374 : if lsn_range.start != prev_lsn_end {
863 0 : break;
864 374 : }
865 374 : deltas_to_compact.push(l.download_and_keep_resident().await?);
866 374 : deltas_to_compact_bytes += l.metadata().file_size;
867 374 : prev_lsn_end = lsn_range.end;
868 374 :
869 374 : if deltas_to_compact_bytes >= delta_size_limit {
870 0 : info!(
871 0 : l0_deltas_selected = deltas_to_compact.len(),
872 0 : l0_deltas_total = level0_deltas.len(),
873 0 : "L0 compaction picker hit max delta layer size limit: {}",
874 : delta_size_limit
875 : );
876 0 : fully_compacted = false;
877 0 :
878 0 : // Proceed with compaction, but only a subset of L0s
879 0 : break;
880 374 : }
881 : }
882 28 : let lsn_range = Range {
883 28 : start: deltas_to_compact
884 28 : .first()
885 28 : .unwrap()
886 28 : .layer_desc()
887 28 : .lsn_range
888 28 : .start,
889 28 : end: deltas_to_compact.last().unwrap().layer_desc().lsn_range.end,
890 28 : };
891 28 :
892 28 : info!(
893 0 : "Starting Level0 compaction in LSN range {}-{} for {} layers ({} deltas in total)",
894 0 : lsn_range.start,
895 0 : lsn_range.end,
896 0 : deltas_to_compact.len(),
897 0 : level0_deltas.len()
898 : );
899 :
900 402 : for l in deltas_to_compact.iter() {
901 402 : info!("compact includes {l}");
902 : }
903 :
904 : // We don't need the original list of layers anymore. Drop it so that
905 : // we don't accidentally use it later in the function.
906 28 : drop(level0_deltas);
907 28 :
908 28 : stats.read_lock_held_prerequisites_micros = stats
909 28 : .read_lock_held_spawn_blocking_startup_micros
910 28 : .till_now();
911 :
912 : // TODO: replace with streaming k-merge
913 28 : let all_keys = {
914 28 : let mut all_keys = Vec::new();
915 402 : for l in deltas_to_compact.iter() {
916 402 : if self.cancel.is_cancelled() {
917 0 : return Err(CompactionError::ShuttingDown);
918 402 : }
919 402 : let delta = l.get_as_delta(ctx).await.map_err(CompactionError::Other)?;
920 402 : let keys = delta
921 402 : .index_entries(ctx)
922 402 : .await
923 402 : .map_err(CompactionError::Other)?;
924 402 : all_keys.extend(keys);
925 : }
926 : // The current stdlib sorting implementation is designed in a way where it is
927 : // particularly fast when the slice is made up of sorted sub-ranges.
928 4423826 : all_keys.sort_by_key(|DeltaEntry { key, lsn, .. }| (*key, *lsn));
929 28 : all_keys
930 28 : };
931 28 :
932 28 : stats.read_lock_held_key_sort_micros = stats.read_lock_held_prerequisites_micros.till_now();
933 :
934 : // Determine N largest holes where N is number of compacted layers. The vec is sorted by key range start.
935 : //
936 : // A hole is a key range for which this compaction doesn't have any WAL records.
937 : // Our goal in this compaction iteration is to avoid creating L1s that, in terms of their key range,
938 : // cover the hole, but actually don't contain any WAL records for that key range.
939 : // The reason is that the mere stack of L1s (`count_deltas`) triggers image layer creation (`create_image_layers`).
940 : // That image layer creation would be useless for a hole range covered by L1s that don't contain any WAL records.
941 : //
942 : // The algorithm chooses holes as follows.
943 : // - Slide a 2-window over the keys in key order to get the hole range (=distance between two keys).
944 : // - Filter: min threshold on range length
945 : // - Rank: by coverage size (=number of image layers required to reconstruct each key in the range for which we have any data)
946 : //
947 : // For more details, intuition, and some ASCII art see https://github.com/neondatabase/neon/pull/3597#discussion_r1112704451
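        // Worked example (assumed numbers): if target_file_size were 256 MiB and the page
        // cache page size 8 KiB, then min_hole_range = 256 MiB / 8 KiB = 32768 key slots.
        // A gap at least that wide that is covered by >= min_hole_coverage_size image
        // layers becomes a hole candidate, and only the max_holes candidates with the
        // largest coverage survive the heap below.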
948 : #[derive(PartialEq, Eq)]
949 : struct Hole {
950 : key_range: Range<Key>,
951 : coverage_size: usize,
952 : }
953 28 : let holes: Vec<Hole> = {
954 : use std::cmp::Ordering;
955 : impl Ord for Hole {
956 0 : fn cmp(&self, other: &Self) -> Ordering {
957 0 : self.coverage_size.cmp(&other.coverage_size).reverse()
958 0 : }
959 : }
960 : impl PartialOrd for Hole {
961 0 : fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
962 0 : Some(self.cmp(other))
963 0 : }
964 : }
965 28 : let max_holes = deltas_to_compact.len();
966 28 : let last_record_lsn = self.get_last_record_lsn();
967 28 : let min_hole_range = (target_file_size / page_cache::PAGE_SZ as u64) as i128;
968 28 : let min_hole_coverage_size = 3; // TODO: something more flexible?
969 28 : // min-heap (reserve space for one more element added before eviction)
970 28 : let mut heap: BinaryHeap<Hole> = BinaryHeap::with_capacity(max_holes + 1);
971 28 : let mut prev: Option<Key> = None;
972 :
973 2064038 : for &DeltaEntry { key: next_key, .. } in all_keys.iter() {
974 2064038 : if let Some(prev_key) = prev {
975 : // just a first fast filter: do not create hole entries for metadata keys. The last hole in the
976 : // compaction is the gap between data keys and metadata keys.
977 2064010 : if next_key.to_i128() - prev_key.to_i128() >= min_hole_range
978 0 : && !Key::is_metadata_key(&prev_key)
979 : {
980 0 : let key_range = prev_key..next_key;
981 0 : // Measuring a hole by just subtracting the i128 representations of the key range boundaries
982 0 : // does not make much sense, because the largest holes would correspond to field1/field2 changes.
983 0 : // But we are mostly interested in eliminating holes which cause generation of excessive image layers.
984 0 : // That is why it is better to measure the size of a hole as the number of covering image layers.
985 0 : let coverage_size =
986 0 : layers.image_coverage(&key_range, last_record_lsn).len();
987 0 : if coverage_size >= min_hole_coverage_size {
988 0 : heap.push(Hole {
989 0 : key_range,
990 0 : coverage_size,
991 0 : });
992 0 : if heap.len() > max_holes {
993 0 : heap.pop(); // remove smallest hole
994 0 : }
995 0 : }
996 2064010 : }
997 28 : }
998 2064038 : prev = Some(next_key.next());
999 : }
1000 28 : let mut holes = heap.into_vec();
1001 28 : holes.sort_unstable_by_key(|hole| hole.key_range.start);
1002 28 : holes
1003 28 : };
1004 28 : stats.read_lock_held_compute_holes_micros = stats.read_lock_held_key_sort_micros.till_now();
1005 28 : drop_rlock(guard);
1006 28 :
1007 28 : if self.cancel.is_cancelled() {
1008 0 : return Err(CompactionError::ShuttingDown);
1009 28 : }
1010 28 :
1011 28 : stats.read_lock_drop_micros = stats.read_lock_held_compute_holes_micros.till_now();
1012 :
1013 : // This iterator walks through all key-value pairs from all the layers
1014 : // we're compacting, in key, LSN order.
1015 : // If there's both a Value::Image and Value::WalRecord for the same (key,lsn),
1016 : // then the Value::Image is ordered before Value::WalRecord.
1017 28 : let mut all_values_iter = {
1018 28 : let mut deltas = Vec::with_capacity(deltas_to_compact.len());
1019 402 : for l in deltas_to_compact.iter() {
1020 402 : let l = l.get_as_delta(ctx).await.map_err(CompactionError::Other)?;
1021 402 : deltas.push(l);
1022 : }
1023 28 : MergeIterator::create(&deltas, &[], ctx)
1024 28 : };
1025 28 :
1026 28 : // This iterator walks through all keys and is needed to calculate size used by each key
1027 28 : let mut all_keys_iter = all_keys
1028 28 : .iter()
1029 2064038 : .map(|DeltaEntry { key, lsn, size, .. }| (*key, *lsn, *size))
1030 2064010 : .coalesce(|mut prev, cur| {
1031 2064010 : // Coalesce entries that belong to the same key.
1032 2064010 : // This ensures that compaction doesn't put them
1033 2064010 : // into different layer files.
1034 2064010 : // Still limit this by the target file size,
1035 2064010 : // so that we keep the size of the files in
1036 2064010 : // check.
1037 2064010 : if prev.0 == cur.0 && prev.2 < target_file_size {
1038 40038 : prev.2 += cur.2;
1039 40038 : Ok(prev)
1040 : } else {
1041 2023972 : Err((prev, cur))
1042 : }
1043 2064010 : });
1044 28 :
1045 28 : // Merge the contents of all the input delta layers into a new set
1046 28 : // of delta layers, based on the current partitioning.
1047 28 : //
1048 28 : // We split the new delta layers on the key dimension. We iterate through the key space, and for each key, check whether including the next key in the current output layer would cause the layer to become too large. If so, we flush the current output layer and start a new one.
1049 28 : // It's possible that there is a single key with so many page versions that storing all of them in a single layer file
1050 28 : // would be too large. In that case, we also split on the LSN dimension.
1051 28 : //
1052 28 : // LSN
1053 28 : // ^
1054 28 : // |
1055 28 : // | +-----------+ +--+--+--+--+
1056 28 : // | | | | | | | |
1057 28 : // | +-----------+ | | | | |
1058 28 : // | | | | | | | |
1059 28 : // | +-----------+ ==> | | | | |
1060 28 : // | | | | | | | |
1061 28 : // | +-----------+ | | | | |
1062 28 : // | | | | | | | |
1063 28 : // | +-----------+ +--+--+--+--+
1064 28 : // |
1065 28 : // +--------------> key
1066 28 : //
1067 28 : //
1068 28 : // If one key (X) has a lot of page versions:
1069 28 : //
1070 28 : // LSN
1071 28 : // ^
1072 28 : // | (X)
1073 28 : // | +-----------+ +--+--+--+--+
1074 28 : // | | | | | | | |
1075 28 : // | +-----------+ | | +--+ |
1076 28 : // | | | | | | | |
1077 28 : // | +-----------+ ==> | | | | |
1078 28 : // | | | | | +--+ |
1079 28 : // | +-----------+ | | | | |
1080 28 : // | | | | | | | |
1081 28 : // | +-----------+ +--+--+--+--+
1082 28 : // |
1083 28 : // +--------------> key
1084 28 : // TODO: this actually divides the layers into fixed-size chunks, not
1085 28 : // based on the partitioning.
1086 28 : //
1087 28 : // TODO: we should also opportunistically materialize and
1088 28 : // garbage collect what we can.
1089 28 : let mut new_layers = Vec::new();
1090 28 : let mut prev_key: Option<Key> = None;
1091 28 : let mut writer: Option<DeltaLayerWriter> = None;
1092 28 : let mut key_values_total_size = 0u64;
1093 28 : let mut dup_start_lsn: Lsn = Lsn::INVALID; // start LSN of layer containing values of the single key
1094 28 : let mut dup_end_lsn: Lsn = Lsn::INVALID; // end LSN of layer containing values of the single key
1095 28 : let mut next_hole = 0; // index of next hole in holes vector
1096 28 :
1097 28 : let mut keys = 0;
1098 :
1099 2064066 : while let Some((key, lsn, value)) = all_values_iter
1100 2064066 : .next()
1101 2064066 : .await
1102 2064066 : .map_err(CompactionError::Other)?
1103 : {
1104 2064038 : keys += 1;
1105 2064038 :
1106 2064038 : if keys % 32_768 == 0 && self.cancel.is_cancelled() {
1107 : // avoid hitting the cancellation token on every key. In benches, we end up
1108 : // shuffling on the order of a million keys per layer, so we'll check it
1109 : // around tens of times per layer.
1110 0 : return Err(CompactionError::ShuttingDown);
1111 2064038 : }
1112 2064038 :
1113 2064038 : let same_key = prev_key == Some(key);
1114 2064038 : // We need to check key boundaries once we reach next key or end of layer with the same key
1115 2064038 : if !same_key || lsn == dup_end_lsn {
1116 2024000 : let mut next_key_size = 0u64;
1117 2024000 : let is_dup_layer = dup_end_lsn.is_valid();
1118 2024000 : dup_start_lsn = Lsn::INVALID;
1119 2024000 : if !same_key {
1120 2024000 : dup_end_lsn = Lsn::INVALID;
1121 2024000 : }
1122 : // Determine size occupied by this key. We stop at next key or when size becomes larger than target_file_size
1123 2024000 : for (next_key, next_lsn, next_size) in all_keys_iter.by_ref() {
1124 2024000 : next_key_size = next_size;
1125 2024000 : if key != next_key {
1126 2023972 : if dup_end_lsn.is_valid() {
1127 0 : // We are writting segment with duplicates:
1128 0 : // place all remaining values of this key in separate segment
1129 0 : dup_start_lsn = dup_end_lsn; // new segments starts where old stops
1130 0 : dup_end_lsn = lsn_range.end; // there are no more values of this key till end of LSN range
1131 2023972 : }
1132 2023972 : break;
1133 28 : }
1134 28 : key_values_total_size += next_size;
1135 28 : // Check if it is time to split segment: if total keys size is larger than target file size.
1136 28 : // We need to avoid generation of empty segments if next_size > target_file_size.
1137 28 : if key_values_total_size > target_file_size && lsn != next_lsn {
1138 : // Split key between multiple layers: such layer can contain only single key
1139 0 : dup_start_lsn = if dup_end_lsn.is_valid() {
1140 0 : dup_end_lsn // new segment with duplicates starts where old one stops
1141 : } else {
1142 0 : lsn // start with the first LSN for this key
1143 : };
1144 0 : dup_end_lsn = next_lsn; // upper LSN boundary is exclusive
1145 0 : break;
1146 28 : }
1147 : }
1148 : // handle case when loop reaches last key: in this case dup_end is non-zero but dup_start is not set.
1149 2024000 : if dup_end_lsn.is_valid() && !dup_start_lsn.is_valid() {
1150 0 : dup_start_lsn = dup_end_lsn;
1151 0 : dup_end_lsn = lsn_range.end;
1152 2024000 : }
1153 2024000 : if writer.is_some() {
1154 2023972 : let written_size = writer.as_mut().unwrap().size();
1155 2023972 : let contains_hole =
1156 2023972 : next_hole < holes.len() && key >= holes[next_hole].key_range.end;
1157 : // check if the key causes layer overflow or contains a hole...
1158 2023972 : if is_dup_layer
1159 2023972 : || dup_end_lsn.is_valid()
1160 2023972 : || written_size + key_values_total_size > target_file_size
1161 2023692 : || contains_hole
1162 : {
1163 : // ... if so, flush previous layer and prepare to write new one
1164 280 : let (desc, path) = writer
1165 280 : .take()
1166 280 : .unwrap()
1167 280 : .finish(prev_key.unwrap().next(), ctx)
1168 280 : .await
1169 280 : .map_err(CompactionError::Other)?;
1170 280 : let new_delta = Layer::finish_creating(self.conf, self, desc, &path)
1171 280 : .map_err(CompactionError::Other)?;
1172 :
1173 280 : new_layers.push(new_delta);
1174 280 : writer = None;
1175 280 :
1176 280 : if contains_hole {
1177 0 : // skip hole
1178 0 : next_hole += 1;
1179 280 : }
1180 2023692 : }
1181 28 : }
1182 : // Remember size of key value because at next iteration we will access next item
1183 2024000 : key_values_total_size = next_key_size;
1184 40038 : }
1185 2064038 : fail_point!("delta-layer-writer-fail-before-finish", |_| {
1186 0 : Err(CompactionError::Other(anyhow::anyhow!(
1187 0 : "failpoint delta-layer-writer-fail-before-finish"
1188 0 : )))
1189 2064038 : });
1190 :
1191 2064038 : if !self.shard_identity.is_key_disposable(&key) {
1192 2064038 : if writer.is_none() {
1193 308 : if self.cancel.is_cancelled() {
1194 : // to be somewhat responsive to cancellation, check for each new layer
1195 0 : return Err(CompactionError::ShuttingDown);
1196 308 : }
1197 : // Create writer if not initiaized yet
1198 308 : writer = Some(
1199 : DeltaLayerWriter::new(
1200 308 : self.conf,
1201 308 : self.timeline_id,
1202 308 : self.tenant_shard_id,
1203 308 : key,
1204 308 : if dup_end_lsn.is_valid() {
1205 : // this is a layer containing slice of values of the same key
1206 0 : debug!("Create new dup layer {}..{}", dup_start_lsn, dup_end_lsn);
1207 0 : dup_start_lsn..dup_end_lsn
1208 : } else {
1209 308 : debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end);
1210 308 : lsn_range.clone()
1211 : },
1212 308 : ctx,
1213 308 : )
1214 308 : .await
1215 308 : .map_err(CompactionError::Other)?,
1216 : );
1217 :
1218 308 : keys = 0;
1219 2063730 : }
1220 :
1221 2064038 : writer
1222 2064038 : .as_mut()
1223 2064038 : .unwrap()
1224 2064038 : .put_value(key, lsn, value, ctx)
1225 2064038 : .await
1226 2064038 : .map_err(CompactionError::Other)?;
1227 : } else {
1228 0 : let shard = self.shard_identity.shard_index();
1229 0 : let owner = self.shard_identity.get_shard_number(&key);
1230 0 : if cfg!(debug_assertions) {
1231 0 : panic!("key {key} does not belong on shard {shard}, owned by {owner}");
1232 0 : }
1233 0 : debug!("dropping key {key} during compaction (it belongs on shard {owner})");
1234 : }
1235 :
1236 2064038 : if !new_layers.is_empty() {
1237 19786 : fail_point!("after-timeline-compacted-first-L1");
1238 2044252 : }
1239 :
1240 2064038 : prev_key = Some(key);
1241 : }
1242 28 : if let Some(writer) = writer {
1243 28 : let (desc, path) = writer
1244 28 : .finish(prev_key.unwrap().next(), ctx)
1245 28 : .await
1246 28 : .map_err(CompactionError::Other)?;
1247 28 : let new_delta = Layer::finish_creating(self.conf, self, desc, &path)
1248 28 : .map_err(CompactionError::Other)?;
1249 28 : new_layers.push(new_delta);
1250 0 : }
1251 :
1252 : // Sync layers
1253 28 : if !new_layers.is_empty() {
1254 : // Print a warning if the created layer is larger than double the target size
1255 : // Add two pages for potential overhead. This should in theory be already
1256 : // accounted for in the target calculation, but for very small targets,
1257 : // we still might easily hit the limit otherwise.
1258 28 : let warn_limit = target_file_size * 2 + page_cache::PAGE_SZ as u64 * 2;
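            // Worked example (assumed numbers): with a 128 MiB target_file_size and an
            // 8 KiB page size, warn_limit = 2 * 128 MiB + 2 * 8 KiB = 256 MiB + 16 KiB.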
1259 308 : for layer in new_layers.iter() {
1260 308 : if layer.layer_desc().file_size > warn_limit {
1261 0 : warn!(
1262 : %layer,
1263 0 : "created delta file of size {} larger than double of target of {target_file_size}", layer.layer_desc().file_size
1264 : );
1265 308 : }
1266 : }
1267 :
1268 : // The writer.finish() above already did the fsync of the inodes.
1269 : // We just need to fsync the directory in which these inodes are linked,
1270 : // which we know to be the timeline directory.
1271 : //
1272 : // We use fatal_err() below because after writer.finish() returns with success,
1273 : // the in-memory state of the filesystem already has the layer file in its final place,
1274 : // and subsequent pageserver code could think it's durable while it really isn't.
1275 28 : let timeline_dir = VirtualFile::open(
1276 28 : &self
1277 28 : .conf
1278 28 : .timeline_path(&self.tenant_shard_id, &self.timeline_id),
1279 28 : ctx,
1280 28 : )
1281 28 : .await
1282 28 : .fatal_err("VirtualFile::open for timeline dir fsync");
1283 28 : timeline_dir
1284 28 : .sync_all()
1285 28 : .await
1286 28 : .fatal_err("VirtualFile::sync_all timeline dir");
1287 0 : }
1288 :
1289 28 : stats.write_layer_files_micros = stats.read_lock_drop_micros.till_now();
1290 28 : stats.new_deltas_count = Some(new_layers.len());
1291 308 : stats.new_deltas_size = Some(new_layers.iter().map(|l| l.layer_desc().file_size).sum());
1292 28 :
1293 28 : match TryInto::<CompactLevel0Phase1Stats>::try_into(stats)
1294 28 : .and_then(|stats| serde_json::to_string(&stats).context("serde_json::to_string"))
1295 : {
1296 28 : Ok(stats_json) => {
1297 28 : info!(
1298 0 : stats_json = stats_json.as_str(),
1299 0 : "compact_level0_phase1 stats available"
1300 : )
1301 : }
1302 0 : Err(e) => {
1303 0 : warn!("compact_level0_phase1 stats failed to serialize: {:#}", e);
1304 : }
1305 : }
1306 :
1307 : // Without this, rustc complains about deltas_to_compact still
1308 : // being borrowed when we `.into_iter()` below.
1309 28 : drop(all_values_iter);
1310 28 :
1311 28 : Ok(CompactLevel0Phase1Result {
1312 28 : new_layers,
1313 28 : deltas_to_compact: deltas_to_compact
1314 28 : .into_iter()
1315 402 : .map(|x| x.drop_eviction_guard())
1316 28 : .collect::<Vec<_>>(),
1317 28 : fully_compacted,
1318 28 : })
1319 364 : }
1320 : }
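
// Editor's illustrative sketch (not part of the original source): the same
// durability pattern used in compact_level0_phase1 above -- fsync the new file,
// then fsync the parent directory so the directory entry itself survives a
// crash -- written with plain `std::fs`. POSIX semantics are assumed and the
// file name is hypothetical.
#[allow(dead_code)]
fn durable_create_sketch(dir: &std::path::Path) -> std::io::Result<()> {
    use std::io::Write;
    let path = dir.join("example-layer.tmp"); // hypothetical file name
    let mut file = std::fs::File::create(&path)?;
    file.write_all(b"payload")?;
    // Flush the file's own data and metadata (the inode).
    file.sync_all()?;
    // Without this second fsync, a crash could lose the directory entry even
    // though the file contents were flushed, which is what the timeline-dir
    // sync above guards against.
    std::fs::File::open(dir)?.sync_all()?;
    Ok(())
}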
1321 :
1322 : #[derive(Default)]
1323 : struct CompactLevel0Phase1Result {
1324 : new_layers: Vec<ResidentLayer>,
1325 : deltas_to_compact: Vec<Layer>,
1326 : // Whether we have included all L0 layers, or selected only part of them due to the
1327 : // L0 compaction size limit.
1328 : fully_compacted: bool,
1329 : }
1330 :
1331 : #[derive(Default)]
1332 : struct CompactLevel0Phase1StatsBuilder {
1333 : version: Option<u64>,
1334 : tenant_id: Option<TenantShardId>,
1335 : timeline_id: Option<TimelineId>,
1336 : read_lock_acquisition_micros: DurationRecorder,
1337 : read_lock_held_spawn_blocking_startup_micros: DurationRecorder,
1338 : read_lock_held_key_sort_micros: DurationRecorder,
1339 : read_lock_held_prerequisites_micros: DurationRecorder,
1340 : read_lock_held_compute_holes_micros: DurationRecorder,
1341 : read_lock_drop_micros: DurationRecorder,
1342 : write_layer_files_micros: DurationRecorder,
1343 : level0_deltas_count: Option<usize>,
1344 : new_deltas_count: Option<usize>,
1345 : new_deltas_size: Option<u64>,
1346 : }
1347 :
1348 : #[derive(serde::Serialize)]
1349 : struct CompactLevel0Phase1Stats {
1350 : version: u64,
1351 : tenant_id: TenantShardId,
1352 : timeline_id: TimelineId,
1353 : read_lock_acquisition_micros: RecordedDuration,
1354 : read_lock_held_spawn_blocking_startup_micros: RecordedDuration,
1355 : read_lock_held_key_sort_micros: RecordedDuration,
1356 : read_lock_held_prerequisites_micros: RecordedDuration,
1357 : read_lock_held_compute_holes_micros: RecordedDuration,
1358 : read_lock_drop_micros: RecordedDuration,
1359 : write_layer_files_micros: RecordedDuration,
1360 : level0_deltas_count: usize,
1361 : new_deltas_count: usize,
1362 : new_deltas_size: u64,
1363 : }
1364 :
1365 : impl TryFrom<CompactLevel0Phase1StatsBuilder> for CompactLevel0Phase1Stats {
1366 : type Error = anyhow::Error;
1367 :
1368 28 : fn try_from(value: CompactLevel0Phase1StatsBuilder) -> Result<Self, Self::Error> {
1369 28 : Ok(Self {
1370 28 : version: value.version.ok_or_else(|| anyhow!("version not set"))?,
1371 28 : tenant_id: value
1372 28 : .tenant_id
1373 28 : .ok_or_else(|| anyhow!("tenant_id not set"))?,
1374 28 : timeline_id: value
1375 28 : .timeline_id
1376 28 : .ok_or_else(|| anyhow!("timeline_id not set"))?,
1377 28 : read_lock_acquisition_micros: value
1378 28 : .read_lock_acquisition_micros
1379 28 : .into_recorded()
1380 28 : .ok_or_else(|| anyhow!("read_lock_acquisition_micros not set"))?,
1381 28 : read_lock_held_spawn_blocking_startup_micros: value
1382 28 : .read_lock_held_spawn_blocking_startup_micros
1383 28 : .into_recorded()
1384 28 : .ok_or_else(|| anyhow!("read_lock_held_spawn_blocking_startup_micros not set"))?,
1385 28 : read_lock_held_key_sort_micros: value
1386 28 : .read_lock_held_key_sort_micros
1387 28 : .into_recorded()
1388 28 : .ok_or_else(|| anyhow!("read_lock_held_key_sort_micros not set"))?,
1389 28 : read_lock_held_prerequisites_micros: value
1390 28 : .read_lock_held_prerequisites_micros
1391 28 : .into_recorded()
1392 28 : .ok_or_else(|| anyhow!("read_lock_held_prerequisites_micros not set"))?,
1393 28 : read_lock_held_compute_holes_micros: value
1394 28 : .read_lock_held_compute_holes_micros
1395 28 : .into_recorded()
1396 28 : .ok_or_else(|| anyhow!("read_lock_held_compute_holes_micros not set"))?,
1397 28 : read_lock_drop_micros: value
1398 28 : .read_lock_drop_micros
1399 28 : .into_recorded()
1400 28 : .ok_or_else(|| anyhow!("read_lock_drop_micros not set"))?,
1401 28 : write_layer_files_micros: value
1402 28 : .write_layer_files_micros
1403 28 : .into_recorded()
1404 28 : .ok_or_else(|| anyhow!("write_layer_files_micros not set"))?,
1405 28 : level0_deltas_count: value
1406 28 : .level0_deltas_count
1407 28 : .ok_or_else(|| anyhow!("level0_deltas_count not set"))?,
1408 28 : new_deltas_count: value
1409 28 : .new_deltas_count
1410 28 : .ok_or_else(|| anyhow!("new_deltas_count not set"))?,
1411 28 : new_deltas_size: value
1412 28 : .new_deltas_size
1413 28 : .ok_or_else(|| anyhow!("new_deltas_size not set"))?,
1414 : })
1415 28 : }
1416 : }
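
// Editor's note (illustrative): converting a builder with unset fields fails fast
// on the first missing field, e.g. an all-default `CompactLevel0Phase1StatsBuilder`
// yields `Err("version not set")` from the impl above. That is why the caller in
// compact_level0_phase1 serializes the stats inside a match and only warn!s on
// failure instead of unwrapping.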
1417 :
1418 : impl Timeline {
1419 :     /// Entry point for the new tiered compaction algorithm.
1420 : ///
1421 : /// All the real work is in the implementation in the pageserver_compaction
1422 :     /// crate. The code here would apply to any algorithm implementing the
1423 : /// same interface, but tiered is the only one at the moment.
1424 : ///
1425 : /// TODO: cancellation
1426 0 : pub(crate) async fn compact_tiered(
1427 0 : self: &Arc<Self>,
1428 0 : _cancel: &CancellationToken,
1429 0 : ctx: &RequestContext,
1430 0 : ) -> Result<(), CompactionError> {
1431 0 : let fanout = self.get_compaction_threshold() as u64;
1432 0 : let target_file_size = self.get_checkpoint_distance();
1433 :
1434 : // Find the top of the historical layers
1435 0 : let end_lsn = {
1436 0 : let guard = self.layers.read().await;
1437 0 : let layers = guard.layer_map()?;
1438 :
1439 0 : let l0_deltas = layers.level0_deltas();
1440 0 :
1441 0 : // As an optimization, if we find that there are too few L0 layers,
1442 0 : // bail out early. We know that the compaction algorithm would do
1443 0 : // nothing in that case.
1444 0 : if l0_deltas.len() < fanout as usize {
1445 : // doesn't need compacting
1446 0 : return Ok(());
1447 0 : }
1448 0 : l0_deltas.iter().map(|l| l.lsn_range.end).max().unwrap()
1449 0 : };
1450 0 :
1451 0 : // Is the timeline being deleted?
1452 0 : if self.is_stopping() {
1453 0 : trace!("Dropping out of compaction on timeline shutdown");
1454 0 : return Err(CompactionError::ShuttingDown);
1455 0 : }
1456 :
1457 0 : let (dense_ks, _sparse_ks) = self.collect_keyspace(end_lsn, ctx).await?;
1458 : // TODO(chi): ignore sparse_keyspace for now, compact it in the future.
1459 0 : let mut adaptor = TimelineAdaptor::new(self, (end_lsn, dense_ks));
1460 0 :
1461 0 : pageserver_compaction::compact_tiered::compact_tiered(
1462 0 : &mut adaptor,
1463 0 : end_lsn,
1464 0 : target_file_size,
1465 0 : fanout,
1466 0 : ctx,
1467 0 : )
1468 0 : .await
1469 : // TODO: compact_tiered needs to return CompactionError
1470 0 : .map_err(CompactionError::Other)?;
1471 :
1472 0 : adaptor.flush_updates().await?;
1473 0 : Ok(())
1474 0 : }
1475 :
1476 : /// Take a list of images and deltas, produce images and deltas according to GC horizon and retain_lsns.
1477 : ///
1478 : /// It takes a key, the values of the key within the compaction process, a GC horizon, and all retain_lsns below the horizon.
1479 : /// For now, it requires the `accumulated_values` contains the full history of the key (i.e., the key with the lowest LSN is
1480 :     /// For now, it requires that `accumulated_values` contain the full history of the key (i.e., the record with the lowest LSN is
1481 :     /// an image or a WAL record not requiring a base image). This restriction will be removed once we implement gc-compaction on branches.
1482 : /// The function returns the deltas and the base image that need to be placed at each of the retain LSN. For example, we have:
1483 :     /// The function returns the deltas and the base image that need to be placed at each of the retain LSNs. For example, we have:
1484 : /// A@0x10, +B@0x20, +C@0x30, +D@0x40, +E@0x50, +F@0x60
1485 : /// horizon = 0x50, retain_lsn = 0x20, 0x40, delta_threshold=3
1486 : ///
1487 : /// The function will produce:
1488 : ///
1489 : /// ```plain
1490 : /// 0x20(retain_lsn) -> img=AB@0x20 always produce a single image below the lowest retain LSN
1491 : /// 0x40(retain_lsn) -> deltas=[+C@0x30, +D@0x40] two deltas since the last base image, keeping the deltas
1492 : /// 0x50(horizon) -> deltas=[ABCDE@0x50] three deltas since the last base image, generate an image but put it in the delta
1493 : /// above_horizon -> deltas=[+F@0x60] full history above the horizon
1494 : /// ```
1495 : ///
1496 : /// Note that `accumulated_values` must be sorted by LSN and should belong to a single key.
1497 630 : pub(crate) async fn generate_key_retention(
1498 630 : self: &Arc<Timeline>,
1499 630 : key: Key,
1500 630 : full_history: &[(Key, Lsn, Value)],
1501 630 : horizon: Lsn,
1502 630 : retain_lsn_below_horizon: &[Lsn],
1503 630 : delta_threshold_cnt: usize,
1504 630 : base_img_from_ancestor: Option<(Key, Lsn, Bytes)>,
1505 630 : ) -> anyhow::Result<KeyHistoryRetention> {
1506 630 : // Pre-checks for the invariants
1507 630 : if cfg!(debug_assertions) {
1508 1530 : for (log_key, _, _) in full_history {
1509 900 : assert_eq!(log_key, &key, "mismatched key");
1510 : }
1511 630 : for i in 1..full_history.len() {
1512 270 : assert!(full_history[i - 1].1 <= full_history[i].1, "unordered LSN");
1513 270 : if full_history[i - 1].1 == full_history[i].1 {
1514 0 : assert!(
1515 0 : matches!(full_history[i - 1].2, Value::Image(_)),
1516 0 : "unordered delta/image, or duplicated delta"
1517 : );
1518 270 : }
1519 : }
1520 :             // There used to be an assertion that, when there is no base image, the first
1521 :             // record in the history is `will_init`, but it was removed.
1522 : // This is explained in the test cases for generate_key_retention.
1523 : // Search "incomplete history" for more information.
1524 1410 : for lsn in retain_lsn_below_horizon {
1525 780 : assert!(lsn < &horizon, "retain lsn must be below horizon")
1526 : }
1527 630 : for i in 1..retain_lsn_below_horizon.len() {
1528 356 : assert!(
1529 356 : retain_lsn_below_horizon[i - 1] <= retain_lsn_below_horizon[i],
1530 0 : "unordered LSN"
1531 : );
1532 : }
1533 0 : }
1534 630 : let has_ancestor = base_img_from_ancestor.is_some();
1535 : // Step 1: split history into len(retain_lsn_below_horizon) + 2 buckets, where the last bucket is for all deltas above the horizon,
1536 : // and the second-to-last bucket is for the horizon. Each bucket contains lsn_last_bucket < deltas <= lsn_this_bucket.
1537 630 : let (mut split_history, lsn_split_points) = {
1538 630 : let mut split_history = Vec::new();
1539 630 : split_history.resize_with(retain_lsn_below_horizon.len() + 2, Vec::new);
1540 630 : let mut lsn_split_points = Vec::with_capacity(retain_lsn_below_horizon.len() + 1);
1541 1410 : for lsn in retain_lsn_below_horizon {
1542 780 : lsn_split_points.push(*lsn);
1543 780 : }
1544 630 : lsn_split_points.push(horizon);
1545 630 : let mut current_idx = 0;
1546 1530 : for item @ (_, lsn, _) in full_history {
1547 1144 : while current_idx < lsn_split_points.len() && *lsn > lsn_split_points[current_idx] {
1548 244 : current_idx += 1;
1549 244 : }
1550 900 : split_history[current_idx].push(item);
1551 : }
1552 630 : (split_history, lsn_split_points)
1553 : };
1554 : // Step 2: filter out duplicated records due to the k-merge of image/delta layers
1555 2670 : for split_for_lsn in &mut split_history {
1556 2040 : let mut prev_lsn = None;
1557 2040 : let mut new_split_for_lsn = Vec::with_capacity(split_for_lsn.len());
1558 2040 : for record @ (_, lsn, _) in std::mem::take(split_for_lsn) {
1559 900 : if let Some(prev_lsn) = &prev_lsn {
1560 118 : if *prev_lsn == lsn {
1561 :                     // The case where we have an LSN with data from both the delta layer and the image layer. As
1562 : // `ValueWrapper` ensures that an image is ordered before a delta at the same LSN, we simply
1563 : // drop this delta and keep the image.
1564 : //
1565 :                     // For example, if we have delta layer entries key1@0x10 and key1@0x20, and an image layer entry key1@0x10, we will
1566 :                     // keep the image for key1@0x10 and the delta for key1@0x20; the key1@0x10 delta will simply be
1567 :                     // dropped.
1568 : //
1569 :                     // TODO: in case we have both a delta and an image for a given LSN and it does not exceed the delta
1570 :                     // threshold, we could keep the delta instead to save space. This is an optimization for the future.
1571 0 : continue;
1572 118 : }
1573 782 : }
1574 900 : prev_lsn = Some(lsn);
1575 900 : new_split_for_lsn.push(record);
1576 : }
1577 2040 : *split_for_lsn = new_split_for_lsn;
1578 : }
1579 : // Step 3: generate images when necessary
1580 630 : let mut retention = Vec::with_capacity(split_history.len());
1581 630 : let mut records_since_last_image = 0;
1582 630 : let batch_cnt = split_history.len();
1583 630 : assert!(
1584 630 : batch_cnt >= 2,
1585 0 : "should have at least below + above horizon batches"
1586 : );
1587 630 : let mut replay_history: Vec<(Key, Lsn, Value)> = Vec::new();
1588 630 : if let Some((key, lsn, img)) = base_img_from_ancestor {
1589 42 : replay_history.push((key, lsn, Value::Image(img)));
1590 588 : }
1591 :
1592 : /// Generate debug information for the replay history
1593 0 : fn generate_history_trace(replay_history: &[(Key, Lsn, Value)]) -> String {
1594 : use std::fmt::Write;
1595 0 : let mut output = String::new();
1596 0 : if let Some((key, _, _)) = replay_history.first() {
1597 0 : write!(output, "key={} ", key).unwrap();
1598 0 : let mut cnt = 0;
1599 0 : for (_, lsn, val) in replay_history {
1600 0 : if val.is_image() {
1601 0 : write!(output, "i@{} ", lsn).unwrap();
1602 0 : } else if val.will_init() {
1603 0 : write!(output, "di@{} ", lsn).unwrap();
1604 0 : } else {
1605 0 : write!(output, "d@{} ", lsn).unwrap();
1606 0 : }
1607 0 : cnt += 1;
1608 0 : if cnt >= 128 {
1609 0 : write!(output, "... and more").unwrap();
1610 0 : break;
1611 0 : }
1612 : }
1613 0 : } else {
1614 0 : write!(output, "<no history>").unwrap();
1615 0 : }
1616 0 : output
1617 0 : }
1618 :
1619 0 : fn generate_debug_trace(
1620 0 : replay_history: Option<&[(Key, Lsn, Value)]>,
1621 0 : full_history: &[(Key, Lsn, Value)],
1622 0 : lsns: &[Lsn],
1623 0 : horizon: Lsn,
1624 0 : ) -> String {
1625 : use std::fmt::Write;
1626 0 : let mut output = String::new();
1627 0 : if let Some(replay_history) = replay_history {
1628 0 : writeln!(
1629 0 : output,
1630 0 : "replay_history: {}",
1631 0 : generate_history_trace(replay_history)
1632 0 : )
1633 0 : .unwrap();
1634 0 : } else {
1635 0 : writeln!(output, "replay_history: <disabled>",).unwrap();
1636 0 : }
1637 0 : writeln!(
1638 0 : output,
1639 0 : "full_history: {}",
1640 0 : generate_history_trace(full_history)
1641 0 : )
1642 0 : .unwrap();
1643 0 : writeln!(
1644 0 : output,
1645 0 : "when processing: [{}] horizon={}",
1646 0 : lsns.iter().map(|l| format!("{l}")).join(","),
1647 0 : horizon
1648 0 : )
1649 0 : .unwrap();
1650 0 : output
1651 0 : }
1652 :
1653 2040 : for (i, split_for_lsn) in split_history.into_iter().enumerate() {
1654 : // TODO: there could be image keys inside the splits, and we can compute records_since_last_image accordingly.
1655 2040 : records_since_last_image += split_for_lsn.len();
1656 2040 : let generate_image = if i == 0 && !has_ancestor {
1657 : // We always generate images for the first batch (below horizon / lowest retain_lsn)
1658 588 : true
1659 1452 : } else if i == batch_cnt - 1 {
1660 : // Do not generate images for the last batch (above horizon)
1661 630 : false
1662 822 : } else if records_since_last_image >= delta_threshold_cnt {
1663 : // Generate images when there are too many records
1664 6 : true
1665 : } else {
1666 816 : false
1667 : };
1668 2040 : replay_history.extend(split_for_lsn.iter().map(|x| (*x).clone()));
1669 : // Only retain the items after the last image record
1670 2514 : for idx in (0..replay_history.len()).rev() {
1671 2514 : if replay_history[idx].2.will_init() {
1672 2040 : replay_history = replay_history[idx..].to_vec();
1673 2040 : break;
1674 474 : }
1675 : }
1676 2040 : if let Some((_, _, val)) = replay_history.first() {
1677 2040 : if !val.will_init() {
1678 0 : return Err(anyhow::anyhow!("invalid history, no base image")).with_context(
1679 0 : || {
1680 0 : generate_debug_trace(
1681 0 : Some(&replay_history),
1682 0 : full_history,
1683 0 : retain_lsn_below_horizon,
1684 0 : horizon,
1685 0 : )
1686 0 : },
1687 0 : );
1688 2040 : }
1689 0 : }
1690 2040 : if generate_image && records_since_last_image > 0 {
1691 594 : records_since_last_image = 0;
1692 594 : let replay_history_for_debug = if cfg!(debug_assertions) {
1693 594 : Some(replay_history.clone())
1694 : } else {
1695 0 : None
1696 : };
1697 594 : let replay_history_for_debug_ref = replay_history_for_debug.as_deref();
1698 594 : let history = std::mem::take(&mut replay_history);
1699 594 : let mut img = None;
1700 594 : let mut records = Vec::with_capacity(history.len());
1701 594 : if let (_, lsn, Value::Image(val)) = history.first().as_ref().unwrap() {
1702 572 : img = Some((*lsn, val.clone()));
1703 572 : for (_, lsn, val) in history.into_iter().skip(1) {
1704 34 : let Value::WalRecord(rec) = val else {
1705 0 : return Err(anyhow::anyhow!(
1706 0 : "invalid record, first record is image, expect walrecords"
1707 0 : ))
1708 0 : .with_context(|| {
1709 0 : generate_debug_trace(
1710 0 : replay_history_for_debug_ref,
1711 0 : full_history,
1712 0 : retain_lsn_below_horizon,
1713 0 : horizon,
1714 0 : )
1715 0 : });
1716 : };
1717 34 : records.push((lsn, rec));
1718 : }
1719 : } else {
1720 36 : for (_, lsn, val) in history.into_iter() {
1721 36 : let Value::WalRecord(rec) = val else {
1722 0 : return Err(anyhow::anyhow!("invalid record, first record is walrecord, expect rest are walrecord"))
1723 0 : .with_context(|| generate_debug_trace(
1724 0 : replay_history_for_debug_ref,
1725 0 : full_history,
1726 0 : retain_lsn_below_horizon,
1727 0 : horizon,
1728 0 : ));
1729 : };
1730 36 : records.push((lsn, rec));
1731 : }
1732 : }
1733 594 : records.reverse();
1734 594 : let state = ValueReconstructState { img, records };
1735 594 : let request_lsn = lsn_split_points[i]; // last batch does not generate image so i is always in range
1736 594 : let img = self.reconstruct_value(key, request_lsn, state).await?;
1737 594 : replay_history.push((key, request_lsn, Value::Image(img.clone())));
1738 594 : retention.push(vec![(request_lsn, Value::Image(img))]);
1739 1446 : } else {
1740 1446 : let deltas = split_for_lsn
1741 1446 : .iter()
1742 1446 : .map(|(_, lsn, value)| (*lsn, value.clone()))
1743 1446 : .collect_vec();
1744 1446 : retention.push(deltas);
1745 1446 : }
1746 : }
1747 630 : let mut result = Vec::with_capacity(retention.len());
1748 630 : assert_eq!(retention.len(), lsn_split_points.len() + 1);
1749 2040 : for (idx, logs) in retention.into_iter().enumerate() {
1750 2040 : if idx == lsn_split_points.len() {
1751 630 : return Ok(KeyHistoryRetention {
1752 630 : below_horizon: result,
1753 630 : above_horizon: KeyLogAtLsn(logs),
1754 630 : });
1755 1410 : } else {
1756 1410 : result.push((lsn_split_points[idx], KeyLogAtLsn(logs)));
1757 1410 : }
1758 : }
1759 0 : unreachable!("key retention is empty")
1760 630 : }
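
    // Editor's illustrative sketch (not part of the original source): the bucket
    // split performed in Step 1 of generate_key_retention above, isolated as a
    // small pure function over bare LSNs. With history
    // [0x10, 0x20, 0x30, 0x40, 0x50, 0x60], retain_lsns [0x20, 0x40] and horizon
    // 0x50 it yields [[0x10, 0x20], [0x30, 0x40], [0x50], [0x60]], matching the
    // doc-comment example.
    #[allow(dead_code)]
    fn split_history_by_retain_lsns_sketch(
        history_lsns: &[u64],
        retain_lsns: &[u64],
        horizon: u64,
    ) -> Vec<Vec<u64>> {
        let mut split_points: Vec<u64> = retain_lsns.to_vec();
        split_points.push(horizon);
        // One bucket per retain_lsn, one for the horizon, one for "above horizon";
        // each bucket holds lsn_last_bucket < lsn <= lsn_this_bucket.
        let mut buckets: Vec<Vec<u64>> = vec![Vec::new(); split_points.len() + 1];
        let mut current_idx = 0;
        for &lsn in history_lsns {
            while current_idx < split_points.len() && lsn > split_points[current_idx] {
                current_idx += 1;
            }
            buckets[current_idx].push(lsn);
        }
        buckets
    }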
1761 :
1762 : /// Check how much space is left on the disk
1763 52 : async fn check_available_space(self: &Arc<Self>) -> anyhow::Result<u64> {
1764 52 : let tenants_dir = self.conf.tenants_path();
1765 :
1766 52 : let stat = Statvfs::get(&tenants_dir, None)
1767 52 : .context("statvfs failed, presumably directory got unlinked")?;
1768 :
1769 52 : let (avail_bytes, _) = stat.get_avail_total_bytes();
1770 52 :
1771 52 : Ok(avail_bytes)
1772 52 : }
1773 :
1774 : /// Check if the compaction can proceed safely without running out of space. We assume the size
1775 :     /// upper bound of the files produced by a compaction job is the same as the total size of all layers involved in
1776 :     /// the compaction. Therefore, we need at least `2 * layers_to_be_compacted_size` of free space to do a
1777 :     /// compaction.
1778 52 : async fn check_compaction_space(
1779 52 : self: &Arc<Self>,
1780 52 : layer_selection: &[Layer],
1781 52 : ) -> anyhow::Result<()> {
1782 52 : let available_space = self.check_available_space().await?;
1783 52 : let mut remote_layer_size = 0;
1784 52 : let mut all_layer_size = 0;
1785 204 : for layer in layer_selection {
1786 152 : let needs_download = layer.needs_download().await?;
1787 152 : if needs_download.is_some() {
1788 0 : remote_layer_size += layer.layer_desc().file_size;
1789 152 : }
1790 152 : all_layer_size += layer.layer_desc().file_size;
1791 : }
1792 52 : let allocated_space = (available_space as f64 * 0.8) as u64; /* reserve 20% space for other tasks */
1793 52 : if all_layer_size /* space needed for newly-generated file */ + remote_layer_size /* space for downloading layers */ > allocated_space
1794 : {
1795 0 : return Err(anyhow!("not enough space for compaction: available_space={}, allocated_space={}, all_layer_size={}, remote_layer_size={}, required_space={}",
1796 0 : available_space, allocated_space, all_layer_size, remote_layer_size, all_layer_size + remote_layer_size));
1797 52 : }
1798 52 : Ok(())
1799 52 : }
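
    // Editor's worked example for the space check above (assumed numbers): with
    // 100 GiB available, allocated_space = 0.8 * 100 GiB = 80 GiB. Compacting
    // 30 GiB of selected layers, of which 10 GiB still need to be downloaded,
    // requires 30 GiB (new files) + 10 GiB (downloads) = 40 GiB <= 80 GiB, so
    // the check passes; with only 45 GiB available (allocated 36 GiB) it would
    // refuse to start the compaction.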
1800 :
1801 : /// Split a gc-compaction job into multiple compaction jobs. The split is based on the key range and the estimated size of the compaction job.
1802 : /// The function returns a list of compaction jobs that can be executed separately. If the upper bound of the compact LSN
1803 :     /// range is not specified, we will use the latest gc_cutoff as the upper bound, so that all jobs in the jobset act
1804 : /// like a full compaction of the specified keyspace.
1805 0 : pub(crate) async fn gc_compaction_split_jobs(
1806 0 : self: &Arc<Self>,
1807 0 : job: GcCompactJob,
1808 0 : sub_compaction_max_job_size_mb: Option<u64>,
1809 0 : ) -> anyhow::Result<Vec<GcCompactJob>> {
1810 0 : let compact_below_lsn = if job.compact_lsn_range.end != Lsn::MAX {
1811 0 : job.compact_lsn_range.end
1812 : } else {
1813 0 : *self.get_latest_gc_cutoff_lsn() // use the real gc cutoff
1814 : };
1815 :
1816 : // Split compaction job to about 4GB each
1817 : const GC_COMPACT_MAX_SIZE_MB: u64 = 4 * 1024;
1818 0 : let sub_compaction_max_job_size_mb =
1819 0 : sub_compaction_max_job_size_mb.unwrap_or(GC_COMPACT_MAX_SIZE_MB);
1820 0 :
1821 0 : let mut compact_jobs = Vec::new();
1822 : // For now, we simply use the key partitioning information; we should do a more fine-grained partitioning
1823 :         // by estimating the number of files read for a compaction job. We should also partition on LSN.
1824 0 : let ((dense_ks, sparse_ks), _) = {
1825 0 : let Ok(partition) = self.partitioning.try_lock() else {
1826 0 : bail!("failed to acquire partition lock during gc-compaction");
1827 : };
1828 0 : partition.clone()
1829 : };
1830 :         // Truncate the key range to be within the user-specified compaction range.
1831 0 : fn truncate_to(
1832 0 : source_start: &Key,
1833 0 : source_end: &Key,
1834 0 : target_start: &Key,
1835 0 : target_end: &Key,
1836 0 : ) -> Option<(Key, Key)> {
1837 0 : let start = source_start.max(target_start);
1838 0 : let end = source_end.min(target_end);
1839 0 : if start < end {
1840 0 : Some((*start, *end))
1841 : } else {
1842 0 : None
1843 : }
1844 0 : }
1845 0 : let mut split_key_ranges = Vec::new();
1846 0 : let ranges = dense_ks
1847 0 : .parts
1848 0 : .iter()
1849 0 : .map(|partition| partition.ranges.iter())
1850 0 : .chain(sparse_ks.parts.iter().map(|x| x.0.ranges.iter()))
1851 0 : .flatten()
1852 0 : .cloned()
1853 0 : .collect_vec();
1854 0 : for range in ranges.iter() {
1855 0 : let Some((start, end)) = truncate_to(
1856 0 : &range.start,
1857 0 : &range.end,
1858 0 : &job.compact_key_range.start,
1859 0 : &job.compact_key_range.end,
1860 0 : ) else {
1861 0 : continue;
1862 : };
1863 0 : split_key_ranges.push((start, end));
1864 : }
1865 0 : split_key_ranges.sort();
1866 0 : let guard = self.layers.read().await;
1867 0 : let layer_map = guard.layer_map()?;
1868 0 : let mut current_start = None;
1869 0 : let ranges_num = split_key_ranges.len();
1870 0 : for (idx, (start, end)) in split_key_ranges.into_iter().enumerate() {
1871 0 : if current_start.is_none() {
1872 0 : current_start = Some(start);
1873 0 : }
1874 0 : let start = current_start.unwrap();
1875 0 : if start >= end {
1876 : // We have already processed this partition.
1877 0 : continue;
1878 0 : }
1879 0 : let res = layer_map.range_search(start..end, compact_below_lsn);
1880 0 : let total_size = res.found.keys().map(|x| x.layer.file_size()).sum::<u64>();
1881 0 : if total_size > sub_compaction_max_job_size_mb * 1024 * 1024 || ranges_num == idx + 1 {
1882 : // Try to extend the compaction range so that we include at least one full layer file.
1883 0 : let extended_end = res
1884 0 : .found
1885 0 : .keys()
1886 0 : .map(|layer| layer.layer.key_range.end)
1887 0 : .min();
1888 : // It is possible that the search range does not contain any layer files when we reach the end of the loop.
1889 : // In this case, we simply use the specified key range end.
1890 0 : let end = if let Some(extended_end) = extended_end {
1891 0 : extended_end.max(end)
1892 : } else {
1893 0 : end
1894 : };
1895 0 : info!(
1896 0 : "splitting compaction job: {}..{}, estimated_size={}",
1897 : start, end, total_size
1898 : );
1899 0 : compact_jobs.push(GcCompactJob {
1900 0 : dry_run: job.dry_run,
1901 0 : compact_key_range: start..end,
1902 0 : compact_lsn_range: job.compact_lsn_range.start..compact_below_lsn,
1903 0 : });
1904 0 : current_start = Some(end);
1905 0 : }
1906 : }
1907 0 : drop(guard);
1908 0 : Ok(compact_jobs)
1909 0 : }
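
    // Editor's illustrative sketch (simplified, not the real implementation): the
    // splitting loop above, reduced to greedily merging consecutive key ranges into
    // one job until the estimated size reaches the cap. The real code estimates the
    // size with a layer-map range search and extends the end to cover at least one
    // full layer; here each range carries a precomputed size estimate instead.
    #[allow(dead_code)]
    fn split_jobs_sketch(ranges: &[(u32, u32, u64)], max_job_bytes: u64) -> Vec<(u32, u32)> {
        let mut jobs = Vec::new();
        let mut current_start: Option<u32> = None;
        let mut accumulated = 0u64;
        for (idx, &(range_start, range_end, estimated_size)) in ranges.iter().enumerate() {
            let job_start = *current_start.get_or_insert(range_start);
            accumulated += estimated_size;
            // Emit a job once the size budget is reached or we hit the last range.
            if accumulated >= max_job_bytes || idx + 1 == ranges.len() {
                jobs.push((job_start, range_end));
                current_start = None;
                accumulated = 0;
            }
        }
        jobs
    }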
1910 :
1911 : /// An experimental compaction building block that combines compaction with garbage collection.
1912 : ///
1913 : /// The current implementation picks all delta + image layers that are below or intersecting with
1914 : /// the GC horizon without considering retain_lsns. Then, it does a full compaction over all these delta
1915 :     /// layers and image layers, which generates image layers at the gc horizon, drops deltas below the gc horizon,
1916 :     /// and creates delta layers with all deltas >= gc horizon.
1917 : ///
1918 : /// If `options.compact_range` is provided, it will only compact the keys within the range, aka partial compaction.
1919 : /// Partial compaction will read and process all layers overlapping with the key range, even if it might
1920 :     /// Partial compaction will read and process all layers overlapping with the key range, even if they might
1921 :     /// contain extra keys. After the gc-compaction phase completes, delta layers that are not fully contained
1922 :     /// within the key range will be rewritten so that they do not overlap with the newly produced delta layers. Providing
1923 :     /// Key::MIN..Key::MAX to the function indicates a full compaction, though technically, `Key::MAX` is not
1924 : ///
1925 : /// If `options.compact_lsn_range.end` is provided, the compaction will only compact layers below or intersect with
1926 :     /// If `options.compact_lsn_range.end` is provided, the compaction will only compact layers below or intersecting with
1927 54 : pub(crate) async fn compact_with_gc(
1928 54 : self: &Arc<Self>,
1929 54 : cancel: &CancellationToken,
1930 54 : options: CompactOptions,
1931 54 : ctx: &RequestContext,
1932 54 : ) -> anyhow::Result<()> {
1933 54 : let sub_compaction = options.sub_compaction;
1934 54 : let job = GcCompactJob::from_compact_options(options.clone());
1935 54 : if sub_compaction {
1936 0 : info!("running enhanced gc bottom-most compaction with sub-compaction, splitting compaction jobs");
1937 0 : let jobs = self
1938 0 : .gc_compaction_split_jobs(job, options.sub_compaction_max_job_size_mb)
1939 0 : .await?;
1940 0 : let jobs_len = jobs.len();
1941 0 : for (idx, job) in jobs.into_iter().enumerate() {
1942 0 : info!(
1943 0 : "running enhanced gc bottom-most compaction, sub-compaction {}/{}",
1944 0 : idx + 1,
1945 : jobs_len
1946 : );
1947 0 : self.compact_with_gc_inner(cancel, job, ctx).await?;
1948 : }
1949 0 : if jobs_len == 0 {
1950 0 : info!("no jobs to run, skipping gc bottom-most compaction");
1951 0 : }
1952 0 : return Ok(());
1953 54 : }
1954 54 : self.compact_with_gc_inner(cancel, job, ctx).await
1955 54 : }
1956 :
1957 54 : async fn compact_with_gc_inner(
1958 54 : self: &Arc<Self>,
1959 54 : cancel: &CancellationToken,
1960 54 : job: GcCompactJob,
1961 54 : ctx: &RequestContext,
1962 54 : ) -> anyhow::Result<()> {
1963 54 : // Block other compaction/GC tasks from running for now. GC-compaction could run along
1964 54 : // with legacy compaction tasks in the future. Always ensure the lock order is compaction -> gc.
1965 54 : // Note that we already acquired the compaction lock when the outer `compact` function gets called.
1966 54 :
1967 54 : let gc_lock = async {
1968 54 : tokio::select! {
1969 54 : guard = self.gc_lock.lock() => Ok(guard),
1970 : // TODO: refactor to CompactionError to correctly pass cancelled error
1971 54 : _ = cancel.cancelled() => Err(anyhow!("cancelled")),
1972 : }
1973 54 : };
1974 :
1975 54 : let gc_lock = crate::timed(
1976 54 : gc_lock,
1977 54 : "acquires gc lock",
1978 54 : std::time::Duration::from_secs(5),
1979 54 : )
1980 54 : .await?;
1981 :
1982 54 : let dry_run = job.dry_run;
1983 54 : let compact_key_range = job.compact_key_range;
1984 54 : let compact_lsn_range = job.compact_lsn_range;
1985 54 :
1986 54 : info!("running enhanced gc bottom-most compaction, dry_run={dry_run}, compact_key_range={}..{}, compact_lsn_range={}..{}", compact_key_range.start, compact_key_range.end, compact_lsn_range.start, compact_lsn_range.end);
1987 :
1988 54 : scopeguard::defer! {
1989 54 : info!("done enhanced gc bottom-most compaction");
1990 54 : };
1991 54 :
1992 54 : let mut stat = CompactionStatistics::default();
1993 :
1994 : // Step 0: pick all delta layers + image layers below/intersect with the GC horizon.
1995 : // The layer selection has the following properties:
1996 : // 1. If a layer is in the selection, all layers below it are in the selection.
1997 : // 2. Inferred from (1), for each key in the layer selection, the value can be reconstructed only with the layers in the layer selection.
1998 52 : let job_desc = {
1999 54 : let guard = self.layers.read().await;
2000 54 : let layers = guard.layer_map()?;
2001 54 : let gc_info = self.gc_info.read().unwrap();
2002 54 : let mut retain_lsns_below_horizon = Vec::new();
2003 54 : let gc_cutoff = {
2004 : // Currently, gc-compaction only kicks in after the legacy gc has updated the gc_cutoff.
2005 : // Therefore, it can only clean up data that cannot be cleaned up with legacy gc, instead of
2006 : // cleaning everything that theoritically it could. In the future, it should use `self.gc_info`
2007 :             // cleaning everything that it theoretically could. In the future, it should use `self.gc_info`
2008 :             // as the source of truth.
2009 : // The compaction algorithm will keep all keys above the gc_cutoff while keeping only necessary keys below the gc_cutoff for
2010 :             // each of the retain_lsns. Therefore, if the user-provided `compact_lsn_range.end` is larger than the real gc cutoff, we will use
2011 : // the real cutoff.
2012 54 : let mut gc_cutoff = if compact_lsn_range.end == Lsn::MAX {
2013 48 : real_gc_cutoff
2014 : } else {
2015 6 : compact_lsn_range.end
2016 : };
2017 54 : if gc_cutoff > real_gc_cutoff {
2018 4 : warn!("provided compact_lsn_range.end={} is larger than the real_gc_cutoff={}, using the real gc cutoff", gc_cutoff, real_gc_cutoff);
2019 4 : gc_cutoff = real_gc_cutoff;
2020 50 : }
2021 54 : gc_cutoff
2022 : };
2023 70 : for (lsn, _timeline_id, _is_offloaded) in &gc_info.retain_lsns {
2024 70 : if lsn < &gc_cutoff {
2025 70 : retain_lsns_below_horizon.push(*lsn);
2026 70 : }
2027 : }
2028 54 : for lsn in gc_info.leases.keys() {
2029 0 : if lsn < &gc_cutoff {
2030 0 : retain_lsns_below_horizon.push(*lsn);
2031 0 : }
2032 : }
2033 54 : let mut selected_layers: Vec<Layer> = Vec::new();
2034 54 : drop(gc_info);
2035 :         // First, pick all the layers that intersect with or are below the gc_cutoff, and get the largest LSN among the selected layers.
2036 54 : let Some(max_layer_lsn) = layers
2037 54 : .iter_historic_layers()
2038 244 : .filter(|desc| desc.get_lsn_range().start <= gc_cutoff)
2039 208 : .map(|desc| desc.get_lsn_range().end)
2040 54 : .max()
2041 : else {
2042 0 : info!("no layers to compact with gc: no historic layers below gc_cutoff, gc_cutoff={}", gc_cutoff);
2043 0 : return Ok(());
2044 : };
2045 : // Next, if the user specifies compact_lsn_range.start, we need to filter some layers out. All the layers (strictly) below
2046 : // the min_layer_lsn computed as below will be filtered out and the data will be accessed using the normal read path, as if
2047 : // it is a branch.
2048 54 : let Some(min_layer_lsn) = layers
2049 54 : .iter_historic_layers()
2050 244 : .filter(|desc| {
2051 244 : if compact_lsn_range.start == Lsn::INVALID {
2052 198 : true // select all layers below if start == Lsn(0)
2053 : } else {
2054 46 : desc.get_lsn_range().end > compact_lsn_range.start // strictly larger than compact_above_lsn
2055 : }
2056 244 : })
2057 226 : .map(|desc| desc.get_lsn_range().start)
2058 54 : .min()
2059 : else {
2060 0 : info!("no layers to compact with gc: no historic layers above compact_above_lsn, compact_above_lsn={}", compact_lsn_range.end);
2061 0 : return Ok(());
2062 : };
2063 :         // Then, pick all the layers whose LSN range ends at or below the max_layer_lsn. This is to ensure we can pick all single-key
2064 : // layers to compact.
2065 54 : let mut rewrite_layers = Vec::new();
2066 244 : for desc in layers.iter_historic_layers() {
2067 244 : if desc.get_lsn_range().end <= max_layer_lsn
2068 208 : && desc.get_lsn_range().start >= min_layer_lsn
2069 190 : && overlaps_with(&desc.get_key_range(), &compact_key_range)
2070 : {
2071 : // If the layer overlaps with the compaction key range, we need to read it to obtain all keys within the range,
2072 : // even if it might contain extra keys
2073 152 : selected_layers.push(guard.get_from_desc(&desc));
2074 152 : // If the layer is not fully contained within the key range, we need to rewrite it if it's a delta layer (it's fine
2075 152 : // to overlap image layers)
2076 152 : if desc.is_delta() && !fully_contains(&compact_key_range, &desc.get_key_range())
2077 2 : {
2078 2 : rewrite_layers.push(desc);
2079 150 : }
2080 92 : }
2081 : }
2082 54 : if selected_layers.is_empty() {
2083 2 : info!("no layers to compact with gc: no layers within the key range, gc_cutoff={}, key_range={}..{}", gc_cutoff, compact_key_range.start, compact_key_range.end);
2084 2 : return Ok(());
2085 52 : }
2086 52 : retain_lsns_below_horizon.sort();
2087 52 : GcCompactionJobDescription {
2088 52 : selected_layers,
2089 52 : gc_cutoff,
2090 52 : retain_lsns_below_horizon,
2091 52 : min_layer_lsn,
2092 52 : max_layer_lsn,
2093 52 : compaction_key_range: compact_key_range,
2094 52 : rewrite_layers,
2095 52 : }
2096 : };
2097 52 : let (has_data_below, lowest_retain_lsn) = if compact_lsn_range.start != Lsn::INVALID {
2098 : // If we only compact above some LSN, we should get the history from the current branch below the specified LSN.
2099 : // We use job_desc.min_layer_lsn as if it's the lowest branch point.
2100 8 : (true, job_desc.min_layer_lsn)
2101 44 : } else if self.ancestor_timeline.is_some() {
2102 : // In theory, we can also use min_layer_lsn here, but using ancestor LSN makes sure the delta layers cover the
2103 : // LSN ranges all the way to the ancestor timeline.
2104 2 : (true, self.ancestor_lsn)
2105 : } else {
2106 42 : let res = job_desc
2107 42 : .retain_lsns_below_horizon
2108 42 : .first()
2109 42 : .copied()
2110 42 : .unwrap_or(job_desc.gc_cutoff);
2111 42 : if cfg!(debug_assertions) {
2112 42 : assert_eq!(
2113 42 : res,
2114 42 : job_desc
2115 42 : .retain_lsns_below_horizon
2116 42 : .iter()
2117 42 : .min()
2118 42 : .copied()
2119 42 : .unwrap_or(job_desc.gc_cutoff)
2120 42 : );
2121 0 : }
2122 42 : (false, res)
2123 : };
2124 52 : info!(
2125 0 : "picked {} layers for compaction ({} layers need rewriting) with max_layer_lsn={} min_layer_lsn={} gc_cutoff={} lowest_retain_lsn={}, key_range={}..{}, has_data_below={}",
2126 0 : job_desc.selected_layers.len(),
2127 0 : job_desc.rewrite_layers.len(),
2128 : job_desc.max_layer_lsn,
2129 : job_desc.min_layer_lsn,
2130 : job_desc.gc_cutoff,
2131 : lowest_retain_lsn,
2132 : job_desc.compaction_key_range.start,
2133 : job_desc.compaction_key_range.end,
2134 : has_data_below,
2135 : );
2136 :
2137 204 : for layer in &job_desc.selected_layers {
2138 152 : debug!("read layer: {}", layer.layer_desc().key());
2139 : }
2140 54 : for layer in &job_desc.rewrite_layers {
2141 2 : debug!("rewrite layer: {}", layer.key());
2142 : }
2143 :
2144 52 : self.check_compaction_space(&job_desc.selected_layers)
2145 52 : .await?;
2146 :
2147 : // Generate statistics for the compaction
2148 204 : for layer in &job_desc.selected_layers {
2149 152 : let desc = layer.layer_desc();
2150 152 : if desc.is_delta() {
2151 86 : stat.visit_delta_layer(desc.file_size());
2152 86 : } else {
2153 66 : stat.visit_image_layer(desc.file_size());
2154 66 : }
2155 : }
2156 :
2157 : // Step 1: construct a k-merge iterator over all layers.
2158 : // Also, verify if the layer map can be split by drawing a horizontal line at every LSN start/end split point.
2159 :         // The check is disabled for now because we need to adjust it for partial compactions; it will be enabled later.
2160 : // let layer_names = job_desc
2161 : // .selected_layers
2162 : // .iter()
2163 : // .map(|layer| layer.layer_desc().layer_name())
2164 : // .collect_vec();
2165 : // if let Some(err) = check_valid_layermap(&layer_names) {
2166 : // warn!("gc-compaction layer map check failed because {}, this is normal if partial compaction is not finished yet", err);
2167 : // }
2168 : // The maximum LSN we are processing in this compaction loop
2169 52 : let end_lsn = job_desc
2170 52 : .selected_layers
2171 52 : .iter()
2172 152 : .map(|l| l.layer_desc().lsn_range.end)
2173 52 : .max()
2174 52 : .unwrap();
2175 52 : let mut delta_layers = Vec::new();
2176 52 : let mut image_layers = Vec::new();
2177 52 : let mut downloaded_layers = Vec::new();
2178 52 : let mut total_downloaded_size = 0;
2179 52 : let mut total_layer_size = 0;
2180 204 : for layer in &job_desc.selected_layers {
2181 152 : if layer.needs_download().await?.is_some() {
2182 0 : total_downloaded_size += layer.layer_desc().file_size;
2183 152 : }
2184 152 : total_layer_size += layer.layer_desc().file_size;
2185 152 : let resident_layer = layer.download_and_keep_resident().await?;
2186 152 : downloaded_layers.push(resident_layer);
2187 : }
2188 52 : info!(
2189 0 : "finish downloading layers, downloaded={}, total={}, ratio={:.2}",
2190 0 : total_downloaded_size,
2191 0 : total_layer_size,
2192 0 : total_downloaded_size as f64 / total_layer_size as f64
2193 : );
2194 204 : for resident_layer in &downloaded_layers {
2195 152 : if resident_layer.layer_desc().is_delta() {
2196 86 : let layer = resident_layer.get_as_delta(ctx).await?;
2197 86 : delta_layers.push(layer);
2198 : } else {
2199 66 : let layer = resident_layer.get_as_image(ctx).await?;
2200 66 : image_layers.push(layer);
2201 : }
2202 : }
2203 52 : let (dense_ks, sparse_ks) = self.collect_gc_compaction_keyspace().await?;
2204 52 : let mut merge_iter = FilterIterator::create(
2205 52 : MergeIterator::create(&delta_layers, &image_layers, ctx),
2206 52 : dense_ks,
2207 52 : sparse_ks,
2208 52 : )?;
2209 :
2210 : // Step 2: Produce images+deltas.
2211 52 : let mut accumulated_values = Vec::new();
2212 52 : let mut last_key: Option<Key> = None;
2213 :         // Only create image layers when there are no ancestor branches. TODO: create a covering image layer
2214 :         // when certain conditions are met.
2215 : // when some condition meet.
2216 52 : let mut image_layer_writer = if !has_data_below {
2217 : Some(
2218 42 : SplitImageLayerWriter::new(
2219 42 : self.conf,
2220 42 : self.timeline_id,
2221 42 : self.tenant_shard_id,
2222 42 : job_desc.compaction_key_range.start,
2223 42 : lowest_retain_lsn,
2224 42 : self.get_compaction_target_size(),
2225 42 : ctx,
2226 42 : )
2227 42 : .await?,
2228 : )
2229 : } else {
2230 10 : None
2231 : };
2232 :
2233 52 : let mut delta_layer_writer = SplitDeltaLayerWriter::new(
2234 52 : self.conf,
2235 52 : self.timeline_id,
2236 52 : self.tenant_shard_id,
2237 52 : lowest_retain_lsn..end_lsn,
2238 52 : self.get_compaction_target_size(),
2239 52 : )
2240 52 : .await?;
2241 :
2242 : #[derive(Default)]
2243 : struct RewritingLayers {
2244 : before: Option<DeltaLayerWriter>,
2245 : after: Option<DeltaLayerWriter>,
2246 : }
2247 52 : let mut delta_layer_rewriters = HashMap::<Arc<PersistentLayerKey>, RewritingLayers>::new();
2248 :
2249 :         /// When not compacting the bottom range (=`[0,X)`) of the root branch, we "have data below" (`has_data_below=true`).
2250 :         /// The two cases are compaction in ancestor branches and when `compact_lsn_range.start` is set.
2251 :         /// In those cases, we need to pull up data from below the LSN range we're compacting.
2252 : ///
2253 : /// This function unifies the cases so that later code doesn't have to think about it.
2254 : ///
2255 :         /// Currently, we always get the ancestor image for each key in the child branch regardless of whether the image
2256 : /// is needed for reconstruction. This should be fixed in the future.
2257 : ///
2258 : /// Furthermore, we should do vectored get instead of a single get, or better, use k-merge for ancestor
2259 : /// images.
2260 622 : async fn get_ancestor_image(
2261 622 : this_tline: &Arc<Timeline>,
2262 622 : key: Key,
2263 622 : ctx: &RequestContext,
2264 622 : has_data_below: bool,
2265 622 : history_lsn_point: Lsn,
2266 622 : ) -> anyhow::Result<Option<(Key, Lsn, Bytes)>> {
2267 622 : if !has_data_below {
2268 584 : return Ok(None);
2269 38 : };
2270 : // This function is implemented as a get of the current timeline at ancestor LSN, therefore reusing
2271 : // as much existing code as possible.
2272 38 : let img = this_tline.get(key, history_lsn_point, ctx).await?;
2273 38 : Ok(Some((key, history_lsn_point, img)))
2274 622 : }
2275 :
2276 : // Actually, we can decide not to write to the image layer at all at this point because
2277 : // the key and LSN range are determined. However, to keep things simple here, we still
2278 :         // create this writer and discard it at the end.
2279 :
2280 966 : while let Some(((key, lsn, val), desc)) = merge_iter.next_with_trace().await? {
2281 914 : if cancel.is_cancelled() {
2282 0 : return Err(anyhow!("cancelled")); // TODO: refactor to CompactionError and pass cancel error
2283 914 : }
2284 914 : if self.shard_identity.is_key_disposable(&key) {
2285 : // If this shard does not need to store this key, simply skip it.
2286 : //
2287 : // This is not handled in the filter iterator because shard is determined by hash.
2288 : // Therefore, it does not give us any performance benefit to do things like skip
2289 : // a whole layer file as handling key spaces (ranges).
2290 0 : if cfg!(debug_assertions) {
2291 0 : let shard = self.shard_identity.shard_index();
2292 0 : let owner = self.shard_identity.get_shard_number(&key);
2293 0 : panic!("key {key} does not belong on shard {shard}, owned by {owner}");
2294 0 : }
2295 0 : continue;
2296 914 : }
2297 914 : if !job_desc.compaction_key_range.contains(&key) {
2298 64 : if !desc.is_delta {
2299 60 : continue;
2300 4 : }
2301 4 : let rewriter = delta_layer_rewriters.entry(desc.clone()).or_default();
2302 4 : let rewriter = if key < job_desc.compaction_key_range.start {
2303 0 : if rewriter.before.is_none() {
2304 0 : rewriter.before = Some(
2305 0 : DeltaLayerWriter::new(
2306 0 : self.conf,
2307 0 : self.timeline_id,
2308 0 : self.tenant_shard_id,
2309 0 : desc.key_range.start,
2310 0 : desc.lsn_range.clone(),
2311 0 : ctx,
2312 0 : )
2313 0 : .await?,
2314 : );
2315 0 : }
2316 0 : rewriter.before.as_mut().unwrap()
2317 4 : } else if key >= job_desc.compaction_key_range.end {
2318 4 : if rewriter.after.is_none() {
2319 2 : rewriter.after = Some(
2320 2 : DeltaLayerWriter::new(
2321 2 : self.conf,
2322 2 : self.timeline_id,
2323 2 : self.tenant_shard_id,
2324 2 : job_desc.compaction_key_range.end,
2325 2 : desc.lsn_range.clone(),
2326 2 : ctx,
2327 2 : )
2328 2 : .await?,
2329 : );
2330 2 : }
2331 4 : rewriter.after.as_mut().unwrap()
2332 : } else {
2333 0 : unreachable!()
2334 : };
2335 4 : rewriter.put_value(key, lsn, val, ctx).await?;
2336 4 : continue;
2337 850 : }
2338 850 : match val {
2339 610 : Value::Image(_) => stat.visit_image_key(&val),
2340 240 : Value::WalRecord(_) => stat.visit_wal_key(&val),
2341 : }
2342 850 : if last_key.is_none() || last_key.as_ref() == Some(&key) {
2343 280 : if last_key.is_none() {
2344 52 : last_key = Some(key);
2345 228 : }
2346 280 : accumulated_values.push((key, lsn, val));
2347 : } else {
2348 570 : let last_key: &mut Key = last_key.as_mut().unwrap();
2349 570 : stat.on_unique_key_visited(); // TODO: adjust statistics for partial compaction
2350 570 : let retention = self
2351 570 : .generate_key_retention(
2352 570 : *last_key,
2353 570 : &accumulated_values,
2354 570 : job_desc.gc_cutoff,
2355 570 : &job_desc.retain_lsns_below_horizon,
2356 570 : COMPACTION_DELTA_THRESHOLD,
2357 570 : get_ancestor_image(self, *last_key, ctx, has_data_below, lowest_retain_lsn)
2358 570 : .await?,
2359 : )
2360 570 : .await?;
2361 570 : retention
2362 570 : .pipe_to(
2363 570 : *last_key,
2364 570 : &mut delta_layer_writer,
2365 570 : image_layer_writer.as_mut(),
2366 570 : &mut stat,
2367 570 : ctx,
2368 570 : )
2369 570 : .await?;
2370 570 : accumulated_values.clear();
2371 570 : *last_key = key;
2372 570 : accumulated_values.push((key, lsn, val));
2373 : }
2374 : }
2375 :
2376 : // TODO: move the below part to the loop body
2377 52 : let last_key = last_key.expect("no keys produced during compaction");
2378 52 : stat.on_unique_key_visited();
2379 :
2380 52 : let retention = self
2381 52 : .generate_key_retention(
2382 52 : last_key,
2383 52 : &accumulated_values,
2384 52 : job_desc.gc_cutoff,
2385 52 : &job_desc.retain_lsns_below_horizon,
2386 52 : COMPACTION_DELTA_THRESHOLD,
2387 52 : get_ancestor_image(self, last_key, ctx, has_data_below, lowest_retain_lsn).await?,
2388 : )
2389 52 : .await?;
2390 52 : retention
2391 52 : .pipe_to(
2392 52 : last_key,
2393 52 : &mut delta_layer_writer,
2394 52 : image_layer_writer.as_mut(),
2395 52 : &mut stat,
2396 52 : ctx,
2397 52 : )
2398 52 : .await?;
2399 : // end: move the above part to the loop body
2400 :
2401 52 : let mut rewrote_delta_layers = Vec::new();
2402 54 : for (key, writers) in delta_layer_rewriters {
2403 2 : if let Some(delta_writer_before) = writers.before {
2404 0 : let (desc, path) = delta_writer_before
2405 0 : .finish(job_desc.compaction_key_range.start, ctx)
2406 0 : .await?;
2407 0 : let layer = Layer::finish_creating(self.conf, self, desc, &path)?;
2408 0 : rewrote_delta_layers.push(layer);
2409 2 : }
2410 2 : if let Some(delta_writer_after) = writers.after {
2411 2 : let (desc, path) = delta_writer_after.finish(key.key_range.end, ctx).await?;
2412 2 : let layer = Layer::finish_creating(self.conf, self, desc, &path)?;
2413 2 : rewrote_delta_layers.push(layer);
2414 0 : }
2415 : }
2416 :
2417 74 : let discard = |key: &PersistentLayerKey| {
2418 74 : let key = key.clone();
2419 74 : async move { KeyHistoryRetention::discard_key(&key, self, dry_run).await }
2420 74 : };
2421 :
2422 52 : let produced_image_layers = if let Some(writer) = image_layer_writer {
2423 42 : if !dry_run {
2424 38 : let end_key = job_desc.compaction_key_range.end;
2425 38 : writer
2426 38 : .finish_with_discard_fn(self, ctx, end_key, discard)
2427 38 : .await?
2428 : } else {
2429 4 : drop(writer);
2430 4 : Vec::new()
2431 : }
2432 : } else {
2433 10 : Vec::new()
2434 : };
2435 :
2436 52 : let produced_delta_layers = if !dry_run {
2437 48 : delta_layer_writer
2438 48 : .finish_with_discard_fn(self, ctx, discard)
2439 48 : .await?
2440 : } else {
2441 4 : drop(delta_layer_writer);
2442 4 : Vec::new()
2443 : };
2444 :
2445 : // TODO: make image/delta/rewrote_delta layers generation atomic. At this point, we already generated resident layers, and if
2446 : // compaction is cancelled at this point, we might have some layers that are not cleaned up.
2447 52 : let mut compact_to = Vec::new();
2448 52 : let mut keep_layers = HashSet::new();
2449 52 : let produced_delta_layers_len = produced_delta_layers.len();
2450 52 : let produced_image_layers_len = produced_image_layers.len();
2451 88 : for action in produced_delta_layers {
2452 36 : match action {
2453 22 : BatchWriterResult::Produced(layer) => {
2454 22 : if cfg!(debug_assertions) {
2455 22 : info!("produced delta layer: {}", layer.layer_desc().key());
2456 0 : }
2457 22 : stat.produce_delta_layer(layer.layer_desc().file_size());
2458 22 : compact_to.push(layer);
2459 : }
2460 14 : BatchWriterResult::Discarded(l) => {
2461 14 : if cfg!(debug_assertions) {
2462 14 : info!("discarded delta layer: {}", l);
2463 0 : }
2464 14 : keep_layers.insert(l);
2465 14 : stat.discard_delta_layer();
2466 : }
2467 : }
2468 : }
2469 54 : for layer in &rewrote_delta_layers {
2470 2 : debug!(
2471 0 : "produced rewritten delta layer: {}",
2472 0 : layer.layer_desc().key()
2473 : );
2474 : }
2475 52 : compact_to.extend(rewrote_delta_layers);
2476 90 : for action in produced_image_layers {
2477 38 : match action {
2478 30 : BatchWriterResult::Produced(layer) => {
2479 30 : debug!("produced image layer: {}", layer.layer_desc().key());
2480 30 : stat.produce_image_layer(layer.layer_desc().file_size());
2481 30 : compact_to.push(layer);
2482 : }
2483 8 : BatchWriterResult::Discarded(l) => {
2484 8 : debug!("discarded image layer: {}", l);
2485 8 : keep_layers.insert(l);
2486 8 : stat.discard_image_layer();
2487 : }
2488 : }
2489 : }
2490 :
2491 52 : let mut layer_selection = job_desc.selected_layers;
2492 :
2493 : // Partial compaction might select more data than it processes, e.g., if
2494 : // the compaction_key_range only partially overlaps:
2495 : //
2496 : // [---compaction_key_range---]
2497 : // [---A----][----B----][----C----][----D----]
2498 : //
2499 :         // For delta layers, we will rewrite the layers so that they are cut exactly at
2500 : // the compaction key range, so we can always discard them. However, for image
2501 : // layers, as we do not rewrite them for now, we need to handle them differently.
2502 : // Assume image layers A, B, C, D are all in the `layer_selection`.
2503 : //
2504 : // The created image layers contain whatever is needed from B, C, and from
2505 : // `----]` of A, and from `[---` of D.
2506 : //
2507 : // In contrast, `[---A` and `D----]` have not been processed, so, we must
2508 : // keep that data.
2509 : //
2510 : // The solution for now is to keep A and D completely if they are image layers.
2511 : // (layer_selection is what we'll remove from the layer map, so, retain what
2512 : // is _not_ fully covered by compaction_key_range).
2513 204 : for layer in &layer_selection {
2514 152 : if !layer.layer_desc().is_delta() {
2515 66 : if !overlaps_with(
2516 66 : &layer.layer_desc().key_range,
2517 66 : &job_desc.compaction_key_range,
2518 66 : ) {
2519 0 : bail!("violated constraint: image layer outside of compaction key range");
2520 66 : }
2521 66 : if !fully_contains(
2522 66 : &job_desc.compaction_key_range,
2523 66 : &layer.layer_desc().key_range,
2524 66 : ) {
2525 8 : keep_layers.insert(layer.layer_desc().key());
2526 58 : }
2527 86 : }
2528 : }
2529 :
2530 152 : layer_selection.retain(|x| !keep_layers.contains(&x.layer_desc().key()));
2531 52 :
2532 52 : info!(
2533 0 : "gc-compaction statistics: {}",
2534 0 : serde_json::to_string(&stat)?
2535 : );
2536 :
2537 52 : if dry_run {
2538 4 : return Ok(());
2539 48 : }
2540 48 :
2541 48 : info!(
2542 0 : "produced {} delta layers and {} image layers, {} layers are kept",
2543 0 : produced_delta_layers_len,
2544 0 : produced_image_layers_len,
2545 0 : layer_selection.len()
2546 : );
2547 :
2548 : // Step 3: Place back to the layer map.
2549 : {
2550 : // TODO: sanity check if the layer map is valid (i.e., should not have overlaps)
2551 48 : let mut guard = self.layers.write().await;
2552 48 : guard
2553 48 : .open_mut()?
2554 48 : .finish_gc_compaction(&layer_selection, &compact_to, &self.metrics)
2555 48 : };
2556 48 : self.remote_client
2557 48 : .schedule_compaction_update(&layer_selection, &compact_to)?;
2558 :
2559 48 : drop(gc_lock);
2560 48 :
2561 48 : Ok(())
2562 54 : }
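
    // Editor's illustrative sketch (not part of the original source): the keep/discard
    // decision for image layers under partial compaction described in the
    // "[---compaction_key_range---]" comment above, reduced to plain integer ranges.
    // Overlapping but not fully covered image layers (the `[---A` and `D----]` cases)
    // must be kept; fully covered ones can be removed from the layer map.
    #[allow(dead_code)]
    fn must_keep_image_layer_sketch(
        compaction_key_range: &std::ops::Range<u32>,
        image_layer_key_range: &std::ops::Range<u32>,
    ) -> bool {
        let overlaps = compaction_key_range.start < image_layer_key_range.end
            && image_layer_key_range.start < compaction_key_range.end;
        let fully_contained = compaction_key_range.start <= image_layer_key_range.start
            && image_layer_key_range.end <= compaction_key_range.end;
        overlaps && !fully_contained
    }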
2563 : }
2564 :
2565 : struct TimelineAdaptor {
2566 : timeline: Arc<Timeline>,
2567 :
2568 : keyspace: (Lsn, KeySpace),
2569 :
2570 : new_deltas: Vec<ResidentLayer>,
2571 : new_images: Vec<ResidentLayer>,
2572 : layers_to_delete: Vec<Arc<PersistentLayerDesc>>,
2573 : }
2574 :
2575 : impl TimelineAdaptor {
2576 0 : pub fn new(timeline: &Arc<Timeline>, keyspace: (Lsn, KeySpace)) -> Self {
2577 0 : Self {
2578 0 : timeline: timeline.clone(),
2579 0 : keyspace,
2580 0 : new_images: Vec::new(),
2581 0 : new_deltas: Vec::new(),
2582 0 : layers_to_delete: Vec::new(),
2583 0 : }
2584 0 : }
2585 :
2586 0 : pub async fn flush_updates(&mut self) -> Result<(), CompactionError> {
2587 0 : let layers_to_delete = {
2588 0 : let guard = self.timeline.layers.read().await;
2589 0 : self.layers_to_delete
2590 0 : .iter()
2591 0 : .map(|x| guard.get_from_desc(x))
2592 0 : .collect::<Vec<Layer>>()
2593 0 : };
2594 0 : self.timeline
2595 0 : .finish_compact_batch(&self.new_deltas, &self.new_images, &layers_to_delete)
2596 0 : .await?;
2597 :
2598 0 : self.timeline
2599 0 : .upload_new_image_layers(std::mem::take(&mut self.new_images))?;
2600 :
2601 0 : self.new_deltas.clear();
2602 0 : self.layers_to_delete.clear();
2603 0 : Ok(())
2604 0 : }
2605 : }
2606 :
2607 : #[derive(Clone)]
2608 : struct ResidentDeltaLayer(ResidentLayer);
2609 : #[derive(Clone)]
2610 : struct ResidentImageLayer(ResidentLayer);
2611 :
2612 : impl CompactionJobExecutor for TimelineAdaptor {
2613 : type Key = pageserver_api::key::Key;
2614 :
2615 : type Layer = OwnArc<PersistentLayerDesc>;
2616 : type DeltaLayer = ResidentDeltaLayer;
2617 : type ImageLayer = ResidentImageLayer;
2618 :
2619 : type RequestContext = crate::context::RequestContext;
2620 :
2621 0 : fn get_shard_identity(&self) -> &ShardIdentity {
2622 0 : self.timeline.get_shard_identity()
2623 0 : }
2624 :
2625 0 : async fn get_layers(
2626 0 : &mut self,
2627 0 : key_range: &Range<Key>,
2628 0 : lsn_range: &Range<Lsn>,
2629 0 : _ctx: &RequestContext,
2630 0 : ) -> anyhow::Result<Vec<OwnArc<PersistentLayerDesc>>> {
2631 0 : self.flush_updates().await?;
2632 :
2633 0 : let guard = self.timeline.layers.read().await;
2634 0 : let layer_map = guard.layer_map()?;
2635 :
2636 0 : let result = layer_map
2637 0 : .iter_historic_layers()
2638 0 : .filter(|l| {
2639 0 : overlaps_with(&l.lsn_range, lsn_range) && overlaps_with(&l.key_range, key_range)
2640 0 : })
2641 0 : .map(OwnArc)
2642 0 : .collect();
2643 0 : Ok(result)
2644 0 : }
2645 :
2646 0 : async fn get_keyspace(
2647 0 : &mut self,
2648 0 : key_range: &Range<Key>,
2649 0 : lsn: Lsn,
2650 0 : _ctx: &RequestContext,
2651 0 : ) -> anyhow::Result<Vec<Range<Key>>> {
2652 0 : if lsn == self.keyspace.0 {
2653 0 : Ok(pageserver_compaction::helpers::intersect_keyspace(
2654 0 : &self.keyspace.1.ranges,
2655 0 : key_range,
2656 0 : ))
2657 : } else {
2658 : // The current compaction implementation only ever requests the key space
2659 : // at the compaction end LSN.
2660 0 : anyhow::bail!("keyspace not available for requested lsn");
2661 : }
2662 0 : }
2663 :
2664 0 : async fn downcast_delta_layer(
2665 0 : &self,
2666 0 : layer: &OwnArc<PersistentLayerDesc>,
2667 0 : ) -> anyhow::Result<Option<ResidentDeltaLayer>> {
2668 0 : // this is a lot more complex than a simple downcast...
2669 0 : if layer.is_delta() {
2670 0 : let l = {
2671 0 : let guard = self.timeline.layers.read().await;
2672 0 : guard.get_from_desc(layer)
2673 : };
2674 0 : let result = l.download_and_keep_resident().await?;
2675 :
2676 0 : Ok(Some(ResidentDeltaLayer(result)))
2677 : } else {
2678 0 : Ok(None)
2679 : }
2680 0 : }
2681 :
2682 0 : async fn create_image(
2683 0 : &mut self,
2684 0 : lsn: Lsn,
2685 0 : key_range: &Range<Key>,
2686 0 : ctx: &RequestContext,
2687 0 : ) -> anyhow::Result<()> {
2688 0 : Ok(self.create_image_impl(lsn, key_range, ctx).await?)
2689 0 : }
2690 :
2691 0 : async fn create_delta(
2692 0 : &mut self,
2693 0 : lsn_range: &Range<Lsn>,
2694 0 : key_range: &Range<Key>,
2695 0 : input_layers: &[ResidentDeltaLayer],
2696 0 : ctx: &RequestContext,
2697 0 : ) -> anyhow::Result<()> {
2698 0 : debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end);
2699 :
2700 0 : let mut all_entries = Vec::new();
2701 0 : for dl in input_layers.iter() {
2702 0 : all_entries.extend(dl.load_keys(ctx).await?);
2703 : }
2704 :
2705 : // The current stdlib sorting implementation is designed in a way where it is
2706 :         // particularly fast when the slice is made up of sorted sub-ranges.
2707 0 : all_entries.sort_by_key(|DeltaEntry { key, lsn, .. }| (*key, *lsn));
2708 :
2709 0 : let mut writer = DeltaLayerWriter::new(
2710 0 : self.timeline.conf,
2711 0 : self.timeline.timeline_id,
2712 0 : self.timeline.tenant_shard_id,
2713 0 : key_range.start,
2714 0 : lsn_range.clone(),
2715 0 : ctx,
2716 0 : )
2717 0 : .await?;
2718 :
2719 0 : let mut dup_values = 0;
2720 0 :
2721 0 : // This iterator walks through all key-value pairs from all the layers
2722 0 : // we're compacting, in key, LSN order.
2723 0 : let mut prev: Option<(Key, Lsn)> = None;
2724 : for &DeltaEntry {
2725 0 : key, lsn, ref val, ..
2726 0 : } in all_entries.iter()
2727 : {
2728 0 : if prev == Some((key, lsn)) {
2729 : // This is a duplicate. Skip it.
2730 : //
2731 : // It can happen if compaction is interrupted after writing some
2732 : // layers but not all, and we are compacting the range again.
2733 : // The calculations in the algorithm assume that there are no
2734 : // duplicates, so the math on targeted file size is likely off,
2735 : // and we will create smaller files than expected.
2736 0 : dup_values += 1;
2737 0 : continue;
2738 0 : }
2739 :
2740 0 : let value = val.load(ctx).await?;
2741 :
2742 0 : writer.put_value(key, lsn, value, ctx).await?;
2743 :
2744 0 : prev = Some((key, lsn));
2745 : }
2746 :
2747 0 : if dup_values > 0 {
2748 0 : warn!("delta layer created with {} duplicate values", dup_values);
2749 0 : }
2750 :
2751 0 : fail_point!("delta-layer-writer-fail-before-finish", |_| {
2752 0 : Err(anyhow::anyhow!(
2753 0 : "failpoint delta-layer-writer-fail-before-finish"
2754 0 : ))
2755 0 : });
2756 :
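: // Note: `prev` is Some here only because at least one entry was written above;
: // an empty set of input entries would panic on the unwrap below.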
2757 0 : let (desc, path) = writer.finish(prev.unwrap().0.next(), ctx).await?;
2758 0 : let new_delta_layer =
2759 0 : Layer::finish_creating(self.timeline.conf, &self.timeline, desc, &path)?;
2760 :
2761 0 : self.new_deltas.push(new_delta_layer);
2762 0 : Ok(())
2763 0 : }
2764 :
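: // Deletion is deferred: the descriptor is only queued in `layers_to_delete` here;
: // the actual removal presumably happens when the adaptor flushes its accumulated
: // updates back to the layer map.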
2765 0 : async fn delete_layer(
2766 0 : &mut self,
2767 0 : layer: &OwnArc<PersistentLayerDesc>,
2768 0 : _ctx: &RequestContext,
2769 0 : ) -> anyhow::Result<()> {
2770 0 : self.layers_to_delete.push(layer.clone().0);
2771 0 : Ok(())
2772 0 : }
2773 : }
2774 :
2775 : impl TimelineAdaptor {
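: // Builds an image layer for `key_range` at `lsn` by materializing page images via
: // `create_image_layer_for_rel_blocks`, timing the work with the
: // `create_images_time_histo` metric and queueing the resulting layer (if any) in
: // `new_images`.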
2776 0 : async fn create_image_impl(
2777 0 : &mut self,
2778 0 : lsn: Lsn,
2779 0 : key_range: &Range<Key>,
2780 0 : ctx: &RequestContext,
2781 0 : ) -> Result<(), CreateImageLayersError> {
2782 0 : let timer = self.timeline.metrics.create_images_time_histo.start_timer();
2783 :
2784 0 : let image_layer_writer = ImageLayerWriter::new(
2785 0 : self.timeline.conf,
2786 0 : self.timeline.timeline_id,
2787 0 : self.timeline.tenant_shard_id,
2788 0 : key_range,
2789 0 : lsn,
2790 0 : ctx,
2791 0 : )
2792 0 : .await?;
2793 :
2794 0 : fail_point!("image-layer-writer-fail-before-finish", |_| {
2795 0 : Err(CreateImageLayersError::Other(anyhow::anyhow!(
2796 0 : "failpoint image-layer-writer-fail-before-finish"
2797 0 : )))
2798 0 : });
2799 :
2800 0 : let keyspace = KeySpace {
2801 0 : ranges: self.get_keyspace(key_range, lsn, ctx).await?,
2802 : };
2803 : // TODO: set a proper (stateful) start key; for now we always pass Key::MIN and discard the next_start_key returned by create_image_layer_for_rel_blocks.
2804 0 : let start = Key::MIN;
2805 : let ImageLayerCreationOutcome {
2806 0 : image,
2807 : next_start_key: _,
2808 0 : } = self
2809 0 : .timeline
2810 0 : .create_image_layer_for_rel_blocks(
2811 0 : &keyspace,
2812 0 : image_layer_writer,
2813 0 : lsn,
2814 0 : ctx,
2815 0 : key_range.clone(),
2816 0 : start,
2817 0 : )
2818 0 : .await?;
2819 :
2820 0 : if let Some(image_layer) = image {
2821 0 : self.new_images.push(image_layer);
2822 0 : }
2823 :
2824 0 : timer.stop_and_record();
2825 0 :
2826 0 : Ok(())
2827 0 : }
2828 : }
2829 :
2830 : impl CompactionRequestContext for crate::context::RequestContext {}
2831 :
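: // Newtype wrapper around `Arc<T>`. A wrapper like this is typically needed to
: // implement the compaction crate's foreign traits (e.g. `CompactionLayer`) for
: // Arc-wrapped types without violating the orphan rule, which appears to be its
: // purpose here.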
2832 : #[derive(Debug, Clone)]
2833 : pub struct OwnArc<T>(pub Arc<T>);
2834 :
2835 : impl<T> Deref for OwnArc<T> {
2836 : type Target = <Arc<T> as Deref>::Target;
2837 0 : fn deref(&self) -> &Self::Target {
2838 0 : &self.0
2839 0 : }
2840 : }
2841 :
2842 : impl<T> AsRef<T> for OwnArc<T> {
2843 0 : fn as_ref(&self) -> &T {
2844 0 : self.0.as_ref()
2845 0 : }
2846 : }
2847 :
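: // Adapts a persistent layer descriptor to the compaction crate's `CompactionLayer`
: // interface by delegating to the descriptor's own fields and accessors.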
2848 : impl CompactionLayer<Key> for OwnArc<PersistentLayerDesc> {
2849 0 : fn key_range(&self) -> &Range<Key> {
2850 0 : &self.key_range
2851 0 : }
2852 0 : fn lsn_range(&self) -> &Range<Lsn> {
2853 0 : &self.lsn_range
2854 0 : }
2855 0 : fn file_size(&self) -> u64 {
2856 0 : self.file_size
2857 0 : }
2858 0 : fn short_id(&self) -> std::string::String {
2859 0 : self.as_ref().short_id().to_string()
2860 0 : }
2861 0 : fn is_delta(&self) -> bool {
2862 0 : self.as_ref().is_delta()
2863 0 : }
2864 : }
2865 :
2866 : impl CompactionLayer<Key> for OwnArc<DeltaLayer> {
2867 0 : fn key_range(&self) -> &Range<Key> {
2868 0 : &self.layer_desc().key_range
2869 0 : }
2870 0 : fn lsn_range(&self) -> &Range<Lsn> {
2871 0 : &self.layer_desc().lsn_range
2872 0 : }
2873 0 : fn file_size(&self) -> u64 {
2874 0 : self.layer_desc().file_size
2875 0 : }
2876 0 : fn short_id(&self) -> std::string::String {
2877 0 : self.layer_desc().short_id().to_string()
2878 0 : }
2879 0 : fn is_delta(&self) -> bool {
2880 0 : true
2881 0 : }
2882 : }
2883 :
2884 : use crate::tenant::timeline::DeltaEntry;
2885 :
2886 : impl CompactionLayer<Key> for ResidentDeltaLayer {
2887 0 : fn key_range(&self) -> &Range<Key> {
2888 0 : &self.0.layer_desc().key_range
2889 0 : }
2890 0 : fn lsn_range(&self) -> &Range<Lsn> {
2891 0 : &self.0.layer_desc().lsn_range
2892 0 : }
2893 0 : fn file_size(&self) -> u64 {
2894 0 : self.0.layer_desc().file_size
2895 0 : }
2896 0 : fn short_id(&self) -> std::string::String {
2897 0 : self.0.layer_desc().short_id().to_string()
2898 0 : }
2899 0 : fn is_delta(&self) -> bool {
2900 0 : true
2901 0 : }
2902 : }
2903 :
2904 : impl CompactionDeltaLayer<TimelineAdaptor> for ResidentDeltaLayer {
2905 : type DeltaEntry<'a> = DeltaEntry<'a>;
2906 :
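: // Loads the delta layer's index entries. Only keys, LSNs and value references are
: // read at this point; the values themselves are loaded later via `val.load()` in
: // `create_delta`.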
2907 0 : async fn load_keys(&self, ctx: &RequestContext) -> anyhow::Result<Vec<DeltaEntry<'_>>> {
2908 0 : self.0.get_as_delta(ctx).await?.index_entries(ctx).await
2909 0 : }
2910 : }
2911 :
2912 : impl CompactionLayer<Key> for ResidentImageLayer {
2913 0 : fn key_range(&self) -> &Range<Key> {
2914 0 : &self.0.layer_desc().key_range
2915 0 : }
2916 0 : fn lsn_range(&self) -> &Range<Lsn> {
2917 0 : &self.0.layer_desc().lsn_range
2918 0 : }
2919 0 : fn file_size(&self) -> u64 {
2920 0 : self.0.layer_desc().file_size
2921 0 : }
2922 0 : fn short_id(&self) -> std::string::String {
2923 0 : self.0.layer_desc().short_id().to_string()
2924 0 : }
2925 0 : fn is_delta(&self) -> bool {
2926 0 : false
2927 0 : }
2928 : }
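: // `CompactionImageLayer` requires no image-specific methods beyond `CompactionLayer`,
: // so this impl body is empty.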
2929 : impl CompactionImageLayer<TimelineAdaptor> for ResidentImageLayer {}