Line data Source code
1 : //! New compaction implementation. The algorithm itself is implemented in the
2 : //! compaction crate. This file implements the callbacks and structs that allow
3 : //! the algorithm to drive the process.
4 : //!
5 : //! The old legacy algorithm is implemented directly in `timeline.rs`.
6 :
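// The sketch below is illustrative only: a hypothetical trait and driver (not the
// real pageserver_compaction interface) showing the callback pattern described
// above, where the algorithm lives in a separate crate and drives the process
// through a trait that this file implements for the timeline.

trait CompactionCallbacksSketch {
    /// Let the driver discover which layers currently exist.
    fn list_layers(&mut self) -> Vec<String>;
    /// Ask the implementation to write out a new, compacted layer.
    fn write_layer(&mut self, name: String);
}

fn drive_compaction_sketch<C: CompactionCallbacksSketch>(callbacks: &mut C) {
    // The algorithm decides *what* to compact; the callbacks decide *how* that is
    // carried out against the timeline's layer map and remote storage.
    for layer in callbacks.list_layers() {
        callbacks.write_layer(format!("compacted-{layer}"));
    }
}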
7 : use std::collections::BinaryHeap;
8 : use std::ops::{Deref, Range};
9 : use std::sync::Arc;
10 :
11 : use super::layer_manager::LayerManager;
12 : use super::{
13 : CompactFlags, CreateImageLayersError, DurationRecorder, ImageLayerCreationMode,
14 : RecordedDuration, Timeline,
15 : };
16 :
17 : use anyhow::{anyhow, Context};
18 : use enumset::EnumSet;
19 : use fail::fail_point;
20 : use itertools::Itertools;
21 : use pageserver_api::keyspace::ShardedRange;
22 : use pageserver_api::shard::{ShardCount, ShardIdentity, TenantShardId};
23 : use tokio_util::sync::CancellationToken;
24 : use tracing::{debug, info, info_span, trace, warn, Instrument};
25 : use utils::id::TimelineId;
26 :
27 : use crate::context::{AccessStatsBehavior, RequestContext, RequestContextBuilder};
28 : use crate::page_cache;
29 : use crate::tenant::storage_layer::{AsLayerDesc, PersistentLayerDesc};
30 : use crate::tenant::timeline::{drop_rlock, Hole, ImageLayerCreationOutcome};
31 : use crate::tenant::timeline::{DeltaLayerWriter, ImageLayerWriter};
32 : use crate::tenant::timeline::{Layer, ResidentLayer};
33 : use crate::tenant::DeltaLayer;
34 : use crate::virtual_file::{MaybeFatalIo, VirtualFile};
35 :
36 : use crate::keyspace::KeySpace;
37 : use crate::repository::Key;
38 :
39 : use utils::lsn::Lsn;
40 :
41 : use pageserver_compaction::helpers::overlaps_with;
42 : use pageserver_compaction::interface::*;
43 :
44 : use super::CompactionError;
45 :
46 : impl Timeline {
47 : /// TODO: cancellation
48 364 : pub(crate) async fn compact_legacy(
49 364 : self: &Arc<Self>,
50 364 : cancel: &CancellationToken,
51 364 : flags: EnumSet<CompactFlags>,
52 364 : ctx: &RequestContext,
53 364 : ) -> Result<(), CompactionError> {
54 364 : if flags.contains(CompactFlags::EnhancedGcBottomMostCompaction) {
55 0 : return self.compact_with_gc(cancel, ctx).await;
56 364 : }
57 364 :
58 364 : // High level strategy for compaction / image creation:
59 364 : //
60 364 : // 1. First, calculate the desired "partitioning" of the
61 364 : // currently in-use key space. The goal is to partition the
62 364 : // key space into roughly fixed-size chunks, but also take into
63 364 : // account any existing image layers, and try to align the
64 364 : // chunk boundaries with the existing image layers to avoid
65 364 : // too much churn. Also try to align chunk boundaries with
66 364 : // relation boundaries. In principle, we don't know about
67 364 : // relation boundaries here, we just deal with key-value
68 364 : // pairs, and the code in pgdatadir_mapping.rs knows how to
69 364 : // map relations into key-value pairs. But in practice we know
70 364 : // that 'field6' is the block number, and the fields 1-5
71 364 : // identify a relation. This is just an optimization,
72 364 : // though.
73 364 : //
74 364 : // 2. Once we know the partitioning, for each partition,
75 364 : // decide if it's time to create a new image layer. The
76 364 : // criteria is: there has been too much "churn" since the last
77 364 : // image layer? The "churn" is fuzzy concept, it's a
78 364 : // combination of too many delta files, or too much WAL in
79 364 : // total in the delta file. Or perhaps: if creating an image
80 364 : // file would allow to delete some older files.
81 364 : //
82 364 : // 3. After that, we compact all level0 delta files if there
83 364 : // are too many of them. While compacting, we also garbage
84 364 : // collect any page versions that are no longer needed because
85 364 : // of the new image layers we created in step 2.
86 364 : //
87 364 : // TODO: This high level strategy hasn't been implemented yet.
88 364 : // Below are functions compact_level0() and create_image_layers()
89 364 : // but they are a bit ad hoc and don't quite work as explained
90 364 : // above. Rewrite it.
91 364 :
92 364 : // Is the timeline being deleted?
93 364 : if self.is_stopping() {
94 0 : trace!("Dropping out of compaction on timeline shutdown");
95 0 : return Err(CompactionError::ShuttingDown);
96 364 : }
97 364 :
98 364 : let target_file_size = self.get_checkpoint_distance();
99 :
100 : // Define partitioning schema if needed
101 :
102 : // FIXME: the match should only cover repartitioning, not the next steps
103 364 : let partition_count = match self
104 364 : .repartition(
105 364 : self.get_last_record_lsn(),
106 364 : self.get_compaction_target_size(),
107 364 : flags,
108 364 : ctx,
109 364 : )
110 13852 : .await
111 : {
112 364 : Ok(((dense_partitioning, sparse_partitioning), lsn)) => {
113 364 : // Disables access_stats updates, so that the files we read remain candidates for eviction after we're done with them
114 364 : let image_ctx = RequestContextBuilder::extend(ctx)
115 364 : .access_stats_behavior(AccessStatsBehavior::Skip)
116 364 : .build();
117 364 :
118 364 : // 2. Compact
119 364 : let timer = self.metrics.compact_time_histo.start_timer();
120 42660 : self.compact_level0(target_file_size, ctx).await?;
121 364 : timer.stop_and_record();
122 364 :
123 364 : // 3. Create new image layers for partitions that have been modified
124 364 : // "enough".
125 364 : let mut partitioning = dense_partitioning;
126 364 : partitioning
127 364 : .parts
128 364 : .extend(sparse_partitioning.into_dense().parts);
129 364 : let image_layers = self
130 364 : .create_image_layers(
131 364 : &partitioning,
132 364 : lsn,
133 364 : if flags.contains(CompactFlags::ForceImageLayerCreation) {
134 14 : ImageLayerCreationMode::Force
135 : } else {
136 350 : ImageLayerCreationMode::Try
137 : },
138 364 : &image_ctx,
139 : )
140 14490 : .await?;
141 :
142 364 : self.upload_new_image_layers(image_layers)?;
143 364 : partitioning.parts.len()
144 : }
145 0 : Err(err) => {
146 0 : // no partitioning? This is normal, if the timeline was just created
147 0 : // as an empty timeline. Also in unit tests, when we use the timeline
148 0 : // as a simple key-value store, ignoring the datadir layout. Log the
149 0 : // error but continue.
150 0 : //
151 0 : // Suppress error when it's due to cancellation
152 0 : if !self.cancel.is_cancelled() {
153 0 : tracing::error!("could not compact, repartitioning keyspace failed: {err:?}");
154 0 : }
155 0 : 1
156 : }
157 : };
158 :
159 364 : if self.shard_identity.count >= ShardCount::new(2) {
160 : // Limit the number of layer rewrites to the number of partitions: this means its
161 : // runtime should be comparable to a full round of image layer creations, rather than
162 : // being potentially much longer.
163 0 : let rewrite_max = partition_count;
164 0 :
165 0 : self.compact_shard_ancestors(rewrite_max, ctx).await?;
166 364 : }
167 :
168 364 : Ok(())
169 364 : }
170 :
171 : /// Check for layers that are eligible to be rewritten:
172 : /// - Shard splitting: after a shard split, rewrite ancestor layers beyond pitr_interval, so that
173 : /// we don't indefinitely retain keys in this shard that aren't needed.
174 : /// - For future use: layers beyond pitr_interval that are in formats we would
175 : /// rather not maintain compatibility with indefinitely.
176 : ///
177 : /// Note: this phase may read and write many gigabytes of data: use rewrite_max to bound
178 : /// how much work it will try to do in each compaction pass.
179 0 : async fn compact_shard_ancestors(
180 0 : self: &Arc<Self>,
181 0 : rewrite_max: usize,
182 0 : ctx: &RequestContext,
183 0 : ) -> anyhow::Result<()> {
184 0 : let mut drop_layers = Vec::new();
185 0 : let mut layers_to_rewrite: Vec<Layer> = Vec::new();
186 0 :
187 0 : // We will use the Lsn cutoff of the last GC as a threshold for rewriting layers: if a
188 0 : // layer is behind this Lsn, it indicates that the layer is being retained beyond the
189 0 : // pitr_interval, for example because a branchpoint references it.
190 0 : //
191 0 : // Holding this read guard also blocks [`Self::gc_timeline`] from entering while we
192 0 : // are rewriting layers.
193 0 : let latest_gc_cutoff = self.get_latest_gc_cutoff_lsn();
194 0 :
195 0 : tracing::info!(
196 0 : "latest_gc_cutoff: {}, pitr cutoff {}",
197 0 : *latest_gc_cutoff,
198 0 : self.gc_info.read().unwrap().cutoffs.pitr
199 : );
200 :
201 0 : let layers = self.layers.read().await;
202 0 : for layer_desc in layers.layer_map().iter_historic_layers() {
203 0 : let layer = layers.get_from_desc(&layer_desc);
204 0 : if layer.metadata().shard.shard_count == self.shard_identity.count {
205 : // This layer does not belong to a historic ancestor, no need to re-image it.
206 0 : continue;
207 0 : }
208 0 :
209 0 : // This layer was created on an ancestor shard: check if it contains any data for this shard.
210 0 : let sharded_range = ShardedRange::new(layer_desc.get_key_range(), &self.shard_identity);
211 0 : let layer_local_page_count = sharded_range.page_count();
212 0 : let layer_raw_page_count = ShardedRange::raw_size(&layer_desc.get_key_range());
213 0 : if layer_local_page_count == 0 {
214 : // This ancestral layer only covers keys that belong to other shards.
215 : // We include the full metadata in the log: if we had some critical bug that caused
216 : // us to incorrectly drop layers, this would simplify manual debugging + reinstating those layers.
217 0 : info!(%layer, old_metadata=?layer.metadata(),
218 0 : "dropping layer after shard split, contains no keys for this shard.",
219 : );
220 :
221 0 : if cfg!(debug_assertions) {
222 : // Expensive, exhaustive check of keys in this layer: this guards against ShardedRange's calculations being
223 : // wrong. If ShardedRange claims the local page count is zero, then no keys in this layer
224 : // should be !is_key_disposable()
225 0 : let range = layer_desc.get_key_range();
226 0 : let mut key = range.start;
227 0 : while key < range.end {
228 0 : debug_assert!(self.shard_identity.is_key_disposable(&key));
229 0 : key = key.next();
230 : }
231 0 : }
232 :
233 0 : drop_layers.push(layer);
234 0 : continue;
235 0 : } else if layer_local_page_count != u32::MAX
236 0 : && layer_local_page_count == layer_raw_page_count
237 : {
238 0 : debug!(%layer,
239 0 : "layer is entirely shard local ({} keys), no need to filter it",
240 : layer_local_page_count
241 : );
242 0 : continue;
243 0 : }
244 0 :
245 0 : // Don't bother re-writing a layer unless it will at least halve its size
246 0 : if layer_local_page_count != u32::MAX
247 0 : && layer_local_page_count > layer_raw_page_count / 2
248 : {
249 0 : debug!(%layer,
250 0 : "layer is already mostly local ({}/{}), not rewriting",
251 : layer_local_page_count,
252 : layer_raw_page_count
253 : );
254 0 : }
255 :
256 : // Don't bother re-writing a layer if it is within the PITR window: it will age-out eventually
257 : // without incurring the I/O cost of a rewrite.
258 0 : if layer_desc.get_lsn_range().end >= *latest_gc_cutoff {
259 0 : debug!(%layer, "Skipping rewrite of layer still in GC window ({} >= {})",
260 0 : layer_desc.get_lsn_range().end, *latest_gc_cutoff);
261 0 : continue;
262 0 : }
263 0 :
264 0 : if layer_desc.is_delta() {
265 : // We do not yet implement rewrite of delta layers
266 0 : debug!(%layer, "Skipping rewrite of delta layer");
267 0 : continue;
268 0 : }
269 0 :
270 0 : // Only rewrite layers if their generations differ. This guarantees:
271 0 : // - that local rewrite is safe, as local layer paths will differ between existing layer and rewritten one
272 0 : // - that the layer is persistent in remote storage, as we only see old-generation'd layer via loading from remote storage
273 0 : if layer.metadata().generation == self.generation {
274 0 : debug!(%layer, "Skipping rewrite, is not from old generation");
275 0 : continue;
276 0 : }
277 0 :
278 0 : if layers_to_rewrite.len() >= rewrite_max {
279 0 : tracing::info!(%layer, "Will rewrite layer on a future compaction, already rewrote {}",
280 0 : layers_to_rewrite.len()
281 : );
282 0 : continue;
283 0 : }
284 0 :
285 0 : // Fall through: all our conditions for doing a rewrite passed.
286 0 : layers_to_rewrite.push(layer);
287 : }
288 :
289 : // Drop read lock on layer map before we start doing time-consuming I/O
290 0 : drop(layers);
291 0 :
292 0 : let mut replace_image_layers = Vec::new();
293 :
294 0 : for layer in layers_to_rewrite {
295 0 : tracing::info!(layer=%layer, "Rewriting layer after shard split...");
296 0 : let mut image_layer_writer = ImageLayerWriter::new(
297 0 : self.conf,
298 0 : self.timeline_id,
299 0 : self.tenant_shard_id,
300 0 : &layer.layer_desc().key_range,
301 0 : layer.layer_desc().image_layer_lsn(),
302 0 : ctx,
303 0 : )
304 0 : .await?;
305 :
306 : // Safety of layer rewrites:
307 : // - We are writing to a different local file path than we are reading from, so the old Layer
308 : // cannot interfere with the new one.
309 : // - In the page cache, contents for a particular VirtualFile are stored with a file_id that
310 : // is different for two layers with the same name (in `ImageLayerInner::new` we always
311 : // acquire a fresh id from [`crate::page_cache::next_file_id`]). So readers do not risk
312 : // reading the index from one layer file, and then data blocks from the rewritten layer file.
313 : // - Any readers that have a reference to the old layer will keep it alive until they are done
314 : // with it. If they are trying to promote from remote storage, that will fail, but this is the same
315 : // as for compaction generally: compaction is allowed to delete layers that readers might be trying to use.
316 : // - We do not run concurrently with other kinds of compaction, so the only layer map writes we race with are:
317 : // - GC, which at worst witnesses us "undelete" a layer that they just deleted.
318 : // - ingestion, which only inserts layers, therefore cannot collide with us.
319 0 : let resident = layer.download_and_keep_resident().await?;
320 :
321 0 : let keys_written = resident
322 0 : .filter(&self.shard_identity, &mut image_layer_writer, ctx)
323 0 : .await?;
324 :
325 0 : if keys_written > 0 {
326 0 : let new_layer = image_layer_writer.finish(self, ctx).await?;
327 0 : tracing::info!(layer=%new_layer, "Rewrote layer, {} -> {} bytes",
328 0 : layer.metadata().file_size,
329 0 : new_layer.metadata().file_size);
330 :
331 0 : replace_image_layers.push((layer, new_layer));
332 0 : } else {
333 0 : // Drop the old layer. Usually for this case we would already have noticed that
334 0 : // the layer has no data for us with the ShardedRange check above, but the rewrite itself can also discover this.
335 0 : drop_layers.push(layer);
336 0 : }
337 : }
338 :
339 : // At this point, we have replaced local layer files with their rewritten form, but not yet uploaded
340 : // metadata to reflect that. If we restart here, the replaced layer files will look invalid (size mismatch
341 : // to remote index) and be removed. This is inefficient but safe.
342 : fail::fail_point!("compact-shard-ancestors-localonly");
343 :
344 : // Update the LayerMap so that readers will use the new layers, and enqueue it for writing to remote storage
345 0 : self.rewrite_layers(replace_image_layers, drop_layers)
346 0 : .await?;
347 :
348 : fail::fail_point!("compact-shard-ancestors-enqueued");
349 :
350 : // We wait for all uploads to complete before finishing this compaction stage. This is not
351 : // necessary for correctness, but it simplifies testing, and avoids proceeding with another
352 : // Timeline's compaction while this timeline's uploads may be generating lots of disk I/O
353 : // load.
354 0 : self.remote_client.wait_completion().await?;
355 :
356 : fail::fail_point!("compact-shard-ancestors-persistent");
357 :
358 0 : Ok(())
359 0 : }
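    // Illustrative sketch only (hypothetical helper with simplified integer/bool
    // inputs, not used by the pageserver): the eligibility conditions described by
    // the comments above, applied before an ancestor layer is scheduled for a
    // rewrite after a shard split.
    fn should_rewrite_after_split_sketch(
        local_pages: u32,      // pages in the layer that belong to this shard
        raw_pages: u32,        // total pages covered by the layer's key range
        is_delta: bool,        // delta layer rewrites are not implemented yet
        layer_lsn_end: u64,    // end of the layer's LSN range
        gc_cutoff: u64,        // latest GC cutoff LSN
        same_generation: bool, // layer was written by this generation of the shard
    ) -> bool {
        if local_pages == 0 {
            return false; // candidate for dropping, not for rewriting
        }
        if local_pages != u32::MAX && local_pages == raw_pages {
            return false; // already entirely shard-local, nothing to filter out
        }
        if local_pages != u32::MAX && local_pages > raw_pages / 2 {
            return false; // a rewrite would not even halve the layer
        }
        if layer_lsn_end >= gc_cutoff {
            return false; // still inside the PITR window; it will age out on its own
        }
        if is_delta {
            return false; // only image layers are rewritten for now
        }
        if same_generation {
            return false; // generations must differ for the rewrite to be safe
        }
        true
    }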
360 :
361 : /// Collect a bunch of Level 0 layer files, and compact and reshuffle them as
362 : /// Level 1 files.
363 364 : async fn compact_level0(
364 364 : self: &Arc<Self>,
365 364 : target_file_size: u64,
366 364 : ctx: &RequestContext,
367 364 : ) -> Result<(), CompactionError> {
368 : let CompactLevel0Phase1Result {
369 364 : new_layers,
370 364 : deltas_to_compact,
371 : } = {
372 364 : let phase1_span = info_span!("compact_level0_phase1");
373 364 : let ctx = ctx.attached_child();
374 364 : let mut stats = CompactLevel0Phase1StatsBuilder {
375 364 : version: Some(2),
376 364 : tenant_id: Some(self.tenant_shard_id),
377 364 : timeline_id: Some(self.timeline_id),
378 364 : ..Default::default()
379 364 : };
380 364 :
381 364 : let begin = tokio::time::Instant::now();
382 364 : let phase1_layers_locked = Arc::clone(&self.layers).read_owned().await;
383 364 : let now = tokio::time::Instant::now();
384 364 : stats.read_lock_acquisition_micros =
385 364 : DurationRecorder::Recorded(RecordedDuration(now - begin), now);
386 364 : self.compact_level0_phase1(phase1_layers_locked, stats, target_file_size, &ctx)
387 364 : .instrument(phase1_span)
388 42660 : .await?
389 : };
390 :
391 364 : if new_layers.is_empty() && deltas_to_compact.is_empty() {
392 : // nothing to do
393 336 : return Ok(());
394 28 : }
395 28 :
396 28 : self.finish_compact_batch(&new_layers, &Vec::new(), &deltas_to_compact)
397 0 : .await?;
398 28 : Ok(())
399 364 : }
400 :
401 : /// First phase of Level 0 compaction, explained in the [`Self::compact_legacy`] comment.
402 364 : async fn compact_level0_phase1(
403 364 : self: &Arc<Self>,
404 364 : guard: tokio::sync::OwnedRwLockReadGuard<LayerManager>,
405 364 : mut stats: CompactLevel0Phase1StatsBuilder,
406 364 : target_file_size: u64,
407 364 : ctx: &RequestContext,
408 364 : ) -> Result<CompactLevel0Phase1Result, CompactionError> {
409 364 : stats.read_lock_held_spawn_blocking_startup_micros =
410 364 : stats.read_lock_acquisition_micros.till_now(); // set by caller
411 364 : let layers = guard.layer_map();
412 364 : let level0_deltas = layers.get_level0_deltas()?;
413 364 : let mut level0_deltas = level0_deltas
414 364 : .into_iter()
415 1616 : .map(|x| guard.get_from_desc(&x))
416 364 : .collect_vec();
417 364 : stats.level0_deltas_count = Some(level0_deltas.len());
418 364 : // Only compact if enough layers have accumulated.
419 364 : let threshold = self.get_compaction_threshold();
420 364 : if level0_deltas.is_empty() || level0_deltas.len() < threshold {
421 336 : debug!(
422 0 : level0_deltas = level0_deltas.len(),
423 0 : threshold, "too few deltas to compact"
424 : );
425 336 : return Ok(CompactLevel0Phase1Result::default());
426 28 : }
427 28 :
428 28 : // Gather the files to compact in this iteration.
429 28 : //
430 28 : // Start with the oldest Level 0 delta file, and collect any other
431 28 : // level 0 files that form a contiguous sequence, such that the end
432 28 : // LSN of previous file matches the start LSN of the next file.
433 28 : //
434 28 : // Note that if the files don't form such a sequence, we might
435 28 : // "compact" just a single file. That's a bit pointless, but it allows
436 28 : // us to get rid of the level 0 file, and compact the other files on
437 28 : // the next iteration. This could probably be made smarter, but such
438 28 : // "gaps" in the sequence of level 0 files should only happen in case
439 28 : // of a crash, partial download from cloud storage, or something like
440 28 : // that, so it's not a big deal in practice.
441 748 : level0_deltas.sort_by_key(|l| l.layer_desc().lsn_range.start);
442 28 : let mut level0_deltas_iter = level0_deltas.iter();
443 28 :
444 28 : let first_level0_delta = level0_deltas_iter.next().unwrap();
445 28 : let mut prev_lsn_end = first_level0_delta.layer_desc().lsn_range.end;
446 28 : let mut deltas_to_compact = Vec::with_capacity(level0_deltas.len());
447 28 :
448 28 : deltas_to_compact.push(first_level0_delta.download_and_keep_resident().await?);
449 402 : for l in level0_deltas_iter {
450 374 : let lsn_range = &l.layer_desc().lsn_range;
451 374 :
452 374 : if lsn_range.start != prev_lsn_end {
453 0 : break;
454 374 : }
455 374 : deltas_to_compact.push(l.download_and_keep_resident().await?);
456 374 : prev_lsn_end = lsn_range.end;
457 : }
458 28 : let lsn_range = Range {
459 28 : start: deltas_to_compact
460 28 : .first()
461 28 : .unwrap()
462 28 : .layer_desc()
463 28 : .lsn_range
464 28 : .start,
465 28 : end: deltas_to_compact.last().unwrap().layer_desc().lsn_range.end,
466 28 : };
467 28 :
468 28 : info!(
469 0 : "Starting Level0 compaction in LSN range {}-{} for {} layers ({} deltas in total)",
470 0 : lsn_range.start,
471 0 : lsn_range.end,
472 0 : deltas_to_compact.len(),
473 0 : level0_deltas.len()
474 : );
475 :
476 402 : for l in deltas_to_compact.iter() {
477 402 : info!("compact includes {l}");
478 : }
479 :
480 : // We don't need the original list of layers anymore. Drop it so that
481 : // we don't accidentally use it later in the function.
482 28 : drop(level0_deltas);
483 28 :
484 28 : stats.read_lock_held_prerequisites_micros = stats
485 28 : .read_lock_held_spawn_blocking_startup_micros
486 28 : .till_now();
487 28 :
488 28 : // Determine N largest holes where N is number of compacted layers.
489 28 : let max_holes = deltas_to_compact.len();
490 28 : let last_record_lsn = self.get_last_record_lsn();
491 28 : let min_hole_range = (target_file_size / page_cache::PAGE_SZ as u64) as i128;
492 28 : let min_hole_coverage_size = 3; // TODO: something more flexible?
493 28 :
494 28 : // min-heap (reserve space for one more element added before eviction)
495 28 : let mut heap: BinaryHeap<Hole> = BinaryHeap::with_capacity(max_holes + 1);
496 28 : let mut prev: Option<Key> = None;
497 28 :
498 28 : let mut all_keys = Vec::new();
499 :
500 402 : for l in deltas_to_compact.iter() {
501 2413 : all_keys.extend(l.load_keys(ctx).await?);
502 : }
503 :
504 : // FIXME: should spawn_blocking the rest of this function
505 :
506 : // The current stdlib sorting implementation is designed in a way where it is
507 : // particularly fast where the slice is made up of sorted sub-ranges.
508 4431758 : all_keys.sort_by_key(|DeltaEntry { key, lsn, .. }| (*key, *lsn));
509 28 :
510 28 : stats.read_lock_held_key_sort_micros = stats.read_lock_held_prerequisites_micros.till_now();
511 :
512 2064038 : for &DeltaEntry { key: next_key, .. } in all_keys.iter() {
513 2064038 : if let Some(prev_key) = prev {
514 : // Just a first, fast filter: do not create hole entries for metadata keys. The last hole in the
515 : // compaction is the gap between data keys and metadata keys.
516 2064010 : if next_key.to_i128() - prev_key.to_i128() >= min_hole_range
517 0 : && !Key::is_metadata_key(&prev_key)
518 : {
519 0 : let key_range = prev_key..next_key;
520 0 : // Measuring a hole by simply subtracting the i128 representations of the key range boundaries
521 0 : // does not make much sense, because the largest holes will correspond to field1/field2 changes.
522 0 : // We are mostly interested in eliminating holes which cause generation of excessive image layers.
523 0 : // That is why it is better to measure the size of a hole as the number of covering image layers.
524 0 : let coverage_size = layers.image_coverage(&key_range, last_record_lsn).len();
525 0 : if coverage_size >= min_hole_coverage_size {
526 0 : heap.push(Hole {
527 0 : key_range,
528 0 : coverage_size,
529 0 : });
530 0 : if heap.len() > max_holes {
531 0 : heap.pop(); // remove smallest hole
532 0 : }
533 0 : }
534 2064010 : }
535 28 : }
536 2064038 : prev = Some(next_key.next());
537 : }
538 28 : stats.read_lock_held_compute_holes_micros = stats.read_lock_held_key_sort_micros.till_now();
539 28 : drop_rlock(guard);
540 28 : stats.read_lock_drop_micros = stats.read_lock_held_compute_holes_micros.till_now();
541 28 : let mut holes = heap.into_vec();
542 28 : holes.sort_unstable_by_key(|hole| hole.key_range.start);
543 28 : let mut next_hole = 0; // index of next hole in holes vector
544 28 :
545 28 : // This iterator walks through all key-value pairs from all the layers
546 28 : // we're compacting, in key, LSN order.
547 28 : let all_values_iter = all_keys.iter();
548 28 :
549 28 : // This iterator walks through all keys and is needed to calculate size used by each key
550 28 : let mut all_keys_iter = all_keys
551 28 : .iter()
552 2064038 : .map(|DeltaEntry { key, lsn, size, .. }| (*key, *lsn, *size))
553 2064010 : .coalesce(|mut prev, cur| {
554 2064010 : // Coalesce entries that belong to the same key.
555 2064010 : // This ensures that compaction doesn't put them
556 2064010 : // into different layer files.
557 2064010 : // Still limit this by the target file size,
558 2064010 : // so that we keep the size of the files in
559 2064010 : // check.
560 2064010 : if prev.0 == cur.0 && prev.2 < target_file_size {
561 40038 : prev.2 += cur.2;
562 40038 : Ok(prev)
563 : } else {
564 2023972 : Err((prev, cur))
565 : }
566 2064010 : });
567 28 :
568 28 : // Merge the contents of all the input delta layers into a new set
569 28 : // of delta layers, based on the current partitioning.
570 28 : //
571 28 : // We split the new delta layers on the key dimension. We iterate through the key space, and for each key, check if adding it to the current output layer we're building would cause the layer to become too large. If so, flush the current output layer and start a new one.
572 28 : // It's possible that there is a single key with so many page versions that storing all of them in a single layer file
573 28 : // would be too large. In that case, we also split on the LSN dimension.
574 28 : //
575 28 : // LSN
576 28 : // ^
577 28 : // |
578 28 : // | +-----------+ +--+--+--+--+
579 28 : // | | | | | | | |
580 28 : // | +-----------+ | | | | |
581 28 : // | | | | | | | |
582 28 : // | +-----------+ ==> | | | | |
583 28 : // | | | | | | | |
584 28 : // | +-----------+ | | | | |
585 28 : // | | | | | | | |
586 28 : // | +-----------+ +--+--+--+--+
587 28 : // |
588 28 : // +--------------> key
589 28 : //
590 28 : //
591 28 : // If one key (X) has a lot of page versions:
592 28 : //
593 28 : // LSN
594 28 : // ^
595 28 : // | (X)
596 28 : // | +-----------+ +--+--+--+--+
597 28 : // | | | | | | | |
598 28 : // | +-----------+ | | +--+ |
599 28 : // | | | | | | | |
600 28 : // | +-----------+ ==> | | | | |
601 28 : // | | | | | +--+ |
602 28 : // | +-----------+ | | | | |
603 28 : // | | | | | | | |
604 28 : // | +-----------+ +--+--+--+--+
605 28 : // |
606 28 : // +--------------> key
607 28 : // TODO: this actually divides the layers into fixed-size chunks, not
608 28 : // based on the partitioning.
609 28 : //
610 28 : // TODO: we should also opportunistically materialize and
611 28 : // garbage collect what we can.
612 28 : let mut new_layers = Vec::new();
613 28 : let mut prev_key: Option<Key> = None;
614 28 : let mut writer: Option<DeltaLayerWriter> = None;
615 28 : let mut key_values_total_size = 0u64;
616 28 : let mut dup_start_lsn: Lsn = Lsn::INVALID; // start LSN of layer containing values of the single key
617 28 : let mut dup_end_lsn: Lsn = Lsn::INVALID; // end LSN of layer containing values of the single key
618 :
619 : for &DeltaEntry {
620 2064038 : key, lsn, ref val, ..
621 2064066 : } in all_values_iter
622 : {
623 2064038 : let value = val.load(ctx).await?;
624 2064038 : let same_key = prev_key.map_or(false, |prev_key| prev_key == key);
625 2064038 : // We need to check key boundaries once we reach next key or end of layer with the same key
626 2064038 : if !same_key || lsn == dup_end_lsn {
627 2024000 : let mut next_key_size = 0u64;
628 2024000 : let is_dup_layer = dup_end_lsn.is_valid();
629 2024000 : dup_start_lsn = Lsn::INVALID;
630 2024000 : if !same_key {
631 2024000 : dup_end_lsn = Lsn::INVALID;
632 2024000 : }
633 : // Determine the size occupied by this key. We stop at the next key or when the size becomes larger than target_file_size
634 2024000 : for (next_key, next_lsn, next_size) in all_keys_iter.by_ref() {
635 2024000 : next_key_size = next_size;
636 2024000 : if key != next_key {
637 2023972 : if dup_end_lsn.is_valid() {
638 0 : // We are writing a segment with duplicates:
639 0 : // place all remaining values of this key in a separate segment
640 0 : dup_start_lsn = dup_end_lsn; // the new segment starts where the old one stops
641 0 : dup_end_lsn = lsn_range.end; // there are no more values of this key till the end of the LSN range
642 2023972 : }
643 2023972 : break;
644 28 : }
645 28 : key_values_total_size += next_size;
646 28 : // Check if it is time to split segment: if total keys size is larger than target file size.
647 28 : // We need to avoid generation of empty segments if next_size > target_file_size.
648 28 : if key_values_total_size > target_file_size && lsn != next_lsn {
649 : // Split key between multiple layers: such layer can contain only single key
650 0 : dup_start_lsn = if dup_end_lsn.is_valid() {
651 0 : dup_end_lsn // new segment with duplicates starts where old one stops
652 : } else {
653 0 : lsn // start with the first LSN for this key
654 : };
655 0 : dup_end_lsn = next_lsn; // upper LSN boundary is exclusive
656 0 : break;
657 28 : }
658 : }
659 : // Handle the case when the loop reaches the last key: dup_end is non-zero but dup_start is not set.
660 2024000 : if dup_end_lsn.is_valid() && !dup_start_lsn.is_valid() {
661 0 : dup_start_lsn = dup_end_lsn;
662 0 : dup_end_lsn = lsn_range.end;
663 2024000 : }
664 2024000 : if writer.is_some() {
665 2023972 : let written_size = writer.as_mut().unwrap().size();
666 2023972 : let contains_hole =
667 2023972 : next_hole < holes.len() && key >= holes[next_hole].key_range.end;
668 : // check if the key causes layer overflow or contains a hole...
669 2023972 : if is_dup_layer
670 2023972 : || dup_end_lsn.is_valid()
671 2023972 : || written_size + key_values_total_size > target_file_size
672 2023692 : || contains_hole
673 : {
674 : // ... if so, flush previous layer and prepare to write new one
675 280 : new_layers.push(
676 280 : writer
677 280 : .take()
678 280 : .unwrap()
679 280 : .finish(prev_key.unwrap().next(), self, ctx)
680 720 : .await?,
681 : );
682 280 : writer = None;
683 280 :
684 280 : if contains_hole {
685 0 : // skip hole
686 0 : next_hole += 1;
687 280 : }
688 2023692 : }
689 28 : }
690 : // Remember the size of the key value because at the next iteration we will access the next item
691 2024000 : key_values_total_size = next_key_size;
692 40038 : }
693 2064038 : fail_point!("delta-layer-writer-fail-before-finish", |_| {
694 0 : Err(CompactionError::Other(anyhow::anyhow!(
695 0 : "failpoint delta-layer-writer-fail-before-finish"
696 0 : )))
697 2064038 : });
698 :
699 2064038 : if !self.shard_identity.is_key_disposable(&key) {
700 2064038 : if writer.is_none() {
701 308 : // Create writer if not initialized yet
702 308 : writer = Some(
703 : DeltaLayerWriter::new(
704 308 : self.conf,
705 308 : self.timeline_id,
706 308 : self.tenant_shard_id,
707 308 : key,
708 308 : if dup_end_lsn.is_valid() {
709 : // this is a layer containing slice of values of the same key
710 0 : debug!("Create new dup layer {}..{}", dup_start_lsn, dup_end_lsn);
711 0 : dup_start_lsn..dup_end_lsn
712 : } else {
713 308 : debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end);
714 308 : lsn_range.clone()
715 : },
716 308 : ctx,
717 : )
718 154 : .await?,
719 : );
720 2063730 : }
721 :
722 2064038 : writer
723 2064038 : .as_mut()
724 2064038 : .unwrap()
725 2064038 : .put_value(key, lsn, value, ctx)
726 1567 : .await?;
727 : } else {
728 0 : debug!(
729 0 : "Dropping key {} during compaction (it belongs on shard {:?})",
730 0 : key,
731 0 : self.shard_identity.get_shard_number(&key)
732 : );
733 : }
734 :
735 2064038 : if !new_layers.is_empty() {
736 19786 : fail_point!("after-timeline-compacted-first-L1");
737 2044252 : }
738 :
739 2064038 : prev_key = Some(key);
740 : }
741 28 : if let Some(writer) = writer {
742 1993 : new_layers.push(writer.finish(prev_key.unwrap().next(), self, ctx).await?);
743 0 : }
744 :
745 : // Sync layers
746 28 : if !new_layers.is_empty() {
747 : // Print a warning if the created layer is larger than double the target size
748 : // Add two pages for potential overhead. This should in theory be already
749 : // accounted for in the target calculation, but for very small targets,
750 : // we still might easily hit the limit otherwise.
751 28 : let warn_limit = target_file_size * 2 + page_cache::PAGE_SZ as u64 * 2;
752 308 : for layer in new_layers.iter() {
753 308 : if layer.layer_desc().file_size > warn_limit {
754 0 : warn!(
755 : %layer,
756 0 : "created delta file of size {} larger than double of target of {target_file_size}", layer.layer_desc().file_size
757 : );
758 308 : }
759 : }
760 :
761 : // The writer.finish() above already did the fsync of the inodes.
762 : // We just need to fsync the directory in which these inodes are linked,
763 : // which we know to be the timeline directory.
764 : //
765 : // We use fatal_err() below because after writer.finish() returns with success,
766 : // the in-memory state of the filesystem already has the layer file in its final place,
767 : // and subsequent pageserver code could think it's durable while it really isn't.
768 28 : let timeline_dir = VirtualFile::open(
769 28 : &self
770 28 : .conf
771 28 : .timeline_path(&self.tenant_shard_id, &self.timeline_id),
772 28 : ctx,
773 28 : )
774 14 : .await
775 28 : .fatal_err("VirtualFile::open for timeline dir fsync");
776 28 : timeline_dir
777 28 : .sync_all()
778 14 : .await
779 28 : .fatal_err("VirtualFile::sync_all timeline dir");
780 0 : }
781 :
782 28 : stats.write_layer_files_micros = stats.read_lock_drop_micros.till_now();
783 28 : stats.new_deltas_count = Some(new_layers.len());
784 308 : stats.new_deltas_size = Some(new_layers.iter().map(|l| l.layer_desc().file_size).sum());
785 28 :
786 28 : match TryInto::<CompactLevel0Phase1Stats>::try_into(stats)
787 28 : .and_then(|stats| serde_json::to_string(&stats).context("serde_json::to_string"))
788 : {
789 28 : Ok(stats_json) => {
790 28 : info!(
791 0 : stats_json = stats_json.as_str(),
792 0 : "compact_level0_phase1 stats available"
793 : )
794 : }
795 0 : Err(e) => {
796 0 : warn!("compact_level0_phase1 stats failed to serialize: {:#}", e);
797 : }
798 : }
799 :
800 28 : Ok(CompactLevel0Phase1Result {
801 28 : new_layers,
802 28 : deltas_to_compact: deltas_to_compact
803 28 : .into_iter()
804 402 : .map(|x| x.drop_eviction_guard())
805 28 : .collect::<Vec<_>>(),
806 28 : })
807 364 : }
808 : }
809 :
810 : #[derive(Default)]
811 : struct CompactLevel0Phase1Result {
812 : new_layers: Vec<ResidentLayer>,
813 : deltas_to_compact: Vec<Layer>,
814 : }
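// Illustrative sketch only: the `coalesce` step used in phase 1 above. Adjacent
// (key, size) entries for the same key are merged until the accumulated size
// reaches `target`, so that all versions of a key end up in the same output
// layer unless the key alone would blow past the target file size.
fn coalesce_key_sizes_sketch(entries: Vec<(u64, u64)>, target: u64) -> Vec<(u64, u64)> {
    use itertools::Itertools;

    entries
        .into_iter()
        .coalesce(|mut prev, cur| {
            if prev.0 == cur.0 && prev.1 < target {
                prev.1 += cur.1; // same key and still under target: merge sizes
                Ok(prev)
            } else {
                Err((prev, cur)) // different key or target reached: keep both
            }
        })
        .collect()
}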
815 :
816 : #[derive(Default)]
817 : struct CompactLevel0Phase1StatsBuilder {
818 : version: Option<u64>,
819 : tenant_id: Option<TenantShardId>,
820 : timeline_id: Option<TimelineId>,
821 : read_lock_acquisition_micros: DurationRecorder,
822 : read_lock_held_spawn_blocking_startup_micros: DurationRecorder,
823 : read_lock_held_key_sort_micros: DurationRecorder,
824 : read_lock_held_prerequisites_micros: DurationRecorder,
825 : read_lock_held_compute_holes_micros: DurationRecorder,
826 : read_lock_drop_micros: DurationRecorder,
827 : write_layer_files_micros: DurationRecorder,
828 : level0_deltas_count: Option<usize>,
829 : new_deltas_count: Option<usize>,
830 : new_deltas_size: Option<u64>,
831 : }
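// Illustrative sketch only: the idea behind `dup_start_lsn`/`dup_end_lsn` in
// phase 1 above. When a single key has so many versions that they exceed the
// target file size, its versions are split along the LSN axis; each boundary
// LSN returned below would start a new "duplicates" layer holding only that key.
fn lsn_split_points_sketch(versions: &[(u64, u64)], target: u64) -> Vec<u64> {
    // `versions` is a list of (lsn, size) for one key, sorted by LSN.
    let mut split_points = Vec::new();
    let mut accumulated = 0u64;
    for &(lsn, size) in versions {
        if accumulated + size > target && accumulated > 0 {
            split_points.push(lsn); // a new single-key layer starts at this LSN
            accumulated = 0;
        }
        accumulated += size;
    }
    split_points
}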
832 :
833 : #[derive(serde::Serialize)]
834 : struct CompactLevel0Phase1Stats {
835 : version: u64,
836 : tenant_id: TenantShardId,
837 : timeline_id: TimelineId,
838 : read_lock_acquisition_micros: RecordedDuration,
839 : read_lock_held_spawn_blocking_startup_micros: RecordedDuration,
840 : read_lock_held_key_sort_micros: RecordedDuration,
841 : read_lock_held_prerequisites_micros: RecordedDuration,
842 : read_lock_held_compute_holes_micros: RecordedDuration,
843 : read_lock_drop_micros: RecordedDuration,
844 : write_layer_files_micros: RecordedDuration,
845 : level0_deltas_count: usize,
846 : new_deltas_count: usize,
847 : new_deltas_size: u64,
848 : }
849 :
850 : impl TryFrom<CompactLevel0Phase1StatsBuilder> for CompactLevel0Phase1Stats {
851 : type Error = anyhow::Error;
852 :
853 28 : fn try_from(value: CompactLevel0Phase1StatsBuilder) -> Result<Self, Self::Error> {
854 28 : Ok(Self {
855 28 : version: value.version.ok_or_else(|| anyhow!("version not set"))?,
856 28 : tenant_id: value
857 28 : .tenant_id
858 28 : .ok_or_else(|| anyhow!("tenant_id not set"))?,
859 28 : timeline_id: value
860 28 : .timeline_id
861 28 : .ok_or_else(|| anyhow!("timeline_id not set"))?,
862 28 : read_lock_acquisition_micros: value
863 28 : .read_lock_acquisition_micros
864 28 : .into_recorded()
865 28 : .ok_or_else(|| anyhow!("read_lock_acquisition_micros not set"))?,
866 28 : read_lock_held_spawn_blocking_startup_micros: value
867 28 : .read_lock_held_spawn_blocking_startup_micros
868 28 : .into_recorded()
869 28 : .ok_or_else(|| anyhow!("read_lock_held_spawn_blocking_startup_micros not set"))?,
870 28 : read_lock_held_key_sort_micros: value
871 28 : .read_lock_held_key_sort_micros
872 28 : .into_recorded()
873 28 : .ok_or_else(|| anyhow!("read_lock_held_key_sort_micros not set"))?,
874 28 : read_lock_held_prerequisites_micros: value
875 28 : .read_lock_held_prerequisites_micros
876 28 : .into_recorded()
877 28 : .ok_or_else(|| anyhow!("read_lock_held_prerequisites_micros not set"))?,
878 28 : read_lock_held_compute_holes_micros: value
879 28 : .read_lock_held_compute_holes_micros
880 28 : .into_recorded()
881 28 : .ok_or_else(|| anyhow!("read_lock_held_compute_holes_micros not set"))?,
882 28 : read_lock_drop_micros: value
883 28 : .read_lock_drop_micros
884 28 : .into_recorded()
885 28 : .ok_or_else(|| anyhow!("read_lock_drop_micros not set"))?,
886 28 : write_layer_files_micros: value
887 28 : .write_layer_files_micros
888 28 : .into_recorded()
889 28 : .ok_or_else(|| anyhow!("write_layer_files_micros not set"))?,
890 28 : level0_deltas_count: value
891 28 : .level0_deltas_count
892 28 : .ok_or_else(|| anyhow!("level0_deltas_count not set"))?,
893 28 : new_deltas_count: value
894 28 : .new_deltas_count
895 28 : .ok_or_else(|| anyhow!("new_deltas_count not set"))?,
896 28 : new_deltas_size: value
897 28 : .new_deltas_size
898 28 : .ok_or_else(|| anyhow!("new_deltas_size not set"))?,
899 : })
900 28 : }
901 : }
902 :
903 : impl Timeline {
904 : /// Entry point for new tiered compaction algorithm.
905 : ///
906 : /// All the real work is in the implementation in the pageserver_compaction
907 : /// crate. The code here would apply to any algorithm implemented by the
908 : /// same interface, but tiered is the only one at the moment.
909 : ///
910 : /// TODO: cancellation
911 0 : pub(crate) async fn compact_tiered(
912 0 : self: &Arc<Self>,
913 0 : _cancel: &CancellationToken,
914 0 : ctx: &RequestContext,
915 0 : ) -> Result<(), CompactionError> {
916 0 : let fanout = self.get_compaction_threshold() as u64;
917 0 : let target_file_size = self.get_checkpoint_distance();
918 :
919 : // Find the top of the historical layers
920 0 : let end_lsn = {
921 0 : let guard = self.layers.read().await;
922 0 : let layers = guard.layer_map();
923 :
924 0 : let l0_deltas = layers.get_level0_deltas()?;
925 0 : drop(guard);
926 0 :
927 0 : // As an optimization, if we find that there are too few L0 layers,
928 0 : // bail out early. We know that the compaction algorithm would do
929 0 : // nothing in that case.
930 0 : if l0_deltas.len() < fanout as usize {
931 : // doesn't need compacting
932 0 : return Ok(());
933 0 : }
934 0 : l0_deltas.iter().map(|l| l.lsn_range.end).max().unwrap()
935 0 : };
936 0 :
937 0 : // Is the timeline being deleted?
938 0 : if self.is_stopping() {
939 0 : trace!("Dropping out of compaction on timeline shutdown");
940 0 : return Err(CompactionError::ShuttingDown);
941 0 : }
942 :
943 0 : let (dense_ks, _sparse_ks) = self.collect_keyspace(end_lsn, ctx).await?;
944 : // TODO(chi): ignore sparse_keyspace for now, compact it in the future.
945 0 : let mut adaptor = TimelineAdaptor::new(self, (end_lsn, dense_ks));
946 0 :
947 0 : pageserver_compaction::compact_tiered::compact_tiered(
948 0 : &mut adaptor,
949 0 : end_lsn,
950 0 : target_file_size,
951 0 : fanout,
952 0 : ctx,
953 0 : )
954 0 : .await?;
955 :
956 0 : adaptor.flush_updates().await?;
957 0 : Ok(())
958 0 : }
959 :
960 : /// An experimental compaction building block that combines compaction with garbage collection.
961 : ///
962 : /// The current implementation picks all delta + image layers that are below or intersecting with
963 : /// the GC horizon without considering retain_lsns. Then, it does a full compaction over all these delta
964 : /// layers and image layers, which generates image layers at the gc horizon, drops deltas below the gc horizon,
965 : /// and creates delta layers with all deltas >= the gc horizon.
966 4 : pub(crate) async fn compact_with_gc(
967 4 : self: &Arc<Self>,
968 4 : _cancel: &CancellationToken,
969 4 : ctx: &RequestContext,
970 4 : ) -> Result<(), CompactionError> {
971 4 : use crate::tenant::storage_layer::ValueReconstructState;
972 4 : use std::collections::BTreeSet;
973 4 :
974 4 : info!("running enhanced gc bottom-most compaction");
975 :
976 : scopeguard::defer! {
977 : info!("done enhanced gc bottom-most compaction");
978 : };
979 :
980 : // Step 0: pick all delta layers + image layers below or intersecting with the GC horizon.
981 : // The layer selection has the following properties:
982 : // 1. If a layer is in the selection, all layers below it are in the selection.
983 : // 2. Inferred from (1), for each key in the layer selection, the value can be reconstructed only with the layers in the layer selection.
984 4 : let (layer_selection, gc_cutoff) = {
985 4 : let guard = self.layers.read().await;
986 4 : let layers = guard.layer_map();
987 4 : let gc_info = self.gc_info.read().unwrap();
988 4 : if !gc_info.retain_lsns.is_empty() || !gc_info.leases.is_empty() {
989 0 : return Err(CompactionError::Other(anyhow!(
990 0 : "enhanced legacy compaction currently does not support retain_lsns (branches)"
991 0 : )));
992 4 : }
993 4 : let gc_cutoff = Lsn::min(gc_info.cutoffs.horizon, gc_info.cutoffs.pitr);
994 4 : let mut selected_layers = Vec::new();
995 4 : // TODO: consider retain_lsns
996 4 : drop(gc_info);
997 20 : for desc in layers.iter_historic_layers() {
998 20 : if desc.get_lsn_range().start <= gc_cutoff {
999 16 : selected_layers.push(guard.get_from_desc(&desc));
1000 16 : }
1001 : }
1002 4 : (selected_layers, gc_cutoff)
1003 4 : };
1004 4 : info!(
1005 0 : "picked {} layers for compaction with gc_cutoff={}",
1006 0 : layer_selection.len(),
1007 : gc_cutoff
1008 : );
1009 : // Step 1: (In the future) construct a k-merge iterator over all layers. For now, simply collect all keys + LSNs.
1010 : // Also, collect the layer information to decide when to split the new delta layers.
1011 4 : let mut all_key_values = Vec::new();
1012 4 : let mut delta_split_points = BTreeSet::new();
1013 20 : for layer in &layer_selection {
1014 16 : all_key_values.extend(layer.load_key_values(ctx).await?);
1015 16 : let desc = layer.layer_desc();
1016 16 : if desc.is_delta() {
1017 8 : // TODO: is it correct to only record split points for deltas intersecting with the GC horizon? (exclude those below/above the horizon)
1018 8 : // so that we can avoid having too many small delta layers.
1019 8 : let key_range = desc.get_key_range();
1020 8 : delta_split_points.insert(key_range.start);
1021 8 : delta_split_points.insert(key_range.end);
1022 8 : }
1023 : }
1024 : // Sort by key (small to large), then LSN (low to high). If the same LSN has both an image and a delta due to the merge of delta layers and
1025 : // image layers, make the image appear before the delta.
1026 : struct ValueWrapper<'a>(&'a crate::repository::Value);
1027 : impl Ord for ValueWrapper<'_> {
1028 0 : fn cmp(&self, other: &Self) -> std::cmp::Ordering {
1029 0 : use crate::repository::Value;
1030 0 : use std::cmp::Ordering;
1031 0 : match (self.0, other.0) {
1032 0 : (Value::Image(_), Value::WalRecord(_)) => Ordering::Less,
1033 0 : (Value::WalRecord(_), Value::Image(_)) => Ordering::Greater,
1034 0 : _ => Ordering::Equal,
1035 : }
1036 0 : }
1037 : }
1038 : impl PartialOrd for ValueWrapper<'_> {
1039 0 : fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
1040 0 : Some(self.cmp(other))
1041 0 : }
1042 : }
1043 : impl PartialEq for ValueWrapper<'_> {
1044 0 : fn eq(&self, other: &Self) -> bool {
1045 0 : self.cmp(other) == std::cmp::Ordering::Equal
1046 0 : }
1047 : }
1048 : impl Eq for ValueWrapper<'_> {}
1049 180 : all_key_values.sort_by(|(k1, l1, v1), (k2, l2, v2)| {
1050 180 : (k1, l1, ValueWrapper(v1)).cmp(&(k2, l2, ValueWrapper(v2)))
1051 180 : });
1052 4 : // Step 2: Produce images+deltas. TODO: ensure newly-produced delta does not overlap with other deltas.
1053 4 : // Data of the same key.
1054 4 : let mut accumulated_values = Vec::new();
1055 4 : let mut last_key = all_key_values.first().unwrap().0; // TODO: assert all_key_values not empty
1056 :
1057 : /// Take a list of images and deltas, produce an image at the GC horizon, and a list of deltas above the GC horizon.
1058 72 : async fn flush_accumulated_states(
1059 72 : tline: &Arc<Timeline>,
1060 72 : key: Key,
1061 72 : accumulated_values: &[&(Key, Lsn, crate::repository::Value)],
1062 72 : horizon: Lsn,
1063 72 : ) -> anyhow::Result<(Vec<(Key, Lsn, crate::repository::Value)>, bytes::Bytes)> {
1064 72 : let mut base_image = None;
1065 72 : let mut keys_above_horizon = Vec::new();
1066 72 : let mut delta_above_base_image = Vec::new();
1067 : // We have a list of deltas/images. We want to create image layers while collecting garbage.
1068 88 : for (key, lsn, val) in accumulated_values.iter().rev() {
1069 88 : if *lsn > horizon {
1070 4 : if let Some((_, prev_lsn, _)) = keys_above_horizon.last_mut() {
1071 0 : if *prev_lsn == *lsn {
1072 : // The case that we have an LSN with both data from the delta layer and the image layer. As
1073 : // `ValueWrapper` ensures that an image is ordered before a delta at the same LSN, we simply
1074 : // drop this delta and keep the image.
1075 : //
1076 : // For example, we have delta layer key1@0x10, key1@0x20, and image layer key1@0x10, we will
1077 : // keep the image for key1@0x10 and the delta for key1@0x20. key1@0x10 delta will be simply
1078 : // dropped.
1079 0 : continue;
1080 0 : }
1081 4 : }
1082 4 : keys_above_horizon.push((*key, *lsn, val.clone()));
1083 84 : } else if *lsn <= horizon {
1084 84 : match val {
1085 72 : crate::repository::Value::Image(image) => {
1086 72 : base_image = Some((*lsn, image.clone()));
1087 72 : break;
1088 : }
1089 12 : crate::repository::Value::WalRecord(wal) => {
1090 12 : delta_above_base_image.push((*lsn, wal.clone()));
1091 12 : }
1092 : }
1093 0 : }
1094 : }
1095 : // do not reverse delta_above_base_image; the reconstruct state expects records in reverse order
1096 72 : keys_above_horizon.reverse();
1097 72 : let state = ValueReconstructState {
1098 72 : img: base_image,
1099 72 : records: delta_above_base_image,
1100 72 : };
1101 72 : let img = tline.reconstruct_value(key, horizon, state).await?;
1102 72 : Ok((keys_above_horizon, img))
1103 72 : }
1104 :
1105 72 : async fn flush_deltas(
1106 72 : deltas: &mut Vec<(Key, Lsn, crate::repository::Value)>,
1107 72 : last_key: Key,
1108 72 : delta_split_points: &[Key],
1109 72 : current_delta_split_point: &mut usize,
1110 72 : tline: &Arc<Timeline>,
1111 72 : gc_cutoff: Lsn,
1112 72 : ctx: &RequestContext,
1113 72 : ) -> anyhow::Result<Option<ResidentLayer>> {
1114 72 : // Check if we need to split the delta layer. We split at the original delta layer boundary to avoid
1115 72 : // overlapping layers.
1116 72 : //
1117 72 : // If we have a structure like this:
1118 72 : //
1119 72 : // | Delta 1 | | Delta 4 |
1120 72 : // |---------| Delta 2 |---------|
1121 72 : // | Delta 3 | | Delta 5 |
1122 72 : //
1123 72 : // And we choose to compact delta 2+3+5. We will get an overlapping delta layer with delta 1+4.
1124 72 : // A simple solution here is to split the delta layers using the original boundary, while this
1125 72 : // might produce a lot of small layers. This should be improved and fixed in the future.
1126 72 : let mut need_split = false;
1127 88 : while *current_delta_split_point < delta_split_points.len()
1128 76 : && last_key >= delta_split_points[*current_delta_split_point]
1129 16 : {
1130 16 : *current_delta_split_point += 1;
1131 16 : need_split = true;
1132 16 : }
1133 72 : if !need_split {
1134 56 : return Ok(None);
1135 16 : }
1136 16 : let deltas = std::mem::take(deltas);
1137 16 : if deltas.is_empty() {
1138 12 : return Ok(None);
1139 4 : }
1140 4 : let end_lsn = deltas.iter().map(|(_, lsn, _)| lsn).max().copied().unwrap() + 1;
1141 4 : let mut delta_layer_writer = DeltaLayerWriter::new(
1142 4 : tline.conf,
1143 4 : tline.timeline_id,
1144 4 : tline.tenant_shard_id,
1145 4 : deltas.first().unwrap().0,
1146 4 : gc_cutoff..end_lsn,
1147 4 : ctx,
1148 4 : )
1149 2 : .await?;
1150 4 : let key_end = deltas.last().unwrap().0.next();
1151 8 : for (key, lsn, val) in deltas {
1152 4 : delta_layer_writer.put_value(key, lsn, val, ctx).await?;
1153 : }
1154 10 : let delta_layer = delta_layer_writer.finish(key_end, tline, ctx).await?;
1155 4 : Ok(Some(delta_layer))
1156 72 : }
1157 :
1158 4 : let mut image_layer_writer = ImageLayerWriter::new(
1159 4 : self.conf,
1160 4 : self.timeline_id,
1161 4 : self.tenant_shard_id,
1162 4 : &(all_key_values.first().unwrap().0..all_key_values.last().unwrap().0.next()),
1163 4 : gc_cutoff,
1164 4 : ctx,
1165 4 : )
1166 2 : .await?;
1167 :
1168 4 : let mut delta_values = Vec::new();
1169 4 : let delta_split_points = delta_split_points.into_iter().collect_vec();
1170 4 : let mut current_delta_split_point = 0;
1171 4 : let mut delta_layers = Vec::new();
1172 100 : for item @ (key, _, _) in &all_key_values {
1173 96 : if &last_key == key {
1174 28 : accumulated_values.push(item);
1175 28 : } else {
1176 68 : let (deltas, image) =
1177 68 : flush_accumulated_states(self, last_key, &accumulated_values, gc_cutoff)
1178 0 : .await?;
1179 : // Put the image into the image layer. Currently we have a single big layer for the compaction.
1180 70 : image_layer_writer.put_image(last_key, image, ctx).await?;
1181 68 : delta_values.extend(deltas);
1182 68 : delta_layers.extend(
1183 68 : flush_deltas(
1184 68 : &mut delta_values,
1185 68 : last_key,
1186 68 : &delta_split_points,
1187 68 : &mut current_delta_split_point,
1188 68 : self,
1189 68 : gc_cutoff,
1190 68 : ctx,
1191 68 : )
1192 12 : .await?,
1193 : );
1194 68 : accumulated_values.clear();
1195 68 : accumulated_values.push(item);
1196 68 : last_key = *key;
1197 : }
1198 : }
1199 :
1200 : // TODO: move this part to the loop body
1201 4 : let (deltas, image) =
1202 4 : flush_accumulated_states(self, last_key, &accumulated_values, gc_cutoff).await?;
1203 : // Put the image into the image layer. Currently we have a single big layer for the compaction.
1204 4 : image_layer_writer.put_image(last_key, image, ctx).await?;
1205 4 : delta_values.extend(deltas);
1206 4 : delta_layers.extend(
1207 4 : flush_deltas(
1208 4 : &mut delta_values,
1209 4 : last_key,
1210 4 : &delta_split_points,
1211 4 : &mut current_delta_split_point,
1212 4 : self,
1213 4 : gc_cutoff,
1214 4 : ctx,
1215 4 : )
1216 0 : .await?,
1217 : );
1218 :
1219 8 : let image_layer = image_layer_writer.finish(self, ctx).await?;
1220 4 : info!(
1221 0 : "produced {} delta layers and {} image layers",
1222 0 : delta_layers.len(),
1223 : 1
1224 : );
1225 4 : let mut compact_to = Vec::new();
1226 4 : compact_to.extend(delta_layers);
1227 4 : compact_to.push(image_layer);
1228 : // Step 3: Place back to the layer map.
1229 : {
1230 4 : let mut guard = self.layers.write().await;
1231 4 : guard.finish_gc_compaction(&layer_selection, &compact_to, &self.metrics)
1232 4 : };
1233 4 :
1234 4 : self.remote_client
1235 4 : .schedule_compaction_update(&layer_selection, &compact_to)?;
1236 4 : Ok(())
1237 4 : }
1238 : }
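// Illustrative sketch only (simplified, hypothetical record type): the per-key
// split performed by `flush_accumulated_states` above. Records above the horizon
// survive as deltas; everything at or below it collapses into a single image at
// the horizon. The real code materializes that image via `reconstruct_value`;
// here we only report the newest record at or below the horizon as the base.
#[derive(Clone, Debug)]
enum SketchRecord {
    Image(Vec<u8>),
    Wal(Vec<u8>),
}

fn split_at_horizon_sketch(
    records: &[(u64, SketchRecord)], // (lsn, record) for one key, sorted by LSN
    horizon: u64,
) -> (Vec<(u64, SketchRecord)>, Option<(u64, SketchRecord)>) {
    let keep_as_deltas: Vec<_> = records
        .iter()
        .filter(|(lsn, _)| *lsn > horizon)
        .cloned()
        .collect();
    let base_for_image = records
        .iter()
        .rev()
        .find(|(lsn, _)| *lsn <= horizon)
        .cloned();
    (keep_as_deltas, base_for_image)
}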
1239 :
1240 : struct TimelineAdaptor {
1241 : timeline: Arc<Timeline>,
1242 :
1243 : keyspace: (Lsn, KeySpace),
1244 :
1245 : new_deltas: Vec<ResidentLayer>,
1246 : new_images: Vec<ResidentLayer>,
1247 : layers_to_delete: Vec<Arc<PersistentLayerDesc>>,
1248 : }
1249 :
1250 : impl TimelineAdaptor {
1251 0 : pub fn new(timeline: &Arc<Timeline>, keyspace: (Lsn, KeySpace)) -> Self {
1252 0 : Self {
1253 0 : timeline: timeline.clone(),
1254 0 : keyspace,
1255 0 : new_images: Vec::new(),
1256 0 : new_deltas: Vec::new(),
1257 0 : layers_to_delete: Vec::new(),
1258 0 : }
1259 0 : }
1260 :
1261 0 : pub async fn flush_updates(&mut self) -> anyhow::Result<()> {
1262 0 : let layers_to_delete = {
1263 0 : let guard = self.timeline.layers.read().await;
1264 0 : self.layers_to_delete
1265 0 : .iter()
1266 0 : .map(|x| guard.get_from_desc(x))
1267 0 : .collect::<Vec<Layer>>()
1268 0 : };
1269 0 : self.timeline
1270 0 : .finish_compact_batch(&self.new_deltas, &self.new_images, &layers_to_delete)
1271 0 : .await?;
1272 :
1273 0 : self.timeline
1274 0 : .upload_new_image_layers(std::mem::take(&mut self.new_images))?;
1275 :
1276 0 : self.new_deltas.clear();
1277 0 : self.layers_to_delete.clear();
1278 0 : Ok(())
1279 0 : }
1280 : }
1281 :
1282 : #[derive(Clone)]
1283 : struct ResidentDeltaLayer(ResidentLayer);
1284 : #[derive(Clone)]
1285 : struct ResidentImageLayer(ResidentLayer);
1286 :
1287 : impl CompactionJobExecutor for TimelineAdaptor {
1288 : type Key = crate::repository::Key;
1289 :
1290 : type Layer = OwnArc<PersistentLayerDesc>;
1291 : type DeltaLayer = ResidentDeltaLayer;
1292 : type ImageLayer = ResidentImageLayer;
1293 :
1294 : type RequestContext = crate::context::RequestContext;
1295 :
1296 0 : fn get_shard_identity(&self) -> &ShardIdentity {
1297 0 : self.timeline.get_shard_identity()
1298 0 : }
1299 :
1300 0 : async fn get_layers(
1301 0 : &mut self,
1302 0 : key_range: &Range<Key>,
1303 0 : lsn_range: &Range<Lsn>,
1304 0 : _ctx: &RequestContext,
1305 0 : ) -> anyhow::Result<Vec<OwnArc<PersistentLayerDesc>>> {
1306 0 : self.flush_updates().await?;
1307 :
1308 0 : let guard = self.timeline.layers.read().await;
1309 0 : let layer_map = guard.layer_map();
1310 0 :
1311 0 : let result = layer_map
1312 0 : .iter_historic_layers()
1313 0 : .filter(|l| {
1314 0 : overlaps_with(&l.lsn_range, lsn_range) && overlaps_with(&l.key_range, key_range)
1315 0 : })
1316 0 : .map(OwnArc)
1317 0 : .collect();
1318 0 : Ok(result)
1319 0 : }
1320 :
1321 0 : async fn get_keyspace(
1322 0 : &mut self,
1323 0 : key_range: &Range<Key>,
1324 0 : lsn: Lsn,
1325 0 : _ctx: &RequestContext,
1326 0 : ) -> anyhow::Result<Vec<Range<Key>>> {
1327 0 : if lsn == self.keyspace.0 {
1328 0 : Ok(pageserver_compaction::helpers::intersect_keyspace(
1329 0 : &self.keyspace.1.ranges,
1330 0 : key_range,
1331 0 : ))
1332 : } else {
1333 : // The current compaction implementation only ever requests the key space
1334 : // at the compaction end LSN.
1335 0 : anyhow::bail!("keyspace not available for requested lsn");
1336 : }
1337 0 : }
1338 :
1339 0 : async fn downcast_delta_layer(
1340 0 : &self,
1341 0 : layer: &OwnArc<PersistentLayerDesc>,
1342 0 : ) -> anyhow::Result<Option<ResidentDeltaLayer>> {
1343 0 : // this is a lot more complex than a simple downcast...
1344 0 : if layer.is_delta() {
1345 0 : let l = {
1346 0 : let guard = self.timeline.layers.read().await;
1347 0 : guard.get_from_desc(layer)
1348 : };
1349 0 : let result = l.download_and_keep_resident().await?;
1350 :
1351 0 : Ok(Some(ResidentDeltaLayer(result)))
1352 : } else {
1353 0 : Ok(None)
1354 : }
1355 0 : }
1356 :
1357 0 : async fn create_image(
1358 0 : &mut self,
1359 0 : lsn: Lsn,
1360 0 : key_range: &Range<Key>,
1361 0 : ctx: &RequestContext,
1362 0 : ) -> anyhow::Result<()> {
1363 0 : Ok(self.create_image_impl(lsn, key_range, ctx).await?)
1364 0 : }
1365 :
1366 0 : async fn create_delta(
1367 0 : &mut self,
1368 0 : lsn_range: &Range<Lsn>,
1369 0 : key_range: &Range<Key>,
1370 0 : input_layers: &[ResidentDeltaLayer],
1371 0 : ctx: &RequestContext,
1372 0 : ) -> anyhow::Result<()> {
1373 0 : debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end);
1374 :
1375 0 : let mut all_entries = Vec::new();
1376 0 : for dl in input_layers.iter() {
1377 0 : all_entries.extend(dl.load_keys(ctx).await?);
1378 : }
1379 :
1380 : // The current stdlib sorting implementation is designed in a way where it is
1381 : // particularly fast where the slice is made up of sorted sub-ranges.
1382 0 : all_entries.sort_by_key(|DeltaEntry { key, lsn, .. }| (*key, *lsn));
1383 :
1384 0 : let mut writer = DeltaLayerWriter::new(
1385 0 : self.timeline.conf,
1386 0 : self.timeline.timeline_id,
1387 0 : self.timeline.tenant_shard_id,
1388 0 : key_range.start,
1389 0 : lsn_range.clone(),
1390 0 : ctx,
1391 0 : )
1392 0 : .await?;
1393 :
1394 0 : let mut dup_values = 0;
1395 0 :
1396 0 : // This iterator walks through all key-value pairs from all the layers
1397 0 : // we're compacting, in key, LSN order.
1398 0 : let mut prev: Option<(Key, Lsn)> = None;
1399 : for &DeltaEntry {
1400 0 : key, lsn, ref val, ..
1401 0 : } in all_entries.iter()
1402 : {
1403 0 : if prev == Some((key, lsn)) {
1404 : // This is a duplicate. Skip it.
1405 : //
1406 : // It can happen if compaction is interrupted after writing some
1407 : // layers but not all, and we are compacting the range again.
1408 : // The calculations in the algorithm assume that there are no
1409 : // duplicates, so the math on targeted file size is likely off,
1410 : // and we will create smaller files than expected.
1411 0 : dup_values += 1;
1412 0 : continue;
1413 0 : }
1414 :
1415 0 : let value = val.load(ctx).await?;
1416 :
1417 0 : writer.put_value(key, lsn, value, ctx).await?;
1418 :
1419 0 : prev = Some((key, lsn));
1420 : }
1421 :
1422 0 : if dup_values > 0 {
1423 0 : warn!("delta layer created with {} duplicate values", dup_values);
1424 0 : }
1425 :
1426 0 : fail_point!("delta-layer-writer-fail-before-finish", |_| {
1427 0 : Err(anyhow::anyhow!(
1428 0 : "failpoint delta-layer-writer-fail-before-finish"
1429 0 : ))
1430 0 : });
1431 :
1432 0 : let new_delta_layer = writer
1433 0 : .finish(prev.unwrap().0.next(), &self.timeline, ctx)
1434 0 : .await?;
1435 :
1436 0 : self.new_deltas.push(new_delta_layer);
1437 0 : Ok(())
1438 0 : }
1439 :
1440 0 : async fn delete_layer(
1441 0 : &mut self,
1442 0 : layer: &OwnArc<PersistentLayerDesc>,
1443 0 : _ctx: &RequestContext,
1444 0 : ) -> anyhow::Result<()> {
1445 0 : self.layers_to_delete.push(layer.clone().0);
1446 0 : Ok(())
1447 0 : }
1448 : }
1449 :
1450 : impl TimelineAdaptor {
1451 0 : async fn create_image_impl(
1452 0 : &mut self,
1453 0 : lsn: Lsn,
1454 0 : key_range: &Range<Key>,
1455 0 : ctx: &RequestContext,
1456 0 : ) -> Result<(), CreateImageLayersError> {
1457 0 : let timer = self.timeline.metrics.create_images_time_histo.start_timer();
1458 :
1459 0 : let image_layer_writer = ImageLayerWriter::new(
1460 0 : self.timeline.conf,
1461 0 : self.timeline.timeline_id,
1462 0 : self.timeline.tenant_shard_id,
1463 0 : key_range,
1464 0 : lsn,
1465 0 : ctx,
1466 0 : )
1467 0 : .await?;
1468 :
1469 0 : fail_point!("image-layer-writer-fail-before-finish", |_| {
1470 0 : Err(CreateImageLayersError::Other(anyhow::anyhow!(
1471 0 : "failpoint image-layer-writer-fail-before-finish"
1472 0 : )))
1473 0 : });
1474 :
1475 0 : let keyspace = KeySpace {
1476 0 : ranges: self.get_keyspace(key_range, lsn, ctx).await?,
1477 : };
1478 : // TODO set proper (stateful) start. The create_image_layer_for_rel_blocks function mostly
1479 0 : let start = Key::MIN;
1480 : let ImageLayerCreationOutcome {
1481 0 : image,
1482 : next_start_key: _,
1483 0 : } = self
1484 0 : .timeline
1485 0 : .create_image_layer_for_rel_blocks(
1486 0 : &keyspace,
1487 0 : image_layer_writer,
1488 0 : lsn,
1489 0 : ctx,
1490 0 : key_range.clone(),
1491 0 : start,
1492 0 : )
1493 0 : .await?;
1494 :
1495 0 : if let Some(image_layer) = image {
1496 0 : self.new_images.push(image_layer);
1497 0 : }
1498 :
1499 0 : timer.stop_and_record();
1500 0 :
1501 0 : Ok(())
1502 0 : }
1503 : }
1504 :
1505 : impl CompactionRequestContext for crate::context::RequestContext {}
1506 :
1507 : #[derive(Debug, Clone)]
1508 : pub struct OwnArc<T>(pub Arc<T>);
1509 :
1510 : impl<T> Deref for OwnArc<T> {
1511 : type Target = <Arc<T> as Deref>::Target;
1512 0 : fn deref(&self) -> &Self::Target {
1513 0 : &self.0
1514 0 : }
1515 : }
1516 :
1517 : impl<T> AsRef<T> for OwnArc<T> {
1518 0 : fn as_ref(&self) -> &T {
1519 0 : self.0.as_ref()
1520 0 : }
1521 : }
1522 :
1523 : impl CompactionLayer<Key> for OwnArc<PersistentLayerDesc> {
1524 0 : fn key_range(&self) -> &Range<Key> {
1525 0 : &self.key_range
1526 0 : }
1527 0 : fn lsn_range(&self) -> &Range<Lsn> {
1528 0 : &self.lsn_range
1529 0 : }
1530 0 : fn file_size(&self) -> u64 {
1531 0 : self.file_size
1532 0 : }
1533 0 : fn short_id(&self) -> std::string::String {
1534 0 : self.as_ref().short_id().to_string()
1535 0 : }
1536 0 : fn is_delta(&self) -> bool {
1537 0 : self.as_ref().is_delta()
1538 0 : }
1539 : }
1540 :
1541 : impl CompactionLayer<Key> for OwnArc<DeltaLayer> {
1542 0 : fn key_range(&self) -> &Range<Key> {
1543 0 : &self.layer_desc().key_range
1544 0 : }
1545 0 : fn lsn_range(&self) -> &Range<Lsn> {
1546 0 : &self.layer_desc().lsn_range
1547 0 : }
1548 0 : fn file_size(&self) -> u64 {
1549 0 : self.layer_desc().file_size
1550 0 : }
1551 0 : fn short_id(&self) -> std::string::String {
1552 0 : self.layer_desc().short_id().to_string()
1553 0 : }
1554 0 : fn is_delta(&self) -> bool {
1555 0 : true
1556 0 : }
1557 : }
1558 :
1559 : use crate::tenant::timeline::DeltaEntry;
1560 :
1561 : impl CompactionLayer<Key> for ResidentDeltaLayer {
1562 0 : fn key_range(&self) -> &Range<Key> {
1563 0 : &self.0.layer_desc().key_range
1564 0 : }
1565 0 : fn lsn_range(&self) -> &Range<Lsn> {
1566 0 : &self.0.layer_desc().lsn_range
1567 0 : }
1568 0 : fn file_size(&self) -> u64 {
1569 0 : self.0.layer_desc().file_size
1570 0 : }
1571 0 : fn short_id(&self) -> std::string::String {
1572 0 : self.0.layer_desc().short_id().to_string()
1573 0 : }
1574 0 : fn is_delta(&self) -> bool {
1575 0 : true
1576 0 : }
1577 : }
1578 :
1579 : impl CompactionDeltaLayer<TimelineAdaptor> for ResidentDeltaLayer {
1580 : type DeltaEntry<'a> = DeltaEntry<'a>;
1581 :
1582 0 : async fn load_keys<'a>(&self, ctx: &RequestContext) -> anyhow::Result<Vec<DeltaEntry<'_>>> {
1583 0 : self.0.load_keys(ctx).await
1584 0 : }
1585 : }
1586 :
1587 : impl CompactionLayer<Key> for ResidentImageLayer {
1588 0 : fn key_range(&self) -> &Range<Key> {
1589 0 : &self.0.layer_desc().key_range
1590 0 : }
1591 0 : fn lsn_range(&self) -> &Range<Lsn> {
1592 0 : &self.0.layer_desc().lsn_range
1593 0 : }
1594 0 : fn file_size(&self) -> u64 {
1595 0 : self.0.layer_desc().file_size
1596 0 : }
1597 0 : fn short_id(&self) -> std::string::String {
1598 0 : self.0.layer_desc().short_id().to_string()
1599 0 : }
1600 0 : fn is_delta(&self) -> bool {
1601 0 : false
1602 0 : }
1603 : }
1604 : impl CompactionImageLayer<TimelineAdaptor> for ResidentImageLayer {}