Line data Source code
1 : //! New compaction implementation. The algorithm itself is implemented in the
2 : //! compaction crate. This file implements the callbacks and structs that allow
3 : //! the algorithm to drive the process.
4 : //!
5 : //! The legacy algorithm is implemented directly in `timeline.rs`.
6 :
7 : use std::collections::{BinaryHeap, HashSet};
8 : use std::ops::{Deref, Range};
9 : use std::sync::Arc;
10 :
11 : use super::layer_manager::LayerManager;
12 : use super::{
13 : CompactFlags, CreateImageLayersError, DurationRecorder, ImageLayerCreationMode,
14 : RecordedDuration, Timeline,
15 : };
16 :
17 : use anyhow::{anyhow, bail, Context};
18 : use bytes::Bytes;
19 : use enumset::EnumSet;
20 : use fail::fail_point;
21 : use itertools::Itertools;
22 : use pageserver_api::key::KEY_SIZE;
23 : use pageserver_api::keyspace::ShardedRange;
24 : use pageserver_api::shard::{ShardCount, ShardIdentity, TenantShardId};
25 : use serde::Serialize;
26 : use tokio_util::sync::CancellationToken;
27 : use tracing::{debug, info, info_span, trace, warn, Instrument};
28 : use utils::id::TimelineId;
29 :
30 : use crate::context::{AccessStatsBehavior, RequestContext, RequestContextBuilder};
31 : use crate::page_cache;
32 : use crate::statvfs::Statvfs;
33 : use crate::tenant::checks::check_valid_layermap;
34 : use crate::tenant::remote_timeline_client::WaitCompletionError;
35 : use crate::tenant::storage_layer::filter_iterator::FilterIterator;
36 : use crate::tenant::storage_layer::merge_iterator::MergeIterator;
37 : use crate::tenant::storage_layer::split_writer::{
38 : SplitDeltaLayerWriter, SplitImageLayerWriter, SplitWriterResult,
39 : };
40 : use crate::tenant::storage_layer::{
41 : AsLayerDesc, PersistentLayerDesc, PersistentLayerKey, ValueReconstructState,
42 : };
43 : use crate::tenant::timeline::ImageLayerCreationOutcome;
44 : use crate::tenant::timeline::{drop_rlock, DeltaLayerWriter, ImageLayerWriter};
45 : use crate::tenant::timeline::{Layer, ResidentLayer};
46 : use crate::tenant::{DeltaLayer, MaybeOffloaded};
47 : use crate::virtual_file::{MaybeFatalIo, VirtualFile};
48 : use pageserver_api::config::tenant_conf_defaults::{
49 : DEFAULT_CHECKPOINT_DISTANCE, DEFAULT_COMPACTION_THRESHOLD,
50 : };
51 :
52 : use crate::keyspace::KeySpace;
53 : use crate::repository::{Key, Value};
54 : use crate::walrecord::NeonWalRecord;
55 :
56 : use utils::lsn::Lsn;
57 :
58 : use pageserver_compaction::helpers::overlaps_with;
59 : use pageserver_compaction::interface::*;
60 :
61 : use super::CompactionError;
62 :
63 : /// Maximum number of deltas before generating an image layer in bottom-most compaction.
64 : const COMPACTION_DELTA_THRESHOLD: usize = 5;
65 :
66 : /// The result of bottom-most compaction for a single key at each LSN.
67 : #[derive(Debug)]
68 : #[cfg_attr(test, derive(PartialEq))]
69 : pub struct KeyLogAtLsn(pub Vec<(Lsn, Value)>);
70 :
71 : /// The result of bottom-most compaction.
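///
/// A minimal illustration (hypothetical history for one key, GC horizon at 0x30,
/// a single retain_lsn at 0x20):
///
/// ```plain
/// history:       image A@0x10, +B@0x20, +C@0x28, +D@0x38
/// below_horizon: [(0x20, image AB@0x20), (0x30, [+C@0x28])]
/// above_horizon: [+D@0x38]
/// ```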
72 : #[derive(Debug)]
73 : #[cfg_attr(test, derive(PartialEq))]
74 : pub(crate) struct KeyHistoryRetention {
75 : /// Stores logs to reconstruct the value at the given LSN: that is, deltas with lsn <= LSN, or an image with lsn == LSN.
76 : pub(crate) below_horizon: Vec<(Lsn, KeyLogAtLsn)>,
77 : /// Stores logs to reconstruct the value at any LSN above the horizon, that is, logs with lsn > horizon.
78 : pub(crate) above_horizon: KeyLogAtLsn,
79 : }
80 :
81 : impl KeyHistoryRetention {
82 : /// Hack: skip delta layer generation if we would produce a layer with the same key-lsn range as an existing one.
83 : ///
84 : /// This can happen if we have removed some deltas in "the middle" of some existing layer's key-lsn-range.
85 : /// For example, consider the case where a single delta with range [0x10,0x50) exists.
86 : /// And we have branches at LSN 0x10, 0x20, 0x30.
87 : /// Then we delete branch @ 0x20.
88 : /// Bottom-most compaction may now delete the delta [0x20,0x30).
89 : /// And that wouldn't change the shape of the layer.
90 : ///
91 : /// Note that bottom-most-gc-compaction never _adds_ new data in that case, only removes.
92 : ///
93 : /// `discard_key` will only be called when the writer reaches its target (instead of for every key), so it's fine to grab a lock inside.
94 38 : async fn discard_key(key: &PersistentLayerKey, tline: &Arc<Timeline>, dry_run: bool) -> bool {
95 38 : if dry_run {
96 0 : return true;
97 38 : }
98 38 : let guard = tline.layers.read().await;
99 38 : if !guard.contains_key(key) {
100 22 : return false;
101 16 : }
102 16 : let layer_generation = guard.get_from_key(key).metadata().generation;
103 16 : drop(guard);
104 16 : if layer_generation == tline.generation {
105 16 : info!(
106 : key=%key,
107 : ?layer_generation,
108 0 : "discard layer due to duplicated layer key in the same generation",
109 : );
110 16 : true
111 : } else {
112 0 : false
113 : }
114 38 : }
115 :
116 : /// Pipe a history of a single key to the writers.
117 : ///
118 : /// If `image_writer` is `None`, the images will be placed into the delta layers.
119 : /// The delta writer will contain all images and deltas (below and above the horizon) except the bottom-most images.
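///
/// A rough sketch of the routing, assuming `image_writer` is `Some`:
///
/// ```plain
/// below_horizon, first batch, single image -> image_writer
/// below_horizon, everything else           -> delta_writer
/// above_horizon logs                       -> delta_writer
/// ```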
120 : #[allow(clippy::too_many_arguments)]
121 422 : async fn pipe_to(
122 422 : self,
123 422 : key: Key,
124 422 : tline: &Arc<Timeline>,
125 422 : delta_writer: &mut SplitDeltaLayerWriter,
126 422 : mut image_writer: Option<&mut SplitImageLayerWriter>,
127 422 : stat: &mut CompactionStatistics,
128 422 : dry_run: bool,
129 422 : ctx: &RequestContext,
130 422 : ) -> anyhow::Result<()> {
131 422 : let mut first_batch = true;
132 422 : let discard = |key: &PersistentLayerKey| {
133 0 : let key = key.clone();
134 0 : async move { Self::discard_key(&key, tline, dry_run).await }
135 0 : };
136 1402 : for (cutoff_lsn, KeyLogAtLsn(logs)) in self.below_horizon {
137 980 : if first_batch {
138 422 : if logs.len() == 1 && logs[0].1.is_image() {
139 408 : let Value::Image(img) = &logs[0].1 else {
140 0 : unreachable!()
141 : };
142 408 : stat.produce_image_key(img);
143 408 : if let Some(image_writer) = image_writer.as_mut() {
144 410 : image_writer.put_image(key, img.clone(), ctx).await?;
145 : } else {
146 0 : delta_writer
147 0 : .put_value_with_discard_fn(
148 0 : key,
149 0 : cutoff_lsn,
150 0 : Value::Image(img.clone()),
151 0 : tline,
152 0 : ctx,
153 0 : discard,
154 0 : )
155 0 : .await?;
156 : }
157 : } else {
158 28 : for (lsn, val) in logs {
159 14 : stat.produce_key(&val);
160 14 : delta_writer
161 14 : .put_value_with_discard_fn(key, lsn, val, tline, ctx, discard)
162 1 : .await?;
163 : }
164 : }
165 422 : first_batch = false;
166 : } else {
167 640 : for (lsn, val) in logs {
168 82 : stat.produce_key(&val);
169 82 : delta_writer
170 82 : .put_value_with_discard_fn(key, lsn, val, tline, ctx, discard)
171 8 : .await?;
172 : }
173 : }
174 : }
175 422 : let KeyLogAtLsn(above_horizon_logs) = self.above_horizon;
176 454 : for (lsn, val) in above_horizon_logs {
177 32 : stat.produce_key(&val);
178 32 : delta_writer
179 32 : .put_value_with_discard_fn(key, lsn, val, tline, ctx, discard)
180 2 : .await?;
181 : }
182 422 : Ok(())
183 422 : }
184 : }
185 :
186 : #[derive(Debug, Serialize, Default)]
187 : struct CompactionStatisticsNumSize {
188 : num: u64,
189 : size: u64,
190 : }
191 :
192 : #[derive(Debug, Serialize, Default)]
193 : pub struct CompactionStatistics {
194 : delta_layer_visited: CompactionStatisticsNumSize,
195 : image_layer_visited: CompactionStatisticsNumSize,
196 : delta_layer_produced: CompactionStatisticsNumSize,
197 : image_layer_produced: CompactionStatisticsNumSize,
198 : num_delta_layer_discarded: usize,
199 : num_image_layer_discarded: usize,
200 : num_unique_keys_visited: usize,
201 : wal_keys_visited: CompactionStatisticsNumSize,
202 : image_keys_visited: CompactionStatisticsNumSize,
203 : wal_produced: CompactionStatisticsNumSize,
204 : image_produced: CompactionStatisticsNumSize,
205 : }
206 :
207 : impl CompactionStatistics {
208 686 : fn estimated_size_of_value(val: &Value) -> usize {
209 266 : match val {
210 420 : Value::Image(img) => img.len(),
211 0 : Value::WalRecord(NeonWalRecord::Postgres { rec, .. }) => rec.len(),
212 266 : _ => std::mem::size_of::<NeonWalRecord>(),
213 : }
214 686 : }
215 1096 : fn estimated_size_of_key() -> usize {
216 1096 : KEY_SIZE // TODO: distinguish image layer and delta layer (count LSN in delta layer)
217 1096 : }
218 46 : fn visit_delta_layer(&mut self, size: u64) {
219 46 : self.delta_layer_visited.num += 1;
220 46 : self.delta_layer_visited.size += size;
221 46 : }
222 36 : fn visit_image_layer(&mut self, size: u64) {
223 36 : self.image_layer_visited.num += 1;
224 36 : self.image_layer_visited.size += size;
225 36 : }
226 422 : fn on_unique_key_visited(&mut self) {
227 422 : self.num_unique_keys_visited += 1;
228 422 : }
229 140 : fn visit_wal_key(&mut self, val: &Value) {
230 140 : self.wal_keys_visited.num += 1;
231 140 : self.wal_keys_visited.size +=
232 140 : Self::estimated_size_of_value(val) as u64 + Self::estimated_size_of_key() as u64;
233 140 : }
234 420 : fn visit_image_key(&mut self, val: &Value) {
235 420 : self.image_keys_visited.num += 1;
236 420 : self.image_keys_visited.size +=
237 420 : Self::estimated_size_of_value(val) as u64 + Self::estimated_size_of_key() as u64;
238 420 : }
239 128 : fn produce_key(&mut self, val: &Value) {
240 128 : match val {
241 2 : Value::Image(img) => self.produce_image_key(img),
242 126 : Value::WalRecord(_) => self.produce_wal_key(val),
243 : }
244 128 : }
245 126 : fn produce_wal_key(&mut self, val: &Value) {
246 126 : self.wal_produced.num += 1;
247 126 : self.wal_produced.size +=
248 126 : Self::estimated_size_of_value(val) as u64 + Self::estimated_size_of_key() as u64;
249 126 : }
250 410 : fn produce_image_key(&mut self, val: &Bytes) {
251 410 : self.image_produced.num += 1;
252 410 : self.image_produced.size += val.len() as u64 + Self::estimated_size_of_key() as u64;
253 410 : }
254 8 : fn discard_delta_layer(&mut self) {
255 8 : self.num_delta_layer_discarded += 1;
256 8 : }
257 8 : fn discard_image_layer(&mut self) {
258 8 : self.num_image_layer_discarded += 1;
259 8 : }
260 10 : fn produce_delta_layer(&mut self, size: u64) {
261 10 : self.delta_layer_produced.num += 1;
262 10 : self.delta_layer_produced.size += size;
263 10 : }
264 12 : fn produce_image_layer(&mut self, size: u64) {
265 12 : self.image_layer_produced.num += 1;
266 12 : self.image_layer_produced.size += size;
267 12 : }
268 : }
269 :
270 : impl Timeline {
271 : /// TODO: cancellation
272 : ///
273 : /// Returns whether the compaction has pending tasks.
274 364 : pub(crate) async fn compact_legacy(
275 364 : self: &Arc<Self>,
276 364 : cancel: &CancellationToken,
277 364 : flags: EnumSet<CompactFlags>,
278 364 : ctx: &RequestContext,
279 364 : ) -> Result<bool, CompactionError> {
280 364 : if flags.contains(CompactFlags::EnhancedGcBottomMostCompaction) {
281 0 : self.compact_with_gc(cancel, flags, ctx)
282 0 : .await
283 0 : .map_err(CompactionError::Other)?;
284 0 : return Ok(false);
285 364 : }
286 364 :
287 364 : if flags.contains(CompactFlags::DryRun) {
288 0 : return Err(CompactionError::Other(anyhow!(
289 0 : "dry-run mode is not supported for legacy compaction for now"
290 0 : )));
291 364 : }
292 364 :
293 364 : // High level strategy for compaction / image creation:
294 364 : //
295 364 : // 1. First, calculate the desired "partitioning" of the
296 364 : // currently in-use key space. The goal is to partition the
297 364 : // key space into roughly fixed-size chunks, but also take into
298 364 : // account any existing image layers, and try to align the
299 364 : // chunk boundaries with the existing image layers to avoid
300 364 : // too much churn. Also try to align chunk boundaries with
301 364 : // relation boundaries. In principle, we don't know about
302 364 : // relation boundaries here, we just deal with key-value
303 364 : // pairs, and the code in pgdatadir_mapping.rs knows how to
304 364 : // map relations into key-value pairs. But in practice we know
305 364 : // that 'field6' is the block number, and the fields 1-5
306 364 : // identify a relation. This is just an optimization,
307 364 : // though.
308 364 : //
309 364 : // 2. Once we know the partitioning, for each partition,
310 364 : // decide if it's time to create a new image layer. The
311 364 : // criteria is: there has been too much "churn" since the last
312 364 : // image layer? The "churn" is fuzzy concept, it's a
313 364 : // combination of too many delta files, or too much WAL in
314 364 : // total in the delta file. Or perhaps: if creating an image
315 364 : // file would allow to delete some older files.
316 364 : //
317 364 : // 3. After that, we compact all level0 delta files if there
318 364 : // are too many of them. While compacting, we also garbage
319 364 : // collect any page versions that are no longer needed because
320 364 : // of the new image layers we created in step 2.
321 364 : //
322 364 : // TODO: This high level strategy hasn't been implemented yet.
323 364 : // Below are functions compact_level0() and create_image_layers()
324 364 : // but they are a bit ad hoc and don't quite work like it's explained
325 364 : // above. Rewrite it.
326 364 :
327 364 : // Is the timeline being deleted?
328 364 : if self.is_stopping() {
329 0 : trace!("Dropping out of compaction on timeline shutdown");
330 0 : return Err(CompactionError::ShuttingDown);
331 364 : }
332 364 :
333 364 : let target_file_size = self.get_checkpoint_distance();
334 :
335 : // Define partitioning schema if needed
336 :
337 : // FIXME: the match should only cover repartitioning, not the next steps
338 364 : let (partition_count, has_pending_tasks) = match self
339 364 : .repartition(
340 364 : self.get_last_record_lsn(),
341 364 : self.get_compaction_target_size(),
342 364 : flags,
343 364 : ctx,
344 364 : )
345 15808 : .await
346 : {
347 364 : Ok(((dense_partitioning, sparse_partitioning), lsn)) => {
348 364 : // Disables access_stats updates, so that the files we read remain candidates for eviction after we're done with them
349 364 : let image_ctx = RequestContextBuilder::extend(ctx)
350 364 : .access_stats_behavior(AccessStatsBehavior::Skip)
351 364 : .build();
352 364 :
353 364 : // 2. Compact
354 364 : let timer = self.metrics.compact_time_histo.start_timer();
355 364 : let fully_compacted = self
356 364 : .compact_level0(
357 364 : target_file_size,
358 364 : flags.contains(CompactFlags::ForceL0Compaction),
359 364 : ctx,
360 364 : )
361 9900 : .await?;
362 364 : timer.stop_and_record();
363 364 :
364 364 : let mut partitioning = dense_partitioning;
365 364 : partitioning
366 364 : .parts
367 364 : .extend(sparse_partitioning.into_dense().parts);
368 364 :
369 364 : // 3. Create new image layers for partitions that have been modified
370 364 : // "enough". Skip image layer creation if L0 compaction cannot keep up.
371 364 : if fully_compacted {
372 364 : let image_layers = self
373 364 : .create_image_layers(
374 364 : &partitioning,
375 364 : lsn,
376 364 : if flags.contains(CompactFlags::ForceImageLayerCreation) {
377 14 : ImageLayerCreationMode::Force
378 : } else {
379 350 : ImageLayerCreationMode::Try
380 : },
381 364 : &image_ctx,
382 : )
383 11969 : .await?;
384 :
385 364 : self.upload_new_image_layers(image_layers)?;
386 : } else {
388 387 0 : info!("skipping image layer generation because L0 compaction did not include all layers.");
388 : }
389 364 : (partitioning.parts.len(), !fully_compacted)
390 : }
391 0 : Err(err) => {
392 0 : // no partitioning? This is normal, if the timeline was just created
393 0 : // as an empty timeline. Also in unit tests, when we use the timeline
394 0 : // as a simple key-value store, ignoring the datadir layout. Log the
395 0 : // error but continue.
396 0 : //
397 0 : // Suppress error when it's due to cancellation
398 0 : if !self.cancel.is_cancelled() && !err.is_cancelled() {
399 0 : tracing::error!("could not compact, repartitioning keyspace failed: {err:?}");
400 0 : }
401 0 : (1, false)
402 : }
403 : };
404 :
405 364 : if self.shard_identity.count >= ShardCount::new(2) {
406 : // Limit the number of layer rewrites to the number of partitions: this means its
407 : // runtime should be comparable to a full round of image layer creations, rather than
408 : // being potentially much longer.
409 0 : let rewrite_max = partition_count;
410 0 :
411 0 : self.compact_shard_ancestors(rewrite_max, ctx).await?;
412 364 : }
413 :
414 364 : Ok(has_pending_tasks)
415 364 : }
416 :
417 : /// Check for layers that are eligible to be rewritten:
418 : /// - Shard splitting: after a shard split, rewrite ancestor layers beyond pitr_interval, so that
419 : /// we don't indefinitely retain keys in this shard that aren't needed.
420 : /// - For future use: layers beyond pitr_interval that are in formats we would
421 : /// rather not maintain compatibility with indefinitely.
422 : ///
423 : /// Note: this phase may read and write many gigabytes of data: use rewrite_max to bound
424 : /// how much work it will try to do in each compaction pass.
425 0 : async fn compact_shard_ancestors(
426 0 : self: &Arc<Self>,
427 0 : rewrite_max: usize,
428 0 : ctx: &RequestContext,
429 0 : ) -> Result<(), CompactionError> {
430 0 : let mut drop_layers = Vec::new();
431 0 : let mut layers_to_rewrite: Vec<Layer> = Vec::new();
432 0 :
433 0 : // We will use the Lsn cutoff of the last GC as a threshold for rewriting layers: if a
434 0 : // layer is behind this Lsn, it indicates that the layer is being retained beyond the
435 0 : // pitr_interval, for example because a branchpoint references it.
436 0 : //
437 0 : // Holding this read guard also blocks [`Self::gc_timeline`] from entering while we
438 0 : // are rewriting layers.
439 0 : let latest_gc_cutoff = self.get_latest_gc_cutoff_lsn();
440 0 :
441 0 : tracing::info!(
442 0 : "latest_gc_cutoff: {}, pitr cutoff {}",
443 0 : *latest_gc_cutoff,
444 0 : self.gc_info.read().unwrap().cutoffs.time
445 : );
446 :
447 0 : let layers = self.layers.read().await;
448 0 : for layer_desc in layers.layer_map()?.iter_historic_layers() {
449 0 : let layer = layers.get_from_desc(&layer_desc);
450 0 : if layer.metadata().shard.shard_count == self.shard_identity.count {
451 : // This layer does not belong to a historic ancestor, no need to re-image it.
452 0 : continue;
453 0 : }
454 0 :
455 0 : // This layer was created on an ancestor shard: check if it contains any data for this shard.
456 0 : let sharded_range = ShardedRange::new(layer_desc.get_key_range(), &self.shard_identity);
457 0 : let layer_local_page_count = sharded_range.page_count();
458 0 : let layer_raw_page_count = ShardedRange::raw_size(&layer_desc.get_key_range());
459 0 : if layer_local_page_count == 0 {
460 : // This ancestral layer only covers keys that belong to other shards.
461 : // We include the full metadata in the log: if we had some critical bug that caused
462 : // us to incorrectly drop layers, this would simplify manually debugging + reinstating those layers.
463 0 : info!(%layer, old_metadata=?layer.metadata(),
464 0 : "dropping layer after shard split, contains no keys for this shard.",
465 : );
466 :
467 0 : if cfg!(debug_assertions) {
468 : // Expensive, exhaustive check of keys in this layer: this guards against ShardedRange's calculations being
469 : // wrong. If ShardedRange claims the local page count is zero, then no keys in this layer
470 : // should be !is_key_disposable()
471 0 : let range = layer_desc.get_key_range();
472 0 : let mut key = range.start;
473 0 : while key < range.end {
474 0 : debug_assert!(self.shard_identity.is_key_disposable(&key));
475 0 : key = key.next();
476 : }
477 0 : }
478 :
479 0 : drop_layers.push(layer);
480 0 : continue;
481 0 : } else if layer_local_page_count != u32::MAX
482 0 : && layer_local_page_count == layer_raw_page_count
483 : {
484 0 : debug!(%layer,
485 0 : "layer is entirely shard local ({} keys), no need to filter it",
486 : layer_local_page_count
487 : );
488 0 : continue;
489 0 : }
490 0 :
491 0 : // Don't bother re-writing a layer unless it will at least halve its size
492 0 : if layer_local_page_count != u32::MAX
493 0 : && layer_local_page_count > layer_raw_page_count / 2
494 : {
495 0 : debug!(%layer,
496 0 : "layer is already mostly local ({}/{}), not rewriting",
497 : layer_local_page_count,
498 : layer_raw_page_count
499 : );
500 0 : }
501 :
502 : // Don't bother re-writing a layer if it is within the PITR window: it will age-out eventually
503 : // without incurring the I/O cost of a rewrite.
504 0 : if layer_desc.get_lsn_range().end >= *latest_gc_cutoff {
505 0 : debug!(%layer, "Skipping rewrite of layer still in GC window ({} >= {})",
506 0 : layer_desc.get_lsn_range().end, *latest_gc_cutoff);
507 0 : continue;
508 0 : }
509 0 :
510 0 : if layer_desc.is_delta() {
511 : // We do not yet implement rewrite of delta layers
512 0 : debug!(%layer, "Skipping rewrite of delta layer");
513 0 : continue;
514 0 : }
515 0 :
516 0 : // Only rewrite layers if their generations differ. This guarantees:
517 0 : // - that local rewrite is safe, as local layer paths will differ between existing layer and rewritten one
518 0 : // - that the layer is persistent in remote storage, as we only see old-generation'd layer via loading from remote storage
519 0 : if layer.metadata().generation == self.generation {
520 0 : debug!(%layer, "Skipping rewrite, is not from old generation");
521 0 : continue;
522 0 : }
523 0 :
524 0 : if layers_to_rewrite.len() >= rewrite_max {
525 0 : tracing::info!(%layer, "Will rewrite layer on a future compaction, already rewrote {}",
526 0 : layers_to_rewrite.len()
527 : );
528 0 : continue;
529 0 : }
530 0 :
531 0 : // Fall through: all our conditions for doing a rewrite passed.
532 0 : layers_to_rewrite.push(layer);
533 : }
534 :
535 : // Drop read lock on layer map before we start doing time-consuming I/O
536 0 : drop(layers);
537 0 :
538 0 : let mut replace_image_layers = Vec::new();
539 :
540 0 : for layer in layers_to_rewrite {
541 0 : tracing::info!(layer=%layer, "Rewriting layer after shard split...");
542 0 : let mut image_layer_writer = ImageLayerWriter::new(
543 0 : self.conf,
544 0 : self.timeline_id,
545 0 : self.tenant_shard_id,
546 0 : &layer.layer_desc().key_range,
547 0 : layer.layer_desc().image_layer_lsn(),
548 0 : ctx,
549 0 : )
550 0 : .await
551 0 : .map_err(CompactionError::Other)?;
552 :
553 : // Safety of layer rewrites:
554 : // - We are writing to a different local file path than we are reading from, so the old Layer
555 : // cannot interfere with the new one.
556 : // - In the page cache, contents for a particular VirtualFile are stored with a file_id that
557 : // is different for two layers with the same name (in `ImageLayerInner::new` we always
558 : // acquire a fresh id from [`crate::page_cache::next_file_id`]). So readers do not risk
559 : // reading the index from one layer file, and then data blocks from the rewritten layer file.
560 : // - Any readers that have a reference to the old layer will keep it alive until they are done
561 : // with it. If they are trying to promote from remote storage, that will fail, but this is the same
562 : // as for compaction generally: compaction is allowed to delete layers that readers might be trying to use.
563 : // - We do not run concurrently with other kinds of compaction, so the only layer map writes we race with are:
564 : // - GC, which at worst witnesses us "undelete" a layer that they just deleted.
565 : // - ingestion, which only inserts layers, therefore cannot collide with us.
566 0 : let resident = layer.download_and_keep_resident().await?;
567 :
568 0 : let keys_written = resident
569 0 : .filter(&self.shard_identity, &mut image_layer_writer, ctx)
570 0 : .await?;
571 :
572 0 : if keys_written > 0 {
573 0 : let (desc, path) = image_layer_writer
574 0 : .finish(ctx)
575 0 : .await
576 0 : .map_err(CompactionError::Other)?;
577 0 : let new_layer = Layer::finish_creating(self.conf, self, desc, &path)
578 0 : .map_err(CompactionError::Other)?;
579 0 : tracing::info!(layer=%new_layer, "Rewrote layer, {} -> {} bytes",
580 0 : layer.metadata().file_size,
581 0 : new_layer.metadata().file_size);
582 :
583 0 : replace_image_layers.push((layer, new_layer));
584 0 : } else {
585 0 : // Drop the old layer. Usually the ShardedRange check above would already have
586 0 : // noticed that the layer has no data for us, but filtering can also turn out empty.
587 0 : drop_layers.push(layer);
588 0 : }
589 : }
590 :
591 : // At this point, we have replaced local layer files with their rewritten form, but not yet uploaded
592 : // metadata to reflect that. If we restart here, the replaced layer files will look invalid (size mismatch
593 : // to remote index) and be removed. This is inefficient but safe.
594 0 : fail::fail_point!("compact-shard-ancestors-localonly");
595 0 :
596 0 : // Update the LayerMap so that readers will use the new layers, and enqueue it for writing to remote storage
597 0 : self.rewrite_layers(replace_image_layers, drop_layers)
598 0 : .await?;
599 :
600 0 : fail::fail_point!("compact-shard-ancestors-enqueued");
601 0 :
602 0 : // We wait for all uploads to complete before finishing this compaction stage. This is not
603 0 : // necessary for correctness, but it simplifies testing, and avoids proceeding with another
604 0 : // Timeline's compaction while this timeline's uploads may be generating lots of disk I/O
605 0 : // load.
606 0 : match self.remote_client.wait_completion().await {
607 0 : Ok(()) => (),
608 0 : Err(WaitCompletionError::NotInitialized(ni)) => return Err(CompactionError::from(ni)),
609 : Err(WaitCompletionError::UploadQueueShutDownOrStopped) => {
610 0 : return Err(CompactionError::ShuttingDown)
611 : }
612 : }
613 :
614 0 : fail::fail_point!("compact-shard-ancestors-persistent");
615 0 :
616 0 : Ok(())
617 0 : }
618 :
619 : /// Update the LayerVisibilityHint of layers covered by image layers, based on whether there is
620 : /// an image layer between them and the most recent readable LSN (branch point or tip of timeline). The
621 : /// purpose of the visibility hint is to record which layers need to be available to service reads.
622 : ///
623 : /// The result may be used as an input to eviction and secondary downloads to de-prioritize layers
624 : /// that we know won't be needed for reads.
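///
/// For example (an illustrative scenario): a delta layer whose key range is fully
/// covered by a newer image layer, with no branch point reading below that image,
/// can be hinted as covered, while layers still reachable from some readable point
/// remain visible.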
625 192 : pub(super) async fn update_layer_visibility(
626 192 : &self,
627 192 : ) -> Result<(), super::layer_manager::Shutdown> {
628 192 : let head_lsn = self.get_last_record_lsn();
629 :
630 : // We will sweep through layers in reverse-LSN order. We only do historic layers. L0 deltas
631 : // are implicitly left visible, because LayerVisibilityHint's default is Visible, and we never modify it here.
632 : // Note that L0 deltas _can_ be covered by image layers, but we consider them 'visible' because we anticipate that
633 : // they will be subject to L0->L1 compaction in the near future.
634 192 : let layer_manager = self.layers.read().await;
635 192 : let layer_map = layer_manager.layer_map()?;
636 :
637 192 : let readable_points = {
638 192 : let children = self.gc_info.read().unwrap().retain_lsns.clone();
639 192 :
640 192 : let mut readable_points = Vec::with_capacity(children.len() + 1);
641 192 : for (child_lsn, _child_timeline_id, is_offloaded) in &children {
642 0 : if *is_offloaded == MaybeOffloaded::Yes {
643 0 : continue;
644 0 : }
645 0 : readable_points.push(*child_lsn);
646 : }
647 192 : readable_points.push(head_lsn);
648 192 : readable_points
649 192 : };
650 192 :
651 192 : let (layer_visibility, covered) = layer_map.get_visibility(readable_points);
652 504 : for (layer_desc, visibility) in layer_visibility {
653 312 : // FIXME: a more efficient bulk zip() through the layers rather than N log N lookups getting each one
654 312 : let layer = layer_manager.get_from_desc(&layer_desc);
655 312 : layer.set_visibility(visibility);
656 312 : }
657 :
658 : // TODO: publish our covered KeySpace to our parent, so that when they update their visibility, they can
659 : // avoid assuming that everything at a branch point is visible.
660 192 : drop(covered);
661 192 : Ok(())
662 192 : }
663 :
664 : /// Collect a bunch of Level 0 layer files, and compact and reshuffle them
665 : /// as Level 1 files. Returns whether the L0 layers are fully compacted.
666 364 : async fn compact_level0(
667 364 : self: &Arc<Self>,
668 364 : target_file_size: u64,
669 364 : force_compaction_ignore_threshold: bool,
670 364 : ctx: &RequestContext,
671 364 : ) -> Result<bool, CompactionError> {
672 : let CompactLevel0Phase1Result {
673 364 : new_layers,
674 364 : deltas_to_compact,
675 364 : fully_compacted,
676 : } = {
677 364 : let phase1_span = info_span!("compact_level0_phase1");
678 364 : let ctx = ctx.attached_child();
679 364 : let mut stats = CompactLevel0Phase1StatsBuilder {
680 364 : version: Some(2),
681 364 : tenant_id: Some(self.tenant_shard_id),
682 364 : timeline_id: Some(self.timeline_id),
683 364 : ..Default::default()
684 364 : };
685 364 :
686 364 : let begin = tokio::time::Instant::now();
687 364 : let phase1_layers_locked = self.layers.read().await;
688 364 : let now = tokio::time::Instant::now();
689 364 : stats.read_lock_acquisition_micros =
690 364 : DurationRecorder::Recorded(RecordedDuration(now - begin), now);
691 364 : self.compact_level0_phase1(
692 364 : phase1_layers_locked,
693 364 : stats,
694 364 : target_file_size,
695 364 : force_compaction_ignore_threshold,
696 364 : &ctx,
697 364 : )
698 364 : .instrument(phase1_span)
699 9898 : .await?
700 : };
701 :
702 364 : if new_layers.is_empty() && deltas_to_compact.is_empty() {
703 : // nothing to do
704 336 : return Ok(true);
705 28 : }
706 28 :
707 28 : self.finish_compact_batch(&new_layers, &Vec::new(), &deltas_to_compact)
708 1 : .await?;
709 28 : Ok(fully_compacted)
710 364 : }
711 :
712 : /// Level0 files first phase of compaction, explained in the [`Self::compact_legacy`] comment.
713 364 : async fn compact_level0_phase1<'a>(
714 364 : self: &'a Arc<Self>,
715 364 : guard: tokio::sync::RwLockReadGuard<'a, LayerManager>,
716 364 : mut stats: CompactLevel0Phase1StatsBuilder,
717 364 : target_file_size: u64,
718 364 : force_compaction_ignore_threshold: bool,
719 364 : ctx: &RequestContext,
720 364 : ) -> Result<CompactLevel0Phase1Result, CompactionError> {
721 364 : stats.read_lock_held_spawn_blocking_startup_micros =
722 364 : stats.read_lock_acquisition_micros.till_now(); // set by caller
723 364 : let layers = guard.layer_map()?;
724 364 : let level0_deltas = layers.level0_deltas();
725 364 : stats.level0_deltas_count = Some(level0_deltas.len());
726 364 :
727 364 : // Only compact if enough layers have accumulated.
728 364 : let threshold = self.get_compaction_threshold();
729 364 : if level0_deltas.is_empty() || level0_deltas.len() < threshold {
730 336 : if force_compaction_ignore_threshold {
731 0 : if !level0_deltas.is_empty() {
732 0 : info!(
733 0 : level0_deltas = level0_deltas.len(),
734 0 : threshold, "too few deltas to compact, but forcing compaction"
735 : );
736 : } else {
737 0 : info!(
738 0 : level0_deltas = level0_deltas.len(),
739 0 : threshold, "too few deltas to compact, cannot force compaction"
740 : );
741 0 : return Ok(CompactLevel0Phase1Result::default());
742 : }
743 : } else {
744 336 : debug!(
745 0 : level0_deltas = level0_deltas.len(),
746 0 : threshold, "too few deltas to compact"
747 : );
748 336 : return Ok(CompactLevel0Phase1Result::default());
749 : }
750 28 : }
751 :
752 28 : let mut level0_deltas = level0_deltas
753 28 : .iter()
754 402 : .map(|x| guard.get_from_desc(x))
755 28 : .collect::<Vec<_>>();
756 28 :
757 28 : // Gather the files to compact in this iteration.
758 28 : //
759 28 : // Start with the oldest Level 0 delta file, and collect any other
760 28 : // level 0 files that form a contiguous sequence, such that the end
761 28 : // LSN of previous file matches the start LSN of the next file.
762 28 : //
763 28 : // Note that if the files don't form such a sequence, we might
764 28 : // "compact" just a single file. That's a bit pointless, but it allows
765 28 : // us to get rid of the level 0 file, and compact the other files on
766 28 : // the next iteration. This could probably be made smarter, but such
767 28 : // "gaps" in the sequence of level 0 files should only happen in case
768 28 : // of a crash, partial download from cloud storage, or something like
769 28 : // that, so it's not a big deal in practice.
770 748 : level0_deltas.sort_by_key(|l| l.layer_desc().lsn_range.start);
771 28 : let mut level0_deltas_iter = level0_deltas.iter();
772 28 :
773 28 : let first_level0_delta = level0_deltas_iter.next().unwrap();
774 28 : let mut prev_lsn_end = first_level0_delta.layer_desc().lsn_range.end;
775 28 : let mut deltas_to_compact = Vec::with_capacity(level0_deltas.len());
776 28 :
777 28 : // Accumulate the size of layers in `deltas_to_compact`
778 28 : let mut deltas_to_compact_bytes = 0;
779 28 :
780 28 : // Under normal circumstances, we will accumulate up to compaction_interval L0s of size
781 28 : // checkpoint_distance each. To avoid edge cases using extra system resources, bound our
782 28 : // work in this function to only operate on this much delta data at once.
783 28 : //
784 28 : // Take the max of the configured value & the default, so that tests that configure tiny values
785 28 : // can still use a sensible amount of memory, but if a deployed system configures bigger values we
786 28 : // still let them compact a full stack of L0s in one go.
787 28 : let delta_size_limit = std::cmp::max(
788 28 : self.get_compaction_threshold(),
789 28 : DEFAULT_COMPACTION_THRESHOLD,
790 28 : ) as u64
791 28 : * std::cmp::max(self.get_checkpoint_distance(), DEFAULT_CHECKPOINT_DISTANCE);
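// As a worked example (illustrative numbers, not asserted defaults): with a
// compaction threshold of 10 L0s and a checkpoint distance of 256 MiB, this
// bounds one pass to 10 * 256 MiB = 2.5 GiB of L0 delta data.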
792 28 :
793 28 : let mut fully_compacted = true;
794 28 :
795 28 : deltas_to_compact.push(first_level0_delta.download_and_keep_resident().await?);
796 402 : for l in level0_deltas_iter {
797 374 : let lsn_range = &l.layer_desc().lsn_range;
798 374 :
799 374 : if lsn_range.start != prev_lsn_end {
800 0 : break;
801 374 : }
802 374 : deltas_to_compact.push(l.download_and_keep_resident().await?);
803 374 : deltas_to_compact_bytes += l.metadata().file_size;
804 374 : prev_lsn_end = lsn_range.end;
805 374 :
806 374 : if deltas_to_compact_bytes >= delta_size_limit {
807 0 : info!(
808 0 : l0_deltas_selected = deltas_to_compact.len(),
809 0 : l0_deltas_total = level0_deltas.len(),
810 0 : "L0 compaction picker hit max delta layer size limit: {}",
811 : delta_size_limit
812 : );
813 0 : fully_compacted = false;
814 0 :
815 0 : // Proceed with compaction, but only a subset of L0s
816 0 : break;
817 374 : }
818 : }
819 28 : let lsn_range = Range {
820 28 : start: deltas_to_compact
821 28 : .first()
822 28 : .unwrap()
823 28 : .layer_desc()
824 28 : .lsn_range
825 28 : .start,
826 28 : end: deltas_to_compact.last().unwrap().layer_desc().lsn_range.end,
827 28 : };
828 28 :
829 28 : info!(
830 0 : "Starting Level0 compaction in LSN range {}-{} for {} layers ({} deltas in total)",
831 0 : lsn_range.start,
832 0 : lsn_range.end,
833 0 : deltas_to_compact.len(),
834 0 : level0_deltas.len()
835 : );
836 :
837 402 : for l in deltas_to_compact.iter() {
838 402 : info!("compact includes {l}");
839 : }
840 :
841 : // We don't need the original list of layers anymore. Drop it so that
842 : // we don't accidentally use it later in the function.
843 28 : drop(level0_deltas);
844 28 :
845 28 : stats.read_lock_held_prerequisites_micros = stats
846 28 : .read_lock_held_spawn_blocking_startup_micros
847 28 : .till_now();
848 :
849 : // TODO: replace with streaming k-merge
850 28 : let all_keys = {
851 28 : let mut all_keys = Vec::new();
852 402 : for l in deltas_to_compact.iter() {
853 402 : if self.cancel.is_cancelled() {
854 0 : return Err(CompactionError::ShuttingDown);
855 402 : }
856 2366 : all_keys.extend(l.load_keys(ctx).await.map_err(CompactionError::Other)?);
857 : }
858 : // The current stdlib sorting implementation is designed in a way where it is
859 : // particularly fast when the slice is made up of sorted sub-ranges.
860 4423776 : all_keys.sort_by_key(|DeltaEntry { key, lsn, .. }| (*key, *lsn));
861 28 : all_keys
862 28 : };
863 28 :
864 28 : stats.read_lock_held_key_sort_micros = stats.read_lock_held_prerequisites_micros.till_now();
865 :
866 : // Determine N largest holes where N is number of compacted layers. The vec is sorted by key range start.
867 : //
868 : // A hole is a key range for which this compaction doesn't have any WAL records.
869 : // Our goal in this compaction iteration is to avoid creating L1s that, in terms of their key range,
870 : // cover the hole, but actually don't contain any WAL records for that key range.
871 : // The reason is that the mere stack of L1s (`count_deltas`) triggers image layer creation (`create_image_layers`).
872 : // That image layer creation would be useless for a hole range covered by L1s that don't contain any WAL records.
873 : //
874 : // The algorithm chooses holes as follows.
875 : // - Slide a two-key window over the keys in key order to get the hole range (= distance between two keys).
876 : // - Filter: min threshold on range length
877 : // - Rank: by coverage size (=number of image layers required to reconstruct each key in the range for which we have any data)
878 : //
879 : // For more details, intuition, and some ASCII art see https://github.com/neondatabase/neon/pull/3597#discussion_r1112704451
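// An illustrative example (hypothetical keys): if the L0s being compacted contain
// WAL for keys 0x00..0x10 and 0x90..0xA0 but nothing in between, and the gap
// 0x10..0x90 is covered by at least min_hole_coverage_size image layers, it is
// recorded as a hole and the L1s produced below are split at its boundary rather
// than spanning it.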
880 : #[derive(PartialEq, Eq)]
881 : struct Hole {
882 : key_range: Range<Key>,
883 : coverage_size: usize,
884 : }
885 28 : let holes: Vec<Hole> = {
886 : use std::cmp::Ordering;
887 : impl Ord for Hole {
888 0 : fn cmp(&self, other: &Self) -> Ordering {
889 0 : self.coverage_size.cmp(&other.coverage_size).reverse()
890 0 : }
891 : }
892 : impl PartialOrd for Hole {
893 0 : fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
894 0 : Some(self.cmp(other))
895 0 : }
896 : }
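// Note: `Ord` is reversed above so that Rust's max-heap `BinaryHeap` behaves as a
// min-heap over `coverage_size`: pushing each candidate and popping when over
// capacity evicts the smallest hole, keeping the `max_holes` largest ones.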
897 28 : let max_holes = deltas_to_compact.len();
898 28 : let last_record_lsn = self.get_last_record_lsn();
899 28 : let min_hole_range = (target_file_size / page_cache::PAGE_SZ as u64) as i128;
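// For block keys, a step of 1 in the key's i128 representation corresponds to one
// page, so a gap only qualifies as a hole if it spans at least a target file's
// worth of pages (e.g. 128 MiB / 8 KiB = 16384 keys; illustrative numbers).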
900 28 : let min_hole_coverage_size = 3; // TODO: something more flexible?
901 28 : // min-heap (reserve space for one more element added before eviction)
902 28 : let mut heap: BinaryHeap<Hole> = BinaryHeap::with_capacity(max_holes + 1);
903 28 : let mut prev: Option<Key> = None;
904 :
905 2064038 : for &DeltaEntry { key: next_key, .. } in all_keys.iter() {
906 2064038 : if let Some(prev_key) = prev {
907 : // just a first fast filter; do not create hole entries for metadata keys. The last hole in the
908 : // compaction is the gap between the data keys and the metadata keys.
909 2064010 : if next_key.to_i128() - prev_key.to_i128() >= min_hole_range
910 0 : && !Key::is_metadata_key(&prev_key)
911 : {
912 0 : let key_range = prev_key..next_key;
913 0 : // Measuring a hole by simply subtracting the i128 representations of the key range
914 0 : // boundaries does not make much sense, because the largest holes would correspond to
915 0 : // field1/field2 changes. We are mostly interested in eliminating holes which cause
916 0 : // generation of excessive image layers; that is why we measure a hole's size as the number of covering image layers.
917 0 : let coverage_size =
918 0 : layers.image_coverage(&key_range, last_record_lsn).len();
919 0 : if coverage_size >= min_hole_coverage_size {
920 0 : heap.push(Hole {
921 0 : key_range,
922 0 : coverage_size,
923 0 : });
924 0 : if heap.len() > max_holes {
925 0 : heap.pop(); // remove smallest hole
926 0 : }
927 0 : }
928 2064010 : }
929 28 : }
930 2064038 : prev = Some(next_key.next());
931 : }
932 28 : let mut holes = heap.into_vec();
933 28 : holes.sort_unstable_by_key(|hole| hole.key_range.start);
934 28 : holes
935 28 : };
936 28 : stats.read_lock_held_compute_holes_micros = stats.read_lock_held_key_sort_micros.till_now();
937 28 : drop_rlock(guard);
938 28 :
939 28 : if self.cancel.is_cancelled() {
940 0 : return Err(CompactionError::ShuttingDown);
941 28 : }
942 28 :
943 28 : stats.read_lock_drop_micros = stats.read_lock_held_compute_holes_micros.till_now();
944 :
945 : // This iterator walks through all key-value pairs from all the layers
946 : // we're compacting, in key, LSN order.
947 : // If there's both a Value::Image and Value::WalRecord for the same (key,lsn),
948 : // then the Value::Image is ordered before Value::WalRecord.
949 28 : let mut all_values_iter = {
950 28 : let mut deltas = Vec::with_capacity(deltas_to_compact.len());
951 402 : for l in deltas_to_compact.iter() {
952 402 : let l = l.get_as_delta(ctx).await.map_err(CompactionError::Other)?;
953 402 : deltas.push(l);
954 : }
955 28 : MergeIterator::create(&deltas, &[], ctx)
956 28 : };
957 28 :
958 28 : // This iterator walks through all keys and is needed to calculate size used by each key
959 28 : let mut all_keys_iter = all_keys
960 28 : .iter()
961 2064038 : .map(|DeltaEntry { key, lsn, size, .. }| (*key, *lsn, *size))
962 2064010 : .coalesce(|mut prev, cur| {
963 2064010 : // Coalesce keys that belong to the same key pair.
964 2064010 : // This ensures that compaction doesn't put them
965 2064010 : // into different layer files.
966 2064010 : // Still limit this by the target file size,
967 2064010 : // so that we keep the size of the files in
968 2064010 : // check.
969 2064010 : if prev.0 == cur.0 && prev.2 < target_file_size {
970 40038 : prev.2 += cur.2;
971 40038 : Ok(prev)
972 : } else {
973 2023972 : Err((prev, cur))
974 : }
975 2064010 : });
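// An illustrative trace of the coalescing above (hypothetical entries): with a
// large target_file_size, (K1, 0x10, 4096), (K1, 0x20, 4096), (K2, 0x10, 4096)
// yields (K1, 0x10, 8192), (K2, 0x10, 4096); same-key sizes accumulate until the
// running size reaches target_file_size.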
976 28 :
977 28 : // Merge the contents of all the input delta layers into a new set
978 28 : // of delta layers, based on the current partitioning.
979 28 : //
980 28 : // We split the new delta layers on the key dimension. We iterate through the key space, and for each key, check if including the next key to the current output layer we're building would cause the layer to become too large. If so, dump the current output layer and start new one.
981 28 : // It's possible that there is a single key with so many page versions that storing all of them in a single layer file
982 28 : // would be too large. In that case, we also split on the LSN dimension.
983 28 : //
984 28 : // LSN
985 28 : // ^
986 28 : // |
987 28 : // | +-----------+ +--+--+--+--+
988 28 : // | | | | | | | |
989 28 : // | +-----------+ | | | | |
990 28 : // | | | | | | | |
991 28 : // | +-----------+ ==> | | | | |
992 28 : // | | | | | | | |
993 28 : // | +-----------+ | | | | |
994 28 : // | | | | | | | |
995 28 : // | +-----------+ +--+--+--+--+
996 28 : // |
997 28 : // +--------------> key
998 28 : //
999 28 : //
1000 28 : // If one key (X) has a lot of page versions:
1001 28 : //
1002 28 : // LSN
1003 28 : // ^
1004 28 : // | (X)
1005 28 : // | +-----------+ +--+--+--+--+
1006 28 : // | | | | | | | |
1007 28 : // | +-----------+ | | +--+ |
1008 28 : // | | | | | | | |
1009 28 : // | +-----------+ ==> | | | | |
1010 28 : // | | | | | +--+ |
1011 28 : // | +-----------+ | | | | |
1012 28 : // | | | | | | | |
1013 28 : // | +-----------+ +--+--+--+--+
1014 28 : // |
1015 28 : // +--------------> key
1016 28 : // TODO: this actually divides the layers into fixed-size chunks, not
1017 28 : // based on the partitioning.
1018 28 : //
1019 28 : // TODO: we should also opportunistically materialize and
1020 28 : // garbage collect what we can.
1021 28 : let mut new_layers = Vec::new();
1022 28 : let mut prev_key: Option<Key> = None;
1023 28 : let mut writer: Option<DeltaLayerWriter> = None;
1024 28 : let mut key_values_total_size = 0u64;
1025 28 : let mut dup_start_lsn: Lsn = Lsn::INVALID; // start LSN of layer containing values of the single key
1026 28 : let mut dup_end_lsn: Lsn = Lsn::INVALID; // end LSN of layer containing values of the single key
1027 28 : let mut next_hole = 0; // index of next hole in holes vector
1028 28 :
1029 28 : let mut keys = 0;
1030 :
1031 2064066 : while let Some((key, lsn, value)) = all_values_iter
1032 2064066 : .next()
1033 3425 : .await
1034 2064066 : .map_err(CompactionError::Other)?
1035 : {
1036 2064038 : keys += 1;
1037 2064038 :
1038 2064038 : if keys % 32_768 == 0 && self.cancel.is_cancelled() {
1039 : // Avoid hitting the cancellation token on every key. In benches, we end up
1040 : // shuffling on the order of a million keys per layer; this means we'll check
1041 : // it around tens of times per layer.
1042 0 : return Err(CompactionError::ShuttingDown);
1043 2064038 : }
1044 2064038 :
1045 2064038 : let same_key = prev_key.map_or(false, |prev_key| prev_key == key);
1046 2064038 : // We need to check key boundaries once we reach next key or end of layer with the same key
1047 2064038 : if !same_key || lsn == dup_end_lsn {
1048 2024000 : let mut next_key_size = 0u64;
1049 2024000 : let is_dup_layer = dup_end_lsn.is_valid();
1050 2024000 : dup_start_lsn = Lsn::INVALID;
1051 2024000 : if !same_key {
1052 2024000 : dup_end_lsn = Lsn::INVALID;
1053 2024000 : }
1054 : // Determine size occupied by this key. We stop at next key or when size becomes larger than target_file_size
1055 2024000 : for (next_key, next_lsn, next_size) in all_keys_iter.by_ref() {
1056 2024000 : next_key_size = next_size;
1057 2024000 : if key != next_key {
1058 2023972 : if dup_end_lsn.is_valid() {
1059 0 : // We are writting segment with duplicates:
1060 0 : // place all remaining values of this key in separate segment
1061 0 : dup_start_lsn = dup_end_lsn; // new segments starts where old stops
1062 0 : dup_end_lsn = lsn_range.end; // there are no more values of this key till end of LSN range
1063 2023972 : }
1064 2023972 : break;
1065 28 : }
1066 28 : key_values_total_size += next_size;
1067 28 : // Check if it is time to split the segment: the total size of this key's values exceeds the target file size.
1068 28 : // We need to avoid generating empty segments if next_size > target_file_size.
1069 28 : if key_values_total_size > target_file_size && lsn != next_lsn {
1070 : // Split key between multiple layers: such layer can contain only single key
1071 0 : dup_start_lsn = if dup_end_lsn.is_valid() {
1072 0 : dup_end_lsn // new segment with duplicates starts where old one stops
1073 : } else {
1074 0 : lsn // start with the first LSN for this key
1075 : };
1076 0 : dup_end_lsn = next_lsn; // upper LSN boundary is exclusive
1077 0 : break;
1078 28 : }
1079 : }
1080 : // Handle the case when the loop reaches the last key: dup_end is non-zero but dup_start is not set.
1081 2024000 : if dup_end_lsn.is_valid() && !dup_start_lsn.is_valid() {
1082 0 : dup_start_lsn = dup_end_lsn;
1083 0 : dup_end_lsn = lsn_range.end;
1084 2024000 : }
1085 2024000 : if writer.is_some() {
1086 2023972 : let written_size = writer.as_mut().unwrap().size();
1087 2023972 : let contains_hole =
1088 2023972 : next_hole < holes.len() && key >= holes[next_hole].key_range.end;
1089 : // check if the key causes layer overflow or contains a hole...
1090 2023972 : if is_dup_layer
1091 2023972 : || dup_end_lsn.is_valid()
1092 2023972 : || written_size + key_values_total_size > target_file_size
1093 2023692 : || contains_hole
1094 : {
1095 : // ... if so, flush the previous layer and prepare to write a new one
1096 280 : let (desc, path) = writer
1097 280 : .take()
1098 280 : .unwrap()
1099 280 : .finish(prev_key.unwrap().next(), ctx)
1100 711 : .await
1101 280 : .map_err(CompactionError::Other)?;
1102 280 : let new_delta = Layer::finish_creating(self.conf, self, desc, &path)
1103 280 : .map_err(CompactionError::Other)?;
1104 :
1105 280 : new_layers.push(new_delta);
1106 280 : writer = None;
1107 280 :
1108 280 : if contains_hole {
1109 0 : // skip hole
1110 0 : next_hole += 1;
1111 280 : }
1112 2023692 : }
1113 28 : }
1114 : // Remember size of key value because at next iteration we will access next item
1115 2024000 : key_values_total_size = next_key_size;
1116 40038 : }
1117 2064038 : fail_point!("delta-layer-writer-fail-before-finish", |_| {
1118 0 : Err(CompactionError::Other(anyhow::anyhow!(
1119 0 : "failpoint delta-layer-writer-fail-before-finish"
1120 0 : )))
1121 2064038 : });
1122 :
1123 2064038 : if !self.shard_identity.is_key_disposable(&key) {
1124 2064038 : if writer.is_none() {
1125 308 : if self.cancel.is_cancelled() {
1126 : // to be somewhat responsive to cancellation, check for each new layer
1127 0 : return Err(CompactionError::ShuttingDown);
1128 308 : }
1129 : // Create the writer if it is not initialized yet
1130 308 : writer = Some(
1131 : DeltaLayerWriter::new(
1132 308 : self.conf,
1133 308 : self.timeline_id,
1134 308 : self.tenant_shard_id,
1135 308 : key,
1136 308 : if dup_end_lsn.is_valid() {
1137 : // this is a layer containing slice of values of the same key
1138 0 : debug!("Create new dup layer {}..{}", dup_start_lsn, dup_end_lsn);
1139 0 : dup_start_lsn..dup_end_lsn
1140 : } else {
1141 308 : debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end);
1142 308 : lsn_range.clone()
1143 : },
1144 308 : ctx,
1145 : )
1146 154 : .await
1147 308 : .map_err(CompactionError::Other)?,
1148 : );
1149 :
1150 308 : keys = 0;
1151 2063730 : }
1152 :
1153 2064038 : writer
1154 2064038 : .as_mut()
1155 2064038 : .unwrap()
1156 2064038 : .put_value(key, lsn, value, ctx)
1157 1225 : .await
1158 2064038 : .map_err(CompactionError::Other)?;
1159 : } else {
1160 0 : debug!(
1161 0 : "Dropping key {} during compaction (it belongs on shard {:?})",
1162 0 : key,
1163 0 : self.shard_identity.get_shard_number(&key)
1164 : );
1165 : }
1166 :
1167 2064038 : if !new_layers.is_empty() {
1168 19786 : fail_point!("after-timeline-compacted-first-L1");
1169 2044252 : }
1170 :
1171 2064038 : prev_key = Some(key);
1172 : }
1173 28 : if let Some(writer) = writer {
1174 28 : let (desc, path) = writer
1175 28 : .finish(prev_key.unwrap().next(), ctx)
1176 1989 : .await
1177 28 : .map_err(CompactionError::Other)?;
1178 28 : let new_delta = Layer::finish_creating(self.conf, self, desc, &path)
1179 28 : .map_err(CompactionError::Other)?;
1180 28 : new_layers.push(new_delta);
1181 0 : }
1182 :
1183 : // Sync layers
1184 28 : if !new_layers.is_empty() {
1185 : // Print a warning if the created layer is larger than double the target size
1186 : // Add two pages for potential overhead. This should in theory be already
1187 : // accounted for in the target calculation, but for very small targets,
1188 : // we still might easily hit the limit otherwise.
1189 28 : let warn_limit = target_file_size * 2 + page_cache::PAGE_SZ as u64 * 2;
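// e.g. (illustrative numbers): with a 128 MiB target and 8 KiB pages, warn_limit
// is 256 MiB + 16 KiB.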
1190 308 : for layer in new_layers.iter() {
1191 308 : if layer.layer_desc().file_size > warn_limit {
1192 0 : warn!(
1193 : %layer,
1194 0 : "created delta file of size {} larger than double of target of {target_file_size}", layer.layer_desc().file_size
1195 : );
1196 308 : }
1197 : }
1198 :
1199 : // The writer.finish() above already did the fsync of the inodes.
1200 : // We just need to fsync the directory in which these inodes are linked,
1201 : // which we know to be the timeline directory.
1202 : //
1203 : // We use fatal_err() below because the after writer.finish() returns with success,
1204 : // the in-memory state of the filesystem already has the layer file in its final place,
1205 : // and subsequent pageserver code could think it's durable while it really isn't.
1206 28 : let timeline_dir = VirtualFile::open(
1207 28 : &self
1208 28 : .conf
1209 28 : .timeline_path(&self.tenant_shard_id, &self.timeline_id),
1210 28 : ctx,
1211 28 : )
1212 14 : .await
1213 28 : .fatal_err("VirtualFile::open for timeline dir fsync");
1214 28 : timeline_dir
1215 28 : .sync_all()
1216 14 : .await
1217 28 : .fatal_err("VirtualFile::sync_all timeline dir");
1218 0 : }
1219 :
1220 28 : stats.write_layer_files_micros = stats.read_lock_drop_micros.till_now();
1221 28 : stats.new_deltas_count = Some(new_layers.len());
1222 308 : stats.new_deltas_size = Some(new_layers.iter().map(|l| l.layer_desc().file_size).sum());
1223 28 :
1224 28 : match TryInto::<CompactLevel0Phase1Stats>::try_into(stats)
1225 28 : .and_then(|stats| serde_json::to_string(&stats).context("serde_json::to_string"))
1226 : {
1227 28 : Ok(stats_json) => {
1228 28 : info!(
1229 0 : stats_json = stats_json.as_str(),
1230 0 : "compact_level0_phase1 stats available"
1231 : )
1232 : }
1233 0 : Err(e) => {
1234 0 : warn!("compact_level0_phase1 stats failed to serialize: {:#}", e);
1235 : }
1236 : }
1237 :
1238 : // Without this, rustc complains about deltas_to_compact still
1239 : // being borrowed when we `.into_iter()` below.
1240 28 : drop(all_values_iter);
1241 28 :
1242 28 : Ok(CompactLevel0Phase1Result {
1243 28 : new_layers,
1244 28 : deltas_to_compact: deltas_to_compact
1245 28 : .into_iter()
1246 402 : .map(|x| x.drop_eviction_guard())
1247 28 : .collect::<Vec<_>>(),
1248 28 : fully_compacted,
1249 28 : })
1250 364 : }
1251 : }
1252 :
1253 : #[derive(Default)]
1254 : struct CompactLevel0Phase1Result {
1255 : new_layers: Vec<ResidentLayer>,
1256 : deltas_to_compact: Vec<Layer>,
1257 : // Whether we have included all L0 layers, or selected only part of them due to the
1258 : // L0 compaction size limit.
1259 : fully_compacted: bool,
1260 : }
1261 :
1262 : #[derive(Default)]
1263 : struct CompactLevel0Phase1StatsBuilder {
1264 : version: Option<u64>,
1265 : tenant_id: Option<TenantShardId>,
1266 : timeline_id: Option<TimelineId>,
1267 : read_lock_acquisition_micros: DurationRecorder,
1268 : read_lock_held_spawn_blocking_startup_micros: DurationRecorder,
1269 : read_lock_held_key_sort_micros: DurationRecorder,
1270 : read_lock_held_prerequisites_micros: DurationRecorder,
1271 : read_lock_held_compute_holes_micros: DurationRecorder,
1272 : read_lock_drop_micros: DurationRecorder,
1273 : write_layer_files_micros: DurationRecorder,
1274 : level0_deltas_count: Option<usize>,
1275 : new_deltas_count: Option<usize>,
1276 : new_deltas_size: Option<u64>,
1277 : }
1278 :
1279 : #[derive(serde::Serialize)]
1280 : struct CompactLevel0Phase1Stats {
1281 : version: u64,
1282 : tenant_id: TenantShardId,
1283 : timeline_id: TimelineId,
1284 : read_lock_acquisition_micros: RecordedDuration,
1285 : read_lock_held_spawn_blocking_startup_micros: RecordedDuration,
1286 : read_lock_held_key_sort_micros: RecordedDuration,
1287 : read_lock_held_prerequisites_micros: RecordedDuration,
1288 : read_lock_held_compute_holes_micros: RecordedDuration,
1289 : read_lock_drop_micros: RecordedDuration,
1290 : write_layer_files_micros: RecordedDuration,
1291 : level0_deltas_count: usize,
1292 : new_deltas_count: usize,
1293 : new_deltas_size: u64,
1294 : }
1295 :
1296 : impl TryFrom<CompactLevel0Phase1StatsBuilder> for CompactLevel0Phase1Stats {
1297 : type Error = anyhow::Error;
1298 :
1299 28 : fn try_from(value: CompactLevel0Phase1StatsBuilder) -> Result<Self, Self::Error> {
1300 28 : Ok(Self {
1301 28 : version: value.version.ok_or_else(|| anyhow!("version not set"))?,
1302 28 : tenant_id: value
1303 28 : .tenant_id
1304 28 : .ok_or_else(|| anyhow!("tenant_id not set"))?,
1305 28 : timeline_id: value
1306 28 : .timeline_id
1307 28 : .ok_or_else(|| anyhow!("timeline_id not set"))?,
1308 28 : read_lock_acquisition_micros: value
1309 28 : .read_lock_acquisition_micros
1310 28 : .into_recorded()
1311 28 : .ok_or_else(|| anyhow!("read_lock_acquisition_micros not set"))?,
1312 28 : read_lock_held_spawn_blocking_startup_micros: value
1313 28 : .read_lock_held_spawn_blocking_startup_micros
1314 28 : .into_recorded()
1315 28 : .ok_or_else(|| anyhow!("read_lock_held_spawn_blocking_startup_micros not set"))?,
1316 28 : read_lock_held_key_sort_micros: value
1317 28 : .read_lock_held_key_sort_micros
1318 28 : .into_recorded()
1319 28 : .ok_or_else(|| anyhow!("read_lock_held_key_sort_micros not set"))?,
1320 28 : read_lock_held_prerequisites_micros: value
1321 28 : .read_lock_held_prerequisites_micros
1322 28 : .into_recorded()
1323 28 : .ok_or_else(|| anyhow!("read_lock_held_prerequisites_micros not set"))?,
1324 28 : read_lock_held_compute_holes_micros: value
1325 28 : .read_lock_held_compute_holes_micros
1326 28 : .into_recorded()
1327 28 : .ok_or_else(|| anyhow!("read_lock_held_compute_holes_micros not set"))?,
1328 28 : read_lock_drop_micros: value
1329 28 : .read_lock_drop_micros
1330 28 : .into_recorded()
1331 28 : .ok_or_else(|| anyhow!("read_lock_drop_micros not set"))?,
1332 28 : write_layer_files_micros: value
1333 28 : .write_layer_files_micros
1334 28 : .into_recorded()
1335 28 : .ok_or_else(|| anyhow!("write_layer_files_micros not set"))?,
1336 28 : level0_deltas_count: value
1337 28 : .level0_deltas_count
1338 28 : .ok_or_else(|| anyhow!("level0_deltas_count not set"))?,
1339 28 : new_deltas_count: value
1340 28 : .new_deltas_count
1341 28 : .ok_or_else(|| anyhow!("new_deltas_count not set"))?,
1342 28 : new_deltas_size: value
1343 28 : .new_deltas_size
1344 28 : .ok_or_else(|| anyhow!("new_deltas_size not set"))?,
1345 : })
1346 28 : }
1347 : }
1348 :
1349 : impl Timeline {
1350 : /// Entry point for new tiered compaction algorithm.
1351 : ///
1352 : /// All the real work is in the implementation in the pageserver_compaction
1353 : /// crate. The code here would apply to any algorithm implemented by the
1354 : /// same interface, but tiered is the only one at the moment.
1355 : ///
1356 : /// TODO: cancellation
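///
/// A hedged invocation sketch (assumes a `timeline: Arc<Timeline>`, a
/// `CancellationToken`, and a `RequestContext` are in scope; illustrative
/// only, not from the source):
///
/// ```rust,ignore
/// timeline.compact_tiered(&cancel, &ctx).await?;
/// ```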
1357 0 : pub(crate) async fn compact_tiered(
1358 0 : self: &Arc<Self>,
1359 0 : _cancel: &CancellationToken,
1360 0 : ctx: &RequestContext,
1361 0 : ) -> Result<(), CompactionError> {
1362 0 : let fanout = self.get_compaction_threshold() as u64;
1363 0 : let target_file_size = self.get_checkpoint_distance();
1364 :
1365 : // Find the top of the historical layers
1366 0 : let end_lsn = {
1367 0 : let guard = self.layers.read().await;
1368 0 : let layers = guard.layer_map()?;
1369 :
1370 0 : let l0_deltas = layers.level0_deltas();
1371 0 :
1372 0 : // As an optimization, if we find that there are too few L0 layers,
1373 0 : // bail out early. We know that the compaction algorithm would do
1374 0 : // nothing in that case.
1375 0 : if l0_deltas.len() < fanout as usize {
1376 : // doesn't need compacting
1377 0 : return Ok(());
1378 0 : }
1379 0 : l0_deltas.iter().map(|l| l.lsn_range.end).max().unwrap()
1380 0 : };
1381 0 :
1382 0 : // Is the timeline being deleted?
1383 0 : if self.is_stopping() {
1384 0 : trace!("Dropping out of compaction on timeline shutdown");
1385 0 : return Err(CompactionError::ShuttingDown);
1386 0 : }
1387 :
1388 0 : let (dense_ks, _sparse_ks) = self.collect_keyspace(end_lsn, ctx).await?;
1389 : // TODO(chi): ignore sparse_keyspace for now, compact it in the future.
1390 0 : let mut adaptor = TimelineAdaptor::new(self, (end_lsn, dense_ks));
1391 0 :
1392 0 : pageserver_compaction::compact_tiered::compact_tiered(
1393 0 : &mut adaptor,
1394 0 : end_lsn,
1395 0 : target_file_size,
1396 0 : fanout,
1397 0 : ctx,
1398 0 : )
1399 0 : .await
1400 : // TODO: compact_tiered needs to return CompactionError
1401 0 : .map_err(CompactionError::Other)?;
1402 :
1403 0 : adaptor.flush_updates().await?;
1404 0 : Ok(())
1405 0 : }
1406 :
1407 : /// Take a list of images and deltas and produce images and deltas according to the GC horizon and retain_lsns.
1408 : ///
1409 : /// It takes a key, the values of the key within the compaction process, a GC horizon, and all retain_lsns below the horizon.
1410 : /// For now, it requires that `accumulated_values` contain the full history of the key (i.e., the entry with the lowest LSN is
1411 : /// an image or a WAL record not requiring a base image). This restriction will be removed once we implement gc-compaction on branches.
1412 : ///
1413 : /// The function returns the deltas and the base image that need to be placed at each retain LSN. For example, given:
1414 : ///
1415 : /// A@0x10, +B@0x20, +C@0x30, +D@0x40, +E@0x50, +F@0x60
1416 : /// horizon = 0x50, retain_lsn = 0x20, 0x40, delta_threshold=3
1417 : ///
1418 : /// The function will produce:
1419 : ///
1420 : /// ```plain
1421 : /// 0x20(retain_lsn) -> img=AB@0x20 always produce a single image below the lowest retain LSN
1422 : /// 0x40(retain_lsn) -> deltas=[+C@0x30, +D@0x40] two deltas since the last base image, keeping the deltas
1423 : /// 0x50(horizon) -> deltas=[ABCDE@0x50] three deltas since the last base image, generate an image but put it in the delta
1424 : /// above_horizon -> deltas=[+F@0x60] full history above the horizon
1425 : /// ```
1426 : ///
1427 : /// Note that `accumulated_values` must be sorted by LSN and should belong to a single key.
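///
/// A hedged usage sketch matching the example above (all values are
/// illustrative; `tline` is assumed to be an `Arc<Timeline>`):
///
/// ```rust,ignore
/// let retention = tline
///     .generate_key_retention(
///         key,
///         &full_history,           // A@0x10 ..= +F@0x60 for `key`
///         Lsn(0x50),               // horizon
///         &[Lsn(0x20), Lsn(0x40)], // retain LSNs below the horizon
///         3,                       // delta_threshold_cnt
///         None,                    // no base image from an ancestor
///     )
///     .await?;
/// // One entry per retain LSN plus one for the horizon.
/// assert_eq!(retention.below_horizon.len(), 3);
/// ```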
1428 430 : pub(crate) async fn generate_key_retention(
1429 430 : self: &Arc<Timeline>,
1430 430 : key: Key,
1431 430 : full_history: &[(Key, Lsn, Value)],
1432 430 : horizon: Lsn,
1433 430 : retain_lsn_below_horizon: &[Lsn],
1434 430 : delta_threshold_cnt: usize,
1435 430 : base_img_from_ancestor: Option<(Key, Lsn, Bytes)>,
1436 430 : ) -> anyhow::Result<KeyHistoryRetention> {
1437 430 : // Pre-checks for the invariants
1438 430 : if cfg!(debug_assertions) {
1439 1040 : for (log_key, _, _) in full_history {
1440 610 : assert_eq!(log_key, &key, "mismatched key");
1441 : }
1442 430 : for i in 1..full_history.len() {
1443 180 : assert!(full_history[i - 1].1 <= full_history[i].1, "unordered LSN");
1444 180 : if full_history[i - 1].1 == full_history[i].1 {
1445 0 : assert!(
1446 0 : matches!(full_history[i - 1].2, Value::Image(_)),
1447 0 : "unordered delta/image, or duplicated delta"
1448 : );
1449 180 : }
1450 : }
1451 : // There used to be an assertion that checked whether the first
1452 : // record in the history is `will_init`, but it was removed.
1453 : // This is explained in the test cases for generate_key_retention.
1454 : // Search "incomplete history" for more information.
1455 1000 : for lsn in retain_lsn_below_horizon {
1456 570 : assert!(lsn < &horizon, "retain lsn must be below horizon")
1457 : }
1458 430 : for i in 1..retain_lsn_below_horizon.len() {
1459 278 : assert!(
1460 278 : retain_lsn_below_horizon[i - 1] <= retain_lsn_below_horizon[i],
1461 0 : "unordered LSN"
1462 : );
1463 : }
1464 0 : }
1465 430 : let has_ancestor = base_img_from_ancestor.is_some();
1466 : // Step 1: split history into len(retain_lsn_below_horizon) + 2 buckets, where the last bucket is for all deltas above the horizon,
1467 : // and the second-to-last bucket is for the horizon. Each bucket contains lsn_last_bucket < deltas <= lsn_this_bucket.
1468 430 : let (mut split_history, lsn_split_points) = {
1469 430 : let mut split_history = Vec::new();
1470 430 : split_history.resize_with(retain_lsn_below_horizon.len() + 2, Vec::new);
1471 430 : let mut lsn_split_points = Vec::with_capacity(retain_lsn_below_horizon.len() + 1);
1472 1000 : for lsn in retain_lsn_below_horizon {
1473 570 : lsn_split_points.push(*lsn);
1474 570 : }
1475 430 : lsn_split_points.push(horizon);
1476 430 : let mut current_idx = 0;
1477 1040 : for item @ (_, lsn, _) in full_history {
1478 772 : while current_idx < lsn_split_points.len() && *lsn > lsn_split_points[current_idx] {
1479 162 : current_idx += 1;
1480 162 : }
1481 610 : split_history[current_idx].push(item);
1482 : }
1483 430 : (split_history, lsn_split_points)
1484 : };
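// Worked example of the split above (hypothetical values): with
// retain_lsn_below_horizon = [0x20, 0x40] and horizon = 0x50, the split
// points are [0x20, 0x40, 0x50] and the history is partitioned into
//   bucket 0: lsn <= 0x20
//   bucket 1: 0x20 < lsn <= 0x40
//   bucket 2: 0x40 < lsn <= 0x50
//   bucket 3: lsn > 0x50 (above the horizon)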
1485 : // Step 2: filter out duplicated records due to the k-merge of image/delta layers
1486 1860 : for split_for_lsn in &mut split_history {
1487 1430 : let mut prev_lsn = None;
1488 1430 : let mut new_split_for_lsn = Vec::with_capacity(split_for_lsn.len());
1489 1430 : for record @ (_, lsn, _) in std::mem::take(split_for_lsn) {
1490 610 : if let Some(prev_lsn) = &prev_lsn {
1491 66 : if *prev_lsn == lsn {
1492 : // This handles the case where an LSN has data from both the delta layer and the image layer. As
1493 : // `ValueWrapper` ensures that an image is ordered before a delta at the same LSN, we simply
1494 : // drop this delta and keep the image.
1495 : //
1496 : // For example, we have delta layer key1@0x10, key1@0x20, and image layer key1@0x10, we will
1497 : // keep the image for key1@0x10 and the delta for key1@0x20. key1@0x10 delta will be simply
1498 : // dropped.
1499 : //
1500 : // TODO: in case we have both a delta and an image for a given LSN and the delta count does not exceed
1501 : // the threshold, we could keep the delta instead to save space. This is an optimization for the future.
1502 0 : continue;
1503 66 : }
1504 544 : }
1505 610 : prev_lsn = Some(lsn);
1506 610 : new_split_for_lsn.push(record);
1507 : }
1508 1430 : *split_for_lsn = new_split_for_lsn;
1509 : }
1510 : // Step 3: generate images when necessary
1511 430 : let mut retention = Vec::with_capacity(split_history.len());
1512 430 : let mut records_since_last_image = 0;
1513 430 : let batch_cnt = split_history.len();
1514 430 : assert!(
1515 430 : batch_cnt >= 2,
1516 0 : "should have at least below + above horizon batches"
1517 : );
1518 430 : let mut replay_history: Vec<(Key, Lsn, Value)> = Vec::new();
1519 430 : if let Some((key, lsn, img)) = base_img_from_ancestor {
1520 18 : replay_history.push((key, lsn, Value::Image(img)));
1521 412 : }
1522 :
1523 : /// Generate debug information for the replay history
1524 0 : fn generate_history_trace(replay_history: &[(Key, Lsn, Value)]) -> String {
1525 : use std::fmt::Write;
1526 0 : let mut output = String::new();
1527 0 : if let Some((key, _, _)) = replay_history.first() {
1528 0 : write!(output, "key={} ", key).unwrap();
1529 0 : let mut cnt = 0;
1530 0 : for (_, lsn, val) in replay_history {
1531 0 : if val.is_image() {
1532 0 : write!(output, "i@{} ", lsn).unwrap();
1533 0 : } else if val.will_init() {
1534 0 : write!(output, "di@{} ", lsn).unwrap();
1535 0 : } else {
1536 0 : write!(output, "d@{} ", lsn).unwrap();
1537 0 : }
1538 0 : cnt += 1;
1539 0 : if cnt >= 128 {
1540 0 : write!(output, "... and more").unwrap();
1541 0 : break;
1542 0 : }
1543 : }
1544 0 : } else {
1545 0 : write!(output, "<no history>").unwrap();
1546 0 : }
1547 0 : output
1548 0 : }
1549 :
1550 0 : fn generate_debug_trace(
1551 0 : replay_history: Option<&[(Key, Lsn, Value)]>,
1552 0 : full_history: &[(Key, Lsn, Value)],
1553 0 : lsns: &[Lsn],
1554 0 : horizon: Lsn,
1555 0 : ) -> String {
1556 : use std::fmt::Write;
1557 0 : let mut output = String::new();
1558 0 : if let Some(replay_history) = replay_history {
1559 0 : writeln!(
1560 0 : output,
1561 0 : "replay_history: {}",
1562 0 : generate_history_trace(replay_history)
1563 0 : )
1564 0 : .unwrap();
1565 0 : } else {
1566 0 : writeln!(output, "replay_history: <disabled>",).unwrap();
1567 0 : }
1568 0 : writeln!(
1569 0 : output,
1570 0 : "full_history: {}",
1571 0 : generate_history_trace(full_history)
1572 0 : )
1573 0 : .unwrap();
1574 0 : writeln!(
1575 0 : output,
1576 0 : "when processing: [{}] horizon={}",
1577 0 : lsns.iter().map(|l| format!("{l}")).join(","),
1578 0 : horizon
1579 0 : )
1580 0 : .unwrap();
1581 0 : output
1582 0 : }
1583 :
1584 1430 : for (i, split_for_lsn) in split_history.into_iter().enumerate() {
1585 : // TODO: there could be image keys inside the splits, and we can compute records_since_last_image accordingly.
1586 1430 : records_since_last_image += split_for_lsn.len();
1587 1430 : let generate_image = if i == 0 && !has_ancestor {
1588 : // We always generate images for the first batch (below horizon / lowest retain_lsn)
1589 412 : true
1590 1018 : } else if i == batch_cnt - 1 {
1591 : // Do not generate images for the last batch (above horizon)
1592 430 : false
1593 588 : } else if records_since_last_image >= delta_threshold_cnt {
1594 : // Generate images when there are too many records
1595 6 : true
1596 : } else {
1597 582 : false
1598 : };
1599 1430 : replay_history.extend(split_for_lsn.iter().map(|x| (*x).clone()));
1600 : // Only retain the items after the last image record
1601 1758 : for idx in (0..replay_history.len()).rev() {
1602 1758 : if replay_history[idx].2.will_init() {
1603 1430 : replay_history = replay_history[idx..].to_vec();
1604 1430 : break;
1605 328 : }
1606 : }
1607 1430 : if let Some((_, _, val)) = replay_history.first() {
1608 1430 : if !val.will_init() {
1609 0 : return Err(anyhow::anyhow!("invalid history, no base image")).with_context(
1610 0 : || {
1611 0 : generate_debug_trace(
1612 0 : Some(&replay_history),
1613 0 : full_history,
1614 0 : retain_lsn_below_horizon,
1615 0 : horizon,
1616 0 : )
1617 0 : },
1618 0 : );
1619 1430 : }
1620 0 : }
1621 1430 : if generate_image && records_since_last_image > 0 {
1622 418 : records_since_last_image = 0;
1623 418 : let replay_history_for_debug = if cfg!(debug_assertions) {
1624 418 : Some(replay_history.clone())
1625 : } else {
1626 0 : None
1627 : };
1628 418 : let replay_history_for_debug_ref = replay_history_for_debug.as_deref();
1629 418 : let history = std::mem::take(&mut replay_history);
1630 418 : let mut img = None;
1631 418 : let mut records = Vec::with_capacity(history.len());
1632 418 : if let (_, lsn, Value::Image(val)) = history.first().as_ref().unwrap() {
1633 418 : img = Some((*lsn, val.clone()));
1634 418 : for (_, lsn, val) in history.into_iter().skip(1) {
1635 34 : let Value::WalRecord(rec) = val else {
1636 0 : return Err(anyhow::anyhow!(
1637 0 : "invalid record, first record is image, expect walrecords"
1638 0 : ))
1639 0 : .with_context(|| {
1640 0 : generate_debug_trace(
1641 0 : replay_history_for_debug_ref,
1642 0 : full_history,
1643 0 : retain_lsn_below_horizon,
1644 0 : horizon,
1645 0 : )
1646 0 : });
1647 : };
1648 34 : records.push((lsn, rec));
1649 : }
1650 : } else {
1651 0 : for (_, lsn, val) in history.into_iter() {
1652 0 : let Value::WalRecord(rec) = val else {
1653 0 : return Err(anyhow::anyhow!("invalid record: first record is a WAL record, expected the rest to be WAL records"))
1654 0 : .with_context(|| generate_debug_trace(
1655 0 : replay_history_for_debug_ref,
1656 0 : full_history,
1657 0 : retain_lsn_below_horizon,
1658 0 : horizon,
1659 0 : ));
1660 : };
1661 0 : records.push((lsn, rec));
1662 : }
1663 : }
1664 418 : records.reverse();
1665 418 : let state = ValueReconstructState { img, records };
1666 418 : let request_lsn = lsn_split_points[i]; // the last batch does not generate an image, so `i` is always in range
1667 418 : let img = self.reconstruct_value(key, request_lsn, state).await?;
1668 418 : replay_history.push((key, request_lsn, Value::Image(img.clone())));
1669 418 : retention.push(vec![(request_lsn, Value::Image(img))]);
1670 1012 : } else {
1671 1012 : let deltas = split_for_lsn
1672 1012 : .iter()
1673 1012 : .map(|(_, lsn, value)| (*lsn, value.clone()))
1674 1012 : .collect_vec();
1675 1012 : retention.push(deltas);
1676 1012 : }
1677 : }
1678 430 : let mut result = Vec::with_capacity(retention.len());
1679 430 : assert_eq!(retention.len(), lsn_split_points.len() + 1);
1680 1430 : for (idx, logs) in retention.into_iter().enumerate() {
1681 1430 : if idx == lsn_split_points.len() {
1682 430 : return Ok(KeyHistoryRetention {
1683 430 : below_horizon: result,
1684 430 : above_horizon: KeyLogAtLsn(logs),
1685 430 : });
1686 1000 : } else {
1687 1000 : result.push((lsn_split_points[idx], KeyLogAtLsn(logs)));
1688 1000 : }
1689 : }
1690 0 : unreachable!("key retention is empty")
1691 430 : }
1692 :
1693 : /// Check how much space is left on the disk
1694 26 : async fn check_available_space(self: &Arc<Self>) -> anyhow::Result<u64> {
1695 26 : let tenants_dir = self.conf.tenants_path();
1696 :
1697 26 : let stat = Statvfs::get(&tenants_dir, None)
1698 26 : .context("statvfs failed, presumably directory got unlinked")?;
1699 :
1700 26 : let (avail_bytes, _) = stat.get_avail_total_bytes();
1701 26 :
1702 26 : Ok(avail_bytes)
1703 26 : }
1704 :
1705 : /// Check if the compaction can proceed safely without running out of space. We assume that the
1706 : /// total size of the files produced by a compaction job is bounded by the total size of all layers
1707 : /// involved in the compaction. Therefore, we need at least `2 * layers_to_be_compacted_size` of
1708 : /// free space to do a compaction.
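///
/// A worked example of the check (hypothetical numbers): with 100 GiB available and
/// 30 GiB of selected layers, 10 GiB of which still need to be downloaded, the
/// required space is 30 + 10 = 40 GiB against an allocation of 100 * 0.8 = 80 GiB,
/// so the compaction may proceed.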
1709 26 : async fn check_compaction_space(
1710 26 : self: &Arc<Self>,
1711 26 : layer_selection: &[Layer],
1712 26 : ) -> anyhow::Result<()> {
1713 26 : let available_space = self.check_available_space().await?;
1714 26 : let mut remote_layer_size = 0;
1715 26 : let mut all_layer_size = 0;
1716 108 : for layer in layer_selection {
1717 82 : let needs_download = layer.needs_download().await?;
1718 82 : if needs_download.is_some() {
1719 0 : remote_layer_size += layer.layer_desc().file_size;
1720 82 : }
1721 82 : all_layer_size += layer.layer_desc().file_size;
1722 : }
1723 26 : let allocated_space = (available_space as f64 * 0.8) as u64; /* reserve 20% space for other tasks */
1724 26 : if all_layer_size /* space needed for newly-generated file */ + remote_layer_size /* space for downloading layers */ > allocated_space
1725 : {
1726 0 : return Err(anyhow!("not enough space for compaction: available_space={}, allocated_space={}, all_layer_size={}, remote_layer_size={}, required_space={}",
1727 0 : available_space, allocated_space, all_layer_size, remote_layer_size, all_layer_size + remote_layer_size));
1728 26 : }
1729 26 : Ok(())
1730 26 : }
1731 :
1732 : /// An experimental compaction building block that combines compaction with garbage collection.
1733 : ///
1734 : /// The current implementation picks all delta and image layers that are below or intersect the
1735 : /// GC horizon, without considering retain_lsns. Then, it does a full compaction over all these
1736 : /// delta and image layers, which generates image layers at the GC horizon, drops deltas below the
1737 : /// GC horizon, and creates delta layers with all deltas >= the GC horizon.
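///
/// A hedged invocation sketch (assumes `timeline`, `cancel`, and `ctx` are in
/// scope; illustrative only, not from the source):
///
/// ```rust,ignore
/// timeline
///     .compact_with_gc(&cancel, EnumSet::only(CompactFlags::DryRun), &ctx)
///     .await?;
/// ```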
1738 26 : pub(crate) async fn compact_with_gc(
1739 26 : self: &Arc<Self>,
1740 26 : cancel: &CancellationToken,
1741 26 : flags: EnumSet<CompactFlags>,
1742 26 : ctx: &RequestContext,
1743 26 : ) -> anyhow::Result<()> {
1744 : use std::collections::BTreeSet;
1745 :
1746 : // Block other compaction/GC tasks from running for now. GC-compaction could run along
1747 : // with legacy compaction tasks in the future. Always ensure the lock order is compaction -> gc.
1748 : // Note that we already acquired the compaction lock when the outer `compact` function gets called.
1749 :
1750 26 : let gc_lock = async {
1751 26 : tokio::select! {
1752 26 : guard = self.gc_lock.lock() => Ok(guard),
1753 : // TODO: refactor to CompactionError to correctly pass cancelled error
1754 26 : _ = cancel.cancelled() => Err(anyhow!("cancelled")),
1755 : }
1756 26 : };
1757 :
1758 26 : let gc_lock = crate::timed(
1759 26 : gc_lock,
1760 26 : "acquires gc lock",
1761 26 : std::time::Duration::from_secs(5),
1762 26 : )
1763 1 : .await?;
1764 :
1765 26 : let dry_run = flags.contains(CompactFlags::DryRun);
1766 26 :
1767 26 : info!("running enhanced gc bottom-most compaction, dry_run={dry_run}");
1768 :
1769 26 : scopeguard::defer! {
1770 26 : info!("done enhanced gc bottom-most compaction");
1771 26 : };
1772 26 :
1773 26 : let mut stat = CompactionStatistics::default();
1774 :
1775 : // Step 0: pick all delta layers + image layers below/intersect with the GC horizon.
1776 : // The layer selection has the following properties:
1777 : // 1. If a layer is in the selection, all layers below it are in the selection.
1778 : // 2. It follows from (1) that, for each key in the layer selection, the value can be reconstructed using only the layers in the selection.
1779 26 : let (layer_selection, gc_cutoff, retain_lsns_below_horizon) = {
1780 26 : let guard = self.layers.read().await;
1781 26 : let layers = guard.layer_map()?;
1782 26 : let gc_info = self.gc_info.read().unwrap();
1783 26 : let mut retain_lsns_below_horizon = Vec::new();
1784 26 : let gc_cutoff = gc_info.cutoffs.select_min();
1785 34 : for (lsn, _timeline_id, _is_offloaded) in &gc_info.retain_lsns {
1786 34 : if lsn < &gc_cutoff {
1787 34 : retain_lsns_below_horizon.push(*lsn);
1788 34 : }
1789 : }
1790 26 : for lsn in gc_info.leases.keys() {
1791 0 : if lsn < &gc_cutoff {
1792 0 : retain_lsns_below_horizon.push(*lsn);
1793 0 : }
1794 : }
1795 26 : let mut selected_layers = Vec::new();
1796 26 : drop(gc_info);
1797 : // Pick all the layers that intersect or are below the gc_cutoff, and get the largest LSN among the selected layers.
1798 26 : let Some(max_layer_lsn) = layers
1799 26 : .iter_historic_layers()
1800 100 : .filter(|desc| desc.get_lsn_range().start <= gc_cutoff)
1801 82 : .map(|desc| desc.get_lsn_range().end)
1802 26 : .max()
1803 : else {
1804 0 : info!("no layers to compact with gc");
1805 0 : return Ok(());
1806 : };
1807 : // Then, pick all the layers that are below the max_layer_lsn. This is to ensure we can pick all single-key
1808 : // layers to compact.
1809 100 : for desc in layers.iter_historic_layers() {
1810 100 : if desc.get_lsn_range().end <= max_layer_lsn {
1811 82 : selected_layers.push(guard.get_from_desc(&desc));
1812 82 : }
1813 : }
1814 26 : if selected_layers.is_empty() {
1815 0 : info!("no layers to compact with gc");
1816 0 : return Ok(());
1817 26 : }
1818 26 : retain_lsns_below_horizon.sort();
1819 26 : (selected_layers, gc_cutoff, retain_lsns_below_horizon)
1820 : };
1821 26 : let lowest_retain_lsn = if self.ancestor_timeline.is_some() {
1822 2 : Lsn(self.ancestor_lsn.0 + 1)
1823 : } else {
1824 24 : let res = retain_lsns_below_horizon
1825 24 : .first()
1826 24 : .copied()
1827 24 : .unwrap_or(gc_cutoff);
1828 24 : if cfg!(debug_assertions) {
1829 24 : assert_eq!(
1830 24 : res,
1831 24 : retain_lsns_below_horizon
1832 24 : .iter()
1833 24 : .min()
1834 24 : .copied()
1835 24 : .unwrap_or(gc_cutoff)
1836 24 : );
1837 0 : }
1838 24 : res
1839 : };
1840 26 : info!(
1841 0 : "picked {} layers for compaction with gc_cutoff={} lowest_retain_lsn={}",
1842 0 : layer_selection.len(),
1843 : gc_cutoff,
1844 : lowest_retain_lsn
1845 : );
1846 :
1847 28 : self.check_compaction_space(&layer_selection).await?;
1848 :
1849 : // Step 1: (In the future) construct a k-merge iterator over all layers. For now, simply collect all keys + LSNs.
1850 : // Also, verify if the layer map can be split by drawing a horizontal line at every LSN start/end split point.
1851 26 : let mut lsn_split_point = BTreeSet::new(); // TODO: use a better data structure (range tree / range set?)
1852 108 : for layer in &layer_selection {
1853 82 : let desc = layer.layer_desc();
1854 82 : if desc.is_delta() {
1855 : // ignore single-key layer files
1856 46 : if desc.key_range.start.next() != desc.key_range.end {
1857 34 : let lsn_range = &desc.lsn_range;
1858 34 : lsn_split_point.insert(lsn_range.start);
1859 34 : lsn_split_point.insert(lsn_range.end);
1860 34 : }
1861 46 : stat.visit_delta_layer(desc.file_size());
1862 36 : } else {
1863 36 : stat.visit_image_layer(desc.file_size());
1864 36 : }
1865 : }
1866 26 : let layer_names: Vec<crate::tenant::storage_layer::LayerName> = layer_selection
1867 26 : .iter()
1868 82 : .map(|layer| layer.layer_desc().layer_name())
1869 26 : .collect_vec();
1870 26 : if let Some(err) = check_valid_layermap(&layer_names) {
1871 0 : bail!("cannot run gc-compaction because {}", err);
1872 26 : }
1873 26 : // The maximum LSN we are processing in this compaction loop
1874 26 : let end_lsn = layer_selection
1875 26 : .iter()
1876 82 : .map(|l| l.layer_desc().lsn_range.end)
1877 26 : .max()
1878 26 : .unwrap();
1879 26 : // We don't want any of the produced layers to cover the full key range (i.e., MIN..MAX) b/c it will then be recognized
1880 26 : // as an L0 layer.
1881 26 : let mut delta_layers = Vec::new();
1882 26 : let mut image_layers = Vec::new();
1883 26 : let mut downloaded_layers = Vec::new();
1884 108 : for layer in &layer_selection {
1885 82 : let resident_layer = layer.download_and_keep_resident().await?;
1886 82 : downloaded_layers.push(resident_layer);
1887 : }
1888 108 : for resident_layer in &downloaded_layers {
1889 82 : if resident_layer.layer_desc().is_delta() {
1890 46 : let layer = resident_layer.get_as_delta(ctx).await?;
1891 46 : delta_layers.push(layer);
1892 : } else {
1893 36 : let layer = resident_layer.get_as_image(ctx).await?;
1894 36 : image_layers.push(layer);
1895 : }
1896 : }
1897 26 : let (dense_ks, sparse_ks) = self.collect_gc_compaction_keyspace().await?;
1898 26 : let mut merge_iter = FilterIterator::create(
1899 26 : MergeIterator::create(&delta_layers, &image_layers, ctx),
1900 26 : dense_ks,
1901 26 : sparse_ks,
1902 26 : )?;
1903 : // Step 2: Produce images and deltas. TODO: ensure a newly produced delta does not overlap with other deltas.
1904 : // Values of the same key, accumulated across the merge iterator.
1905 26 : let mut accumulated_values = Vec::new();
1906 26 : let mut last_key: Option<Key> = None;
1907 :
1908 : // Only create image layers when there are no ancestor branches. TODO: create a covering image layer
1909 : // when certain conditions are met.
1910 26 : let mut image_layer_writer = if self.ancestor_timeline.is_none() {
1911 : Some(
1912 24 : SplitImageLayerWriter::new(
1913 24 : self.conf,
1914 24 : self.timeline_id,
1915 24 : self.tenant_shard_id,
1916 24 : Key::MIN,
1917 24 : lowest_retain_lsn,
1918 24 : self.get_compaction_target_size(),
1919 24 : ctx,
1920 24 : )
1921 12 : .await?,
1922 : )
1923 : } else {
1924 2 : None
1925 : };
1926 :
1927 26 : let mut delta_layer_writer = SplitDeltaLayerWriter::new(
1928 26 : self.conf,
1929 26 : self.timeline_id,
1930 26 : self.tenant_shard_id,
1931 26 : lowest_retain_lsn..end_lsn,
1932 26 : self.get_compaction_target_size(),
1933 26 : )
1934 0 : .await?;
1935 :
1936 : /// Returns None if there is no ancestor branch. Throw an error when the key is not found.
1937 : ///
1938 : /// Currently, we always get the ancestor image for each key in the child branch no matter whether the image
1939 : /// is needed for reconstruction. This should be fixed in the future.
1940 : ///
1941 : /// Furthermore, we should do vectored get instead of a single get, or better, use k-merge for ancestor
1942 : /// images.
1943 422 : async fn get_ancestor_image(
1944 422 : tline: &Arc<Timeline>,
1945 422 : key: Key,
1946 422 : ctx: &RequestContext,
1947 422 : ) -> anyhow::Result<Option<(Key, Lsn, Bytes)>> {
1948 422 : if tline.ancestor_timeline.is_none() {
1949 408 : return Ok(None);
1950 14 : };
1951 : // This function is implemented as a get on the current timeline at the ancestor LSN, thereby reusing
1952 : // as much existing code as possible.
1953 14 : let img = tline.get(key, tline.ancestor_lsn, ctx).await?;
1954 14 : Ok(Some((key, tline.ancestor_lsn, img)))
1955 422 : }
1956 :
1957 : // We could decide not to write to the image layer at all at this point because
1958 : // the key and LSN ranges are already determined. However, to keep things simple here, we still
1959 : // create this writer and discard it at the end.
1960 :
1961 586 : while let Some((key, lsn, val)) = merge_iter.next().await? {
1962 560 : if cancel.is_cancelled() {
1963 0 : return Err(anyhow!("cancelled")); // TODO: refactor to CompactionError and pass cancel error
1964 560 : }
1965 560 : match val {
1966 420 : Value::Image(_) => stat.visit_image_key(&val),
1967 140 : Value::WalRecord(_) => stat.visit_wal_key(&val),
1968 : }
1969 560 : if last_key.is_none() || last_key.as_ref() == Some(&key) {
1970 164 : if last_key.is_none() {
1971 26 : last_key = Some(key);
1972 138 : }
1973 164 : accumulated_values.push((key, lsn, val));
1974 : } else {
1975 396 : let last_key = last_key.as_mut().unwrap();
1976 396 : stat.on_unique_key_visited();
1977 396 : let retention = self
1978 396 : .generate_key_retention(
1979 396 : *last_key,
1980 396 : &accumulated_values,
1981 396 : gc_cutoff,
1982 396 : &retain_lsns_below_horizon,
1983 396 : COMPACTION_DELTA_THRESHOLD,
1984 396 : get_ancestor_image(self, *last_key, ctx).await?,
1985 : )
1986 0 : .await?;
1987 : // Put the image into the image layer. Currently we have a single big layer for the compaction.
1988 396 : retention
1989 396 : .pipe_to(
1990 396 : *last_key,
1991 396 : self,
1992 396 : &mut delta_layer_writer,
1993 396 : image_layer_writer.as_mut(),
1994 396 : &mut stat,
1995 396 : dry_run,
1996 396 : ctx,
1997 396 : )
1998 397 : .await?;
1999 396 : accumulated_values.clear();
2000 396 : *last_key = key;
2001 396 : accumulated_values.push((key, lsn, val));
2002 : }
2003 : }
2004 :
2005 26 : let last_key = last_key.expect("no keys produced during compaction");
2006 26 : // TODO: move this part to the loop body
2007 26 : stat.on_unique_key_visited();
2008 26 : let retention = self
2009 26 : .generate_key_retention(
2010 26 : last_key,
2011 26 : &accumulated_values,
2012 26 : gc_cutoff,
2013 26 : &retain_lsns_below_horizon,
2014 26 : COMPACTION_DELTA_THRESHOLD,
2015 26 : get_ancestor_image(self, last_key, ctx).await?,
2016 : )
2017 0 : .await?;
2018 : // Put the image into the image layer. Currently we have a single big layer for the compaction.
2019 26 : retention
2020 26 : .pipe_to(
2021 26 : last_key,
2022 26 : self,
2023 26 : &mut delta_layer_writer,
2024 26 : image_layer_writer.as_mut(),
2025 26 : &mut stat,
2026 26 : dry_run,
2027 26 : ctx,
2028 26 : )
2029 24 : .await?;
2030 :
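// Callback for the split writers below: a produced layer may be skipped when
// `KeyHistoryRetention::discard_key` decides that an existing layer with the
// same persistent key should be kept instead.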
2031 38 : let discard = |key: &PersistentLayerKey| {
2032 38 : let key = key.clone();
2033 38 : async move { KeyHistoryRetention::discard_key(&key, self, dry_run).await }
2034 38 : };
2035 :
2036 26 : let produced_image_layers = if let Some(writer) = image_layer_writer {
2037 24 : if !dry_run {
2038 20 : writer
2039 20 : .finish_with_discard_fn(self, ctx, Key::MAX, discard)
2040 25 : .await?
2041 : } else {
2042 4 : drop(writer);
2043 4 : Vec::new()
2044 : }
2045 : } else {
2046 2 : Vec::new()
2047 : };
2048 :
2049 26 : let produced_delta_layers = if !dry_run {
2050 22 : delta_layer_writer
2051 22 : .finish_with_discard_fn(self, ctx, discard)
2052 25 : .await?
2053 : } else {
2054 4 : let (layers, _) = delta_layer_writer.take()?;
2055 4 : assert!(layers.is_empty(), "delta layers produced in dry run mode?");
2056 4 : Vec::new()
2057 : };
2058 :
2059 26 : let mut compact_to = Vec::new();
2060 26 : let mut keep_layers = HashSet::new();
2061 26 : let produced_delta_layers_len = produced_delta_layers.len();
2062 26 : let produced_image_layers_len = produced_image_layers.len();
2063 44 : for action in produced_delta_layers {
2064 18 : match action {
2065 10 : SplitWriterResult::Produced(layer) => {
2066 10 : stat.produce_delta_layer(layer.layer_desc().file_size());
2067 10 : compact_to.push(layer);
2068 10 : }
2069 8 : SplitWriterResult::Discarded(l) => {
2070 8 : keep_layers.insert(l);
2071 8 : stat.discard_delta_layer();
2072 8 : }
2073 : }
2074 : }
2075 46 : for action in produced_image_layers {
2076 20 : match action {
2077 12 : SplitWriterResult::Produced(layer) => {
2078 12 : stat.produce_image_layer(layer.layer_desc().file_size());
2079 12 : compact_to.push(layer);
2080 12 : }
2081 8 : SplitWriterResult::Discarded(l) => {
2082 8 : keep_layers.insert(l);
2083 8 : stat.discard_image_layer();
2084 8 : }
2085 : }
2086 : }
2087 26 : let mut layer_selection = layer_selection;
2088 82 : layer_selection.retain(|x| !keep_layers.contains(&x.layer_desc().key()));
2089 26 :
2090 26 : info!(
2091 0 : "gc-compaction statistics: {}",
2092 0 : serde_json::to_string(&stat)?
2093 : );
2094 :
2095 26 : if dry_run {
2096 4 : return Ok(());
2097 22 : }
2098 22 :
2099 22 : info!(
2100 0 : "produced {} delta layers and {} image layers, {} layers are kept",
2101 0 : produced_delta_layers_len,
2102 0 : produced_image_layers_len,
2103 0 : layer_selection.len()
2104 : );
2105 :
2106 : // Step 3: Place the produced layers back into the layer map.
2107 : {
2108 22 : let mut guard = self.layers.write().await;
2109 22 : guard
2110 22 : .open_mut()?
2111 22 : .finish_gc_compaction(&layer_selection, &compact_to, &self.metrics)
2112 22 : };
2113 22 : self.remote_client
2114 22 : .schedule_compaction_update(&layer_selection, &compact_to)?;
2115 :
2116 22 : drop(gc_lock);
2117 22 :
2118 22 : Ok(())
2119 26 : }
2120 : }
2121 :
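/// Adaptor that drives the compaction algorithm in the `pageserver_compaction`
/// crate against a [`Timeline`]: it buffers newly written layers and pending
/// deletions, and applies them to the layer map in [`TimelineAdaptor::flush_updates`].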
2122 : struct TimelineAdaptor {
2123 : timeline: Arc<Timeline>,
2124 :
2125 : keyspace: (Lsn, KeySpace),
2126 :
2127 : new_deltas: Vec<ResidentLayer>,
2128 : new_images: Vec<ResidentLayer>,
2129 : layers_to_delete: Vec<Arc<PersistentLayerDesc>>,
2130 : }
2131 :
2132 : impl TimelineAdaptor {
2133 0 : pub fn new(timeline: &Arc<Timeline>, keyspace: (Lsn, KeySpace)) -> Self {
2134 0 : Self {
2135 0 : timeline: timeline.clone(),
2136 0 : keyspace,
2137 0 : new_images: Vec::new(),
2138 0 : new_deltas: Vec::new(),
2139 0 : layers_to_delete: Vec::new(),
2140 0 : }
2141 0 : }
2142 :
2143 0 : pub async fn flush_updates(&mut self) -> Result<(), CompactionError> {
2144 0 : let layers_to_delete = {
2145 0 : let guard = self.timeline.layers.read().await;
2146 0 : self.layers_to_delete
2147 0 : .iter()
2148 0 : .map(|x| guard.get_from_desc(x))
2149 0 : .collect::<Vec<Layer>>()
2150 0 : };
2151 0 : self.timeline
2152 0 : .finish_compact_batch(&self.new_deltas, &self.new_images, &layers_to_delete)
2153 0 : .await?;
2154 :
2155 0 : self.timeline
2156 0 : .upload_new_image_layers(std::mem::take(&mut self.new_images))?;
2157 :
2158 0 : self.new_deltas.clear();
2159 0 : self.layers_to_delete.clear();
2160 0 : Ok(())
2161 0 : }
2162 : }
2163 :
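// Newtype wrappers over `ResidentLayer` so the compaction crate's delta- and
// image-layer traits can be implemented for resident layers.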
2164 : #[derive(Clone)]
2165 : struct ResidentDeltaLayer(ResidentLayer);
2166 : #[derive(Clone)]
2167 : struct ResidentImageLayer(ResidentLayer);
2168 :
2169 : impl CompactionJobExecutor for TimelineAdaptor {
2170 : type Key = crate::repository::Key;
2171 :
2172 : type Layer = OwnArc<PersistentLayerDesc>;
2173 : type DeltaLayer = ResidentDeltaLayer;
2174 : type ImageLayer = ResidentImageLayer;
2175 :
2176 : type RequestContext = crate::context::RequestContext;
2177 :
2178 0 : fn get_shard_identity(&self) -> &ShardIdentity {
2179 0 : self.timeline.get_shard_identity()
2180 0 : }
2181 :
2182 0 : async fn get_layers(
2183 0 : &mut self,
2184 0 : key_range: &Range<Key>,
2185 0 : lsn_range: &Range<Lsn>,
2186 0 : _ctx: &RequestContext,
2187 0 : ) -> anyhow::Result<Vec<OwnArc<PersistentLayerDesc>>> {
2188 0 : self.flush_updates().await?;
2189 :
2190 0 : let guard = self.timeline.layers.read().await;
2191 0 : let layer_map = guard.layer_map()?;
2192 :
2193 0 : let result = layer_map
2194 0 : .iter_historic_layers()
2195 0 : .filter(|l| {
2196 0 : overlaps_with(&l.lsn_range, lsn_range) && overlaps_with(&l.key_range, key_range)
2197 0 : })
2198 0 : .map(OwnArc)
2199 0 : .collect();
2200 0 : Ok(result)
2201 0 : }
2202 :
2203 0 : async fn get_keyspace(
2204 0 : &mut self,
2205 0 : key_range: &Range<Key>,
2206 0 : lsn: Lsn,
2207 0 : _ctx: &RequestContext,
2208 0 : ) -> anyhow::Result<Vec<Range<Key>>> {
2209 0 : if lsn == self.keyspace.0 {
2210 0 : Ok(pageserver_compaction::helpers::intersect_keyspace(
2211 0 : &self.keyspace.1.ranges,
2212 0 : key_range,
2213 0 : ))
2214 : } else {
2215 : // The current compaction implementation only ever requests the key space
2216 : // at the compaction end LSN.
2217 0 : anyhow::bail!("keyspace not available for requested lsn");
2218 : }
2219 0 : }
2220 :
2221 0 : async fn downcast_delta_layer(
2222 0 : &self,
2223 0 : layer: &OwnArc<PersistentLayerDesc>,
2224 0 : ) -> anyhow::Result<Option<ResidentDeltaLayer>> {
2225 0 : // this is a lot more complex than a simple downcast...
2226 0 : if layer.is_delta() {
2227 0 : let l = {
2228 0 : let guard = self.timeline.layers.read().await;
2229 0 : guard.get_from_desc(layer)
2230 : };
2231 0 : let result = l.download_and_keep_resident().await?;
2232 :
2233 0 : Ok(Some(ResidentDeltaLayer(result)))
2234 : } else {
2235 0 : Ok(None)
2236 : }
2237 0 : }
2238 :
2239 0 : async fn create_image(
2240 0 : &mut self,
2241 0 : lsn: Lsn,
2242 0 : key_range: &Range<Key>,
2243 0 : ctx: &RequestContext,
2244 0 : ) -> anyhow::Result<()> {
2245 0 : Ok(self.create_image_impl(lsn, key_range, ctx).await?)
2246 0 : }
2247 :
2248 0 : async fn create_delta(
2249 0 : &mut self,
2250 0 : lsn_range: &Range<Lsn>,
2251 0 : key_range: &Range<Key>,
2252 0 : input_layers: &[ResidentDeltaLayer],
2253 0 : ctx: &RequestContext,
2254 0 : ) -> anyhow::Result<()> {
2255 0 : debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end);
2256 :
2257 0 : let mut all_entries = Vec::new();
2258 0 : for dl in input_layers.iter() {
2259 0 : all_entries.extend(dl.load_keys(ctx).await?);
2260 : }
2261 :
2262 : // The current stdlib sorting implementation is designed in a way that makes it
2263 : // particularly fast when the slice is made up of sorted sub-ranges.
2264 0 : all_entries.sort_by_key(|DeltaEntry { key, lsn, .. }| (*key, *lsn));
2265 :
2266 0 : let mut writer = DeltaLayerWriter::new(
2267 0 : self.timeline.conf,
2268 0 : self.timeline.timeline_id,
2269 0 : self.timeline.tenant_shard_id,
2270 0 : key_range.start,
2271 0 : lsn_range.clone(),
2272 0 : ctx,
2273 0 : )
2274 0 : .await?;
2275 :
2276 0 : let mut dup_values = 0;
2277 0 :
2278 0 : // This iterator walks through all key-value pairs from all the layers
2279 0 : // we're compacting, in key, LSN order.
2280 0 : let mut prev: Option<(Key, Lsn)> = None;
2281 : for &DeltaEntry {
2282 0 : key, lsn, ref val, ..
2283 0 : } in all_entries.iter()
2284 : {
2285 0 : if prev == Some((key, lsn)) {
2286 : // This is a duplicate. Skip it.
2287 : //
2288 : // It can happen if compaction is interrupted after writing some
2289 : // layers but not all, and we are compacting the range again.
2290 : // The calculations in the algorithm assume that there are no
2291 : // duplicates, so the math on targeted file size is likely off,
2292 : // and we will create smaller files than expected.
2293 0 : dup_values += 1;
2294 0 : continue;
2295 0 : }
2296 :
2297 0 : let value = val.load(ctx).await?;
2298 :
2299 0 : writer.put_value(key, lsn, value, ctx).await?;
2300 :
2301 0 : prev = Some((key, lsn));
2302 : }
2303 :
2304 0 : if dup_values > 0 {
2305 0 : warn!("delta layer created with {} duplicate values", dup_values);
2306 0 : }
2307 :
2308 0 : fail_point!("delta-layer-writer-fail-before-finish", |_| {
2309 0 : Err(anyhow::anyhow!(
2310 0 : "failpoint delta-layer-writer-fail-before-finish"
2311 0 : ))
2312 0 : });
2313 :
2314 0 : let (desc, path) = writer.finish(prev.unwrap().0.next(), ctx).await?;
2315 0 : let new_delta_layer =
2316 0 : Layer::finish_creating(self.timeline.conf, &self.timeline, desc, &path)?;
2317 :
2318 0 : self.new_deltas.push(new_delta_layer);
2319 0 : Ok(())
2320 0 : }
2321 :
2322 0 : async fn delete_layer(
2323 0 : &mut self,
2324 0 : layer: &OwnArc<PersistentLayerDesc>,
2325 0 : _ctx: &RequestContext,
2326 0 : ) -> anyhow::Result<()> {
2327 0 : self.layers_to_delete.push(layer.clone().0);
2328 0 : Ok(())
2329 0 : }
2330 : }
2331 :
2332 : impl TimelineAdaptor {
2333 0 : async fn create_image_impl(
2334 0 : &mut self,
2335 0 : lsn: Lsn,
2336 0 : key_range: &Range<Key>,
2337 0 : ctx: &RequestContext,
2338 0 : ) -> Result<(), CreateImageLayersError> {
2339 0 : let timer = self.timeline.metrics.create_images_time_histo.start_timer();
2340 :
2341 0 : let image_layer_writer = ImageLayerWriter::new(
2342 0 : self.timeline.conf,
2343 0 : self.timeline.timeline_id,
2344 0 : self.timeline.tenant_shard_id,
2345 0 : key_range,
2346 0 : lsn,
2347 0 : ctx,
2348 0 : )
2349 0 : .await?;
2350 :
2351 0 : fail_point!("image-layer-writer-fail-before-finish", |_| {
2352 0 : Err(CreateImageLayersError::Other(anyhow::anyhow!(
2353 0 : "failpoint image-layer-writer-fail-before-finish"
2354 0 : )))
2355 0 : });
2356 :
2357 0 : let keyspace = KeySpace {
2358 0 : ranges: self.get_keyspace(key_range, lsn, ctx).await?,
2359 : };
2360 : // TODO set proper (stateful) start. The create_image_layer_for_rel_blocks function mostly
2361 0 : let start = Key::MIN;
2362 : let ImageLayerCreationOutcome {
2363 0 : image,
2364 : next_start_key: _,
2365 0 : } = self
2366 0 : .timeline
2367 0 : .create_image_layer_for_rel_blocks(
2368 0 : &keyspace,
2369 0 : image_layer_writer,
2370 0 : lsn,
2371 0 : ctx,
2372 0 : key_range.clone(),
2373 0 : start,
2374 0 : )
2375 0 : .await?;
2376 :
2377 0 : if let Some(image_layer) = image {
2378 0 : self.new_images.push(image_layer);
2379 0 : }
2380 :
2381 0 : timer.stop_and_record();
2382 0 :
2383 0 : Ok(())
2384 0 : }
2385 : }
2386 :
2387 : impl CompactionRequestContext for crate::context::RequestContext {}
2388 :
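/// Owned-`Arc` newtype: the compaction interface hands out owned layer handles,
/// and the newtype also lets this crate implement the compaction crate's traits
/// for `Arc<PersistentLayerDesc>`, which the orphan rule would otherwise forbid.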
2389 : #[derive(Debug, Clone)]
2390 : pub struct OwnArc<T>(pub Arc<T>);
2391 :
2392 : impl<T> Deref for OwnArc<T> {
2393 : type Target = <Arc<T> as Deref>::Target;
2394 0 : fn deref(&self) -> &Self::Target {
2395 0 : &self.0
2396 0 : }
2397 : }
2398 :
2399 : impl<T> AsRef<T> for OwnArc<T> {
2400 0 : fn as_ref(&self) -> &T {
2401 0 : self.0.as_ref()
2402 0 : }
2403 : }
2404 :
2405 : impl CompactionLayer<Key> for OwnArc<PersistentLayerDesc> {
2406 0 : fn key_range(&self) -> &Range<Key> {
2407 0 : &self.key_range
2408 0 : }
2409 0 : fn lsn_range(&self) -> &Range<Lsn> {
2410 0 : &self.lsn_range
2411 0 : }
2412 0 : fn file_size(&self) -> u64 {
2413 0 : self.file_size
2414 0 : }
2415 0 : fn short_id(&self) -> std::string::String {
2416 0 : self.as_ref().short_id().to_string()
2417 0 : }
2418 0 : fn is_delta(&self) -> bool {
2419 0 : self.as_ref().is_delta()
2420 0 : }
2421 : }
2422 :
2423 : impl CompactionLayer<Key> for OwnArc<DeltaLayer> {
2424 0 : fn key_range(&self) -> &Range<Key> {
2425 0 : &self.layer_desc().key_range
2426 0 : }
2427 0 : fn lsn_range(&self) -> &Range<Lsn> {
2428 0 : &self.layer_desc().lsn_range
2429 0 : }
2430 0 : fn file_size(&self) -> u64 {
2431 0 : self.layer_desc().file_size
2432 0 : }
2433 0 : fn short_id(&self) -> std::string::String {
2434 0 : self.layer_desc().short_id().to_string()
2435 0 : }
2436 0 : fn is_delta(&self) -> bool {
2437 0 : true
2438 0 : }
2439 : }
2440 :
2441 : use crate::tenant::timeline::DeltaEntry;
2442 :
2443 : impl CompactionLayer<Key> for ResidentDeltaLayer {
2444 0 : fn key_range(&self) -> &Range<Key> {
2445 0 : &self.0.layer_desc().key_range
2446 0 : }
2447 0 : fn lsn_range(&self) -> &Range<Lsn> {
2448 0 : &self.0.layer_desc().lsn_range
2449 0 : }
2450 0 : fn file_size(&self) -> u64 {
2451 0 : self.0.layer_desc().file_size
2452 0 : }
2453 0 : fn short_id(&self) -> std::string::String {
2454 0 : self.0.layer_desc().short_id().to_string()
2455 0 : }
2456 0 : fn is_delta(&self) -> bool {
2457 0 : true
2458 0 : }
2459 : }
2460 :
2461 : impl CompactionDeltaLayer<TimelineAdaptor> for ResidentDeltaLayer {
2462 : type DeltaEntry<'a> = DeltaEntry<'a>;
2463 :
2464 0 : async fn load_keys<'a>(&self, ctx: &RequestContext) -> anyhow::Result<Vec<DeltaEntry<'_>>> {
2465 0 : self.0.load_keys(ctx).await
2466 0 : }
2467 : }
2468 :
2469 : impl CompactionLayer<Key> for ResidentImageLayer {
2470 0 : fn key_range(&self) -> &Range<Key> {
2471 0 : &self.0.layer_desc().key_range
2472 0 : }
2473 0 : fn lsn_range(&self) -> &Range<Lsn> {
2474 0 : &self.0.layer_desc().lsn_range
2475 0 : }
2476 0 : fn file_size(&self) -> u64 {
2477 0 : self.0.layer_desc().file_size
2478 0 : }
2479 0 : fn short_id(&self) -> std::string::String {
2480 0 : self.0.layer_desc().short_id().to_string()
2481 0 : }
2482 0 : fn is_delta(&self) -> bool {
2483 0 : false
2484 0 : }
2485 : }
2486 : impl CompactionImageLayer<TimelineAdaptor> for ResidentImageLayer {}