//! New compaction implementation. The algorithm itself is implemented in the
//! compaction crate. This file implements the callbacks and structs that allow
//! the algorithm to drive the process.
//!
//! The legacy algorithm is implemented directly in `timeline.rs`.

use std::collections::BinaryHeap;
use std::ops::{Deref, Range};
use std::sync::Arc;

use super::layer_manager::LayerManager;
use super::{
    CompactFlags, CreateImageLayersError, DurationRecorder, ImageLayerCreationMode,
    RecordedDuration, Timeline,
};

use anyhow::{anyhow, Context};
use enumset::EnumSet;
use fail::fail_point;
use itertools::Itertools;
use pageserver_api::keyspace::ShardedRange;
use pageserver_api::shard::{ShardCount, ShardIdentity, TenantShardId};
use tokio_util::sync::CancellationToken;
use tracing::{debug, info, info_span, trace, warn, Instrument};
use utils::id::TimelineId;

use crate::context::{AccessStatsBehavior, RequestContext, RequestContextBuilder};
use crate::page_cache;
use crate::tenant::storage_layer::{AsLayerDesc, PersistentLayerDesc};
use crate::tenant::timeline::{drop_rlock, Hole, ImageLayerCreationOutcome};
use crate::tenant::timeline::{DeltaLayerWriter, ImageLayerWriter};
use crate::tenant::timeline::{Layer, ResidentLayer};
use crate::tenant::DeltaLayer;
use crate::virtual_file::{MaybeFatalIo, VirtualFile};

use crate::keyspace::KeySpace;
use crate::repository::Key;

use utils::lsn::Lsn;

use pageserver_compaction::helpers::overlaps_with;
use pageserver_compaction::interface::*;

use super::CompactionError;

impl Timeline {
    /// TODO: cancellation
    pub(crate) async fn compact_legacy(
        self: &Arc<Self>,
        _cancel: &CancellationToken,
        flags: EnumSet<CompactFlags>,
        ctx: &RequestContext,
    ) -> Result<(), CompactionError> {
        // High level strategy for compaction / image creation:
        //
        // 1. First, calculate the desired "partitioning" of the
        // currently in-use key space. The goal is to partition the
        // key space into roughly fixed-size chunks, but also take into
        // account any existing image layers, and try to align the
        // chunk boundaries with the existing image layers to avoid
        // too much churn. Also try to align chunk boundaries with
        // relation boundaries. In principle, we don't know about
        // relation boundaries here, we just deal with key-value
        // pairs, and the code in pgdatadir_mapping.rs knows how to
        // map relations into key-value pairs. But in practice we know
        // that 'field6' is the block number, and the fields 1-5
        // identify a relation. This is just an optimization, though.
        //
        // 2. Once we know the partitioning, for each partition,
        // decide if it's time to create a new image layer. The
        // criterion is: has there been too much "churn" since the last
        // image layer was created? "Churn" is a fuzzy concept: it's a
        // combination of too many delta files, or too much WAL in
        // total in the delta files. Or perhaps: if creating an image
        // file would allow us to delete some older files.
        //
        // 3. After that, we compact all level0 delta files if there
        // are too many of them. While compacting, we also garbage
        // collect any page versions that are no longer needed because
        // of the new image layers we created in step 2.
        //
        // TODO: This high level strategy hasn't been implemented yet.
        // Below are the functions compact_level0() and create_image_layers(),
        // but they are a bit ad hoc and don't quite work as explained
        // above. Rewrite it.

        // Is the timeline being deleted?
        if self.is_stopping() {
            trace!("Dropping out of compaction on timeline shutdown");
            return Err(CompactionError::ShuttingDown);
        }

        let target_file_size = self.get_checkpoint_distance();

        // Define partitioning schema if needed

        // FIXME: the match should only cover repartitioning, not the next steps
        let partition_count = match self
            .repartition(
                self.get_last_record_lsn(),
                self.get_compaction_target_size(),
                flags,
                ctx,
            )
            .await
        {
            Ok(((dense_partitioning, sparse_partitioning), lsn)) => {
                // Disables access_stats updates, so that the files we read remain candidates for eviction after we're done with them
                let image_ctx = RequestContextBuilder::extend(ctx)
                    .access_stats_behavior(AccessStatsBehavior::Skip)
                    .build();

                // 2. Compact
                let timer = self.metrics.compact_time_histo.start_timer();
                self.compact_level0(target_file_size, ctx).await?;
                timer.stop_and_record();

                // 3. Create new image layers for partitions that have been modified
                // "enough".
                let mut partitioning = dense_partitioning;
                partitioning
                    .parts
                    .extend(sparse_partitioning.into_dense().parts);
                let image_layers = self
                    .create_image_layers(
                        &partitioning,
                        lsn,
                        if flags.contains(CompactFlags::ForceImageLayerCreation) {
                            ImageLayerCreationMode::Force
                        } else {
                            ImageLayerCreationMode::Try
                        },
                        &image_ctx,
                    )
                    .await?;

                self.upload_new_image_layers(image_layers)?;
                partitioning.parts.len()
            }
            Err(err) => {
                // No partitioning? This is normal, if the timeline was just created
                // as an empty timeline. Also in unit tests, when we use the timeline
                // as a simple key-value store, ignoring the datadir layout. Log the
                // error but continue.
                //
                // Suppress the error when it's due to cancellation
                if !self.cancel.is_cancelled() {
                    tracing::error!("could not compact, repartitioning keyspace failed: {err:?}");
                }
                1
            }
        };

        if self.shard_identity.count >= ShardCount::new(2) {
            // Limit the number of layer rewrites to the number of partitions: this means its
            // runtime should be comparable to a full round of image layer creations, rather than
            // being potentially much longer.
            let rewrite_max = partition_count;

            self.compact_shard_ancestors(rewrite_max, ctx).await?;
        }

        Ok(())
    }

    /// Check for layers that are eligible to be rewritten:
    /// - Shard splitting: after a shard split, rewrite ancestor layers beyond pitr_interval, so
    ///   that we don't indefinitely retain keys in this shard that aren't needed.
    /// - For future use: layers beyond pitr_interval that are in formats we would
    ///   rather not maintain compatibility with indefinitely.
    ///
    /// Note: this phase may read and write many gigabytes of data: use rewrite_max to bound
    /// how much work it will try to do in each compaction pass.
    async fn compact_shard_ancestors(
        self: &Arc<Self>,
        rewrite_max: usize,
        ctx: &RequestContext,
    ) -> anyhow::Result<()> {
        let mut drop_layers = Vec::new();
        let mut layers_to_rewrite: Vec<Layer> = Vec::new();

        // We will use the Lsn cutoff of the last GC as a threshold for rewriting layers: if a
        // layer is behind this Lsn, it indicates that the layer is being retained beyond the
        // pitr_interval, for example because a branchpoint references it.
        //
        // Holding this read guard also blocks [`Self::gc_timeline`] from entering while we
        // are rewriting layers.
        let latest_gc_cutoff = self.get_latest_gc_cutoff_lsn();

        tracing::info!(
            "latest_gc_cutoff: {}, pitr cutoff {}",
            *latest_gc_cutoff,
            self.gc_info.read().unwrap().cutoffs.pitr
        );

        let layers = self.layers.read().await;
        for layer_desc in layers.layer_map().iter_historic_layers() {
            let layer = layers.get_from_desc(&layer_desc);
            if layer.metadata().shard.shard_count == self.shard_identity.count {
                // This layer does not belong to a historic ancestor, no need to re-image it.
                continue;
            }

            // This layer was created on an ancestor shard: check if it contains any data for this shard.
            let sharded_range = ShardedRange::new(layer_desc.get_key_range(), &self.shard_identity);
            let layer_local_page_count = sharded_range.page_count();
            let layer_raw_page_count = ShardedRange::raw_size(&layer_desc.get_key_range());
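            // Illustrative example (hypothetical numbers): a layer spanning 100_000 raw
            // pages of which this shard owns 10_000 is a good rewrite candidate, while one
            // owning 60_000 of them is already "mostly local" and is skipped by the
            // halving check below.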
            if layer_local_page_count == 0 {
                // This ancestral layer only covers keys that belong to other shards.
                // We include the full metadata in the log: if we had some critical bug that caused
                // us to incorrectly drop layers, this would simplify manual debugging + reinstating those layers.
                info!(%layer, old_metadata=?layer.metadata(),
                    "dropping layer after shard split, contains no keys for this shard.",
                );

                if cfg!(debug_assertions) {
                    // Expensive, exhaustive check of keys in this layer: this guards against ShardedRange's calculations being
                    // wrong. If ShardedRange claims the local page count is zero, then no keys in this layer
                    // should be !is_key_disposable()
                    let range = layer_desc.get_key_range();
                    let mut key = range.start;
                    while key < range.end {
                        debug_assert!(self.shard_identity.is_key_disposable(&key));
                        key = key.next();
                    }
                }

                drop_layers.push(layer);
                continue;
            } else if layer_local_page_count != u32::MAX
                && layer_local_page_count == layer_raw_page_count
            {
                debug!(%layer,
                    "layer is entirely shard local ({} keys), no need to filter it",
                    layer_local_page_count
                );
                continue;
            }

            // Don't bother re-writing a layer unless it will at least halve its size
            if layer_local_page_count != u32::MAX
                && layer_local_page_count > layer_raw_page_count / 2
            {
                debug!(%layer,
                    "layer is already mostly local ({}/{}), not rewriting",
                    layer_local_page_count,
                    layer_raw_page_count
                );
                continue;
            }

            // Don't bother re-writing a layer if it is within the PITR window: it will age-out eventually
            // without incurring the I/O cost of a rewrite.
            if layer_desc.get_lsn_range().end >= *latest_gc_cutoff {
                debug!(%layer, "Skipping rewrite of layer still in GC window ({} >= {})",
                    layer_desc.get_lsn_range().end, *latest_gc_cutoff);
                continue;
            }

            if layer_desc.is_delta() {
                // We do not yet implement rewrite of delta layers
                debug!(%layer, "Skipping rewrite of delta layer");
                continue;
            }

            // Only rewrite layers if their generations differ. This guarantees:
            // - that local rewrite is safe, as local layer paths will differ between the existing layer and the rewritten one
            // - that the layer is persistent in remote storage, as we only see old-generation'd layers via loading from remote storage
            if layer.metadata().generation == self.generation {
                debug!(%layer, "Skipping rewrite, is not from old generation");
                continue;
            }

            if layers_to_rewrite.len() >= rewrite_max {
                tracing::info!(%layer, "Will rewrite layer on a future compaction, already rewrote {}",
                    layers_to_rewrite.len()
                );
                continue;
            }

            // Fall through: all our conditions for doing a rewrite passed.
            layers_to_rewrite.push(layer);
        }

        // Drop the read lock on the layer map before we start doing time-consuming I/O
        drop(layers);

        let mut replace_image_layers = Vec::new();

        for layer in layers_to_rewrite {
            tracing::info!(layer=%layer, "Rewriting layer after shard split...");
            let mut image_layer_writer = ImageLayerWriter::new(
                self.conf,
                self.timeline_id,
                self.tenant_shard_id,
                &layer.layer_desc().key_range,
                layer.layer_desc().image_layer_lsn(),
                ctx,
            )
            .await?;

            // Safety of layer rewrites:
            // - We are writing to a different local file path than we are reading from, so the old Layer
            //   cannot interfere with the new one.
            // - In the page cache, contents for a particular VirtualFile are stored with a file_id that
            //   is different for two layers with the same name (in `ImageLayerInner::new` we always
            //   acquire a fresh id from [`crate::page_cache::next_file_id`]). So readers do not risk
            //   reading the index from one layer file, and then data blocks from the rewritten layer file.
            // - Any readers that have a reference to the old layer will keep it alive until they are done
            //   with it. If they are trying to promote from remote storage, that will fail, but this is the same
            //   as for compaction generally: compaction is allowed to delete layers that readers might be trying to use.
            // - We do not run concurrently with other kinds of compaction, so the only layer map writes we race with are:
            //   - GC, which at worst witnesses us "undelete" a layer that they just deleted.
            //   - ingestion, which only inserts layers, therefore cannot collide with us.
            let resident = layer.download_and_keep_resident().await?;

            let keys_written = resident
                .filter(&self.shard_identity, &mut image_layer_writer, ctx)
                .await?;

            if keys_written > 0 {
                let new_layer = image_layer_writer.finish(self, ctx).await?;
                tracing::info!(layer=%new_layer, "Rewrote layer, {} -> {} bytes",
                    layer.metadata().file_size,
                    new_layer.metadata().file_size);

                replace_image_layers.push((layer, new_layer));
            } else {
                // Drop the old layer. Usually for this case we would already have noticed that
                // the layer has no data for us with the ShardedRange check above, but this
                // filtering pass is the authoritative check.
                drop_layers.push(layer);
            }
        }

        // At this point, we have replaced local layer files with their rewritten form, but not yet uploaded
        // metadata to reflect that. If we restart here, the replaced layer files will look invalid (size mismatch
        // to remote index) and be removed. This is inefficient but safe.
        fail::fail_point!("compact-shard-ancestors-localonly");

        // Update the LayerMap so that readers will use the new layers, and enqueue it for writing to remote storage
        self.rewrite_layers(replace_image_layers, drop_layers)
            .await?;

        fail::fail_point!("compact-shard-ancestors-enqueued");

        // We wait for all uploads to complete before finishing this compaction stage. This is not
        // necessary for correctness, but it simplifies testing, and avoids proceeding with another
        // Timeline's compaction while this timeline's uploads may be generating lots of disk I/O
        // load.
        self.remote_client.wait_completion().await?;

        fail::fail_point!("compact-shard-ancestors-persistent");

        Ok(())
    }

    /// Collect a bunch of Level 0 layer files, and compact and reshuffle them
    /// as Level 1 files.
    async fn compact_level0(
        self: &Arc<Self>,
        target_file_size: u64,
        ctx: &RequestContext,
    ) -> Result<(), CompactionError> {
        let CompactLevel0Phase1Result {
            new_layers,
            deltas_to_compact,
        } = {
            let phase1_span = info_span!("compact_level0_phase1");
            let ctx = ctx.attached_child();
            let mut stats = CompactLevel0Phase1StatsBuilder {
                version: Some(2),
                tenant_id: Some(self.tenant_shard_id),
                timeline_id: Some(self.timeline_id),
                ..Default::default()
            };

            let begin = tokio::time::Instant::now();
            let phase1_layers_locked = Arc::clone(&self.layers).read_owned().await;
            let now = tokio::time::Instant::now();
            stats.read_lock_acquisition_micros =
                DurationRecorder::Recorded(RecordedDuration(now - begin), now);
            self.compact_level0_phase1(phase1_layers_locked, stats, target_file_size, &ctx)
                .instrument(phase1_span)
                .await?
        };

        if new_layers.is_empty() && deltas_to_compact.is_empty() {
            // nothing to do
            return Ok(());
        }

        self.finish_compact_batch(&new_layers, &Vec::new(), &deltas_to_compact)
            .await?;
        Ok(())
    }

    /// Level0 files first phase of compaction, explained in the [`Self::compact_legacy`] comment.
    async fn compact_level0_phase1(
        self: &Arc<Self>,
        guard: tokio::sync::OwnedRwLockReadGuard<LayerManager>,
        mut stats: CompactLevel0Phase1StatsBuilder,
        target_file_size: u64,
        ctx: &RequestContext,
    ) -> Result<CompactLevel0Phase1Result, CompactionError> {
        stats.read_lock_held_spawn_blocking_startup_micros =
            stats.read_lock_acquisition_micros.till_now(); // set by caller
        let layers = guard.layer_map();
        let level0_deltas = layers.get_level0_deltas()?;
        let mut level0_deltas = level0_deltas
            .into_iter()
            .map(|x| guard.get_from_desc(&x))
            .collect_vec();
        stats.level0_deltas_count = Some(level0_deltas.len());
        // Only compact if enough layers have accumulated.
        let threshold = self.get_compaction_threshold();
        if level0_deltas.is_empty() || level0_deltas.len() < threshold {
            debug!(
                level0_deltas = level0_deltas.len(),
                threshold, "too few deltas to compact"
            );
            return Ok(CompactLevel0Phase1Result::default());
        }

        // Gather the files to compact in this iteration.
        //
        // Start with the oldest Level 0 delta file, and collect any other
        // level 0 files that form a contiguous sequence, such that the end
        // LSN of the previous file matches the start LSN of the next file.
        //
        // Note that if the files don't form such a sequence, we might
        // "compact" just a single file. That's a bit pointless, but it allows
        // us to get rid of the level 0 file, and compact the other files on
        // the next iteration. This could probably be made smarter, but such
        // "gaps" in the sequence of level 0 files should only happen in case
        // of a crash, partial download from cloud storage, or something like
        // that, so it's not a big deal in practice.
        level0_deltas.sort_by_key(|l| l.layer_desc().lsn_range.start);
        let mut level0_deltas_iter = level0_deltas.iter();

        let first_level0_delta = level0_deltas_iter.next().unwrap();
        let mut prev_lsn_end = first_level0_delta.layer_desc().lsn_range.end;
        let mut deltas_to_compact = Vec::with_capacity(level0_deltas.len());

        deltas_to_compact.push(first_level0_delta.download_and_keep_resident().await?);
        for l in level0_deltas_iter {
            let lsn_range = &l.layer_desc().lsn_range;

            if lsn_range.start != prev_lsn_end {
                break;
            }
            deltas_to_compact.push(l.download_and_keep_resident().await?);
            prev_lsn_end = lsn_range.end;
        }
        let lsn_range = Range {
            start: deltas_to_compact
                .first()
                .unwrap()
                .layer_desc()
                .lsn_range
                .start,
            end: deltas_to_compact.last().unwrap().layer_desc().lsn_range.end,
        };

        info!(
            "Starting Level0 compaction in LSN range {}-{} for {} layers ({} deltas in total)",
            lsn_range.start,
            lsn_range.end,
            deltas_to_compact.len(),
            level0_deltas.len()
        );

        for l in deltas_to_compact.iter() {
            info!("compact includes {l}");
        }

        // We don't need the original list of layers anymore. Drop it so that
        // we don't accidentally use it later in the function.
        drop(level0_deltas);

        stats.read_lock_held_prerequisites_micros = stats
            .read_lock_held_spawn_blocking_startup_micros
            .till_now();

        // Determine the N largest holes, where N is the number of compacted layers.
        let max_holes = deltas_to_compact.len();
        let last_record_lsn = self.get_last_record_lsn();
        let min_hole_range = (target_file_size / page_cache::PAGE_SZ as u64) as i128;
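        // Example (assuming the default 256 MiB target_file_size and 8 KiB pages):
        // min_hole_range is 32768, i.e. a gap must span at least that many key slots
        // before we even consider it a hole candidate.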
        let min_hole_coverage_size = 3; // TODO: something more flexible?

        // min-heap (reserve space for one more element added before eviction)
        let mut heap: BinaryHeap<Hole> = BinaryHeap::with_capacity(max_holes + 1);
        let mut prev: Option<Key> = None;

        let mut all_keys = Vec::new();

        for l in deltas_to_compact.iter() {
            all_keys.extend(l.load_keys(ctx).await?);
        }

        // FIXME: should spawn_blocking the rest of this function

        // The current stdlib sorting implementation is designed in a way where it is
        // particularly fast where the slice is made up of sorted sub-ranges.
        all_keys.sort_by_key(|DeltaEntry { key, lsn, .. }| (*key, *lsn));

        stats.read_lock_held_key_sort_micros = stats.read_lock_held_prerequisites_micros.till_now();

        for &DeltaEntry { key: next_key, .. } in all_keys.iter() {
            if let Some(prev_key) = prev {
                // Just a first fast filter; do not create hole entries for metadata keys. The last
                // hole in the compaction is the gap between the data keys and the metadata keys.
                if next_key.to_i128() - prev_key.to_i128() >= min_hole_range
                    && !Key::is_metadata_key(&prev_key)
                {
                    let key_range = prev_key..next_key;
                    // Measuring a hole by simple subtraction of the i128 representations of the
                    // key range boundaries doesn't make much sense, because the largest holes
                    // will correspond to field1/field2 changes. But we are mostly interested in
                    // eliminating holes which cause generation of excessive image layers. That
                    // is why it is better to measure the size of a hole as the number of
                    // covering image layers.
                    let coverage_size = layers.image_coverage(&key_range, last_record_lsn).len();
                    if coverage_size >= min_hole_coverage_size {
                        heap.push(Hole {
                            key_range,
                            coverage_size,
                        });
                        if heap.len() > max_holes {
                            heap.pop(); // remove the smallest hole
                        }
                    }
                }
            }
            prev = Some(next_key.next());
        }
        stats.read_lock_held_compute_holes_micros = stats.read_lock_held_key_sort_micros.till_now();
        drop_rlock(guard);
        stats.read_lock_drop_micros = stats.read_lock_held_compute_holes_micros.till_now();
        let mut holes = heap.into_vec();
        holes.sort_unstable_by_key(|hole| hole.key_range.start);
        let mut next_hole = 0; // index of the next hole in the holes vector

        // This iterator walks through all key-value pairs from all the layers
        // we're compacting, in key, LSN order.
        let all_values_iter = all_keys.iter();

        // This iterator walks through all keys and is needed to calculate the size used by each key
        let mut all_keys_iter = all_keys
            .iter()
            .map(|DeltaEntry { key, lsn, size, .. }| (*key, *lsn, *size))
            .coalesce(|mut prev, cur| {
                // Coalesce entries that belong to the same key.
                // This ensures that compaction doesn't put them
                // into different layer files.
                // Still limit this by the target file size,
                // so that we keep the size of the files in
                // check.
                if prev.0 == cur.0 && prev.2 < target_file_size {
                    prev.2 += cur.2;
                    Ok(prev)
                } else {
                    Err((prev, cur))
                }
            });

        // Merge the contents of all the input delta layers into a new set
        // of delta layers, based on the current partitioning.
        //
        // We split the new delta layers on the key dimension. We iterate through
        // the key space, and for each key, check whether including the next key in
        // the current output layer would cause the layer to become too large. If
        // so, dump the current output layer and start a new one.
        // It's possible that there is a single key with so many page versions that
        // storing all of them in a single layer file would be too large. In that
        // case, we also split on the LSN dimension.
        //
        // LSN
        //  ^
        //  |
        //  | +-----------+            +--+--+--+--+
        //  | |           |            |  |  |  |  |
        //  | +-----------+            |  |  |  |  |
        //  | |           |            |  |  |  |  |
        //  | +-----------+     ==>    |  |  |  |  |
        //  | |           |            |  |  |  |  |
        //  | +-----------+            |  |  |  |  |
        //  | |           |            |  |  |  |  |
        //  | +-----------+            +--+--+--+--+
        //  |
        //  +--------------> key
        //
        //
        // If one key (X) has a lot of page versions:
        //
        // LSN
        //  ^
        //  |                               (X)
        //  | +-----------+            +--+--+--+--+
        //  | |           |            |  |  |  |  |
        //  | +-----------+            |  |  +--+  |
        //  | |           |            |  |  |  |  |
        //  | +-----------+     ==>    |  |  |  |  |
        //  | |           |            |  |  +--+  |
        //  | +-----------+            |  |  |  |  |
        //  | |           |            |  |  |  |  |
        //  | +-----------+            +--+--+--+--+
        //  |
        //  +--------------> key
        //
        // TODO: this actually divides the layers into fixed-size chunks, not
        // based on the partitioning.
        //
        // TODO: we should also opportunistically materialize and
        // garbage collect what we can.
        let mut new_layers = Vec::new();
        let mut prev_key: Option<Key> = None;
        let mut writer: Option<DeltaLayerWriter> = None;
        let mut key_values_total_size = 0u64;
        let mut dup_start_lsn: Lsn = Lsn::INVALID; // start LSN of layer containing values of the single key
        let mut dup_end_lsn: Lsn = Lsn::INVALID; // end LSN of layer containing values of the single key
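        // Invariant (informal): a valid dup_end_lsn means we are currently producing
        // a key-split layer covering dup_start_lsn..dup_end_lsn for a single key;
        // both are reset to INVALID once we move on to a different key.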

        for &DeltaEntry {
            key, lsn, ref val, ..
        } in all_values_iter
        {
            let value = val.load(ctx).await?;
            let same_key = prev_key.map_or(false, |prev_key| prev_key == key);
            // We need to check key boundaries once we reach the next key or the end of a layer with the same key
            if !same_key || lsn == dup_end_lsn {
                let mut next_key_size = 0u64;
                let is_dup_layer = dup_end_lsn.is_valid();
                dup_start_lsn = Lsn::INVALID;
                if !same_key {
                    dup_end_lsn = Lsn::INVALID;
                }
                // Determine the size occupied by this key. We stop at the next key or when the size becomes larger than target_file_size
                for (next_key, next_lsn, next_size) in all_keys_iter.by_ref() {
                    next_key_size = next_size;
                    if key != next_key {
                        if dup_end_lsn.is_valid() {
                            // We are writing a segment with duplicates:
                            // place all remaining values of this key in a separate segment
                            dup_start_lsn = dup_end_lsn; // the new segment starts where the old one stops
                            dup_end_lsn = lsn_range.end; // there are no more values of this key till the end of the LSN range
                        }
                        break;
                    }
                    key_values_total_size += next_size;
                    // Check if it is time to split the segment: if the total key size is larger than the target file size.
                    // We need to avoid generation of empty segments if next_size > target_file_size.
                    if key_values_total_size > target_file_size && lsn != next_lsn {
                        // Split the key between multiple layers: such a layer can contain only a single key
                        dup_start_lsn = if dup_end_lsn.is_valid() {
                            dup_end_lsn // the new segment with duplicates starts where the old one stops
                        } else {
                            lsn // start with the first LSN for this key
                        };
                        dup_end_lsn = next_lsn; // the upper LSN boundary is exclusive
                        break;
                    }
                }
                // Handle the case when the loop reaches the last key: dup_end_lsn is valid but dup_start_lsn is not set.
                if dup_end_lsn.is_valid() && !dup_start_lsn.is_valid() {
                    dup_start_lsn = dup_end_lsn;
                    dup_end_lsn = lsn_range.end;
                }
                if writer.is_some() {
                    let written_size = writer.as_mut().unwrap().size();
                    let contains_hole =
                        next_hole < holes.len() && key >= holes[next_hole].key_range.end;
                    // check if the key causes layer overflow or contains a hole...
                    if is_dup_layer
                        || dup_end_lsn.is_valid()
                        || written_size + key_values_total_size > target_file_size
                        || contains_hole
                    {
                        // ... if so, flush the previous layer and prepare to write a new one
                        new_layers.push(
                            writer
                                .take()
                                .unwrap()
                                .finish(prev_key.unwrap().next(), self, ctx)
                                .await?,
                        );
                        writer = None;

                        if contains_hole {
                            // skip hole
                            next_hole += 1;
                        }
                    }
                }
                // Remember the size of the key's value because at the next iteration we will access the next item
                key_values_total_size = next_key_size;
            }
            fail_point!("delta-layer-writer-fail-before-finish", |_| {
                Err(CompactionError::Other(anyhow::anyhow!(
                    "failpoint delta-layer-writer-fail-before-finish"
                )))
            });

            if !self.shard_identity.is_key_disposable(&key) {
                if writer.is_none() {
                    // Create a writer if not initialized yet
                    writer = Some(
                        DeltaLayerWriter::new(
                            self.conf,
                            self.timeline_id,
                            self.tenant_shard_id,
                            key,
                            if dup_end_lsn.is_valid() {
                                // this is a layer containing a slice of values of the same key
                                debug!("Create new dup layer {}..{}", dup_start_lsn, dup_end_lsn);
                                dup_start_lsn..dup_end_lsn
                            } else {
                                debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end);
                                lsn_range.clone()
                            },
                            ctx,
                        )
                        .await?,
                    );
                }

                writer
                    .as_mut()
                    .unwrap()
                    .put_value(key, lsn, value, ctx)
                    .await?;
            } else {
                debug!(
                    "Dropping key {} during compaction (it belongs on shard {:?})",
                    key,
                    self.shard_identity.get_shard_number(&key)
                );
            }

            if !new_layers.is_empty() {
                fail_point!("after-timeline-compacted-first-L1");
            }

            prev_key = Some(key);
        }
        if let Some(writer) = writer {
            new_layers.push(writer.finish(prev_key.unwrap().next(), self, ctx).await?);
        }

        // Sync layers
        if !new_layers.is_empty() {
            // Print a warning if the created layer is larger than double the target size
            // Add two pages for potential overhead. This should in theory be already
            // accounted for in the target calculation, but for very small targets,
            // we still might easily hit the limit otherwise.
            let warn_limit = target_file_size * 2 + page_cache::PAGE_SZ as u64 * 2;
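            // E.g. with a 256 MiB target this warns above 512 MiB + 16 KiB
            // (assuming 8 KiB pages).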
            for layer in new_layers.iter() {
                if layer.layer_desc().file_size > warn_limit {
                    warn!(
                        %layer,
                        "created delta file of size {} larger than double of target of {target_file_size}", layer.layer_desc().file_size
                    );
                }
            }

            // The writer.finish() above already did the fsync of the inodes.
            // We just need to fsync the directory in which these inodes are linked,
            // which we know to be the timeline directory.
            //
            // We use fatal_err() below because after writer.finish() returns with success,
            // the in-memory state of the filesystem already has the layer file in its final place,
            // and subsequent pageserver code could think it's durable while it really isn't.
            let timeline_dir = VirtualFile::open(
                &self
                    .conf
                    .timeline_path(&self.tenant_shard_id, &self.timeline_id),
                ctx,
            )
            .await
            .fatal_err("VirtualFile::open for timeline dir fsync");
            timeline_dir
                .sync_all()
                .await
                .fatal_err("VirtualFile::sync_all timeline dir");
        }

        stats.write_layer_files_micros = stats.read_lock_drop_micros.till_now();
        stats.new_deltas_count = Some(new_layers.len());
        stats.new_deltas_size = Some(new_layers.iter().map(|l| l.layer_desc().file_size).sum());

        match TryInto::<CompactLevel0Phase1Stats>::try_into(stats)
            .and_then(|stats| serde_json::to_string(&stats).context("serde_json::to_string"))
        {
            Ok(stats_json) => {
                info!(
                    stats_json = stats_json.as_str(),
                    "compact_level0_phase1 stats available"
                )
            }
            Err(e) => {
                warn!("compact_level0_phase1 stats failed to serialize: {:#}", e);
            }
        }

        Ok(CompactLevel0Phase1Result {
            new_layers,
            deltas_to_compact: deltas_to_compact
                .into_iter()
                .map(|x| x.drop_eviction_guard())
                .collect::<Vec<_>>(),
        })
    }
}
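
// Illustrative sketch (not part of the compaction path): the coalescing rule from
// `compact_level0_phase1` above, demonstrated on plain `(key, lsn, size)` integer
// tuples instead of `DeltaEntry`. Consecutive entries for the same key are merged
// (summing sizes) until the accumulated size reaches the target, so one key's
// versions are kept together in a single output layer where possible.
#[cfg(test)]
mod coalesce_example {
    use itertools::Itertools;

    #[test]
    fn same_key_sizes_are_coalesced_up_to_target() {
        let target_file_size = 100u64;
        let entries = [(1u64, 10u64, 60u64), (1, 20, 60), (2, 5, 30)];
        let coalesced: Vec<_> = entries
            .into_iter()
            .coalesce(|mut prev, cur| {
                if prev.0 == cur.0 && prev.2 < target_file_size {
                    prev.2 += cur.2;
                    Ok(prev)
                } else {
                    Err((prev, cur))
                }
            })
            .collect();
        // Key 1's two versions merge into one 120-byte entry; key 2 stays separate.
        assert_eq!(coalesced, vec![(1, 10, 120), (2, 5, 30)]);
    }
}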

#[derive(Default)]
struct CompactLevel0Phase1Result {
    new_layers: Vec<ResidentLayer>,
    deltas_to_compact: Vec<Layer>,
}

#[derive(Default)]
struct CompactLevel0Phase1StatsBuilder {
    version: Option<u64>,
    tenant_id: Option<TenantShardId>,
    timeline_id: Option<TimelineId>,
    read_lock_acquisition_micros: DurationRecorder,
    read_lock_held_spawn_blocking_startup_micros: DurationRecorder,
    read_lock_held_key_sort_micros: DurationRecorder,
    read_lock_held_prerequisites_micros: DurationRecorder,
    read_lock_held_compute_holes_micros: DurationRecorder,
    read_lock_drop_micros: DurationRecorder,
    write_layer_files_micros: DurationRecorder,
    level0_deltas_count: Option<usize>,
    new_deltas_count: Option<usize>,
    new_deltas_size: Option<u64>,
}

#[derive(serde::Serialize)]
struct CompactLevel0Phase1Stats {
    version: u64,
    tenant_id: TenantShardId,
    timeline_id: TimelineId,
    read_lock_acquisition_micros: RecordedDuration,
    read_lock_held_spawn_blocking_startup_micros: RecordedDuration,
    read_lock_held_key_sort_micros: RecordedDuration,
    read_lock_held_prerequisites_micros: RecordedDuration,
    read_lock_held_compute_holes_micros: RecordedDuration,
    read_lock_drop_micros: RecordedDuration,
    write_layer_files_micros: RecordedDuration,
    level0_deltas_count: usize,
    new_deltas_count: usize,
    new_deltas_size: u64,
}
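
// Illustrative sketch (hypothetical helper, not used by the code above): the L0
// selection rule from `compact_level0_phase1`, on plain `u64` ranges standing in
// for LSN ranges. Starting from the oldest layer we keep extending the run while
// each range starts exactly where the previous one ended, stopping at the first gap.
#[cfg(test)]
mod contiguous_level0_example {
    use std::ops::Range;

    fn contiguous_prefix(mut ranges: Vec<Range<u64>>) -> Vec<Range<u64>> {
        ranges.sort_by_key(|r| r.start);
        let mut out: Vec<Range<u64>> = Vec::new();
        for r in ranges {
            match out.last() {
                Some(prev) if prev.end != r.start => break, // gap: stop collecting
                _ => out.push(r),
            }
        }
        out
    }

    #[test]
    fn stops_at_first_gap() {
        let got = contiguous_prefix(vec![30..40, 10..20, 20..30, 50..60]);
        assert_eq!(got, vec![10..20, 20..30, 30..40]);
    }
}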

impl TryFrom<CompactLevel0Phase1StatsBuilder> for CompactLevel0Phase1Stats {
    type Error = anyhow::Error;

    fn try_from(value: CompactLevel0Phase1StatsBuilder) -> Result<Self, Self::Error> {
        Ok(Self {
            version: value.version.ok_or_else(|| anyhow!("version not set"))?,
            tenant_id: value
                .tenant_id
                .ok_or_else(|| anyhow!("tenant_id not set"))?,
            timeline_id: value
                .timeline_id
                .ok_or_else(|| anyhow!("timeline_id not set"))?,
            read_lock_acquisition_micros: value
                .read_lock_acquisition_micros
                .into_recorded()
                .ok_or_else(|| anyhow!("read_lock_acquisition_micros not set"))?,
            read_lock_held_spawn_blocking_startup_micros: value
                .read_lock_held_spawn_blocking_startup_micros
                .into_recorded()
                .ok_or_else(|| anyhow!("read_lock_held_spawn_blocking_startup_micros not set"))?,
            read_lock_held_key_sort_micros: value
                .read_lock_held_key_sort_micros
                .into_recorded()
                .ok_or_else(|| anyhow!("read_lock_held_key_sort_micros not set"))?,
            read_lock_held_prerequisites_micros: value
                .read_lock_held_prerequisites_micros
                .into_recorded()
                .ok_or_else(|| anyhow!("read_lock_held_prerequisites_micros not set"))?,
            read_lock_held_compute_holes_micros: value
                .read_lock_held_compute_holes_micros
                .into_recorded()
                .ok_or_else(|| anyhow!("read_lock_held_compute_holes_micros not set"))?,
            read_lock_drop_micros: value
                .read_lock_drop_micros
                .into_recorded()
                .ok_or_else(|| anyhow!("read_lock_drop_micros not set"))?,
            write_layer_files_micros: value
                .write_layer_files_micros
                .into_recorded()
                .ok_or_else(|| anyhow!("write_layer_files_micros not set"))?,
            level0_deltas_count: value
                .level0_deltas_count
                .ok_or_else(|| anyhow!("level0_deltas_count not set"))?,
            new_deltas_count: value
                .new_deltas_count
                .ok_or_else(|| anyhow!("new_deltas_count not set"))?,
            new_deltas_size: value
                .new_deltas_size
                .ok_or_else(|| anyhow!("new_deltas_size not set"))?,
        })
    }
}
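
// Illustrative sketch (plain integers standing in for `Hole`): the "N largest
// holes" selection used in `compact_level0_phase1`. A capacity-capped min-heap
// (built here with `Reverse`; the real `Hole` type supplies its own ordering)
// keeps the `max_holes` largest coverage sizes, evicting the smallest whenever
// a new candidate pushes it over capacity.
#[cfg(test)]
mod hole_heap_example {
    use std::cmp::Reverse;
    use std::collections::BinaryHeap;

    #[test]
    fn min_heap_keeps_largest_holes() {
        let max_holes = 2;
        let mut heap: BinaryHeap<Reverse<usize>> = BinaryHeap::with_capacity(max_holes + 1);
        for coverage_size in [3usize, 7, 5, 9] {
            heap.push(Reverse(coverage_size));
            if heap.len() > max_holes {
                heap.pop(); // evict the smallest hole seen so far
            }
        }
        let mut kept: Vec<_> = heap.into_iter().map(|Reverse(v)| v).collect();
        kept.sort_unstable();
        assert_eq!(kept, vec![7, 9]);
    }
}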

impl Timeline {
    /// Entry point for the new tiered compaction algorithm.
    ///
    /// All the real work is in the implementation in the pageserver_compaction
    /// crate. The code here would apply to any algorithm implemented by the
    /// same interface, but tiered is the only one at the moment.
    ///
    /// TODO: cancellation
    pub(crate) async fn compact_tiered(
        self: &Arc<Self>,
        _cancel: &CancellationToken,
        ctx: &RequestContext,
    ) -> Result<(), CompactionError> {
        let fanout = self.get_compaction_threshold() as u64;
        let target_file_size = self.get_checkpoint_distance();

        // Find the top of the historical layers
        let end_lsn = {
            let guard = self.layers.read().await;
            let layers = guard.layer_map();

            let l0_deltas = layers.get_level0_deltas()?;
            drop(guard);

            // As an optimization, if we find that there are too few L0 layers,
            // bail out early. We know that the compaction algorithm would do
            // nothing in that case.
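            // (With e.g. a compaction threshold of 10, nine or fewer L0 deltas
            // short-circuit here.)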
            if l0_deltas.len() < fanout as usize {
                // doesn't need compacting
                return Ok(());
            }
            l0_deltas.iter().map(|l| l.lsn_range.end).max().unwrap()
        };

        // Is the timeline being deleted?
        if self.is_stopping() {
            trace!("Dropping out of compaction on timeline shutdown");
            return Err(CompactionError::ShuttingDown);
        }

        let (dense_ks, _sparse_ks) = self.collect_keyspace(end_lsn, ctx).await?;
        // TODO(chi): ignore sparse_keyspace for now, compact it in the future.
        let mut adaptor = TimelineAdaptor::new(self, (end_lsn, dense_ks));

        pageserver_compaction::compact_tiered::compact_tiered(
            &mut adaptor,
            end_lsn,
            target_file_size,
            fanout,
            ctx,
        )
        .await?;

        adaptor.flush_updates().await?;
        Ok(())
    }

    /// An experimental compaction building block that combines compaction with garbage collection.
    ///
    /// The current implementation picks all delta + image layers that are below or intersecting
    /// with the GC horizon, without considering retain_lsns. Then, it does a full compaction over
    /// all these delta layers and image layers, which generates image layers at the gc horizon,
    /// drops deltas below the gc horizon, and creates delta layers with all deltas >= the gc
    /// horizon.
    #[cfg(test)]
    pub(crate) async fn compact_with_gc(
        self: &Arc<Self>,
        _cancel: &CancellationToken,
        ctx: &RequestContext,
    ) -> Result<(), CompactionError> {
        use crate::tenant::storage_layer::ValueReconstructState;
        // Step 0: pick all delta layers + image layers below/intersecting with the GC horizon.
        // The layer selection has the following properties:
        // 1. If a layer is in the selection, all layers below it are in the selection.
        // 2. Inferred from (1): for each key in the layer selection, the value can be
        //    reconstructed using only the layers in the layer selection.
        let (layer_selection, gc_cutoff) = {
            let guard = self.layers.read().await;
            let layers = guard.layer_map();
            let gc_info = self.gc_info.read().unwrap();
            let gc_cutoff = Lsn::min(gc_info.cutoffs.horizon, gc_info.cutoffs.pitr);
            let mut selected_layers = Vec::new();
            // TODO: consider retain_lsns
            drop(gc_info);
            for desc in layers.iter_historic_layers() {
                if desc.get_lsn_range().start <= gc_cutoff {
                    selected_layers.push(guard.get_from_desc(&desc));
                }
            }
            (selected_layers, gc_cutoff)
        };
        // Step 1: (In the future) construct a k-merge iterator over all layers. For now, simply collect all keys + LSNs.
        let mut all_key_values = Vec::new();
        for layer in &layer_selection {
            all_key_values.extend(layer.load_key_values(ctx).await?);
        }
        // Sort keys small to large and LSNs low to high. If the same LSN has both an image and a
        // delta (due to the merge of delta layers and image layers), make the image appear after
        // the delta.
        struct ValueWrapper<'a>(&'a crate::repository::Value);
        impl Ord for ValueWrapper<'_> {
            fn cmp(&self, other: &Self) -> std::cmp::Ordering {
                use crate::repository::Value;
                use std::cmp::Ordering;
                match (self.0, other.0) {
                    (Value::Image(_), Value::WalRecord(_)) => Ordering::Greater,
                    (Value::WalRecord(_), Value::Image(_)) => Ordering::Less,
                    _ => Ordering::Equal,
                }
            }
        }
        impl PartialOrd for ValueWrapper<'_> {
            fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
                Some(self.cmp(other))
            }
        }
        impl PartialEq for ValueWrapper<'_> {
            fn eq(&self, other: &Self) -> bool {
                self.cmp(other) == std::cmp::Ordering::Equal
            }
        }
        impl Eq for ValueWrapper<'_> {}
        all_key_values.sort_by(|(k1, l1, v1), (k2, l2, v2)| {
            (k1, l1, ValueWrapper(v1)).cmp(&(k2, l2, ValueWrapper(v2)))
        });
        let max_lsn = all_key_values
            .iter()
            .map(|(_, lsn, _)| lsn)
            .max()
            .copied()
            .unwrap()
            + 1;
        // Step 2: Produce images+deltas. TODO: ensure the newly-produced delta does not overlap with other deltas.
        // Data of the same key.
        let mut accumulated_values = Vec::new();
        let mut last_key = all_key_values.first().unwrap().0; // TODO: assert all_key_values is not empty

        /// Take a list of images and deltas, produce an image at the GC horizon, and a list of deltas above the GC horizon.
        async fn flush_accumulated_states(
            tline: &Arc<Timeline>,
            key: Key,
            accumulated_values: &[&(Key, Lsn, crate::repository::Value)],
            horizon: Lsn,
        ) -> anyhow::Result<(Vec<(Key, Lsn, crate::repository::Value)>, bytes::Bytes)> {
            let mut base_image = None;
            let mut keys_above_horizon = Vec::new();
            let mut delta_above_base_image = Vec::new();
            // We have a list of deltas/images. We want to create image layers while collecting garbage.
            for (key, lsn, val) in accumulated_values.iter().rev() {
                if *lsn > horizon {
                    keys_above_horizon.push((*key, *lsn, val.clone())); // TODO: ensure one LSN corresponds to either delta or image instead of both
                } else if *lsn <= horizon {
                    match val {
                        crate::repository::Value::Image(image) => {
                            if lsn <= &horizon {
                                base_image = Some((*lsn, image.clone()));
                                break;
                            }
                        }
                        crate::repository::Value::WalRecord(wal) => {
                            delta_above_base_image.push((*lsn, wal.clone()));
                        }
                    }
                }
            }
            delta_above_base_image.reverse();
            keys_above_horizon.reverse();
            let state = ValueReconstructState {
                img: base_image,
                records: delta_above_base_image,
            };
            let img = tline.reconstruct_value(key, horizon, state).await?;
            Ok((keys_above_horizon, img))
        }

        let mut delta_layer_writer = DeltaLayerWriter::new(
            self.conf,
            self.timeline_id,
            self.tenant_shard_id,
            all_key_values.first().unwrap().0,
            gc_cutoff..max_lsn, // TODO: off by one?
            ctx,
        )
        .await?;
        let mut image_layer_writer = ImageLayerWriter::new(
            self.conf,
            self.timeline_id,
            self.tenant_shard_id,
            &(all_key_values.first().unwrap().0..all_key_values.last().unwrap().0.next()),
            gc_cutoff,
            ctx,
        )
        .await?;

        for item @ (key, _, _) in &all_key_values {
            if &last_key == key {
                accumulated_values.push(item);
            } else {
                let (deltas, image) =
                    flush_accumulated_states(self, last_key, &accumulated_values, gc_cutoff)
                        .await?;
                image_layer_writer.put_image(last_key, image, ctx).await?;
                for (key, lsn, val) in deltas {
                    delta_layer_writer.put_value(key, lsn, val, ctx).await?;
                }
                accumulated_values.clear();
                accumulated_values.push(item);
                last_key = *key;
            }
        }
        let (deltas, image) =
            flush_accumulated_states(self, last_key, &accumulated_values, gc_cutoff).await?;
        image_layer_writer.put_image(last_key, image, ctx).await?;
        for (key, lsn, val) in deltas {
            delta_layer_writer.put_value(key, lsn, val, ctx).await?;
        }
        accumulated_values.clear();
        // TODO: split layers
        let delta_layer = delta_layer_writer.finish(last_key, self, ctx).await?;
        let image_layer = image_layer_writer.finish(self, ctx).await?;
        // Step 3: Place them back in the layer map.
        {
            let mut guard = self.layers.write().await;
            guard.finish_gc_compaction(
                &layer_selection,
                &[delta_layer.clone(), image_layer.clone()],
                &self.metrics,
            )
        };
        Ok(())
    }
}
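
// Illustrative sketch (a stand-in `Kind` enum replaces `crate::repository::Value`):
// the ordering `compact_with_gc` imposes. Entries sort by (key, lsn), and when a
// delta and an image share the same (key, lsn), the image sorts last, so the
// backwards scan in `flush_accumulated_states` encounters the image first.
#[cfg(test)]
mod gc_compaction_sort_example {
    #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy)]
    enum Kind {
        WalRecord, // deltas sort first
        Image,     // images sort after deltas at the same (key, lsn)
    }

    #[test]
    fn image_sorts_after_delta_at_same_key_lsn() {
        let mut entries = [(1u64, 10u64, Kind::Image), (1, 10, Kind::WalRecord)];
        entries.sort();
        assert_eq!(entries, [(1, 10, Kind::WalRecord), (1, 10, Kind::Image)]);
    }
}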

struct TimelineAdaptor {
    timeline: Arc<Timeline>,

    keyspace: (Lsn, KeySpace),

    new_deltas: Vec<ResidentLayer>,
    new_images: Vec<ResidentLayer>,
    layers_to_delete: Vec<Arc<PersistentLayerDesc>>,
}

impl TimelineAdaptor {
    pub fn new(timeline: &Arc<Timeline>, keyspace: (Lsn, KeySpace)) -> Self {
        Self {
            timeline: timeline.clone(),
            keyspace,
            new_images: Vec::new(),
            new_deltas: Vec::new(),
            layers_to_delete: Vec::new(),
        }
    }

    pub async fn flush_updates(&mut self) -> anyhow::Result<()> {
        let layers_to_delete = {
            let guard = self.timeline.layers.read().await;
            self.layers_to_delete
                .iter()
                .map(|x| guard.get_from_desc(x))
                .collect::<Vec<Layer>>()
        };
        self.timeline
            .finish_compact_batch(&self.new_deltas, &self.new_images, &layers_to_delete)
            .await?;

        self.timeline
            .upload_new_image_layers(std::mem::take(&mut self.new_images))?;

        self.new_deltas.clear();
        self.layers_to_delete.clear();
        Ok(())
    }
}

#[derive(Clone)]
struct ResidentDeltaLayer(ResidentLayer);
#[derive(Clone)]
struct ResidentImageLayer(ResidentLayer);

impl CompactionJobExecutor for TimelineAdaptor {
    type Key = crate::repository::Key;

    type Layer = OwnArc<PersistentLayerDesc>;
    type DeltaLayer = ResidentDeltaLayer;
    type ImageLayer = ResidentImageLayer;

    type RequestContext = crate::context::RequestContext;

    fn get_shard_identity(&self) -> &ShardIdentity {
        self.timeline.get_shard_identity()
    }

    async fn get_layers(
        &mut self,
        key_range: &Range<Key>,
        lsn_range: &Range<Lsn>,
        _ctx: &RequestContext,
    ) -> anyhow::Result<Vec<OwnArc<PersistentLayerDesc>>> {
        self.flush_updates().await?;

        let guard = self.timeline.layers.read().await;
        let layer_map = guard.layer_map();

        let result = layer_map
            .iter_historic_layers()
            .filter(|l| {
                overlaps_with(&l.lsn_range, lsn_range) && overlaps_with(&l.key_range, key_range)
            })
            .map(OwnArc)
            .collect();
        Ok(result)
    }

    async fn get_keyspace(
        &mut self,
        key_range: &Range<Key>,
        lsn: Lsn,
        _ctx: &RequestContext,
    ) -> anyhow::Result<Vec<Range<Key>>> {
        if lsn == self.keyspace.0 {
            Ok(pageserver_compaction::helpers::intersect_keyspace(
                &self.keyspace.1.ranges,
                key_range,
            ))
        } else {
            // The current compaction implementation only ever requests the key space
            // at the compaction end LSN.
            anyhow::bail!("keyspace not available for requested lsn");
        }
    }

    async fn downcast_delta_layer(
        &self,
        layer: &OwnArc<PersistentLayerDesc>,
    ) -> anyhow::Result<Option<ResidentDeltaLayer>> {
        // this is a lot more complex than a simple downcast...
        if layer.is_delta() {
            let l = {
                let guard = self.timeline.layers.read().await;
                guard.get_from_desc(layer)
            };
            let result = l.download_and_keep_resident().await?;

            Ok(Some(ResidentDeltaLayer(result)))
        } else {
            Ok(None)
        }
    }

    async fn create_image(
        &mut self,
        lsn: Lsn,
        key_range: &Range<Key>,
        ctx: &RequestContext,
    ) -> anyhow::Result<()> {
        Ok(self.create_image_impl(lsn, key_range, ctx).await?)
    }

    async fn create_delta(
        &mut self,
        lsn_range: &Range<Lsn>,
        key_range: &Range<Key>,
        input_layers: &[ResidentDeltaLayer],
        ctx: &RequestContext,
    ) -> anyhow::Result<()> {
        debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end);

        let mut all_entries = Vec::new();
        for dl in input_layers.iter() {
            all_entries.extend(dl.load_keys(ctx).await?);
        }

        // The current stdlib sorting implementation is designed in a way where it is
        // particularly fast where the slice is made up of sorted sub-ranges.
        all_entries.sort_by_key(|DeltaEntry { key, lsn, .. }| (*key, *lsn));

        let mut writer = DeltaLayerWriter::new(
            self.timeline.conf,
            self.timeline.timeline_id,
            self.timeline.tenant_shard_id,
            key_range.start,
            lsn_range.clone(),
            ctx,
        )
        .await?;

        let mut dup_values = 0;

        // This iterator walks through all key-value pairs from all the layers
        // we're compacting, in key, LSN order.
        let mut prev: Option<(Key, Lsn)> = None;
        for &DeltaEntry {
            key, lsn, ref val, ..
        } in all_entries.iter()
        {
            if prev == Some((key, lsn)) {
                // This is a duplicate. Skip it.
                //
                // It can happen if compaction is interrupted after writing some
                // layers but not all, and we are compacting the range again.
                // The calculations in the algorithm assume that there are no
                // duplicates, so the math on targeted file size is likely off,
                // and we will create smaller files than expected.
                dup_values += 1;
                continue;
            }

            let value = val.load(ctx).await?;

            writer.put_value(key, lsn, value, ctx).await?;

            prev = Some((key, lsn));
        }

        if dup_values > 0 {
            warn!("delta layer created with {} duplicate values", dup_values);
        }

        fail_point!("delta-layer-writer-fail-before-finish", |_| {
            Err(anyhow::anyhow!(
                "failpoint delta-layer-writer-fail-before-finish"
            ))
        });

        let new_delta_layer = writer
            .finish(prev.unwrap().0.next(), &self.timeline, ctx)
            .await?;

        self.new_deltas.push(new_delta_layer);
        Ok(())
    }

    async fn delete_layer(
        &mut self,
        layer: &OwnArc<PersistentLayerDesc>,
        _ctx: &RequestContext,
    ) -> anyhow::Result<()> {
        self.layers_to_delete.push(layer.clone().0);
        Ok(())
    }
}

impl TimelineAdaptor {
    async fn create_image_impl(
        &mut self,
        lsn: Lsn,
        key_range: &Range<Key>,
        ctx: &RequestContext,
    ) -> Result<(), CreateImageLayersError> {
        let timer = self.timeline.metrics.create_images_time_histo.start_timer();

        let image_layer_writer = ImageLayerWriter::new(
            self.timeline.conf,
            self.timeline.timeline_id,
            self.timeline.tenant_shard_id,
            key_range,
            lsn,
            ctx,
        )
        .await?;

        fail_point!("image-layer-writer-fail-before-finish", |_| {
            Err(CreateImageLayersError::Other(anyhow::anyhow!(
                "failpoint image-layer-writer-fail-before-finish"
            )))
        });

        let keyspace = KeySpace {
            ranges: self.get_keyspace(key_range, lsn, ctx).await?,
        };
        // TODO: set a proper (stateful) start key for create_image_layer_for_rel_blocks.
        let start = Key::MIN;
        let ImageLayerCreationOutcome {
            image,
            next_start_key: _,
        } = self
            .timeline
            .create_image_layer_for_rel_blocks(
                &keyspace,
                image_layer_writer,
                lsn,
                ctx,
                key_range.clone(),
                start,
            )
            .await?;

        if let Some(image_layer) = image {
            self.new_images.push(image_layer);
        }

        timer.stop_and_record();

        Ok(())
    }
}

impl CompactionRequestContext for crate::context::RequestContext {}

#[derive(Debug, Clone)]
pub struct OwnArc<T>(pub Arc<T>);

impl<T> Deref for OwnArc<T> {
    type Target = <Arc<T> as Deref>::Target;
    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

impl<T> AsRef<T> for OwnArc<T> {
    fn as_ref(&self) -> &T {
        self.0.as_ref()
    }
}

impl CompactionLayer<Key> for OwnArc<PersistentLayerDesc> {
    fn key_range(&self) -> &Range<Key> {
        &self.key_range
    }
    fn lsn_range(&self) -> &Range<Lsn> {
        &self.lsn_range
    }
    fn file_size(&self) -> u64 {
        self.file_size
    }
    fn short_id(&self) -> std::string::String {
        self.as_ref().short_id().to_string()
    }
    fn is_delta(&self) -> bool {
        self.as_ref().is_delta()
    }
}

impl CompactionLayer<Key> for OwnArc<DeltaLayer> {
    fn key_range(&self) -> &Range<Key> {
        &self.layer_desc().key_range
    }
    fn lsn_range(&self) -> &Range<Lsn> {
        &self.layer_desc().lsn_range
    }
    fn file_size(&self) -> u64 {
        self.layer_desc().file_size
    }
    fn short_id(&self) -> std::string::String {
        self.layer_desc().short_id().to_string()
    }
    fn is_delta(&self) -> bool {
        true
    }
}

use crate::tenant::timeline::DeltaEntry;

impl CompactionLayer<Key> for ResidentDeltaLayer {
    fn key_range(&self) -> &Range<Key> {
        &self.0.layer_desc().key_range
    }
    fn lsn_range(&self) -> &Range<Lsn> {
        &self.0.layer_desc().lsn_range
    }
    fn file_size(&self) -> u64 {
        self.0.layer_desc().file_size
    }
    fn short_id(&self) -> std::string::String {
        self.0.layer_desc().short_id().to_string()
    }
    fn is_delta(&self) -> bool {
        true
    }
}

impl CompactionDeltaLayer<TimelineAdaptor> for ResidentDeltaLayer {
    type DeltaEntry<'a> = DeltaEntry<'a>;

    async fn load_keys<'a>(&self, ctx: &RequestContext) -> anyhow::Result<Vec<DeltaEntry<'_>>> {
        self.0.load_keys(ctx).await
    }
}

impl CompactionLayer<Key> for ResidentImageLayer {
    fn key_range(&self) -> &Range<Key> {
        &self.0.layer_desc().key_range
    }
    fn lsn_range(&self) -> &Range<Lsn> {
        &self.0.layer_desc().lsn_range
    }
    fn file_size(&self) -> u64 {
        self.0.layer_desc().file_size
    }
    fn short_id(&self) -> std::string::String {
        self.0.layer_desc().short_id().to_string()
    }
    fn is_delta(&self) -> bool {
        false
    }
}
impl CompactionImageLayer<TimelineAdaptor> for ResidentImageLayer {}
1493 : impl CompactionImageLayer<TimelineAdaptor> for ResidentImageLayer {}
|