Line data Source code
1 : //! New compaction implementation. The algorithm itself is implemented in the
2 : //! compaction crate. This file implements the callbacks and structs that allow
3 : //! the algorithm to drive the process.
4 : //!
5 : //! The old legacy algorithm is implemented directly in `timeline.rs`.
6 :
7 : use std::collections::{BinaryHeap, HashSet};
8 : use std::ops::{Deref, Range};
9 : use std::sync::Arc;
10 :
11 : use super::layer_manager::LayerManager;
12 : use super::{
13 : CompactFlags, CreateImageLayersError, DurationRecorder, ImageLayerCreationMode,
14 : RecordedDuration, Timeline,
15 : };
16 :
17 : use anyhow::{anyhow, bail, Context};
18 : use bytes::Bytes;
19 : use enumset::EnumSet;
20 : use fail::fail_point;
21 : use itertools::Itertools;
22 : use pageserver_api::key::KEY_SIZE;
23 : use pageserver_api::keyspace::ShardedRange;
24 : use pageserver_api::shard::{ShardCount, ShardIdentity, TenantShardId};
25 : use serde::Serialize;
26 : use tokio_util::sync::CancellationToken;
27 : use tracing::{debug, info, info_span, trace, warn, Instrument};
28 : use utils::id::TimelineId;
29 :
30 : use crate::context::{AccessStatsBehavior, RequestContext, RequestContextBuilder};
31 : use crate::page_cache;
32 : use crate::tenant::remote_timeline_client::WaitCompletionError;
33 : use crate::tenant::storage_layer::merge_iterator::MergeIterator;
34 : use crate::tenant::storage_layer::split_writer::{
35 : SplitDeltaLayerWriter, SplitImageLayerWriter, SplitWriterResult,
36 : };
37 : use crate::tenant::storage_layer::{
38 : AsLayerDesc, PersistentLayerDesc, PersistentLayerKey, ValueReconstructState,
39 : };
40 : use crate::tenant::timeline::ImageLayerCreationOutcome;
41 : use crate::tenant::timeline::{drop_rlock, DeltaLayerWriter, ImageLayerWriter};
42 : use crate::tenant::timeline::{Layer, ResidentLayer};
43 : use crate::tenant::DeltaLayer;
44 : use crate::virtual_file::{MaybeFatalIo, VirtualFile};
45 : use pageserver_api::config::tenant_conf_defaults::{
46 : DEFAULT_CHECKPOINT_DISTANCE, DEFAULT_COMPACTION_THRESHOLD,
47 : };
48 :
49 : use crate::keyspace::KeySpace;
50 : use crate::repository::{Key, Value};
51 : use crate::walrecord::NeonWalRecord;
52 :
53 : use utils::lsn::Lsn;
54 :
55 : use pageserver_compaction::helpers::overlaps_with;
56 : use pageserver_compaction::interface::*;
57 :
58 : use super::CompactionError;
59 :
60 : /// Maximum number of deltas before generating an image layer in bottom-most compaction.
61 : const COMPACTION_DELTA_THRESHOLD: usize = 5;
62 :
63 : /// The result of bottom-most compaction for a single key at each LSN.
64 : #[derive(Debug)]
65 : #[cfg_attr(test, derive(PartialEq))]
66 : pub struct KeyLogAtLsn(pub Vec<(Lsn, Value)>);
67 :
68 : /// The result of bottom-most compaction.
69 : #[derive(Debug)]
70 : #[cfg_attr(test, derive(PartialEq))]
71 : pub(crate) struct KeyHistoryRetention {
72 : /// Stores logs to reconstruct the value at the given LSN, that is to say, logs <= LSN or image == LSN.
73 : pub(crate) below_horizon: Vec<(Lsn, KeyLogAtLsn)>,
74 : /// Stores logs to reconstruct the value at any LSN above the horizon, that is to say, log > LSN.
75 : pub(crate) above_horizon: KeyLogAtLsn,
76 : }
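The split encoded by `KeyHistoryRetention` can be illustrated with a small standalone sketch. Everything below uses simplified stand-ins (`Lsn` as `u64`, a toy `Value`, and a hypothetical `split_history` helper) rather than the real pageserver types; it only shows how the retain LSNs and the horizon partition a key's raw history into the `below_horizon` buckets and the `above_horizon` tail, not the image/delta reconstruction the real code performs on each bucket.

```rust
// Minimal sketch with simplified stand-ins; not the real pageserver types.
type Lsn = u64;

#[derive(Debug, Clone, PartialEq)]
enum Value {
    Image(&'static str), // a full page image
    Delta(&'static str), // a WAL record applied on top of an earlier value
}

/// Partition one key's history by the retain LSNs and the horizon, mirroring the
/// shape of `KeyHistoryRetention { below_horizon, above_horizon }`.
fn split_history(
    history: &[(Lsn, Value)],
    retain_lsns_below_horizon: &[Lsn], // sorted, all <= horizon
    horizon: Lsn,
) -> (Vec<(Lsn, Vec<(Lsn, Value)>)>, Vec<(Lsn, Value)>) {
    let mut below = Vec::new();
    let mut cursor = 0;
    for &cutoff in retain_lsns_below_horizon.iter().chain(std::iter::once(&horizon)) {
        let mut bucket = Vec::new();
        while cursor < history.len() && history[cursor].0 <= cutoff {
            bucket.push(history[cursor].clone());
            cursor += 1;
        }
        below.push((cutoff, bucket));
    }
    (below, history[cursor..].to_vec())
}

fn main() {
    // A@0x10, +B@0x20, +C@0x30, +D@0x40, horizon = 0x30, one retain LSN at 0x20.
    let history = vec![
        (0x10, Value::Image("A")),
        (0x20, Value::Delta("+B")),
        (0x30, Value::Delta("+C")),
        (0x40, Value::Delta("+D")),
    ];
    let (below, above) = split_history(&history, &[0x20], 0x30);
    assert_eq!(below.len(), 2); // one bucket per retain LSN plus one for the horizon
    assert_eq!(above, vec![(0x40, Value::Delta("+D"))]); // everything above the horizon
}
```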
77 :
78 : impl KeyHistoryRetention {
79 : /// Hack: skip writing a layer if we would produce one with the same key-lsn range as an existing layer.
80 : ///
81 : /// This can happen if we have removed some deltas in "the middle" of some existing layer's key-lsn-range.
82 : /// For example, consider the case where a single delta with range [0x10,0x50) exists.
83 : /// And we have branches at LSN 0x10, 0x20, 0x30.
84 : /// Then we delete branch @ 0x20.
85 : /// Bottom-most compaction may now delete the delta [0x20,0x30).
86 : /// And that wouldn't change the shape of the layer.
87 : ///
88 : /// Note that bottom-most-gc-compaction never _adds_ new data in that case, only removes.
89 : ///
90 : /// `discard_key` will only be called when the writer reaches its target (instead of for every key), so it's fine to grab a lock inside.
91 114 : async fn discard_key(key: &PersistentLayerKey, tline: &Arc<Timeline>, dry_run: bool) -> bool {
92 114 : if dry_run {
93 0 : return true;
94 114 : }
95 114 : let guard = tline.layers.read().await;
96 114 : if !guard.contains_key(key) {
97 66 : return false;
98 48 : }
99 48 : let layer_generation = guard.get_from_key(key).metadata().generation;
100 48 : drop(guard);
101 48 : if layer_generation == tline.generation {
102 48 : info!(
103 : key=%key,
104 : ?layer_generation,
105 0 : "discard layer due to duplicated layer key in the same generation",
106 : );
107 48 : true
108 : } else {
109 0 : false
110 : }
111 114 : }
112 :
113 : /// Pipe a history of a single key to the writers.
114 : ///
115 : /// If `image_writer` is `None`, the images will be placed into the delta layers.
116 : /// The delta writer will contain all images and deltas (below and above the horizon) except the bottom-most images.
117 : #[allow(clippy::too_many_arguments)]
118 1266 : async fn pipe_to(
119 1266 : self,
120 1266 : key: Key,
121 1266 : tline: &Arc<Timeline>,
122 1266 : delta_writer: &mut SplitDeltaLayerWriter,
123 1266 : mut image_writer: Option<&mut SplitImageLayerWriter>,
124 1266 : stat: &mut CompactionStatistics,
125 1266 : dry_run: bool,
126 1266 : ctx: &RequestContext,
127 1266 : ) -> anyhow::Result<()> {
128 1266 : let mut first_batch = true;
129 1266 : let discard = |key: &PersistentLayerKey| {
130 0 : let key = key.clone();
131 0 : async move { Self::discard_key(&key, tline, dry_run).await }
132 0 : };
133 4206 : for (cutoff_lsn, KeyLogAtLsn(logs)) in self.below_horizon {
134 2940 : if first_batch {
135 1266 : if logs.len() == 1 && logs[0].1.is_image() {
136 1224 : let Value::Image(img) = &logs[0].1 else {
137 0 : unreachable!()
138 : };
139 1224 : stat.produce_image_key(img);
140 1224 : if let Some(image_writer) = image_writer.as_mut() {
141 1224 : image_writer
142 1224 : .put_image_with_discard_fn(key, img.clone(), tline, ctx, discard)
143 1242 : .await?;
144 : } else {
145 0 : delta_writer
146 0 : .put_value_with_discard_fn(
147 0 : key,
148 0 : cutoff_lsn,
149 0 : Value::Image(img.clone()),
150 0 : tline,
151 0 : ctx,
152 0 : discard,
153 0 : )
154 0 : .await?;
155 : }
156 : } else {
157 84 : for (lsn, val) in logs {
158 42 : stat.produce_key(&val);
159 42 : delta_writer
160 42 : .put_value_with_discard_fn(key, lsn, val, tline, ctx, discard)
161 3 : .await?;
162 : }
163 : }
164 1266 : first_batch = false;
165 : } else {
166 1920 : for (lsn, val) in logs {
167 246 : stat.produce_key(&val);
168 246 : delta_writer
169 246 : .put_value_with_discard_fn(key, lsn, val, tline, ctx, discard)
170 24 : .await?;
171 : }
172 : }
173 : }
174 1266 : let KeyLogAtLsn(above_horizon_logs) = self.above_horizon;
175 1362 : for (lsn, val) in above_horizon_logs {
176 96 : stat.produce_key(&val);
177 96 : delta_writer
178 96 : .put_value_with_discard_fn(key, lsn, val, tline, ctx, discard)
179 6 : .await?;
180 : }
181 1266 : Ok(())
182 1266 : }
183 : }
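For reference, the per-batch routing rule that `pipe_to` applies can be condensed into a small predicate. The types below are simplified stand-ins, not the real writers; the point is only that the bottom-most batch becomes an image (when it is a single image and an image writer exists), and every other entry is written as a delta.

```rust
// Condensed restatement of the routing decision in `pipe_to`, with stand-in types.
#[derive(Debug, PartialEq)]
enum Sink {
    ImageLayer,
    DeltaLayer,
}

enum Value {
    Image(Vec<u8>),
    WalRecord(Vec<u8>),
}

/// Where does a batch of logs for one cutoff LSN go?
fn route(first_batch: bool, logs: &[Value], image_writer_available: bool) -> Sink {
    // Only the bottom-most batch, and only when it consists of a single image and an
    // image writer exists, goes to the image layer; everything else is written as deltas
    // (including images above the bottom-most cutoff, which stay in the delta layer).
    if first_batch
        && logs.len() == 1
        && matches!(logs[0], Value::Image(_))
        && image_writer_available
    {
        Sink::ImageLayer
    } else {
        Sink::DeltaLayer
    }
}

fn main() {
    let bottom = vec![Value::Image(b"page".to_vec())];
    assert_eq!(route(true, &bottom, true), Sink::ImageLayer);
    assert_eq!(route(true, &bottom, false), Sink::DeltaLayer);
    assert_eq!(route(false, &bottom, true), Sink::DeltaLayer);
}
```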
184 :
185 : #[derive(Debug, Serialize, Default)]
186 : struct CompactionStatisticsNumSize {
187 : num: u64,
188 : size: u64,
189 : }
190 :
191 : #[derive(Debug, Serialize, Default)]
192 : pub struct CompactionStatistics {
193 : delta_layer_visited: CompactionStatisticsNumSize,
194 : image_layer_visited: CompactionStatisticsNumSize,
195 : delta_layer_produced: CompactionStatisticsNumSize,
196 : image_layer_produced: CompactionStatisticsNumSize,
197 : num_delta_layer_discarded: usize,
198 : num_image_layer_discarded: usize,
199 : num_unique_keys_visited: usize,
200 : wal_keys_visited: CompactionStatisticsNumSize,
201 : image_keys_visited: CompactionStatisticsNumSize,
202 : wal_produced: CompactionStatisticsNumSize,
203 : image_produced: CompactionStatisticsNumSize,
204 : }
205 :
206 : impl CompactionStatistics {
207 2058 : fn estimated_size_of_value(val: &Value) -> usize {
208 798 : match val {
209 1260 : Value::Image(img) => img.len(),
210 0 : Value::WalRecord(NeonWalRecord::Postgres { rec, .. }) => rec.len(),
211 798 : _ => std::mem::size_of::<NeonWalRecord>(),
212 : }
213 2058 : }
214 3288 : fn estimated_size_of_key() -> usize {
215 3288 : KEY_SIZE // TODO: distinguish image layer and delta layer (count LSN in delta layer)
216 3288 : }
217 138 : fn visit_delta_layer(&mut self, size: u64) {
218 138 : self.delta_layer_visited.num += 1;
219 138 : self.delta_layer_visited.size += size;
220 138 : }
221 108 : fn visit_image_layer(&mut self, size: u64) {
222 108 : self.image_layer_visited.num += 1;
223 108 : self.image_layer_visited.size += size;
224 108 : }
225 1266 : fn on_unique_key_visited(&mut self) {
226 1266 : self.num_unique_keys_visited += 1;
227 1266 : }
228 420 : fn visit_wal_key(&mut self, val: &Value) {
229 420 : self.wal_keys_visited.num += 1;
230 420 : self.wal_keys_visited.size +=
231 420 : Self::estimated_size_of_value(val) as u64 + Self::estimated_size_of_key() as u64;
232 420 : }
233 1260 : fn visit_image_key(&mut self, val: &Value) {
234 1260 : self.image_keys_visited.num += 1;
235 1260 : self.image_keys_visited.size +=
236 1260 : Self::estimated_size_of_value(val) as u64 + Self::estimated_size_of_key() as u64;
237 1260 : }
238 384 : fn produce_key(&mut self, val: &Value) {
239 384 : match val {
240 6 : Value::Image(img) => self.produce_image_key(img),
241 378 : Value::WalRecord(_) => self.produce_wal_key(val),
242 : }
243 384 : }
244 378 : fn produce_wal_key(&mut self, val: &Value) {
245 378 : self.wal_produced.num += 1;
246 378 : self.wal_produced.size +=
247 378 : Self::estimated_size_of_value(val) as u64 + Self::estimated_size_of_key() as u64;
248 378 : }
249 1230 : fn produce_image_key(&mut self, val: &Bytes) {
250 1230 : self.image_produced.num += 1;
251 1230 : self.image_produced.size += val.len() as u64 + Self::estimated_size_of_key() as u64;
252 1230 : }
253 24 : fn discard_delta_layer(&mut self) {
254 24 : self.num_delta_layer_discarded += 1;
255 24 : }
256 24 : fn discard_image_layer(&mut self) {
257 24 : self.num_image_layer_discarded += 1;
258 24 : }
259 30 : fn produce_delta_layer(&mut self, size: u64) {
260 30 : self.delta_layer_produced.num += 1;
261 30 : self.delta_layer_produced.size += size;
262 30 : }
263 36 : fn produce_image_layer(&mut self, size: u64) {
264 36 : self.image_layer_produced.num += 1;
265 36 : self.image_layer_produced.size += size;
266 36 : }
267 : }
268 :
269 : impl Timeline {
270 : /// TODO: cancellation
271 : ///
272 : /// Returns whether the compaction has pending tasks.
273 1092 : pub(crate) async fn compact_legacy(
274 1092 : self: &Arc<Self>,
275 1092 : cancel: &CancellationToken,
276 1092 : flags: EnumSet<CompactFlags>,
277 1092 : ctx: &RequestContext,
278 1092 : ) -> Result<bool, CompactionError> {
279 1092 : if flags.contains(CompactFlags::EnhancedGcBottomMostCompaction) {
280 0 : self.compact_with_gc(cancel, flags, ctx)
281 0 : .await
282 0 : .map_err(CompactionError::Other)?;
283 0 : return Ok(false);
284 1092 : }
285 1092 :
286 1092 : if flags.contains(CompactFlags::DryRun) {
287 0 : return Err(CompactionError::Other(anyhow!(
288 0 : "dry-run mode is not supported for legacy compaction for now"
289 0 : )));
290 1092 : }
291 1092 :
292 1092 : // High level strategy for compaction / image creation:
293 1092 : //
294 1092 : // 1. First, calculate the desired "partitioning" of the
295 1092 : // currently in-use key space. The goal is to partition the
296 1092 : // key space into roughly fixed-size chunks, but also take into
297 1092 : // account any existing image layers, and try to align the
298 1092 : // chunk boundaries with the existing image layers to avoid
299 1092 : // too much churn. Also try to align chunk boundaries with
300 1092 : // relation boundaries. In principle, we don't know about
301 1092 : // relation boundaries here, we just deal with key-value
302 1092 : // pairs, and the code in pgdatadir_mapping.rs knows how to
303 1092 : // map relations into key-value pairs. But in practice we know
304 1092 : // that 'field6' is the block number, and the fields 1-5
305 1092 : // identify a relation. This is just an optimization,
306 1092 : // though.
307 1092 : //
308 1092 : // 2. Once we know the partitioning, for each partition,
309 1092 : // decide if it's time to create a new image layer. The
310 1092 : // criterion is: has there been too much "churn" since the last
311 1092 : // image layer? "Churn" is a fuzzy concept; it's a
312 1092 : // combination of too many delta files, or too much WAL in
313 1092 : // total in the delta files. Or perhaps: whether creating an image
314 1092 : // file would allow us to delete some older files.
315 1092 : //
316 1092 : // 3. After that, we compact all level0 delta files if there
317 1092 : // are too many of them. While compacting, we also garbage
318 1092 : // collect any page versions that are no longer needed because
319 1092 : // of the new image layers we created in step 2.
320 1092 : //
321 1092 : // TODO: This high level strategy hasn't been implemented yet.
322 1092 : // Below are functions compact_level0() and create_image_layers()
323 1092 : // but they are a bit ad hoc and don't quite work as explained
324 1092 : // above. Rewrite it.
325 1092 :
326 1092 : // Is the timeline being deleted?
327 1092 : if self.is_stopping() {
328 0 : trace!("Dropping out of compaction on timeline shutdown");
329 0 : return Err(CompactionError::ShuttingDown);
330 1092 : }
331 1092 :
332 1092 : let target_file_size = self.get_checkpoint_distance();
333 :
334 : // Define partitioning schema if needed
335 :
336 : // FIXME: the match should only cover repartitioning, not the next steps
337 1092 : let (partition_count, has_pending_tasks) = match self
338 1092 : .repartition(
339 1092 : self.get_last_record_lsn(),
340 1092 : self.get_compaction_target_size(),
341 1092 : flags,
342 1092 : ctx,
343 1092 : )
344 47982 : .await
345 : {
346 1092 : Ok(((dense_partitioning, sparse_partitioning), lsn)) => {
347 1092 : // Disables access_stats updates, so that the files we read remain candidates for eviction after we're done with them
348 1092 : let image_ctx = RequestContextBuilder::extend(ctx)
349 1092 : .access_stats_behavior(AccessStatsBehavior::Skip)
350 1092 : .build();
351 1092 :
352 1092 : // 2. Compact
353 1092 : let timer = self.metrics.compact_time_histo.start_timer();
354 29656 : let fully_compacted = self.compact_level0(target_file_size, ctx).await?;
355 1092 : timer.stop_and_record();
356 1092 :
357 1092 : let mut partitioning = dense_partitioning;
358 1092 : partitioning
359 1092 : .parts
360 1092 : .extend(sparse_partitioning.into_dense().parts);
361 1092 :
362 1092 : // 3. Create new image layers for partitions that have been modified
363 1092 : // "enough". Skip image layer creation if L0 compaction cannot keep up.
364 1092 : if fully_compacted {
365 1092 : let image_layers = self
366 1092 : .create_image_layers(
367 1092 : &partitioning,
368 1092 : lsn,
369 1092 : if flags.contains(CompactFlags::ForceImageLayerCreation) {
370 42 : ImageLayerCreationMode::Force
371 : } else {
372 1050 : ImageLayerCreationMode::Try
373 : },
374 1092 : &image_ctx,
375 : )
376 40849 : .await?;
377 :
378 1092 : self.upload_new_image_layers(image_layers)?;
379 : } else {
380 0 : info!("skipping image layer generation due to L0 compaction did not include all layers.");
381 : }
382 1092 : (partitioning.parts.len(), !fully_compacted)
383 : }
384 0 : Err(err) => {
385 0 : // no partitioning? This is normal, if the timeline was just created
386 0 : // as an empty timeline. Also in unit tests, when we use the timeline
387 0 : // as a simple key-value store, ignoring the datadir layout. Log the
388 0 : // error but continue.
389 0 : //
390 0 : // Suppress error when it's due to cancellation
391 0 : if !self.cancel.is_cancelled() {
392 0 : tracing::error!("could not compact, repartitioning keyspace failed: {err:?}");
393 0 : }
394 0 : (1, false)
395 : }
396 : };
397 :
398 1092 : if self.shard_identity.count >= ShardCount::new(2) {
399 : // Limit the number of layer rewrites to the number of partitions: this means its
400 : // runtime should be comparable to a full round of image layer creations, rather than
401 : // being potentially much longer.
402 0 : let rewrite_max = partition_count;
403 0 :
404 0 : self.compact_shard_ancestors(rewrite_max, ctx).await?;
405 1092 : }
406 :
407 1092 : Ok(has_pending_tasks)
408 1092 : }
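Condensing the control flow above, the legacy path is: repartition the key space, compact L0 deltas into L1s, create image layers only if L0 compaction kept up, and on sharded tenants additionally rewrite ancestor layers bounded by the partition count. The sketch below is a free-standing, hypothetical restatement of that flow with stub methods, not the real `Timeline` API.

```rust
// Free-standing sketch of the compact_legacy flow; all names here are stand-ins.
struct Partitioning {
    parts: Vec<()>,
}

struct TimelineSketch {
    shard_count: u8,
}

impl TimelineSketch {
    fn repartition(&self) -> Partitioning {
        Partitioning { parts: vec![(), ()] }
    }
    fn compact_level0(&self) -> bool {
        true // returns whether all L0s were included ("fully compacted")
    }
    fn create_image_layers(&self, _parts: &Partitioning) {}
    fn compact_shard_ancestors(&self, _rewrite_max: usize) {}

    /// Returns whether compaction still has pending work, like the real method.
    fn compact_legacy(&self) -> bool {
        // 1. Partition the currently in-use key space.
        let partitioning = self.repartition();
        // 2. Compact L0 delta layers into L1s.
        let fully_compacted = self.compact_level0();
        // 3. Create image layers, but only if L0 compaction kept up.
        if fully_compacted {
            self.create_image_layers(&partitioning);
        }
        // 4. On sharded tenants, rewrite ancestor layers; bound the work by the number
        //    of partitions so one pass stays comparable to a round of image creation.
        if self.shard_count >= 2 {
            self.compact_shard_ancestors(partitioning.parts.len());
        }
        !fully_compacted
    }
}

fn main() {
    let tl = TimelineSketch { shard_count: 1 };
    assert!(!tl.compact_legacy()); // nothing pending when L0 was fully compacted
}
```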
409 :
410 : /// Check for layers that are elegible to be rewritten:
411 : /// - Shard splitting: After a shard split, ancestor layers beyond pitr_interval, so that
412 : /// we don't indefinitely retain keys in this shard that aren't needed.
413 : /// - For future use: layers beyond pitr_interval that are in formats we would
414 : /// rather not maintain compatibility with indefinitely.
415 : ///
416 : /// Note: this phase may read and write many gigabytes of data: use rewrite_max to bound
417 : /// how much work it will try to do in each compaction pass.
418 0 : async fn compact_shard_ancestors(
419 0 : self: &Arc<Self>,
420 0 : rewrite_max: usize,
421 0 : ctx: &RequestContext,
422 0 : ) -> Result<(), CompactionError> {
423 0 : let mut drop_layers = Vec::new();
424 0 : let mut layers_to_rewrite: Vec<Layer> = Vec::new();
425 0 :
426 0 : // We will use the Lsn cutoff of the last GC as a threshold for rewriting layers: if a
427 0 : // layer is behind this Lsn, it indicates that the layer is being retained beyond the
428 0 : // pitr_interval, for example because a branchpoint references it.
429 0 : //
430 0 : // Holding this read guard also blocks [`Self::gc_timeline`] from entering while we
431 0 : // are rewriting layers.
432 0 : let latest_gc_cutoff = self.get_latest_gc_cutoff_lsn();
433 0 :
434 0 : tracing::info!(
435 0 : "latest_gc_cutoff: {}, pitr cutoff {}",
436 0 : *latest_gc_cutoff,
437 0 : self.gc_info.read().unwrap().cutoffs.time
438 : );
439 :
440 0 : let layers = self.layers.read().await;
441 0 : for layer_desc in layers.layer_map()?.iter_historic_layers() {
442 0 : let layer = layers.get_from_desc(&layer_desc);
443 0 : if layer.metadata().shard.shard_count == self.shard_identity.count {
444 : // This layer does not belong to a historic ancestor, no need to re-image it.
445 0 : continue;
446 0 : }
447 0 :
448 0 : // This layer was created on an ancestor shard: check if it contains any data for this shard.
449 0 : let sharded_range = ShardedRange::new(layer_desc.get_key_range(), &self.shard_identity);
450 0 : let layer_local_page_count = sharded_range.page_count();
451 0 : let layer_raw_page_count = ShardedRange::raw_size(&layer_desc.get_key_range());
452 0 : if layer_local_page_count == 0 {
453 : // This ancestral layer only covers keys that belong to other shards.
454 : // We include the full metadata in the log: if we had some critical bug that caused
455 : // us to incorrectly drop layers, this would simplify manually debugging + reinstating those layers.
456 0 : info!(%layer, old_metadata=?layer.metadata(),
457 0 : "dropping layer after shard split, contains no keys for this shard.",
458 : );
459 :
460 0 : if cfg!(debug_assertions) {
461 : // Expensive, exhaustive check of keys in this layer: this guards against ShardedRange's calculations being
462 : // wrong. If ShardedRange claims the local page count is zero, then no keys in this layer
463 : // should be !is_key_disposable()
464 0 : let range = layer_desc.get_key_range();
465 0 : let mut key = range.start;
466 0 : while key < range.end {
467 0 : debug_assert!(self.shard_identity.is_key_disposable(&key));
468 0 : key = key.next();
469 : }
470 0 : }
471 :
472 0 : drop_layers.push(layer);
473 0 : continue;
474 0 : } else if layer_local_page_count != u32::MAX
475 0 : && layer_local_page_count == layer_raw_page_count
476 : {
477 0 : debug!(%layer,
478 0 : "layer is entirely shard local ({} keys), no need to filter it",
479 : layer_local_page_count
480 : );
481 0 : continue;
482 0 : }
483 0 :
484 0 : // Don't bother re-writing a layer unless it will at least halve its size
485 0 : if layer_local_page_count != u32::MAX
486 0 : && layer_local_page_count > layer_raw_page_count / 2
487 : {
488 0 : debug!(%layer,
489 0 : "layer is already mostly local ({}/{}), not rewriting",
490 : layer_local_page_count,
491 : layer_raw_page_count
492 : );
493 0 : }
494 :
495 : // Don't bother re-writing a layer if it is within the PITR window: it will age-out eventually
496 : // without incurring the I/O cost of a rewrite.
497 0 : if layer_desc.get_lsn_range().end >= *latest_gc_cutoff {
498 0 : debug!(%layer, "Skipping rewrite of layer still in GC window ({} >= {})",
499 0 : layer_desc.get_lsn_range().end, *latest_gc_cutoff);
500 0 : continue;
501 0 : }
502 0 :
503 0 : if layer_desc.is_delta() {
504 : // We do not yet implement rewrite of delta layers
505 0 : debug!(%layer, "Skipping rewrite of delta layer");
506 0 : continue;
507 0 : }
508 0 :
509 0 : // Only rewrite layers if their generations differ. This guarantees:
510 0 : // - that local rewrite is safe, as local layer paths will differ between existing layer and rewritten one
511 0 : // - that the layer is persistent in remote storage, as we only see old-generation'd layer via loading from remote storage
512 0 : if layer.metadata().generation == self.generation {
513 0 : debug!(%layer, "Skipping rewrite, is not from old generation");
514 0 : continue;
515 0 : }
516 0 :
517 0 : if layers_to_rewrite.len() >= rewrite_max {
518 0 : tracing::info!(%layer, "Will rewrite layer on a future compaction, already rewrote {}",
519 0 : layers_to_rewrite.len()
520 : );
521 0 : continue;
522 0 : }
523 0 :
524 0 : // Fall through: all our conditions for doing a rewrite passed.
525 0 : layers_to_rewrite.push(layer);
526 : }
527 :
528 : // Drop read lock on layer map before we start doing time-consuming I/O
529 0 : drop(layers);
530 0 :
531 0 : let mut replace_image_layers = Vec::new();
532 :
533 0 : for layer in layers_to_rewrite {
534 0 : tracing::info!(layer=%layer, "Rewriting layer after shard split...");
535 0 : let mut image_layer_writer = ImageLayerWriter::new(
536 0 : self.conf,
537 0 : self.timeline_id,
538 0 : self.tenant_shard_id,
539 0 : &layer.layer_desc().key_range,
540 0 : layer.layer_desc().image_layer_lsn(),
541 0 : ctx,
542 0 : )
543 0 : .await
544 0 : .map_err(CompactionError::Other)?;
545 :
546 : // Safety of layer rewrites:
547 : // - We are writing to a different local file path than we are reading from, so the old Layer
548 : // cannot interfere with the new one.
549 : // - In the page cache, contents for a particular VirtualFile are stored with a file_id that
550 : // is different for two layers with the same name (in `ImageLayerInner::new` we always
551 : // acquire a fresh id from [`crate::page_cache::next_file_id`]). So readers do not risk
552 : // reading the index from one layer file, and then data blocks from the rewritten layer file.
553 : // - Any readers that have a reference to the old layer will keep it alive until they are done
554 : // with it. If they are trying to promote from remote storage, that will fail, but this is the same
555 : // as for compaction generally: compaction is allowed to delete layers that readers might be trying to use.
556 : // - We do not run concurrently with other kinds of compaction, so the only layer map writes we race with are:
557 : // - GC, which at worst witnesses us "undelete" a layer that they just deleted.
558 : // - ingestion, which only inserts layers, therefore cannot collide with us.
559 0 : let resident = layer.download_and_keep_resident().await?;
560 :
561 0 : let keys_written = resident
562 0 : .filter(&self.shard_identity, &mut image_layer_writer, ctx)
563 0 : .await?;
564 :
565 0 : if keys_written > 0 {
566 0 : let new_layer = image_layer_writer
567 0 : .finish(self, ctx)
568 0 : .await
569 0 : .map_err(CompactionError::Other)?;
570 0 : tracing::info!(layer=%new_layer, "Rewrote layer, {} -> {} bytes",
571 0 : layer.metadata().file_size,
572 0 : new_layer.metadata().file_size);
573 :
574 0 : replace_image_layers.push((layer, new_layer));
575 0 : } else {
576 0 : // Drop the old layer. Usually for this case we would already have noticed that
577 0 : // the layer has no data for us with the ShardedRange check above, but the filter pass may still write zero keys, so drop it here as well.
578 0 : drop_layers.push(layer);
579 0 : }
580 : }
581 :
582 : // At this point, we have replaced local layer files with their rewritten form, but not yet uploaded
583 : // metadata to reflect that. If we restart here, the replaced layer files will look invalid (size mismatch
584 : // to remote index) and be removed. This is inefficient but safe.
585 0 : fail::fail_point!("compact-shard-ancestors-localonly");
586 0 :
587 0 : // Update the LayerMap so that readers will use the new layers, and enqueue it for writing to remote storage
588 0 : self.rewrite_layers(replace_image_layers, drop_layers)
589 0 : .await?;
590 :
591 0 : fail::fail_point!("compact-shard-ancestors-enqueued");
592 0 :
593 0 : // We wait for all uploads to complete before finishing this compaction stage. This is not
594 0 : // necessary for correctness, but it simplifies testing, and avoids proceeding with another
595 0 : // Timeline's compaction while this timeline's uploads may be generating lots of disk I/O
596 0 : // load.
597 0 : match self.remote_client.wait_completion().await {
598 0 : Ok(()) => (),
599 0 : Err(WaitCompletionError::NotInitialized(ni)) => return Err(CompactionError::from(ni)),
600 : Err(WaitCompletionError::UploadQueueShutDownOrStopped) => {
601 0 : return Err(CompactionError::ShuttingDown)
602 : }
603 : }
604 :
605 0 : fail::fail_point!("compact-shard-ancestors-persistent");
606 0 :
607 0 : Ok(())
608 0 : }
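The per-layer decision in the loop above boils down to a handful of checks. A condensed, hypothetical predicate is sketched below with simplified fields; note that the "layer is already mostly local" check in the loop above only logs and does not skip the layer, so it is omitted here, and the final zero-keys-written drop happens later, after the filter pass.

```rust
// Hypothetical condensation of the ancestor-layer decision; fields are stand-ins.
#[derive(Debug, PartialEq)]
enum Action {
    Keep,    // leave the layer alone (at least for this pass)
    Drop,    // contains no keys for this shard at all
    Rewrite, // re-image it with only this shard's keys
}

struct LayerInfo {
    written_by_this_shard_count: bool, // not an ancestor layer
    local_page_count: u32,             // pages that belong to this shard
    lsn_range_end: u64,
    is_delta: bool,
    same_generation: bool, // layer generation == timeline generation
}

fn decide(l: &LayerInfo, latest_gc_cutoff: u64, rewrites_so_far: usize, rewrite_max: usize) -> Action {
    if l.written_by_this_shard_count {
        return Action::Keep; // nothing to re-image
    }
    if l.local_page_count == 0 {
        return Action::Drop; // covers only keys that belong to other shards
    }
    if l.lsn_range_end >= latest_gc_cutoff {
        return Action::Keep; // still in the PITR window; it will age out on its own
    }
    if l.is_delta {
        return Action::Keep; // delta rewrite is not implemented yet
    }
    if l.same_generation {
        return Action::Keep; // only rewrite layers from an older generation
    }
    if rewrites_so_far >= rewrite_max {
        return Action::Keep; // defer to a future compaction pass
    }
    Action::Rewrite
}

fn main() {
    let ancestor_image = LayerInfo {
        written_by_this_shard_count: false,
        local_page_count: 10,
        lsn_range_end: 100,
        is_delta: false,
        same_generation: false,
    };
    assert_eq!(decide(&ancestor_image, 200, 0, 4), Action::Rewrite);
}
```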
609 :
610 : /// Update the LayerVisibilityHint of layers covered by image layers, based on whether there is
611 : /// an image layer between them and the most recent readable LSN (branch point or tip of timeline). The
612 : /// purpose of the visibility hint is to record which layers need to be available to service reads.
613 : ///
614 : /// The result may be used as an input to eviction and secondary downloads to de-prioritize layers
615 : /// that we know won't be needed for reads.
616 2148 : pub(super) async fn update_layer_visibility(
617 2148 : &self,
618 2148 : ) -> Result<(), super::layer_manager::Shutdown> {
619 2148 : let head_lsn = self.get_last_record_lsn();
620 :
621 : // We will sweep through layers in reverse-LSN order. We only do historic layers. L0 deltas
622 : // are implicitly left visible, because LayerVisibilityHint's default is Visible, and we never modify it here.
623 : // Note that L0 deltas _can_ be covered by image layers, but we consider them 'visible' because we anticipate that
624 : // they will be subject to L0->L1 compaction in the near future.
625 2148 : let layer_manager = self.layers.read().await;
626 2148 : let layer_map = layer_manager.layer_map()?;
627 :
628 2148 : let readable_points = {
629 2148 : let children = self.gc_info.read().unwrap().retain_lsns.clone();
630 2148 :
631 2148 : let mut readable_points = Vec::with_capacity(children.len() + 1);
632 2148 : for (child_lsn, _child_timeline_id) in &children {
633 0 : readable_points.push(*child_lsn);
634 0 : }
635 2148 : readable_points.push(head_lsn);
636 2148 : readable_points
637 2148 : };
638 2148 :
639 2148 : let (layer_visibility, covered) = layer_map.get_visibility(readable_points);
640 10218 : for (layer_desc, visibility) in layer_visibility {
641 8070 : // FIXME: a more efficiency bulk zip() through the layers rather than NlogN getting each one
642 8070 : let layer = layer_manager.get_from_desc(&layer_desc);
643 8070 : layer.set_visibility(visibility);
644 8070 : }
645 :
646 : // TODO: publish our covered KeySpace to our parent, so that when they update their visibility, they can
647 : // avoid assuming that everything at a branch point is visible.
648 2148 : drop(covered);
649 2148 : Ok(())
650 2148 : }
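As a small illustration of the inputs to the visibility pass above: the readable points are every child branch point plus the tip of the timeline, and `layer_map.get_visibility` then decides, per layer, whether an image layer shields it from all of those points. The helper below is a hypothetical stand-in that only mirrors how the readable points are assembled.

```rust
// Hypothetical stand-in mirroring how readable points are assembled above.
fn readable_points(child_branch_lsns: &[u64], head_lsn: u64) -> Vec<u64> {
    let mut points = Vec::with_capacity(child_branch_lsns.len() + 1);
    points.extend_from_slice(child_branch_lsns); // each retained branch point stays readable
    points.push(head_lsn); // plus the tip of the timeline itself
    points
}

fn main() {
    assert_eq!(readable_points(&[0x20, 0x40], 0x100), vec![0x20, 0x40, 0x100]);
}
```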
651 :
652 : /// Collect a bunch of Level 0 layer files, and compact and reshuffle them as
653 : /// as Level 1 files. Returns whether the L0 layers are fully compacted.
654 1092 : async fn compact_level0(
655 1092 : self: &Arc<Self>,
656 1092 : target_file_size: u64,
657 1092 : ctx: &RequestContext,
658 1092 : ) -> Result<bool, CompactionError> {
659 : let CompactLevel0Phase1Result {
660 1092 : new_layers,
661 1092 : deltas_to_compact,
662 1092 : fully_compacted,
663 : } = {
664 1092 : let phase1_span = info_span!("compact_level0_phase1");
665 1092 : let ctx = ctx.attached_child();
666 1092 : let mut stats = CompactLevel0Phase1StatsBuilder {
667 1092 : version: Some(2),
668 1092 : tenant_id: Some(self.tenant_shard_id),
669 1092 : timeline_id: Some(self.timeline_id),
670 1092 : ..Default::default()
671 1092 : };
672 1092 :
673 1092 : let begin = tokio::time::Instant::now();
674 1092 : let phase1_layers_locked = self.layers.read().await;
675 1092 : let now = tokio::time::Instant::now();
676 1092 : stats.read_lock_acquisition_micros =
677 1092 : DurationRecorder::Recorded(RecordedDuration(now - begin), now);
678 1092 : self.compact_level0_phase1(phase1_layers_locked, stats, target_file_size, &ctx)
679 1092 : .instrument(phase1_span)
680 29651 : .await?
681 : };
682 :
683 1092 : if new_layers.is_empty() && deltas_to_compact.is_empty() {
684 : // nothing to do
685 1008 : return Ok(true);
686 84 : }
687 84 :
688 84 : self.finish_compact_batch(&new_layers, &Vec::new(), &deltas_to_compact)
689 3 : .await?;
690 84 : Ok(fully_compacted)
691 1092 : }
692 :
693 : /// Level0 files first phase of compaction, explained in the [`Self::compact_legacy`] comment.
694 1092 : async fn compact_level0_phase1<'a>(
695 1092 : self: &'a Arc<Self>,
696 1092 : guard: tokio::sync::RwLockReadGuard<'a, LayerManager>,
697 1092 : mut stats: CompactLevel0Phase1StatsBuilder,
698 1092 : target_file_size: u64,
699 1092 : ctx: &RequestContext,
700 1092 : ) -> Result<CompactLevel0Phase1Result, CompactionError> {
701 1092 : stats.read_lock_held_spawn_blocking_startup_micros =
702 1092 : stats.read_lock_acquisition_micros.till_now(); // set by caller
703 1092 : let layers = guard.layer_map()?;
704 1092 : let level0_deltas = layers.level0_deltas();
705 1092 : stats.level0_deltas_count = Some(level0_deltas.len());
706 1092 :
707 1092 : // Only compact if enough layers have accumulated.
708 1092 : let threshold = self.get_compaction_threshold();
709 1092 : if level0_deltas.is_empty() || level0_deltas.len() < threshold {
710 1008 : debug!(
711 0 : level0_deltas = level0_deltas.len(),
712 0 : threshold, "too few deltas to compact"
713 : );
714 1008 : return Ok(CompactLevel0Phase1Result::default());
715 84 : }
716 84 :
717 84 : let mut level0_deltas = level0_deltas
718 84 : .iter()
719 1206 : .map(|x| guard.get_from_desc(x))
720 84 : .collect::<Vec<_>>();
721 84 :
722 84 : // Gather the files to compact in this iteration.
723 84 : //
724 84 : // Start with the oldest Level 0 delta file, and collect any other
725 84 : // level 0 files that form a contiguous sequence, such that the end
726 84 : // LSN of previous file matches the start LSN of the next file.
727 84 : //
728 84 : // Note that if the files don't form such a sequence, we might
729 84 : // "compact" just a single file. That's a bit pointless, but it allows
730 84 : // us to get rid of the level 0 file, and compact the other files on
731 84 : // the next iteration. This could probably made smarter, but such
732 84 : // "gaps" in the sequence of level 0 files should only happen in case
733 84 : // of a crash, partial download from cloud storage, or something like
734 84 : // that, so it's not a big deal in practice.
735 2244 : level0_deltas.sort_by_key(|l| l.layer_desc().lsn_range.start);
736 84 : let mut level0_deltas_iter = level0_deltas.iter();
737 84 :
738 84 : let first_level0_delta = level0_deltas_iter.next().unwrap();
739 84 : let mut prev_lsn_end = first_level0_delta.layer_desc().lsn_range.end;
740 84 : let mut deltas_to_compact = Vec::with_capacity(level0_deltas.len());
741 84 :
742 84 : // Accumulate the size of layers in `deltas_to_compact`
743 84 : let mut deltas_to_compact_bytes = 0;
744 84 :
745 84 : // Under normal circumstances, we will accumulate up to compaction_interval L0s of size
746 84 : // checkpoint_distance each. To avoid edge cases using extra system resources, bound our
747 84 : // work in this function to only operate on this much delta data at once.
748 84 : //
749 84 : // Take the max of the configured value & the default, so that tests that configure tiny values
750 84 : // can still use a sensible amount of memory, but if a deployed system configures bigger values we
751 84 : // still let them compact a full stack of L0s in one go.
752 84 : let delta_size_limit = std::cmp::max(
753 84 : self.get_compaction_threshold(),
754 84 : DEFAULT_COMPACTION_THRESHOLD,
755 84 : ) as u64
756 84 : * std::cmp::max(self.get_checkpoint_distance(), DEFAULT_CHECKPOINT_DISTANCE);
757 84 :
758 84 : let mut fully_compacted = true;
759 84 :
760 84 : deltas_to_compact.push(first_level0_delta.download_and_keep_resident().await?);
761 1206 : for l in level0_deltas_iter {
762 1122 : let lsn_range = &l.layer_desc().lsn_range;
763 1122 :
764 1122 : if lsn_range.start != prev_lsn_end {
765 0 : break;
766 1122 : }
767 1122 : deltas_to_compact.push(l.download_and_keep_resident().await?);
768 1122 : deltas_to_compact_bytes += l.metadata().file_size;
769 1122 : prev_lsn_end = lsn_range.end;
770 1122 :
771 1122 : if deltas_to_compact_bytes >= delta_size_limit {
772 0 : info!(
773 0 : l0_deltas_selected = deltas_to_compact.len(),
774 0 : l0_deltas_total = level0_deltas.len(),
775 0 : "L0 compaction picker hit max delta layer size limit: {}",
776 : delta_size_limit
777 : );
778 0 : fully_compacted = false;
779 0 :
780 0 : // Proceed with compaction, but only a subset of L0s
781 0 : break;
782 1122 : }
783 : }
784 84 : let lsn_range = Range {
785 84 : start: deltas_to_compact
786 84 : .first()
787 84 : .unwrap()
788 84 : .layer_desc()
789 84 : .lsn_range
790 84 : .start,
791 84 : end: deltas_to_compact.last().unwrap().layer_desc().lsn_range.end,
792 84 : };
793 84 :
794 84 : info!(
795 0 : "Starting Level0 compaction in LSN range {}-{} for {} layers ({} deltas in total)",
796 0 : lsn_range.start,
797 0 : lsn_range.end,
798 0 : deltas_to_compact.len(),
799 0 : level0_deltas.len()
800 : );
801 :
802 1206 : for l in deltas_to_compact.iter() {
803 1206 : info!("compact includes {l}");
804 : }
805 :
806 : // We don't need the original list of layers anymore. Drop it so that
807 : // we don't accidentally use it later in the function.
808 84 : drop(level0_deltas);
809 84 :
810 84 : stats.read_lock_held_prerequisites_micros = stats
811 84 : .read_lock_held_spawn_blocking_startup_micros
812 84 : .till_now();
813 :
814 : // TODO: replace with streaming k-merge
815 84 : let all_keys = {
816 84 : let mut all_keys = Vec::new();
817 1206 : for l in deltas_to_compact.iter() {
818 1206 : if self.cancel.is_cancelled() {
819 0 : return Err(CompactionError::ShuttingDown);
820 1206 : }
821 7083 : all_keys.extend(l.load_keys(ctx).await.map_err(CompactionError::Other)?);
822 : }
823 : // The current stdlib sorting implementation is designed so that it is
824 : // particularly fast when the slice is made up of sorted sub-ranges.
825 13271354 : all_keys.sort_by_key(|DeltaEntry { key, lsn, .. }| (*key, *lsn));
826 84 : all_keys
827 84 : };
828 84 :
829 84 : stats.read_lock_held_key_sort_micros = stats.read_lock_held_prerequisites_micros.till_now();
830 :
831 : // Determine N largest holes where N is number of compacted layers. The vec is sorted by key range start.
832 : //
833 : // A hole is a key range for which this compaction doesn't have any WAL records.
834 : // Our goal in this compaction iteration is to avoid creating L1s that, in terms of their key range,
835 : // cover the hole, but actually don't contain any WAL records for that key range.
836 : // The reason is that the mere stack of L1s (`count_deltas`) triggers image layer creation (`create_image_layers`).
837 : // That image layer creation would be useless for a hole range covered by L1s that don't contain any WAL records.
838 : //
839 : // The algorithm chooses holes as follows.
840 : // - Slide a 2-element window over the keys in key order to get the hole range (=distance between two keys).
841 : // - Filter: min threshold on range length
842 : // - Rank: by coverage size (=number of image layers required to reconstruct each key in the range for which we have any data)
843 : //
844 : // For more details, intuition, and some ASCII art see https://github.com/neondatabase/neon/pull/3597#discussion_r1112704451
845 : #[derive(PartialEq, Eq)]
846 : struct Hole {
847 : key_range: Range<Key>,
848 : coverage_size: usize,
849 : }
850 84 : let holes: Vec<Hole> = {
851 : use std::cmp::Ordering;
852 : impl Ord for Hole {
853 0 : fn cmp(&self, other: &Self) -> Ordering {
854 0 : self.coverage_size.cmp(&other.coverage_size).reverse()
855 0 : }
856 : }
857 : impl PartialOrd for Hole {
858 0 : fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
859 0 : Some(self.cmp(other))
860 0 : }
861 : }
862 84 : let max_holes = deltas_to_compact.len();
863 84 : let last_record_lsn = self.get_last_record_lsn();
864 84 : let min_hole_range = (target_file_size / page_cache::PAGE_SZ as u64) as i128;
865 84 : let min_hole_coverage_size = 3; // TODO: something more flexible?
866 84 : // min-heap (reserve space for one more element added before eviction)
867 84 : let mut heap: BinaryHeap<Hole> = BinaryHeap::with_capacity(max_holes + 1);
868 84 : let mut prev: Option<Key> = None;
869 :
870 6192114 : for &DeltaEntry { key: next_key, .. } in all_keys.iter() {
871 6192114 : if let Some(prev_key) = prev {
872 : // Just a first fast filter: do not create hole entries for metadata keys. The last hole in the
873 : // compaction is the gap between the data keys and the metadata keys.
874 6192030 : if next_key.to_i128() - prev_key.to_i128() >= min_hole_range
875 0 : && !Key::is_metadata_key(&prev_key)
876 : {
877 0 : let key_range = prev_key..next_key;
878 0 : // Measuring a hole by simply subtracting the i128 representations of the key range boundaries
879 0 : // doesn't make much sense, because the largest holes will correspond to field1/field2 changes.
880 0 : // We are mostly interested in eliminating holes which cause generation of excessive image layers.
881 0 : // That is why it is better to measure the size of a hole as the number of covering image layers.
882 0 : let coverage_size =
883 0 : layers.image_coverage(&key_range, last_record_lsn).len();
884 0 : if coverage_size >= min_hole_coverage_size {
885 0 : heap.push(Hole {
886 0 : key_range,
887 0 : coverage_size,
888 0 : });
889 0 : if heap.len() > max_holes {
890 0 : heap.pop(); // remove smallest hole
891 0 : }
892 0 : }
893 6192030 : }
894 84 : }
895 6192114 : prev = Some(next_key.next());
896 : }
897 84 : let mut holes = heap.into_vec();
898 84 : holes.sort_unstable_by_key(|hole| hole.key_range.start);
899 84 : holes
900 84 : };
901 84 : stats.read_lock_held_compute_holes_micros = stats.read_lock_held_key_sort_micros.till_now();
902 84 : drop_rlock(guard);
903 84 :
904 84 : if self.cancel.is_cancelled() {
905 0 : return Err(CompactionError::ShuttingDown);
906 84 : }
907 84 :
908 84 : stats.read_lock_drop_micros = stats.read_lock_held_compute_holes_micros.till_now();
909 :
910 : // This iterator walks through all key-value pairs from all the layers
911 : // we're compacting, in key, LSN order.
912 : // If there's both a Value::Image and Value::WalRecord for the same (key,lsn),
913 : // then the Value::Image is ordered before Value::WalRecord.
914 84 : let mut all_values_iter = {
915 84 : let mut deltas = Vec::with_capacity(deltas_to_compact.len());
916 1206 : for l in deltas_to_compact.iter() {
917 1206 : let l = l.get_as_delta(ctx).await.map_err(CompactionError::Other)?;
918 1206 : deltas.push(l);
919 : }
920 84 : MergeIterator::create(&deltas, &[], ctx)
921 84 : };
922 84 :
923 84 : // This iterator walks through all keys and is needed to calculate size used by each key
924 84 : let mut all_keys_iter = all_keys
925 84 : .iter()
926 6192114 : .map(|DeltaEntry { key, lsn, size, .. }| (*key, *lsn, *size))
927 6192030 : .coalesce(|mut prev, cur| {
928 6192030 : // Coalesce keys that belong to the same key pair.
929 6192030 : // This ensures that compaction doesn't put them
930 6192030 : // into different layer files.
931 6192030 : // Still limit this by the target file size,
932 6192030 : // so that we keep the size of the files in
933 6192030 : // check.
934 6192030 : if prev.0 == cur.0 && prev.2 < target_file_size {
935 120114 : prev.2 += cur.2;
936 120114 : Ok(prev)
937 : } else {
938 6071916 : Err((prev, cur))
939 : }
940 6192030 : });
941 84 :
942 84 : // Merge the contents of all the input delta layers into a new set
943 84 : // of delta layers, based on the current partitioning.
944 84 : //
945 84 : // We split the new delta layers on the key dimension. We iterate through the key space, and for each key, check whether adding the next key to the current output layer we're building would cause the layer to become too large. If so, dump the current output layer and start a new one.
946 84 : // It's possible that there is a single key with so many page versions that storing all of them in a single layer file
947 84 : // would be too large. In that case, we also split on the LSN dimension.
948 84 : //
949 84 : // LSN
950 84 : // ^
951 84 : // |
952 84 : // | +-----------+ +--+--+--+--+
953 84 : // | | | | | | | |
954 84 : // | +-----------+ | | | | |
955 84 : // | | | | | | | |
956 84 : // | +-----------+ ==> | | | | |
957 84 : // | | | | | | | |
958 84 : // | +-----------+ | | | | |
959 84 : // | | | | | | | |
960 84 : // | +-----------+ +--+--+--+--+
961 84 : // |
962 84 : // +--------------> key
963 84 : //
964 84 : //
965 84 : // If one key (X) has a lot of page versions:
966 84 : //
967 84 : // LSN
968 84 : // ^
969 84 : // | (X)
970 84 : // | +-----------+ +--+--+--+--+
971 84 : // | | | | | | | |
972 84 : // | +-----------+ | | +--+ |
973 84 : // | | | | | | | |
974 84 : // | +-----------+ ==> | | | | |
975 84 : // | | | | | +--+ |
976 84 : // | +-----------+ | | | | |
977 84 : // | | | | | | | |
978 84 : // | +-----------+ +--+--+--+--+
979 84 : // |
980 84 : // +--------------> key
981 84 : // TODO: this actually divides the layers into fixed-size chunks, not
982 84 : // based on the partitioning.
983 84 : //
984 84 : // TODO: we should also opportunistically materialize and
985 84 : // garbage collect what we can.
986 84 : let mut new_layers = Vec::new();
987 84 : let mut prev_key: Option<Key> = None;
988 84 : let mut writer: Option<DeltaLayerWriter> = None;
989 84 : let mut key_values_total_size = 0u64;
990 84 : let mut dup_start_lsn: Lsn = Lsn::INVALID; // start LSN of layer containing values of the single key
991 84 : let mut dup_end_lsn: Lsn = Lsn::INVALID; // end LSN of layer containing values of the single key
992 84 : let mut next_hole = 0; // index of next hole in holes vector
993 84 :
994 84 : let mut keys = 0;
995 :
996 6192198 : while let Some((key, lsn, value)) = all_values_iter
997 6192198 : .next()
998 10248 : .await
999 6192198 : .map_err(CompactionError::Other)?
1000 : {
1001 6192114 : keys += 1;
1002 6192114 :
1003 6192114 : if keys % 32_768 == 0 && self.cancel.is_cancelled() {
1004 : // avoid hitting the cancellation token on every key. in benches, we end up
1005 : // shuffling an order of million keys per layer, this means we'll check it
1006 : // around tens of times per layer.
1007 0 : return Err(CompactionError::ShuttingDown);
1008 6192114 : }
1009 6192114 :
1010 6192114 : let same_key = prev_key.map_or(false, |prev_key| prev_key == key);
1011 6192114 : // We need to check key boundaries once we reach next key or end of layer with the same key
1012 6192114 : if !same_key || lsn == dup_end_lsn {
1013 6072000 : let mut next_key_size = 0u64;
1014 6072000 : let is_dup_layer = dup_end_lsn.is_valid();
1015 6072000 : dup_start_lsn = Lsn::INVALID;
1016 6072000 : if !same_key {
1017 6072000 : dup_end_lsn = Lsn::INVALID;
1018 6072000 : }
1019 : // Determine size occupied by this key. We stop at next key or when size becomes larger than target_file_size
1020 6072000 : for (next_key, next_lsn, next_size) in all_keys_iter.by_ref() {
1021 6072000 : next_key_size = next_size;
1022 6072000 : if key != next_key {
1023 6071916 : if dup_end_lsn.is_valid() {
1024 0 : // We are writing a segment with duplicates:
1025 0 : // place all remaining values of this key in a separate segment
1026 0 : dup_start_lsn = dup_end_lsn; // the new segment starts where the old one stops
1027 0 : dup_end_lsn = lsn_range.end; // there are no more values of this key till end of LSN range
1028 6071916 : }
1029 6071916 : break;
1030 84 : }
1031 84 : key_values_total_size += next_size;
1032 84 : // Check if it is time to split segment: if total keys size is larger than target file size.
1033 84 : // We need to avoid generation of empty segments if next_size > target_file_size.
1034 84 : if key_values_total_size > target_file_size && lsn != next_lsn {
1035 : // Split key between multiple layers: such layer can contain only single key
1036 0 : dup_start_lsn = if dup_end_lsn.is_valid() {
1037 0 : dup_end_lsn // new segment with duplicates starts where old one stops
1038 : } else {
1039 0 : lsn // start with the first LSN for this key
1040 : };
1041 0 : dup_end_lsn = next_lsn; // upper LSN boundary is exclusive
1042 0 : break;
1043 84 : }
1044 : }
1045 : // handle case when loop reaches last key: in this case dup_end is non-zero but dup_start is not set.
1046 6072000 : if dup_end_lsn.is_valid() && !dup_start_lsn.is_valid() {
1047 0 : dup_start_lsn = dup_end_lsn;
1048 0 : dup_end_lsn = lsn_range.end;
1049 6072000 : }
1050 6072000 : if writer.is_some() {
1051 6071916 : let written_size = writer.as_mut().unwrap().size();
1052 6071916 : let contains_hole =
1053 6071916 : next_hole < holes.len() && key >= holes[next_hole].key_range.end;
1054 : // check if key cause layer overflow or contains hole...
1055 6071916 : if is_dup_layer
1056 6071916 : || dup_end_lsn.is_valid()
1057 6071916 : || written_size + key_values_total_size > target_file_size
1058 6071076 : || contains_hole
1059 : {
1060 : // ... if so, flush previous layer and prepare to write new one
1061 840 : let (desc, path) = writer
1062 840 : .take()
1063 840 : .unwrap()
1064 840 : .finish(prev_key.unwrap().next(), ctx)
1065 2130 : .await
1066 840 : .map_err(CompactionError::Other)?;
1067 840 : let new_delta = Layer::finish_creating(self.conf, self, desc, &path)
1068 840 : .map_err(CompactionError::Other)?;
1069 :
1070 840 : new_layers.push(new_delta);
1071 840 : writer = None;
1072 840 :
1073 840 : if contains_hole {
1074 0 : // skip hole
1075 0 : next_hole += 1;
1076 840 : }
1077 6071076 : }
1078 84 : }
1079 : // Remember size of key value because at next iteration we will access next item
1080 6072000 : key_values_total_size = next_key_size;
1081 120114 : }
1082 6192114 : fail_point!("delta-layer-writer-fail-before-finish", |_| {
1083 0 : Err(CompactionError::Other(anyhow::anyhow!(
1084 0 : "failpoint delta-layer-writer-fail-before-finish"
1085 0 : )))
1086 6192114 : });
1087 :
1088 6192114 : if !self.shard_identity.is_key_disposable(&key) {
1089 6192114 : if writer.is_none() {
1090 924 : if self.cancel.is_cancelled() {
1091 : // to be somewhat responsive to cancellation, check for each new layer
1092 0 : return Err(CompactionError::ShuttingDown);
1093 924 : }
1094 : // Create the writer if not initialized yet
1095 924 : writer = Some(
1096 : DeltaLayerWriter::new(
1097 924 : self.conf,
1098 924 : self.timeline_id,
1099 924 : self.tenant_shard_id,
1100 924 : key,
1101 924 : if dup_end_lsn.is_valid() {
1102 : // this is a layer containing slice of values of the same key
1103 0 : debug!("Create new dup layer {}..{}", dup_start_lsn, dup_end_lsn);
1104 0 : dup_start_lsn..dup_end_lsn
1105 : } else {
1106 924 : debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end);
1107 924 : lsn_range.clone()
1108 : },
1109 924 : ctx,
1110 : )
1111 462 : .await
1112 924 : .map_err(CompactionError::Other)?,
1113 : );
1114 :
1115 924 : keys = 0;
1116 6191190 : }
1117 :
1118 6192114 : writer
1119 6192114 : .as_mut()
1120 6192114 : .unwrap()
1121 6192114 : .put_value(key, lsn, value, ctx)
1122 3676 : .await
1123 6192114 : .map_err(CompactionError::Other)?;
1124 : } else {
1125 0 : debug!(
1126 0 : "Dropping key {} during compaction (it belongs on shard {:?})",
1127 0 : key,
1128 0 : self.shard_identity.get_shard_number(&key)
1129 : );
1130 : }
1131 :
1132 6192114 : if !new_layers.is_empty() {
1133 59358 : fail_point!("after-timeline-compacted-first-L1");
1134 6132756 : }
1135 :
1136 6192114 : prev_key = Some(key);
1137 : }
1138 84 : if let Some(writer) = writer {
1139 84 : let (desc, path) = writer
1140 84 : .finish(prev_key.unwrap().next(), ctx)
1141 5968 : .await
1142 84 : .map_err(CompactionError::Other)?;
1143 84 : let new_delta = Layer::finish_creating(self.conf, self, desc, &path)
1144 84 : .map_err(CompactionError::Other)?;
1145 84 : new_layers.push(new_delta);
1146 0 : }
1147 :
1148 : // Sync layers
1149 84 : if !new_layers.is_empty() {
1150 : // Print a warning if the created layer is larger than double the target size
1151 : // Add two pages for potential overhead. This should in theory be already
1152 : // accounted for in the target calculation, but for very small targets,
1153 : // we still might easily hit the limit otherwise.
1154 84 : let warn_limit = target_file_size * 2 + page_cache::PAGE_SZ as u64 * 2;
1155 924 : for layer in new_layers.iter() {
1156 924 : if layer.layer_desc().file_size > warn_limit {
1157 0 : warn!(
1158 : %layer,
1159 0 : "created delta file of size {} larger than double of target of {target_file_size}", layer.layer_desc().file_size
1160 : );
1161 924 : }
1162 : }
1163 :
1164 : // The writer.finish() above already did the fsync of the inodes.
1165 : // We just need to fsync the directory in which these inodes are linked,
1166 : // which we know to be the timeline directory.
1167 : //
1168 : // We use fatal_err() below because the after writer.finish() returns with success,
1169 : // the in-memory state of the filesystem already has the layer file in its final place,
1170 : // and subsequent pageserver code could think it's durable while it really isn't.
1171 84 : let timeline_dir = VirtualFile::open(
1172 84 : &self
1173 84 : .conf
1174 84 : .timeline_path(&self.tenant_shard_id, &self.timeline_id),
1175 84 : ctx,
1176 84 : )
1177 42 : .await
1178 84 : .fatal_err("VirtualFile::open for timeline dir fsync");
1179 84 : timeline_dir
1180 84 : .sync_all()
1181 42 : .await
1182 84 : .fatal_err("VirtualFile::sync_all timeline dir");
1183 0 : }
1184 :
1185 84 : stats.write_layer_files_micros = stats.read_lock_drop_micros.till_now();
1186 84 : stats.new_deltas_count = Some(new_layers.len());
1187 924 : stats.new_deltas_size = Some(new_layers.iter().map(|l| l.layer_desc().file_size).sum());
1188 84 :
1189 84 : match TryInto::<CompactLevel0Phase1Stats>::try_into(stats)
1190 84 : .and_then(|stats| serde_json::to_string(&stats).context("serde_json::to_string"))
1191 : {
1192 84 : Ok(stats_json) => {
1193 84 : info!(
1194 0 : stats_json = stats_json.as_str(),
1195 0 : "compact_level0_phase1 stats available"
1196 : )
1197 : }
1198 0 : Err(e) => {
1199 0 : warn!("compact_level0_phase1 stats failed to serialize: {:#}", e);
1200 : }
1201 : }
1202 :
1203 : // Without this, rustc complains about deltas_to_compact still
1204 : // being borrowed when we `.into_iter()` below.
1205 84 : drop(all_values_iter);
1206 84 :
1207 84 : Ok(CompactLevel0Phase1Result {
1208 84 : new_layers,
1209 84 : deltas_to_compact: deltas_to_compact
1210 84 : .into_iter()
1211 1206 : .map(|x| x.drop_eviction_guard())
1212 84 : .collect::<Vec<_>>(),
1213 84 : fully_compacted,
1214 84 : })
1215 1092 : }
1216 : }
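One detail of `compact_level0_phase1` above worth a worked example is the `delta_size_limit` bound, which caps how much L0 data a single pass will compact. The constants below are assumptions for illustration only (the real defaults live in `pageserver_api::config::tenant_conf_defaults` and may differ): assuming a default threshold of 10 layers and a default checkpoint distance of 256 MiB, a test tenant configured with tiny values still gets the default-sized budget.

```rust
// Hedged arithmetic sketch of the delta_size_limit bound; the default constants here
// are assumptions, stand-ins for DEFAULT_COMPACTION_THRESHOLD / DEFAULT_CHECKPOINT_DISTANCE.
const ASSUMED_DEFAULT_COMPACTION_THRESHOLD: u64 = 10;
const ASSUMED_DEFAULT_CHECKPOINT_DISTANCE: u64 = 256 * 1024 * 1024;

fn delta_size_limit(compaction_threshold: u64, checkpoint_distance: u64) -> u64 {
    // max() on both factors: tiny test configs fall back to the defaults, while larger
    // deployed configs are allowed to compact a full stack of L0s in one go.
    std::cmp::max(compaction_threshold, ASSUMED_DEFAULT_COMPACTION_THRESHOLD)
        * std::cmp::max(checkpoint_distance, ASSUMED_DEFAULT_CHECKPOINT_DISTANCE)
}

fn main() {
    // threshold = 3 layers, checkpoint distance = 1 MiB (a test-sized config):
    // max(3, 10) * max(1 MiB, 256 MiB) = 2560 MiB of L0 data per pass.
    assert_eq!(delta_size_limit(3, 1024 * 1024), 2560 * 1024 * 1024);
}
```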
1217 :
1218 : #[derive(Default)]
1219 : struct CompactLevel0Phase1Result {
1220 : new_layers: Vec<ResidentLayer>,
1221 : deltas_to_compact: Vec<Layer>,
1222 : // Whether we have included all L0 layers, or selected only part of them due to the
1223 : // L0 compaction size limit.
1224 : fully_compacted: bool,
1225 : }
1226 :
1227 : #[derive(Default)]
1228 : struct CompactLevel0Phase1StatsBuilder {
1229 : version: Option<u64>,
1230 : tenant_id: Option<TenantShardId>,
1231 : timeline_id: Option<TimelineId>,
1232 : read_lock_acquisition_micros: DurationRecorder,
1233 : read_lock_held_spawn_blocking_startup_micros: DurationRecorder,
1234 : read_lock_held_key_sort_micros: DurationRecorder,
1235 : read_lock_held_prerequisites_micros: DurationRecorder,
1236 : read_lock_held_compute_holes_micros: DurationRecorder,
1237 : read_lock_drop_micros: DurationRecorder,
1238 : write_layer_files_micros: DurationRecorder,
1239 : level0_deltas_count: Option<usize>,
1240 : new_deltas_count: Option<usize>,
1241 : new_deltas_size: Option<u64>,
1242 : }
1243 :
1244 : #[derive(serde::Serialize)]
1245 : struct CompactLevel0Phase1Stats {
1246 : version: u64,
1247 : tenant_id: TenantShardId,
1248 : timeline_id: TimelineId,
1249 : read_lock_acquisition_micros: RecordedDuration,
1250 : read_lock_held_spawn_blocking_startup_micros: RecordedDuration,
1251 : read_lock_held_key_sort_micros: RecordedDuration,
1252 : read_lock_held_prerequisites_micros: RecordedDuration,
1253 : read_lock_held_compute_holes_micros: RecordedDuration,
1254 : read_lock_drop_micros: RecordedDuration,
1255 : write_layer_files_micros: RecordedDuration,
1256 : level0_deltas_count: usize,
1257 : new_deltas_count: usize,
1258 : new_deltas_size: u64,
1259 : }
1260 :
1261 : impl TryFrom<CompactLevel0Phase1StatsBuilder> for CompactLevel0Phase1Stats {
1262 : type Error = anyhow::Error;
1263 :
1264 84 : fn try_from(value: CompactLevel0Phase1StatsBuilder) -> Result<Self, Self::Error> {
1265 84 : Ok(Self {
1266 84 : version: value.version.ok_or_else(|| anyhow!("version not set"))?,
1267 84 : tenant_id: value
1268 84 : .tenant_id
1269 84 : .ok_or_else(|| anyhow!("tenant_id not set"))?,
1270 84 : timeline_id: value
1271 84 : .timeline_id
1272 84 : .ok_or_else(|| anyhow!("timeline_id not set"))?,
1273 84 : read_lock_acquisition_micros: value
1274 84 : .read_lock_acquisition_micros
1275 84 : .into_recorded()
1276 84 : .ok_or_else(|| anyhow!("read_lock_acquisition_micros not set"))?,
1277 84 : read_lock_held_spawn_blocking_startup_micros: value
1278 84 : .read_lock_held_spawn_blocking_startup_micros
1279 84 : .into_recorded()
1280 84 : .ok_or_else(|| anyhow!("read_lock_held_spawn_blocking_startup_micros not set"))?,
1281 84 : read_lock_held_key_sort_micros: value
1282 84 : .read_lock_held_key_sort_micros
1283 84 : .into_recorded()
1284 84 : .ok_or_else(|| anyhow!("read_lock_held_key_sort_micros not set"))?,
1285 84 : read_lock_held_prerequisites_micros: value
1286 84 : .read_lock_held_prerequisites_micros
1287 84 : .into_recorded()
1288 84 : .ok_or_else(|| anyhow!("read_lock_held_prerequisites_micros not set"))?,
1289 84 : read_lock_held_compute_holes_micros: value
1290 84 : .read_lock_held_compute_holes_micros
1291 84 : .into_recorded()
1292 84 : .ok_or_else(|| anyhow!("read_lock_held_compute_holes_micros not set"))?,
1293 84 : read_lock_drop_micros: value
1294 84 : .read_lock_drop_micros
1295 84 : .into_recorded()
1296 84 : .ok_or_else(|| anyhow!("read_lock_drop_micros not set"))?,
1297 84 : write_layer_files_micros: value
1298 84 : .write_layer_files_micros
1299 84 : .into_recorded()
1300 84 : .ok_or_else(|| anyhow!("write_layer_files_micros not set"))?,
1301 84 : level0_deltas_count: value
1302 84 : .level0_deltas_count
1303 84 : .ok_or_else(|| anyhow!("level0_deltas_count not set"))?,
1304 84 : new_deltas_count: value
1305 84 : .new_deltas_count
1306 84 : .ok_or_else(|| anyhow!("new_deltas_count not set"))?,
1307 84 : new_deltas_size: value
1308 84 : .new_deltas_size
1309 84 : .ok_or_else(|| anyhow!("new_deltas_size not set"))?,
1310 : })
1311 84 : }
1312 : }
1313 :
1314 : impl Timeline {
1315 : /// Entry point for new tiered compaction algorithm.
1316 : ///
1317 : /// All the real work is in the implementation in the pageserver_compaction
1318 : /// crate. The code here would apply to any algorithm implemented by the
1319 : /// same interface, but tiered is the only one at the moment.
1320 : ///
1321 : /// TODO: cancellation
1322 0 : pub(crate) async fn compact_tiered(
1323 0 : self: &Arc<Self>,
1324 0 : _cancel: &CancellationToken,
1325 0 : ctx: &RequestContext,
1326 0 : ) -> Result<(), CompactionError> {
1327 0 : let fanout = self.get_compaction_threshold() as u64;
1328 0 : let target_file_size = self.get_checkpoint_distance();
1329 :
1330 : // Find the top of the historical layers
1331 0 : let end_lsn = {
1332 0 : let guard = self.layers.read().await;
1333 0 : let layers = guard.layer_map()?;
1334 :
1335 0 : let l0_deltas = layers.level0_deltas();
1336 0 :
1337 0 : // As an optimization, if we find that there are too few L0 layers,
1338 0 : // bail out early. We know that the compaction algorithm would do
1339 0 : // nothing in that case.
1340 0 : if l0_deltas.len() < fanout as usize {
1341 : // doesn't need compacting
1342 0 : return Ok(());
1343 0 : }
1344 0 : l0_deltas.iter().map(|l| l.lsn_range.end).max().unwrap()
1345 0 : };
1346 0 :
1347 0 : // Is the timeline being deleted?
1348 0 : if self.is_stopping() {
1349 0 : trace!("Dropping out of compaction on timeline shutdown");
1350 0 : return Err(CompactionError::ShuttingDown);
1351 0 : }
1352 :
1353 0 : let (dense_ks, _sparse_ks) = self.collect_keyspace(end_lsn, ctx).await?;
1354 : // TODO(chi): ignore sparse_keyspace for now, compact it in the future.
1355 0 : let mut adaptor = TimelineAdaptor::new(self, (end_lsn, dense_ks));
1356 0 :
1357 0 : pageserver_compaction::compact_tiered::compact_tiered(
1358 0 : &mut adaptor,
1359 0 : end_lsn,
1360 0 : target_file_size,
1361 0 : fanout,
1362 0 : ctx,
1363 0 : )
1364 0 : .await
1365 : // TODO: compact_tiered needs to return CompactionError
1366 0 : .map_err(CompactionError::Other)?;
1367 :
1368 0 : adaptor.flush_updates().await?;
1369 0 : Ok(())
1370 0 : }
1371 :
1372 : /// Take a list of images and deltas, produce images and deltas according to GC horizon and retain_lsns.
1373 : ///
1374 : /// It takes a key, the values of the key within the compaction process, a GC horizon, and all retain_lsns below the horizon.
1375 : /// For now, it requires that `accumulated_values` contain the full history of the key (i.e., the record with the lowest LSN is
1376 : /// an image or a WAL record not requiring a base image). This restriction will be removed once we implement gc-compaction on branches.
1377 : ///
1378 : /// The function returns the deltas and the base image that need to be placed at each of the retain LSNs. For example, we have:
1379 : ///
1380 : /// A@0x10, +B@0x20, +C@0x30, +D@0x40, +E@0x50, +F@0x60
1381 : /// horizon = 0x50, retain_lsn = 0x20, 0x40, delta_threshold=3
1382 : ///
1383 : /// The function will produce:
1384 : ///
1385 : /// ```plain
1386 : /// 0x20(retain_lsn) -> img=AB@0x20 a single image is always produced below the lowest retain LSN
1387 : /// 0x40(retain_lsn) -> deltas=[+C@0x30, +D@0x40] two deltas since the last base image, so the deltas are kept
1388 : /// 0x50(horizon) -> deltas=[ABCDE@0x50] three deltas since the last base image, so an image is generated but put into the delta
1389 : /// above_horizon -> deltas=[+F@0x60] full history above the horizon
1390 : /// ```
1391 : ///
1392 : /// Note that `full_history` (the caller's `accumulated_values`) must be sorted by LSN and belong to a single key.
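/// As a rough, illustrative sketch of the result for the example above (not compiled; `AB`, `C`, `D`,
/// `ABCDE`, and `F` are placeholders for the reconstructed images / WAL records), the returned
/// structure would look like:
///
/// ```ignore
/// KeyHistoryRetention {
///     below_horizon: vec![
///         (Lsn(0x20), KeyLogAtLsn(vec![(Lsn(0x20), Value::Image(AB))])),
///         (Lsn(0x40), KeyLogAtLsn(vec![(Lsn(0x30), Value::WalRecord(C)), (Lsn(0x40), Value::WalRecord(D))])),
///         (Lsn(0x50), KeyLogAtLsn(vec![(Lsn(0x50), Value::Image(ABCDE))])),
///     ],
///     above_horizon: KeyLogAtLsn(vec![(Lsn(0x60), Value::WalRecord(F))]),
/// }
/// ```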
1393 1290 : pub(crate) async fn generate_key_retention(
1394 1290 : self: &Arc<Timeline>,
1395 1290 : key: Key,
1396 1290 : full_history: &[(Key, Lsn, Value)],
1397 1290 : horizon: Lsn,
1398 1290 : retain_lsn_below_horizon: &[Lsn],
1399 1290 : delta_threshold_cnt: usize,
1400 1290 : base_img_from_ancestor: Option<(Key, Lsn, Bytes)>,
1401 1290 : ) -> anyhow::Result<KeyHistoryRetention> {
1402 1290 : // Pre-checks for the invariants
1403 1290 : if cfg!(debug_assertions) {
1404 3120 : for (log_key, _, _) in full_history {
1405 1830 : assert_eq!(log_key, &key, "mismatched key");
1406 : }
1407 1290 : for i in 1..full_history.len() {
1408 540 : assert!(full_history[i - 1].1 <= full_history[i].1, "unordered LSN");
1409 540 : if full_history[i - 1].1 == full_history[i].1 {
1410 0 : assert!(
1411 0 : matches!(full_history[i - 1].2, Value::Image(_)),
1412 0 : "unordered delta/image, or duplicated delta"
1413 : );
1414 540 : }
1415 : }
1416 : // There used to be an assertion (for the no-base-image case) that checked whether the first
1417 : // record in the history is `will_init`, but it was removed.
1418 : // This is explained in the test cases for generate_key_retention.
1419 : // Search "incomplete history" for more information.
1420 3000 : for lsn in retain_lsn_below_horizon {
1421 1710 : assert!(lsn < &horizon, "retain lsn must be below horizon")
1422 : }
1423 1290 : for i in 1..retain_lsn_below_horizon.len() {
1424 834 : assert!(
1425 834 : retain_lsn_below_horizon[i - 1] <= retain_lsn_below_horizon[i],
1426 0 : "unordered LSN"
1427 : );
1428 : }
1429 0 : }
1430 1290 : let has_ancestor = base_img_from_ancestor.is_some();
1431 : // Step 1: split history into len(retain_lsn_below_horizon) + 2 buckets, where the last bucket is for all deltas above the horizon,
1432 : // and the second-to-last bucket is for the horizon. Each bucket contains lsn_last_bucket < deltas <= lsn_this_bucket.
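// For instance, with the example from the doc comment (retain_lsns = [0x20, 0x40], horizon = 0x50),
// lsn_split_points becomes [0x20, 0x40, 0x50] and the history splits into four buckets:
// (..=0x20] -> {A@0x10, B@0x20}, (0x20..=0x40] -> {C@0x30, D@0x40}, (0x40..=0x50] -> {E@0x50},
// and everything above the horizon -> {F@0x60}.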
1433 1290 : let (mut split_history, lsn_split_points) = {
1434 1290 : let mut split_history = Vec::new();
1435 1290 : split_history.resize_with(retain_lsn_below_horizon.len() + 2, Vec::new);
1436 1290 : let mut lsn_split_points = Vec::with_capacity(retain_lsn_below_horizon.len() + 1);
1437 3000 : for lsn in retain_lsn_below_horizon {
1438 1710 : lsn_split_points.push(*lsn);
1439 1710 : }
1440 1290 : lsn_split_points.push(horizon);
1441 1290 : let mut current_idx = 0;
1442 3120 : for item @ (_, lsn, _) in full_history {
1443 2316 : while current_idx < lsn_split_points.len() && *lsn > lsn_split_points[current_idx] {
1444 486 : current_idx += 1;
1445 486 : }
1446 1830 : split_history[current_idx].push(item);
1447 : }
1448 1290 : (split_history, lsn_split_points)
1449 : };
1450 : // Step 2: filter out duplicated records due to the k-merge of image/delta layers
1451 5580 : for split_for_lsn in &mut split_history {
1452 4290 : let mut prev_lsn = None;
1453 4290 : let mut new_split_for_lsn = Vec::with_capacity(split_for_lsn.len());
1454 4290 : for record @ (_, lsn, _) in std::mem::take(split_for_lsn) {
1455 1830 : if let Some(prev_lsn) = &prev_lsn {
1456 198 : if *prev_lsn == lsn {
1457 : // This is the case where we have an LSN with data from both the delta layer and the image layer. As
1458 : // `ValueWrapper` ensures that an image is ordered before a delta at the same LSN, we simply
1459 : // drop the delta and keep the image.
1460 : //
1461 : // For example, if we have delta-layer records key1@0x10 and key1@0x20 and an image-layer record
1462 : // key1@0x10, we keep the image for key1@0x10 and the delta for key1@0x20; the key1@0x10 delta
1463 : // is simply dropped.
1464 : //
1465 : // TODO: if we have both a delta and an image for a given LSN and the delta threshold is not
1466 : // exceeded, we could keep the delta instead to save space. This is a future optimization.
1467 0 : continue;
1468 198 : }
1469 1632 : }
1470 1830 : prev_lsn = Some(lsn);
1471 1830 : new_split_for_lsn.push(record);
1472 : }
1473 4290 : *split_for_lsn = new_split_for_lsn;
1474 : }
1475 : // Step 3: generate images when necessary
1476 1290 : let mut retention = Vec::with_capacity(split_history.len());
1477 1290 : let mut records_since_last_image = 0;
1478 1290 : let batch_cnt = split_history.len();
1479 1290 : assert!(
1480 1290 : batch_cnt >= 2,
1481 0 : "should have at least below + above horizon batches"
1482 : );
1483 1290 : let mut replay_history: Vec<(Key, Lsn, Value)> = Vec::new();
1484 1290 : if let Some((key, lsn, img)) = base_img_from_ancestor {
1485 54 : replay_history.push((key, lsn, Value::Image(img)));
1486 1236 : }
1487 :
1488 : /// Generate debug information for the replay history
1489 0 : fn generate_history_trace(replay_history: &[(Key, Lsn, Value)]) -> String {
1490 : use std::fmt::Write;
1491 0 : let mut output = String::new();
1492 0 : if let Some((key, _, _)) = replay_history.first() {
1493 0 : write!(output, "key={} ", key).unwrap();
1494 0 : let mut cnt = 0;
1495 0 : for (_, lsn, val) in replay_history {
1496 0 : if val.is_image() {
1497 0 : write!(output, "i@{} ", lsn).unwrap();
1498 0 : } else if val.will_init() {
1499 0 : write!(output, "di@{} ", lsn).unwrap();
1500 0 : } else {
1501 0 : write!(output, "d@{} ", lsn).unwrap();
1502 0 : }
1503 0 : cnt += 1;
1504 0 : if cnt >= 128 {
1505 0 : write!(output, "... and more").unwrap();
1506 0 : break;
1507 0 : }
1508 : }
1509 0 : } else {
1510 0 : write!(output, "<no history>").unwrap();
1511 0 : }
1512 0 : output
1513 0 : }
1514 :
1515 0 : fn generate_debug_trace(
1516 0 : replay_history: Option<&[(Key, Lsn, Value)]>,
1517 0 : full_history: &[(Key, Lsn, Value)],
1518 0 : lsns: &[Lsn],
1519 0 : horizon: Lsn,
1520 0 : ) -> String {
1521 : use std::fmt::Write;
1522 0 : let mut output = String::new();
1523 0 : if let Some(replay_history) = replay_history {
1524 0 : writeln!(
1525 0 : output,
1526 0 : "replay_history: {}",
1527 0 : generate_history_trace(replay_history)
1528 0 : )
1529 0 : .unwrap();
1530 0 : } else {
1531 0 : writeln!(output, "replay_history: <disabled>",).unwrap();
1532 0 : }
1533 0 : writeln!(
1534 0 : output,
1535 0 : "full_history: {}",
1536 0 : generate_history_trace(full_history)
1537 0 : )
1538 0 : .unwrap();
1539 0 : writeln!(
1540 0 : output,
1541 0 : "when processing: [{}] horizon={}",
1542 0 : lsns.iter().map(|l| format!("{l}")).join(","),
1543 0 : horizon
1544 0 : )
1545 0 : .unwrap();
1546 0 : output
1547 0 : }
1548 :
1549 4290 : for (i, split_for_lsn) in split_history.into_iter().enumerate() {
1550 : // TODO: there could be image keys inside the splits, and we can compute records_since_last_image accordingly.
1551 4290 : records_since_last_image += split_for_lsn.len();
1552 4290 : let generate_image = if i == 0 && !has_ancestor {
1553 : // We always generate images for the first batch (below horizon / lowest retain_lsn)
1554 1236 : true
1555 3054 : } else if i == batch_cnt - 1 {
1556 : // Do not generate images for the last batch (above horizon)
1557 1290 : false
1558 1764 : } else if records_since_last_image >= delta_threshold_cnt {
1559 : // Generate images when there are too many records
1560 18 : true
1561 : } else {
1562 1746 : false
1563 : };
1564 4290 : replay_history.extend(split_for_lsn.iter().map(|x| (*x).clone()));
1565 : // Only retain the items after the last image record
1566 5274 : for idx in (0..replay_history.len()).rev() {
1567 5274 : if replay_history[idx].2.will_init() {
1568 4290 : replay_history = replay_history[idx..].to_vec();
1569 4290 : break;
1570 984 : }
1571 : }
1572 4290 : if let Some((_, _, val)) = replay_history.first() {
1573 4290 : if !val.will_init() {
1574 0 : return Err(anyhow::anyhow!("invalid history, no base image")).with_context(
1575 0 : || {
1576 0 : generate_debug_trace(
1577 0 : Some(&replay_history),
1578 0 : full_history,
1579 0 : retain_lsn_below_horizon,
1580 0 : horizon,
1581 0 : )
1582 0 : },
1583 0 : );
1584 4290 : }
1585 0 : }
1586 4290 : if generate_image && records_since_last_image > 0 {
1587 1254 : records_since_last_image = 0;
1588 1254 : let replay_history_for_debug = if cfg!(debug_assertions) {
1589 1254 : Some(replay_history.clone())
1590 : } else {
1591 0 : None
1592 : };
1593 1254 : let replay_history_for_debug_ref = replay_history_for_debug.as_deref();
1594 1254 : let history = std::mem::take(&mut replay_history);
1595 1254 : let mut img = None;
1596 1254 : let mut records = Vec::with_capacity(history.len());
1597 1254 : if let (_, lsn, Value::Image(val)) = history.first().as_ref().unwrap() {
1598 1254 : img = Some((*lsn, val.clone()));
1599 1254 : for (_, lsn, val) in history.into_iter().skip(1) {
1600 102 : let Value::WalRecord(rec) = val else {
1601 0 : return Err(anyhow::anyhow!(
1602 0 : "invalid record, first record is image, expect walrecords"
1603 0 : ))
1604 0 : .with_context(|| {
1605 0 : generate_debug_trace(
1606 0 : replay_history_for_debug_ref,
1607 0 : full_history,
1608 0 : retain_lsn_below_horizon,
1609 0 : horizon,
1610 0 : )
1611 0 : });
1612 : };
1613 102 : records.push((lsn, rec));
1614 : }
1615 : } else {
1616 0 : for (_, lsn, val) in history.into_iter() {
1617 0 : let Value::WalRecord(rec) = val else {
1618 0 : return Err(anyhow::anyhow!("invalid record, first record is a WAL record, expected the rest to be WAL records"))
1619 0 : .with_context(|| generate_debug_trace(
1620 0 : replay_history_for_debug_ref,
1621 0 : full_history,
1622 0 : retain_lsn_below_horizon,
1623 0 : horizon,
1624 0 : ));
1625 : };
1626 0 : records.push((lsn, rec));
1627 : }
1628 : }
1629 1254 : records.reverse();
1630 1254 : let state = ValueReconstructState { img, records };
1631 1254 : let request_lsn = lsn_split_points[i]; // last batch does not generate image so i is always in range
1632 1254 : let img = self.reconstruct_value(key, request_lsn, state).await?;
1633 1254 : replay_history.push((key, request_lsn, Value::Image(img.clone())));
1634 1254 : retention.push(vec![(request_lsn, Value::Image(img))]);
1635 3036 : } else {
1636 3036 : let deltas = split_for_lsn
1637 3036 : .iter()
1638 3036 : .map(|(_, lsn, value)| (*lsn, value.clone()))
1639 3036 : .collect_vec();
1640 3036 : retention.push(deltas);
1641 3036 : }
1642 : }
1643 1290 : let mut result = Vec::with_capacity(retention.len());
1644 1290 : assert_eq!(retention.len(), lsn_split_points.len() + 1);
1645 4290 : for (idx, logs) in retention.into_iter().enumerate() {
1646 4290 : if idx == lsn_split_points.len() {
1647 1290 : return Ok(KeyHistoryRetention {
1648 1290 : below_horizon: result,
1649 1290 : above_horizon: KeyLogAtLsn(logs),
1650 1290 : });
1651 3000 : } else {
1652 3000 : result.push((lsn_split_points[idx], KeyLogAtLsn(logs)));
1653 3000 : }
1654 : }
1655 0 : unreachable!("key retention is empty")
1656 1290 : }
1657 :
1658 : /// An experimental compaction building block that combines compaction with garbage collection.
1659 : ///
1660 : /// The current implementation picks all delta + image layers that are below or intersecting with
1661 : /// the GC horizon without considering retain_lsns. Then, it does a full compaction over all these delta
1662 : /// layers and image layers, which generates image layers at the gc horizon, drops deltas below the gc horizon,
1663 : /// and creates delta layers with all deltas >= the gc horizon.
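/// A hedged sketch of how this could be invoked (the `timeline`, `cancel`, and `ctx` bindings are
/// placeholders for whatever the caller has in scope):
///
/// ```ignore
/// // Dry-run mode: select layers and compute/log statistics, but leave the layer map untouched.
/// let flags = EnumSet::only(CompactFlags::DryRun);
/// timeline.compact_with_gc(&cancel, flags, &ctx).await?;
///
/// // A real run uses an empty flag set.
/// timeline.compact_with_gc(&cancel, EnumSet::empty(), &ctx).await?;
/// ```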
1664 78 : pub(crate) async fn compact_with_gc(
1665 78 : self: &Arc<Self>,
1666 78 : cancel: &CancellationToken,
1667 78 : flags: EnumSet<CompactFlags>,
1668 78 : ctx: &RequestContext,
1669 78 : ) -> anyhow::Result<()> {
1670 : use std::collections::BTreeSet;
1671 :
1672 : // Block other compaction/GC tasks from running for now. GC-compaction could run along
1673 : // with legacy compaction tasks in the future. Always ensure the lock order is compaction -> gc.
1674 : // Note that we already acquired the compaction lock when the outer `compact` function gets called.
1675 :
1676 78 : let gc_lock = async {
1677 78 : tokio::select! {
1678 78 : guard = self.gc_lock.lock() => Ok(guard),
1679 : // TODO: refactor to CompactionError to correctly pass cancelled error
1680 78 : _ = cancel.cancelled() => Err(anyhow!("cancelled")),
1681 : }
1682 78 : };
1683 :
1684 78 : let gc_lock = crate::timed(
1685 78 : gc_lock,
1686 78 : "acquires gc lock",
1687 78 : std::time::Duration::from_secs(5),
1688 78 : )
1689 3 : .await?;
1690 :
1691 78 : let dry_run = flags.contains(CompactFlags::DryRun);
1692 78 :
1693 78 : info!("running enhanced gc bottom-most compaction, dry_run={dry_run}");
1694 :
1695 78 : scopeguard::defer! {
1696 78 : info!("done enhanced gc bottom-most compaction");
1697 78 : };
1698 78 :
1699 78 : let mut stat = CompactionStatistics::default();
1700 :
1701 : // Step 0: pick all delta layers + image layers below/intersect with the GC horizon.
1702 : // The layer selection has the following properties:
1703 : // 1. If a layer is in the selection, all layers below it are in the selection.
1704 : // 2. Inferred from (1), for each key in the layer selection, the value can be reconstructed only with the layers in the layer selection.
1705 78 : let (layer_selection, gc_cutoff, retain_lsns_below_horizon) = {
1706 78 : let guard = self.layers.read().await;
1707 78 : let layers = guard.layer_map()?;
1708 78 : let gc_info = self.gc_info.read().unwrap();
1709 78 : let mut retain_lsns_below_horizon = Vec::new();
1710 78 : let gc_cutoff = gc_info.cutoffs.select_min();
1711 102 : for (lsn, _timeline_id) in &gc_info.retain_lsns {
1712 102 : if lsn < &gc_cutoff {
1713 102 : retain_lsns_below_horizon.push(*lsn);
1714 102 : }
1715 : }
1716 78 : for lsn in gc_info.leases.keys() {
1717 0 : if lsn < &gc_cutoff {
1718 0 : retain_lsns_below_horizon.push(*lsn);
1719 0 : }
1720 : }
1721 78 : let mut selected_layers = Vec::new();
1722 78 : drop(gc_info);
1723 : // Pick all the layers that intersect with or are below the gc_cutoff, and take the largest LSN among the selected layers.
1724 78 : let Some(max_layer_lsn) = layers
1725 78 : .iter_historic_layers()
1726 300 : .filter(|desc| desc.get_lsn_range().start <= gc_cutoff)
1727 246 : .map(|desc| desc.get_lsn_range().end)
1728 78 : .max()
1729 : else {
1730 0 : info!("no layers to compact with gc");
1731 0 : return Ok(());
1732 : };
1733 : // Then, pick all the layers whose LSN range ends at or below max_layer_lsn. This ensures we can also pick
1734 : // the single-key layers that need compacting.
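// For example, with gc_cutoff = 0x40 and delta layers covering LSNs 0x10..0x30, 0x30..0x50 and 0x50..0x70,
// the first two layers start at or below the cutoff, so max_layer_lsn = 0x50 and exactly those two layers
// (plus any other layer ending at or below 0x50) end up in the selection.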
1735 300 : for desc in layers.iter_historic_layers() {
1736 300 : if desc.get_lsn_range().end <= max_layer_lsn {
1737 246 : selected_layers.push(guard.get_from_desc(&desc));
1738 246 : }
1739 : }
1740 78 : if selected_layers.is_empty() {
1741 0 : info!("no layers to compact with gc");
1742 0 : return Ok(());
1743 78 : }
1744 78 : retain_lsns_below_horizon.sort();
1745 78 : (selected_layers, gc_cutoff, retain_lsns_below_horizon)
1746 : };
1747 78 : let lowest_retain_lsn = if self.ancestor_timeline.is_some() {
1748 6 : Lsn(self.ancestor_lsn.0 + 1)
1749 : } else {
1750 72 : let res = retain_lsns_below_horizon
1751 72 : .first()
1752 72 : .copied()
1753 72 : .unwrap_or(gc_cutoff);
1754 72 : if cfg!(debug_assertions) {
1755 72 : assert_eq!(
1756 72 : res,
1757 72 : retain_lsns_below_horizon
1758 72 : .iter()
1759 72 : .min()
1760 72 : .copied()
1761 72 : .unwrap_or(gc_cutoff)
1762 72 : );
1763 0 : }
1764 72 : res
1765 : };
1766 78 : info!(
1767 0 : "picked {} layers for compaction with gc_cutoff={} lowest_retain_lsn={}",
1768 0 : layer_selection.len(),
1769 : gc_cutoff,
1770 : lowest_retain_lsn
1771 : );
1772 : // Step 1: verify that the layer map can be split by drawing a horizontal line at every LSN start/end split point
1773 : // of the selected (multi-key) delta layers. The keys themselves are read later through a k-merge iterator over all selected layers.
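// For example, if one selected multi-key delta layer covers LSNs 0x10..0x40 and another covers 0x20..0x30,
// the split points 0x20 and 0x30 fall strictly inside the first layer's range, so no horizontal line can
// separate the two layers and the check below bails out.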
1774 78 : let mut lsn_split_point = BTreeSet::new(); // TODO: use a better data structure (range tree / range set?)
1775 324 : for layer in &layer_selection {
1776 246 : let desc = layer.layer_desc();
1777 246 : if desc.is_delta() {
1778 : // ignore single-key layer files
1779 138 : if desc.key_range.start.next() != desc.key_range.end {
1780 102 : let lsn_range = &desc.lsn_range;
1781 102 : lsn_split_point.insert(lsn_range.start);
1782 102 : lsn_split_point.insert(lsn_range.end);
1783 102 : }
1784 138 : stat.visit_delta_layer(desc.file_size());
1785 108 : } else {
1786 108 : stat.visit_image_layer(desc.file_size());
1787 108 : }
1788 : }
1789 324 : for layer in &layer_selection {
1790 246 : let desc = layer.layer_desc();
1791 246 : let key_range = &desc.key_range;
1792 246 : if desc.is_delta() && key_range.start.next() != key_range.end {
1793 102 : let lsn_range = desc.lsn_range.clone();
1794 102 : let intersects = lsn_split_point.range(lsn_range).collect_vec();
1795 102 : if intersects.len() > 1 {
1796 0 : bail!(
1797 0 : "cannot run gc-compaction because it violates the layer map LSN split assumption: layer {} intersects with LSN [{}]",
1798 0 : desc.key(),
1799 0 : intersects.into_iter().map(|lsn| lsn.to_string()).join(", ")
1800 0 : );
1801 102 : }
1802 144 : }
1803 : }
1804 : // The maximum LSN we are processing in this compaction loop
1805 78 : let end_lsn = layer_selection
1806 78 : .iter()
1807 246 : .map(|l| l.layer_desc().lsn_range.end)
1808 78 : .max()
1809 78 : .unwrap();
1810 78 : // We don't want any of the produced layers to cover the full key range (i.e., MIN..MAX) because it will then be
1811 78 : // recognized as an L0 layer.
1812 78 : let mut delta_layers = Vec::new();
1813 78 : let mut image_layers = Vec::new();
1814 78 : let mut downloaded_layers = Vec::new();
1815 324 : for layer in &layer_selection {
1816 246 : let resident_layer = layer.download_and_keep_resident().await?;
1817 246 : downloaded_layers.push(resident_layer);
1818 : }
1819 324 : for resident_layer in &downloaded_layers {
1820 246 : if resident_layer.layer_desc().is_delta() {
1821 138 : let layer = resident_layer.get_as_delta(ctx).await?;
1822 138 : delta_layers.push(layer);
1823 : } else {
1824 108 : let layer = resident_layer.get_as_image(ctx).await?;
1825 108 : image_layers.push(layer);
1826 : }
1827 : }
1828 78 : let mut merge_iter = MergeIterator::create(&delta_layers, &image_layers, ctx);
1829 78 : // Step 2: Produce images+deltas. TODO: ensure newly-produced delta does not overlap with other deltas.
1830 78 : // Data of the same key.
1831 78 : let mut accumulated_values = Vec::new();
1832 78 : let mut last_key: Option<Key> = None;
1833 :
1834 : // Only create image layers when there are no ancestor branches. TODO: create a covering image layer
1835 : // when some condition is met.
1836 78 : let mut image_layer_writer = if self.ancestor_timeline.is_none() {
1837 : Some(
1838 72 : SplitImageLayerWriter::new(
1839 72 : self.conf,
1840 72 : self.timeline_id,
1841 72 : self.tenant_shard_id,
1842 72 : Key::MIN,
1843 72 : lowest_retain_lsn,
1844 72 : self.get_compaction_target_size(),
1845 72 : ctx,
1846 72 : )
1847 36 : .await?,
1848 : )
1849 : } else {
1850 6 : None
1851 : };
1852 :
1853 78 : let mut delta_layer_writer = SplitDeltaLayerWriter::new(
1854 78 : self.conf,
1855 78 : self.timeline_id,
1856 78 : self.tenant_shard_id,
1857 78 : lowest_retain_lsn..end_lsn,
1858 78 : self.get_compaction_target_size(),
1859 78 : )
1860 0 : .await?;
1861 :
1862 : /// Returns `None` if there is no ancestor branch. Returns an error if the key is not found.
1863 : ///
1864 : /// Currently, we always fetch the ancestor image for each key in the child branch, regardless of whether
1865 : /// the image is actually needed for reconstruction. This should be fixed in the future.
1866 : ///
1867 : /// Furthermore, we should do a vectored get instead of a per-key get, or better, use a k-merge for ancestor
1868 : /// images.
1869 1266 : async fn get_ancestor_image(
1870 1266 : tline: &Arc<Timeline>,
1871 1266 : key: Key,
1872 1266 : ctx: &RequestContext,
1873 1266 : ) -> anyhow::Result<Option<(Key, Lsn, Bytes)>> {
1874 1266 : if tline.ancestor_timeline.is_none() {
1875 1224 : return Ok(None);
1876 42 : };
1877 : // This function is implemented as a get of the current timeline at ancestor LSN, therefore reusing
1878 : // as much existing code as possible.
1879 42 : let img = tline.get(key, tline.ancestor_lsn, ctx).await?;
1880 42 : Ok(Some((key, tline.ancestor_lsn, img)))
1881 1266 : }
1882 :
1883 : // Strictly speaking, we could decide not to write to the image layer at all at this point because
1884 : // the key and LSN range are already determined. However, to keep things simple, we still
1885 : // create this writer and discard it at the end.
1886 :
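// The loop below groups the merged key-value stream by key: values for the current key are collected in
// `accumulated_values`, and whenever the key changes, `generate_key_retention` is run for the previous key
// and the result is piped into the image/delta writers. The final key is handled after the loop.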
1887 1758 : while let Some((key, lsn, val)) = merge_iter.next().await? {
1888 1680 : if cancel.is_cancelled() {
1889 0 : return Err(anyhow!("cancelled")); // TODO: refactor to CompactionError and pass cancel error
1890 1680 : }
1891 1680 : match val {
1892 1260 : Value::Image(_) => stat.visit_image_key(&val),
1893 420 : Value::WalRecord(_) => stat.visit_wal_key(&val),
1894 : }
1895 1680 : if last_key.is_none() || last_key.as_ref() == Some(&key) {
1896 492 : if last_key.is_none() {
1897 78 : last_key = Some(key);
1898 414 : }
1899 492 : accumulated_values.push((key, lsn, val));
1900 : } else {
1901 1188 : let last_key = last_key.as_mut().unwrap();
1902 1188 : stat.on_unique_key_visited();
1903 1188 : let retention = self
1904 1188 : .generate_key_retention(
1905 1188 : *last_key,
1906 1188 : &accumulated_values,
1907 1188 : gc_cutoff,
1908 1188 : &retain_lsns_below_horizon,
1909 1188 : COMPACTION_DELTA_THRESHOLD,
1910 1188 : get_ancestor_image(self, *last_key, ctx).await?,
1911 : )
1912 0 : .await?;
1913 : // Put the image into the image layer. Currently we have a single big layer for the compaction.
1914 1188 : retention
1915 1188 : .pipe_to(
1916 1188 : *last_key,
1917 1188 : self,
1918 1188 : &mut delta_layer_writer,
1919 1188 : image_layer_writer.as_mut(),
1920 1188 : &mut stat,
1921 1188 : dry_run,
1922 1188 : ctx,
1923 1188 : )
1924 1203 : .await?;
1925 1188 : accumulated_values.clear();
1926 1188 : *last_key = key;
1927 1188 : accumulated_values.push((key, lsn, val));
1928 : }
1929 : }
1930 :
1931 78 : let last_key = last_key.expect("no keys produced during compaction");
1932 78 : // TODO: move this part to the loop body
1933 78 : stat.on_unique_key_visited();
1934 78 : let retention = self
1935 78 : .generate_key_retention(
1936 78 : last_key,
1937 78 : &accumulated_values,
1938 78 : gc_cutoff,
1939 78 : &retain_lsns_below_horizon,
1940 78 : COMPACTION_DELTA_THRESHOLD,
1941 78 : get_ancestor_image(self, last_key, ctx).await?,
1942 : )
1943 0 : .await?;
1944 : // Put the image into the image layer. Currently we have a single big layer for the compaction.
1945 78 : retention
1946 78 : .pipe_to(
1947 78 : last_key,
1948 78 : self,
1949 78 : &mut delta_layer_writer,
1950 78 : image_layer_writer.as_mut(),
1951 78 : &mut stat,
1952 78 : dry_run,
1953 78 : ctx,
1954 78 : )
1955 72 : .await?;
1956 :
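// The `discard` callback below is passed to the split writers' `finish_with_discard_fn`: when a
// to-be-produced layer would collide with an existing layer of the same `PersistentLayerKey`
// (see `KeyHistoryRetention::discard_key`), the writer reports it as `SplitWriterResult::Discarded`
// instead of producing it, and the existing layer is kept in the layer map.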
1957 114 : let discard = |key: &PersistentLayerKey| {
1958 114 : let key = key.clone();
1959 114 : async move { KeyHistoryRetention::discard_key(&key, self, dry_run).await }
1960 114 : };
1961 :
1962 78 : let produced_image_layers = if let Some(writer) = image_layer_writer {
1963 72 : if !dry_run {
1964 60 : writer
1965 60 : .finish_with_discard_fn(self, ctx, Key::MAX, discard)
1966 72 : .await?
1967 : } else {
1968 12 : let (layers, _) = writer.take()?;
1969 12 : assert!(layers.is_empty(), "image layers produced in dry run mode?");
1970 12 : Vec::new()
1971 : }
1972 : } else {
1973 6 : Vec::new()
1974 : };
1975 :
1976 78 : let produced_delta_layers = if !dry_run {
1977 66 : delta_layer_writer
1978 66 : .finish_with_discard_fn(self, ctx, discard)
1979 78 : .await?
1980 : } else {
1981 12 : let (layers, _) = delta_layer_writer.take()?;
1982 12 : assert!(layers.is_empty(), "delta layers produced in dry run mode?");
1983 12 : Vec::new()
1984 : };
1985 :
1986 78 : let mut compact_to = Vec::new();
1987 78 : let mut keep_layers = HashSet::new();
1988 78 : let produced_delta_layers_len = produced_delta_layers.len();
1989 78 : let produced_image_layers_len = produced_image_layers.len();
1990 132 : for action in produced_delta_layers {
1991 54 : match action {
1992 30 : SplitWriterResult::Produced(layer) => {
1993 30 : stat.produce_delta_layer(layer.layer_desc().file_size());
1994 30 : compact_to.push(layer);
1995 30 : }
1996 24 : SplitWriterResult::Discarded(l) => {
1997 24 : keep_layers.insert(l);
1998 24 : stat.discard_delta_layer();
1999 24 : }
2000 : }
2001 : }
2002 138 : for action in produced_image_layers {
2003 60 : match action {
2004 36 : SplitWriterResult::Produced(layer) => {
2005 36 : stat.produce_image_layer(layer.layer_desc().file_size());
2006 36 : compact_to.push(layer);
2007 36 : }
2008 24 : SplitWriterResult::Discarded(l) => {
2009 24 : keep_layers.insert(l);
2010 24 : stat.discard_image_layer();
2011 24 : }
2012 : }
2013 : }
2014 78 : let mut layer_selection = layer_selection;
2015 246 : layer_selection.retain(|x| !keep_layers.contains(&x.layer_desc().key()));
2016 78 :
2017 78 : info!(
2018 0 : "gc-compaction statistics: {}",
2019 0 : serde_json::to_string(&stat)?
2020 : );
2021 :
2022 78 : if dry_run {
2023 12 : return Ok(());
2024 66 : }
2025 66 :
2026 66 : info!(
2027 0 : "produced {} delta layers and {} image layers, {} layers are kept",
2028 0 : produced_delta_layers_len,
2029 0 : produced_image_layers_len,
2030 0 : layer_selection.len()
2031 : );
2032 :
2033 : // Step 3: Put the produced layers back into the layer map.
2034 : {
2035 66 : let mut guard = self.layers.write().await;
2036 66 : guard
2037 66 : .open_mut()?
2038 66 : .finish_gc_compaction(&layer_selection, &compact_to, &self.metrics)
2039 66 : };
2040 66 : self.remote_client
2041 66 : .schedule_compaction_update(&layer_selection, &compact_to)?;
2042 :
2043 66 : drop(gc_lock);
2044 66 :
2045 66 : Ok(())
2046 78 : }
2047 : }
2048 :
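/// Adapter that lets the `pageserver_compaction` crate drive compaction on a [`Timeline`].
///
/// Newly produced delta/image layers and layers scheduled for deletion are buffered in the fields
/// below and applied to the timeline when `flush_updates` is called.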
2049 : struct TimelineAdaptor {
2050 : timeline: Arc<Timeline>,
2051 :
2052 : keyspace: (Lsn, KeySpace),
2053 :
2054 : new_deltas: Vec<ResidentLayer>,
2055 : new_images: Vec<ResidentLayer>,
2056 : layers_to_delete: Vec<Arc<PersistentLayerDesc>>,
2057 : }
2058 :
2059 : impl TimelineAdaptor {
2060 0 : pub fn new(timeline: &Arc<Timeline>, keyspace: (Lsn, KeySpace)) -> Self {
2061 0 : Self {
2062 0 : timeline: timeline.clone(),
2063 0 : keyspace,
2064 0 : new_images: Vec::new(),
2065 0 : new_deltas: Vec::new(),
2066 0 : layers_to_delete: Vec::new(),
2067 0 : }
2068 0 : }
2069 :
2070 0 : pub async fn flush_updates(&mut self) -> Result<(), CompactionError> {
2071 0 : let layers_to_delete = {
2072 0 : let guard = self.timeline.layers.read().await;
2073 0 : self.layers_to_delete
2074 0 : .iter()
2075 0 : .map(|x| guard.get_from_desc(x))
2076 0 : .collect::<Vec<Layer>>()
2077 0 : };
2078 0 : self.timeline
2079 0 : .finish_compact_batch(&self.new_deltas, &self.new_images, &layers_to_delete)
2080 0 : .await?;
2081 :
2082 0 : self.timeline
2083 0 : .upload_new_image_layers(std::mem::take(&mut self.new_images))?;
2084 :
2085 0 : self.new_deltas.clear();
2086 0 : self.layers_to_delete.clear();
2087 0 : Ok(())
2088 0 : }
2089 : }
2090 :
2091 : #[derive(Clone)]
2092 : struct ResidentDeltaLayer(ResidentLayer);
2093 : #[derive(Clone)]
2094 : struct ResidentImageLayer(ResidentLayer);
2095 :
2096 : impl CompactionJobExecutor for TimelineAdaptor {
2097 : type Key = crate::repository::Key;
2098 :
2099 : type Layer = OwnArc<PersistentLayerDesc>;
2100 : type DeltaLayer = ResidentDeltaLayer;
2101 : type ImageLayer = ResidentImageLayer;
2102 :
2103 : type RequestContext = crate::context::RequestContext;
2104 :
2105 0 : fn get_shard_identity(&self) -> &ShardIdentity {
2106 0 : self.timeline.get_shard_identity()
2107 0 : }
2108 :
2109 0 : async fn get_layers(
2110 0 : &mut self,
2111 0 : key_range: &Range<Key>,
2112 0 : lsn_range: &Range<Lsn>,
2113 0 : _ctx: &RequestContext,
2114 0 : ) -> anyhow::Result<Vec<OwnArc<PersistentLayerDesc>>> {
2115 0 : self.flush_updates().await?;
2116 :
2117 0 : let guard = self.timeline.layers.read().await;
2118 0 : let layer_map = guard.layer_map()?;
2119 :
2120 0 : let result = layer_map
2121 0 : .iter_historic_layers()
2122 0 : .filter(|l| {
2123 0 : overlaps_with(&l.lsn_range, lsn_range) && overlaps_with(&l.key_range, key_range)
2124 0 : })
2125 0 : .map(OwnArc)
2126 0 : .collect();
2127 0 : Ok(result)
2128 0 : }
2129 :
2130 0 : async fn get_keyspace(
2131 0 : &mut self,
2132 0 : key_range: &Range<Key>,
2133 0 : lsn: Lsn,
2134 0 : _ctx: &RequestContext,
2135 0 : ) -> anyhow::Result<Vec<Range<Key>>> {
2136 0 : if lsn == self.keyspace.0 {
2137 0 : Ok(pageserver_compaction::helpers::intersect_keyspace(
2138 0 : &self.keyspace.1.ranges,
2139 0 : key_range,
2140 0 : ))
2141 : } else {
2142 : // The current compaction implementation only ever requests the key space
2143 : // at the compaction end LSN.
2144 0 : anyhow::bail!("keyspace not available for requested lsn");
2145 : }
2146 0 : }
2147 :
2148 0 : async fn downcast_delta_layer(
2149 0 : &self,
2150 0 : layer: &OwnArc<PersistentLayerDesc>,
2151 0 : ) -> anyhow::Result<Option<ResidentDeltaLayer>> {
2152 0 : // this is a lot more complex than a simple downcast...
2153 0 : if layer.is_delta() {
2154 0 : let l = {
2155 0 : let guard = self.timeline.layers.read().await;
2156 0 : guard.get_from_desc(layer)
2157 : };
2158 0 : let result = l.download_and_keep_resident().await?;
2159 :
2160 0 : Ok(Some(ResidentDeltaLayer(result)))
2161 : } else {
2162 0 : Ok(None)
2163 : }
2164 0 : }
2165 :
2166 0 : async fn create_image(
2167 0 : &mut self,
2168 0 : lsn: Lsn,
2169 0 : key_range: &Range<Key>,
2170 0 : ctx: &RequestContext,
2171 0 : ) -> anyhow::Result<()> {
2172 0 : Ok(self.create_image_impl(lsn, key_range, ctx).await?)
2173 0 : }
2174 :
2175 0 : async fn create_delta(
2176 0 : &mut self,
2177 0 : lsn_range: &Range<Lsn>,
2178 0 : key_range: &Range<Key>,
2179 0 : input_layers: &[ResidentDeltaLayer],
2180 0 : ctx: &RequestContext,
2181 0 : ) -> anyhow::Result<()> {
2182 0 : debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end);
2183 :
2184 0 : let mut all_entries = Vec::new();
2185 0 : for dl in input_layers.iter() {
2186 0 : all_entries.extend(dl.load_keys(ctx).await?);
2187 : }
2188 :
2189 : // The current stdlib sorting implementation is designed so that it is
2190 : // particularly fast when the slice is made up of sorted sub-ranges.
2191 0 : all_entries.sort_by_key(|DeltaEntry { key, lsn, .. }| (*key, *lsn));
2192 :
2193 0 : let mut writer = DeltaLayerWriter::new(
2194 0 : self.timeline.conf,
2195 0 : self.timeline.timeline_id,
2196 0 : self.timeline.tenant_shard_id,
2197 0 : key_range.start,
2198 0 : lsn_range.clone(),
2199 0 : ctx,
2200 0 : )
2201 0 : .await?;
2202 :
2203 0 : let mut dup_values = 0;
2204 0 :
2205 0 : // This iterator walks through all key-value pairs from all the layers
2206 0 : // we're compacting, in key, LSN order.
2207 0 : let mut prev: Option<(Key, Lsn)> = None;
2208 : for &DeltaEntry {
2209 0 : key, lsn, ref val, ..
2210 0 : } in all_entries.iter()
2211 : {
2212 0 : if prev == Some((key, lsn)) {
2213 : // This is a duplicate. Skip it.
2214 : //
2215 : // It can happen if compaction is interrupted after writing some
2216 : // layers but not all, and we are compacting the range again.
2217 : // The calculations in the algorithm assume that there are no
2218 : // duplicates, so the math on targeted file size is likely off,
2219 : // and we will create smaller files than expected.
2220 0 : dup_values += 1;
2221 0 : continue;
2222 0 : }
2223 :
2224 0 : let value = val.load(ctx).await?;
2225 :
2226 0 : writer.put_value(key, lsn, value, ctx).await?;
2227 :
2228 0 : prev = Some((key, lsn));
2229 : }
2230 :
2231 0 : if dup_values > 0 {
2232 0 : warn!("delta layer created with {} duplicate values", dup_values);
2233 0 : }
2234 :
2235 0 : fail_point!("delta-layer-writer-fail-before-finish", |_| {
2236 0 : Err(anyhow::anyhow!(
2237 0 : "failpoint delta-layer-writer-fail-before-finish"
2238 0 : ))
2239 0 : });
2240 :
2241 0 : let (desc, path) = writer.finish(prev.unwrap().0.next(), ctx).await?;
2242 0 : let new_delta_layer =
2243 0 : Layer::finish_creating(self.timeline.conf, &self.timeline, desc, &path)?;
2244 :
2245 0 : self.new_deltas.push(new_delta_layer);
2246 0 : Ok(())
2247 0 : }
2248 :
2249 0 : async fn delete_layer(
2250 0 : &mut self,
2251 0 : layer: &OwnArc<PersistentLayerDesc>,
2252 0 : _ctx: &RequestContext,
2253 0 : ) -> anyhow::Result<()> {
2254 0 : self.layers_to_delete.push(layer.clone().0);
2255 0 : Ok(())
2256 0 : }
2257 : }
2258 :
2259 : impl TimelineAdaptor {
2260 0 : async fn create_image_impl(
2261 0 : &mut self,
2262 0 : lsn: Lsn,
2263 0 : key_range: &Range<Key>,
2264 0 : ctx: &RequestContext,
2265 0 : ) -> Result<(), CreateImageLayersError> {
2266 0 : let timer = self.timeline.metrics.create_images_time_histo.start_timer();
2267 :
2268 0 : let image_layer_writer = ImageLayerWriter::new(
2269 0 : self.timeline.conf,
2270 0 : self.timeline.timeline_id,
2271 0 : self.timeline.tenant_shard_id,
2272 0 : key_range,
2273 0 : lsn,
2274 0 : ctx,
2275 0 : )
2276 0 : .await?;
2277 :
2278 0 : fail_point!("image-layer-writer-fail-before-finish", |_| {
2279 0 : Err(CreateImageLayersError::Other(anyhow::anyhow!(
2280 0 : "failpoint image-layer-writer-fail-before-finish"
2281 0 : )))
2282 0 : });
2283 :
2284 0 : let keyspace = KeySpace {
2285 0 : ranges: self.get_keyspace(key_range, lsn, ctx).await?,
2286 : };
2287 : // TODO set proper (stateful) start. The create_image_layer_for_rel_blocks function mostly
2288 0 : let start = Key::MIN;
2289 : let ImageLayerCreationOutcome {
2290 0 : image,
2291 : next_start_key: _,
2292 0 : } = self
2293 0 : .timeline
2294 0 : .create_image_layer_for_rel_blocks(
2295 0 : &keyspace,
2296 0 : image_layer_writer,
2297 0 : lsn,
2298 0 : ctx,
2299 0 : key_range.clone(),
2300 0 : start,
2301 0 : )
2302 0 : .await?;
2303 :
2304 0 : if let Some(image_layer) = image {
2305 0 : self.new_images.push(image_layer);
2306 0 : }
2307 :
2308 0 : timer.stop_and_record();
2309 0 :
2310 0 : Ok(())
2311 0 : }
2312 : }
2313 :
2314 : impl CompactionRequestContext for crate::context::RequestContext {}
2315 :
2316 : #[derive(Debug, Clone)]
2317 : pub struct OwnArc<T>(pub Arc<T>);
2318 :
2319 : impl<T> Deref for OwnArc<T> {
2320 : type Target = <Arc<T> as Deref>::Target;
2321 0 : fn deref(&self) -> &Self::Target {
2322 0 : &self.0
2323 0 : }
2324 : }
2325 :
2326 : impl<T> AsRef<T> for OwnArc<T> {
2327 0 : fn as_ref(&self) -> &T {
2328 0 : self.0.as_ref()
2329 0 : }
2330 : }
2331 :
2332 : impl CompactionLayer<Key> for OwnArc<PersistentLayerDesc> {
2333 0 : fn key_range(&self) -> &Range<Key> {
2334 0 : &self.key_range
2335 0 : }
2336 0 : fn lsn_range(&self) -> &Range<Lsn> {
2337 0 : &self.lsn_range
2338 0 : }
2339 0 : fn file_size(&self) -> u64 {
2340 0 : self.file_size
2341 0 : }
2342 0 : fn short_id(&self) -> std::string::String {
2343 0 : self.as_ref().short_id().to_string()
2344 0 : }
2345 0 : fn is_delta(&self) -> bool {
2346 0 : self.as_ref().is_delta()
2347 0 : }
2348 : }
2349 :
2350 : impl CompactionLayer<Key> for OwnArc<DeltaLayer> {
2351 0 : fn key_range(&self) -> &Range<Key> {
2352 0 : &self.layer_desc().key_range
2353 0 : }
2354 0 : fn lsn_range(&self) -> &Range<Lsn> {
2355 0 : &self.layer_desc().lsn_range
2356 0 : }
2357 0 : fn file_size(&self) -> u64 {
2358 0 : self.layer_desc().file_size
2359 0 : }
2360 0 : fn short_id(&self) -> std::string::String {
2361 0 : self.layer_desc().short_id().to_string()
2362 0 : }
2363 0 : fn is_delta(&self) -> bool {
2364 0 : true
2365 0 : }
2366 : }
2367 :
2368 : use crate::tenant::timeline::DeltaEntry;
2369 :
2370 : impl CompactionLayer<Key> for ResidentDeltaLayer {
2371 0 : fn key_range(&self) -> &Range<Key> {
2372 0 : &self.0.layer_desc().key_range
2373 0 : }
2374 0 : fn lsn_range(&self) -> &Range<Lsn> {
2375 0 : &self.0.layer_desc().lsn_range
2376 0 : }
2377 0 : fn file_size(&self) -> u64 {
2378 0 : self.0.layer_desc().file_size
2379 0 : }
2380 0 : fn short_id(&self) -> std::string::String {
2381 0 : self.0.layer_desc().short_id().to_string()
2382 0 : }
2383 0 : fn is_delta(&self) -> bool {
2384 0 : true
2385 0 : }
2386 : }
2387 :
2388 : impl CompactionDeltaLayer<TimelineAdaptor> for ResidentDeltaLayer {
2389 : type DeltaEntry<'a> = DeltaEntry<'a>;
2390 :
2391 0 : async fn load_keys<'a>(&self, ctx: &RequestContext) -> anyhow::Result<Vec<DeltaEntry<'_>>> {
2392 0 : self.0.load_keys(ctx).await
2393 0 : }
2394 : }
2395 :
2396 : impl CompactionLayer<Key> for ResidentImageLayer {
2397 0 : fn key_range(&self) -> &Range<Key> {
2398 0 : &self.0.layer_desc().key_range
2399 0 : }
2400 0 : fn lsn_range(&self) -> &Range<Lsn> {
2401 0 : &self.0.layer_desc().lsn_range
2402 0 : }
2403 0 : fn file_size(&self) -> u64 {
2404 0 : self.0.layer_desc().file_size
2405 0 : }
2406 0 : fn short_id(&self) -> std::string::String {
2407 0 : self.0.layer_desc().short_id().to_string()
2408 0 : }
2409 0 : fn is_delta(&self) -> bool {
2410 0 : false
2411 0 : }
2412 : }
2413 : impl CompactionImageLayer<TimelineAdaptor> for ResidentImageLayer {}