Line data Source code
1 : //! New compaction implementation. The algorithm itself is implemented in the
2 : //! compaction crate. This file implements the callbacks and structs that allow
3 : //! the algorithm to drive the process.
4 : //!
5 : //! The old legacy algorithm is implemented directly in `timeline.rs`.
6 :
7 : use std::collections::{BinaryHeap, HashSet};
8 : use std::ops::{Deref, Range};
9 : use std::sync::Arc;
10 :
11 : use super::layer_manager::LayerManager;
12 : use super::{
13 : CompactFlags, CreateImageLayersError, DurationRecorder, ImageLayerCreationMode,
14 : RecordedDuration, Timeline,
15 : };
16 :
17 : use anyhow::{anyhow, Context};
18 : use bytes::Bytes;
19 : use enumset::EnumSet;
20 : use fail::fail_point;
21 : use itertools::Itertools;
22 : use pageserver_api::keyspace::ShardedRange;
23 : use pageserver_api::shard::{ShardCount, ShardIdentity, TenantShardId};
24 : use tokio_util::sync::CancellationToken;
25 : use tracing::{debug, info, info_span, trace, warn, Instrument};
26 : use utils::id::TimelineId;
27 :
28 : use crate::context::{AccessStatsBehavior, RequestContext, RequestContextBuilder};
29 : use crate::page_cache;
30 : use crate::tenant::config::defaults::{DEFAULT_CHECKPOINT_DISTANCE, DEFAULT_COMPACTION_THRESHOLD};
31 : use crate::tenant::remote_timeline_client::WaitCompletionError;
32 : use crate::tenant::storage_layer::merge_iterator::MergeIterator;
33 : use crate::tenant::storage_layer::{
34 : AsLayerDesc, PersistentLayerDesc, PersistentLayerKey, ValueReconstructState,
35 : };
36 : use crate::tenant::timeline::ImageLayerCreationOutcome;
37 : use crate::tenant::timeline::{drop_rlock, DeltaLayerWriter, ImageLayerWriter};
38 : use crate::tenant::timeline::{Layer, ResidentLayer};
39 : use crate::tenant::DeltaLayer;
40 : use crate::virtual_file::{MaybeFatalIo, VirtualFile};
41 :
42 : use crate::keyspace::KeySpace;
43 : use crate::repository::{Key, Value};
44 :
45 : use utils::lsn::Lsn;
46 :
47 : use pageserver_compaction::helpers::overlaps_with;
48 : use pageserver_compaction::interface::*;
49 :
50 : use super::CompactionError;
51 :
52 : /// Maximum number of deltas before generating an image layer in bottom-most compaction.
53 : const COMPACTION_DELTA_THRESHOLD: usize = 5;
54 :
55 : /// The result of bottom-most compaction for a single key at each LSN.
56 : #[derive(Debug)]
57 : #[cfg_attr(test, derive(PartialEq))]
58 : pub struct KeyLogAtLsn(pub Vec<(Lsn, Value)>);
59 :
60 : /// The result of bottom-most compaction.
61 : #[derive(Debug)]
62 : #[cfg_attr(test, derive(PartialEq))]
63 : pub(crate) struct KeyHistoryRetention {
64 : /// Stores logs to reconstruct the value at the given LSN, that is to say, logs <= LSN or image == LSN.
65 : pub(crate) below_horizon: Vec<(Lsn, KeyLogAtLsn)>,
66 : /// Stores logs to reconstruct the value at any LSN above the horizon, that is to say, log > LSN.
67 : pub(crate) above_horizon: KeyLogAtLsn,
68 : }
69 :
70 : impl KeyHistoryRetention {
71 302 : async fn pipe_to(
72 302 : self,
73 302 : key: Key,
74 302 : delta_writer: &mut Vec<(Key, Lsn, Value)>,
75 302 : mut image_writer: Option<&mut ImageLayerWriter>,
76 302 : ctx: &RequestContext,
77 302 : ) -> anyhow::Result<()> {
78 302 : let mut first_batch = true;
79 906 : for (cutoff_lsn, KeyLogAtLsn(logs)) in self.below_horizon {
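// Only the lowest cutoff (the first batch) can be emitted as an image: if its history
// has collapsed to a single image value, it is routed to the image layer writer when one
// is being produced; every other entry is pushed to `delta_writer` as (key, lsn, value).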
80 604 : if first_batch {
81 302 : if logs.len() == 1 && logs[0].1.is_image() {
82 288 : let Value::Image(img) = &logs[0].1 else {
83 0 : unreachable!()
84 : };
85 288 : if let Some(image_writer) = image_writer.as_mut() {
86 291 : image_writer.put_image(key, img.clone(), ctx).await?;
87 0 : } else {
88 0 : delta_writer.push((key, cutoff_lsn, Value::Image(img.clone())));
89 0 : }
90 : } else {
91 28 : for (lsn, val) in logs {
92 14 : delta_writer.push((key, lsn, val));
93 14 : }
94 : }
95 302 : first_batch = false;
96 : } else {
97 354 : for (lsn, val) in logs {
98 52 : delta_writer.push((key, lsn, val));
99 52 : }
100 : }
101 : }
102 302 : let KeyLogAtLsn(above_horizon_logs) = self.above_horizon;
103 314 : for (lsn, val) in above_horizon_logs {
104 12 : delta_writer.push((key, lsn, val));
105 12 : }
106 302 : Ok(())
107 302 : }
108 : }
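// For orientation, a rough sketch of how a retention result flows into the writers
// (LSNs and values are illustrative, mirroring the example in the
// `generate_key_retention` doc comment further below):
//
//     // retention.below_horizon = [
//     //     (0x20, [img AB@0x20]),
//     //     (0x40, [+C@0x30, +D@0x40]),
//     //     (0x50, [img ABCDE@0x50]),
//     // ];
//     // retention.above_horizon = [+F@0x60];
//     retention
//         .pipe_to(key, &mut delta_writer, Some(&mut image_writer), ctx)
//         .await?;
//     // AB@0x20 goes into `image_writer`; all other entries are pushed onto
//     // `delta_writer` as (key, lsn, value) tuples, including the image-valued
//     // ABCDE@0x50, which lands in a delta layer because it is not the lowest cutoff.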
109 :
110 : impl Timeline {
111 : /// TODO: cancellation
112 : ///
113 : /// Returns whether the compaction has pending tasks.
114 364 : pub(crate) async fn compact_legacy(
115 364 : self: &Arc<Self>,
116 364 : cancel: &CancellationToken,
117 364 : flags: EnumSet<CompactFlags>,
118 364 : ctx: &RequestContext,
119 364 : ) -> Result<bool, CompactionError> {
120 364 : if flags.contains(CompactFlags::EnhancedGcBottomMostCompaction) {
121 0 : self.compact_with_gc(cancel, ctx)
122 0 : .await
123 0 : .map_err(CompactionError::Other)?;
124 0 : return Ok(false);
125 364 : }
126 364 :
127 364 : // High level strategy for compaction / image creation:
128 364 : //
129 364 : // 1. First, calculate the desired "partitioning" of the
130 364 : // currently in-use key space. The goal is to partition the
131 364 : // key space into roughly fixed-size chunks, but also take into
132 364 : // account any existing image layers, and try to align the
133 364 : // chunk boundaries with the existing image layers to avoid
134 364 : // too much churn. Also try to align chunk boundaries with
135 364 : // relation boundaries. In principle, we don't know about
136 364 : // relation boundaries here, we just deal with key-value
137 364 : // pairs, and the code in pgdatadir_mapping.rs knows how to
138 364 : // map relations into key-value pairs. But in practice we know
139 364 : // that 'field6' is the block number, and the fields 1-5
140 364 : // identify a relation. This is just an optimization,
141 364 : // though.
142 364 : //
143 364 : // 2. Once we know the partitioning, for each partition,
144 364 : // decide if it's time to create a new image layer. The
145 364 : // criteria is: there has been too much "churn" since the last
146 364 : // image layer? The "churn" is fuzzy concept, it's a
147 364 : // combination of too many delta files, or too much WAL in
148 364 : // total in the delta file. Or perhaps: if creating an image
149 364 : // file would allow to delete some older files.
150 364 : //
151 364 : // 3. After that, we compact all level0 delta files if there
152 364 : // are too many of them. While compacting, we also garbage
153 364 : // collect any page versions that are no longer needed because
154 364 : // of the new image layers we created in step 2.
155 364 : //
156 364 : // TODO: This high level strategy hasn't been implemented yet.
157 364 : // Below are functions compact_level0() and create_image_layers()
158 364 : // but they are a bit ad hoc and don't quite work like it's explained
159 364 : // above. Rewrite it.
160 364 :
161 364 : // Is the timeline being deleted?
162 364 : if self.is_stopping() {
163 0 : trace!("Dropping out of compaction on timeline shutdown");
164 0 : return Err(CompactionError::ShuttingDown);
165 364 : }
166 364 :
167 364 : let target_file_size = self.get_checkpoint_distance();
168 :
169 : // Define partitioning schema if needed
170 :
171 : // FIXME: the match should only cover repartitioning, not the next steps
172 364 : let (partition_count, has_pending_tasks) = match self
173 364 : .repartition(
174 364 : self.get_last_record_lsn(),
175 364 : self.get_compaction_target_size(),
176 364 : flags,
177 364 : ctx,
178 364 : )
179 16708 : .await
180 : {
181 364 : Ok(((dense_partitioning, sparse_partitioning), lsn)) => {
182 364 : // Disables access_stats updates, so that the files we read remain candidates for eviction after we're done with them
183 364 : let image_ctx = RequestContextBuilder::extend(ctx)
184 364 : .access_stats_behavior(AccessStatsBehavior::Skip)
185 364 : .build();
186 364 :
187 364 : // 2. Compact
188 364 : let timer = self.metrics.compact_time_histo.start_timer();
189 45834 : let fully_compacted = self.compact_level0(target_file_size, ctx).await?;
190 364 : timer.stop_and_record();
191 364 :
192 364 : let mut partitioning = dense_partitioning;
193 364 : partitioning
194 364 : .parts
195 364 : .extend(sparse_partitioning.into_dense().parts);
196 364 :
197 364 : // 3. Create new image layers for partitions that have been modified
198 364 : // "enough". Skip image layer creation if L0 compaction cannot keep up.
199 364 : if fully_compacted {
200 364 : let image_layers = self
201 364 : .create_image_layers(
202 364 : &partitioning,
203 364 : lsn,
204 364 : if flags.contains(CompactFlags::ForceImageLayerCreation) {
205 14 : ImageLayerCreationMode::Force
206 : } else {
207 350 : ImageLayerCreationMode::Try
208 : },
209 364 : &image_ctx,
210 : )
211 14446 : .await?;
212 :
213 364 : self.upload_new_image_layers(image_layers)?;
214 : } else {
215 0 : info!("skipping image layer generation due to L0 compaction did not include all layers.");
216 : }
217 364 : (partitioning.parts.len(), !fully_compacted)
218 : }
219 0 : Err(err) => {
220 0 : // no partitioning? This is normal, if the timeline was just created
221 0 : // as an empty timeline. Also in unit tests, when we use the timeline
222 0 : // as a simple key-value store, ignoring the datadir layout. Log the
223 0 : // error but continue.
224 0 : //
225 0 : // Suppress error when it's due to cancellation
226 0 : if !self.cancel.is_cancelled() {
227 0 : tracing::error!("could not compact, repartitioning keyspace failed: {err:?}");
228 0 : }
229 0 : (1, false)
230 : }
231 : };
232 :
233 364 : if self.shard_identity.count >= ShardCount::new(2) {
234 : // Limit the number of layer rewrites to the number of partitions: this means its
235 : // runtime should be comparable to a full round of image layer creations, rather than
236 : // being potentially much longer.
237 0 : let rewrite_max = partition_count;
238 0 :
239 0 : self.compact_shard_ancestors(rewrite_max, ctx).await?;
240 364 : }
241 :
242 364 : Ok(has_pending_tasks)
243 364 : }
244 :
245 : /// Check for layers that are eligible to be rewritten:
246 : /// - Shard splitting: after a shard split, rewrite ancestor layers beyond pitr_interval, so that
247 : /// we don't indefinitely retain keys in this shard that aren't needed.
248 : /// - For future use: layers beyond pitr_interval that are in formats we would
249 : /// rather not maintain compatibility with indefinitely.
250 : ///
251 : /// Note: this phase may read and write many gigabytes of data: use rewrite_max to bound
252 : /// how much work it will try to do in each compaction pass.
253 0 : async fn compact_shard_ancestors(
254 0 : self: &Arc<Self>,
255 0 : rewrite_max: usize,
256 0 : ctx: &RequestContext,
257 0 : ) -> Result<(), CompactionError> {
258 0 : let mut drop_layers = Vec::new();
259 0 : let mut layers_to_rewrite: Vec<Layer> = Vec::new();
260 0 :
261 0 : // We will use the Lsn cutoff of the last GC as a threshold for rewriting layers: if a
262 0 : // layer is behind this Lsn, it indicates that the layer is being retained beyond the
263 0 : // pitr_interval, for example because a branchpoint references it.
264 0 : //
265 0 : // Holding this read guard also blocks [`Self::gc_timeline`] from entering while we
266 0 : // are rewriting layers.
267 0 : let latest_gc_cutoff = self.get_latest_gc_cutoff_lsn();
268 0 :
269 0 : tracing::info!(
270 0 : "latest_gc_cutoff: {}, pitr cutoff {}",
271 0 : *latest_gc_cutoff,
272 0 : self.gc_info.read().unwrap().cutoffs.time
273 : );
274 :
275 0 : let layers = self.layers.read().await;
276 0 : for layer_desc in layers.layer_map().iter_historic_layers() {
277 0 : let layer = layers.get_from_desc(&layer_desc);
278 0 : if layer.metadata().shard.shard_count == self.shard_identity.count {
279 : // This layer does not belong to a historic ancestor, no need to re-image it.
280 0 : continue;
281 0 : }
282 0 :
283 0 : // This layer was created on an ancestor shard: check if it contains any data for this shard.
284 0 : let sharded_range = ShardedRange::new(layer_desc.get_key_range(), &self.shard_identity);
285 0 : let layer_local_page_count = sharded_range.page_count();
286 0 : let layer_raw_page_count = ShardedRange::raw_size(&layer_desc.get_key_range());
287 0 : if layer_local_page_count == 0 {
288 : // This ancestral layer only covers keys that belong to other shards.
289 : // We include the full metadata in the log: if we had some critical bug that caused
290 : // us to incorrectly drop layers, this would simplify manually debugging and reinstating those layers.
291 0 : info!(%layer, old_metadata=?layer.metadata(),
292 0 : "dropping layer after shard split, contains no keys for this shard.",
293 : );
294 :
295 0 : if cfg!(debug_assertions) {
296 : // Expensive, exhaustive check of keys in this layer: this guards against ShardedRange's calculations being
297 : // wrong. If ShardedRange claims the local page count is zero, then no keys in this layer
298 : // should be !is_key_disposable()
299 0 : let range = layer_desc.get_key_range();
300 0 : let mut key = range.start;
301 0 : while key < range.end {
302 0 : debug_assert!(self.shard_identity.is_key_disposable(&key));
303 0 : key = key.next();
304 : }
305 0 : }
306 :
307 0 : drop_layers.push(layer);
308 0 : continue;
309 0 : } else if layer_local_page_count != u32::MAX
310 0 : && layer_local_page_count == layer_raw_page_count
311 : {
312 0 : debug!(%layer,
313 0 : "layer is entirely shard local ({} keys), no need to filter it",
314 : layer_local_page_count
315 : );
316 0 : continue;
317 0 : }
318 0 :
319 0 : // Don't bother re-writing a layer unless it will at least halve its size
320 0 : if layer_local_page_count != u32::MAX
321 0 : && layer_local_page_count > layer_raw_page_count / 2
322 : {
323 0 : debug!(%layer,
324 0 : "layer is already mostly local ({}/{}), not rewriting",
325 : layer_local_page_count,
326 : layer_raw_page_count
327 : );
328 0 : }
329 :
330 : // Don't bother re-writing a layer if it is within the PITR window: it will age-out eventually
331 : // without incurring the I/O cost of a rewrite.
332 0 : if layer_desc.get_lsn_range().end >= *latest_gc_cutoff {
333 0 : debug!(%layer, "Skipping rewrite of layer still in GC window ({} >= {})",
334 0 : layer_desc.get_lsn_range().end, *latest_gc_cutoff);
335 0 : continue;
336 0 : }
337 0 :
338 0 : if layer_desc.is_delta() {
339 : // We do not yet implement rewrite of delta layers
340 0 : debug!(%layer, "Skipping rewrite of delta layer");
341 0 : continue;
342 0 : }
343 0 :
344 0 : // Only rewrite layers if their generations differ. This guarantees:
345 0 : // - that local rewrite is safe, as local layer paths will differ between existing layer and rewritten one
346 0 : // - that the layer is persistent in remote storage, as we only see old-generation'd layer via loading from remote storage
347 0 : if layer.metadata().generation == self.generation {
348 0 : debug!(%layer, "Skipping rewrite, is not from old generation");
349 0 : continue;
350 0 : }
351 0 :
352 0 : if layers_to_rewrite.len() >= rewrite_max {
353 0 : tracing::info!(%layer, "Will rewrite layer on a future compaction, already rewrote {}",
354 0 : layers_to_rewrite.len()
355 : );
356 0 : continue;
357 0 : }
358 0 :
359 0 : // Fall through: all our conditions for doing a rewrite passed.
360 0 : layers_to_rewrite.push(layer);
361 : }
362 :
363 : // Drop read lock on layer map before we start doing time-consuming I/O
364 0 : drop(layers);
365 0 :
366 0 : let mut replace_image_layers = Vec::new();
367 :
368 0 : for layer in layers_to_rewrite {
369 0 : tracing::info!(layer=%layer, "Rewriting layer after shard split...");
370 0 : let mut image_layer_writer = ImageLayerWriter::new(
371 0 : self.conf,
372 0 : self.timeline_id,
373 0 : self.tenant_shard_id,
374 0 : &layer.layer_desc().key_range,
375 0 : layer.layer_desc().image_layer_lsn(),
376 0 : ctx,
377 0 : )
378 0 : .await
379 0 : .map_err(CompactionError::Other)?;
380 :
381 : // Safety of layer rewrites:
382 : // - We are writing to a different local file path than we are reading from, so the old Layer
383 : // cannot interfere with the new one.
384 : // - In the page cache, contents for a particular VirtualFile are stored with a file_id that
385 : // is different for two layers with the same name (in `ImageLayerInner::new` we always
386 : // acquire a fresh id from [`crate::page_cache::next_file_id`]. So readers do not risk
387 : // reading the index from one layer file, and then data blocks from the rewritten layer file.
388 : // - Any readers that have a reference to the old layer will keep it alive until they are done
389 : // with it. If they are trying to promote from remote storage, that will fail, but this is the same
390 : // as for compaction generally: compaction is allowed to delete layers that readers might be trying to use.
391 : // - We do not run concurrently with other kinds of compaction, so the only layer map writes we race with are:
392 : // - GC, which at worst witnesses us "undelete" a layer that they just deleted.
393 : // - ingestion, which only inserts layers, therefore cannot collide with us.
394 0 : let resident = layer
395 0 : .download_and_keep_resident()
396 0 : .await
397 0 : .map_err(CompactionError::input_layer_download_failed)?;
398 :
399 0 : let keys_written = resident
400 0 : .filter(&self.shard_identity, &mut image_layer_writer, ctx)
401 0 : .await?;
402 :
403 0 : if keys_written > 0 {
404 0 : let new_layer = image_layer_writer
405 0 : .finish(self, ctx)
406 0 : .await
407 0 : .map_err(CompactionError::Other)?;
408 0 : tracing::info!(layer=%new_layer, "Rewrote layer, {} -> {} bytes",
409 0 : layer.metadata().file_size,
410 0 : new_layer.metadata().file_size);
411 :
412 0 : replace_image_layers.push((layer, new_layer));
413 0 : } else {
414 0 : // Drop the old layer. Usually for this case we would already have noticed that
415 0 : // the layer has no data for us with the ShardedRange check above, but the filter can also come up empty, so handle that case here as well.
416 0 : drop_layers.push(layer);
417 0 : }
418 : }
419 :
420 : // At this point, we have replaced local layer files with their rewritten form, but not yet uploaded
421 : // metadata to reflect that. If we restart here, the replaced layer files will look invalid (size mismatch
422 : // to remote index) and be removed. This is inefficient but safe.
423 : fail::fail_point!("compact-shard-ancestors-localonly");
424 :
425 : // Update the LayerMap so that readers will use the new layers, and enqueue it for writing to remote storage
426 0 : self.rewrite_layers(replace_image_layers, drop_layers)
427 0 : .await?;
428 :
429 : fail::fail_point!("compact-shard-ancestors-enqueued");
430 :
431 : // We wait for all uploads to complete before finishing this compaction stage. This is not
432 : // necessary for correctness, but it simplifies testing, and avoids proceeding with another
433 : // Timeline's compaction while this timeline's uploads may be generating lots of disk I/O
434 : // load.
435 0 : match self.remote_client.wait_completion().await {
436 0 : Ok(()) => (),
437 0 : Err(WaitCompletionError::NotInitialized(ni)) => return Err(CompactionError::from(ni)),
438 : Err(WaitCompletionError::UploadQueueShutDownOrStopped) => {
439 0 : return Err(CompactionError::ShuttingDown)
440 : }
441 : }
442 :
443 : fail::fail_point!("compact-shard-ancestors-persistent");
444 :
445 0 : Ok(())
446 0 : }
447 :
448 : /// Update the LayerVisibilityHint of layers covered by image layers, based on whether there is
449 : /// an image layer between them and the most recent readable LSN (branch point or tip of timeline). The
450 : /// purpose of the visibility hint is to record which layers need to be available to service reads.
451 : ///
452 : /// The result may be used as an input to eviction and secondary downloads to de-prioritize layers
453 : /// that we know won't be needed for reads.
454 528 : pub(super) async fn update_layer_visibility(&self) {
455 528 : let head_lsn = self.get_last_record_lsn();
456 :
457 : // We will sweep through layers in reverse-LSN order. We only do historic layers. L0 deltas
458 : // are implicitly left visible, because LayerVisibilityHint's default is Visible, and we never modify it here.
459 : // Note that L0 deltas _can_ be covered by image layers, but we consider them 'visible' because we anticipate that
460 : // they will be subject to L0->L1 compaction in the near future.
461 528 : let layer_manager = self.layers.read().await;
462 528 : let layer_map = layer_manager.layer_map();
463 :
464 528 : let readable_points = {
465 528 : let children = self.gc_info.read().unwrap().retain_lsns.clone();
466 528 :
467 528 : let mut readable_points = Vec::with_capacity(children.len() + 1);
468 528 : for (child_lsn, _child_timeline_id) in &children {
469 0 : readable_points.push(*child_lsn);
470 0 : }
471 528 : readable_points.push(head_lsn);
472 528 : readable_points
473 528 : };
474 528 :
475 528 : let (layer_visibility, covered) = layer_map.get_visibility(readable_points);
476 3012 : for (layer_desc, visibility) in layer_visibility {
477 2484 : // FIXME: a more efficient bulk zip() through the layers rather than N log N lookups getting each one
478 2484 : let layer = layer_manager.get_from_desc(&layer_desc);
479 2484 : layer.set_visibility(visibility);
480 2484 : }
481 :
482 : // TODO: publish our covered KeySpace to our parent, so that when they update their visibility, they can
483 : // avoid assuming that everything at a branch point is visible.
484 528 : drop(covered);
485 528 : }
486 :
487 : /// Collect a bunch of Level 0 layer files, and compact and reshuffle them as
488 : /// Level 1 files. Returns whether the L0 layers are fully compacted.
489 364 : async fn compact_level0(
490 364 : self: &Arc<Self>,
491 364 : target_file_size: u64,
492 364 : ctx: &RequestContext,
493 364 : ) -> Result<bool, CompactionError> {
494 : let CompactLevel0Phase1Result {
495 364 : new_layers,
496 364 : deltas_to_compact,
497 364 : fully_compacted,
498 : } = {
499 364 : let phase1_span = info_span!("compact_level0_phase1");
500 364 : let ctx = ctx.attached_child();
501 364 : let mut stats = CompactLevel0Phase1StatsBuilder {
502 364 : version: Some(2),
503 364 : tenant_id: Some(self.tenant_shard_id),
504 364 : timeline_id: Some(self.timeline_id),
505 364 : ..Default::default()
506 364 : };
507 364 :
508 364 : let begin = tokio::time::Instant::now();
509 364 : let phase1_layers_locked = self.layers.read().await;
510 364 : let now = tokio::time::Instant::now();
511 364 : stats.read_lock_acquisition_micros =
512 364 : DurationRecorder::Recorded(RecordedDuration(now - begin), now);
513 364 : self.compact_level0_phase1(phase1_layers_locked, stats, target_file_size, &ctx)
514 364 : .instrument(phase1_span)
515 45834 : .await?
516 : };
517 :
518 364 : if new_layers.is_empty() && deltas_to_compact.is_empty() {
519 : // nothing to do
520 336 : return Ok(true);
521 28 : }
522 28 :
523 28 : self.finish_compact_batch(&new_layers, &Vec::new(), &deltas_to_compact)
524 0 : .await?;
525 28 : Ok(fully_compacted)
526 364 : }
527 :
528 : /// First phase of Level 0 compaction, explained in the [`Self::compact_legacy`] comment.
529 364 : async fn compact_level0_phase1<'a>(
530 364 : self: &'a Arc<Self>,
531 364 : guard: tokio::sync::RwLockReadGuard<'a, LayerManager>,
532 364 : mut stats: CompactLevel0Phase1StatsBuilder,
533 364 : target_file_size: u64,
534 364 : ctx: &RequestContext,
535 364 : ) -> Result<CompactLevel0Phase1Result, CompactionError> {
536 364 : stats.read_lock_held_spawn_blocking_startup_micros =
537 364 : stats.read_lock_acquisition_micros.till_now(); // set by caller
538 364 : let layers = guard.layer_map();
539 364 : let level0_deltas = layers.get_level0_deltas();
540 364 : let mut level0_deltas = level0_deltas
541 364 : .into_iter()
542 1616 : .map(|x| guard.get_from_desc(&x))
543 364 : .collect_vec();
544 364 : stats.level0_deltas_count = Some(level0_deltas.len());
545 364 :
546 364 : // Only compact if enough layers have accumulated.
547 364 : let threshold = self.get_compaction_threshold();
548 364 : if level0_deltas.is_empty() || level0_deltas.len() < threshold {
549 336 : debug!(
550 0 : level0_deltas = level0_deltas.len(),
551 0 : threshold, "too few deltas to compact"
552 : );
553 336 : return Ok(CompactLevel0Phase1Result::default());
554 28 : }
555 28 :
556 28 : // Gather the files to compact in this iteration.
557 28 : //
558 28 : // Start with the oldest Level 0 delta file, and collect any other
559 28 : // level 0 files that form a contiguous sequence, such that the end
560 28 : // LSN of previous file matches the start LSN of the next file.
561 28 : //
562 28 : // Note that if the files don't form such a sequence, we might
563 28 : // "compact" just a single file. That's a bit pointless, but it allows
564 28 : // us to get rid of the level 0 file, and compact the other files on
565 28 : // the next iteration. This could probably be made smarter, but such
566 28 : // "gaps" in the sequence of level 0 files should only happen in case
567 28 : // of a crash, partial download from cloud storage, or something like
568 28 : // that, so it's not a big deal in practice.
569 748 : level0_deltas.sort_by_key(|l| l.layer_desc().lsn_range.start);
570 28 : let mut level0_deltas_iter = level0_deltas.iter();
571 28 :
572 28 : let first_level0_delta = level0_deltas_iter.next().unwrap();
573 28 : let mut prev_lsn_end = first_level0_delta.layer_desc().lsn_range.end;
574 28 : let mut deltas_to_compact = Vec::with_capacity(level0_deltas.len());
575 28 :
576 28 : // Accumulate the size of layers in `deltas_to_compact`
577 28 : let mut deltas_to_compact_bytes = 0;
578 28 :
579 28 : // Under normal circumstances, we will accumulate up to compaction_threshold L0s of size
580 28 : // checkpoint_distance each. To avoid edge cases using extra system resources, bound our
581 28 : // work in this function to only operate on this much delta data at once.
582 28 : //
583 28 : // Take the max of the configured value & the default, so that tests that configure tiny values
584 28 : // can still use a sensible amount of memory, but if a deployed system configures bigger values we
585 28 : // still let them compact a full stack of L0s in one go.
586 28 : let delta_size_limit = std::cmp::max(
587 28 : self.get_compaction_threshold(),
588 28 : DEFAULT_COMPACTION_THRESHOLD,
589 28 : ) as u64
590 28 : * std::cmp::max(self.get_checkpoint_distance(), DEFAULT_CHECKPOINT_DISTANCE);
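// For example, assuming DEFAULT_COMPACTION_THRESHOLD = 10 and DEFAULT_CHECKPOINT_DISTANCE
// = 256 MiB (the values in tenant::config::defaults at the time of writing), the max()
// calls clamp tiny test configurations up to the defaults, so delta_size_limit is never
// below roughly 10 * 256 MiB = 2.5 GiB of L0 delta data per pass.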
591 28 :
592 28 : let mut fully_compacted = true;
593 28 :
594 28 : deltas_to_compact.push(
595 28 : first_level0_delta
596 28 : .download_and_keep_resident()
597 0 : .await
598 28 : .map_err(CompactionError::input_layer_download_failed)?,
599 : );
600 402 : for l in level0_deltas_iter {
601 374 : let lsn_range = &l.layer_desc().lsn_range;
602 374 :
603 374 : if lsn_range.start != prev_lsn_end {
604 0 : break;
605 374 : }
606 374 : deltas_to_compact.push(
607 374 : l.download_and_keep_resident()
608 0 : .await
609 374 : .map_err(CompactionError::input_layer_download_failed)?,
610 : );
611 374 : deltas_to_compact_bytes += l.metadata().file_size;
612 374 : prev_lsn_end = lsn_range.end;
613 374 :
614 374 : if deltas_to_compact_bytes >= delta_size_limit {
615 0 : info!(
616 0 : l0_deltas_selected = deltas_to_compact.len(),
617 0 : l0_deltas_total = level0_deltas.len(),
618 0 : "L0 compaction picker hit max delta layer size limit: {}",
619 : delta_size_limit
620 : );
621 0 : fully_compacted = false;
622 0 :
623 0 : // Proceed with compaction, but only a subset of L0s
624 0 : break;
625 374 : }
626 : }
627 28 : let lsn_range = Range {
628 28 : start: deltas_to_compact
629 28 : .first()
630 28 : .unwrap()
631 28 : .layer_desc()
632 28 : .lsn_range
633 28 : .start,
634 28 : end: deltas_to_compact.last().unwrap().layer_desc().lsn_range.end,
635 28 : };
636 28 :
637 28 : info!(
638 0 : "Starting Level0 compaction in LSN range {}-{} for {} layers ({} deltas in total)",
639 0 : lsn_range.start,
640 0 : lsn_range.end,
641 0 : deltas_to_compact.len(),
642 0 : level0_deltas.len()
643 : );
644 :
645 402 : for l in deltas_to_compact.iter() {
646 402 : info!("compact includes {l}");
647 : }
648 :
649 : // We don't need the original list of layers anymore. Drop it so that
650 : // we don't accidentally use it later in the function.
651 28 : drop(level0_deltas);
652 28 :
653 28 : stats.read_lock_held_prerequisites_micros = stats
654 28 : .read_lock_held_spawn_blocking_startup_micros
655 28 : .till_now();
656 :
657 : // TODO: replace with streaming k-merge
658 28 : let all_keys = {
659 28 : let mut all_keys = Vec::new();
660 402 : for l in deltas_to_compact.iter() {
661 2363 : all_keys.extend(l.load_keys(ctx).await.map_err(CompactionError::Other)?);
662 : }
663 : // The current stdlib sorting implementation is designed in a way that makes it
664 : // particularly fast when the slice is made up of sorted sub-ranges.
665 4431788 : all_keys.sort_by_key(|DeltaEntry { key, lsn, .. }| (*key, *lsn));
666 28 : all_keys
667 28 : };
668 28 :
669 28 : stats.read_lock_held_key_sort_micros = stats.read_lock_held_prerequisites_micros.till_now();
670 :
671 : // Determine N largest holes where N is number of compacted layers. The vec is sorted by key range start.
672 : //
673 : // A hole is a key range for which this compaction doesn't have any WAL records.
674 : // Our goal in this compaction iteration is to avoid creating L1s that, in terms of their key range,
675 : // cover the hole, but actually don't contain any WAL records for that key range.
676 : // The reason is that the mere stack of L1s (`count_deltas`) triggers image layer creation (`create_image_layers`).
677 : // That image layer creation would be useless for a hole range covered by L1s that don't contain any WAL records.
678 : //
679 : // The algorithm chooses holes as follows.
680 : // - Slide a 2-window over the keys in key order to get the hole range (= distance between two keys).
681 : // - Filter: min threshold on range length
682 : // - Rank: by coverage size (=number of image layers required to reconstruct each key in the range for which we have any data)
683 : //
684 : // For more details, intuition, and some ASCII art see https://github.com/neondatabase/neon/pull/3597#discussion_r1112704451
685 : #[derive(PartialEq, Eq)]
686 : struct Hole {
687 : key_range: Range<Key>,
688 : coverage_size: usize,
689 : }
690 28 : let holes: Vec<Hole> = {
691 : use std::cmp::Ordering;
692 : impl Ord for Hole {
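// The comparison is reversed on purpose: `BinaryHeap` is a max-heap, so with a
// reversed `Ord` the heap below behaves as a min-heap and `pop()` evicts the hole
// with the smallest coverage, keeping only the `max_holes` largest holes.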
693 0 : fn cmp(&self, other: &Self) -> Ordering {
694 0 : self.coverage_size.cmp(&other.coverage_size).reverse()
695 0 : }
696 : }
697 : impl PartialOrd for Hole {
698 0 : fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
699 0 : Some(self.cmp(other))
700 0 : }
701 : }
702 28 : let max_holes = deltas_to_compact.len();
703 28 : let last_record_lsn = self.get_last_record_lsn();
704 28 : let min_hole_range = (target_file_size / page_cache::PAGE_SZ as u64) as i128;
705 28 : let min_hole_coverage_size = 3; // TODO: something more flexible?
706 28 : // min-heap (reserve space for one more element added before eviction)
707 28 : let mut heap: BinaryHeap<Hole> = BinaryHeap::with_capacity(max_holes + 1);
708 28 : let mut prev: Option<Key> = None;
709 :
710 2064038 : for &DeltaEntry { key: next_key, .. } in all_keys.iter() {
711 2064038 : if let Some(prev_key) = prev {
712 : // just a first, fast filter: do not create hole entries for metadata keys. The last hole in the
713 : // compaction is the gap between the data keys and the metadata keys.
714 2064010 : if next_key.to_i128() - prev_key.to_i128() >= min_hole_range
715 0 : && !Key::is_metadata_key(&prev_key)
716 : {
717 0 : let key_range = prev_key..next_key;
718 0 : // Measuring a hole by simply subtracting the i128 representations of the key range boundaries
719 0 : // doesn't make much sense, because the largest holes would correspond to field1/field2 changes.
720 0 : // But we are mostly interested in eliminating holes which cause generation of excessive image layers.
721 0 : // That is why it is better to measure the size of a hole as the number of covering image layers.
722 0 : let coverage_size =
723 0 : layers.image_coverage(&key_range, last_record_lsn).len();
724 0 : if coverage_size >= min_hole_coverage_size {
725 0 : heap.push(Hole {
726 0 : key_range,
727 0 : coverage_size,
728 0 : });
729 0 : if heap.len() > max_holes {
730 0 : heap.pop(); // remove smallest hole
731 0 : }
732 0 : }
733 2064010 : }
734 28 : }
735 2064038 : prev = Some(next_key.next());
736 : }
737 28 : let mut holes = heap.into_vec();
738 28 : holes.sort_unstable_by_key(|hole| hole.key_range.start);
739 28 : holes
740 28 : };
741 28 : stats.read_lock_held_compute_holes_micros = stats.read_lock_held_key_sort_micros.till_now();
742 28 : drop_rlock(guard);
743 28 : stats.read_lock_drop_micros = stats.read_lock_held_compute_holes_micros.till_now();
744 :
745 : // This iterator walks through all key-value pairs from all the layers
746 : // we're compacting, in key, LSN order.
747 : // If there's both a Value::Image and Value::WalRecord for the same (key,lsn),
748 : // then the Value::Image is ordered before Value::WalRecord.
749 : //
750 : // TODO(https://github.com/neondatabase/neon/issues/8184): remove the page cached blob_io
751 : // option and validation code once we've reached confidence.
752 : enum AllValuesIter<'a> {
753 : PageCachedBlobIo {
754 : all_keys_iter: VecIter<'a>,
755 : },
756 : StreamingKmergeBypassingPageCache {
757 : merge_iter: MergeIterator<'a>,
758 : },
759 : ValidatingStreamingKmergeBypassingPageCache {
760 : mode: CompactL0BypassPageCacheValidation,
761 : merge_iter: MergeIterator<'a>,
762 : all_keys_iter: VecIter<'a>,
763 : },
764 : }
765 : type VecIter<'a> = std::slice::Iter<'a, DeltaEntry<'a>>; // TODO: distinguished lifetimes
766 : impl AllValuesIter<'_> {
767 2064066 : async fn next_all_keys_iter(
768 2064066 : iter: &mut VecIter<'_>,
769 2064066 : ctx: &RequestContext,
770 2064066 : ) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
771 : let Some(DeltaEntry {
772 2064038 : key,
773 2064038 : lsn,
774 2064038 : val: value_ref,
775 : ..
776 2064066 : }) = iter.next()
777 : else {
778 28 : return Ok(None);
779 : };
780 2064038 : let value = value_ref.load(ctx).await?;
781 2064038 : Ok(Some((*key, *lsn, value)))
782 2064066 : }
783 2064066 : async fn next(
784 2064066 : &mut self,
785 2064066 : ctx: &RequestContext,
786 2064066 : ) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
787 2064066 : match self {
788 0 : AllValuesIter::PageCachedBlobIo { all_keys_iter: iter } => {
789 0 : Self::next_all_keys_iter(iter, ctx).await
790 : }
791 0 : AllValuesIter::StreamingKmergeBypassingPageCache { merge_iter } => merge_iter.next().await,
792 2064066 : AllValuesIter::ValidatingStreamingKmergeBypassingPageCache { mode, merge_iter, all_keys_iter } => async {
793 : // advance both iterators
794 2064066 : let all_keys_iter_item = Self::next_all_keys_iter(all_keys_iter, ctx).await;
795 2064066 : let merge_iter_item = merge_iter.next().await;
796 : // compare results & log warnings as needed
797 : macro_rules! rate_limited_warn {
798 : ($($arg:tt)*) => {{
799 : if cfg!(debug_assertions) || cfg!(feature = "testing") {
800 : warn!($($arg)*);
801 : panic!("CompactL0BypassPageCacheValidation failure, check logs");
802 : }
803 : use once_cell::sync::Lazy;
804 : use utils::rate_limit::RateLimit;
805 : use std::sync::Mutex;
806 : use std::time::Duration;
807 : static LOGGED: Lazy<Mutex<RateLimit>> =
808 0 : Lazy::new(|| Mutex::new(RateLimit::new(Duration::from_secs(10))));
809 : let mut rate_limit = LOGGED.lock().unwrap();
810 0 : rate_limit.call(|| {
811 0 : warn!($($arg)*);
812 0 : });
813 : }}
814 : }
815 2064066 : match (&all_keys_iter_item, &merge_iter_item) {
816 0 : (Err(_), Err(_)) => {
817 0 : // don't bother asserting equality of the errors
818 0 : }
819 0 : (Err(all_keys), Ok(merge)) => {
820 0 : rate_limited_warn!(?merge, "all_keys_iter returned an error where merge did not: {all_keys:?}");
821 0 : },
822 0 : (Ok(all_keys), Err(merge)) => {
823 0 : rate_limited_warn!(?all_keys, "merge returned an error where all_keys_iter did not: {merge:?}");
824 0 : },
825 28 : (Ok(None), Ok(None)) => { }
826 0 : (Ok(Some(all_keys)), Ok(None)) => {
827 0 : rate_limited_warn!(?all_keys, "merge returned None where all_keys_iter returned Some");
828 0 : }
829 0 : (Ok(None), Ok(Some(merge))) => {
830 0 : rate_limited_warn!(?merge, "all_keys_iter returned None where merge returned Some");
831 0 : }
832 2064038 : (Ok(Some((all_keys_key, all_keys_lsn, all_keys_value))), Ok(Some((merge_key, merge_lsn, merge_value)))) => {
833 2064038 : match mode {
834 : // TODO: in this mode, we still load the value from disk for both iterators, even though we only need the all_keys_iter one
835 : CompactL0BypassPageCacheValidation::KeyLsn => {
836 0 : let all_keys = (all_keys_key, all_keys_lsn);
837 0 : let merge = (merge_key, merge_lsn);
838 0 : if all_keys != merge {
839 0 : rate_limited_warn!(?all_keys, ?merge, "merge returned a different (Key,LSN) than all_keys_iter");
840 0 : }
841 : }
842 : CompactL0BypassPageCacheValidation::KeyLsnValue => {
843 2064038 : let all_keys = (all_keys_key, all_keys_lsn, all_keys_value);
844 2064038 : let merge = (merge_key, merge_lsn, merge_value);
845 2064038 : if all_keys != merge {
846 0 : rate_limited_warn!(?all_keys, ?merge, "merge returned a different (Key,LSN,Value) than all_keys_iter");
847 2064038 : }
848 : }
849 : }
850 : }
851 : }
852 : // in case of mismatch, trust the legacy all_keys_iter_item
853 2064066 : all_keys_iter_item
854 2064066 : }.instrument(info_span!("next")).await
855 : }
856 2064066 : }
857 : }
858 28 : let mut all_values_iter = match &self.conf.compact_level0_phase1_value_access {
859 0 : CompactL0Phase1ValueAccess::PageCachedBlobIo => AllValuesIter::PageCachedBlobIo {
860 0 : all_keys_iter: all_keys.iter(),
861 0 : },
862 28 : CompactL0Phase1ValueAccess::StreamingKmerge { validate } => {
863 28 : let merge_iter = {
864 28 : let mut deltas = Vec::with_capacity(deltas_to_compact.len());
865 402 : for l in deltas_to_compact.iter() {
866 402 : let l = l.get_as_delta(ctx).await.map_err(CompactionError::Other)?;
867 402 : deltas.push(l);
868 : }
869 28 : MergeIterator::create(&deltas, &[], ctx)
870 28 : };
871 28 : match validate {
872 0 : None => AllValuesIter::StreamingKmergeBypassingPageCache { merge_iter },
873 28 : Some(validate) => AllValuesIter::ValidatingStreamingKmergeBypassingPageCache {
874 28 : mode: validate.clone(),
875 28 : merge_iter,
876 28 : all_keys_iter: all_keys.iter(),
877 28 : },
878 : }
879 : }
880 : };
881 :
882 : // This iterator walks through all keys and is needed to calculate size used by each key
883 28 : let mut all_keys_iter = all_keys
884 28 : .iter()
885 2064038 : .map(|DeltaEntry { key, lsn, size, .. }| (*key, *lsn, *size))
886 2064010 : .coalesce(|mut prev, cur| {
887 2064010 : // Coalesce keys that belong to the same key pair.
888 2064010 : // This ensures that compaction doesn't put them
889 2064010 : // into different layer files.
890 2064010 : // Still limit this by the target file size,
891 2064010 : // so that we keep the size of the files in
892 2064010 : // check.
893 2064010 : if prev.0 == cur.0 && prev.2 < target_file_size {
894 40038 : prev.2 += cur.2;
895 40038 : Ok(prev)
896 : } else {
897 2023972 : Err((prev, cur))
898 : }
899 2064010 : });
900 28 :
901 28 : // Merge the contents of all the input delta layers into a new set
902 28 : // of delta layers, based on the current partitioning.
903 28 : //
904 28 : // We split the new delta layers on the key dimension. We iterate through the key space, and for each key we check whether adding the next key to the output layer we're building would make the layer too large. If so, we flush the current output layer and start a new one.
905 28 : // It's possible that there is a single key with so many page versions that storing all of them in a single layer file
906 28 : // would be too large. In that case, we also split on the LSN dimension.
907 28 : //
908 28 : // LSN
909 28 : // ^
910 28 : // |
911 28 : // | +-----------+ +--+--+--+--+
912 28 : // | | | | | | | |
913 28 : // | +-----------+ | | | | |
914 28 : // | | | | | | | |
915 28 : // | +-----------+ ==> | | | | |
916 28 : // | | | | | | | |
917 28 : // | +-----------+ | | | | |
918 28 : // | | | | | | | |
919 28 : // | +-----------+ +--+--+--+--+
920 28 : // |
921 28 : // +--------------> key
922 28 : //
923 28 : //
924 28 : // If one key (X) has a lot of page versions:
925 28 : //
926 28 : // LSN
927 28 : // ^
928 28 : // | (X)
929 28 : // | +-----------+ +--+--+--+--+
930 28 : // | | | | | | | |
931 28 : // | +-----------+ | | +--+ |
932 28 : // | | | | | | | |
933 28 : // | +-----------+ ==> | | | | |
934 28 : // | | | | | +--+ |
935 28 : // | +-----------+ | | | | |
936 28 : // | | | | | | | |
937 28 : // | +-----------+ +--+--+--+--+
938 28 : // |
939 28 : // +--------------> key
940 28 : // TODO: this actually divides the layers into fixed-size chunks, not
941 28 : // based on the partitioning.
942 28 : //
943 28 : // TODO: we should also opportunistically materialize and
944 28 : // garbage collect what we can.
945 28 : let mut new_layers = Vec::new();
946 28 : let mut prev_key: Option<Key> = None;
947 28 : let mut writer: Option<DeltaLayerWriter> = None;
948 28 : let mut key_values_total_size = 0u64;
949 28 : let mut dup_start_lsn: Lsn = Lsn::INVALID; // start LSN of layer containing values of the single key
950 28 : let mut dup_end_lsn: Lsn = Lsn::INVALID; // end LSN of layer containing values of the single key
951 28 : let mut next_hole = 0; // index of next hole in holes vector
952 :
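// Main write loop: consume (key, lsn, value) in key/LSN order. Whenever we cross a key
// boundary (or hit a per-key LSN split point), the current output delta layer is flushed
// if it is (or the next one will be) a single-key "dup" layer, if adding this key would
// overflow target_file_size, or if the key crosses one of the selected holes; writing
// then continues into a fresh DeltaLayerWriter.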
953 2064066 : while let Some((key, lsn, value)) = all_values_iter
954 2064066 : .next(ctx)
955 39360 : .await
956 2064066 : .map_err(CompactionError::Other)?
957 : {
958 2064038 : let same_key = prev_key.map_or(false, |prev_key| prev_key == key);
959 2064038 : // We need to check key boundaries once we reach the next key, or the end LSN of a layer that contains only this key
960 2064038 : if !same_key || lsn == dup_end_lsn {
961 2024000 : let mut next_key_size = 0u64;
962 2024000 : let is_dup_layer = dup_end_lsn.is_valid();
963 2024000 : dup_start_lsn = Lsn::INVALID;
964 2024000 : if !same_key {
965 2024000 : dup_end_lsn = Lsn::INVALID;
966 2024000 : }
967 : // Determine the size occupied by this key. We stop at the next key or when the size becomes larger than target_file_size
968 2024000 : for (next_key, next_lsn, next_size) in all_keys_iter.by_ref() {
969 2024000 : next_key_size = next_size;
970 2024000 : if key != next_key {
971 2023972 : if dup_end_lsn.is_valid() {
972 0 : // We are writing a segment with duplicates:
973 0 : // place all remaining values of this key in a separate segment
974 0 : dup_start_lsn = dup_end_lsn; // the new segment starts where the old one stops
975 0 : dup_end_lsn = lsn_range.end; // there are no more values of this key until the end of the LSN range
976 2023972 : }
977 2023972 : break;
978 28 : }
979 28 : key_values_total_size += next_size;
980 28 : // Check if it is time to split segment: if total keys size is larger than target file size.
981 28 : // We need to avoid generation of empty segments if next_size > target_file_size.
982 28 : if key_values_total_size > target_file_size && lsn != next_lsn {
983 : // Split key between multiple layers: such layer can contain only single key
984 0 : dup_start_lsn = if dup_end_lsn.is_valid() {
985 0 : dup_end_lsn // new segment with duplicates starts where old one stops
986 : } else {
987 0 : lsn // start with the first LSN for this key
988 : };
989 0 : dup_end_lsn = next_lsn; // upper LSN boundary is exclusive
990 0 : break;
991 28 : }
992 : }
993 : // handle the case when the loop reaches the last key: in this case dup_end is non-zero but dup_start is not set.
994 2024000 : if dup_end_lsn.is_valid() && !dup_start_lsn.is_valid() {
995 0 : dup_start_lsn = dup_end_lsn;
996 0 : dup_end_lsn = lsn_range.end;
997 2024000 : }
998 2024000 : if writer.is_some() {
999 2023972 : let written_size = writer.as_mut().unwrap().size();
1000 2023972 : let contains_hole =
1001 2023972 : next_hole < holes.len() && key >= holes[next_hole].key_range.end;
1002 : // check if key cause layer overflow or contains hole...
1003 2023972 : if is_dup_layer
1004 2023972 : || dup_end_lsn.is_valid()
1005 2023972 : || written_size + key_values_total_size > target_file_size
1006 2023692 : || contains_hole
1007 : {
1008 : // ... if so, flush previous layer and prepare to write new one
1009 280 : new_layers.push(
1010 280 : writer
1011 280 : .take()
1012 280 : .unwrap()
1013 280 : .finish(prev_key.unwrap().next(), self, ctx)
1014 720 : .await
1015 280 : .map_err(CompactionError::Other)?,
1016 : );
1017 280 : writer = None;
1018 280 :
1019 280 : if contains_hole {
1020 0 : // skip hole
1021 0 : next_hole += 1;
1022 280 : }
1023 2023692 : }
1024 28 : }
1025 : // Remember size of key value because at next iteration we will access next item
1026 2024000 : key_values_total_size = next_key_size;
1027 40038 : }
1028 2064038 : fail_point!("delta-layer-writer-fail-before-finish", |_| {
1029 0 : Err(CompactionError::Other(anyhow::anyhow!(
1030 0 : "failpoint delta-layer-writer-fail-before-finish"
1031 0 : )))
1032 2064038 : });
1033 :
1034 2064038 : if !self.shard_identity.is_key_disposable(&key) {
1035 2064038 : if writer.is_none() {
1036 308 : // Create the writer if not initialized yet
1037 308 : writer = Some(
1038 : DeltaLayerWriter::new(
1039 308 : self.conf,
1040 308 : self.timeline_id,
1041 308 : self.tenant_shard_id,
1042 308 : key,
1043 308 : if dup_end_lsn.is_valid() {
1044 : // this is a layer containing slice of values of the same key
1045 0 : debug!("Create new dup layer {}..{}", dup_start_lsn, dup_end_lsn);
1046 0 : dup_start_lsn..dup_end_lsn
1047 : } else {
1048 308 : debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end);
1049 308 : lsn_range.clone()
1050 : },
1051 308 : ctx,
1052 : )
1053 154 : .await
1054 308 : .map_err(CompactionError::Other)?,
1055 : );
1056 2063730 : }
1057 :
1058 2064038 : writer
1059 2064038 : .as_mut()
1060 2064038 : .unwrap()
1061 2064038 : .put_value(key, lsn, value, ctx)
1062 1217 : .await
1063 2064038 : .map_err(CompactionError::Other)?;
1064 : } else {
1065 0 : debug!(
1066 0 : "Dropping key {} during compaction (it belongs on shard {:?})",
1067 0 : key,
1068 0 : self.shard_identity.get_shard_number(&key)
1069 : );
1070 : }
1071 :
1072 2064038 : if !new_layers.is_empty() {
1073 19786 : fail_point!("after-timeline-compacted-first-L1");
1074 2044252 : }
1075 :
1076 2064038 : prev_key = Some(key);
1077 : }
1078 28 : if let Some(writer) = writer {
1079 28 : new_layers.push(
1080 28 : writer
1081 28 : .finish(prev_key.unwrap().next(), self, ctx)
1082 1992 : .await
1083 28 : .map_err(CompactionError::Other)?,
1084 : );
1085 0 : }
1086 :
1087 : // Sync layers
1088 28 : if !new_layers.is_empty() {
1089 : // Print a warning if the created layer is larger than double the target size
1090 : // Add two pages for potential overhead. This should in theory be already
1091 : // accounted for in the target calculation, but for very small targets,
1092 : // we still might easily hit the limit otherwise.
1093 28 : let warn_limit = target_file_size * 2 + page_cache::PAGE_SZ as u64 * 2;
1094 308 : for layer in new_layers.iter() {
1095 308 : if layer.layer_desc().file_size > warn_limit {
1096 0 : warn!(
1097 : %layer,
1098 0 : "created delta file of size {} larger than double of target of {target_file_size}", layer.layer_desc().file_size
1099 : );
1100 308 : }
1101 : }
1102 :
1103 : // The writer.finish() above already did the fsync of the inodes.
1104 : // We just need to fsync the directory in which these inodes are linked,
1105 : // which we know to be the timeline directory.
1106 : //
1107 : // We use fatal_err() below because, after writer.finish() returns with success,
1108 : // the in-memory state of the filesystem already has the layer file in its final place,
1109 : // and subsequent pageserver code could think it's durable while it really isn't.
1110 28 : let timeline_dir = VirtualFile::open(
1111 28 : &self
1112 28 : .conf
1113 28 : .timeline_path(&self.tenant_shard_id, &self.timeline_id),
1114 28 : ctx,
1115 28 : )
1116 14 : .await
1117 28 : .fatal_err("VirtualFile::open for timeline dir fsync");
1118 28 : timeline_dir
1119 28 : .sync_all()
1120 14 : .await
1121 28 : .fatal_err("VirtualFile::sync_all timeline dir");
1122 0 : }
1123 :
1124 28 : stats.write_layer_files_micros = stats.read_lock_drop_micros.till_now();
1125 28 : stats.new_deltas_count = Some(new_layers.len());
1126 308 : stats.new_deltas_size = Some(new_layers.iter().map(|l| l.layer_desc().file_size).sum());
1127 28 :
1128 28 : match TryInto::<CompactLevel0Phase1Stats>::try_into(stats)
1129 28 : .and_then(|stats| serde_json::to_string(&stats).context("serde_json::to_string"))
1130 : {
1131 28 : Ok(stats_json) => {
1132 28 : info!(
1133 0 : stats_json = stats_json.as_str(),
1134 0 : "compact_level0_phase1 stats available"
1135 : )
1136 : }
1137 0 : Err(e) => {
1138 0 : warn!("compact_level0_phase1 stats failed to serialize: {:#}", e);
1139 : }
1140 : }
1141 :
1142 : // Without this, rustc complains about deltas_to_compact still
1143 : // being borrowed when we `.into_iter()` below.
1144 28 : drop(all_values_iter);
1145 28 :
1146 28 : Ok(CompactLevel0Phase1Result {
1147 28 : new_layers,
1148 28 : deltas_to_compact: deltas_to_compact
1149 28 : .into_iter()
1150 402 : .map(|x| x.drop_eviction_guard())
1151 28 : .collect::<Vec<_>>(),
1152 28 : fully_compacted,
1153 28 : })
1154 364 : }
1155 : }
1156 :
1157 : #[derive(Default)]
1158 : struct CompactLevel0Phase1Result {
1159 : new_layers: Vec<ResidentLayer>,
1160 : deltas_to_compact: Vec<Layer>,
1161 : // Whether we have included all L0 layers, or selected only part of them due to the
1162 : // L0 compaction size limit.
1163 : fully_compacted: bool,
1164 : }
1165 :
1166 : #[derive(Default)]
1167 : struct CompactLevel0Phase1StatsBuilder {
1168 : version: Option<u64>,
1169 : tenant_id: Option<TenantShardId>,
1170 : timeline_id: Option<TimelineId>,
1171 : read_lock_acquisition_micros: DurationRecorder,
1172 : read_lock_held_spawn_blocking_startup_micros: DurationRecorder,
1173 : read_lock_held_key_sort_micros: DurationRecorder,
1174 : read_lock_held_prerequisites_micros: DurationRecorder,
1175 : read_lock_held_compute_holes_micros: DurationRecorder,
1176 : read_lock_drop_micros: DurationRecorder,
1177 : write_layer_files_micros: DurationRecorder,
1178 : level0_deltas_count: Option<usize>,
1179 : new_deltas_count: Option<usize>,
1180 : new_deltas_size: Option<u64>,
1181 : }
1182 :
1183 : #[derive(serde::Serialize)]
1184 : struct CompactLevel0Phase1Stats {
1185 : version: u64,
1186 : tenant_id: TenantShardId,
1187 : timeline_id: TimelineId,
1188 : read_lock_acquisition_micros: RecordedDuration,
1189 : read_lock_held_spawn_blocking_startup_micros: RecordedDuration,
1190 : read_lock_held_key_sort_micros: RecordedDuration,
1191 : read_lock_held_prerequisites_micros: RecordedDuration,
1192 : read_lock_held_compute_holes_micros: RecordedDuration,
1193 : read_lock_drop_micros: RecordedDuration,
1194 : write_layer_files_micros: RecordedDuration,
1195 : level0_deltas_count: usize,
1196 : new_deltas_count: usize,
1197 : new_deltas_size: u64,
1198 : }
1199 :
1200 : impl TryFrom<CompactLevel0Phase1StatsBuilder> for CompactLevel0Phase1Stats {
1201 : type Error = anyhow::Error;
1202 :
1203 28 : fn try_from(value: CompactLevel0Phase1StatsBuilder) -> Result<Self, Self::Error> {
1204 28 : Ok(Self {
1205 28 : version: value.version.ok_or_else(|| anyhow!("version not set"))?,
1206 28 : tenant_id: value
1207 28 : .tenant_id
1208 28 : .ok_or_else(|| anyhow!("tenant_id not set"))?,
1209 28 : timeline_id: value
1210 28 : .timeline_id
1211 28 : .ok_or_else(|| anyhow!("timeline_id not set"))?,
1212 28 : read_lock_acquisition_micros: value
1213 28 : .read_lock_acquisition_micros
1214 28 : .into_recorded()
1215 28 : .ok_or_else(|| anyhow!("read_lock_acquisition_micros not set"))?,
1216 28 : read_lock_held_spawn_blocking_startup_micros: value
1217 28 : .read_lock_held_spawn_blocking_startup_micros
1218 28 : .into_recorded()
1219 28 : .ok_or_else(|| anyhow!("read_lock_held_spawn_blocking_startup_micros not set"))?,
1220 28 : read_lock_held_key_sort_micros: value
1221 28 : .read_lock_held_key_sort_micros
1222 28 : .into_recorded()
1223 28 : .ok_or_else(|| anyhow!("read_lock_held_key_sort_micros not set"))?,
1224 28 : read_lock_held_prerequisites_micros: value
1225 28 : .read_lock_held_prerequisites_micros
1226 28 : .into_recorded()
1227 28 : .ok_or_else(|| anyhow!("read_lock_held_prerequisites_micros not set"))?,
1228 28 : read_lock_held_compute_holes_micros: value
1229 28 : .read_lock_held_compute_holes_micros
1230 28 : .into_recorded()
1231 28 : .ok_or_else(|| anyhow!("read_lock_held_compute_holes_micros not set"))?,
1232 28 : read_lock_drop_micros: value
1233 28 : .read_lock_drop_micros
1234 28 : .into_recorded()
1235 28 : .ok_or_else(|| anyhow!("read_lock_drop_micros not set"))?,
1236 28 : write_layer_files_micros: value
1237 28 : .write_layer_files_micros
1238 28 : .into_recorded()
1239 28 : .ok_or_else(|| anyhow!("write_layer_files_micros not set"))?,
1240 28 : level0_deltas_count: value
1241 28 : .level0_deltas_count
1242 28 : .ok_or_else(|| anyhow!("level0_deltas_count not set"))?,
1243 28 : new_deltas_count: value
1244 28 : .new_deltas_count
1245 28 : .ok_or_else(|| anyhow!("new_deltas_count not set"))?,
1246 28 : new_deltas_size: value
1247 28 : .new_deltas_size
1248 28 : .ok_or_else(|| anyhow!("new_deltas_size not set"))?,
1249 : })
1250 28 : }
1251 : }
1252 :
1253 0 : #[derive(Debug, PartialEq, Eq, Clone, serde::Deserialize, serde::Serialize)]
1254 : #[serde(tag = "mode", rename_all = "kebab-case", deny_unknown_fields)]
1255 : pub enum CompactL0Phase1ValueAccess {
1256 : /// The old way.
1257 : PageCachedBlobIo,
1258 : /// The new way.
1259 : StreamingKmerge {
1260 : /// If set, we run both the old way and the new way, validate that
1261 : /// they are identical (=> [`CompactL0BypassPageCacheValidation`]),
1262 : /// and if the validation fails,
1263 : /// - in tests: fail them with a panic or
1264 : /// - in prod, log a rate-limited warning and use the old way's results.
1265 : ///
1266 : /// If not set, we only run the new way and trust its results.
1267 : validate: Option<CompactL0BypassPageCacheValidation>,
1268 : },
1269 : }
1270 :
1271 : /// See [`CompactL0Phase1ValueAccess::StreamingKmerge`].
1272 0 : #[derive(Debug, PartialEq, Eq, Clone, serde::Deserialize, serde::Serialize)]
1273 : #[serde(rename_all = "kebab-case")]
1274 : pub enum CompactL0BypassPageCacheValidation {
1275 : /// Validate that the series of (key, lsn) pairs are the same.
1276 : KeyLsn,
1277 : /// Validate that the entire output of old and new way is identical.
1278 : KeyLsnValue,
1279 : }
1280 :
1281 : impl Default for CompactL0Phase1ValueAccess {
1282 202 : fn default() -> Self {
1283 202 : CompactL0Phase1ValueAccess::StreamingKmerge {
1284 202 : // TODO(https://github.com/neondatabase/neon/issues/8184): change to None once confident
1285 202 : validate: Some(CompactL0BypassPageCacheValidation::KeyLsnValue),
1286 202 : }
1287 202 : }
1288 : }
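// For reference, given the serde attributes on `CompactL0Phase1ValueAccess` (internally
// tagged by "mode", kebab-case) and on `CompactL0BypassPageCacheValidation` (kebab-case),
// a configured value round-trips roughly like the sketch below. The use of serde_json as
// the transport here is only illustrative; the actual config plumbing may differ.
//
//     let v: CompactL0Phase1ValueAccess = serde_json::from_str(
//         r#"{ "mode": "streaming-kmerge", "validate": "key-lsn-value" }"#,
//     )?;
//     assert_eq!(v, CompactL0Phase1ValueAccess::default());
//
//     // The old code path would be selected with:
//     //     { "mode": "page-cached-blob-io" }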
1289 :
1290 : impl Timeline {
1291 : /// Entry point for new tiered compaction algorithm.
1292 : ///
1293 : /// All the real work is in the implementation in the pageserver_compaction
1294 : /// crate. The code here would apply to any algorithm implemented by the
1295 : /// same interface, but tiered is the only one at the moment.
1296 : ///
1297 : /// TODO: cancellation
1298 0 : pub(crate) async fn compact_tiered(
1299 0 : self: &Arc<Self>,
1300 0 : _cancel: &CancellationToken,
1301 0 : ctx: &RequestContext,
1302 0 : ) -> Result<(), CompactionError> {
1303 0 : let fanout = self.get_compaction_threshold() as u64;
1304 0 : let target_file_size = self.get_checkpoint_distance();
1305 :
1306 : // Find the top of the historical layers
1307 0 : let end_lsn = {
1308 0 : let guard = self.layers.read().await;
1309 0 : let layers = guard.layer_map();
1310 0 :
1311 0 : let l0_deltas = layers.get_level0_deltas();
1312 0 : drop(guard);
1313 0 :
1314 0 : // As an optimization, if we find that there are too few L0 layers,
1315 0 : // bail out early. We know that the compaction algorithm would do
1316 0 : // nothing in that case.
1317 0 : if l0_deltas.len() < fanout as usize {
1318 : // doesn't need compacting
1319 0 : return Ok(());
1320 0 : }
1321 0 : l0_deltas.iter().map(|l| l.lsn_range.end).max().unwrap()
1322 0 : };
1323 0 :
1324 0 : // Is the timeline being deleted?
1325 0 : if self.is_stopping() {
1326 0 : trace!("Dropping out of compaction on timeline shutdown");
1327 0 : return Err(CompactionError::ShuttingDown);
1328 0 : }
1329 :
1330 0 : let (dense_ks, _sparse_ks) = self.collect_keyspace(end_lsn, ctx).await?;
1331 : // TODO(chi): ignore sparse_keyspace for now, compact it in the future.
1332 0 : let mut adaptor = TimelineAdaptor::new(self, (end_lsn, dense_ks));
1333 0 :
1334 0 : pageserver_compaction::compact_tiered::compact_tiered(
1335 0 : &mut adaptor,
1336 0 : end_lsn,
1337 0 : target_file_size,
1338 0 : fanout,
1339 0 : ctx,
1340 0 : )
1341 0 : .await
1342 : // TODO: compact_tiered needs to return CompactionError
1343 0 : .map_err(CompactionError::Other)?;
1344 :
1345 0 : adaptor.flush_updates().await?;
1346 0 : Ok(())
1347 0 : }
1348 :
1349 : /// Take a list of images and deltas, and produce images and deltas according to the GC horizon and retain_lsns.
1350 : ///
1351 : /// It takes a key, the values of that key within the compaction process, a GC horizon, and all retain_lsns below the horizon.
1352 : /// For now, it requires that `full_history` contain the full history of the key (i.e., the entry with the lowest LSN is
1353 : /// an image or a WAL record not requiring a base image). This restriction will be removed once we implement gc-compaction on branches.
1354 : ///
1355 : /// The function returns the deltas and the base image that need to be placed at each retain LSN. For example, given:
1356 : ///
1357 : /// A@0x10, +B@0x20, +C@0x30, +D@0x40, +E@0x50, +F@0x60
1358 : /// horizon = 0x50, retain_lsn = 0x20, 0x40, delta_threshold=3
1359 : ///
1360 : /// The function will produce:
1361 : ///
1362 : /// ```plain
1363 : /// 0x20(retain_lsn) -> img=AB@0x20 always produce a single image below the lowest retain LSN
1364 : /// 0x40(retain_lsn) -> deltas=[+C@0x30, +D@0x40] two deltas since the last base image, keeping the deltas
1365 : /// 0x50(horizon) -> deltas=[ABCDE@0x50] three deltas since the last base image, generate an image but put it in the delta
1366 : /// above_horizon -> deltas=[+F@0x60] full history above the horizon
1367 : /// ```
1368 : ///
1369 : /// Note that `full_history` must be sorted by LSN and all of its entries should belong to a single key.
1370 310 : pub(crate) async fn generate_key_retention(
1371 310 : self: &Arc<Timeline>,
1372 310 : key: Key,
1373 310 : full_history: &[(Key, Lsn, Value)],
1374 310 : horizon: Lsn,
1375 310 : retain_lsn_below_horizon: &[Lsn],
1376 310 : delta_threshold_cnt: usize,
1377 310 : base_img_from_ancestor: Option<(Key, Lsn, Bytes)>,
1378 310 : ) -> anyhow::Result<KeyHistoryRetention> {
1379 310 : // Pre-checks for the invariants
1380 310 : if cfg!(debug_assertions) {
1381 750 : for (log_key, _, _) in full_history {
1382 440 : assert_eq!(log_key, &key, "mismatched key");
1383 : }
1384 310 : for i in 1..full_history.len() {
1385 130 : assert!(full_history[i - 1].1 <= full_history[i].1, "unordered LSN");
1386 130 : if full_history[i - 1].1 == full_history[i].1 {
1387 0 : assert!(
1388 0 : matches!(full_history[i - 1].2, Value::Image(_)),
1389 0 : "unordered delta/image, or duplicated delta"
1390 : );
1391 130 : }
1392 : }
1393 : // There used to be an assertion that, when there is no base image, the first
1394 : // record in the history is `will_init`, but it was removed.
1395 : // This is explained in the test cases for generate_key_retention.
1396 : // Search "incomplete history" for more information.
1397 624 : for lsn in retain_lsn_below_horizon {
1398 314 : assert!(lsn < &horizon, "retain lsn must be below horizon")
1399 : }
1400 310 : for i in 1..retain_lsn_below_horizon.len() {
1401 150 : assert!(
1402 150 : retain_lsn_below_horizon[i - 1] <= retain_lsn_below_horizon[i],
1403 0 : "unordered LSN"
1404 : );
1405 : }
1406 0 : }
1407 310 : let has_ancestor = base_img_from_ancestor.is_some();
1408 : // Step 1: split history into len(retain_lsn_below_horizon) + 2 buckets, where the last bucket is for all deltas above the horizon,
1409 : // and the second-to-last bucket is for the horizon. Bucket i contains deltas with lsn_split_points[i-1] < lsn <= lsn_split_points[i].
1410 310 : let (mut split_history, lsn_split_points) = {
1411 310 : let mut split_history = Vec::new();
1412 310 : split_history.resize_with(retain_lsn_below_horizon.len() + 2, Vec::new);
1413 310 : let mut lsn_split_points = Vec::with_capacity(retain_lsn_below_horizon.len() + 1);
1414 624 : for lsn in retain_lsn_below_horizon {
1415 314 : lsn_split_points.push(*lsn);
1416 314 : }
1417 310 : lsn_split_points.push(horizon);
1418 310 : let mut current_idx = 0;
1419 750 : for item @ (_, lsn, _) in full_history {
1420 532 : while current_idx < lsn_split_points.len() && *lsn > lsn_split_points[current_idx] {
1421 92 : current_idx += 1;
1422 92 : }
1423 440 : split_history[current_idx].push(item);
1424 : }
1425 310 : (split_history, lsn_split_points)
1426 : };
1427 : // Step 2: filter out duplicated records due to the k-merge of image/delta layers
1428 1244 : for split_for_lsn in &mut split_history {
1429 934 : let mut prev_lsn = None;
1430 934 : let mut new_split_for_lsn = Vec::with_capacity(split_for_lsn.len());
1431 934 : for record @ (_, lsn, _) in std::mem::take(split_for_lsn) {
1432 440 : if let Some(prev_lsn) = &prev_lsn {
1433 : // This is the case where one LSN has data from both the delta layer and the image layer. As
1434 : // The case that we have an LSN with both data from the delta layer and the image layer. As
1435 : // `ValueWrapper` ensures that an image is ordered before a delta at the same LSN, we simply
1436 : // drop this delta and keep the image.
1437 : //
1438 : // For example, if we have delta layer entries key1@0x10 and key1@0x20, and an image layer entry key1@0x10, we will
1439 : // keep the image for key1@0x10 and the delta for key1@0x20. The key1@0x10 delta will simply be
1440 : // dropped.
1441 : //
1442 : // TODO: when we have both a delta and an image for a given LSN and the record count does not exceed the delta
1443 : // threshold, we could keep the delta instead to save space. This is an optimization for the future.
1444 0 : continue;
1445 58 : }
1446 382 : }
1447 440 : prev_lsn = Some(lsn);
1448 440 : new_split_for_lsn.push(record);
1449 : }
1450 934 : *split_for_lsn = new_split_for_lsn;
1451 : }
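// A sketch (hypothetical helper, generic over the record type) of the same-LSN dedup rule
// applied above: since the k-merge (via `ValueWrapper`) orders an image before a delta at
// the same LSN, keeping only the first record per LSN keeps the image.
#[allow(dead_code)]
fn dedup_same_lsn_sketch<T>(records: Vec<(Lsn, T)>) -> Vec<(Lsn, T)> {
    let mut out = Vec::with_capacity(records.len());
    let mut prev_lsn = None;
    for (lsn, value) in records {
        if prev_lsn == Some(lsn) {
            // Duplicate LSN: the earlier (image) record already won; drop this one.
            continue;
        }
        prev_lsn = Some(lsn);
        out.push((lsn, value));
    }
    out
}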
1452 : // Step 3: generate images when necessary
1453 310 : let mut retention = Vec::with_capacity(split_history.len());
1454 310 : let mut records_since_last_image = 0;
1455 310 : let batch_cnt = split_history.len();
1456 310 : assert!(
1457 310 : batch_cnt >= 2,
1458 0 : "should have at least below + above horizon batches"
1459 : );
1460 310 : let mut replay_history: Vec<(Key, Lsn, Value)> = Vec::new();
1461 310 : if let Some((key, lsn, img)) = base_img_from_ancestor {
1462 18 : replay_history.push((key, lsn, Value::Image(img)));
1463 292 : }
1464 :
1465 : /// Generate debug information for the replay history
1466 0 : fn generate_history_trace(replay_history: &[(Key, Lsn, Value)]) -> String {
1467 0 : use std::fmt::Write;
1468 0 : let mut output = String::new();
1469 0 : if let Some((key, _, _)) = replay_history.first() {
1470 0 : write!(output, "key={} ", key).unwrap();
1471 0 : let mut cnt = 0;
1472 0 : for (_, lsn, val) in replay_history {
1473 0 : if val.is_image() {
1474 0 : write!(output, "i@{} ", lsn).unwrap();
1475 0 : } else if val.will_init() {
1476 0 : write!(output, "di@{} ", lsn).unwrap();
1477 0 : } else {
1478 0 : write!(output, "d@{} ", lsn).unwrap();
1479 0 : }
1480 0 : cnt += 1;
1481 0 : if cnt >= 128 {
1482 0 : write!(output, "... and more").unwrap();
1483 0 : break;
1484 0 : }
1485 : }
1486 0 : } else {
1487 0 : write!(output, "<no history>").unwrap();
1488 0 : }
1489 0 : output
1490 0 : }
1491 :
1492 0 : fn generate_debug_trace(
1493 0 : replay_history: Option<&[(Key, Lsn, Value)]>,
1494 0 : full_history: &[(Key, Lsn, Value)],
1495 0 : lsns: &[Lsn],
1496 0 : horizon: Lsn,
1497 0 : ) -> String {
1498 0 : use std::fmt::Write;
1499 0 : let mut output = String::new();
1500 0 : if let Some(replay_history) = replay_history {
1501 0 : writeln!(
1502 0 : output,
1503 0 : "replay_history: {}",
1504 0 : generate_history_trace(replay_history)
1505 0 : )
1506 0 : .unwrap();
1507 0 : } else {
1508 0 : writeln!(output, "replay_history: <disabled>",).unwrap();
1509 0 : }
1510 0 : writeln!(
1511 0 : output,
1512 0 : "full_history: {}",
1513 0 : generate_history_trace(full_history)
1514 0 : )
1515 0 : .unwrap();
1516 0 : writeln!(
1517 0 : output,
1518 0 : "when processing: [{}] horizon={}",
1519 0 : lsns.iter().map(|l| format!("{l}")).join(","),
1520 0 : horizon
1521 0 : )
1522 0 : .unwrap();
1523 0 : output
1524 0 : }
1525 :
1526 934 : for (i, split_for_lsn) in split_history.into_iter().enumerate() {
1527 : // TODO: there could be image keys inside the splits, and we can compute records_since_last_image accordingly.
1528 934 : records_since_last_image += split_for_lsn.len();
1529 934 : let generate_image = if i == 0 && !has_ancestor {
1530 : // We always generate images for the first batch (below horizon / lowest retain_lsn)
1531 292 : true
1532 642 : } else if i == batch_cnt - 1 {
1533 : // Do not generate images for the last batch (above horizon)
1534 310 : false
1535 332 : } else if records_since_last_image >= delta_threshold_cnt {
1536 : // Generate images when there are too many records
1537 6 : true
1538 : } else {
1539 326 : false
1540 : };
1541 934 : replay_history.extend(split_for_lsn.iter().map(|x| (*x).clone()));
1542 : // Only retain the items after the last image record
1543 1170 : for idx in (0..replay_history.len()).rev() {
1544 1170 : if replay_history[idx].2.will_init() {
1545 934 : replay_history = replay_history[idx..].to_vec();
1546 934 : break;
1547 236 : }
1548 : }
1549 934 : if let Some((_, _, val)) = replay_history.first() {
1550 934 : if !val.will_init() {
1551 0 : return Err(anyhow::anyhow!("invalid history, no base image")).with_context(
1552 0 : || {
1553 0 : generate_debug_trace(
1554 0 : Some(&replay_history),
1555 0 : full_history,
1556 0 : retain_lsn_below_horizon,
1557 0 : horizon,
1558 0 : )
1559 0 : },
1560 0 : );
1561 934 : }
1562 0 : }
1563 934 : if generate_image && records_since_last_image > 0 {
1564 298 : records_since_last_image = 0;
1565 298 : let replay_history_for_debug = if cfg!(debug_assertions) {
1566 298 : Some(replay_history.clone())
1567 : } else {
1568 0 : None
1569 : };
1570 298 : let replay_history_for_debug_ref = replay_history_for_debug.as_deref();
1571 298 : let history = std::mem::take(&mut replay_history);
1572 298 : let mut img = None;
1573 298 : let mut records = Vec::with_capacity(history.len());
1574 298 : if let (_, lsn, Value::Image(val)) = history.first().as_ref().unwrap() {
1575 298 : img = Some((*lsn, val.clone()));
1576 298 : for (_, lsn, val) in history.into_iter().skip(1) {
1577 34 : let Value::WalRecord(rec) = val else {
1578 0 : return Err(anyhow::anyhow!(
1579 0 : "invalid record: the first record is an image, so the rest are expected to be WAL records"
1580 0 : ))
1581 0 : .with_context(|| {
1582 0 : generate_debug_trace(
1583 0 : replay_history_for_debug_ref,
1584 0 : full_history,
1585 0 : retain_lsn_below_horizon,
1586 0 : horizon,
1587 0 : )
1588 0 : });
1589 : };
1590 34 : records.push((lsn, rec));
1591 : }
1592 : } else {
1593 0 : for (_, lsn, val) in history.into_iter() {
1594 0 : let Value::WalRecord(rec) = val else {
1595 0 : return Err(anyhow::anyhow!("invalid record: the first record is a WAL record, so the rest are expected to be WAL records"))
1596 0 : .with_context(|| generate_debug_trace(
1597 0 : replay_history_for_debug_ref,
1598 0 : full_history,
1599 0 : retain_lsn_below_horizon,
1600 0 : horizon,
1601 0 : ));
1602 : };
1603 0 : records.push((lsn, rec));
1604 : }
1605 : }
1606 298 : records.reverse();
1607 298 : let state = ValueReconstructState { img, records };
1608 298 : let request_lsn = lsn_split_points[i]; // last batch does not generate image so i is always in range
1609 298 : let img = self.reconstruct_value(key, request_lsn, state).await?;
1610 298 : replay_history.push((key, request_lsn, Value::Image(img.clone())));
1611 298 : retention.push(vec![(request_lsn, Value::Image(img))]);
1612 636 : } else {
1613 636 : let deltas = split_for_lsn
1614 636 : .iter()
1615 636 : .map(|(_, lsn, value)| (*lsn, value.clone()))
1616 636 : .collect_vec();
1617 636 : retention.push(deltas);
1618 636 : }
1619 : }
1620 310 : let mut result = Vec::with_capacity(retention.len());
1621 310 : assert_eq!(retention.len(), lsn_split_points.len() + 1);
1622 934 : for (idx, logs) in retention.into_iter().enumerate() {
1623 934 : if idx == lsn_split_points.len() {
1624 310 : return Ok(KeyHistoryRetention {
1625 310 : below_horizon: result,
1626 310 : above_horizon: KeyLogAtLsn(logs),
1627 310 : });
1628 624 : } else {
1629 624 : result.push((lsn_split_points[idx], KeyLogAtLsn(logs)));
1630 624 : }
1631 : }
1632 0 : unreachable!("key retention is empty")
1633 310 : }
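// Illustrative sketch only (hypothetical helper, not called by this module): the bucket
// split performed in Step 1 of `generate_key_retention`, isolated from `Value` handling.
// Bucket i receives records with split_points[i-1] < lsn <= split_points[i]; the final
// bucket receives everything above the last split point (the horizon).
#[allow(dead_code)]
fn split_history_by_lsn_sketch<T: Clone>(
    history: &[(Lsn, T)],  // sorted by LSN
    split_points: &[Lsn],  // sorted: retain_lsns below the horizon, then the horizon itself
) -> Vec<Vec<(Lsn, T)>> {
    let mut buckets = vec![Vec::new(); split_points.len() + 1];
    let mut current = 0;
    for (lsn, value) in history {
        while current < split_points.len() && *lsn > split_points[current] {
            current += 1;
        }
        buckets[current].push((*lsn, value.clone()));
    }
    buckets
}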
1634 :
1635 : /// An experimental compaction building block that combines compaction with garbage collection.
1636 : ///
1637 : /// The current implementation picks all delta + image layers that are below or intersect with
1638 : /// the GC horizon, without considering retain_lsns. Then, it does a full compaction over all these delta
1639 : /// layers and image layers, which generates image layers at the gc horizon, drops deltas below the gc horizon,
1640 : /// and creates delta layers with all deltas >= the gc horizon.
1641 18 : pub(crate) async fn compact_with_gc(
1642 18 : self: &Arc<Self>,
1643 18 : cancel: &CancellationToken,
1644 18 : ctx: &RequestContext,
1645 18 : ) -> anyhow::Result<()> {
1646 18 : use std::collections::BTreeSet;
1647 18 :
1648 18 : // Block other compaction/GC tasks from running for now. GC-compaction could run along
1649 18 : // with legacy compaction tasks in the future. Always ensure the lock order is compaction -> gc.
1650 18 : // Note that we already acquired the compaction lock when the outer `compact` function gets called.
1651 18 :
1652 18 : let gc_lock = async {
1653 : tokio::select! {
1654 : guard = self.gc_lock.lock() => Ok(guard),
1655 : // TODO: refactor to CompactionError to correctly pass cancelled error
1656 : _ = cancel.cancelled() => Err(anyhow!("cancelled")),
1657 : }
1658 18 : };
1659 :
1660 18 : let gc_lock = crate::timed(
1661 18 : gc_lock,
1662 18 : "acquires gc lock",
1663 18 : std::time::Duration::from_secs(5),
1664 18 : )
1665 0 : .await?;
1666 :
1667 18 : info!("running enhanced gc bottom-most compaction");
1668 :
1669 : scopeguard::defer! {
1670 : info!("done enhanced gc bottom-most compaction");
1671 : };
1672 :
1673 : // Step 0: pick all delta + image layers that are below or intersect with the GC horizon.
1674 : // The layer selection has the following properties:
1675 : // 1. If a layer is in the selection, all layers below it are in the selection.
1676 : // 2. It follows from (1) that, for each key in the layer selection, its value can be reconstructed using only the layers in the selection.
1677 18 : let (layer_selection, gc_cutoff, retain_lsns_below_horizon) = {
1678 18 : let guard = self.layers.read().await;
1679 18 : let layers = guard.layer_map();
1680 18 : let gc_info = self.gc_info.read().unwrap();
1681 18 : let mut retain_lsns_below_horizon = Vec::new();
1682 18 : let gc_cutoff = gc_info.cutoffs.select_min();
1683 18 : for (lsn, _timeline_id) in &gc_info.retain_lsns {
1684 18 : if lsn < &gc_cutoff {
1685 18 : retain_lsns_below_horizon.push(*lsn);
1686 18 : }
1687 : }
1688 18 : for lsn in gc_info.leases.keys() {
1689 0 : if lsn < &gc_cutoff {
1690 0 : retain_lsns_below_horizon.push(*lsn);
1691 0 : }
1692 : }
1693 18 : let mut selected_layers = Vec::new();
1694 18 : drop(gc_info);
1695 88 : for desc in layers.iter_historic_layers() {
1696 88 : if desc.get_lsn_range().start <= gc_cutoff {
1697 72 : selected_layers.push(guard.get_from_desc(&desc));
1698 72 : }
1699 : }
1700 18 : retain_lsns_below_horizon.sort();
1701 18 : (selected_layers, gc_cutoff, retain_lsns_below_horizon)
1702 : };
1703 18 : let lowest_retain_lsn = if self.ancestor_timeline.is_some() {
1704 2 : Lsn(self.ancestor_lsn.0 + 1)
1705 : } else {
1706 16 : let res = retain_lsns_below_horizon
1707 16 : .first()
1708 16 : .copied()
1709 16 : .unwrap_or(gc_cutoff);
1710 16 : if cfg!(debug_assertions) {
1711 16 : assert_eq!(
1712 16 : res,
1713 16 : retain_lsns_below_horizon
1714 16 : .iter()
1715 16 : .min()
1716 16 : .copied()
1717 16 : .unwrap_or(gc_cutoff)
1718 16 : );
1719 0 : }
1720 16 : res
1721 : };
1722 18 : info!(
1723 0 : "picked {} layers for compaction with gc_cutoff={} lowest_retain_lsn={}",
1724 0 : layer_selection.len(),
1725 : gc_cutoff,
1726 : lowest_retain_lsn
1727 : );
1728 : // Step 1: construct a k-merge iterator over all layers. For now, we download all selected layers and keep them resident.
1729 : // Also, collect the layer information to decide when to split the new delta layers.
1730 18 : let mut downloaded_layers = Vec::new();
1731 18 : let mut delta_split_points = BTreeSet::new();
1732 90 : for layer in &layer_selection {
1733 72 : let resident_layer = layer.download_and_keep_resident().await?;
1734 72 : downloaded_layers.push(resident_layer);
1735 72 :
1736 72 : let desc = layer.layer_desc();
1737 72 : if desc.is_delta() {
1738 50 : // TODO: would it be correct to only record split points for deltas intersecting with the GC horizon (excluding those
1739 50 : // entirely below/above the horizon), so that we can avoid having too many small delta layers?
1740 50 : let key_range = desc.get_key_range();
1741 50 : delta_split_points.insert(key_range.start);
1742 50 : delta_split_points.insert(key_range.end);
1743 50 : }
1744 : }
1745 18 : let mut delta_layers = Vec::new();
1746 18 : let mut image_layers = Vec::new();
1747 90 : for resident_layer in &downloaded_layers {
1748 72 : if resident_layer.layer_desc().is_delta() {
1749 50 : let layer = resident_layer.get_as_delta(ctx).await?;
1750 50 : delta_layers.push(layer);
1751 22 : } else {
1752 22 : let layer = resident_layer.get_as_image(ctx).await?;
1753 22 : image_layers.push(layer);
1754 : }
1755 : }
1756 18 : let mut merge_iter = MergeIterator::create(&delta_layers, &image_layers, ctx);
1757 18 : // Step 2: Produce images + deltas. TODO: ensure newly-produced delta layers do not overlap with other deltas.
1758 18 : // Data of the same key.
1759 18 : let mut accumulated_values = Vec::new();
1760 18 : let mut last_key: Option<Key> = None;
1761 18 :
1762 18 : enum FlushDeltaResult {
1763 18 : /// Create a new resident layer
1764 18 : CreateResidentLayer(ResidentLayer),
1765 18 : /// Keep an original delta layer
1766 18 : KeepLayer(PersistentLayerKey),
1767 18 : }
1768 18 :
1769 18 : #[allow(clippy::too_many_arguments)]
1770 302 : async fn flush_deltas(
1771 302 : deltas: &mut Vec<(Key, Lsn, crate::repository::Value)>,
1772 302 : last_key: Key,
1773 302 : delta_split_points: &[Key],
1774 302 : current_delta_split_point: &mut usize,
1775 302 : tline: &Arc<Timeline>,
1776 302 : lowest_retain_lsn: Lsn,
1777 302 : ctx: &RequestContext,
1778 302 : last_batch: bool,
1779 302 : ) -> anyhow::Result<Option<FlushDeltaResult>> {
1780 302 : // Check if we need to split the delta layer. We split at the original delta layer boundary to avoid
1781 302 : // overlapping layers.
1782 302 : //
1783 302 : // If we have a structure like this:
1784 302 : //
1785 302 : // | Delta 1 | | Delta 4 |
1786 302 : // |---------| Delta 2 |---------|
1787 302 : // | Delta 3 | | Delta 5 |
1788 302 : //
1789 302 : // If we choose to compact deltas 2+3+5, we would get a delta layer overlapping with deltas 1 and 4.
1790 302 : // A simple solution here is to split the new delta layers at the original boundaries, though this
1791 302 : // might produce a lot of small layers. This should be improved and fixed in the future.
1792 302 : let mut need_split = false;
1793 384 : while *current_delta_split_point < delta_split_points.len()
1794 324 : && last_key >= delta_split_points[*current_delta_split_point]
1795 82 : {
1796 82 : *current_delta_split_point += 1;
1797 82 : need_split = true;
1798 82 : }
1799 302 : if !need_split && !last_batch {
1800 206 : return Ok(None);
1801 96 : }
1802 96 : let deltas: Vec<(Key, Lsn, Value)> = std::mem::take(deltas);
1803 96 : if deltas.is_empty() {
1804 46 : return Ok(None);
1805 50 : }
1806 78 : let end_lsn = deltas.iter().map(|(_, lsn, _)| lsn).max().copied().unwrap() + 1;
1807 50 : let delta_key = PersistentLayerKey {
1808 50 : key_range: {
1809 50 : let key_start = deltas.first().unwrap().0;
1810 50 : let key_end = deltas.last().unwrap().0.next();
1811 50 : key_start..key_end
1812 50 : },
1813 50 : lsn_range: lowest_retain_lsn..end_lsn,
1814 50 : is_delta: true,
1815 50 : };
1816 18 : {
1817 18 : // Hack: skip the delta layer if we would produce a layer with the same key-lsn range as an existing one.
1818 18 : //
1819 18 : // This can happen if we have removed some deltas in "the middle" of some existing layer's key-lsn-range.
1820 18 : // For example, consider the case where a single delta with range [0x10,0x50) exists.
1821 18 : // And we have branches at LSN 0x10, 0x20, 0x30.
1822 18 : // Then we delete branch @ 0x20.
1823 18 : // Bottom-most compaction may now delete the delta [0x20,0x30).
1824 18 : // And that wouldn't change the shape of the layer.
1825 18 : //
1826 18 : // Note that bottom-most-gc-compaction never _adds_ new data in that case, only removes.
1827 18 : // That's why it's safe to skip.
1828 50 : let guard = tline.layers.read().await;
1829 18 :
1830 50 : if guard.contains_key(&delta_key) {
1831 26 : let layer_generation = guard.get_from_key(&delta_key).metadata().generation;
1832 26 : drop(guard);
1833 26 : if layer_generation == tline.generation {
1834 18 : // TODO: depending on whether we design this compaction process to run along with
1835 18 : // other compactions, there could be layer map modifications after we drop the
1836 18 : // layer guard, and in case it creates duplicated layer key, we will still error
1837 18 : // in the end.
1838 26 : info!(
1839 18 : key=%delta_key,
1840 18 : ?layer_generation,
1841 18 : "discard delta layer due to duplicated layer in the same generation"
1842 18 : );
1843 26 : return Ok(Some(FlushDeltaResult::KeepLayer(delta_key)));
1844 18 : }
1845 24 : }
1846 18 : }
1847 18 :
1848 24 : let mut delta_layer_writer = DeltaLayerWriter::new(
1849 24 : tline.conf,
1850 24 : tline.timeline_id,
1851 24 : tline.tenant_shard_id,
1852 24 : delta_key.key_range.start,
1853 24 : lowest_retain_lsn..end_lsn,
1854 24 : ctx,
1855 24 : )
1856 18 : .await?;
1857 68 : for (key, lsn, val) in deltas {
1858 44 : delta_layer_writer.put_value(key, lsn, val, ctx).await?;
1859 18 : }
1860 24 : let delta_layer = delta_layer_writer
1861 24 : .finish(delta_key.key_range.end, tline, ctx)
1862 62 : .await?;
1863 24 : Ok(Some(FlushDeltaResult::CreateResidentLayer(delta_layer)))
1864 302 : }
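// Sketch only (hypothetical helper, not used above): the split decision inside
// `flush_deltas`, in isolation. The accumulated deltas are flushed whenever `last_key`
// crosses the next original layer boundary, so the rewritten delta layers do not
// straddle the key ranges of layers that were not selected for compaction.
#[allow(dead_code)]
fn crosses_split_point(last_key: Key, split_points: &[Key], cursor: &mut usize) -> bool {
    let mut crossed = false;
    while *cursor < split_points.len() && last_key >= split_points[*cursor] {
        *cursor += 1;
        crossed = true;
    }
    crossed
}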
1865 18 :
1866 18 : // Hack the key range to be min..(max-1). Otherwise, the image layer will be
1867 18 : // interpreted as an L0 delta layer.
1868 18 : let hack_image_layer_range = {
1869 18 : let mut end_key = Key::MAX;
1870 18 : end_key.field6 -= 1;
1871 18 : Key::MIN..end_key
1872 : };
1873 :
1874 : // Only create image layers when there is no ancestor branch. TODO: create a covering image layer
1875 : // when certain conditions are met.
1876 18 : let mut image_layer_writer = if self.ancestor_timeline.is_none() {
1877 : Some(
1878 16 : ImageLayerWriter::new(
1879 16 : self.conf,
1880 16 : self.timeline_id,
1881 16 : self.tenant_shard_id,
1882 16 : &hack_image_layer_range, // covers the full key range
1883 16 : lowest_retain_lsn,
1884 16 : ctx,
1885 16 : )
1886 8 : .await?,
1887 : )
1888 : } else {
1889 2 : None
1890 : };
1891 :
1892 : /// Returns None if there is no ancestor branch. Throws an error when the key is not found.
1893 : ///
1894 : /// Currently, we always get the ancestor image for each key in the child branch no matter whether the image
1895 : /// is needed for reconstruction. This should be fixed in the future.
1896 : ///
1897 : /// Furthermore, we should do a vectored get instead of a single get, or better, use k-merge for ancestor
1898 : /// images.
1899 302 : async fn get_ancestor_image(
1900 302 : tline: &Arc<Timeline>,
1901 302 : key: Key,
1902 302 : ctx: &RequestContext,
1903 302 : ) -> anyhow::Result<Option<(Key, Lsn, Bytes)>> {
1904 302 : if tline.ancestor_timeline.is_none() {
1905 288 : return Ok(None);
1906 14 : };
1907 : // This function is implemented as a get of the current timeline at ancestor LSN, therefore reusing
1908 : // as much existing code as possible.
1909 14 : let img = tline.get(key, tline.ancestor_lsn, ctx).await?;
1910 14 : Ok(Some((key, tline.ancestor_lsn, img)))
1911 302 : }
1912 18 : let image_layer_key = PersistentLayerKey {
1913 18 : key_range: hack_image_layer_range,
1914 18 : lsn_range: PersistentLayerDesc::image_layer_lsn_range(lowest_retain_lsn),
1915 18 : is_delta: false,
1916 18 : };
1917 :
1918 : // Like with delta layers, it can happen that we re-produce an already existing image layer.
1919 : // This could happen when a user triggers force compaction and image generation. In this case,
1920 : // it's always safe to rewrite the layer.
1921 18 : let discard_image_layer = {
1922 18 : let guard = self.layers.read().await;
1923 18 : if guard.contains_key(&image_layer_key) {
1924 6 : let layer_generation = guard.get_from_key(&image_layer_key).metadata().generation;
1925 6 : drop(guard);
1926 6 : if layer_generation == self.generation {
1927 : // TODO: depending on whether we design this compaction process to run along with
1928 : // other compactions, there could be layer map modifications after we drop the
1929 : // layer guard, and in case it creates duplicated layer key, we will still error
1930 : // in the end.
1931 6 : info!(
1932 : key=%image_layer_key,
1933 : ?layer_generation,
1934 0 : "discard image layer due to duplicated layer key in the same generation",
1935 : );
1936 6 : true
1937 : } else {
1938 0 : false
1939 : }
1940 : } else {
1941 12 : false
1942 : }
1943 : };
1944 :
1945 : // Actually, we could decide not to write to the image layer at all at this point because
1946 : // the key and LSN range are already determined. However, to keep things simple here, we still
1947 : // create this writer and discard it in the end.
1948 :
1949 18 : let mut delta_values = Vec::new();
1950 18 : let delta_split_points = delta_split_points.into_iter().collect_vec();
1951 18 : let mut current_delta_split_point = 0;
1952 18 : let mut delta_layers = Vec::new();
1953 408 : while let Some((key, lsn, val)) = merge_iter.next().await? {
1954 390 : if last_key.is_none() || last_key.as_ref() == Some(&key) {
1955 106 : if last_key.is_none() {
1956 18 : last_key = Some(key);
1957 88 : }
1958 106 : accumulated_values.push((key, lsn, val));
1959 : } else {
1960 284 : let last_key = last_key.as_mut().unwrap();
1961 284 : let retention = self
1962 284 : .generate_key_retention(
1963 284 : *last_key,
1964 284 : &accumulated_values,
1965 284 : gc_cutoff,
1966 284 : &retain_lsns_below_horizon,
1967 284 : COMPACTION_DELTA_THRESHOLD,
1968 284 : get_ancestor_image(self, *last_key, ctx).await?,
1969 : )
1970 0 : .await?;
1971 : // Put the image into the image layer. Currently we have a single big layer for the compaction.
1972 284 : retention
1973 284 : .pipe_to(
1974 284 : *last_key,
1975 284 : &mut delta_values,
1976 284 : image_layer_writer.as_mut(),
1977 284 : ctx,
1978 284 : )
1979 275 : .await?;
1980 284 : delta_layers.extend(
1981 284 : flush_deltas(
1982 284 : &mut delta_values,
1983 284 : *last_key,
1984 284 : &delta_split_points,
1985 284 : &mut current_delta_split_point,
1986 284 : self,
1987 284 : lowest_retain_lsn,
1988 284 : ctx,
1989 284 : false,
1990 284 : )
1991 68 : .await?,
1992 : );
1993 284 : accumulated_values.clear();
1994 284 : *last_key = key;
1995 284 : accumulated_values.push((key, lsn, val));
1996 : }
1997 : }
1998 :
1999 18 : let last_key = last_key.expect("no keys produced during compaction");
2000 : // TODO: move this part to the loop body
2001 18 : let retention = self
2002 18 : .generate_key_retention(
2003 18 : last_key,
2004 18 : &accumulated_values,
2005 18 : gc_cutoff,
2006 18 : &retain_lsns_below_horizon,
2007 18 : COMPACTION_DELTA_THRESHOLD,
2008 18 : get_ancestor_image(self, last_key, ctx).await?,
2009 : )
2010 0 : .await?;
2011 : // Put the image into the image layer. Currently we have a single big layer for the compaction.
2012 18 : retention
2013 18 : .pipe_to(
2014 18 : last_key,
2015 18 : &mut delta_values,
2016 18 : image_layer_writer.as_mut(),
2017 18 : ctx,
2018 18 : )
2019 16 : .await?;
2020 18 : delta_layers.extend(
2021 18 : flush_deltas(
2022 18 : &mut delta_values,
2023 18 : last_key,
2024 18 : &delta_split_points,
2025 18 : &mut current_delta_split_point,
2026 18 : self,
2027 18 : lowest_retain_lsn,
2028 18 : ctx,
2029 18 : true,
2030 18 : )
2031 7 : .await?,
2032 : );
2033 18 : assert!(delta_values.is_empty(), "unprocessed keys");
2034 :
2035 18 : let image_layer = if discard_image_layer {
2036 6 : None
2037 12 : } else if let Some(writer) = image_layer_writer {
2038 20 : Some(writer.finish(self, ctx).await?)
2039 : } else {
2040 2 : None
2041 : };
2042 18 : info!(
2043 0 : "produced {} delta layers and {} image layers",
2044 0 : delta_layers.len(),
2045 0 : if image_layer.is_some() { 1 } else { 0 }
2046 : );
2047 18 : let mut compact_to = Vec::new();
2048 18 : let mut keep_layers = HashSet::new();
2049 68 : for action in delta_layers {
2050 50 : match action {
2051 24 : FlushDeltaResult::CreateResidentLayer(layer) => {
2052 24 : compact_to.push(layer);
2053 24 : }
2054 26 : FlushDeltaResult::KeepLayer(l) => {
2055 26 : keep_layers.insert(l);
2056 26 : }
2057 : }
2058 : }
2059 18 : if discard_image_layer {
2060 6 : keep_layers.insert(image_layer_key);
2061 12 : }
2062 18 : let mut layer_selection = layer_selection;
2063 72 : layer_selection.retain(|x| !keep_layers.contains(&x.layer_desc().key()));
2064 18 : compact_to.extend(image_layer);
2065 : // Step 3: Place back to the layer map.
2066 : {
2067 18 : let mut guard = self.layers.write().await;
2068 18 : guard.finish_gc_compaction(&layer_selection, &compact_to, &self.metrics)
2069 18 : };
2070 18 : self.remote_client
2071 18 : .schedule_compaction_update(&layer_selection, &compact_to)?;
2072 :
2073 18 : drop(gc_lock);
2074 18 :
2075 18 : Ok(())
2076 18 : }
2077 : }
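// Hypothetical call-site sketch (not part of this module's API): it only illustrates the
// argument contract of `compact_with_gc`. As noted in the comments above, the caller is
// expected to already hold the compaction lock (the outer `compact` entry point does),
// and cancellation currently surfaces as an `anyhow` error.
#[allow(dead_code)]
async fn run_gc_compaction_sketch(
    timeline: &Arc<Timeline>,
    cancel: &CancellationToken,
    ctx: &RequestContext,
) -> anyhow::Result<()> {
    timeline.compact_with_gc(cancel, ctx).await
}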
2078 :
2079 : struct TimelineAdaptor {
2080 : timeline: Arc<Timeline>,
2081 :
2082 : keyspace: (Lsn, KeySpace),
2083 :
2084 : new_deltas: Vec<ResidentLayer>,
2085 : new_images: Vec<ResidentLayer>,
2086 : layers_to_delete: Vec<Arc<PersistentLayerDesc>>,
2087 : }
2088 :
2089 : impl TimelineAdaptor {
2090 0 : pub fn new(timeline: &Arc<Timeline>, keyspace: (Lsn, KeySpace)) -> Self {
2091 0 : Self {
2092 0 : timeline: timeline.clone(),
2093 0 : keyspace,
2094 0 : new_images: Vec::new(),
2095 0 : new_deltas: Vec::new(),
2096 0 : layers_to_delete: Vec::new(),
2097 0 : }
2098 0 : }
2099 :
2100 0 : pub async fn flush_updates(&mut self) -> Result<(), CompactionError> {
2101 0 : let layers_to_delete = {
2102 0 : let guard = self.timeline.layers.read().await;
2103 0 : self.layers_to_delete
2104 0 : .iter()
2105 0 : .map(|x| guard.get_from_desc(x))
2106 0 : .collect::<Vec<Layer>>()
2107 0 : };
2108 0 : self.timeline
2109 0 : .finish_compact_batch(&self.new_deltas, &self.new_images, &layers_to_delete)
2110 0 : .await?;
2111 :
2112 0 : self.timeline
2113 0 : .upload_new_image_layers(std::mem::take(&mut self.new_images))?;
2114 :
2115 0 : self.new_deltas.clear();
2116 0 : self.layers_to_delete.clear();
2117 0 : Ok(())
2118 0 : }
2119 : }
2120 :
2121 : #[derive(Clone)]
2122 : struct ResidentDeltaLayer(ResidentLayer);
2123 : #[derive(Clone)]
2124 : struct ResidentImageLayer(ResidentLayer);
2125 :
2126 : impl CompactionJobExecutor for TimelineAdaptor {
2127 : type Key = crate::repository::Key;
2128 :
2129 : type Layer = OwnArc<PersistentLayerDesc>;
2130 : type DeltaLayer = ResidentDeltaLayer;
2131 : type ImageLayer = ResidentImageLayer;
2132 :
2133 : type RequestContext = crate::context::RequestContext;
2134 :
2135 0 : fn get_shard_identity(&self) -> &ShardIdentity {
2136 0 : self.timeline.get_shard_identity()
2137 0 : }
2138 :
2139 0 : async fn get_layers(
2140 0 : &mut self,
2141 0 : key_range: &Range<Key>,
2142 0 : lsn_range: &Range<Lsn>,
2143 0 : _ctx: &RequestContext,
2144 0 : ) -> anyhow::Result<Vec<OwnArc<PersistentLayerDesc>>> {
2145 0 : self.flush_updates().await?;
2146 :
2147 0 : let guard = self.timeline.layers.read().await;
2148 0 : let layer_map = guard.layer_map();
2149 0 :
2150 0 : let result = layer_map
2151 0 : .iter_historic_layers()
2152 0 : .filter(|l| {
2153 0 : overlaps_with(&l.lsn_range, lsn_range) && overlaps_with(&l.key_range, key_range)
2154 0 : })
2155 0 : .map(OwnArc)
2156 0 : .collect();
2157 0 : Ok(result)
2158 0 : }
2159 :
2160 0 : async fn get_keyspace(
2161 0 : &mut self,
2162 0 : key_range: &Range<Key>,
2163 0 : lsn: Lsn,
2164 0 : _ctx: &RequestContext,
2165 0 : ) -> anyhow::Result<Vec<Range<Key>>> {
2166 0 : if lsn == self.keyspace.0 {
2167 0 : Ok(pageserver_compaction::helpers::intersect_keyspace(
2168 0 : &self.keyspace.1.ranges,
2169 0 : key_range,
2170 0 : ))
2171 : } else {
2172 : // The current compaction implementation only ever requests the key space
2173 : // at the compaction end LSN.
2174 0 : anyhow::bail!("keyspace not available for requested lsn");
2175 : }
2176 0 : }
2177 :
2178 0 : async fn downcast_delta_layer(
2179 0 : &self,
2180 0 : layer: &OwnArc<PersistentLayerDesc>,
2181 0 : ) -> anyhow::Result<Option<ResidentDeltaLayer>> {
2182 0 : // this is a lot more complex than a simple downcast...
2183 0 : if layer.is_delta() {
2184 0 : let l = {
2185 0 : let guard = self.timeline.layers.read().await;
2186 0 : guard.get_from_desc(layer)
2187 : };
2188 0 : let result = l.download_and_keep_resident().await?;
2189 :
2190 0 : Ok(Some(ResidentDeltaLayer(result)))
2191 : } else {
2192 0 : Ok(None)
2193 : }
2194 0 : }
2195 :
2196 0 : async fn create_image(
2197 0 : &mut self,
2198 0 : lsn: Lsn,
2199 0 : key_range: &Range<Key>,
2200 0 : ctx: &RequestContext,
2201 0 : ) -> anyhow::Result<()> {
2202 0 : Ok(self.create_image_impl(lsn, key_range, ctx).await?)
2203 0 : }
2204 :
2205 0 : async fn create_delta(
2206 0 : &mut self,
2207 0 : lsn_range: &Range<Lsn>,
2208 0 : key_range: &Range<Key>,
2209 0 : input_layers: &[ResidentDeltaLayer],
2210 0 : ctx: &RequestContext,
2211 0 : ) -> anyhow::Result<()> {
2212 0 : debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end);
2213 :
2214 0 : let mut all_entries = Vec::new();
2215 0 : for dl in input_layers.iter() {
2216 0 : all_entries.extend(dl.load_keys(ctx).await?);
2217 : }
2218 :
2219 : // The current stdlib sorting implementation is designed in a way that makes it
2220 : // particularly fast when the slice is made up of sorted sub-ranges.
2221 0 : all_entries.sort_by_key(|DeltaEntry { key, lsn, .. }| (*key, *lsn));
2222 :
2223 0 : let mut writer = DeltaLayerWriter::new(
2224 0 : self.timeline.conf,
2225 0 : self.timeline.timeline_id,
2226 0 : self.timeline.tenant_shard_id,
2227 0 : key_range.start,
2228 0 : lsn_range.clone(),
2229 0 : ctx,
2230 0 : )
2231 0 : .await?;
2232 :
2233 0 : let mut dup_values = 0;
2234 0 :
2235 0 : // This iterator walks through all key-value pairs from all the layers
2236 0 : // we're compacting, in key, LSN order.
2237 0 : let mut prev: Option<(Key, Lsn)> = None;
2238 : for &DeltaEntry {
2239 0 : key, lsn, ref val, ..
2240 0 : } in all_entries.iter()
2241 : {
2242 0 : if prev == Some((key, lsn)) {
2243 : // This is a duplicate. Skip it.
2244 : //
2245 : // It can happen if compaction is interrupted after writing some
2246 : // layers but not all, and we are compacting the range again.
2247 : // The calculations in the algorithm assume that there are no
2248 : // duplicates, so the math on targeted file size is likely off,
2249 : // and we will create smaller files than expected.
2250 0 : dup_values += 1;
2251 0 : continue;
2252 0 : }
2253 :
2254 0 : let value = val.load(ctx).await?;
2255 :
2256 0 : writer.put_value(key, lsn, value, ctx).await?;
2257 :
2258 0 : prev = Some((key, lsn));
2259 : }
2260 :
2261 0 : if dup_values > 0 {
2262 0 : warn!("delta layer created with {} duplicate values", dup_values);
2263 0 : }
2264 :
2265 0 : fail_point!("delta-layer-writer-fail-before-finish", |_| {
2266 0 : Err(anyhow::anyhow!(
2267 0 : "failpoint delta-layer-writer-fail-before-finish"
2268 0 : ))
2269 0 : });
2270 :
2271 0 : let new_delta_layer = writer
2272 0 : .finish(prev.unwrap().0.next(), &self.timeline, ctx)
2273 0 : .await?;
2274 :
2275 0 : self.new_deltas.push(new_delta_layer);
2276 0 : Ok(())
2277 0 : }
2278 :
2279 0 : async fn delete_layer(
2280 0 : &mut self,
2281 0 : layer: &OwnArc<PersistentLayerDesc>,
2282 0 : _ctx: &RequestContext,
2283 0 : ) -> anyhow::Result<()> {
2284 0 : self.layers_to_delete.push(layer.clone().0);
2285 0 : Ok(())
2286 0 : }
2287 : }
2288 :
2289 : impl TimelineAdaptor {
2290 0 : async fn create_image_impl(
2291 0 : &mut self,
2292 0 : lsn: Lsn,
2293 0 : key_range: &Range<Key>,
2294 0 : ctx: &RequestContext,
2295 0 : ) -> Result<(), CreateImageLayersError> {
2296 0 : let timer = self.timeline.metrics.create_images_time_histo.start_timer();
2297 :
2298 0 : let image_layer_writer = ImageLayerWriter::new(
2299 0 : self.timeline.conf,
2300 0 : self.timeline.timeline_id,
2301 0 : self.timeline.tenant_shard_id,
2302 0 : key_range,
2303 0 : lsn,
2304 0 : ctx,
2305 0 : )
2306 0 : .await?;
2307 :
2308 0 : fail_point!("image-layer-writer-fail-before-finish", |_| {
2309 0 : Err(CreateImageLayersError::Other(anyhow::anyhow!(
2310 0 : "failpoint image-layer-writer-fail-before-finish"
2311 0 : )))
2312 0 : });
2313 :
2314 0 : let keyspace = KeySpace {
2315 0 : ranges: self.get_keyspace(key_range, lsn, ctx).await?,
2316 : };
2317 : // TODO set proper (stateful) start. The create_image_layer_for_rel_blocks function mostly
2318 0 : let start = Key::MIN;
2319 : let ImageLayerCreationOutcome {
2320 0 : image,
2321 : next_start_key: _,
2322 0 : } = self
2323 0 : .timeline
2324 0 : .create_image_layer_for_rel_blocks(
2325 0 : &keyspace,
2326 0 : image_layer_writer,
2327 0 : lsn,
2328 0 : ctx,
2329 0 : key_range.clone(),
2330 0 : start,
2331 0 : )
2332 0 : .await?;
2333 :
2334 0 : if let Some(image_layer) = image {
2335 0 : self.new_images.push(image_layer);
2336 0 : }
2337 :
2338 0 : timer.stop_and_record();
2339 0 :
2340 0 : Ok(())
2341 0 : }
2342 : }
2343 :
2344 : impl CompactionRequestContext for crate::context::RequestContext {}
2345 :
2346 : #[derive(Debug, Clone)]
2347 : pub struct OwnArc<T>(pub Arc<T>);
2348 :
2349 : impl<T> Deref for OwnArc<T> {
2350 : type Target = <Arc<T> as Deref>::Target;
2351 0 : fn deref(&self) -> &Self::Target {
2352 0 : &self.0
2353 0 : }
2354 : }
2355 :
2356 : impl<T> AsRef<T> for OwnArc<T> {
2357 0 : fn as_ref(&self) -> &T {
2358 0 : self.0.as_ref()
2359 0 : }
2360 : }
2361 :
2362 : impl CompactionLayer<Key> for OwnArc<PersistentLayerDesc> {
2363 0 : fn key_range(&self) -> &Range<Key> {
2364 0 : &self.key_range
2365 0 : }
2366 0 : fn lsn_range(&self) -> &Range<Lsn> {
2367 0 : &self.lsn_range
2368 0 : }
2369 0 : fn file_size(&self) -> u64 {
2370 0 : self.file_size
2371 0 : }
2372 0 : fn short_id(&self) -> std::string::String {
2373 0 : self.as_ref().short_id().to_string()
2374 0 : }
2375 0 : fn is_delta(&self) -> bool {
2376 0 : self.as_ref().is_delta()
2377 0 : }
2378 : }
2379 :
2380 : impl CompactionLayer<Key> for OwnArc<DeltaLayer> {
2381 0 : fn key_range(&self) -> &Range<Key> {
2382 0 : &self.layer_desc().key_range
2383 0 : }
2384 0 : fn lsn_range(&self) -> &Range<Lsn> {
2385 0 : &self.layer_desc().lsn_range
2386 0 : }
2387 0 : fn file_size(&self) -> u64 {
2388 0 : self.layer_desc().file_size
2389 0 : }
2390 0 : fn short_id(&self) -> std::string::String {
2391 0 : self.layer_desc().short_id().to_string()
2392 0 : }
2393 0 : fn is_delta(&self) -> bool {
2394 0 : true
2395 0 : }
2396 : }
2397 :
2398 : use crate::tenant::timeline::DeltaEntry;
2399 :
2400 : impl CompactionLayer<Key> for ResidentDeltaLayer {
2401 0 : fn key_range(&self) -> &Range<Key> {
2402 0 : &self.0.layer_desc().key_range
2403 0 : }
2404 0 : fn lsn_range(&self) -> &Range<Lsn> {
2405 0 : &self.0.layer_desc().lsn_range
2406 0 : }
2407 0 : fn file_size(&self) -> u64 {
2408 0 : self.0.layer_desc().file_size
2409 0 : }
2410 0 : fn short_id(&self) -> std::string::String {
2411 0 : self.0.layer_desc().short_id().to_string()
2412 0 : }
2413 0 : fn is_delta(&self) -> bool {
2414 0 : true
2415 0 : }
2416 : }
2417 :
2418 : impl CompactionDeltaLayer<TimelineAdaptor> for ResidentDeltaLayer {
2419 : type DeltaEntry<'a> = DeltaEntry<'a>;
2420 :
2421 0 : async fn load_keys<'a>(&self, ctx: &RequestContext) -> anyhow::Result<Vec<DeltaEntry<'_>>> {
2422 0 : self.0.load_keys(ctx).await
2423 0 : }
2424 : }
2425 :
2426 : impl CompactionLayer<Key> for ResidentImageLayer {
2427 0 : fn key_range(&self) -> &Range<Key> {
2428 0 : &self.0.layer_desc().key_range
2429 0 : }
2430 0 : fn lsn_range(&self) -> &Range<Lsn> {
2431 0 : &self.0.layer_desc().lsn_range
2432 0 : }
2433 0 : fn file_size(&self) -> u64 {
2434 0 : self.0.layer_desc().file_size
2435 0 : }
2436 0 : fn short_id(&self) -> std::string::String {
2437 0 : self.0.layer_desc().short_id().to_string()
2438 0 : }
2439 0 : fn is_delta(&self) -> bool {
2440 0 : false
2441 0 : }
2442 : }
2443 : impl CompactionImageLayer<TimelineAdaptor> for ResidentImageLayer {}