Line data Source code
1 : //! New compaction implementation. The algorithm itself is implemented in the
2 : //! compaction crate. This file implements the callbacks and structs that allow
3 : //! the algorithm to drive the process.
4 : //!
5 : //! The old legacy algorithm is implemented directly in `timeline.rs`.
6 :
7 : use std::collections::{BinaryHeap, HashSet};
8 : use std::ops::{Deref, Range};
9 : use std::sync::Arc;
10 :
11 : use super::layer_manager::LayerManager;
12 : use super::{
13 : CompactFlags, CreateImageLayersError, DurationRecorder, ImageLayerCreationMode,
14 : RecordedDuration, Timeline,
15 : };
16 :
17 : use anyhow::{anyhow, bail, Context};
18 : use bytes::Bytes;
19 : use enumset::EnumSet;
20 : use fail::fail_point;
21 : use itertools::Itertools;
22 : use pageserver_api::key::KEY_SIZE;
23 : use pageserver_api::keyspace::ShardedRange;
24 : use pageserver_api::shard::{ShardCount, ShardIdentity, TenantShardId};
25 : use serde::Serialize;
26 : use tokio_util::sync::CancellationToken;
27 : use tracing::{debug, info, info_span, trace, warn, Instrument};
28 : use utils::id::TimelineId;
29 :
30 : use crate::context::{AccessStatsBehavior, RequestContext, RequestContextBuilder};
31 : use crate::page_cache;
32 : use crate::statvfs::Statvfs;
33 : use crate::tenant::checks::check_valid_layermap;
34 : use crate::tenant::remote_timeline_client::WaitCompletionError;
35 : use crate::tenant::storage_layer::filter_iterator::FilterIterator;
36 : use crate::tenant::storage_layer::merge_iterator::MergeIterator;
37 : use crate::tenant::storage_layer::split_writer::{
38 : SplitDeltaLayerWriter, SplitImageLayerWriter, SplitWriterResult,
39 : };
40 : use crate::tenant::storage_layer::{
41 : AsLayerDesc, PersistentLayerDesc, PersistentLayerKey, ValueReconstructState,
42 : };
43 : use crate::tenant::timeline::ImageLayerCreationOutcome;
44 : use crate::tenant::timeline::{drop_rlock, DeltaLayerWriter, ImageLayerWriter};
45 : use crate::tenant::timeline::{Layer, ResidentLayer};
46 : use crate::tenant::{DeltaLayer, MaybeOffloaded};
47 : use crate::virtual_file::{MaybeFatalIo, VirtualFile};
48 : use pageserver_api::config::tenant_conf_defaults::{
49 : DEFAULT_CHECKPOINT_DISTANCE, DEFAULT_COMPACTION_THRESHOLD,
50 : };
51 :
52 : use crate::keyspace::KeySpace;
53 : use crate::repository::{Key, Value};
54 : use crate::walrecord::NeonWalRecord;
55 :
56 : use utils::lsn::Lsn;
57 :
58 : use pageserver_compaction::helpers::overlaps_with;
59 : use pageserver_compaction::interface::*;
60 :
61 : use super::CompactionError;
62 :
63 : /// Maximum number of deltas before generating an image layer in bottom-most compaction.
64 : const COMPACTION_DELTA_THRESHOLD: usize = 5;
65 :
66 : /// The result of bottom-most compaction for a single key at each LSN.
67 : #[derive(Debug)]
68 : #[cfg_attr(test, derive(PartialEq))]
69 : pub struct KeyLogAtLsn(pub Vec<(Lsn, Value)>);
70 :
71 : /// The result of bottom-most compaction.
72 : #[derive(Debug)]
73 : #[cfg_attr(test, derive(PartialEq))]
74 : pub(crate) struct KeyHistoryRetention {
75 : /// Stores logs to reconstruct the value at the given LSN, that is to say, logs <= LSN or image == LSN.
76 : pub(crate) below_horizon: Vec<(Lsn, KeyLogAtLsn)>,
77 : /// Stores logs to reconstruct the value at any LSN above the horizon, that is to say, log > LSN.
78 : pub(crate) above_horizon: KeyLogAtLsn,
79 : }
80 :
81 : impl KeyHistoryRetention {
82 : /// Hack: skip the delta layer if we need to produce a layer with the same key-lsn range.
83 : ///
84 : /// This can happen if we have removed some deltas in "the middle" of some existing layer's key-lsn-range.
85 : /// For example, consider the case where a single delta with range [0x10,0x50) exists.
86 : /// And we have branches at LSN 0x10, 0x20, 0x30.
87 : /// Then we delete branch @ 0x20.
88 : /// Bottom-most compaction may now delete the delta [0x20,0x30).
89 : /// And that wouldn't change the shape of the layer.
90 : ///
91 : /// Note that bottom-most-gc-compaction never _adds_ new data in that case, only removes.
92 : ///
93 : /// `discard_key` will only be called when the writer reaches its target (instead of for every key), so it's fine to grab a lock inside.
94 38 : async fn discard_key(key: &PersistentLayerKey, tline: &Arc<Timeline>, dry_run: bool) -> bool {
95 38 : if dry_run {
96 0 : return true;
97 38 : }
98 38 : let guard = tline.layers.read().await;
99 38 : if !guard.contains_key(key) {
100 22 : return false;
101 16 : }
102 16 : let layer_generation = guard.get_from_key(key).metadata().generation;
103 16 : drop(guard);
104 16 : if layer_generation == tline.generation {
105 16 : info!(
106 : key=%key,
107 : ?layer_generation,
108 0 : "discard layer due to duplicated layer key in the same generation",
109 : );
110 16 : true
111 : } else {
112 0 : false
113 : }
114 38 : }
115 :
116 : /// Pipe a history of a single key to the writers.
117 : ///
118 : /// If `image_writer` is none, the images will be placed into the delta layers.
119 : /// The delta writer will contain all images and deltas (below and above the horizon) except the bottom-most images.
120 : #[allow(clippy::too_many_arguments)]
121 422 : async fn pipe_to(
122 422 : self,
123 422 : key: Key,
124 422 : delta_writer: &mut SplitDeltaLayerWriter,
125 422 : mut image_writer: Option<&mut SplitImageLayerWriter>,
126 422 : stat: &mut CompactionStatistics,
127 422 : ctx: &RequestContext,
128 422 : ) -> anyhow::Result<()> {
129 422 : let mut first_batch = true;
130 1402 : for (cutoff_lsn, KeyLogAtLsn(logs)) in self.below_horizon {
131 980 : if first_batch {
132 422 : if logs.len() == 1 && logs[0].1.is_image() {
133 408 : let Value::Image(img) = &logs[0].1 else {
134 0 : unreachable!()
135 : };
136 408 : stat.produce_image_key(img);
137 408 : if let Some(image_writer) = image_writer.as_mut() {
138 410 : image_writer.put_image(key, img.clone(), ctx).await?;
139 : } else {
140 0 : delta_writer
141 0 : .put_value(key, cutoff_lsn, Value::Image(img.clone()), ctx)
142 0 : .await?;
143 : }
144 : } else {
145 28 : for (lsn, val) in logs {
146 14 : stat.produce_key(&val);
147 14 : delta_writer.put_value(key, lsn, val, ctx).await?;
148 : }
149 : }
150 422 : first_batch = false;
151 : } else {
152 640 : for (lsn, val) in logs {
153 82 : stat.produce_key(&val);
154 82 : delta_writer.put_value(key, lsn, val, ctx).await?;
155 : }
156 : }
157 : }
158 422 : let KeyLogAtLsn(above_horizon_logs) = self.above_horizon;
159 454 : for (lsn, val) in above_horizon_logs {
160 32 : stat.produce_key(&val);
161 32 : delta_writer.put_value(key, lsn, val, ctx).await?;
162 : }
163 422 : Ok(())
164 422 : }
165 : }
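// A minimal illustrative sketch (hypothetical test, not part of the coverage
// data above): the `KeyHistoryRetention` shape described by the worked example
// in the `generate_key_retention` doc comment further down, for a history
// A@0x10..+F@0x60 with retain_lsns 0x20/0x40 and horizon 0x50. The byte
// payloads are placeholders.
#[cfg(test)]
mod key_history_retention_shape_example {
    use super::*;

    fn delta(rec: &'static [u8]) -> Value {
        Value::WalRecord(NeonWalRecord::Postgres {
            will_init: false,
            rec: Bytes::from_static(rec),
        })
    }

    #[test]
    fn example_shape() {
        let retention = KeyHistoryRetention {
            below_horizon: vec![
                // A single image reconstructs the value at the lowest retain LSN.
                (
                    Lsn(0x20),
                    KeyLogAtLsn(vec![(Lsn(0x20), Value::Image(Bytes::from_static(b"AB")))]),
                ),
                // Few deltas since the last image: keep them as deltas.
                (
                    Lsn(0x40),
                    KeyLogAtLsn(vec![(Lsn(0x30), delta(b"+C")), (Lsn(0x40), delta(b"+D"))]),
                ),
                // At the horizon, enough deltas have accumulated to materialize an image.
                (
                    Lsn(0x50),
                    KeyLogAtLsn(vec![(Lsn(0x50), Value::Image(Bytes::from_static(b"ABCDE")))]),
                ),
            ],
            // The full history above the horizon is retained as deltas.
            above_horizon: KeyLogAtLsn(vec![(Lsn(0x60), delta(b"+F"))]),
        };
        assert_eq!(retention.below_horizon.len(), 3);
        assert_eq!(retention.above_horizon.0.len(), 1);
    }
}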
166 :
167 : #[derive(Debug, Serialize, Default)]
168 : struct CompactionStatisticsNumSize {
169 : num: u64,
170 : size: u64,
171 : }
172 :
173 : #[derive(Debug, Serialize, Default)]
174 : pub struct CompactionStatistics {
175 : delta_layer_visited: CompactionStatisticsNumSize,
176 : image_layer_visited: CompactionStatisticsNumSize,
177 : delta_layer_produced: CompactionStatisticsNumSize,
178 : image_layer_produced: CompactionStatisticsNumSize,
179 : num_delta_layer_discarded: usize,
180 : num_image_layer_discarded: usize,
181 : num_unique_keys_visited: usize,
182 : wal_keys_visited: CompactionStatisticsNumSize,
183 : image_keys_visited: CompactionStatisticsNumSize,
184 : wal_produced: CompactionStatisticsNumSize,
185 : image_produced: CompactionStatisticsNumSize,
186 : }
187 :
188 : impl CompactionStatistics {
189 686 : fn estimated_size_of_value(val: &Value) -> usize {
190 266 : match val {
191 420 : Value::Image(img) => img.len(),
192 0 : Value::WalRecord(NeonWalRecord::Postgres { rec, .. }) => rec.len(),
193 266 : _ => std::mem::size_of::<NeonWalRecord>(),
194 : }
195 686 : }
196 1096 : fn estimated_size_of_key() -> usize {
197 1096 : KEY_SIZE // TODO: distinguish image layer and delta layer (count LSN in delta layer)
198 1096 : }
199 46 : fn visit_delta_layer(&mut self, size: u64) {
200 46 : self.delta_layer_visited.num += 1;
201 46 : self.delta_layer_visited.size += size;
202 46 : }
203 36 : fn visit_image_layer(&mut self, size: u64) {
204 36 : self.image_layer_visited.num += 1;
205 36 : self.image_layer_visited.size += size;
206 36 : }
207 422 : fn on_unique_key_visited(&mut self) {
208 422 : self.num_unique_keys_visited += 1;
209 422 : }
210 140 : fn visit_wal_key(&mut self, val: &Value) {
211 140 : self.wal_keys_visited.num += 1;
212 140 : self.wal_keys_visited.size +=
213 140 : Self::estimated_size_of_value(val) as u64 + Self::estimated_size_of_key() as u64;
214 140 : }
215 420 : fn visit_image_key(&mut self, val: &Value) {
216 420 : self.image_keys_visited.num += 1;
217 420 : self.image_keys_visited.size +=
218 420 : Self::estimated_size_of_value(val) as u64 + Self::estimated_size_of_key() as u64;
219 420 : }
220 128 : fn produce_key(&mut self, val: &Value) {
221 128 : match val {
222 2 : Value::Image(img) => self.produce_image_key(img),
223 126 : Value::WalRecord(_) => self.produce_wal_key(val),
224 : }
225 128 : }
226 126 : fn produce_wal_key(&mut self, val: &Value) {
227 126 : self.wal_produced.num += 1;
228 126 : self.wal_produced.size +=
229 126 : Self::estimated_size_of_value(val) as u64 + Self::estimated_size_of_key() as u64;
230 126 : }
231 410 : fn produce_image_key(&mut self, val: &Bytes) {
232 410 : self.image_produced.num += 1;
233 410 : self.image_produced.size += val.len() as u64 + Self::estimated_size_of_key() as u64;
234 410 : }
235 8 : fn discard_delta_layer(&mut self) {
236 8 : self.num_delta_layer_discarded += 1;
237 8 : }
238 8 : fn discard_image_layer(&mut self) {
239 8 : self.num_image_layer_discarded += 1;
240 8 : }
241 10 : fn produce_delta_layer(&mut self, size: u64) {
242 10 : self.delta_layer_produced.num += 1;
243 10 : self.delta_layer_produced.size += size;
244 10 : }
245 12 : fn produce_image_layer(&mut self, size: u64) {
246 12 : self.image_layer_produced.num += 1;
247 12 : self.image_layer_produced.size += size;
248 12 : }
249 : }
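// Illustrative sketch (hypothetical test, not exercised by the coverage run
// above): how the per-key size estimate feeds the statistics. Image values are
// counted by their payload length, and every visited key additionally accounts
// for `KEY_SIZE` bytes of key overhead.
#[cfg(test)]
mod compaction_statistics_example {
    use super::*;

    #[test]
    fn image_key_size_estimate() {
        let mut stat = CompactionStatistics::default();
        // Visit a single image key with an 8-byte payload.
        stat.visit_image_key(&Value::Image(Bytes::from_static(b"12345678")));
        assert_eq!(stat.image_keys_visited.num, 1);
        // Payload length plus the fixed per-key overhead.
        assert_eq!(stat.image_keys_visited.size, 8 + KEY_SIZE as u64);
    }
}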
250 :
251 : impl Timeline {
252 : /// TODO: cancellation
253 : ///
254 : /// Returns whether the compaction has pending tasks.
255 364 : pub(crate) async fn compact_legacy(
256 364 : self: &Arc<Self>,
257 364 : cancel: &CancellationToken,
258 364 : flags: EnumSet<CompactFlags>,
259 364 : ctx: &RequestContext,
260 364 : ) -> Result<bool, CompactionError> {
261 364 : if flags.contains(CompactFlags::EnhancedGcBottomMostCompaction) {
262 0 : self.compact_with_gc(cancel, flags, ctx)
263 0 : .await
264 0 : .map_err(CompactionError::Other)?;
265 0 : return Ok(false);
266 364 : }
267 364 :
268 364 : if flags.contains(CompactFlags::DryRun) {
269 0 : return Err(CompactionError::Other(anyhow!(
270 0 : "dry-run mode is not supported for legacy compaction for now"
271 0 : )));
272 364 : }
273 364 :
274 364 : // High level strategy for compaction / image creation:
275 364 : //
276 364 : // 1. First, calculate the desired "partitioning" of the
277 364 : // currently in-use key space. The goal is to partition the
278 364 : // key space into roughly fixed-size chunks, but also take into
279 364 : // account any existing image layers, and try to align the
280 364 : // chunk boundaries with the existing image layers to avoid
281 364 : // too much churn. Also try to align chunk boundaries with
282 364 : // relation boundaries. In principle, we don't know about
283 364 : // relation boundaries here, we just deal with key-value
284 364 : // pairs, and the code in pgdatadir_mapping.rs knows how to
285 364 : // map relations into key-value pairs. But in practice we know
286 364 : // that 'field6' is the block number, and the fields 1-5
287 364 : // identify a relation. This is just an optimization,
288 364 : // though.
289 364 : //
290 364 : // 2. Once we know the partitioning, for each partition,
291 364 : // decide if it's time to create a new image layer. The
292 364 : // criterion is: has there been too much "churn" since the last
293 364 : // image layer? "Churn" is a fuzzy concept; it's a
294 364 : // combination of too many delta files, or too much WAL in
295 364 : // total in the delta files. Or perhaps: if creating an image
296 364 : // file would allow us to delete some older files.
297 364 : //
298 364 : // 3. After that, we compact all level0 delta files if there
299 364 : // are too many of them. While compacting, we also garbage
300 364 : // collect any page versions that are no longer needed because
301 364 : // of the new image layers we created in step 2.
302 364 : //
303 364 : // TODO: This high level strategy hasn't been implemented yet.
304 364 : // Below are functions compact_level0() and create_image_layers()
305 364 : // but they are a bit ad hoc and don't quite work like it's explained
306 364 : // above. Rewrite it.
307 364 :
308 364 : // Is the timeline being deleted?
309 364 : if self.is_stopping() {
310 0 : trace!("Dropping out of compaction on timeline shutdown");
311 0 : return Err(CompactionError::ShuttingDown);
312 364 : }
313 364 :
314 364 : let target_file_size = self.get_checkpoint_distance();
315 :
316 : // Define partitioning schema if needed
317 :
318 : // FIXME: the match should only cover repartitioning, not the next steps
319 364 : let (partition_count, has_pending_tasks) = match self
320 364 : .repartition(
321 364 : self.get_last_record_lsn(),
322 364 : self.get_compaction_target_size(),
323 364 : flags,
324 364 : ctx,
325 364 : )
326 15750 : .await
327 : {
328 364 : Ok(((dense_partitioning, sparse_partitioning), lsn)) => {
329 364 : // Disables access_stats updates, so that the files we read remain candidates for eviction after we're done with them
330 364 : let image_ctx = RequestContextBuilder::extend(ctx)
331 364 : .access_stats_behavior(AccessStatsBehavior::Skip)
332 364 : .build();
333 364 :
334 364 : // 2. Compact
335 364 : let timer = self.metrics.compact_time_histo.start_timer();
336 364 : let fully_compacted = self
337 364 : .compact_level0(
338 364 : target_file_size,
339 364 : flags.contains(CompactFlags::ForceL0Compaction),
340 364 : ctx,
341 364 : )
342 9900 : .await?;
343 364 : timer.stop_and_record();
344 364 :
345 364 : let mut partitioning = dense_partitioning;
346 364 : partitioning
347 364 : .parts
348 364 : .extend(sparse_partitioning.into_dense().parts);
349 364 :
350 364 : // 3. Create new image layers for partitions that have been modified
351 364 : // "enough". Skip image layer creation if L0 compaction cannot keep up.
352 364 : if fully_compacted {
353 364 : let image_layers = self
354 364 : .create_image_layers(
355 364 : &partitioning,
356 364 : lsn,
357 364 : if flags.contains(CompactFlags::ForceImageLayerCreation) {
358 14 : ImageLayerCreationMode::Force
359 : } else {
360 350 : ImageLayerCreationMode::Try
361 : },
362 364 : &image_ctx,
363 : )
364 11975 : .await?;
365 :
366 364 : self.upload_new_image_layers(image_layers)?;
367 : } else {
368 0 : info!("skipping image layer generation because L0 compaction did not include all layers.");
369 : }
370 364 : (partitioning.parts.len(), !fully_compacted)
371 : }
372 0 : Err(err) => {
373 0 : // no partitioning? This is normal, if the timeline was just created
374 0 : // as an empty timeline. Also in unit tests, when we use the timeline
375 0 : // as a simple key-value store, ignoring the datadir layout. Log the
376 0 : // error but continue.
377 0 : //
378 0 : // Suppress error when it's due to cancellation
379 0 : if !self.cancel.is_cancelled() && !err.is_cancelled() {
380 0 : tracing::error!("could not compact, repartitioning keyspace failed: {err:?}");
381 0 : }
382 0 : (1, false)
383 : }
384 : };
385 :
386 364 : if self.shard_identity.count >= ShardCount::new(2) {
387 : // Limit the number of layer rewrites to the number of partitions: this means its
388 : // runtime should be comparable to a full round of image layer creations, rather than
389 : // being potentially much longer.
390 0 : let rewrite_max = partition_count;
391 0 :
392 0 : self.compact_shard_ancestors(rewrite_max, ctx).await?;
393 364 : }
394 :
395 364 : Ok(has_pending_tasks)
396 364 : }
397 :
398 : /// Check for layers that are eligible to be rewritten:
399 : /// - Shard splitting: after a shard split, rewrite ancestor layers beyond pitr_interval, so that
400 : /// we don't indefinitely retain keys in this shard that aren't needed.
401 : /// - For future use: layers beyond pitr_interval that are in formats we would
402 : /// rather not maintain compatibility with indefinitely.
403 : ///
404 : /// Note: this phase may read and write many gigabytes of data: use rewrite_max to bound
405 : /// how much work it will try to do in each compaction pass.
406 0 : async fn compact_shard_ancestors(
407 0 : self: &Arc<Self>,
408 0 : rewrite_max: usize,
409 0 : ctx: &RequestContext,
410 0 : ) -> Result<(), CompactionError> {
411 0 : let mut drop_layers = Vec::new();
412 0 : let mut layers_to_rewrite: Vec<Layer> = Vec::new();
413 0 :
414 0 : // We will use the Lsn cutoff of the last GC as a threshold for rewriting layers: if a
415 0 : // layer is behind this Lsn, it indicates that the layer is being retained beyond the
416 0 : // pitr_interval, for example because a branchpoint references it.
417 0 : //
418 0 : // Holding this read guard also blocks [`Self::gc_timeline`] from entering while we
419 0 : // are rewriting layers.
420 0 : let latest_gc_cutoff = self.get_latest_gc_cutoff_lsn();
421 0 :
422 0 : tracing::info!(
423 0 : "latest_gc_cutoff: {}, pitr cutoff {}",
424 0 : *latest_gc_cutoff,
425 0 : self.gc_info.read().unwrap().cutoffs.time
426 : );
427 :
428 0 : let layers = self.layers.read().await;
429 0 : for layer_desc in layers.layer_map()?.iter_historic_layers() {
430 0 : let layer = layers.get_from_desc(&layer_desc);
431 0 : if layer.metadata().shard.shard_count == self.shard_identity.count {
432 : // This layer does not belong to a historic ancestor, no need to re-image it.
433 0 : continue;
434 0 : }
435 0 :
436 0 : // This layer was created on an ancestor shard: check if it contains any data for this shard.
437 0 : let sharded_range = ShardedRange::new(layer_desc.get_key_range(), &self.shard_identity);
438 0 : let layer_local_page_count = sharded_range.page_count();
439 0 : let layer_raw_page_count = ShardedRange::raw_size(&layer_desc.get_key_range());
440 0 : if layer_local_page_count == 0 {
441 : // This ancestral layer only covers keys that belong to other shards.
442 : // We include the full metadata in the log: if we had some critical bug that caused
443 : // us to incorrectly drop layers, this would simplify manually debugging and reinstating those layers.
444 0 : info!(%layer, old_metadata=?layer.metadata(),
445 0 : "dropping layer after shard split, contains no keys for this shard.",
446 : );
447 :
448 0 : if cfg!(debug_assertions) {
449 : // Expensive, exhaustive check of keys in this layer: this guards against ShardedRange's calculations being
450 : // wrong. If ShardedRange claims the local page count is zero, then no keys in this layer
451 : // should be !is_key_disposable()
452 0 : let range = layer_desc.get_key_range();
453 0 : let mut key = range.start;
454 0 : while key < range.end {
455 0 : debug_assert!(self.shard_identity.is_key_disposable(&key));
456 0 : key = key.next();
457 : }
458 0 : }
459 :
460 0 : drop_layers.push(layer);
461 0 : continue;
462 0 : } else if layer_local_page_count != u32::MAX
463 0 : && layer_local_page_count == layer_raw_page_count
464 : {
465 0 : debug!(%layer,
466 0 : "layer is entirely shard local ({} keys), no need to filter it",
467 : layer_local_page_count
468 : );
469 0 : continue;
470 0 : }
471 0 :
472 0 : // Don't bother re-writing a layer unless it will at least halve its size
473 0 : if layer_local_page_count != u32::MAX
474 0 : && layer_local_page_count > layer_raw_page_count / 2
475 : {
476 0 : debug!(%layer,
477 0 : "layer is already mostly local ({}/{}), not rewriting",
478 : layer_local_page_count,
479 : layer_raw_page_count
480 : );
481 0 : }
482 :
483 : // Don't bother re-writing a layer if it is within the PITR window: it will age-out eventually
484 : // without incurring the I/O cost of a rewrite.
485 0 : if layer_desc.get_lsn_range().end >= *latest_gc_cutoff {
486 0 : debug!(%layer, "Skipping rewrite of layer still in GC window ({} >= {})",
487 0 : layer_desc.get_lsn_range().end, *latest_gc_cutoff);
488 0 : continue;
489 0 : }
490 0 :
491 0 : if layer_desc.is_delta() {
492 : // We do not yet implement rewrite of delta layers
493 0 : debug!(%layer, "Skipping rewrite of delta layer");
494 0 : continue;
495 0 : }
496 0 :
497 0 : // Only rewrite layers if their generations differ. This guarantees:
498 0 : // - that local rewrite is safe, as local layer paths will differ between existing layer and rewritten one
499 0 : // - that the layer is persistent in remote storage, as we only see old-generation'd layer via loading from remote storage
500 0 : if layer.metadata().generation == self.generation {
501 0 : debug!(%layer, "Skipping rewrite, is not from old generation");
502 0 : continue;
503 0 : }
504 0 :
505 0 : if layers_to_rewrite.len() >= rewrite_max {
506 0 : tracing::info!(%layer, "Will rewrite layer on a future compaction, already rewrote {}",
507 0 : layers_to_rewrite.len()
508 : );
509 0 : continue;
510 0 : }
511 0 :
512 0 : // Fall through: all our conditions for doing a rewrite passed.
513 0 : layers_to_rewrite.push(layer);
514 : }
515 :
516 : // Drop read lock on layer map before we start doing time-consuming I/O
517 0 : drop(layers);
518 0 :
519 0 : let mut replace_image_layers = Vec::new();
520 :
521 0 : for layer in layers_to_rewrite {
522 0 : tracing::info!(layer=%layer, "Rewriting layer after shard split...");
523 0 : let mut image_layer_writer = ImageLayerWriter::new(
524 0 : self.conf,
525 0 : self.timeline_id,
526 0 : self.tenant_shard_id,
527 0 : &layer.layer_desc().key_range,
528 0 : layer.layer_desc().image_layer_lsn(),
529 0 : ctx,
530 0 : )
531 0 : .await
532 0 : .map_err(CompactionError::Other)?;
533 :
534 : // Safety of layer rewrites:
535 : // - We are writing to a different local file path than we are reading from, so the old Layer
536 : // cannot interfere with the new one.
537 : // - In the page cache, contents for a particular VirtualFile are stored with a file_id that
538 : // is different for two layers with the same name (in `ImageLayerInner::new` we always
539 0 : // acquire a fresh id from [`crate::page_cache::next_file_id`]). So readers do not risk
540 : // reading the index from one layer file, and then data blocks from the rewritten layer file.
541 : // - Any readers that have a reference to the old layer will keep it alive until they are done
542 : // with it. If they are trying to promote from remote storage, that will fail, but this is the same
543 : // as for compaction generally: compaction is allowed to delete layers that readers might be trying to use.
544 : // - We do not run concurrently with other kinds of compaction, so the only layer map writes we race with are:
545 : // - GC, which at worst witnesses us "undelete" a layer that they just deleted.
546 : // - ingestion, which only inserts layers, therefore cannot collide with us.
547 0 : let resident = layer.download_and_keep_resident().await?;
548 :
549 0 : let keys_written = resident
550 0 : .filter(&self.shard_identity, &mut image_layer_writer, ctx)
551 0 : .await?;
552 :
553 0 : if keys_written > 0 {
554 0 : let (desc, path) = image_layer_writer
555 0 : .finish(ctx)
556 0 : .await
557 0 : .map_err(CompactionError::Other)?;
558 0 : let new_layer = Layer::finish_creating(self.conf, self, desc, &path)
559 0 : .map_err(CompactionError::Other)?;
560 0 : tracing::info!(layer=%new_layer, "Rewrote layer, {} -> {} bytes",
561 0 : layer.metadata().file_size,
562 0 : new_layer.metadata().file_size);
563 :
564 0 : replace_image_layers.push((layer, new_layer));
565 0 : } else {
566 0 : // Drop the old layer. Usually for this case we would already have noticed that
567 0 : // the layer has no data for us with the ShardedRange check above, but the rewrite can still produce zero keys, so handle it here too.
568 0 : drop_layers.push(layer);
569 0 : }
570 : }
571 :
572 : // At this point, we have replaced local layer files with their rewritten form, but not yet uploaded
573 : // metadata to reflect that. If we restart here, the replaced layer files will look invalid (size mismatch
574 : // to remote index) and be removed. This is inefficient but safe.
575 0 : fail::fail_point!("compact-shard-ancestors-localonly");
576 0 :
577 0 : // Update the LayerMap so that readers will use the new layers, and enqueue it for writing to remote storage
578 0 : self.rewrite_layers(replace_image_layers, drop_layers)
579 0 : .await?;
580 :
581 0 : fail::fail_point!("compact-shard-ancestors-enqueued");
582 0 :
583 0 : // We wait for all uploads to complete before finishing this compaction stage. This is not
584 0 : // necessary for correctness, but it simplifies testing, and avoids proceeding with another
585 0 : // Timeline's compaction while this timeline's uploads may be generating lots of disk I/O
586 0 : // load.
587 0 : match self.remote_client.wait_completion().await {
588 0 : Ok(()) => (),
589 0 : Err(WaitCompletionError::NotInitialized(ni)) => return Err(CompactionError::from(ni)),
590 : Err(WaitCompletionError::UploadQueueShutDownOrStopped) => {
591 0 : return Err(CompactionError::ShuttingDown)
592 : }
593 : }
594 :
595 0 : fail::fail_point!("compact-shard-ancestors-persistent");
596 0 :
597 0 : Ok(())
598 0 : }
599 :
600 : /// Update the LayerVisibilityHint of layers covered by image layers, based on whether there is
601 : /// an image layer between them and the most recent readable LSN (branch point or tip of timeline). The
602 : /// purpose of the visibility hint is to record which layers need to be available to service reads.
603 : ///
604 : /// The result may be used as an input to eviction and secondary downloads to de-prioritize layers
605 : /// that we know won't be needed for reads.
606 192 : pub(super) async fn update_layer_visibility(
607 192 : &self,
608 192 : ) -> Result<(), super::layer_manager::Shutdown> {
609 192 : let head_lsn = self.get_last_record_lsn();
610 :
611 : // We will sweep through layers in reverse-LSN order. We only do historic layers. L0 deltas
612 : // are implicitly left visible, because LayerVisibilityHint's default is Visible, and we never modify it here.
613 : // Note that L0 deltas _can_ be covered by image layers, but we consider them 'visible' because we anticipate that
614 : // they will be subject to L0->L1 compaction in the near future.
615 192 : let layer_manager = self.layers.read().await;
616 192 : let layer_map = layer_manager.layer_map()?;
617 :
618 192 : let readable_points = {
619 192 : let children = self.gc_info.read().unwrap().retain_lsns.clone();
620 192 :
621 192 : let mut readable_points = Vec::with_capacity(children.len() + 1);
622 192 : for (child_lsn, _child_timeline_id, is_offloaded) in &children {
623 0 : if *is_offloaded == MaybeOffloaded::Yes {
624 0 : continue;
625 0 : }
626 0 : readable_points.push(*child_lsn);
627 : }
628 192 : readable_points.push(head_lsn);
629 192 : readable_points
630 192 : };
631 192 :
632 192 : let (layer_visibility, covered) = layer_map.get_visibility(readable_points);
633 504 : for (layer_desc, visibility) in layer_visibility {
634 312 : // FIXME: a more efficiency bulk zip() through the layers rather than NlogN getting each one
635 312 : let layer = layer_manager.get_from_desc(&layer_desc);
636 312 : layer.set_visibility(visibility);
637 312 : }
638 :
639 : // TODO: publish our covered KeySpace to our parent, so that when they update their visibility, they can
640 : // avoid assuming that everything at a branch point is visible.
641 192 : drop(covered);
642 192 : Ok(())
643 192 : }
644 :
645 : /// Collect a bunch of Level 0 layer files, and compact and reshuffle them
646 : /// as Level 1 files. Returns whether the L0 layers are fully compacted.
647 364 : async fn compact_level0(
648 364 : self: &Arc<Self>,
649 364 : target_file_size: u64,
650 364 : force_compaction_ignore_threshold: bool,
651 364 : ctx: &RequestContext,
652 364 : ) -> Result<bool, CompactionError> {
653 : let CompactLevel0Phase1Result {
654 364 : new_layers,
655 364 : deltas_to_compact,
656 364 : fully_compacted,
657 : } = {
658 364 : let phase1_span = info_span!("compact_level0_phase1");
659 364 : let ctx = ctx.attached_child();
660 364 : let mut stats = CompactLevel0Phase1StatsBuilder {
661 364 : version: Some(2),
662 364 : tenant_id: Some(self.tenant_shard_id),
663 364 : timeline_id: Some(self.timeline_id),
664 364 : ..Default::default()
665 364 : };
666 364 :
667 364 : let begin = tokio::time::Instant::now();
668 364 : let phase1_layers_locked = self.layers.read().await;
669 364 : let now = tokio::time::Instant::now();
670 364 : stats.read_lock_acquisition_micros =
671 364 : DurationRecorder::Recorded(RecordedDuration(now - begin), now);
672 364 : self.compact_level0_phase1(
673 364 : phase1_layers_locked,
674 364 : stats,
675 364 : target_file_size,
676 364 : force_compaction_ignore_threshold,
677 364 : &ctx,
678 364 : )
679 364 : .instrument(phase1_span)
680 9898 : .await?
681 : };
682 :
683 364 : if new_layers.is_empty() && deltas_to_compact.is_empty() {
684 : // nothing to do
685 336 : return Ok(true);
686 28 : }
687 28 :
688 28 : self.finish_compact_batch(&new_layers, &Vec::new(), &deltas_to_compact)
689 1 : .await?;
690 28 : Ok(fully_compacted)
691 364 : }
692 :
693 : /// Level0 files first phase of compaction, explained in the [`Self::compact_legacy`] comment.
694 364 : async fn compact_level0_phase1<'a>(
695 364 : self: &'a Arc<Self>,
696 364 : guard: tokio::sync::RwLockReadGuard<'a, LayerManager>,
697 364 : mut stats: CompactLevel0Phase1StatsBuilder,
698 364 : target_file_size: u64,
699 364 : force_compaction_ignore_threshold: bool,
700 364 : ctx: &RequestContext,
701 364 : ) -> Result<CompactLevel0Phase1Result, CompactionError> {
702 364 : stats.read_lock_held_spawn_blocking_startup_micros =
703 364 : stats.read_lock_acquisition_micros.till_now(); // set by caller
704 364 : let layers = guard.layer_map()?;
705 364 : let level0_deltas = layers.level0_deltas();
706 364 : stats.level0_deltas_count = Some(level0_deltas.len());
707 364 :
708 364 : // Only compact if enough layers have accumulated.
709 364 : let threshold = self.get_compaction_threshold();
710 364 : if level0_deltas.is_empty() || level0_deltas.len() < threshold {
711 336 : if force_compaction_ignore_threshold {
712 0 : if !level0_deltas.is_empty() {
713 0 : info!(
714 0 : level0_deltas = level0_deltas.len(),
715 0 : threshold, "too few deltas to compact, but forcing compaction"
716 : );
717 : } else {
718 0 : info!(
719 0 : level0_deltas = level0_deltas.len(),
720 0 : threshold, "too few deltas to compact, cannot force compaction"
721 : );
722 0 : return Ok(CompactLevel0Phase1Result::default());
723 : }
724 : } else {
725 336 : debug!(
726 0 : level0_deltas = level0_deltas.len(),
727 0 : threshold, "too few deltas to compact"
728 : );
729 336 : return Ok(CompactLevel0Phase1Result::default());
730 : }
731 28 : }
732 :
733 28 : let mut level0_deltas = level0_deltas
734 28 : .iter()
735 402 : .map(|x| guard.get_from_desc(x))
736 28 : .collect::<Vec<_>>();
737 28 :
738 28 : // Gather the files to compact in this iteration.
739 28 : //
740 28 : // Start with the oldest Level 0 delta file, and collect any other
741 28 : // level 0 files that form a contiguous sequence, such that the end
742 28 : // LSN of previous file matches the start LSN of the next file.
743 28 : //
744 28 : // Note that if the files don't form such a sequence, we might
745 28 : // "compact" just a single file. That's a bit pointless, but it allows
746 28 : // us to get rid of the level 0 file, and compact the other files on
747 28 : // the next iteration. This could probably be made smarter, but such
748 28 : // "gaps" in the sequence of level 0 files should only happen in case
749 28 : // of a crash, partial download from cloud storage, or something like
750 28 : // that, so it's not a big deal in practice.
751 748 : level0_deltas.sort_by_key(|l| l.layer_desc().lsn_range.start);
752 28 : let mut level0_deltas_iter = level0_deltas.iter();
753 28 :
754 28 : let first_level0_delta = level0_deltas_iter.next().unwrap();
755 28 : let mut prev_lsn_end = first_level0_delta.layer_desc().lsn_range.end;
756 28 : let mut deltas_to_compact = Vec::with_capacity(level0_deltas.len());
757 28 :
758 28 : // Accumulate the size of layers in `deltas_to_compact`
759 28 : let mut deltas_to_compact_bytes = 0;
760 28 :
761 28 : // Under normal circumstances, we will accumulate up to compaction_interval L0s of size
762 28 : // checkpoint_distance each. To avoid edge cases using extra system resources, bound our
763 28 : // work in this function to only operate on this much delta data at once.
764 28 : //
765 28 : // Take the max of the configured value & the default, so that tests that configure tiny values
766 28 : // can still use a sensible amount of memory, but if a deployed system configures bigger values we
767 28 : // still let them compact a full stack of L0s in one go.
768 28 : let delta_size_limit = std::cmp::max(
769 28 : self.get_compaction_threshold(),
770 28 : DEFAULT_COMPACTION_THRESHOLD,
771 28 : ) as u64
772 28 : * std::cmp::max(self.get_checkpoint_distance(), DEFAULT_CHECKPOINT_DISTANCE);
773 28 :
774 28 : let mut fully_compacted = true;
775 28 :
776 28 : deltas_to_compact.push(first_level0_delta.download_and_keep_resident().await?);
777 402 : for l in level0_deltas_iter {
778 374 : let lsn_range = &l.layer_desc().lsn_range;
779 374 :
780 374 : if lsn_range.start != prev_lsn_end {
781 0 : break;
782 374 : }
783 374 : deltas_to_compact.push(l.download_and_keep_resident().await?);
784 374 : deltas_to_compact_bytes += l.metadata().file_size;
785 374 : prev_lsn_end = lsn_range.end;
786 374 :
787 374 : if deltas_to_compact_bytes >= delta_size_limit {
788 0 : info!(
789 0 : l0_deltas_selected = deltas_to_compact.len(),
790 0 : l0_deltas_total = level0_deltas.len(),
791 0 : "L0 compaction picker hit max delta layer size limit: {}",
792 : delta_size_limit
793 : );
794 0 : fully_compacted = false;
795 0 :
796 0 : // Proceed with compaction, but only a subset of L0s
797 0 : break;
798 374 : }
799 : }
800 28 : let lsn_range = Range {
801 28 : start: deltas_to_compact
802 28 : .first()
803 28 : .unwrap()
804 28 : .layer_desc()
805 28 : .lsn_range
806 28 : .start,
807 28 : end: deltas_to_compact.last().unwrap().layer_desc().lsn_range.end,
808 28 : };
809 28 :
810 28 : info!(
811 0 : "Starting Level0 compaction in LSN range {}-{} for {} layers ({} deltas in total)",
812 0 : lsn_range.start,
813 0 : lsn_range.end,
814 0 : deltas_to_compact.len(),
815 0 : level0_deltas.len()
816 : );
817 :
818 402 : for l in deltas_to_compact.iter() {
819 402 : info!("compact includes {l}");
820 : }
821 :
822 : // We don't need the original list of layers anymore. Drop it so that
823 : // we don't accidentally use it later in the function.
824 28 : drop(level0_deltas);
825 28 :
826 28 : stats.read_lock_held_prerequisites_micros = stats
827 28 : .read_lock_held_spawn_blocking_startup_micros
828 28 : .till_now();
829 :
830 : // TODO: replace with streaming k-merge
831 28 : let all_keys = {
832 28 : let mut all_keys = Vec::new();
833 402 : for l in deltas_to_compact.iter() {
834 402 : if self.cancel.is_cancelled() {
835 0 : return Err(CompactionError::ShuttingDown);
836 402 : }
837 2366 : all_keys.extend(l.load_keys(ctx).await.map_err(CompactionError::Other)?);
838 : }
839 : // The current stdlib sorting implementation is designed in a way where it is
840 : // particularly fast when the slice is made up of sorted sub-ranges.
841 4423750 : all_keys.sort_by_key(|DeltaEntry { key, lsn, .. }| (*key, *lsn));
842 28 : all_keys
843 28 : };
844 28 :
845 28 : stats.read_lock_held_key_sort_micros = stats.read_lock_held_prerequisites_micros.till_now();
846 :
847 : // Determine N largest holes where N is number of compacted layers. The vec is sorted by key range start.
848 : //
849 : // A hole is a key range for which this compaction doesn't have any WAL records.
850 : // Our goal in this compaction iteration is to avoid creating L1s that, in terms of their key range,
851 : // cover the hole, but actually don't contain any WAL records for that key range.
852 : // The reason is that the mere stack of L1s (`count_deltas`) triggers image layer creation (`create_image_layers`).
853 : // That image layer creation would be useless for a hole range covered by L1s that don't contain any WAL records.
854 : //
855 : // The algorithm chooses holes as follows.
856 : // - Slide a 2-element window over the keys in key order to get the hole range (= distance between two adjacent keys).
857 : // - Filter: min threshold on range length
858 : // - Rank: by coverage size (=number of image layers required to reconstruct each key in the range for which we have any data)
859 : //
860 : // For more details, intuition, and some ASCII art see https://github.com/neondatabase/neon/pull/3597#discussion_r1112704451
861 : #[derive(PartialEq, Eq)]
862 : struct Hole {
863 : key_range: Range<Key>,
864 : coverage_size: usize,
865 : }
866 28 : let holes: Vec<Hole> = {
867 : use std::cmp::Ordering;
868 : impl Ord for Hole {
869 0 : fn cmp(&self, other: &Self) -> Ordering {
870 0 : self.coverage_size.cmp(&other.coverage_size).reverse()
871 0 : }
872 : }
873 : impl PartialOrd for Hole {
874 0 : fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
875 0 : Some(self.cmp(other))
876 0 : }
877 : }
878 28 : let max_holes = deltas_to_compact.len();
879 28 : let last_record_lsn = self.get_last_record_lsn();
880 28 : let min_hole_range = (target_file_size / page_cache::PAGE_SZ as u64) as i128;
881 28 : let min_hole_coverage_size = 3; // TODO: something more flexible?
882 28 : // min-heap (reserve space for one more element added before eviction)
883 28 : let mut heap: BinaryHeap<Hole> = BinaryHeap::with_capacity(max_holes + 1);
884 28 : let mut prev: Option<Key> = None;
885 :
886 2064038 : for &DeltaEntry { key: next_key, .. } in all_keys.iter() {
887 2064038 : if let Some(prev_key) = prev {
888 : // Just a first fast filter: do not create hole entries for metadata keys. The last hole in the
889 : // compaction is the gap between the data keys and the metadata keys.
890 2064010 : if next_key.to_i128() - prev_key.to_i128() >= min_hole_range
891 0 : && !Key::is_metadata_key(&prev_key)
892 : {
893 0 : let key_range = prev_key..next_key;
894 0 : // Measuring a hole by simply subtracting the i128 representations of the key range boundaries
895 0 : // doesn't make much sense, because the largest holes will correspond to field1/field2 changes.
896 0 : // But we are mostly interested in eliminating holes which cause generation of excessive image layers.
897 0 : // That is why it is better to measure the size of a hole as the number of covering image layers.
898 0 : let coverage_size =
899 0 : layers.image_coverage(&key_range, last_record_lsn).len();
900 0 : if coverage_size >= min_hole_coverage_size {
901 0 : heap.push(Hole {
902 0 : key_range,
903 0 : coverage_size,
904 0 : });
905 0 : if heap.len() > max_holes {
906 0 : heap.pop(); // remove smallest hole
907 0 : }
908 0 : }
909 2064010 : }
910 28 : }
911 2064038 : prev = Some(next_key.next());
912 : }
913 28 : let mut holes = heap.into_vec();
914 28 : holes.sort_unstable_by_key(|hole| hole.key_range.start);
915 28 : holes
916 28 : };
917 28 : stats.read_lock_held_compute_holes_micros = stats.read_lock_held_key_sort_micros.till_now();
918 28 : drop_rlock(guard);
919 28 :
920 28 : if self.cancel.is_cancelled() {
921 0 : return Err(CompactionError::ShuttingDown);
922 28 : }
923 28 :
924 28 : stats.read_lock_drop_micros = stats.read_lock_held_compute_holes_micros.till_now();
925 :
926 : // This iterator walks through all key-value pairs from all the layers
927 : // we're compacting, in key, LSN order.
928 : // If there's both a Value::Image and Value::WalRecord for the same (key,lsn),
929 : // then the Value::Image is ordered before Value::WalRecord.
930 28 : let mut all_values_iter = {
931 28 : let mut deltas = Vec::with_capacity(deltas_to_compact.len());
932 402 : for l in deltas_to_compact.iter() {
933 402 : let l = l.get_as_delta(ctx).await.map_err(CompactionError::Other)?;
934 402 : deltas.push(l);
935 : }
936 28 : MergeIterator::create(&deltas, &[], ctx)
937 28 : };
938 28 :
939 28 : // This iterator walks through all keys and is needed to calculate the size used by each key
940 28 : let mut all_keys_iter = all_keys
941 28 : .iter()
942 2064038 : .map(|DeltaEntry { key, lsn, size, .. }| (*key, *lsn, *size))
943 2064010 : .coalesce(|mut prev, cur| {
944 2064010 : // Coalesce keys that belong to the same key pair.
945 2064010 : // This ensures that compaction doesn't put them
946 2064010 : // into different layer files.
947 2064010 : // Still limit this by the target file size,
948 2064010 : // so that we keep the size of the files in
949 2064010 : // check.
950 2064010 : if prev.0 == cur.0 && prev.2 < target_file_size {
951 40038 : prev.2 += cur.2;
952 40038 : Ok(prev)
953 : } else {
954 2023972 : Err((prev, cur))
955 : }
956 2064010 : });
957 28 :
958 28 : // Merge the contents of all the input delta layers into a new set
959 28 : // of delta layers, based on the current partitioning.
960 28 : //
961 28 : // We split the new delta layers on the key dimension. We iterate through the key space, and for each key, check whether including the next key in the current output layer we're building would cause the layer to become too large. If so, flush the current output layer and start a new one.
962 28 : // It's possible that there is a single key with so many page versions that storing all of them in a single layer file
963 28 : // would be too large. In that case, we also split on the LSN dimension.
964 28 : //
965 28 : // LSN
966 28 : // ^
967 28 : // |
968 28 : // | +-----------+ +--+--+--+--+
969 28 : // | | | | | | | |
970 28 : // | +-----------+ | | | | |
971 28 : // | | | | | | | |
972 28 : // | +-----------+ ==> | | | | |
973 28 : // | | | | | | | |
974 28 : // | +-----------+ | | | | |
975 28 : // | | | | | | | |
976 28 : // | +-----------+ +--+--+--+--+
977 28 : // |
978 28 : // +--------------> key
979 28 : //
980 28 : //
981 28 : // If one key (X) has a lot of page versions:
982 28 : //
983 28 : // LSN
984 28 : // ^
985 28 : // | (X)
986 28 : // | +-----------+ +--+--+--+--+
987 28 : // | | | | | | | |
988 28 : // | +-----------+ | | +--+ |
989 28 : // | | | | | | | |
990 28 : // | +-----------+ ==> | | | | |
991 28 : // | | | | | +--+ |
992 28 : // | +-----------+ | | | | |
993 28 : // | | | | | | | |
994 28 : // | +-----------+ +--+--+--+--+
995 28 : // |
996 28 : // +--------------> key
997 28 : // TODO: this actually divides the layers into fixed-size chunks, not
998 28 : // based on the partitioning.
999 28 : //
1000 28 : // TODO: we should also opportunistically materialize and
1001 28 : // garbage collect what we can.
1002 28 : let mut new_layers = Vec::new();
1003 28 : let mut prev_key: Option<Key> = None;
1004 28 : let mut writer: Option<DeltaLayerWriter> = None;
1005 28 : let mut key_values_total_size = 0u64;
1006 28 : let mut dup_start_lsn: Lsn = Lsn::INVALID; // start LSN of layer containing values of the single key
1007 28 : let mut dup_end_lsn: Lsn = Lsn::INVALID; // end LSN of layer containing values of the single key
1008 28 : let mut next_hole = 0; // index of next hole in holes vector
1009 28 :
1010 28 : let mut keys = 0;
1011 :
1012 2064066 : while let Some((key, lsn, value)) = all_values_iter
1013 2064066 : .next()
1014 3425 : .await
1015 2064066 : .map_err(CompactionError::Other)?
1016 : {
1017 2064038 : keys += 1;
1018 2064038 :
1019 2064038 : if keys % 32_768 == 0 && self.cancel.is_cancelled() {
1020 : // Avoid hitting the cancellation token on every key. In benches, we end up
1021 : // shuffling on the order of a million keys per layer, which means we'll check it
1022 : // around tens of times per layer.
1023 0 : return Err(CompactionError::ShuttingDown);
1024 2064038 : }
1025 2064038 :
1026 2064038 : let same_key = prev_key.map_or(false, |prev_key| prev_key == key);
1027 2064038 : // We need to check key boundaries once we reach next key or end of layer with the same key
1028 2064038 : if !same_key || lsn == dup_end_lsn {
1029 2024000 : let mut next_key_size = 0u64;
1030 2024000 : let is_dup_layer = dup_end_lsn.is_valid();
1031 2024000 : dup_start_lsn = Lsn::INVALID;
1032 2024000 : if !same_key {
1033 2024000 : dup_end_lsn = Lsn::INVALID;
1034 2024000 : }
1035 : // Determine size occupied by this key. We stop at next key or when size becomes larger than target_file_size
1036 2024000 : for (next_key, next_lsn, next_size) in all_keys_iter.by_ref() {
1037 2024000 : next_key_size = next_size;
1038 2024000 : if key != next_key {
1039 2023972 : if dup_end_lsn.is_valid() {
1040 0 : // We are writting segment with duplicates:
1041 0 : // place all remaining values of this key in separate segment
1042 0 : dup_start_lsn = dup_end_lsn; // new segments starts where old stops
1043 0 : dup_end_lsn = lsn_range.end; // there are no more values of this key till end of LSN range
1044 2023972 : }
1045 2023972 : break;
1046 28 : }
1047 28 : key_values_total_size += next_size;
1048 28 : // Check if it is time to split segment: if total keys size is larger than target file size.
1049 28 : // We need to avoid generation of empty segments if next_size > target_file_size.
1050 28 : if key_values_total_size > target_file_size && lsn != next_lsn {
1051 : // Split key between multiple layers: such layer can contain only single key
1052 0 : dup_start_lsn = if dup_end_lsn.is_valid() {
1053 0 : dup_end_lsn // new segment with duplicates starts where old one stops
1054 : } else {
1055 0 : lsn // start with the first LSN for this key
1056 : };
1057 0 : dup_end_lsn = next_lsn; // upper LSN boundary is exclusive
1058 0 : break;
1059 28 : }
1060 : }
1061 : // Handle the case when the loop reaches the last key: in this case dup_end is non-zero but dup_start is not set.
1062 2024000 : if dup_end_lsn.is_valid() && !dup_start_lsn.is_valid() {
1063 0 : dup_start_lsn = dup_end_lsn;
1064 0 : dup_end_lsn = lsn_range.end;
1065 2024000 : }
1066 2024000 : if writer.is_some() {
1067 2023972 : let written_size = writer.as_mut().unwrap().size();
1068 2023972 : let contains_hole =
1069 2023972 : next_hole < holes.len() && key >= holes[next_hole].key_range.end;
1070 : // Check if the key causes layer overflow or contains a hole...
1071 2023972 : if is_dup_layer
1072 2023972 : || dup_end_lsn.is_valid()
1073 2023972 : || written_size + key_values_total_size > target_file_size
1074 2023692 : || contains_hole
1075 : {
1076 : // ... if so, flush the previous layer and prepare to write a new one
1077 280 : let (desc, path) = writer
1078 280 : .take()
1079 280 : .unwrap()
1080 280 : .finish(prev_key.unwrap().next(), ctx)
1081 711 : .await
1082 280 : .map_err(CompactionError::Other)?;
1083 280 : let new_delta = Layer::finish_creating(self.conf, self, desc, &path)
1084 280 : .map_err(CompactionError::Other)?;
1085 :
1086 280 : new_layers.push(new_delta);
1087 280 : writer = None;
1088 280 :
1089 280 : if contains_hole {
1090 0 : // skip hole
1091 0 : next_hole += 1;
1092 280 : }
1093 2023692 : }
1094 28 : }
1095 : // Remember the size of the key value because at the next iteration we will access the next item
1096 2024000 : key_values_total_size = next_key_size;
1097 40038 : }
1098 2064038 : fail_point!("delta-layer-writer-fail-before-finish", |_| {
1099 0 : Err(CompactionError::Other(anyhow::anyhow!(
1100 0 : "failpoint delta-layer-writer-fail-before-finish"
1101 0 : )))
1102 2064038 : });
1103 :
1104 2064038 : if !self.shard_identity.is_key_disposable(&key) {
1105 2064038 : if writer.is_none() {
1106 308 : if self.cancel.is_cancelled() {
1107 : // to be somewhat responsive to cancellation, check for each new layer
1108 0 : return Err(CompactionError::ShuttingDown);
1109 308 : }
1110 : // Create the writer if it is not initialized yet
1111 308 : writer = Some(
1112 : DeltaLayerWriter::new(
1113 308 : self.conf,
1114 308 : self.timeline_id,
1115 308 : self.tenant_shard_id,
1116 308 : key,
1117 308 : if dup_end_lsn.is_valid() {
1118 : // this is a layer containing slice of values of the same key
1119 0 : debug!("Create new dup layer {}..{}", dup_start_lsn, dup_end_lsn);
1120 0 : dup_start_lsn..dup_end_lsn
1121 : } else {
1122 308 : debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end);
1123 308 : lsn_range.clone()
1124 : },
1125 308 : ctx,
1126 : )
1127 154 : .await
1128 308 : .map_err(CompactionError::Other)?,
1129 : );
1130 :
1131 308 : keys = 0;
1132 2063730 : }
1133 :
1134 2064038 : writer
1135 2064038 : .as_mut()
1136 2064038 : .unwrap()
1137 2064038 : .put_value(key, lsn, value, ctx)
1138 1225 : .await
1139 2064038 : .map_err(CompactionError::Other)?;
1140 : } else {
1141 0 : debug!(
1142 0 : "Dropping key {} during compaction (it belongs on shard {:?})",
1143 0 : key,
1144 0 : self.shard_identity.get_shard_number(&key)
1145 : );
1146 : }
1147 :
1148 2064038 : if !new_layers.is_empty() {
1149 19786 : fail_point!("after-timeline-compacted-first-L1");
1150 2044252 : }
1151 :
1152 2064038 : prev_key = Some(key);
1153 : }
1154 28 : if let Some(writer) = writer {
1155 28 : let (desc, path) = writer
1156 28 : .finish(prev_key.unwrap().next(), ctx)
1157 1989 : .await
1158 28 : .map_err(CompactionError::Other)?;
1159 28 : let new_delta = Layer::finish_creating(self.conf, self, desc, &path)
1160 28 : .map_err(CompactionError::Other)?;
1161 28 : new_layers.push(new_delta);
1162 0 : }
1163 :
1164 : // Sync layers
1165 28 : if !new_layers.is_empty() {
1166 : // Print a warning if the created layer is larger than double the target size.
1167 : // Add two pages for potential overhead. This should in theory already be
1168 : // accounted for in the target calculation, but for very small targets,
1169 : // we still might easily hit the limit otherwise.
1170 28 : let warn_limit = target_file_size * 2 + page_cache::PAGE_SZ as u64 * 2;
1171 308 : for layer in new_layers.iter() {
1172 308 : if layer.layer_desc().file_size > warn_limit {
1173 0 : warn!(
1174 : %layer,
1175 0 : "created delta file of size {} larger than double of target of {target_file_size}", layer.layer_desc().file_size
1176 : );
1177 308 : }
1178 : }
1179 :
1180 : // The writer.finish() above already did the fsync of the inodes.
1181 : // We just need to fsync the directory in which these inodes are linked,
1182 : // which we know to be the timeline directory.
1183 : //
1184 : // We use fatal_err() below because after writer.finish() returns with success,
1185 : // the in-memory state of the filesystem already has the layer file in its final place,
1186 : // and subsequent pageserver code could think it's durable while it really isn't.
1187 28 : let timeline_dir = VirtualFile::open(
1188 28 : &self
1189 28 : .conf
1190 28 : .timeline_path(&self.tenant_shard_id, &self.timeline_id),
1191 28 : ctx,
1192 28 : )
1193 14 : .await
1194 28 : .fatal_err("VirtualFile::open for timeline dir fsync");
1195 28 : timeline_dir
1196 28 : .sync_all()
1197 14 : .await
1198 28 : .fatal_err("VirtualFile::sync_all timeline dir");
1199 0 : }
1200 :
1201 28 : stats.write_layer_files_micros = stats.read_lock_drop_micros.till_now();
1202 28 : stats.new_deltas_count = Some(new_layers.len());
1203 308 : stats.new_deltas_size = Some(new_layers.iter().map(|l| l.layer_desc().file_size).sum());
1204 28 :
1205 28 : match TryInto::<CompactLevel0Phase1Stats>::try_into(stats)
1206 28 : .and_then(|stats| serde_json::to_string(&stats).context("serde_json::to_string"))
1207 : {
1208 28 : Ok(stats_json) => {
1209 28 : info!(
1210 0 : stats_json = stats_json.as_str(),
1211 0 : "compact_level0_phase1 stats available"
1212 : )
1213 : }
1214 0 : Err(e) => {
1215 0 : warn!("compact_level0_phase1 stats failed to serialize: {:#}", e);
1216 : }
1217 : }
1218 :
1219 : // Without this, rustc complains about deltas_to_compact still
1220 : // being borrowed when we `.into_iter()` below.
1221 28 : drop(all_values_iter);
1222 28 :
1223 28 : Ok(CompactLevel0Phase1Result {
1224 28 : new_layers,
1225 28 : deltas_to_compact: deltas_to_compact
1226 28 : .into_iter()
1227 402 : .map(|x| x.drop_eviction_guard())
1228 28 : .collect::<Vec<_>>(),
1229 28 : fully_compacted,
1230 28 : })
1231 364 : }
1232 : }
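// Illustrative sketch (hypothetical test, mirroring the `Itertools::coalesce`
// pattern used in `compact_level0_phase1` above): entries for the same key are
// merged until their accumulated size reaches a target, so that one key's data
// is not split across output layers unnecessarily. Plain integers stand in for
// keys here.
#[cfg(test)]
mod coalesce_key_sizes_example {
    use itertools::Itertools;

    #[test]
    fn coalesces_same_key_up_to_target() {
        let target_file_size = 100u64;
        // (key, size) pairs; key 1 appears three times.
        let entries = vec![(1u64, 60u64), (1, 50), (1, 10), (2, 30)];
        let coalesced: Vec<_> = entries
            .into_iter()
            .coalesce(|mut prev, cur| {
                // Keep accumulating sizes for the same key while below the target.
                if prev.0 == cur.0 && prev.1 < target_file_size {
                    prev.1 += cur.1;
                    Ok(prev)
                } else {
                    Err((prev, cur))
                }
            })
            .collect();
        // The first two entries for key 1 merge; once the accumulated size is at
        // or above the target, the next entry stays separate, as does key 2.
        assert_eq!(coalesced, vec![(1, 110), (1, 10), (2, 30)]);
    }
}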
1233 :
1234 : #[derive(Default)]
1235 : struct CompactLevel0Phase1Result {
1236 : new_layers: Vec<ResidentLayer>,
1237 : deltas_to_compact: Vec<Layer>,
1238 : // Whether we have included all L0 layers, or selected only part of them due to the
1239 : // L0 compaction size limit.
1240 : fully_compacted: bool,
1241 : }
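// Illustrative sketch (hypothetical test, mirroring the hole-selection logic in
// `compact_level0_phase1`): a `BinaryHeap` with reversed ordering behaves as a
// min-heap, so pushing every candidate and popping whenever the heap exceeds
// `max_holes` keeps only the N largest holes by coverage size.
#[cfg(test)]
mod largest_holes_example {
    use std::cmp::Reverse;
    use std::collections::BinaryHeap;

    #[test]
    fn keeps_n_largest_coverage_sizes() {
        let max_holes = 2;
        let coverage_sizes = [5usize, 9, 1, 7, 3];
        // `Reverse` plays the role of the reversed `Ord` impl on `Hole`.
        let mut heap: BinaryHeap<Reverse<usize>> = BinaryHeap::with_capacity(max_holes + 1);
        for size in coverage_sizes {
            heap.push(Reverse(size));
            if heap.len() > max_holes {
                heap.pop(); // drop the smallest hole seen so far
            }
        }
        let mut kept: Vec<usize> = heap.into_iter().map(|Reverse(s)| s).collect();
        kept.sort_unstable();
        assert_eq!(kept, vec![7, 9]);
    }
}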
1242 :
1243 : #[derive(Default)]
1244 : struct CompactLevel0Phase1StatsBuilder {
1245 : version: Option<u64>,
1246 : tenant_id: Option<TenantShardId>,
1247 : timeline_id: Option<TimelineId>,
1248 : read_lock_acquisition_micros: DurationRecorder,
1249 : read_lock_held_spawn_blocking_startup_micros: DurationRecorder,
1250 : read_lock_held_key_sort_micros: DurationRecorder,
1251 : read_lock_held_prerequisites_micros: DurationRecorder,
1252 : read_lock_held_compute_holes_micros: DurationRecorder,
1253 : read_lock_drop_micros: DurationRecorder,
1254 : write_layer_files_micros: DurationRecorder,
1255 : level0_deltas_count: Option<usize>,
1256 : new_deltas_count: Option<usize>,
1257 : new_deltas_size: Option<u64>,
1258 : }
1259 :
1260 : #[derive(serde::Serialize)]
1261 : struct CompactLevel0Phase1Stats {
1262 : version: u64,
1263 : tenant_id: TenantShardId,
1264 : timeline_id: TimelineId,
1265 : read_lock_acquisition_micros: RecordedDuration,
1266 : read_lock_held_spawn_blocking_startup_micros: RecordedDuration,
1267 : read_lock_held_key_sort_micros: RecordedDuration,
1268 : read_lock_held_prerequisites_micros: RecordedDuration,
1269 : read_lock_held_compute_holes_micros: RecordedDuration,
1270 : read_lock_drop_micros: RecordedDuration,
1271 : write_layer_files_micros: RecordedDuration,
1272 : level0_deltas_count: usize,
1273 : new_deltas_count: usize,
1274 : new_deltas_size: u64,
1275 : }
1276 :
1277 : impl TryFrom<CompactLevel0Phase1StatsBuilder> for CompactLevel0Phase1Stats {
1278 : type Error = anyhow::Error;
1279 :
1280 28 : fn try_from(value: CompactLevel0Phase1StatsBuilder) -> Result<Self, Self::Error> {
1281 28 : Ok(Self {
1282 28 : version: value.version.ok_or_else(|| anyhow!("version not set"))?,
1283 28 : tenant_id: value
1284 28 : .tenant_id
1285 28 : .ok_or_else(|| anyhow!("tenant_id not set"))?,
1286 28 : timeline_id: value
1287 28 : .timeline_id
1288 28 : .ok_or_else(|| anyhow!("timeline_id not set"))?,
1289 28 : read_lock_acquisition_micros: value
1290 28 : .read_lock_acquisition_micros
1291 28 : .into_recorded()
1292 28 : .ok_or_else(|| anyhow!("read_lock_acquisition_micros not set"))?,
1293 28 : read_lock_held_spawn_blocking_startup_micros: value
1294 28 : .read_lock_held_spawn_blocking_startup_micros
1295 28 : .into_recorded()
1296 28 : .ok_or_else(|| anyhow!("read_lock_held_spawn_blocking_startup_micros not set"))?,
1297 28 : read_lock_held_key_sort_micros: value
1298 28 : .read_lock_held_key_sort_micros
1299 28 : .into_recorded()
1300 28 : .ok_or_else(|| anyhow!("read_lock_held_key_sort_micros not set"))?,
1301 28 : read_lock_held_prerequisites_micros: value
1302 28 : .read_lock_held_prerequisites_micros
1303 28 : .into_recorded()
1304 28 : .ok_or_else(|| anyhow!("read_lock_held_prerequisites_micros not set"))?,
1305 28 : read_lock_held_compute_holes_micros: value
1306 28 : .read_lock_held_compute_holes_micros
1307 28 : .into_recorded()
1308 28 : .ok_or_else(|| anyhow!("read_lock_held_compute_holes_micros not set"))?,
1309 28 : read_lock_drop_micros: value
1310 28 : .read_lock_drop_micros
1311 28 : .into_recorded()
1312 28 : .ok_or_else(|| anyhow!("read_lock_drop_micros not set"))?,
1313 28 : write_layer_files_micros: value
1314 28 : .write_layer_files_micros
1315 28 : .into_recorded()
1316 28 : .ok_or_else(|| anyhow!("write_layer_files_micros not set"))?,
1317 28 : level0_deltas_count: value
1318 28 : .level0_deltas_count
1319 28 : .ok_or_else(|| anyhow!("level0_deltas_count not set"))?,
1320 28 : new_deltas_count: value
1321 28 : .new_deltas_count
1322 28 : .ok_or_else(|| anyhow!("new_deltas_count not set"))?,
1323 28 : new_deltas_size: value
1324 28 : .new_deltas_size
1325 28 : .ok_or_else(|| anyhow!("new_deltas_size not set"))?,
1326 : })
1327 28 : }
1328 : }
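// Illustrative sketch (hypothetical test): converting the builder into
// `CompactLevel0Phase1Stats` requires every duration and counter to have been
// recorded, so a default (unrecorded) builder is rejected.
#[cfg(test)]
mod phase1_stats_builder_example {
    use super::*;

    #[test]
    fn default_builder_is_incomplete() {
        let builder = CompactLevel0Phase1StatsBuilder::default();
        // `version` is the first unset field, so the conversion must fail.
        assert!(CompactLevel0Phase1Stats::try_from(builder).is_err());
    }
}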
1329 :
1330 : impl Timeline {
1331 : /// Entry point for new tiered compaction algorithm.
1332 : ///
1333 : /// All the real work is in the implementation in the pageserver_compaction
1334 : /// crate. The code here would apply to any algorithm implemented by the
1335 : /// same interface, but tiered is the only one at the moment.
1336 : ///
1337 : /// TODO: cancellation
1338 0 : pub(crate) async fn compact_tiered(
1339 0 : self: &Arc<Self>,
1340 0 : _cancel: &CancellationToken,
1341 0 : ctx: &RequestContext,
1342 0 : ) -> Result<(), CompactionError> {
1343 0 : let fanout = self.get_compaction_threshold() as u64;
1344 0 : let target_file_size = self.get_checkpoint_distance();
1345 :
1346 : // Find the top of the historical layers
1347 0 : let end_lsn = {
1348 0 : let guard = self.layers.read().await;
1349 0 : let layers = guard.layer_map()?;
1350 :
1351 0 : let l0_deltas = layers.level0_deltas();
1352 0 :
1353 0 : // As an optimization, if we find that there are too few L0 layers,
1354 0 : // bail out early. We know that the compaction algorithm would do
1355 0 : // nothing in that case.
1356 0 : if l0_deltas.len() < fanout as usize {
1357 : // doesn't need compacting
1358 0 : return Ok(());
1359 0 : }
1360 0 : l0_deltas.iter().map(|l| l.lsn_range.end).max().unwrap()
1361 0 : };
1362 0 :
1363 0 : // Is the timeline being deleted?
1364 0 : if self.is_stopping() {
1365 0 : trace!("Dropping out of compaction on timeline shutdown");
1366 0 : return Err(CompactionError::ShuttingDown);
1367 0 : }
1368 :
1369 0 : let (dense_ks, _sparse_ks) = self.collect_keyspace(end_lsn, ctx).await?;
1370 : // TODO(chi): ignore sparse_keyspace for now, compact it in the future.
1371 0 : let mut adaptor = TimelineAdaptor::new(self, (end_lsn, dense_ks));
1372 0 :
1373 0 : pageserver_compaction::compact_tiered::compact_tiered(
1374 0 : &mut adaptor,
1375 0 : end_lsn,
1376 0 : target_file_size,
1377 0 : fanout,
1378 0 : ctx,
1379 0 : )
1380 0 : .await
1381 : // TODO: compact_tiered needs to return CompactionError
1382 0 : .map_err(CompactionError::Other)?;
1383 :
1384 0 : adaptor.flush_updates().await?;
1385 0 : Ok(())
1386 0 : }
1387 :
1388 : /// Take a list of images and deltas, produce images and deltas according to GC horizon and retain_lsns.
1389 : ///
1390 : /// It takes a key, the values of that key collected during the compaction process, a GC horizon, and all retain_lsns below the horizon.
1391 : /// For now, it requires that `accumulated_values` contain the full history of the key (i.e., the record with the lowest LSN is
1392 : /// an image or a WAL record that does not require a base image). This restriction will be removed once we implement gc-compaction on branches.
1393 : ///
1394 : /// The function returns the deltas and the base image that need to be placed at each of the retain LSNs. For example, given:
1395 : ///
1396 : /// A@0x10, +B@0x20, +C@0x30, +D@0x40, +E@0x50, +F@0x60
1397 : /// horizon = 0x50, retain_lsn = 0x20, 0x40, delta_threshold=3
1398 : ///
1399 : /// The function will produce:
1400 : ///
1401 : /// ```plain
1402 : /// 0x20(retain_lsn) -> img=AB@0x20 always produce a single image below the lowest retain LSN
1403 : /// 0x40(retain_lsn) -> deltas=[+C@0x30, +D@0x40] two deltas since the last base image, keeping the deltas
1404 : /// 0x50(horizon) -> deltas=[ABCDE@0x50] three deltas since the last base image, generate an image but put it in the delta
1405 : /// above_horizon -> deltas=[+F@0x60] full history above the horizon
1406 : /// ```
1407 : ///
1408 : /// Note that `accumulated_values` must be sorted by LSN and should belong to a single key.
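///
/// A minimal, illustrative call sketch (hypothetical variable names; the real call site is in
/// `compact_with_gc` below). Marked `ignore` so it is not compiled as a doctest:
///
/// ```ignore
/// let retention = timeline
///     .generate_key_retention(
///         key,
///         &accumulated_values,        // full history of `key`, sorted by LSN
///         gc_cutoff,                  // the GC horizon
///         &retain_lsns_below_horizon, // sorted, all strictly below the horizon
///         COMPACTION_DELTA_THRESHOLD, // generate an image once this many deltas accumulate
///         None,                       // no ancestor image on a root branch
///     )
///     .await?;
/// // `retention.below_horizon` has one entry per retain LSN plus one for the horizon;
/// // `retention.above_horizon` holds the full history above the horizon.
/// ```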
1409 430 : pub(crate) async fn generate_key_retention(
1410 430 : self: &Arc<Timeline>,
1411 430 : key: Key,
1412 430 : full_history: &[(Key, Lsn, Value)],
1413 430 : horizon: Lsn,
1414 430 : retain_lsn_below_horizon: &[Lsn],
1415 430 : delta_threshold_cnt: usize,
1416 430 : base_img_from_ancestor: Option<(Key, Lsn, Bytes)>,
1417 430 : ) -> anyhow::Result<KeyHistoryRetention> {
1418 430 : // Pre-checks for the invariants
1419 430 : if cfg!(debug_assertions) {
1420 1040 : for (log_key, _, _) in full_history {
1421 610 : assert_eq!(log_key, &key, "mismatched key");
1422 : }
1423 430 : for i in 1..full_history.len() {
1424 180 : assert!(full_history[i - 1].1 <= full_history[i].1, "unordered LSN");
1425 180 : if full_history[i - 1].1 == full_history[i].1 {
1426 0 : assert!(
1427 0 : matches!(full_history[i - 1].2, Value::Image(_)),
1428 0 : "unordered delta/image, or duplicated delta"
1429 : );
1430 180 : }
1431 : }
1432 : // There used to be an assertion that checked whether the first record in the
1433 : // history is `will_init` (i.e., that there is a base image), but it was removed.
1434 : // This is explained in the test cases for generate_key_retention.
1435 : // Search "incomplete history" for more information.
1436 1000 : for lsn in retain_lsn_below_horizon {
1437 570 : assert!(lsn < &horizon, "retain lsn must be below horizon")
1438 : }
1439 430 : for i in 1..retain_lsn_below_horizon.len() {
1440 278 : assert!(
1441 278 : retain_lsn_below_horizon[i - 1] <= retain_lsn_below_horizon[i],
1442 0 : "unordered LSN"
1443 : );
1444 : }
1445 0 : }
1446 430 : let has_ancestor = base_img_from_ancestor.is_some();
1447 : // Step 1: split history into len(retain_lsn_below_horizon) + 2 buckets, where the last bucket is for all deltas above the horizon,
1448 : // and the second-to-last bucket is for the horizon. Each bucket holds the records with lsn_last_bucket < lsn <= lsn_this_bucket.
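// Illustrative example with hypothetical values: for retain LSNs [0x20, 0x40] and horizon 0x50,
// the split points are [0x20, 0x40, 0x50]; a history with entries at LSNs 0x10, 0x30 and 0x60 is
// split into buckets [[0x10], [0x30], [], [0x60]] (the empty bucket is the horizon bucket).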
1449 430 : let (mut split_history, lsn_split_points) = {
1450 430 : let mut split_history = Vec::new();
1451 430 : split_history.resize_with(retain_lsn_below_horizon.len() + 2, Vec::new);
1452 430 : let mut lsn_split_points = Vec::with_capacity(retain_lsn_below_horizon.len() + 1);
1453 1000 : for lsn in retain_lsn_below_horizon {
1454 570 : lsn_split_points.push(*lsn);
1455 570 : }
1456 430 : lsn_split_points.push(horizon);
1457 430 : let mut current_idx = 0;
1458 1040 : for item @ (_, lsn, _) in full_history {
1459 772 : while current_idx < lsn_split_points.len() && *lsn > lsn_split_points[current_idx] {
1460 162 : current_idx += 1;
1461 162 : }
1462 610 : split_history[current_idx].push(item);
1463 : }
1464 430 : (split_history, lsn_split_points)
1465 : };
1466 : // Step 2: filter out duplicated records due to the k-merge of image/delta layers
1467 1860 : for split_for_lsn in &mut split_history {
1468 1430 : let mut prev_lsn = None;
1469 1430 : let mut new_split_for_lsn = Vec::with_capacity(split_for_lsn.len());
1470 1430 : for record @ (_, lsn, _) in std::mem::take(split_for_lsn) {
1471 610 : if let Some(prev_lsn) = &prev_lsn {
1472 66 : if *prev_lsn == lsn {
1473 : // This is the case where an LSN has data from both the delta layer and the image layer. As
1474 : // `ValueWrapper` ensures that an image is ordered before a delta at the same LSN, we simply
1475 : // drop this delta and keep the image.
1476 : //
1477 : // For example, we have delta layer key1@0x10, key1@0x20, and image layer key1@0x10, we will
1478 : // keep the image for key1@0x10 and the delta for key1@0x20. key1@0x10 delta will be simply
1479 : // dropped.
1480 : //
1481 : // TODO: in case we have both delta + images for a given LSN and it does not exceed the delta
1482 : // threshold, we could keep the delta instead to save space. This is a future optimization.
1483 0 : continue;
1484 66 : }
1485 544 : }
1486 610 : prev_lsn = Some(lsn);
1487 610 : new_split_for_lsn.push(record);
1488 : }
1489 1430 : *split_for_lsn = new_split_for_lsn;
1490 : }
1491 : // Step 3: generate images when necessary
1492 430 : let mut retention = Vec::with_capacity(split_history.len());
1493 430 : let mut records_since_last_image = 0;
1494 430 : let batch_cnt = split_history.len();
1495 430 : assert!(
1496 430 : batch_cnt >= 2,
1497 0 : "should have at least below + above horizon batches"
1498 : );
1499 430 : let mut replay_history: Vec<(Key, Lsn, Value)> = Vec::new();
1500 430 : if let Some((key, lsn, img)) = base_img_from_ancestor {
1501 18 : replay_history.push((key, lsn, Value::Image(img)));
1502 412 : }
1503 :
1504 : /// Generate debug information for the replay history
1505 0 : fn generate_history_trace(replay_history: &[(Key, Lsn, Value)]) -> String {
1506 : use std::fmt::Write;
1507 0 : let mut output = String::new();
1508 0 : if let Some((key, _, _)) = replay_history.first() {
1509 0 : write!(output, "key={} ", key).unwrap();
1510 0 : let mut cnt = 0;
1511 0 : for (_, lsn, val) in replay_history {
1512 0 : if val.is_image() {
1513 0 : write!(output, "i@{} ", lsn).unwrap();
1514 0 : } else if val.will_init() {
1515 0 : write!(output, "di@{} ", lsn).unwrap();
1516 0 : } else {
1517 0 : write!(output, "d@{} ", lsn).unwrap();
1518 0 : }
1519 0 : cnt += 1;
1520 0 : if cnt >= 128 {
1521 0 : write!(output, "... and more").unwrap();
1522 0 : break;
1523 0 : }
1524 : }
1525 0 : } else {
1526 0 : write!(output, "<no history>").unwrap();
1527 0 : }
1528 0 : output
1529 0 : }
1530 :
1531 0 : fn generate_debug_trace(
1532 0 : replay_history: Option<&[(Key, Lsn, Value)]>,
1533 0 : full_history: &[(Key, Lsn, Value)],
1534 0 : lsns: &[Lsn],
1535 0 : horizon: Lsn,
1536 0 : ) -> String {
1537 : use std::fmt::Write;
1538 0 : let mut output = String::new();
1539 0 : if let Some(replay_history) = replay_history {
1540 0 : writeln!(
1541 0 : output,
1542 0 : "replay_history: {}",
1543 0 : generate_history_trace(replay_history)
1544 0 : )
1545 0 : .unwrap();
1546 0 : } else {
1547 0 : writeln!(output, "replay_history: <disabled>",).unwrap();
1548 0 : }
1549 0 : writeln!(
1550 0 : output,
1551 0 : "full_history: {}",
1552 0 : generate_history_trace(full_history)
1553 0 : )
1554 0 : .unwrap();
1555 0 : writeln!(
1556 0 : output,
1557 0 : "when processing: [{}] horizon={}",
1558 0 : lsns.iter().map(|l| format!("{l}")).join(","),
1559 0 : horizon
1560 0 : )
1561 0 : .unwrap();
1562 0 : output
1563 0 : }
1564 :
1565 1430 : for (i, split_for_lsn) in split_history.into_iter().enumerate() {
1566 : // TODO: there could be image keys inside the splits, and we can compute records_since_last_image accordingly.
1567 1430 : records_since_last_image += split_for_lsn.len();
1568 1430 : let generate_image = if i == 0 && !has_ancestor {
1569 : // We always generate images for the first batch (below horizon / lowest retain_lsn)
1570 412 : true
1571 1018 : } else if i == batch_cnt - 1 {
1572 : // Do not generate images for the last batch (above horizon)
1573 430 : false
1574 588 : } else if records_since_last_image >= delta_threshold_cnt {
1575 : // Generate images when there are too many records
1576 6 : true
1577 : } else {
1578 582 : false
1579 : };
1580 1430 : replay_history.extend(split_for_lsn.iter().map(|x| (*x).clone()));
1581 : // Only retain the items after the last image record
1582 1758 : for idx in (0..replay_history.len()).rev() {
1583 1758 : if replay_history[idx].2.will_init() {
1584 1430 : replay_history = replay_history[idx..].to_vec();
1585 1430 : break;
1586 328 : }
1587 : }
1588 1430 : if let Some((_, _, val)) = replay_history.first() {
1589 1430 : if !val.will_init() {
1590 0 : return Err(anyhow::anyhow!("invalid history, no base image")).with_context(
1591 0 : || {
1592 0 : generate_debug_trace(
1593 0 : Some(&replay_history),
1594 0 : full_history,
1595 0 : retain_lsn_below_horizon,
1596 0 : horizon,
1597 0 : )
1598 0 : },
1599 0 : );
1600 1430 : }
1601 0 : }
1602 1430 : if generate_image && records_since_last_image > 0 {
1603 418 : records_since_last_image = 0;
1604 418 : let replay_history_for_debug = if cfg!(debug_assertions) {
1605 418 : Some(replay_history.clone())
1606 : } else {
1607 0 : None
1608 : };
1609 418 : let replay_history_for_debug_ref = replay_history_for_debug.as_deref();
1610 418 : let history = std::mem::take(&mut replay_history);
1611 418 : let mut img = None;
1612 418 : let mut records = Vec::with_capacity(history.len());
1613 418 : if let (_, lsn, Value::Image(val)) = history.first().as_ref().unwrap() {
1614 418 : img = Some((*lsn, val.clone()));
1615 418 : for (_, lsn, val) in history.into_iter().skip(1) {
1616 34 : let Value::WalRecord(rec) = val else {
1617 0 : return Err(anyhow::anyhow!(
1618 0 : "invalid record, first record is image, expect walrecords"
1619 0 : ))
1620 0 : .with_context(|| {
1621 0 : generate_debug_trace(
1622 0 : replay_history_for_debug_ref,
1623 0 : full_history,
1624 0 : retain_lsn_below_horizon,
1625 0 : horizon,
1626 0 : )
1627 0 : });
1628 : };
1629 34 : records.push((lsn, rec));
1630 : }
1631 : } else {
1632 0 : for (_, lsn, val) in history.into_iter() {
1633 0 : let Value::WalRecord(rec) = val else {
1634 0 : return Err(anyhow::anyhow!("invalid record, first record is walrecord, expect rest are walrecord"))
1635 0 : .with_context(|| generate_debug_trace(
1636 0 : replay_history_for_debug_ref,
1637 0 : full_history,
1638 0 : retain_lsn_below_horizon,
1639 0 : horizon,
1640 0 : ));
1641 : };
1642 0 : records.push((lsn, rec));
1643 : }
1644 : }
1645 418 : records.reverse();
1646 418 : let state = ValueReconstructState { img, records };
1647 418 : let request_lsn = lsn_split_points[i]; // last batch does not generate image so i is always in range
1648 418 : let img = self.reconstruct_value(key, request_lsn, state).await?;
1649 418 : replay_history.push((key, request_lsn, Value::Image(img.clone())));
1650 418 : retention.push(vec![(request_lsn, Value::Image(img))]);
1651 1012 : } else {
1652 1012 : let deltas = split_for_lsn
1653 1012 : .iter()
1654 1012 : .map(|(_, lsn, value)| (*lsn, value.clone()))
1655 1012 : .collect_vec();
1656 1012 : retention.push(deltas);
1657 1012 : }
1658 : }
1659 430 : let mut result = Vec::with_capacity(retention.len());
1660 430 : assert_eq!(retention.len(), lsn_split_points.len() + 1);
1661 1430 : for (idx, logs) in retention.into_iter().enumerate() {
1662 1430 : if idx == lsn_split_points.len() {
1663 430 : return Ok(KeyHistoryRetention {
1664 430 : below_horizon: result,
1665 430 : above_horizon: KeyLogAtLsn(logs),
1666 430 : });
1667 1000 : } else {
1668 1000 : result.push((lsn_split_points[idx], KeyLogAtLsn(logs)));
1669 1000 : }
1670 : }
1671 0 : unreachable!("key retention is empty")
1672 430 : }
1673 :
1674 : /// Check how much space is left on the disk
1675 26 : async fn check_available_space(self: &Arc<Self>) -> anyhow::Result<u64> {
1676 26 : let tenants_dir = self.conf.tenants_path();
1677 :
1678 26 : let stat = Statvfs::get(&tenants_dir, None)
1679 26 : .context("statvfs failed, presumably directory got unlinked")?;
1680 :
1681 26 : let (avail_bytes, _) = stat.get_avail_total_bytes();
1682 26 :
1683 26 : Ok(avail_bytes)
1684 26 : }
1685 :
1686 : /// Check whether the compaction can proceed safely without running out of space. We assume the
1687 : /// total size of the files produced by a compaction job is bounded by the total size of the layers
1688 : /// involved in it. Therefore, we need at least `2 * layers_to_be_compacted_size` of free space to
1689 : /// run a compaction.
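///
/// Worked example with illustrative numbers: with 100 GiB available on disk we budget 80% of it
/// (80 GiB). Compacting 30 GiB of fully resident layers needs ~30 GiB for the new files and
/// 0 GiB for downloads, so it is allowed; compacting 90 GiB of layers would be rejected.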
1690 26 : async fn check_compaction_space(
1691 26 : self: &Arc<Self>,
1692 26 : layer_selection: &[Layer],
1693 26 : ) -> anyhow::Result<()> {
1694 26 : let available_space = self.check_available_space().await?;
1695 26 : let mut remote_layer_size = 0;
1696 26 : let mut all_layer_size = 0;
1697 108 : for layer in layer_selection {
1698 82 : let needs_download = layer.needs_download().await?;
1699 82 : if needs_download.is_some() {
1700 0 : remote_layer_size += layer.layer_desc().file_size;
1701 82 : }
1702 82 : all_layer_size += layer.layer_desc().file_size;
1703 : }
1704 26 : let allocated_space = (available_space as f64 * 0.8) as u64; /* reserve 20% space for other tasks */
1705 26 : if all_layer_size /* space needed for newly-generated file */ + remote_layer_size /* space for downloading layers */ > allocated_space
1706 : {
1707 0 : return Err(anyhow!("not enough space for compaction: available_space={}, allocated_space={}, all_layer_size={}, remote_layer_size={}, required_space={}",
1708 0 : available_space, allocated_space, all_layer_size, remote_layer_size, all_layer_size + remote_layer_size));
1709 26 : }
1710 26 : Ok(())
1711 26 : }
1712 :
1713 : /// An experimental compaction building block that combines compaction with garbage collection.
1714 : ///
1715 : /// The current implementation picks all delta + image layers that are below or intersect with
1716 : /// the GC horizon, without considering retain_lsns. Then, it does a full compaction over all these delta
1717 : /// layers and image layers, which generates image layers at the gc horizon, drops deltas below the gc horizon,
1718 : /// and creates delta layers with all deltas >= the gc horizon.
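///
/// A hypothetical invocation sketch (the real caller is the compaction task, which already holds
/// the compaction lock). Marked `ignore` so it is not compiled as a doctest:
///
/// ```ignore
/// // Dry run: compute and log statistics, but do not install any new layers.
/// timeline
///     .compact_with_gc(&cancel, EnumSet::only(CompactFlags::DryRun), &ctx)
///     .await?;
/// // Real run with no extra flags.
/// timeline
///     .compact_with_gc(&cancel, EnumSet::empty(), &ctx)
///     .await?;
/// ```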
1719 26 : pub(crate) async fn compact_with_gc(
1720 26 : self: &Arc<Self>,
1721 26 : cancel: &CancellationToken,
1722 26 : flags: EnumSet<CompactFlags>,
1723 26 : ctx: &RequestContext,
1724 26 : ) -> anyhow::Result<()> {
1725 : use std::collections::BTreeSet;
1726 :
1727 : // Block other compaction/GC tasks from running for now. GC-compaction could run along
1728 : // with legacy compaction tasks in the future. Always ensure the lock order is compaction -> gc.
1729 : // Note that we already acquired the compaction lock when the outer `compact` function gets called.
1730 :
1731 26 : let gc_lock = async {
1732 26 : tokio::select! {
1733 26 : guard = self.gc_lock.lock() => Ok(guard),
1734 : // TODO: refactor to CompactionError to correctly pass cancelled error
1735 26 : _ = cancel.cancelled() => Err(anyhow!("cancelled")),
1736 : }
1737 26 : };
1738 :
1739 26 : let gc_lock = crate::timed(
1740 26 : gc_lock,
1741 26 : "acquires gc lock",
1742 26 : std::time::Duration::from_secs(5),
1743 26 : )
1744 1 : .await?;
1745 :
1746 26 : let dry_run = flags.contains(CompactFlags::DryRun);
1747 26 :
1748 26 : info!("running enhanced gc bottom-most compaction, dry_run={dry_run}");
1749 :
1750 26 : scopeguard::defer! {
1751 26 : info!("done enhanced gc bottom-most compaction");
1752 26 : };
1753 26 :
1754 26 : let mut stat = CompactionStatistics::default();
1755 :
1756 : // Step 0: pick all delta layers + image layers below/intersect with the GC horizon.
1757 : // The layer selection has the following properties:
1758 : // 1. If a layer is in the selection, all layers below it are in the selection.
1759 : // 2. It follows from (1) that, for each key covered by the selection, its value can be reconstructed using only the layers in the selection.
1760 26 : let (layer_selection, gc_cutoff, retain_lsns_below_horizon) = {
1761 26 : let guard = self.layers.read().await;
1762 26 : let layers = guard.layer_map()?;
1763 26 : let gc_info = self.gc_info.read().unwrap();
1764 26 : let mut retain_lsns_below_horizon = Vec::new();
1765 26 : let gc_cutoff = gc_info.cutoffs.select_min();
1766 34 : for (lsn, _timeline_id, _is_offloaded) in &gc_info.retain_lsns {
1767 34 : if lsn < &gc_cutoff {
1768 34 : retain_lsns_below_horizon.push(*lsn);
1769 34 : }
1770 : }
1771 26 : for lsn in gc_info.leases.keys() {
1772 0 : if lsn < &gc_cutoff {
1773 0 : retain_lsns_below_horizon.push(*lsn);
1774 0 : }
1775 : }
1776 26 : let mut selected_layers = Vec::new();
1777 26 : drop(gc_info);
1778 : // Pick all the layers that intersect or are below the gc_cutoff, and get the largest LSN among the selected layers.
1779 26 : let Some(max_layer_lsn) = layers
1780 26 : .iter_historic_layers()
1781 100 : .filter(|desc| desc.get_lsn_range().start <= gc_cutoff)
1782 82 : .map(|desc| desc.get_lsn_range().end)
1783 26 : .max()
1784 : else {
1785 0 : info!("no layers to compact with gc");
1786 0 : return Ok(());
1787 : };
1788 : // Then, pick all the layers that are below the max_layer_lsn. This is to ensure we can pick all single-key
1789 : // layers to compact.
1790 100 : for desc in layers.iter_historic_layers() {
1791 100 : if desc.get_lsn_range().end <= max_layer_lsn {
1792 82 : selected_layers.push(guard.get_from_desc(&desc));
1793 82 : }
1794 : }
1795 26 : if selected_layers.is_empty() {
1796 0 : info!("no layers to compact with gc");
1797 0 : return Ok(());
1798 26 : }
1799 26 : retain_lsns_below_horizon.sort();
1800 26 : (selected_layers, gc_cutoff, retain_lsns_below_horizon)
1801 : };
1802 26 : let lowest_retain_lsn = if self.ancestor_timeline.is_some() {
1803 2 : Lsn(self.ancestor_lsn.0 + 1)
1804 : } else {
1805 24 : let res = retain_lsns_below_horizon
1806 24 : .first()
1807 24 : .copied()
1808 24 : .unwrap_or(gc_cutoff);
1809 24 : if cfg!(debug_assertions) {
1810 24 : assert_eq!(
1811 24 : res,
1812 24 : retain_lsns_below_horizon
1813 24 : .iter()
1814 24 : .min()
1815 24 : .copied()
1816 24 : .unwrap_or(gc_cutoff)
1817 24 : );
1818 0 : }
1819 24 : res
1820 : };
1821 26 : info!(
1822 0 : "picked {} layers for compaction with gc_cutoff={} lowest_retain_lsn={}",
1823 0 : layer_selection.len(),
1824 : gc_cutoff,
1825 : lowest_retain_lsn
1826 : );
1827 :
1828 28 : self.check_compaction_space(&layer_selection).await?;
1829 :
1830 : // Step 1: (In the future) construct a k-merge iterator over all layers. For now, simply collect all keys + LSNs.
1831 : // Also, verify if the layer map can be split by drawing a horizontal line at every LSN start/end split point.
1832 26 : let mut lsn_split_point = BTreeSet::new(); // TODO: use a better data structure (range tree / range set?)
1833 108 : for layer in &layer_selection {
1834 82 : let desc = layer.layer_desc();
1835 82 : if desc.is_delta() {
1836 : // ignore single-key layer files
1837 46 : if desc.key_range.start.next() != desc.key_range.end {
1838 34 : let lsn_range = &desc.lsn_range;
1839 34 : lsn_split_point.insert(lsn_range.start);
1840 34 : lsn_split_point.insert(lsn_range.end);
1841 34 : }
1842 46 : stat.visit_delta_layer(desc.file_size());
1843 36 : } else {
1844 36 : stat.visit_image_layer(desc.file_size());
1845 36 : }
1846 : }
1847 26 : let layer_names: Vec<crate::tenant::storage_layer::LayerName> = layer_selection
1848 26 : .iter()
1849 82 : .map(|layer| layer.layer_desc().layer_name())
1850 26 : .collect_vec();
1851 26 : if let Some(err) = check_valid_layermap(&layer_names) {
1852 0 : bail!("cannot run gc-compaction because {}", err);
1853 26 : }
1854 26 : // The maximum LSN we are processing in this compaction loop
1855 26 : let end_lsn = layer_selection
1856 26 : .iter()
1857 82 : .map(|l| l.layer_desc().lsn_range.end)
1858 26 : .max()
1859 26 : .unwrap();
1860 26 : // We don't want any of the produced layers to cover the full key range (i.e., MIN..MAX) b/c it will then be recognized
1861 26 : // as an L0 layer.
1862 26 : let mut delta_layers = Vec::new();
1863 26 : let mut image_layers = Vec::new();
1864 26 : let mut downloaded_layers = Vec::new();
1865 108 : for layer in &layer_selection {
1866 82 : let resident_layer = layer.download_and_keep_resident().await?;
1867 82 : downloaded_layers.push(resident_layer);
1868 : }
1869 108 : for resident_layer in &downloaded_layers {
1870 82 : if resident_layer.layer_desc().is_delta() {
1871 46 : let layer = resident_layer.get_as_delta(ctx).await?;
1872 46 : delta_layers.push(layer);
1873 : } else {
1874 36 : let layer = resident_layer.get_as_image(ctx).await?;
1875 36 : image_layers.push(layer);
1876 : }
1877 : }
1878 26 : let (dense_ks, sparse_ks) = self.collect_gc_compaction_keyspace().await?;
1879 26 : let mut merge_iter = FilterIterator::create(
1880 26 : MergeIterator::create(&delta_layers, &image_layers, ctx),
1881 26 : dense_ks,
1882 26 : sparse_ks,
1883 26 : )?;
1884 : // Step 2: Produce images+deltas. TODO: ensure newly-produced delta does not overlap with other deltas.
1885 : // Data of the same key.
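// The merge iterator yields (key, lsn, value) tuples in key/LSN order. We buffer all entries of
// the current key in `accumulated_values` and flush them through `generate_key_retention`
// whenever the key changes, plus once more after the loop for the final key.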
1886 26 : let mut accumulated_values = Vec::new();
1887 26 : let mut last_key: Option<Key> = None;
1888 :
1889 : // Only create image layers when there are no ancestor branches. TODO: create a covering image layer
1890 : // when certain conditions are met.
1891 26 : let mut image_layer_writer = if self.ancestor_timeline.is_none() {
1892 : Some(
1893 24 : SplitImageLayerWriter::new(
1894 24 : self.conf,
1895 24 : self.timeline_id,
1896 24 : self.tenant_shard_id,
1897 24 : Key::MIN,
1898 24 : lowest_retain_lsn,
1899 24 : self.get_compaction_target_size(),
1900 24 : ctx,
1901 24 : )
1902 12 : .await?,
1903 : )
1904 : } else {
1905 2 : None
1906 : };
1907 :
1908 26 : let mut delta_layer_writer = SplitDeltaLayerWriter::new(
1909 26 : self.conf,
1910 26 : self.timeline_id,
1911 26 : self.tenant_shard_id,
1912 26 : lowest_retain_lsn..end_lsn,
1913 26 : self.get_compaction_target_size(),
1914 26 : )
1915 0 : .await?;
1916 :
1917 : /// Returns None if there is no ancestor branch. Throws an error when the key is not found.
1918 : ///
1919 : /// Currently, we always get the ancestor image for each key in the child branch no matter whether the image
1920 : /// is needed for reconstruction. This should be fixed in the future.
1921 : ///
1922 : /// Furthermore, we should do vectored get instead of a single get, or better, use k-merge for ancestor
1923 : /// images.
1924 422 : async fn get_ancestor_image(
1925 422 : tline: &Arc<Timeline>,
1926 422 : key: Key,
1927 422 : ctx: &RequestContext,
1928 422 : ) -> anyhow::Result<Option<(Key, Lsn, Bytes)>> {
1929 422 : if tline.ancestor_timeline.is_none() {
1930 408 : return Ok(None);
1931 14 : };
1932 : // This function is implemented as a get of the current timeline at ancestor LSN, therefore reusing
1933 : // as much existing code as possible.
1934 14 : let img = tline.get(key, tline.ancestor_lsn, ctx).await?;
1935 14 : Ok(Some((key, tline.ancestor_lsn, img)))
1936 422 : }
1937 :
1938 : // Strictly speaking, we could decide not to write to the image layer at all at this point because
1939 : // the key and LSN range are already determined. However, to keep things simple here, we still
1940 : // create this writer and discard it at the end.
1941 :
1942 586 : while let Some((key, lsn, val)) = merge_iter.next().await? {
1943 560 : if cancel.is_cancelled() {
1944 0 : return Err(anyhow!("cancelled")); // TODO: refactor to CompactionError and pass cancel error
1945 560 : }
1946 560 : match val {
1947 420 : Value::Image(_) => stat.visit_image_key(&val),
1948 140 : Value::WalRecord(_) => stat.visit_wal_key(&val),
1949 : }
1950 560 : if last_key.is_none() || last_key.as_ref() == Some(&key) {
1951 164 : if last_key.is_none() {
1952 26 : last_key = Some(key);
1953 138 : }
1954 164 : accumulated_values.push((key, lsn, val));
1955 : } else {
1956 396 : let last_key = last_key.as_mut().unwrap();
1957 396 : stat.on_unique_key_visited();
1958 396 : let retention = self
1959 396 : .generate_key_retention(
1960 396 : *last_key,
1961 396 : &accumulated_values,
1962 396 : gc_cutoff,
1963 396 : &retain_lsns_below_horizon,
1964 396 : COMPACTION_DELTA_THRESHOLD,
1965 396 : get_ancestor_image(self, *last_key, ctx).await?,
1966 : )
1967 0 : .await?;
1968 : // Put the image into the image layer. Currently we have a single big layer for the compaction.
1969 396 : retention
1970 396 : .pipe_to(
1971 396 : *last_key,
1972 396 : &mut delta_layer_writer,
1973 396 : image_layer_writer.as_mut(),
1974 396 : &mut stat,
1975 396 : ctx,
1976 396 : )
1977 397 : .await?;
1978 396 : accumulated_values.clear();
1979 396 : *last_key = key;
1980 396 : accumulated_values.push((key, lsn, val));
1981 : }
1982 : }
1983 :
1984 26 : let last_key = last_key.expect("no keys produced during compaction");
1985 26 : // TODO: move this part to the loop body
1986 26 : stat.on_unique_key_visited();
1987 26 : let retention = self
1988 26 : .generate_key_retention(
1989 26 : last_key,
1990 26 : &accumulated_values,
1991 26 : gc_cutoff,
1992 26 : &retain_lsns_below_horizon,
1993 26 : COMPACTION_DELTA_THRESHOLD,
1994 26 : get_ancestor_image(self, last_key, ctx).await?,
1995 : )
1996 0 : .await?;
1997 : // Put the image into the image layer. Currently we have a single big layer for the compaction.
1998 26 : retention
1999 26 : .pipe_to(
2000 26 : last_key,
2001 26 : &mut delta_layer_writer,
2002 26 : image_layer_writer.as_mut(),
2003 26 : &mut stat,
2004 26 : ctx,
2005 26 : )
2006 24 : .await?;
2007 :
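// `discard` lets the split writers skip materializing a produced layer when
// `KeyHistoryRetention::discard_key` says so; such layers show up as
// `SplitWriterResult::Discarded` below, and the existing layer is kept instead.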
2008 38 : let discard = |key: &PersistentLayerKey| {
2009 38 : let key = key.clone();
2010 38 : async move { KeyHistoryRetention::discard_key(&key, self, dry_run).await }
2011 38 : };
2012 :
2013 26 : let produced_image_layers = if let Some(writer) = image_layer_writer {
2014 24 : if !dry_run {
2015 20 : writer
2016 20 : .finish_with_discard_fn(self, ctx, Key::MAX, discard)
2017 25 : .await?
2018 : } else {
2019 4 : drop(writer);
2020 4 : Vec::new()
2021 : }
2022 : } else {
2023 2 : Vec::new()
2024 : };
2025 :
2026 26 : let produced_delta_layers = if !dry_run {
2027 22 : delta_layer_writer
2028 22 : .finish_with_discard_fn(self, ctx, discard)
2029 25 : .await?
2030 : } else {
2031 4 : drop(delta_layer_writer);
2032 4 : Vec::new()
2033 : };
2034 :
2035 26 : let mut compact_to = Vec::new();
2036 26 : let mut keep_layers = HashSet::new();
2037 26 : let produced_delta_layers_len = produced_delta_layers.len();
2038 26 : let produced_image_layers_len = produced_image_layers.len();
2039 44 : for action in produced_delta_layers {
2040 18 : match action {
2041 10 : SplitWriterResult::Produced(layer) => {
2042 10 : stat.produce_delta_layer(layer.layer_desc().file_size());
2043 10 : compact_to.push(layer);
2044 10 : }
2045 8 : SplitWriterResult::Discarded(l) => {
2046 8 : keep_layers.insert(l);
2047 8 : stat.discard_delta_layer();
2048 8 : }
2049 : }
2050 : }
2051 46 : for action in produced_image_layers {
2052 20 : match action {
2053 12 : SplitWriterResult::Produced(layer) => {
2054 12 : stat.produce_image_layer(layer.layer_desc().file_size());
2055 12 : compact_to.push(layer);
2056 12 : }
2057 8 : SplitWriterResult::Discarded(l) => {
2058 8 : keep_layers.insert(l);
2059 8 : stat.discard_image_layer();
2060 8 : }
2061 : }
2062 : }
2063 26 : let mut layer_selection = layer_selection;
2064 82 : layer_selection.retain(|x| !keep_layers.contains(&x.layer_desc().key()));
2065 26 :
2066 26 : info!(
2067 0 : "gc-compaction statistics: {}",
2068 0 : serde_json::to_string(&stat)?
2069 : );
2070 :
2071 26 : if dry_run {
2072 4 : return Ok(());
2073 22 : }
2074 22 :
2075 22 : info!(
2076 0 : "produced {} delta layers and {} image layers, {} layers are kept",
2077 0 : produced_delta_layers_len,
2078 0 : produced_image_layers_len,
2079 0 : layer_selection.len()
2080 : );
2081 :
2082 : // Step 3: Place back to the layer map.
2083 : {
2084 22 : let mut guard = self.layers.write().await;
2085 22 : guard
2086 22 : .open_mut()?
2087 22 : .finish_gc_compaction(&layer_selection, &compact_to, &self.metrics)
2088 22 : };
2089 22 : self.remote_client
2090 22 : .schedule_compaction_update(&layer_selection, &compact_to)?;
2091 :
2092 22 : drop(gc_lock);
2093 22 :
2094 22 : Ok(())
2095 26 : }
2096 : }
2097 :
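/// Adapts a [`Timeline`] to the `CompactionJobExecutor` interface of the `pageserver_compaction`
/// crate. Newly created and to-be-deleted layers are accumulated here and applied to the timeline
/// in batches via [`TimelineAdaptor::flush_updates`].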
2098 : struct TimelineAdaptor {
2099 : timeline: Arc<Timeline>,
2100 :
2101 : keyspace: (Lsn, KeySpace),
2102 :
2103 : new_deltas: Vec<ResidentLayer>,
2104 : new_images: Vec<ResidentLayer>,
2105 : layers_to_delete: Vec<Arc<PersistentLayerDesc>>,
2106 : }
2107 :
2108 : impl TimelineAdaptor {
2109 0 : pub fn new(timeline: &Arc<Timeline>, keyspace: (Lsn, KeySpace)) -> Self {
2110 0 : Self {
2111 0 : timeline: timeline.clone(),
2112 0 : keyspace,
2113 0 : new_images: Vec::new(),
2114 0 : new_deltas: Vec::new(),
2115 0 : layers_to_delete: Vec::new(),
2116 0 : }
2117 0 : }
2118 :
2119 0 : pub async fn flush_updates(&mut self) -> Result<(), CompactionError> {
2120 0 : let layers_to_delete = {
2121 0 : let guard = self.timeline.layers.read().await;
2122 0 : self.layers_to_delete
2123 0 : .iter()
2124 0 : .map(|x| guard.get_from_desc(x))
2125 0 : .collect::<Vec<Layer>>()
2126 0 : };
2127 0 : self.timeline
2128 0 : .finish_compact_batch(&self.new_deltas, &self.new_images, &layers_to_delete)
2129 0 : .await?;
2130 :
2131 0 : self.timeline
2132 0 : .upload_new_image_layers(std::mem::take(&mut self.new_images))?;
2133 :
2134 0 : self.new_deltas.clear();
2135 0 : self.layers_to_delete.clear();
2136 0 : Ok(())
2137 0 : }
2138 : }
2139 :
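// Thin newtype wrappers around `ResidentLayer` so that the compaction-crate traits
// (`CompactionLayer` / `CompactionDeltaLayer` / `CompactionImageLayer`) can be implemented
// for resident delta and image layers.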
2140 : #[derive(Clone)]
2141 : struct ResidentDeltaLayer(ResidentLayer);
2142 : #[derive(Clone)]
2143 : struct ResidentImageLayer(ResidentLayer);
2144 :
2145 : impl CompactionJobExecutor for TimelineAdaptor {
2146 : type Key = crate::repository::Key;
2147 :
2148 : type Layer = OwnArc<PersistentLayerDesc>;
2149 : type DeltaLayer = ResidentDeltaLayer;
2150 : type ImageLayer = ResidentImageLayer;
2151 :
2152 : type RequestContext = crate::context::RequestContext;
2153 :
2154 0 : fn get_shard_identity(&self) -> &ShardIdentity {
2155 0 : self.timeline.get_shard_identity()
2156 0 : }
2157 :
2158 0 : async fn get_layers(
2159 0 : &mut self,
2160 0 : key_range: &Range<Key>,
2161 0 : lsn_range: &Range<Lsn>,
2162 0 : _ctx: &RequestContext,
2163 0 : ) -> anyhow::Result<Vec<OwnArc<PersistentLayerDesc>>> {
2164 0 : self.flush_updates().await?;
2165 :
2166 0 : let guard = self.timeline.layers.read().await;
2167 0 : let layer_map = guard.layer_map()?;
2168 :
2169 0 : let result = layer_map
2170 0 : .iter_historic_layers()
2171 0 : .filter(|l| {
2172 0 : overlaps_with(&l.lsn_range, lsn_range) && overlaps_with(&l.key_range, key_range)
2173 0 : })
2174 0 : .map(OwnArc)
2175 0 : .collect();
2176 0 : Ok(result)
2177 0 : }
2178 :
2179 0 : async fn get_keyspace(
2180 0 : &mut self,
2181 0 : key_range: &Range<Key>,
2182 0 : lsn: Lsn,
2183 0 : _ctx: &RequestContext,
2184 0 : ) -> anyhow::Result<Vec<Range<Key>>> {
2185 0 : if lsn == self.keyspace.0 {
2186 0 : Ok(pageserver_compaction::helpers::intersect_keyspace(
2187 0 : &self.keyspace.1.ranges,
2188 0 : key_range,
2189 0 : ))
2190 : } else {
2191 : // The current compaction implementation only ever requests the key space
2192 : // at the compaction end LSN.
2193 0 : anyhow::bail!("keyspace not available for requested lsn");
2194 : }
2195 0 : }
2196 :
2197 0 : async fn downcast_delta_layer(
2198 0 : &self,
2199 0 : layer: &OwnArc<PersistentLayerDesc>,
2200 0 : ) -> anyhow::Result<Option<ResidentDeltaLayer>> {
2201 0 : // this is a lot more complex than a simple downcast...
2202 0 : if layer.is_delta() {
2203 0 : let l = {
2204 0 : let guard = self.timeline.layers.read().await;
2205 0 : guard.get_from_desc(layer)
2206 : };
2207 0 : let result = l.download_and_keep_resident().await?;
2208 :
2209 0 : Ok(Some(ResidentDeltaLayer(result)))
2210 : } else {
2211 0 : Ok(None)
2212 : }
2213 0 : }
2214 :
2215 0 : async fn create_image(
2216 0 : &mut self,
2217 0 : lsn: Lsn,
2218 0 : key_range: &Range<Key>,
2219 0 : ctx: &RequestContext,
2220 0 : ) -> anyhow::Result<()> {
2221 0 : Ok(self.create_image_impl(lsn, key_range, ctx).await?)
2222 0 : }
2223 :
2224 0 : async fn create_delta(
2225 0 : &mut self,
2226 0 : lsn_range: &Range<Lsn>,
2227 0 : key_range: &Range<Key>,
2228 0 : input_layers: &[ResidentDeltaLayer],
2229 0 : ctx: &RequestContext,
2230 0 : ) -> anyhow::Result<()> {
2231 0 : debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end);
2232 :
2233 0 : let mut all_entries = Vec::new();
2234 0 : for dl in input_layers.iter() {
2235 0 : all_entries.extend(dl.load_keys(ctx).await?);
2236 : }
2237 :
2238 : // The current stdlib sorting implementation is designed such that it is
2239 : // particularly fast when the slice is made up of sorted sub-ranges.
2240 0 : all_entries.sort_by_key(|DeltaEntry { key, lsn, .. }| (*key, *lsn));
2241 :
2242 0 : let mut writer = DeltaLayerWriter::new(
2243 0 : self.timeline.conf,
2244 0 : self.timeline.timeline_id,
2245 0 : self.timeline.tenant_shard_id,
2246 0 : key_range.start,
2247 0 : lsn_range.clone(),
2248 0 : ctx,
2249 0 : )
2250 0 : .await?;
2251 :
2252 0 : let mut dup_values = 0;
2253 0 :
2254 0 : // This iterator walks through all key-value pairs from all the layers
2255 0 : // we're compacting, in key, LSN order.
2256 0 : let mut prev: Option<(Key, Lsn)> = None;
2257 : for &DeltaEntry {
2258 0 : key, lsn, ref val, ..
2259 0 : } in all_entries.iter()
2260 : {
2261 0 : if prev == Some((key, lsn)) {
2262 : // This is a duplicate. Skip it.
2263 : //
2264 : // It can happen if compaction is interrupted after writing some
2265 : // layers but not all, and we are compacting the range again.
2266 : // The calculations in the algorithm assume that there are no
2267 : // duplicates, so the math on targeted file size is likely off,
2268 : // and we will create smaller files than expected.
2269 0 : dup_values += 1;
2270 0 : continue;
2271 0 : }
2272 :
2273 0 : let value = val.load(ctx).await?;
2274 :
2275 0 : writer.put_value(key, lsn, value, ctx).await?;
2276 :
2277 0 : prev = Some((key, lsn));
2278 : }
2279 :
2280 0 : if dup_values > 0 {
2281 0 : warn!("delta layer created with {} duplicate values", dup_values);
2282 0 : }
2283 :
2284 0 : fail_point!("delta-layer-writer-fail-before-finish", |_| {
2285 0 : Err(anyhow::anyhow!(
2286 0 : "failpoint delta-layer-writer-fail-before-finish"
2287 0 : ))
2288 0 : });
2289 :
2290 0 : let (desc, path) = writer.finish(prev.unwrap().0.next(), ctx).await?;
2291 0 : let new_delta_layer =
2292 0 : Layer::finish_creating(self.timeline.conf, &self.timeline, desc, &path)?;
2293 :
2294 0 : self.new_deltas.push(new_delta_layer);
2295 0 : Ok(())
2296 0 : }
2297 :
2298 0 : async fn delete_layer(
2299 0 : &mut self,
2300 0 : layer: &OwnArc<PersistentLayerDesc>,
2301 0 : _ctx: &RequestContext,
2302 0 : ) -> anyhow::Result<()> {
2303 0 : self.layers_to_delete.push(layer.clone().0);
2304 0 : Ok(())
2305 0 : }
2306 : }
2307 :
2308 : impl TimelineAdaptor {
2309 0 : async fn create_image_impl(
2310 0 : &mut self,
2311 0 : lsn: Lsn,
2312 0 : key_range: &Range<Key>,
2313 0 : ctx: &RequestContext,
2314 0 : ) -> Result<(), CreateImageLayersError> {
2315 0 : let timer = self.timeline.metrics.create_images_time_histo.start_timer();
2316 :
2317 0 : let image_layer_writer = ImageLayerWriter::new(
2318 0 : self.timeline.conf,
2319 0 : self.timeline.timeline_id,
2320 0 : self.timeline.tenant_shard_id,
2321 0 : key_range,
2322 0 : lsn,
2323 0 : ctx,
2324 0 : )
2325 0 : .await?;
2326 :
2327 0 : fail_point!("image-layer-writer-fail-before-finish", |_| {
2328 0 : Err(CreateImageLayersError::Other(anyhow::anyhow!(
2329 0 : "failpoint image-layer-writer-fail-before-finish"
2330 0 : )))
2331 0 : });
2332 :
2333 0 : let keyspace = KeySpace {
2334 0 : ranges: self.get_keyspace(key_range, lsn, ctx).await?,
2335 : };
2336 : // TODO set proper (stateful) start. The create_image_layer_for_rel_blocks function mostly
2337 0 : let start = Key::MIN;
2338 : let ImageLayerCreationOutcome {
2339 0 : image,
2340 : next_start_key: _,
2341 0 : } = self
2342 0 : .timeline
2343 0 : .create_image_layer_for_rel_blocks(
2344 0 : &keyspace,
2345 0 : image_layer_writer,
2346 0 : lsn,
2347 0 : ctx,
2348 0 : key_range.clone(),
2349 0 : start,
2350 0 : )
2351 0 : .await?;
2352 :
2353 0 : if let Some(image_layer) = image {
2354 0 : self.new_images.push(image_layer);
2355 0 : }
2356 :
2357 0 : timer.stop_and_record();
2358 0 :
2359 0 : Ok(())
2360 0 : }
2361 : }
2362 :
2363 : impl CompactionRequestContext for crate::context::RequestContext {}
2364 :
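/// A cloneable, owning handle around `Arc<T>` that derefs to `T`; used as the generic layer
/// handle type for the compaction interface (see the `CompactionLayer` impls below).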
2365 : #[derive(Debug, Clone)]
2366 : pub struct OwnArc<T>(pub Arc<T>);
2367 :
2368 : impl<T> Deref for OwnArc<T> {
2369 : type Target = <Arc<T> as Deref>::Target;
2370 0 : fn deref(&self) -> &Self::Target {
2371 0 : &self.0
2372 0 : }
2373 : }
2374 :
2375 : impl<T> AsRef<T> for OwnArc<T> {
2376 0 : fn as_ref(&self) -> &T {
2377 0 : self.0.as_ref()
2378 0 : }
2379 : }
2380 :
2381 : impl CompactionLayer<Key> for OwnArc<PersistentLayerDesc> {
2382 0 : fn key_range(&self) -> &Range<Key> {
2383 0 : &self.key_range
2384 0 : }
2385 0 : fn lsn_range(&self) -> &Range<Lsn> {
2386 0 : &self.lsn_range
2387 0 : }
2388 0 : fn file_size(&self) -> u64 {
2389 0 : self.file_size
2390 0 : }
2391 0 : fn short_id(&self) -> std::string::String {
2392 0 : self.as_ref().short_id().to_string()
2393 0 : }
2394 0 : fn is_delta(&self) -> bool {
2395 0 : self.as_ref().is_delta()
2396 0 : }
2397 : }
2398 :
2399 : impl CompactionLayer<Key> for OwnArc<DeltaLayer> {
2400 0 : fn key_range(&self) -> &Range<Key> {
2401 0 : &self.layer_desc().key_range
2402 0 : }
2403 0 : fn lsn_range(&self) -> &Range<Lsn> {
2404 0 : &self.layer_desc().lsn_range
2405 0 : }
2406 0 : fn file_size(&self) -> u64 {
2407 0 : self.layer_desc().file_size
2408 0 : }
2409 0 : fn short_id(&self) -> std::string::String {
2410 0 : self.layer_desc().short_id().to_string()
2411 0 : }
2412 0 : fn is_delta(&self) -> bool {
2413 0 : true
2414 0 : }
2415 : }
2416 :
2417 : use crate::tenant::timeline::DeltaEntry;
2418 :
2419 : impl CompactionLayer<Key> for ResidentDeltaLayer {
2420 0 : fn key_range(&self) -> &Range<Key> {
2421 0 : &self.0.layer_desc().key_range
2422 0 : }
2423 0 : fn lsn_range(&self) -> &Range<Lsn> {
2424 0 : &self.0.layer_desc().lsn_range
2425 0 : }
2426 0 : fn file_size(&self) -> u64 {
2427 0 : self.0.layer_desc().file_size
2428 0 : }
2429 0 : fn short_id(&self) -> std::string::String {
2430 0 : self.0.layer_desc().short_id().to_string()
2431 0 : }
2432 0 : fn is_delta(&self) -> bool {
2433 0 : true
2434 0 : }
2435 : }
2436 :
2437 : impl CompactionDeltaLayer<TimelineAdaptor> for ResidentDeltaLayer {
2438 : type DeltaEntry<'a> = DeltaEntry<'a>;
2439 :
2440 0 : async fn load_keys<'a>(&self, ctx: &RequestContext) -> anyhow::Result<Vec<DeltaEntry<'_>>> {
2441 0 : self.0.load_keys(ctx).await
2442 0 : }
2443 : }
2444 :
2445 : impl CompactionLayer<Key> for ResidentImageLayer {
2446 0 : fn key_range(&self) -> &Range<Key> {
2447 0 : &self.0.layer_desc().key_range
2448 0 : }
2449 0 : fn lsn_range(&self) -> &Range<Lsn> {
2450 0 : &self.0.layer_desc().lsn_range
2451 0 : }
2452 0 : fn file_size(&self) -> u64 {
2453 0 : self.0.layer_desc().file_size
2454 0 : }
2455 0 : fn short_id(&self) -> std::string::String {
2456 0 : self.0.layer_desc().short_id().to_string()
2457 0 : }
2458 0 : fn is_delta(&self) -> bool {
2459 0 : false
2460 0 : }
2461 : }
2462 : impl CompactionImageLayer<TimelineAdaptor> for ResidentImageLayer {}
|