Line data Source code
1 : //! New compaction implementation. The algorithm itself is implemented in the
2 : //! compaction crate. This file implements the callbacks and structs that allow
3 : //! the algorithm to drive the process.
4 : //!
5 : //! The old legacy algorithm is implemented directly in `timeline.rs`.
6 :
7 : use std::collections::BinaryHeap;
8 : use std::ops::{Deref, Range};
9 : use std::sync::Arc;
10 :
11 : use super::layer_manager::LayerManager;
12 : use super::{
13 : CompactFlags, CreateImageLayersError, DurationRecorder, ImageLayerCreationMode,
14 : RecordedDuration, Timeline,
15 : };
16 :
17 : use anyhow::{anyhow, Context};
18 : use enumset::EnumSet;
19 : use fail::fail_point;
20 : use itertools::Itertools;
21 : use pageserver_api::keyspace::ShardedRange;
22 : use pageserver_api::shard::{ShardCount, ShardIdentity, TenantShardId};
23 : use tokio_util::sync::CancellationToken;
24 : use tracing::{debug, info, info_span, trace, warn, Instrument};
25 : use utils::id::TimelineId;
26 :
27 : use crate::context::{AccessStatsBehavior, RequestContext, RequestContextBuilder};
28 : use crate::page_cache;
29 : use crate::tenant::config::defaults::{DEFAULT_CHECKPOINT_DISTANCE, DEFAULT_COMPACTION_THRESHOLD};
30 : use crate::tenant::storage_layer::merge_iterator::MergeIterator;
31 : use crate::tenant::storage_layer::{AsLayerDesc, PersistentLayerDesc};
32 : use crate::tenant::timeline::{drop_rlock, DeltaLayerWriter, ImageLayerWriter};
33 : use crate::tenant::timeline::{Hole, ImageLayerCreationOutcome};
34 : use crate::tenant::timeline::{Layer, ResidentLayer};
35 : use crate::tenant::DeltaLayer;
36 : use crate::virtual_file::{MaybeFatalIo, VirtualFile};
37 :
38 : use crate::keyspace::KeySpace;
39 : use crate::repository::Key;
40 :
41 : use utils::lsn::Lsn;
42 :
43 : use pageserver_compaction::helpers::overlaps_with;
44 : use pageserver_compaction::interface::*;
45 :
46 : use super::CompactionError;
47 :
48 : impl Timeline {
49 : /// TODO: cancellation
50 364 : pub(crate) async fn compact_legacy(
51 364 : self: &Arc<Self>,
52 364 : cancel: &CancellationToken,
53 364 : flags: EnumSet<CompactFlags>,
54 364 : ctx: &RequestContext,
55 364 : ) -> Result<(), CompactionError> {
56 364 : if flags.contains(CompactFlags::EnhancedGcBottomMostCompaction) {
57 0 : return self.compact_with_gc(cancel, ctx).await;
58 364 : }
59 364 :
60 364 : // High level strategy for compaction / image creation:
61 364 : //
62 364 : // 1. First, calculate the desired "partitioning" of the
63 364 : // currently in-use key space. The goal is to partition the
64 364 : // key space into roughly fixed-size chunks, but also take into
65 364 : // account any existing image layers, and try to align the
66 364 : // chunk boundaries with the existing image layers to avoid
67 364 : // too much churn. Also try to align chunk boundaries with
68 364 : // relation boundaries. In principle, we don't know about
69 364 : // relation boundaries here, we just deal with key-value
70 364 : // pairs, and the code in pgdatadir_mapping.rs knows how to
71 364 : // map relations into key-value pairs. But in practice we know
72 364 : // that 'field6' is the block number, and the fields 1-5
73 364 : // identify a relation. This is just an optimization,
74 364 : // though.
75 364 : //
76 364 : // 2. Once we know the partitioning, for each partition,
77 364 : // decide if it's time to create a new image layer. The
78 364 : // criteria is: there has been too much "churn" since the last
79 364 : // image layer? The "churn" is fuzzy concept, it's a
80 364 : // combination of too many delta files, or too much WAL in
81 364 : // total in the delta file. Or perhaps: if creating an image
82 364 : // file would allow to delete some older files.
83 364 : //
84 364 : // 3. After that, we compact all level0 delta files if there
85 364 : // are too many of them. While compacting, we also garbage
86 364 : // collect any page versions that are no longer needed because
87 364 : // of the new image layers we created in step 2.
88 364 : //
89 364 : // TODO: This high level strategy hasn't been implemented yet.
90 364 : // Below are functions compact_level0() and create_image_layers()
91 364 : // but they are a bit ad hoc and don't quite work like it's explained
92 364 : // above. Rewrite it.
93 364 :
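// (What the code below currently does: repartition() the keyspace, compact_level0() the
// L0 deltas, create_image_layers() for partitions that changed enough, and, for sharded
// tenants, compact_shard_ancestors() to rewrite layers inherited from ancestor shards.)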
94 364 : // Is the timeline being deleted?
95 364 : if self.is_stopping() {
96 0 : trace!("Dropping out of compaction on timeline shutdown");
97 0 : return Err(CompactionError::ShuttingDown);
98 364 : }
99 364 :
100 364 : let target_file_size = self.get_checkpoint_distance();
101 :
102 : // Define partitioning schema if needed
103 :
104 : // FIXME: the match should only cover repartitioning, not the next steps
105 364 : let partition_count = match self
106 364 : .repartition(
107 364 : self.get_last_record_lsn(),
108 364 : self.get_compaction_target_size(),
109 364 : flags,
110 364 : ctx,
111 364 : )
112 14135 : .await
113 : {
114 364 : Ok(((dense_partitioning, sparse_partitioning), lsn)) => {
115 364 : // Disables access_stats updates, so that the files we read remain candidates for eviction after we're done with them
116 364 : let image_ctx = RequestContextBuilder::extend(ctx)
117 364 : .access_stats_behavior(AccessStatsBehavior::Skip)
118 364 : .build();
119 364 :
120 364 : // 2. Compact
121 364 : let timer = self.metrics.compact_time_histo.start_timer();
122 42647 : self.compact_level0(target_file_size, ctx).await?;
123 364 : timer.stop_and_record();
124 364 :
125 364 : // 3. Create new image layers for partitions that have been modified
126 364 : // "enough".
127 364 : let mut partitioning = dense_partitioning;
128 364 : partitioning
129 364 : .parts
130 364 : .extend(sparse_partitioning.into_dense().parts);
131 364 : let image_layers = self
132 364 : .create_image_layers(
133 364 : &partitioning,
134 364 : lsn,
135 364 : if flags.contains(CompactFlags::ForceImageLayerCreation) {
136 14 : ImageLayerCreationMode::Force
137 : } else {
138 350 : ImageLayerCreationMode::Try
139 : },
140 364 : &image_ctx,
141 : )
142 14415 : .await?;
143 :
144 364 : self.upload_new_image_layers(image_layers)?;
145 364 : partitioning.parts.len()
146 : }
147 0 : Err(err) => {
148 0 : // no partitioning? This is normal, if the timeline was just created
149 0 : // as an empty timeline. Also in unit tests, when we use the timeline
150 0 : // as a simple key-value store, ignoring the datadir layout. Log the
151 0 : // error but continue.
152 0 : //
153 0 : // Suppress error when it's due to cancellation
154 0 : if !self.cancel.is_cancelled() {
155 0 : tracing::error!("could not compact, repartitioning keyspace failed: {err:?}");
156 0 : }
157 0 : 1
158 : }
159 : };
160 :
161 364 : if self.shard_identity.count >= ShardCount::new(2) {
162 : // Limit the number of layer rewrites to the number of partitions: this means its
163 : // runtime should be comparable to a full round of image layer creations, rather than
164 : // being potentially much longer.
165 0 : let rewrite_max = partition_count;
166 0 :
167 0 : self.compact_shard_ancestors(rewrite_max, ctx).await?;
168 364 : }
169 :
170 364 : Ok(())
171 364 : }
172 :
173 : /// Check for layers that are eligible to be rewritten:
174 : /// - Shard splitting: after a shard split, rewrite ancestor layers beyond pitr_interval, so that
175 : /// we don't indefinitely retain keys in this shard that aren't needed.
176 : /// - For future use: layers beyond pitr_interval that are in formats we would
177 : /// rather not maintain compatibility with indefinitely.
178 : ///
179 : /// Note: this phase may read and write many gigabytes of data: use rewrite_max to bound
180 : /// how much work it will try to do in each compaction pass.
181 0 : async fn compact_shard_ancestors(
182 0 : self: &Arc<Self>,
183 0 : rewrite_max: usize,
184 0 : ctx: &RequestContext,
185 0 : ) -> anyhow::Result<()> {
186 0 : let mut drop_layers = Vec::new();
187 0 : let mut layers_to_rewrite: Vec<Layer> = Vec::new();
188 0 :
189 0 : // We will use the Lsn cutoff of the last GC as a threshold for rewriting layers: if a
190 0 : // layer is behind this Lsn, it indicates that the layer is being retained beyond the
191 0 : // pitr_interval, for example because a branchpoint references it.
192 0 : //
193 0 : // Holding this read guard also blocks [`Self::gc_timeline`] from entering while we
194 0 : // are rewriting layers.
195 0 : let latest_gc_cutoff = self.get_latest_gc_cutoff_lsn();
196 0 :
197 0 : tracing::info!(
198 0 : "latest_gc_cutoff: {}, pitr cutoff {}",
199 0 : *latest_gc_cutoff,
200 0 : self.gc_info.read().unwrap().cutoffs.time
201 : );
202 :
203 0 : let layers = self.layers.read().await;
204 0 : for layer_desc in layers.layer_map().iter_historic_layers() {
205 0 : let layer = layers.get_from_desc(&layer_desc);
206 0 : if layer.metadata().shard.shard_count == self.shard_identity.count {
207 : // This layer does not belong to a historic ancestor, no need to re-image it.
208 0 : continue;
209 0 : }
210 0 :
211 0 : // This layer was created on an ancestor shard: check if it contains any data for this shard.
212 0 : let sharded_range = ShardedRange::new(layer_desc.get_key_range(), &self.shard_identity);
213 0 : let layer_local_page_count = sharded_range.page_count();
214 0 : let layer_raw_page_count = ShardedRange::raw_size(&layer_desc.get_key_range());
215 0 : if layer_local_page_count == 0 {
216 : // This ancestral layer only covers keys that belong to other shards.
217 : // We include the full metadata in the log: if we had some critical bug that caused
218 : // us to incorrectly drop layers, this would simplify manually debugging + reinstating those layers.
219 0 : info!(%layer, old_metadata=?layer.metadata(),
220 0 : "dropping layer after shard split, contains no keys for this shard.",
221 : );
222 :
223 0 : if cfg!(debug_assertions) {
224 : // Expensive, exhaustive check of keys in this layer: this guards against ShardedRange's calculations being
225 : // wrong. If ShardedRange claims the local page count is zero, then no keys in this layer
226 : // should be !is_key_disposable()
227 0 : let range = layer_desc.get_key_range();
228 0 : let mut key = range.start;
229 0 : while key < range.end {
230 0 : debug_assert!(self.shard_identity.is_key_disposable(&key));
231 0 : key = key.next();
232 : }
233 0 : }
234 :
235 0 : drop_layers.push(layer);
236 0 : continue;
237 0 : } else if layer_local_page_count != u32::MAX
238 0 : && layer_local_page_count == layer_raw_page_count
239 : {
240 0 : debug!(%layer,
241 0 : "layer is entirely shard local ({} keys), no need to filter it",
242 : layer_local_page_count
243 : );
244 0 : continue;
245 0 : }
246 0 :
247 0 : // Don't bother re-writing a layer unless it will at least halve its size
248 0 : if layer_local_page_count != u32::MAX
249 0 : && layer_local_page_count > layer_raw_page_count / 2
250 : {
251 0 : debug!(%layer,
252 0 : "layer is already mostly local ({}/{}), not rewriting",
253 : layer_local_page_count,
254 : layer_raw_page_count
255 : );
256 0 : }
257 :
258 : // Don't bother re-writing a layer if it is within the PITR window: it will age-out eventually
259 : // without incurring the I/O cost of a rewrite.
260 0 : if layer_desc.get_lsn_range().end >= *latest_gc_cutoff {
261 0 : debug!(%layer, "Skipping rewrite of layer still in GC window ({} >= {})",
262 0 : layer_desc.get_lsn_range().end, *latest_gc_cutoff);
263 0 : continue;
264 0 : }
265 0 :
266 0 : if layer_desc.is_delta() {
267 : // We do not yet implement rewrite of delta layers
268 0 : debug!(%layer, "Skipping rewrite of delta layer");
269 0 : continue;
270 0 : }
271 0 :
272 0 : // Only rewrite layers if their generations differ. This guarantees:
273 0 : // - that local rewrite is safe, as local layer paths will differ between existing layer and rewritten one
274 0 : // - that the layer is persistent in remote storage, as we only see old-generation'd layer via loading from remote storage
275 0 : if layer.metadata().generation == self.generation {
276 0 : debug!(%layer, "Skipping rewrite, is not from old generation");
277 0 : continue;
278 0 : }
279 0 :
280 0 : if layers_to_rewrite.len() >= rewrite_max {
281 0 : tracing::info!(%layer, "Will rewrite layer on a future compaction, already rewrote {}",
282 0 : layers_to_rewrite.len()
283 : );
284 0 : continue;
285 0 : }
286 0 :
287 0 : // Fall through: all our conditions for doing a rewrite passed.
288 0 : layers_to_rewrite.push(layer);
289 : }
290 :
291 : // Drop read lock on layer map before we start doing time-consuming I/O
292 0 : drop(layers);
293 0 :
294 0 : let mut replace_image_layers = Vec::new();
295 :
296 0 : for layer in layers_to_rewrite {
297 0 : tracing::info!(layer=%layer, "Rewriting layer after shard split...");
298 0 : let mut image_layer_writer = ImageLayerWriter::new(
299 0 : self.conf,
300 0 : self.timeline_id,
301 0 : self.tenant_shard_id,
302 0 : &layer.layer_desc().key_range,
303 0 : layer.layer_desc().image_layer_lsn(),
304 0 : ctx,
305 0 : )
306 0 : .await?;
307 :
308 : // Safety of layer rewrites:
309 : // - We are writing to a different local file path than we are reading from, so the old Layer
310 : // cannot interfere with the new one.
311 : // - In the page cache, contents for a particular VirtualFile are stored with a file_id that
312 : // is different for two layers with the same name (in `ImageLayerInner::new` we always
313 : // acquire a fresh id from [`crate::page_cache::next_file_id`]. So readers do not risk
314 : // reading the index from one layer file, and then data blocks from the rewritten layer file.
315 : // - Any readers that have a reference to the old layer will keep it alive until they are done
316 : // with it. If they are trying to promote from remote storage, that will fail, but this is the same
317 : // as for compaction generally: compaction is allowed to delete layers that readers might be trying to use.
318 : // - We do not run concurrently with other kinds of compaction, so the only layer map writes we race with are:
319 : // - GC, which at worst witnesses us "undelete" a layer that they just deleted.
320 : // - ingestion, which only inserts layers, therefore cannot collide with us.
321 0 : let resident = layer.download_and_keep_resident().await?;
322 :
323 0 : let keys_written = resident
324 0 : .filter(&self.shard_identity, &mut image_layer_writer, ctx)
325 0 : .await?;
326 :
327 0 : if keys_written > 0 {
328 0 : let new_layer = image_layer_writer.finish(self, ctx).await?;
329 0 : tracing::info!(layer=%new_layer, "Rewrote layer, {} -> {} bytes",
330 0 : layer.metadata().file_size,
331 0 : new_layer.metadata().file_size);
332 :
333 0 : replace_image_layers.push((layer, new_layer));
334 0 : } else {
335 0 : // Drop the old layer. Usually the ShardedRange check above would already have
336 0 : // noticed that the layer has no data for us, but if the rewrite wrote no keys, dropping it is still correct.
337 0 : drop_layers.push(layer);
338 0 : }
339 : }
340 :
341 : // At this point, we have replaced local layer files with their rewritten form, but not yet uploaded
342 : // metadata to reflect that. If we restart here, the replaced layer files will look invalid (size mismatch
343 : // to remote index) and be removed. This is inefficient but safe.
344 : fail::fail_point!("compact-shard-ancestors-localonly");
345 :
346 : // Update the LayerMap so that readers will use the new layers, and enqueue it for writing to remote storage
347 0 : self.rewrite_layers(replace_image_layers, drop_layers)
348 0 : .await?;
349 :
350 : fail::fail_point!("compact-shard-ancestors-enqueued");
351 :
352 : // We wait for all uploads to complete before finishing this compaction stage. This is not
353 : // necessary for correctness, but it simplifies testing, and avoids proceeding with another
354 : // Timeline's compaction while this timeline's uploads may be generating lots of disk I/O
355 : // load.
356 0 : self.remote_client.wait_completion().await?;
357 :
358 : fail::fail_point!("compact-shard-ancestors-persistent");
359 :
360 0 : Ok(())
361 0 : }
362 :
363 : /// Collect a bunch of Level 0 layer files, and compact and reshuffle them
364 : /// as Level 1 files.
365 364 : async fn compact_level0(
366 364 : self: &Arc<Self>,
367 364 : target_file_size: u64,
368 364 : ctx: &RequestContext,
369 364 : ) -> Result<(), CompactionError> {
370 : let CompactLevel0Phase1Result {
371 364 : new_layers,
372 364 : deltas_to_compact,
373 : } = {
374 364 : let phase1_span = info_span!("compact_level0_phase1");
375 364 : let ctx = ctx.attached_child();
376 364 : let mut stats = CompactLevel0Phase1StatsBuilder {
377 364 : version: Some(2),
378 364 : tenant_id: Some(self.tenant_shard_id),
379 364 : timeline_id: Some(self.timeline_id),
380 364 : ..Default::default()
381 364 : };
382 364 :
383 364 : let begin = tokio::time::Instant::now();
384 364 : let phase1_layers_locked = self.layers.read().await;
385 364 : let now = tokio::time::Instant::now();
386 364 : stats.read_lock_acquisition_micros =
387 364 : DurationRecorder::Recorded(RecordedDuration(now - begin), now);
388 364 : self.compact_level0_phase1(phase1_layers_locked, stats, target_file_size, &ctx)
389 364 : .instrument(phase1_span)
390 42647 : .await?
391 : };
392 :
393 364 : if new_layers.is_empty() && deltas_to_compact.is_empty() {
394 : // nothing to do
395 336 : return Ok(());
396 28 : }
397 28 :
398 28 : self.finish_compact_batch(&new_layers, &Vec::new(), &deltas_to_compact)
399 0 : .await?;
400 28 : Ok(())
401 364 : }
402 :
403 : /// Level0 files first phase of compaction, explained in the [`Self::compact_legacy`] comment.
404 364 : async fn compact_level0_phase1<'a>(
405 364 : self: &'a Arc<Self>,
406 364 : guard: tokio::sync::RwLockReadGuard<'a, LayerManager>,
407 364 : mut stats: CompactLevel0Phase1StatsBuilder,
408 364 : target_file_size: u64,
409 364 : ctx: &RequestContext,
410 364 : ) -> Result<CompactLevel0Phase1Result, CompactionError> {
411 364 : stats.read_lock_held_spawn_blocking_startup_micros =
412 364 : stats.read_lock_acquisition_micros.till_now(); // set by caller
413 364 : let layers = guard.layer_map();
414 364 : let level0_deltas = layers.get_level0_deltas()?;
415 364 : let mut level0_deltas = level0_deltas
416 364 : .into_iter()
417 1616 : .map(|x| guard.get_from_desc(&x))
418 364 : .collect_vec();
419 364 : stats.level0_deltas_count = Some(level0_deltas.len());
420 364 :
421 364 : // Only compact if enough layers have accumulated.
422 364 : let threshold = self.get_compaction_threshold();
423 364 : if level0_deltas.is_empty() || level0_deltas.len() < threshold {
424 336 : debug!(
425 0 : level0_deltas = level0_deltas.len(),
426 0 : threshold, "too few deltas to compact"
427 : );
428 336 : return Ok(CompactLevel0Phase1Result::default());
429 28 : }
430 28 :
431 28 : // Gather the files to compact in this iteration.
432 28 : //
433 28 : // Start with the oldest Level 0 delta file, and collect any other
434 28 : // level 0 files that form a contiguous sequence, such that the end
435 28 : // LSN of previous file matches the start LSN of the next file.
436 28 : //
437 28 : // Note that if the files don't form such a sequence, we might
438 28 : // "compact" just a single file. That's a bit pointless, but it allows
439 28 : // us to get rid of the level 0 file, and compact the other files on
440 28 : // the next iteration. This could probably be made smarter, but such
441 28 : // "gaps" in the sequence of level 0 files should only happen in case
442 28 : // of a crash, partial download from cloud storage, or something like
443 28 : // that, so it's not a big deal in practice.
444 748 : level0_deltas.sort_by_key(|l| l.layer_desc().lsn_range.start);
445 28 : let mut level0_deltas_iter = level0_deltas.iter();
446 28 :
447 28 : let first_level0_delta = level0_deltas_iter.next().unwrap();
448 28 : let mut prev_lsn_end = first_level0_delta.layer_desc().lsn_range.end;
449 28 : let mut deltas_to_compact = Vec::with_capacity(level0_deltas.len());
450 28 :
451 28 : // Accumulate the size of layers in `deltas_to_compact`
452 28 : let mut deltas_to_compact_bytes = 0;
453 28 :
454 28 : // Under normal circumstances, we will accumulate up to compaction_threshold L0s of size
455 28 : // checkpoint_distance each. To avoid edge cases using extra system resources, bound our
456 28 : // work in this function to only operate on this much delta data at once.
457 28 : //
458 28 : // Take the max of the configured value & the default, so that tests that configure tiny values
459 28 : // can still use a sensible amount of memory, but if a deployed system configures bigger values we
460 28 : // still let them compact a full stack of L0s in one go.
461 28 : let delta_size_limit = std::cmp::max(
462 28 : self.get_compaction_threshold(),
463 28 : DEFAULT_COMPACTION_THRESHOLD,
464 28 : ) as u64
465 28 : * std::cmp::max(self.get_checkpoint_distance(), DEFAULT_CHECKPOINT_DISTANCE);
466 28 :
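// (For a rough sense of scale: with the stock defaults, assumed here to be a compaction
// threshold of 10 and a checkpoint distance of 256 MiB, delta_size_limit works out to
// 10 * 256 MiB = 2.5 GiB of L0 delta data considered per compaction pass.)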
467 28 : deltas_to_compact.push(first_level0_delta.download_and_keep_resident().await?);
468 402 : for l in level0_deltas_iter {
469 374 : let lsn_range = &l.layer_desc().lsn_range;
470 374 :
471 374 : if lsn_range.start != prev_lsn_end {
472 0 : break;
473 374 : }
474 374 : deltas_to_compact.push(l.download_and_keep_resident().await?);
475 374 : deltas_to_compact_bytes += l.metadata().file_size;
476 374 : prev_lsn_end = lsn_range.end;
477 374 :
478 374 : if deltas_to_compact_bytes >= delta_size_limit {
479 0 : info!(
480 0 : l0_deltas_selected = deltas_to_compact.len(),
481 0 : l0_deltas_total = level0_deltas.len(),
482 0 : "L0 compaction picker hit max delta layer size limit: {}",
483 : delta_size_limit
484 : );
485 :
486 : // Proceed with compaction, but only a subset of L0s
487 0 : break;
488 374 : }
489 : }
490 28 : let lsn_range = Range {
491 28 : start: deltas_to_compact
492 28 : .first()
493 28 : .unwrap()
494 28 : .layer_desc()
495 28 : .lsn_range
496 28 : .start,
497 28 : end: deltas_to_compact.last().unwrap().layer_desc().lsn_range.end,
498 28 : };
499 28 :
500 28 : info!(
501 0 : "Starting Level0 compaction in LSN range {}-{} for {} layers ({} deltas in total)",
502 0 : lsn_range.start,
503 0 : lsn_range.end,
504 0 : deltas_to_compact.len(),
505 0 : level0_deltas.len()
506 : );
507 :
508 402 : for l in deltas_to_compact.iter() {
509 402 : info!("compact includes {l}");
510 : }
511 :
512 : // We don't need the original list of layers anymore. Drop it so that
513 : // we don't accidentally use it later in the function.
514 28 : drop(level0_deltas);
515 28 :
516 28 : stats.read_lock_held_prerequisites_micros = stats
517 28 : .read_lock_held_spawn_blocking_startup_micros
518 28 : .till_now();
519 28 :
520 28 : // Determine N largest holes where N is number of compacted layers.
521 28 : let max_holes = deltas_to_compact.len();
522 28 : let last_record_lsn = self.get_last_record_lsn();
523 28 : let min_hole_range = (target_file_size / page_cache::PAGE_SZ as u64) as i128;
524 28 : let min_hole_coverage_size = 3; // TODO: something more flexible?
525 28 :
526 28 : // min-heap (reserve space for one more element added before eviction)
527 28 : let mut heap: BinaryHeap<Hole> = BinaryHeap::with_capacity(max_holes + 1);
528 28 : let mut prev: Option<Key> = None;
529 28 :
530 28 : let mut all_keys = Vec::new();
531 :
532 402 : for l in deltas_to_compact.iter() {
533 2413 : all_keys.extend(l.load_keys(ctx).await?);
534 : }
535 :
536 : // FIXME: should spawn_blocking the rest of this function
537 :
538 : // The current stdlib sorting implementation is designed in a way where it is
539 : // particularly fast when the slice is made up of sorted sub-ranges.
540 4431792 : all_keys.sort_by_key(|DeltaEntry { key, lsn, .. }| (*key, *lsn));
541 28 :
542 28 : stats.read_lock_held_key_sort_micros = stats.read_lock_held_prerequisites_micros.till_now();
543 :
544 2064038 : for &DeltaEntry { key: next_key, .. } in all_keys.iter() {
545 2064038 : if let Some(prev_key) = prev {
546 : // Just a first fast filter; do not create hole entries for metadata keys. The last hole in the
547 : // compaction is the gap between the data keys and the metadata keys.
548 2064010 : if next_key.to_i128() - prev_key.to_i128() >= min_hole_range
549 0 : && !Key::is_metadata_key(&prev_key)
550 : {
551 0 : let key_range = prev_key..next_key;
552 0 : // Measuring a hole by simply subtracting the i128 representations of the key range boundaries
553 0 : // doesn't make much sense, because the largest holes would correspond to field1/field2 changes.
554 0 : // What we mostly care about is eliminating holes that cause the generation of excessive image layers.
555 0 : // That is why it is better to measure the size of a hole as the number of image layers covering it.
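// (The heap bookkeeping below keeps only the `max_holes` largest holes: every candidate
// is pushed, and whenever the heap grows past `max_holes` the smallest hole is popped.)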
556 0 : let coverage_size = layers.image_coverage(&key_range, last_record_lsn).len();
557 0 : if coverage_size >= min_hole_coverage_size {
558 0 : heap.push(Hole {
559 0 : key_range,
560 0 : coverage_size,
561 0 : });
562 0 : if heap.len() > max_holes {
563 0 : heap.pop(); // remove smallest hole
564 0 : }
565 0 : }
566 2064010 : }
567 28 : }
568 2064038 : prev = Some(next_key.next());
569 : }
570 28 : stats.read_lock_held_compute_holes_micros = stats.read_lock_held_key_sort_micros.till_now();
571 28 : drop_rlock(guard);
572 28 : stats.read_lock_drop_micros = stats.read_lock_held_compute_holes_micros.till_now();
573 28 : let mut holes = heap.into_vec();
574 28 : holes.sort_unstable_by_key(|hole| hole.key_range.start);
575 28 : let mut next_hole = 0; // index of next hole in holes vector
576 28 :
577 28 : // This iterator walks through all key-value pairs from all the layers
578 28 : // we're compacting, in key, LSN order.
579 28 : let all_values_iter = all_keys.iter();
580 28 :
581 28 : // This iterator walks through all keys and is needed to calculate size used by each key
582 28 : let mut all_keys_iter = all_keys
583 28 : .iter()
584 2064038 : .map(|DeltaEntry { key, lsn, size, .. }| (*key, *lsn, *size))
585 2064010 : .coalesce(|mut prev, cur| {
586 2064010 : // Coalesce keys that belong to the same key pair.
587 2064010 : // This ensures that compaction doesn't put them
588 2064010 : // into different layer files.
589 2064010 : // Still limit this by the target file size,
590 2064010 : // so that we keep the size of the files in
591 2064010 : // check.
592 2064010 : if prev.0 == cur.0 && prev.2 < target_file_size {
593 40038 : prev.2 += cur.2;
594 40038 : Ok(prev)
595 : } else {
596 2023972 : Err((prev, cur))
597 : }
598 2064010 : });
599 28 :
600 28 : // Merge the contents of all the input delta layers into a new set
601 28 : // of delta layers, based on the current partitioning.
602 28 : //
603 28 : // We split the new delta layers on the key dimension. We iterate through the key space, and for each key, check whether including the next key in the current output layer we're building would make that layer too large. If so, flush the current output layer and start a new one.
604 28 : // It's possible that there is a single key with so many page versions that storing all of them in a single layer file
605 28 : // would be too large. In that case, we also split on the LSN dimension.
606 28 : //
607 28 : // LSN
608 28 : // ^
609 28 : // |
610 28 : // | +-----------+ +--+--+--+--+
611 28 : // | | | | | | | |
612 28 : // | +-----------+ | | | | |
613 28 : // | | | | | | | |
614 28 : // | +-----------+ ==> | | | | |
615 28 : // | | | | | | | |
616 28 : // | +-----------+ | | | | |
617 28 : // | | | | | | | |
618 28 : // | +-----------+ +--+--+--+--+
619 28 : // |
620 28 : // +--------------> key
621 28 : //
622 28 : //
623 28 : // If one key (X) has a lot of page versions:
624 28 : //
625 28 : // LSN
626 28 : // ^
627 28 : // | (X)
628 28 : // | +-----------+ +--+--+--+--+
629 28 : // | | | | | | | |
630 28 : // | +-----------+ | | +--+ |
631 28 : // | | | | | | | |
632 28 : // | +-----------+ ==> | | | | |
633 28 : // | | | | | +--+ |
634 28 : // | +-----------+ | | | | |
635 28 : // | | | | | | | |
636 28 : // | +-----------+ +--+--+--+--+
637 28 : // |
638 28 : // +--------------> key
639 28 : // TODO: this actually divides the layers into fixed-size chunks, not
640 28 : // based on the partitioning.
641 28 : //
642 28 : // TODO: we should also opportunistically materialize and
643 28 : // garbage collect what we can.
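// (State for the loop below: `writer` is the delta layer currently being built, and
// dup_start_lsn/dup_end_lsn are only valid while a single key's versions are being split
// across several layers along the LSN axis, i.e. the "(X)" case pictured above.)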
644 28 : let mut new_layers = Vec::new();
645 28 : let mut prev_key: Option<Key> = None;
646 28 : let mut writer: Option<DeltaLayerWriter> = None;
647 28 : let mut key_values_total_size = 0u64;
648 28 : let mut dup_start_lsn: Lsn = Lsn::INVALID; // start LSN of layer containing values of the single key
649 28 : let mut dup_end_lsn: Lsn = Lsn::INVALID; // end LSN of layer containing values of the single key
650 :
651 : for &DeltaEntry {
652 2064038 : key, lsn, ref val, ..
653 2064066 : } in all_values_iter
654 : {
655 2064038 : let value = val.load(ctx).await?;
656 2064038 : let same_key = prev_key.map_or(false, |prev_key| prev_key == key);
657 2064038 : // We need to check key boundaries once we reach next key or end of layer with the same key
658 2064038 : if !same_key || lsn == dup_end_lsn {
659 2024000 : let mut next_key_size = 0u64;
660 2024000 : let is_dup_layer = dup_end_lsn.is_valid();
661 2024000 : dup_start_lsn = Lsn::INVALID;
662 2024000 : if !same_key {
663 2024000 : dup_end_lsn = Lsn::INVALID;
664 2024000 : }
665 : // Determine size occupied by this key. We stop at next key or when size becomes larger than target_file_size
666 2024000 : for (next_key, next_lsn, next_size) in all_keys_iter.by_ref() {
667 2024000 : next_key_size = next_size;
668 2024000 : if key != next_key {
669 2023972 : if dup_end_lsn.is_valid() {
670 0 : // We are writing a segment with duplicates:
671 0 : // place all remaining values of this key in a separate segment
672 0 : dup_start_lsn = dup_end_lsn; // the new segment starts where the old one stops
673 0 : dup_end_lsn = lsn_range.end; // there are no more values of this key till end of LSN range
674 2023972 : }
675 2023972 : break;
676 28 : }
677 28 : key_values_total_size += next_size;
678 28 : // Check if it is time to split the segment: i.e. if the total size of the key's values is larger than the target file size.
679 28 : // We need to avoid generating empty segments if next_size > target_file_size.
680 28 : if key_values_total_size > target_file_size && lsn != next_lsn {
681 : // Split the key between multiple layers: such a layer can contain only a single key
682 0 : dup_start_lsn = if dup_end_lsn.is_valid() {
683 0 : dup_end_lsn // new segment with duplicates starts where old one stops
684 : } else {
685 0 : lsn // start with the first LSN for this key
686 : };
687 0 : dup_end_lsn = next_lsn; // upper LSN boundary is exclusive
688 0 : break;
689 28 : }
690 : }
691 : // handle case when loop reaches last key: in this case dup_end is non-zero but dup_start is not set.
692 2024000 : if dup_end_lsn.is_valid() && !dup_start_lsn.is_valid() {
693 0 : dup_start_lsn = dup_end_lsn;
694 0 : dup_end_lsn = lsn_range.end;
695 2024000 : }
696 2024000 : if writer.is_some() {
697 2023972 : let written_size = writer.as_mut().unwrap().size();
698 2023972 : let contains_hole =
699 2023972 : next_hole < holes.len() && key >= holes[next_hole].key_range.end;
700 : // check if key cause layer overflow or contains hole...
701 2023972 : if is_dup_layer
702 2023972 : || dup_end_lsn.is_valid()
703 2023972 : || written_size + key_values_total_size > target_file_size
704 2023692 : || contains_hole
705 : {
706 : // ... if so, flush previous layer and prepare to write new one
707 280 : new_layers.push(
708 280 : writer
709 280 : .take()
710 280 : .unwrap()
711 280 : .finish(prev_key.unwrap().next(), self, ctx)
712 720 : .await?,
713 : );
714 280 : writer = None;
715 280 :
716 280 : if contains_hole {
717 0 : // skip hole
718 0 : next_hole += 1;
719 280 : }
720 2023692 : }
721 28 : }
722 : // Remember size of key value because at next iteration we will access next item
723 2024000 : key_values_total_size = next_key_size;
724 40038 : }
725 2064038 : fail_point!("delta-layer-writer-fail-before-finish", |_| {
726 0 : Err(CompactionError::Other(anyhow::anyhow!(
727 0 : "failpoint delta-layer-writer-fail-before-finish"
728 0 : )))
729 2064038 : });
730 :
731 2064038 : if !self.shard_identity.is_key_disposable(&key) {
732 2064038 : if writer.is_none() {
733 308 : // Create the writer if not initialized yet
734 308 : writer = Some(
735 : DeltaLayerWriter::new(
736 308 : self.conf,
737 308 : self.timeline_id,
738 308 : self.tenant_shard_id,
739 308 : key,
740 308 : if dup_end_lsn.is_valid() {
741 : // this is a layer containing slice of values of the same key
742 0 : debug!("Create new dup layer {}..{}", dup_start_lsn, dup_end_lsn);
743 0 : dup_start_lsn..dup_end_lsn
744 : } else {
745 308 : debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end);
746 308 : lsn_range.clone()
747 : },
748 308 : ctx,
749 : )
750 154 : .await?,
751 : );
752 2063730 : }
753 :
754 2064038 : writer
755 2064038 : .as_mut()
756 2064038 : .unwrap()
757 2064038 : .put_value(key, lsn, value, ctx)
758 1566 : .await?;
759 : } else {
760 0 : debug!(
761 0 : "Dropping key {} during compaction (it belongs on shard {:?})",
762 0 : key,
763 0 : self.shard_identity.get_shard_number(&key)
764 : );
765 : }
766 :
767 2064038 : if !new_layers.is_empty() {
768 19786 : fail_point!("after-timeline-compacted-first-L1");
769 2044252 : }
770 :
771 2064038 : prev_key = Some(key);
772 : }
773 28 : if let Some(writer) = writer {
774 1991 : new_layers.push(writer.finish(prev_key.unwrap().next(), self, ctx).await?);
775 0 : }
776 :
777 : // Sync layers
778 28 : if !new_layers.is_empty() {
779 : // Print a warning if the created layer is larger than double the target size
780 : // Add two pages for potential overhead. This should in theory be already
781 : // accounted for in the target calculation, but for very small targets,
782 : // we still might easily hit the limit otherwise.
783 28 : let warn_limit = target_file_size * 2 + page_cache::PAGE_SZ as u64 * 2;
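// (For example, with a 256 MiB target file size and the page cache's 8 KiB page size,
// both assumed here, warn_limit is 512 MiB plus 16 KiB of slack.)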
784 308 : for layer in new_layers.iter() {
785 308 : if layer.layer_desc().file_size > warn_limit {
786 0 : warn!(
787 : %layer,
788 0 : "created delta file of size {} larger than double of target of {target_file_size}", layer.layer_desc().file_size
789 : );
790 308 : }
791 : }
792 :
793 : // The writer.finish() above already did the fsync of the inodes.
794 : // We just need to fsync the directory in which these inodes are linked,
795 : // which we know to be the timeline directory.
796 : //
797 : // We use fatal_err() below because after writer.finish() returns with success,
798 : // the in-memory state of the filesystem already has the layer file in its final place,
799 : // and subsequent pageserver code could think it's durable while it really isn't.
800 28 : let timeline_dir = VirtualFile::open(
801 28 : &self
802 28 : .conf
803 28 : .timeline_path(&self.tenant_shard_id, &self.timeline_id),
804 28 : ctx,
805 28 : )
806 14 : .await
807 28 : .fatal_err("VirtualFile::open for timeline dir fsync");
808 28 : timeline_dir
809 28 : .sync_all()
810 14 : .await
811 28 : .fatal_err("VirtualFile::sync_all timeline dir");
812 0 : }
813 :
814 28 : stats.write_layer_files_micros = stats.read_lock_drop_micros.till_now();
815 28 : stats.new_deltas_count = Some(new_layers.len());
816 308 : stats.new_deltas_size = Some(new_layers.iter().map(|l| l.layer_desc().file_size).sum());
817 28 :
818 28 : match TryInto::<CompactLevel0Phase1Stats>::try_into(stats)
819 28 : .and_then(|stats| serde_json::to_string(&stats).context("serde_json::to_string"))
820 : {
821 28 : Ok(stats_json) => {
822 28 : info!(
823 0 : stats_json = stats_json.as_str(),
824 0 : "compact_level0_phase1 stats available"
825 : )
826 : }
827 0 : Err(e) => {
828 0 : warn!("compact_level0_phase1 stats failed to serialize: {:#}", e);
829 : }
830 : }
831 :
832 28 : Ok(CompactLevel0Phase1Result {
833 28 : new_layers,
834 28 : deltas_to_compact: deltas_to_compact
835 28 : .into_iter()
836 402 : .map(|x| x.drop_eviction_guard())
837 28 : .collect::<Vec<_>>(),
838 28 : })
839 364 : }
840 : }
841 :
842 : #[derive(Default)]
843 : struct CompactLevel0Phase1Result {
844 : new_layers: Vec<ResidentLayer>,
845 : deltas_to_compact: Vec<Layer>,
846 : }
847 :
848 : #[derive(Default)]
849 : struct CompactLevel0Phase1StatsBuilder {
850 : version: Option<u64>,
851 : tenant_id: Option<TenantShardId>,
852 : timeline_id: Option<TimelineId>,
853 : read_lock_acquisition_micros: DurationRecorder,
854 : read_lock_held_spawn_blocking_startup_micros: DurationRecorder,
855 : read_lock_held_key_sort_micros: DurationRecorder,
856 : read_lock_held_prerequisites_micros: DurationRecorder,
857 : read_lock_held_compute_holes_micros: DurationRecorder,
858 : read_lock_drop_micros: DurationRecorder,
859 : write_layer_files_micros: DurationRecorder,
860 : level0_deltas_count: Option<usize>,
861 : new_deltas_count: Option<usize>,
862 : new_deltas_size: Option<u64>,
863 : }
864 :
865 : #[derive(serde::Serialize)]
866 : struct CompactLevel0Phase1Stats {
867 : version: u64,
868 : tenant_id: TenantShardId,
869 : timeline_id: TimelineId,
870 : read_lock_acquisition_micros: RecordedDuration,
871 : read_lock_held_spawn_blocking_startup_micros: RecordedDuration,
872 : read_lock_held_key_sort_micros: RecordedDuration,
873 : read_lock_held_prerequisites_micros: RecordedDuration,
874 : read_lock_held_compute_holes_micros: RecordedDuration,
875 : read_lock_drop_micros: RecordedDuration,
876 : write_layer_files_micros: RecordedDuration,
877 : level0_deltas_count: usize,
878 : new_deltas_count: usize,
879 : new_deltas_size: u64,
880 : }
881 :
882 : impl TryFrom<CompactLevel0Phase1StatsBuilder> for CompactLevel0Phase1Stats {
883 : type Error = anyhow::Error;
884 :
885 28 : fn try_from(value: CompactLevel0Phase1StatsBuilder) -> Result<Self, Self::Error> {
886 28 : Ok(Self {
887 28 : version: value.version.ok_or_else(|| anyhow!("version not set"))?,
888 28 : tenant_id: value
889 28 : .tenant_id
890 28 : .ok_or_else(|| anyhow!("tenant_id not set"))?,
891 28 : timeline_id: value
892 28 : .timeline_id
893 28 : .ok_or_else(|| anyhow!("timeline_id not set"))?,
894 28 : read_lock_acquisition_micros: value
895 28 : .read_lock_acquisition_micros
896 28 : .into_recorded()
897 28 : .ok_or_else(|| anyhow!("read_lock_acquisition_micros not set"))?,
898 28 : read_lock_held_spawn_blocking_startup_micros: value
899 28 : .read_lock_held_spawn_blocking_startup_micros
900 28 : .into_recorded()
901 28 : .ok_or_else(|| anyhow!("read_lock_held_spawn_blocking_startup_micros not set"))?,
902 28 : read_lock_held_key_sort_micros: value
903 28 : .read_lock_held_key_sort_micros
904 28 : .into_recorded()
905 28 : .ok_or_else(|| anyhow!("read_lock_held_key_sort_micros not set"))?,
906 28 : read_lock_held_prerequisites_micros: value
907 28 : .read_lock_held_prerequisites_micros
908 28 : .into_recorded()
909 28 : .ok_or_else(|| anyhow!("read_lock_held_prerequisites_micros not set"))?,
910 28 : read_lock_held_compute_holes_micros: value
911 28 : .read_lock_held_compute_holes_micros
912 28 : .into_recorded()
913 28 : .ok_or_else(|| anyhow!("read_lock_held_compute_holes_micros not set"))?,
914 28 : read_lock_drop_micros: value
915 28 : .read_lock_drop_micros
916 28 : .into_recorded()
917 28 : .ok_or_else(|| anyhow!("read_lock_drop_micros not set"))?,
918 28 : write_layer_files_micros: value
919 28 : .write_layer_files_micros
920 28 : .into_recorded()
921 28 : .ok_or_else(|| anyhow!("write_layer_files_micros not set"))?,
922 28 : level0_deltas_count: value
923 28 : .level0_deltas_count
924 28 : .ok_or_else(|| anyhow!("level0_deltas_count not set"))?,
925 28 : new_deltas_count: value
926 28 : .new_deltas_count
927 28 : .ok_or_else(|| anyhow!("new_deltas_count not set"))?,
928 28 : new_deltas_size: value
929 28 : .new_deltas_size
930 28 : .ok_or_else(|| anyhow!("new_deltas_size not set"))?,
931 : })
932 28 : }
933 : }
934 :
935 : impl Timeline {
936 : /// Entry point for new tiered compaction algorithm.
937 : ///
938 : /// All the real work is in the implementation in the pageserver_compaction
939 : /// crate. The code here would apply to any algorithm implemented by the
940 : /// same interface, but tiered is the only one at the moment.
941 : ///
942 : /// TODO: cancellation
943 0 : pub(crate) async fn compact_tiered(
944 0 : self: &Arc<Self>,
945 0 : _cancel: &CancellationToken,
946 0 : ctx: &RequestContext,
947 0 : ) -> Result<(), CompactionError> {
948 0 : let fanout = self.get_compaction_threshold() as u64;
949 0 : let target_file_size = self.get_checkpoint_distance();
950 :
951 : // Find the top of the historical layers
952 0 : let end_lsn = {
953 0 : let guard = self.layers.read().await;
954 0 : let layers = guard.layer_map();
955 :
956 0 : let l0_deltas = layers.get_level0_deltas()?;
957 0 : drop(guard);
958 0 :
959 0 : // As an optimization, if we find that there are too few L0 layers,
960 0 : // bail out early. We know that the compaction algorithm would do
961 0 : // nothing in that case.
962 0 : if l0_deltas.len() < fanout as usize {
963 : // doesn't need compacting
964 0 : return Ok(());
965 0 : }
966 0 : l0_deltas.iter().map(|l| l.lsn_range.end).max().unwrap()
967 0 : };
968 0 :
969 0 : // Is the timeline being deleted?
970 0 : if self.is_stopping() {
971 0 : trace!("Dropping out of compaction on timeline shutdown");
972 0 : return Err(CompactionError::ShuttingDown);
973 0 : }
974 :
975 0 : let (dense_ks, _sparse_ks) = self.collect_keyspace(end_lsn, ctx).await?;
976 : // TODO(chi): ignore sparse_keyspace for now, compact it in the future.
977 0 : let mut adaptor = TimelineAdaptor::new(self, (end_lsn, dense_ks));
978 0 :
979 0 : pageserver_compaction::compact_tiered::compact_tiered(
980 0 : &mut adaptor,
981 0 : end_lsn,
982 0 : target_file_size,
983 0 : fanout,
984 0 : ctx,
985 0 : )
986 0 : .await?;
987 :
988 0 : adaptor.flush_updates().await?;
989 0 : Ok(())
990 0 : }
991 :
992 : /// An experimental compaction building block that combines compaction with garbage collection.
993 : ///
994 : /// The current implementation picks all delta + image layers that are below or intersecting with
995 : /// the GC horizon without considering retain_lsns. Then, it does a full compaction over all these delta
996 : /// layers and image layers, which generates image layers at the gc horizon, drops deltas below the gc horizon,
997 : /// and creates delta layers with all deltas >= the gc horizon.
998 4 : pub(crate) async fn compact_with_gc(
999 4 : self: &Arc<Self>,
1000 4 : _cancel: &CancellationToken,
1001 4 : ctx: &RequestContext,
1002 4 : ) -> Result<(), CompactionError> {
1003 4 : use crate::tenant::storage_layer::ValueReconstructState;
1004 4 : use std::collections::BTreeSet;
1005 4 :
1006 4 : info!("running enhanced gc bottom-most compaction");
1007 :
1008 : scopeguard::defer! {
1009 : info!("done enhanced gc bottom-most compaction");
1010 : };
1011 :
1012 : // Step 0: pick all delta layers + image layers below/intersect with the GC horizon.
1013 : // The layer selection has the following properties:
1014 : // 1. If a layer is in the selection, all layers below it are in the selection.
1015 : // 2. Inferred from (1), for each key in the layer selection, the value can be reconstructed only with the layers in the layer selection.
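// (Concretely, property (1) holds because the selection below takes every layer whose
// LSN range starts at or below gc_cutoff.)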
1016 4 : let (layer_selection, gc_cutoff) = {
1017 4 : let guard = self.layers.read().await;
1018 4 : let layers = guard.layer_map();
1019 4 : let gc_info = self.gc_info.read().unwrap();
1020 4 : if !gc_info.retain_lsns.is_empty() || !gc_info.leases.is_empty() {
1021 0 : return Err(CompactionError::Other(anyhow!(
1022 0 : "enhanced legacy compaction currently does not support retain_lsns (branches)"
1023 0 : )));
1024 4 : }
1025 4 : let gc_cutoff = gc_info.cutoffs.select_min();
1026 4 : let mut selected_layers = Vec::new();
1027 4 : // TODO: consider retain_lsns
1028 4 : drop(gc_info);
1029 20 : for desc in layers.iter_historic_layers() {
1030 20 : if desc.get_lsn_range().start <= gc_cutoff {
1031 16 : selected_layers.push(guard.get_from_desc(&desc));
1032 16 : }
1033 : }
1034 4 : (selected_layers, gc_cutoff)
1035 4 : };
1036 4 : info!(
1037 0 : "picked {} layers for compaction with gc_cutoff={}",
1038 0 : layer_selection.len(),
1039 : gc_cutoff
1040 : );
1041 : // Step 1: (In the future) construct a k-merge iterator over all layers. For now, simply collect all keys + LSNs.
1042 : // Also, collect the layer information to decide when to split the new delta layers.
1043 4 : let mut downloaded_layers = Vec::new();
1044 4 : let mut delta_split_points = BTreeSet::new();
1045 20 : for layer in &layer_selection {
1046 16 : let resident_layer = layer.download_and_keep_resident().await?;
1047 16 : downloaded_layers.push(resident_layer);
1048 16 :
1049 16 : let desc = layer.layer_desc();
1050 16 : if desc.is_delta() {
1051 8 : // TODO: is it correct to only record split points for deltas intersecting with the GC horizon? (exclude those below/above the horizon)
1052 8 : // so that we can avoid having too many small delta layers.
1053 8 : let key_range = desc.get_key_range();
1054 8 : delta_split_points.insert(key_range.start);
1055 8 : delta_split_points.insert(key_range.end);
1056 8 : }
1057 : }
1058 4 : let mut delta_layers = Vec::new();
1059 4 : let mut image_layers = Vec::new();
1060 20 : for resident_layer in &downloaded_layers {
1061 16 : if resident_layer.layer_desc().is_delta() {
1062 8 : let layer = resident_layer.get_as_delta(ctx).await?;
1063 8 : delta_layers.push(layer);
1064 8 : } else {
1065 8 : let layer = resident_layer.get_as_image(ctx).await?;
1066 8 : image_layers.push(layer);
1067 : }
1068 : }
1069 4 : let mut merge_iter = MergeIterator::create(&delta_layers, &image_layers, ctx);
1070 4 : // Step 2: Produce images+deltas. TODO: ensure newly-produced delta does not overlap with other deltas.
1071 4 : // Data of the same key.
1072 4 : let mut accumulated_values = Vec::new();
1073 4 : let mut last_key: Option<Key> = None;
1074 :
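// (Overview of the driving loop further below: the merge iterator yields (key, lsn, value)
// in key/LSN order; we accumulate all values of one key, then flush_accumulated_states()
// produces the image at gc_cutoff plus the deltas above it, and flush_deltas() cuts delta
// layers at the recorded split points.)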
1075 : /// Take a list of images and deltas, produce an image at the GC horizon, and a list of deltas above the GC horizon.
1076 72 : async fn flush_accumulated_states(
1077 72 : tline: &Arc<Timeline>,
1078 72 : key: Key,
1079 72 : accumulated_values: &[(Key, Lsn, crate::repository::Value)],
1080 72 : horizon: Lsn,
1081 72 : ) -> anyhow::Result<(Vec<(Key, Lsn, crate::repository::Value)>, bytes::Bytes)> {
1082 72 : let mut base_image = None;
1083 72 : let mut keys_above_horizon = Vec::new();
1084 72 : let mut delta_above_base_image = Vec::new();
1085 : // We have a list of deltas/images. We want to create image layers while collecting garbage.
1086 88 : for (key, lsn, val) in accumulated_values.iter().rev() {
1087 88 : if *lsn > horizon {
1088 4 : if let Some((_, prev_lsn, _)) = keys_above_horizon.last_mut() {
1089 0 : if *prev_lsn == *lsn {
1090 : // The case that we have an LSN with both data from the delta layer and the image layer. As
1091 : // `ValueWrapper` ensures that an image is ordered before a delta at the same LSN, we simply
1092 : // drop this delta and keep the image.
1093 : //
1094 : // For example, we have delta layer key1@0x10, key1@0x20, and image layer key1@0x10, we will
1095 : // keep the image for key1@0x10 and the delta for key1@0x20. key1@0x10 delta will be simply
1096 : // dropped.
1097 0 : continue;
1098 0 : }
1099 4 : }
1100 4 : keys_above_horizon.push((*key, *lsn, val.clone()));
1101 84 : } else if *lsn <= horizon {
1102 84 : match val {
1103 72 : crate::repository::Value::Image(image) => {
1104 72 : base_image = Some((*lsn, image.clone()));
1105 72 : break;
1106 : }
1107 12 : crate::repository::Value::WalRecord(wal) => {
1108 12 : delta_above_base_image.push((*lsn, wal.clone()));
1109 12 : }
1110 : }
1111 0 : }
1112 : }
1113 : // do not reverse delta_above_base_image; value reconstruction expects the records in reverse order
1114 72 : keys_above_horizon.reverse();
1115 72 : let state = ValueReconstructState {
1116 72 : img: base_image,
1117 72 : records: delta_above_base_image,
1118 72 : };
1119 72 : let img = tline.reconstruct_value(key, horizon, state).await?;
1120 72 : Ok((keys_above_horizon, img))
1121 72 : }
1122 :
1123 72 : async fn flush_deltas(
1124 72 : deltas: &mut Vec<(Key, Lsn, crate::repository::Value)>,
1125 72 : last_key: Key,
1126 72 : delta_split_points: &[Key],
1127 72 : current_delta_split_point: &mut usize,
1128 72 : tline: &Arc<Timeline>,
1129 72 : gc_cutoff: Lsn,
1130 72 : ctx: &RequestContext,
1131 72 : ) -> anyhow::Result<Option<ResidentLayer>> {
1132 72 : // Check if we need to split the delta layer. We split at the original delta layer boundary to avoid
1133 72 : // overlapping layers.
1134 72 : //
1135 72 : // If we have a structure like this:
1136 72 : //
1137 72 : // | Delta 1 | | Delta 4 |
1138 72 : // |---------| Delta 2 |---------|
1139 72 : // | Delta 3 | | Delta 5 |
1140 72 : //
1141 72 : // And we choose to compact delta 2+3+5. We will get an overlapping delta layer with delta 1+4.
1142 72 : // A simple solution here is to split the delta layers using the original boundaries, although this
1143 72 : // might produce a lot of small layers. This should be improved and fixed in the future.
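// (Mechanically: current_delta_split_point is advanced past every recorded split point
// that last_key has reached; crossing at least one split point sets need_split and flushes
// the deltas accumulated so far into their own layer.)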
1144 72 : let mut need_split = false;
1145 88 : while *current_delta_split_point < delta_split_points.len()
1146 76 : && last_key >= delta_split_points[*current_delta_split_point]
1147 16 : {
1148 16 : *current_delta_split_point += 1;
1149 16 : need_split = true;
1150 16 : }
1151 72 : if !need_split {
1152 56 : return Ok(None);
1153 16 : }
1154 16 : let deltas = std::mem::take(deltas);
1155 16 : if deltas.is_empty() {
1156 12 : return Ok(None);
1157 4 : }
1158 4 : let end_lsn = deltas.iter().map(|(_, lsn, _)| lsn).max().copied().unwrap() + 1;
1159 4 : let mut delta_layer_writer = DeltaLayerWriter::new(
1160 4 : tline.conf,
1161 4 : tline.timeline_id,
1162 4 : tline.tenant_shard_id,
1163 4 : deltas.first().unwrap().0,
1164 4 : gc_cutoff..end_lsn,
1165 4 : ctx,
1166 4 : )
1167 2 : .await?;
1168 4 : let key_end = deltas.last().unwrap().0.next();
1169 8 : for (key, lsn, val) in deltas {
1170 4 : delta_layer_writer.put_value(key, lsn, val, ctx).await?;
1171 : }
1172 10 : let delta_layer = delta_layer_writer.finish(key_end, tline, ctx).await?;
1173 4 : Ok(Some(delta_layer))
1174 72 : }
1175 :
1176 4 : let mut image_layer_writer = ImageLayerWriter::new(
1177 4 : self.conf,
1178 4 : self.timeline_id,
1179 4 : self.tenant_shard_id,
1180 4 : &(Key::MIN..Key::MAX), // covers the full key range
1181 4 : gc_cutoff,
1182 4 : ctx,
1183 4 : )
1184 2 : .await?;
1185 :
1186 4 : let mut delta_values = Vec::new();
1187 4 : let delta_split_points = delta_split_points.into_iter().collect_vec();
1188 4 : let mut current_delta_split_point = 0;
1189 4 : let mut delta_layers = Vec::new();
1190 100 : while let Some((key, lsn, val)) = merge_iter.next().await? {
1191 96 : if last_key.is_none() || last_key.as_ref() == Some(&key) {
1192 28 : if last_key.is_none() {
1193 4 : last_key = Some(key);
1194 24 : }
1195 28 : accumulated_values.push((key, lsn, val));
1196 : } else {
1197 68 : let last_key = last_key.as_mut().unwrap();
1198 68 : let (deltas, image) =
1199 68 : flush_accumulated_states(self, *last_key, &accumulated_values, gc_cutoff)
1200 0 : .await?;
1201 : // Put the image into the image layer. Currently we have a single big layer for the compaction.
1202 69 : image_layer_writer.put_image(*last_key, image, ctx).await?;
1203 68 : delta_values.extend(deltas);
1204 68 : delta_layers.extend(
1205 68 : flush_deltas(
1206 68 : &mut delta_values,
1207 68 : *last_key,
1208 68 : &delta_split_points,
1209 68 : &mut current_delta_split_point,
1210 68 : self,
1211 68 : gc_cutoff,
1212 68 : ctx,
1213 68 : )
1214 12 : .await?,
1215 : );
1216 68 : accumulated_values.clear();
1217 68 : *last_key = key;
1218 68 : accumulated_values.push((key, lsn, val));
1219 : }
1220 : }
1221 :
1222 4 : let last_key = last_key.expect("no keys produced during compaction");
1223 : // TODO: move this part to the loop body
1224 4 : let (deltas, image) =
1225 4 : flush_accumulated_states(self, last_key, &accumulated_values, gc_cutoff).await?;
1226 : // Put the image into the image layer. Currently we have a single big layer for the compaction.
1227 4 : image_layer_writer.put_image(last_key, image, ctx).await?;
1228 4 : delta_values.extend(deltas);
1229 4 : delta_layers.extend(
1230 4 : flush_deltas(
1231 4 : &mut delta_values,
1232 4 : last_key,
1233 4 : &delta_split_points,
1234 4 : &mut current_delta_split_point,
1235 4 : self,
1236 4 : gc_cutoff,
1237 4 : ctx,
1238 4 : )
1239 0 : .await?,
1240 : );
1241 :
1242 9 : let image_layer = image_layer_writer.finish(self, ctx).await?;
1243 4 : info!(
1244 0 : "produced {} delta layers and {} image layers",
1245 0 : delta_layers.len(),
1246 : 1
1247 : );
1248 4 : let mut compact_to = Vec::new();
1249 4 : compact_to.extend(delta_layers);
1250 4 : compact_to.push(image_layer);
1251 : // Step 3: Place back to the layer map.
1252 : {
1253 4 : let mut guard = self.layers.write().await;
1254 4 : guard.finish_gc_compaction(&layer_selection, &compact_to, &self.metrics)
1255 4 : };
1256 4 :
1257 4 : self.remote_client
1258 4 : .schedule_compaction_update(&layer_selection, &compact_to)?;
1259 4 : Ok(())
1260 4 : }
1261 : }
1262 :
1263 : struct TimelineAdaptor {
1264 : timeline: Arc<Timeline>,
1265 :
1266 : keyspace: (Lsn, KeySpace),
1267 :
1268 : new_deltas: Vec<ResidentLayer>,
1269 : new_images: Vec<ResidentLayer>,
1270 : layers_to_delete: Vec<Arc<PersistentLayerDesc>>,
1271 : }
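// TimelineAdaptor glues the generic pageserver_compaction algorithm to a concrete Timeline:
// it buffers the new delta/image layers and the deletions requested by the algorithm and
// applies them to the timeline in flush_updates().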
1272 :
1273 : impl TimelineAdaptor {
1274 0 : pub fn new(timeline: &Arc<Timeline>, keyspace: (Lsn, KeySpace)) -> Self {
1275 0 : Self {
1276 0 : timeline: timeline.clone(),
1277 0 : keyspace,
1278 0 : new_images: Vec::new(),
1279 0 : new_deltas: Vec::new(),
1280 0 : layers_to_delete: Vec::new(),
1281 0 : }
1282 0 : }
1283 :
1284 0 : pub async fn flush_updates(&mut self) -> anyhow::Result<()> {
1285 0 : let layers_to_delete = {
1286 0 : let guard = self.timeline.layers.read().await;
1287 0 : self.layers_to_delete
1288 0 : .iter()
1289 0 : .map(|x| guard.get_from_desc(x))
1290 0 : .collect::<Vec<Layer>>()
1291 0 : };
1292 0 : self.timeline
1293 0 : .finish_compact_batch(&self.new_deltas, &self.new_images, &layers_to_delete)
1294 0 : .await?;
1295 :
1296 0 : self.timeline
1297 0 : .upload_new_image_layers(std::mem::take(&mut self.new_images))?;
1298 :
1299 0 : self.new_deltas.clear();
1300 0 : self.layers_to_delete.clear();
1301 0 : Ok(())
1302 0 : }
1303 : }
1304 :
1305 : #[derive(Clone)]
1306 : struct ResidentDeltaLayer(ResidentLayer);
1307 : #[derive(Clone)]
1308 : struct ResidentImageLayer(ResidentLayer);
1309 :
1310 : impl CompactionJobExecutor for TimelineAdaptor {
1311 : type Key = crate::repository::Key;
1312 :
1313 : type Layer = OwnArc<PersistentLayerDesc>;
1314 : type DeltaLayer = ResidentDeltaLayer;
1315 : type ImageLayer = ResidentImageLayer;
1316 :
1317 : type RequestContext = crate::context::RequestContext;
1318 :
1319 0 : fn get_shard_identity(&self) -> &ShardIdentity {
1320 0 : self.timeline.get_shard_identity()
1321 0 : }
1322 :
1323 0 : async fn get_layers(
1324 0 : &mut self,
1325 0 : key_range: &Range<Key>,
1326 0 : lsn_range: &Range<Lsn>,
1327 0 : _ctx: &RequestContext,
1328 0 : ) -> anyhow::Result<Vec<OwnArc<PersistentLayerDesc>>> {
1329 0 : self.flush_updates().await?;
1330 :
1331 0 : let guard = self.timeline.layers.read().await;
1332 0 : let layer_map = guard.layer_map();
1333 0 :
1334 0 : let result = layer_map
1335 0 : .iter_historic_layers()
1336 0 : .filter(|l| {
1337 0 : overlaps_with(&l.lsn_range, lsn_range) && overlaps_with(&l.key_range, key_range)
1338 0 : })
1339 0 : .map(OwnArc)
1340 0 : .collect();
1341 0 : Ok(result)
1342 0 : }
1343 :
1344 0 : async fn get_keyspace(
1345 0 : &mut self,
1346 0 : key_range: &Range<Key>,
1347 0 : lsn: Lsn,
1348 0 : _ctx: &RequestContext,
1349 0 : ) -> anyhow::Result<Vec<Range<Key>>> {
1350 0 : if lsn == self.keyspace.0 {
1351 0 : Ok(pageserver_compaction::helpers::intersect_keyspace(
1352 0 : &self.keyspace.1.ranges,
1353 0 : key_range,
1354 0 : ))
1355 : } else {
1356 : // The current compaction implementation only ever requests the key space
1357 : // at the compaction end LSN.
1358 0 : anyhow::bail!("keyspace not available for requested lsn");
1359 : }
1360 0 : }
1361 :
1362 0 : async fn downcast_delta_layer(
1363 0 : &self,
1364 0 : layer: &OwnArc<PersistentLayerDesc>,
1365 0 : ) -> anyhow::Result<Option<ResidentDeltaLayer>> {
1366 0 : // this is a lot more complex than a simple downcast...
1367 0 : if layer.is_delta() {
1368 0 : let l = {
1369 0 : let guard = self.timeline.layers.read().await;
1370 0 : guard.get_from_desc(layer)
1371 : };
1372 0 : let result = l.download_and_keep_resident().await?;
1373 :
1374 0 : Ok(Some(ResidentDeltaLayer(result)))
1375 : } else {
1376 0 : Ok(None)
1377 : }
1378 0 : }
1379 :
1380 0 : async fn create_image(
1381 0 : &mut self,
1382 0 : lsn: Lsn,
1383 0 : key_range: &Range<Key>,
1384 0 : ctx: &RequestContext,
1385 0 : ) -> anyhow::Result<()> {
1386 0 : Ok(self.create_image_impl(lsn, key_range, ctx).await?)
1387 0 : }
1388 :
1389 0 : async fn create_delta(
1390 0 : &mut self,
1391 0 : lsn_range: &Range<Lsn>,
1392 0 : key_range: &Range<Key>,
1393 0 : input_layers: &[ResidentDeltaLayer],
1394 0 : ctx: &RequestContext,
1395 0 : ) -> anyhow::Result<()> {
1396 0 : debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end);
1397 :
1398 0 : let mut all_entries = Vec::new();
1399 0 : for dl in input_layers.iter() {
1400 0 : all_entries.extend(dl.load_keys(ctx).await?);
1401 : }
1402 :
1403 :             // The stdlib sort is particularly fast when the slice is made up of already
1404 :             // sorted sub-ranges, as it is here; see the sketch after this impl block.
1405 0 : all_entries.sort_by_key(|DeltaEntry { key, lsn, .. }| (*key, *lsn));
1406 :
1407 0 : let mut writer = DeltaLayerWriter::new(
1408 0 : self.timeline.conf,
1409 0 : self.timeline.timeline_id,
1410 0 : self.timeline.tenant_shard_id,
1411 0 : key_range.start,
1412 0 : lsn_range.clone(),
1413 0 : ctx,
1414 0 : )
1415 0 : .await?;
1416 :
1417 0 : let mut dup_values = 0;
1418 0 :
1419 0 :         // This loop walks through all key-value pairs from all the layers
1420 0 :         // we're compacting, in (key, LSN) order.
1421 0 : let mut prev: Option<(Key, Lsn)> = None;
1422 : for &DeltaEntry {
1423 0 : key, lsn, ref val, ..
1424 0 : } in all_entries.iter()
1425 : {
1426 0 : if prev == Some((key, lsn)) {
1427 : // This is a duplicate. Skip it.
1428 : //
1429 : // It can happen if compaction is interrupted after writing some
1430 : // layers but not all, and we are compacting the range again.
1431 : // The calculations in the algorithm assume that there are no
1432 : // duplicates, so the math on targeted file size is likely off,
1433 : // and we will create smaller files than expected.
1434 0 : dup_values += 1;
1435 0 : continue;
1436 0 : }
1437 :
1438 0 : let value = val.load(ctx).await?;
1439 :
1440 0 : writer.put_value(key, lsn, value, ctx).await?;
1441 :
1442 0 : prev = Some((key, lsn));
1443 : }
1444 :
1445 0 : if dup_values > 0 {
1446 0 : warn!("delta layer created with {} duplicate values", dup_values);
1447 0 : }
1448 :
1449 0 : fail_point!("delta-layer-writer-fail-before-finish", |_| {
1450 0 : Err(anyhow::anyhow!(
1451 0 : "failpoint delta-layer-writer-fail-before-finish"
1452 0 : ))
1453 0 : });
1454 :
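        // We assume the compaction algorithm never asks us to build a delta
        // layer from zero entries, so `prev` is `Some` by the time we reach
        // the `unwrap()` below.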
1455 0 : let new_delta_layer = writer
1456 0 : .finish(prev.unwrap().0.next(), &self.timeline, ctx)
1457 0 : .await?;
1458 :
1459 0 : self.new_deltas.push(new_delta_layer);
1460 0 : Ok(())
1461 0 : }
1462 :
1463 0 : async fn delete_layer(
1464 0 : &mut self,
1465 0 : layer: &OwnArc<PersistentLayerDesc>,
1466 0 : _ctx: &RequestContext,
1467 0 : ) -> anyhow::Result<()> {
1468 0 : self.layers_to_delete.push(layer.clone().0);
1469 0 : Ok(())
1470 0 : }
1471 : }
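
// Illustrative sketch, not used by the production code path: the core of
// `create_delta` above is "sort all (key, lsn) entries, then skip consecutive
// duplicates". The test below shows that pattern in isolation, using plain
// (u64, u64) tuples as hypothetical stand-ins for (Key, Lsn); loading values
// and writing them into a `DeltaLayerWriter` is omitted.
#[cfg(test)]
mod create_delta_dedup_sketch {
    #[test]
    fn sort_then_skip_consecutive_duplicates() {
        // Entries gathered from two hypothetical input layers; the second one
        // repeats (1, 10), as can happen when an interrupted compaction is
        // retried over the same range.
        let mut all_entries = vec![(1u64, 10u64), (2, 10), (1, 10), (1, 20), (2, 30)];

        // Each input layer is already sorted, so the slice is a concatenation
        // of sorted sub-ranges, which the stdlib sort handles quickly.
        all_entries.sort_by_key(|&(key, lsn)| (key, lsn));

        let mut prev: Option<(u64, u64)> = None;
        let mut kept = Vec::new();
        for &(key, lsn) in all_entries.iter() {
            if prev == Some((key, lsn)) {
                // Duplicate of the previous entry: skip it, mirroring the
                // `dup_values` handling in `create_delta`.
                continue;
            }
            kept.push((key, lsn));
            prev = Some((key, lsn));
        }

        assert_eq!(kept, vec![(1, 10), (1, 20), (2, 10), (2, 30)]);
    }
}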
1472 :
1473 : impl TimelineAdaptor {
1474 0 : async fn create_image_impl(
1475 0 : &mut self,
1476 0 : lsn: Lsn,
1477 0 : key_range: &Range<Key>,
1478 0 : ctx: &RequestContext,
1479 0 : ) -> Result<(), CreateImageLayersError> {
1480 0 : let timer = self.timeline.metrics.create_images_time_histo.start_timer();
1481 :
1482 0 : let image_layer_writer = ImageLayerWriter::new(
1483 0 : self.timeline.conf,
1484 0 : self.timeline.timeline_id,
1485 0 : self.timeline.tenant_shard_id,
1486 0 : key_range,
1487 0 : lsn,
1488 0 : ctx,
1489 0 : )
1490 0 : .await?;
1491 :
1492 0 : fail_point!("image-layer-writer-fail-before-finish", |_| {
1493 0 : Err(CreateImageLayersError::Other(anyhow::anyhow!(
1494 0 : "failpoint image-layer-writer-fail-before-finish"
1495 0 : )))
1496 0 : });
1497 :
1498 0 : let keyspace = KeySpace {
1499 0 : ranges: self.get_keyspace(key_range, lsn, ctx).await?,
1500 : };
1501 : // TODO set proper (stateful) start. The create_image_layer_for_rel_blocks function mostly
1502 0 : let start = Key::MIN;
1503 : let ImageLayerCreationOutcome {
1504 0 : image,
1505 : next_start_key: _,
1506 0 : } = self
1507 0 : .timeline
1508 0 : .create_image_layer_for_rel_blocks(
1509 0 : &keyspace,
1510 0 : image_layer_writer,
1511 0 : lsn,
1512 0 : ctx,
1513 0 : key_range.clone(),
1514 0 : start,
1515 0 : )
1516 0 : .await?;
1517 :
1518 0 : if let Some(image_layer) = image {
1519 0 : self.new_images.push(image_layer);
1520 0 : }
1521 :
1522 0 : timer.stop_and_record();
1523 0 :
1524 0 : Ok(())
1525 0 : }
1526 : }
1527 :
1528 : impl CompactionRequestContext for crate::context::RequestContext {}
1529 :
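/// Owning newtype around `Arc<T>`. Presumably needed because the compaction
/// crate's traits (e.g. `CompactionLayer`) are foreign here, and the orphan
/// rule does not allow implementing a foreign trait directly for
/// `Arc<PersistentLayerDesc>`; wrapping the `Arc` in a local type does.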
1530 : #[derive(Debug, Clone)]
1531 : pub struct OwnArc<T>(pub Arc<T>);
1532 :
1533 : impl<T> Deref for OwnArc<T> {
1534 : type Target = <Arc<T> as Deref>::Target;
1535 0 : fn deref(&self) -> &Self::Target {
1536 0 : &self.0
1537 0 : }
1538 : }
1539 :
1540 : impl<T> AsRef<T> for OwnArc<T> {
1541 0 : fn as_ref(&self) -> &T {
1542 0 : self.0.as_ref()
1543 0 : }
1544 : }
1545 :
1546 : impl CompactionLayer<Key> for OwnArc<PersistentLayerDesc> {
1547 0 : fn key_range(&self) -> &Range<Key> {
1548 0 : &self.key_range
1549 0 : }
1550 0 : fn lsn_range(&self) -> &Range<Lsn> {
1551 0 : &self.lsn_range
1552 0 : }
1553 0 : fn file_size(&self) -> u64 {
1554 0 : self.file_size
1555 0 : }
1556 0 : fn short_id(&self) -> std::string::String {
1557 0 : self.as_ref().short_id().to_string()
1558 0 : }
1559 0 : fn is_delta(&self) -> bool {
1560 0 : self.as_ref().is_delta()
1561 0 : }
1562 : }
1563 :
1564 : impl CompactionLayer<Key> for OwnArc<DeltaLayer> {
1565 0 : fn key_range(&self) -> &Range<Key> {
1566 0 : &self.layer_desc().key_range
1567 0 : }
1568 0 : fn lsn_range(&self) -> &Range<Lsn> {
1569 0 : &self.layer_desc().lsn_range
1570 0 : }
1571 0 : fn file_size(&self) -> u64 {
1572 0 : self.layer_desc().file_size
1573 0 : }
1574 0 : fn short_id(&self) -> std::string::String {
1575 0 : self.layer_desc().short_id().to_string()
1576 0 : }
1577 0 : fn is_delta(&self) -> bool {
1578 0 : true
1579 0 : }
1580 : }
1581 :
1582 : use crate::tenant::timeline::DeltaEntry;
1583 :
1584 : impl CompactionLayer<Key> for ResidentDeltaLayer {
1585 0 : fn key_range(&self) -> &Range<Key> {
1586 0 : &self.0.layer_desc().key_range
1587 0 : }
1588 0 : fn lsn_range(&self) -> &Range<Lsn> {
1589 0 : &self.0.layer_desc().lsn_range
1590 0 : }
1591 0 : fn file_size(&self) -> u64 {
1592 0 : self.0.layer_desc().file_size
1593 0 : }
1594 0 : fn short_id(&self) -> std::string::String {
1595 0 : self.0.layer_desc().short_id().to_string()
1596 0 : }
1597 0 : fn is_delta(&self) -> bool {
1598 0 : true
1599 0 : }
1600 : }
1601 :
1602 : impl CompactionDeltaLayer<TimelineAdaptor> for ResidentDeltaLayer {
1603 : type DeltaEntry<'a> = DeltaEntry<'a>;
1604 :
1605 0 : async fn load_keys<'a>(&self, ctx: &RequestContext) -> anyhow::Result<Vec<DeltaEntry<'_>>> {
1606 0 : self.0.load_keys(ctx).await
1607 0 : }
1608 : }
1609 :
1610 : impl CompactionLayer<Key> for ResidentImageLayer {
1611 0 : fn key_range(&self) -> &Range<Key> {
1612 0 : &self.0.layer_desc().key_range
1613 0 : }
1614 0 : fn lsn_range(&self) -> &Range<Lsn> {
1615 0 : &self.0.layer_desc().lsn_range
1616 0 : }
1617 0 : fn file_size(&self) -> u64 {
1618 0 : self.0.layer_desc().file_size
1619 0 : }
1620 0 : fn short_id(&self) -> std::string::String {
1621 0 : self.0.layer_desc().short_id().to_string()
1622 0 : }
1623 0 : fn is_delta(&self) -> bool {
1624 0 : false
1625 0 : }
1626 : }
1627 : impl CompactionImageLayer<TimelineAdaptor> for ResidentImageLayer {}