Line data Source code
1 : //! New compaction implementation. The algorithm itself is implemented in the
2 : //! compaction crate. This file implements the callbacks and structs that allow
3 : //! the algorithm to drive the process.
4 : //!
5 : //! The legacy algorithm is implemented directly in `timeline.rs`.
6 :
7 : use std::collections::BinaryHeap;
8 : use std::ops::{Deref, Range};
9 : use std::sync::Arc;
10 :
11 : use super::layer_manager::LayerManager;
12 : use super::{CompactFlags, DurationRecorder, ImageLayerCreationMode, RecordedDuration, Timeline};
13 :
14 : use anyhow::{anyhow, Context};
15 : use enumset::EnumSet;
16 : use fail::fail_point;
17 : use itertools::Itertools;
18 : use pageserver_api::keyspace::ShardedRange;
19 : use pageserver_api::shard::{ShardCount, ShardIdentity, TenantShardId};
20 : use tokio_util::sync::CancellationToken;
21 : use tracing::{debug, info, info_span, trace, warn, Instrument};
22 : use utils::id::TimelineId;
23 :
24 : use crate::context::{AccessStatsBehavior, RequestContext, RequestContextBuilder};
25 : use crate::tenant::storage_layer::{AsLayerDesc, PersistentLayerDesc};
26 : use crate::tenant::timeline::{drop_rlock, is_rel_fsm_block_key, is_rel_vm_block_key, Hole};
27 : use crate::tenant::timeline::{DeltaLayerWriter, ImageLayerWriter};
28 : use crate::tenant::timeline::{Layer, ResidentLayer};
29 : use crate::tenant::DeltaLayer;
30 : use crate::tenant::PageReconstructError;
31 : use crate::virtual_file::{MaybeFatalIo, VirtualFile};
32 : use crate::{page_cache, ZERO_PAGE};
33 :
34 : use crate::keyspace::KeySpace;
35 : use crate::repository::Key;
36 :
37 : use utils::lsn::Lsn;
38 :
39 : use pageserver_compaction::helpers::overlaps_with;
40 : use pageserver_compaction::interface::*;
41 :
42 : use super::CompactionError;
43 :
44 : impl Timeline {
45 : /// TODO: cancellation
46 330 : pub(crate) async fn compact_legacy(
47 330 : self: &Arc<Self>,
48 330 : _cancel: &CancellationToken,
49 330 : flags: EnumSet<CompactFlags>,
50 330 : ctx: &RequestContext,
51 330 : ) -> Result<(), CompactionError> {
52 330 : // High level strategy for compaction / image creation:
53 330 : //
54 330 : // 1. First, calculate the desired "partitioning" of the
55 330 : // currently in-use key space. The goal is to partition the
56 330 : // key space into roughly fixed-size chunks, but also take into
57 330 : // account any existing image layers, and try to align the
58 330 : // chunk boundaries with the existing image layers to avoid
59 330 : // too much churn. Also try to align chunk boundaries with
60 330 : // relation boundaries. In principle, we don't know about
61 330 : // relation boundaries here, we just deal with key-value
62 330 : // pairs, and the code in pgdatadir_mapping.rs knows how to
63 330 : // map relations into key-value pairs. But in practice we know
64 330 : // that 'field6' is the block number, and the fields 1-5
65 330 : // identify a relation. This is just an optimization,
66 330 : // though.
67 330 : //
68 330 : // 2. Once we know the partitioning, for each partition,
69 330 : // decide if it's time to create a new image layer. The
70 330 : // criteria is: there has been too much "churn" since the last
71 330 : // image layer? The "churn" is fuzzy concept, it's a
72 330 : // combination of too many delta files, or too much WAL in
73 330 : // total in the delta file. Or perhaps: if creating an image
74 330 : // file would allow to delete some older files.
75 330 : //
76 330 : // 3. After that, we compact all level0 delta files if there
77 330 : // are too many of them. While compacting, we also garbage
78 330 : // collect any page versions that are no longer needed because
79 330 : // of the new image layers we created in step 2.
80 330 : //
81 330 : // TODO: This high level strategy hasn't been implemented yet.
82 330 : // Below are functions compact_level0() and create_image_layers()
83 330 : // but they are a bit ad hoc and don't quite work as explained
84 330 : // above. Rewrite it.
85 330 :
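        // (Illustration of the plan above: "align chunk boundaries with relation boundaries" means
        // preferring to cut partitions where fields 1-5 of the key change, i.e. between relations,
        // rather than in the middle of a run of field6 block numbers; the "churn" check of step 2
        // is what create_image_layers evaluates per partition further down.)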
86 330 : // Is the timeline being deleted?
87 330 : if self.is_stopping() {
88 0 : trace!("Dropping out of compaction on timeline shutdown");
89 0 : return Err(CompactionError::ShuttingDown);
90 330 : }
91 330 :
92 330 : let target_file_size = self.get_checkpoint_distance();
93 :
94 : // Define partitioning schema if needed
95 :
96 : // FIXME: the match should only cover repartitioning, not the next steps
97 330 : let partition_count = match self
98 330 : .repartition(
99 330 : self.get_last_record_lsn(),
100 330 : self.get_compaction_target_size(),
101 330 : flags,
102 330 : ctx,
103 330 : )
104 13269 : .await
105 : {
106 330 : Ok(((dense_partitioning, sparse_partitioning), lsn)) => {
107 330 : // Disables access_stats updates, so that the files we read remain candidates for eviction after we're done with them
108 330 : let image_ctx = RequestContextBuilder::extend(ctx)
109 330 : .access_stats_behavior(AccessStatsBehavior::Skip)
110 330 : .build();
111 330 :
112 330 : // 2. Compact
113 330 : let timer = self.metrics.compact_time_histo.start_timer();
114 41663 : self.compact_level0(target_file_size, ctx).await?;
115 330 : timer.stop_and_record();
116 :
117 : // 3. Create new image layers for partitions that have been modified
118 : // "enough".
119 330 : let dense_layers = self
120 330 : .create_image_layers(
121 330 : &dense_partitioning,
122 330 : lsn,
123 330 : if flags.contains(CompactFlags::ForceImageLayerCreation) {
124 0 : ImageLayerCreationMode::Force
125 : } else {
126 330 : ImageLayerCreationMode::Try
127 : },
128 330 : &image_ctx,
129 : )
130 1 : .await
131 330 : .map_err(anyhow::Error::from)?;
132 :
133 : // For now, nothing will be produced...
134 330 : let sparse_layers = self
135 330 : .create_image_layers(
136 330 : &sparse_partitioning.clone().into_dense(),
137 330 : lsn,
138 330 : if flags.contains(CompactFlags::ForceImageLayerCreation) {
139 0 : ImageLayerCreationMode::Force
140 : } else {
141 330 : ImageLayerCreationMode::Try
142 : },
143 330 : &image_ctx,
144 : )
145 0 : .await
146 330 : .map_err(anyhow::Error::from)?;
147 330 : assert!(sparse_layers.is_empty());
148 :
149 330 : self.upload_new_image_layers(dense_layers)?;
150 330 : dense_partitioning.parts.len()
151 : }
152 0 : Err(err) => {
153 0 : // no partitioning? This is normal, if the timeline was just created
154 0 : // as an empty timeline. Also in unit tests, when we use the timeline
155 0 : // as a simple key-value store, ignoring the datadir layout. Log the
156 0 : // error but continue.
157 0 : //
158 0 : // Suppress error when it's due to cancellation
159 0 : if !self.cancel.is_cancelled() {
160 0 : tracing::error!("could not compact, repartitioning keyspace failed: {err:?}");
161 0 : }
162 0 : 1
163 : }
164 : };
165 :
166 330 : if self.shard_identity.count >= ShardCount::new(2) {
167 : // Limit the number of layer rewrites to the number of partitions: this means its
168 : // runtime should be comparable to a full round of image layer creations, rather than
169 : // being potentially much longer.
170 0 : let rewrite_max = partition_count;
171 0 :
172 0 : self.compact_shard_ancestors(rewrite_max, ctx).await?;
173 330 : }
174 :
175 330 : Ok(())
176 330 : }
177 :
178 : /// Check for layers that are eligible to be rewritten:
179 : /// - Shard splitting: after a shard split, rewrite ancestor layers beyond pitr_interval, so that
180 : /// we don't indefinitely retain keys in this shard that aren't needed.
181 : /// - For future use: layers beyond pitr_interval that are in formats we would
182 : /// rather not maintain compatibility with indefinitely.
183 : ///
184 : /// Note: this phase may read and write many gigabytes of data: use rewrite_max to bound
185 : /// how much work it will try to do in each compaction pass.
186 0 : async fn compact_shard_ancestors(
187 0 : self: &Arc<Self>,
188 0 : rewrite_max: usize,
189 0 : _ctx: &RequestContext,
190 0 : ) -> anyhow::Result<()> {
191 0 : let mut drop_layers = Vec::new();
192 0 : let layers_to_rewrite: Vec<Layer> = Vec::new();
193 0 :
194 0 : // We will use the PITR cutoff as a condition for rewriting layers.
195 0 : let pitr_cutoff = self.gc_info.read().unwrap().cutoffs.pitr;
196 :
197 0 : let layers = self.layers.read().await;
198 0 : for layer_desc in layers.layer_map().iter_historic_layers() {
199 0 : let layer = layers.get_from_desc(&layer_desc);
200 0 : if layer.metadata().shard.shard_count == self.shard_identity.count {
201 : // This layer does not belong to a historic ancestor, no need to re-image it.
202 0 : continue;
203 0 : }
204 0 :
205 0 : // This layer was created on an ancestor shard: check if it contains any data for this shard.
206 0 : let sharded_range = ShardedRange::new(layer_desc.get_key_range(), &self.shard_identity);
207 0 : let layer_local_page_count = sharded_range.page_count();
208 0 : let layer_raw_page_count = ShardedRange::raw_size(&layer_desc.get_key_range());
209 0 : if layer_local_page_count == 0 {
210 : // This ancestral layer only covers keys that belong to other shards.
211 : // We include the full metadata in the log: if we had some critical bug that caused
212 : // us to incorrectly drop layers, this would simplify manually debugging + reinstating those layers.
213 0 : info!(%layer, old_metadata=?layer.metadata(),
214 0 : "dropping layer after shard split, contains no keys for this shard.",
215 : );
216 :
217 0 : if cfg!(debug_assertions) {
218 : // Expensive, exhaustive check of keys in this layer: this guards against ShardedRange's calculations being
219 : // wrong. If ShardedRange claims the local page count is zero, then no keys in this layer
220 : // should be !is_key_disposable()
221 0 : let range = layer_desc.get_key_range();
222 0 : let mut key = range.start;
223 0 : while key < range.end {
224 0 : debug_assert!(self.shard_identity.is_key_disposable(&key));
225 0 : key = key.next();
226 : }
227 0 : }
228 :
229 0 : drop_layers.push(layer);
230 0 : continue;
231 0 : } else if layer_local_page_count != u32::MAX
232 0 : && layer_local_page_count == layer_raw_page_count
233 : {
234 0 : debug!(%layer,
235 0 : "layer is entirely shard local ({} keys), no need to filter it",
236 : layer_local_page_count
237 : );
238 0 : continue;
239 0 : }
240 0 :
241 0 : // Don't bother re-writing a layer unless it will at least halve its size
242 0 : if layer_local_page_count != u32::MAX
243 0 : && layer_local_page_count > layer_raw_page_count / 2
244 : {
245 0 : debug!(%layer,
246 0 : "layer is already mostly local ({}/{}), not rewriting",
247 : layer_local_page_count, layer_raw_page_count
248 : );
249 0 : continue; // the rewrite would not reclaim enough space to be worthwhile
250 0 : }
251 :
252 : // Don't bother re-writing a layer if it is within the PITR window: it will age-out eventually
253 : // without incurring the I/O cost of a rewrite.
254 0 : if layer_desc.get_lsn_range().end >= pitr_cutoff {
255 0 : debug!(%layer, "Skipping rewrite of layer still in PITR window ({} >= {})",
256 0 : layer_desc.get_lsn_range().end, pitr_cutoff);
257 0 : continue;
258 0 : }
259 0 :
260 0 : if layer_desc.is_delta() {
261 : // We do not yet implement rewrite of delta layers
262 0 : debug!(%layer, "Skipping rewrite of delta layer");
263 0 : continue;
264 0 : }
265 0 :
266 0 : // Only rewrite layers if they would have different remote paths: either they belong to this
267 0 : // shard but an old generation, or they belonged to another shard. This also implicitly
268 0 : // guarantees that the layer is persistent in remote storage (as only remote persistent
269 0 : // layers are carried across shard splits, any local-only layer would be in the current generation)
270 0 : if layer.metadata().generation == self.generation
271 0 : && layer.metadata().shard.shard_count == self.shard_identity.count
272 : {
273 0 : debug!(%layer, "Skipping rewrite, is not from old generation");
274 0 : continue;
275 0 : }
276 0 :
277 0 : if layers_to_rewrite.len() >= rewrite_max {
278 0 : tracing::info!(%layer, "Will rewrite layer on a future compaction, already rewrote {}",
279 0 : layers_to_rewrite.len()
280 : );
281 0 : continue;
282 0 : }
283 0 :
284 0 : // Fall through: all our conditions for doing a rewrite passed.
285 0 : // TODO: implement rewriting
286 0 : tracing::debug!(%layer, "Would rewrite layer");
287 : }
288 :
289 : // Drop the layers read lock: we will acquire it for write in [`Self::rewrite_layers`]
290 0 : drop(layers);
291 0 :
292 0 : // TODO: collect layers to rewrite
293 0 : let replace_layers = Vec::new();
294 0 :
295 0 : // Update the LayerMap so that readers will use the new layers, and enqueue it for writing to remote storage
296 0 : self.rewrite_layers(replace_layers, drop_layers).await?;
297 :
298 0 : if let Some(remote_client) = self.remote_client.as_ref() {
299 : // We wait for all uploads to complete before finishing this compaction stage. This is not
300 : // necessary for correctness, but it simplifies testing, and avoids proceeding with another
301 : // Timeline's compaction while this timeline's uploads may be generating lots of disk I/O
302 : // load.
303 0 : remote_client.wait_completion().await?;
304 0 : }
305 :
306 0 : Ok(())
307 0 : }
308 :
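    // A minimal illustrative sketch (not called by the real code path) of the size heuristic used
    // above: an ancestor layer is only worth rewriting if the pages local to this shard are at most
    // half of its raw page count. `u32::MAX` means the exact count is unknown; the real code
    // handles that case separately in `compact_shard_ancestors`.
    #[allow(dead_code)]
    fn rewrite_would_halve_layer(local_pages: u32, raw_pages: u32) -> bool {
        local_pages != u32::MAX && local_pages <= raw_pages / 2
    }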
309 : /// Collect a bunch of Level 0 layer files, and compact and reshuffle them
310 : /// as Level 1 files.
311 330 : async fn compact_level0(
312 330 : self: &Arc<Self>,
313 330 : target_file_size: u64,
314 330 : ctx: &RequestContext,
315 330 : ) -> Result<(), CompactionError> {
316 : let CompactLevel0Phase1Result {
317 330 : new_layers,
318 330 : deltas_to_compact,
319 : } = {
320 330 : let phase1_span = info_span!("compact_level0_phase1");
321 330 : let ctx = ctx.attached_child();
322 330 : let mut stats = CompactLevel0Phase1StatsBuilder {
323 330 : version: Some(2),
324 330 : tenant_id: Some(self.tenant_shard_id),
325 330 : timeline_id: Some(self.timeline_id),
326 330 : ..Default::default()
327 330 : };
328 330 :
329 330 : let begin = tokio::time::Instant::now();
330 330 : let phase1_layers_locked = Arc::clone(&self.layers).read_owned().await;
331 330 : let now = tokio::time::Instant::now();
332 330 : stats.read_lock_acquisition_micros =
333 330 : DurationRecorder::Recorded(RecordedDuration(now - begin), now);
334 330 : self.compact_level0_phase1(phase1_layers_locked, stats, target_file_size, &ctx)
335 330 : .instrument(phase1_span)
336 41663 : .await?
337 : };
338 :
339 330 : if new_layers.is_empty() && deltas_to_compact.is_empty() {
340 : // nothing to do
341 306 : return Ok(());
342 24 : }
343 24 :
344 24 : self.finish_compact_batch(&new_layers, &Vec::new(), &deltas_to_compact)
345 0 : .await?;
346 24 : Ok(())
347 330 : }
348 :
349 : /// First phase of Level 0 compaction, explained in the [`Self::compact_legacy`] comment.
350 330 : async fn compact_level0_phase1(
351 330 : self: &Arc<Self>,
352 330 : guard: tokio::sync::OwnedRwLockReadGuard<LayerManager>,
353 330 : mut stats: CompactLevel0Phase1StatsBuilder,
354 330 : target_file_size: u64,
355 330 : ctx: &RequestContext,
356 330 : ) -> Result<CompactLevel0Phase1Result, CompactionError> {
357 330 : stats.read_lock_held_spawn_blocking_startup_micros =
358 330 : stats.read_lock_acquisition_micros.till_now(); // set by caller
359 330 : let layers = guard.layer_map();
360 330 : let level0_deltas = layers.get_level0_deltas()?;
361 330 : let mut level0_deltas = level0_deltas
362 330 : .into_iter()
363 1372 : .map(|x| guard.get_from_desc(&x))
364 330 : .collect_vec();
365 330 : stats.level0_deltas_count = Some(level0_deltas.len());
366 330 : // Only compact if enough layers have accumulated.
367 330 : let threshold = self.get_compaction_threshold();
368 330 : if level0_deltas.is_empty() || level0_deltas.len() < threshold {
369 306 : debug!(
370 0 : level0_deltas = level0_deltas.len(),
371 0 : threshold, "too few deltas to compact"
372 : );
373 306 : return Ok(CompactLevel0Phase1Result::default());
374 24 : }
375 24 :
376 24 : // This failpoint is used together with the `test_duplicate_layers` integration test.
377 24 : // It makes compaction return exactly the same layers that it received as input.
378 24 : // We want to ensure that this will not cause any problem when updating the layer map
379 24 : // after the compaction is finished.
380 24 : //
381 24 : // Currently, there are two rare edge cases that will cause duplicated layers to be
382 24 : // inserted.
383 24 : // 1. The compaction job is interrupted / did not finish successfully. Assume we have files 1, 2, 3, 4, which
384 24 : // are compacted to 5, but the page server is shut down; next time we start the page server we will get a layer
385 24 : // map containing 1, 2, 3, 4, and 5, whereas 5 has the same content as 4. If we trigger L0 compaction at this
386 24 : // point again, it is likely that we will get a file 6 which has the same content and key range as 5,
387 24 : // and this causes an overwrite. This is acceptable because the content is the same, and we should do a
388 24 : // layer replace instead of the normal remove / upload process.
389 24 : // 2. The input workload pattern creates exactly n files that are sorted, non-overlapping, and each of target
390 24 : // file size. Compaction will likely create the same set of n files afterwards.
391 24 : //
392 24 : // This failpoint is a superset of both of the cases.
393 24 : if cfg!(feature = "testing") {
394 24 : let active = (|| {
395 24 : ::fail::fail_point!("compact-level0-phase1-return-same", |_| true);
396 24 : false
397 24 : })();
398 24 :
399 24 : if active {
400 0 : let mut new_layers = Vec::with_capacity(level0_deltas.len());
401 0 : for delta in &level0_deltas {
402 : // we are just faking these layers as being produced again for this failpoint
403 0 : new_layers.push(
404 0 : delta
405 0 : .download_and_keep_resident()
406 0 : .await
407 0 : .context("download layer for failpoint")?,
408 : );
409 : }
410 0 : tracing::info!("compact-level0-phase1-return-same"); // so that we can check if we hit the failpoint
411 0 : return Ok(CompactLevel0Phase1Result {
412 0 : new_layers,
413 0 : deltas_to_compact: level0_deltas,
414 0 : });
415 24 : }
416 0 : }
417 :
418 : // Gather the files to compact in this iteration.
419 : //
420 : // Start with the oldest Level 0 delta file, and collect any other
421 : // level 0 files that form a contiguous sequence, such that the end
422 : // LSN of previous file matches the start LSN of the next file.
423 : //
424 : // Note that if the files don't form such a sequence, we might
425 : // "compact" just a single file. That's a bit pointless, but it allows
426 : // us to get rid of the level 0 file, and compact the other files on
427 : // the next iteration. This could probably be made smarter, but such
428 : // "gaps" in the sequence of level 0 files should only happen in case
429 : // of a crash, partial download from cloud storage, or something like
430 : // that, so it's not a big deal in practice.
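        // For example (hypothetical LSN ranges): L0 deltas covering 0/10..0/20, 0/20..0/30 and
        // 0/40..0/50 would compact only the first two in this pass, since 0/30 != 0/40; the last
        // layer gets picked up by a later compaction iteration.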
431 476 : level0_deltas.sort_by_key(|l| l.layer_desc().lsn_range.start);
432 24 : let mut level0_deltas_iter = level0_deltas.iter();
433 24 :
434 24 : let first_level0_delta = level0_deltas_iter.next().unwrap();
435 24 : let mut prev_lsn_end = first_level0_delta.layer_desc().lsn_range.end;
436 24 : let mut deltas_to_compact = Vec::with_capacity(level0_deltas.len());
437 24 :
438 24 : deltas_to_compact.push(first_level0_delta.download_and_keep_resident().await?);
439 262 : for l in level0_deltas_iter {
440 238 : let lsn_range = &l.layer_desc().lsn_range;
441 238 :
442 238 : if lsn_range.start != prev_lsn_end {
443 0 : break;
444 238 : }
445 238 : deltas_to_compact.push(l.download_and_keep_resident().await?);
446 238 : prev_lsn_end = lsn_range.end;
447 : }
448 24 : let lsn_range = Range {
449 24 : start: deltas_to_compact
450 24 : .first()
451 24 : .unwrap()
452 24 : .layer_desc()
453 24 : .lsn_range
454 24 : .start,
455 24 : end: deltas_to_compact.last().unwrap().layer_desc().lsn_range.end,
456 24 : };
457 24 :
458 24 : info!(
459 0 : "Starting Level0 compaction in LSN range {}-{} for {} layers ({} deltas in total)",
460 0 : lsn_range.start,
461 0 : lsn_range.end,
462 0 : deltas_to_compact.len(),
463 0 : level0_deltas.len()
464 : );
465 :
466 262 : for l in deltas_to_compact.iter() {
467 262 : info!("compact includes {l}");
468 : }
469 :
470 : // We don't need the original list of layers anymore. Drop it so that
471 : // we don't accidentally use it later in the function.
472 24 : drop(level0_deltas);
473 24 :
474 24 : stats.read_lock_held_prerequisites_micros = stats
475 24 : .read_lock_held_spawn_blocking_startup_micros
476 24 : .till_now();
477 24 :
478 24 : // Determine the N largest holes, where N is the number of compacted layers.
479 24 : let max_holes = deltas_to_compact.len();
480 24 : let last_record_lsn = self.get_last_record_lsn();
481 24 : let min_hole_range = (target_file_size / page_cache::PAGE_SZ as u64) as i128;
482 24 : let min_hole_coverage_size = 3; // TODO: something more flexible?
483 24 :
484 24 : // min-heap (reserve space for one more element added before eviction)
485 24 : let mut heap: BinaryHeap<Hole> = BinaryHeap::with_capacity(max_holes + 1);
486 24 : let mut prev: Option<Key> = None;
487 24 :
488 24 : let mut all_keys = Vec::new();
489 :
490 262 : for l in deltas_to_compact.iter() {
491 2276 : all_keys.extend(l.load_keys(ctx).await?);
492 : }
493 :
494 : // FIXME: should spawn_blocking the rest of this function
495 :
496 : // The current stdlib sorting implementation is designed in a way that makes it
497 : // particularly fast when the slice is made up of sorted sub-ranges.
498 4235848 : all_keys.sort_by_key(|DeltaEntry { key, lsn, .. }| (*key, *lsn));
499 24 :
500 24 : stats.read_lock_held_key_sort_micros = stats.read_lock_held_prerequisites_micros.till_now();
501 :
502 2041998 : for &DeltaEntry { key: next_key, .. } in all_keys.iter() {
503 2041998 : if let Some(prev_key) = prev {
504 : // just first fast filter
505 2041974 : if next_key.to_i128() - prev_key.to_i128() >= min_hole_range {
506 0 : let key_range = prev_key..next_key;
507 0 : // Measuring a hole by simply subtracting the i128 representations of the key range boundaries
508 0 : // doesn't make much sense, because the largest holes would correspond to field1/field2 changes.
509 0 : // We are mostly interested in eliminating holes which cause generation of excessive image layers.
510 0 : // That is why it is better to measure the size of a hole as the number of covering image layers.
511 0 : let coverage_size = layers.image_coverage(&key_range, last_record_lsn).len();
512 0 : if coverage_size >= min_hole_coverage_size {
513 0 : heap.push(Hole {
514 0 : key_range,
515 0 : coverage_size,
516 0 : });
517 0 : if heap.len() > max_holes {
518 0 : heap.pop(); // remove smallest hole
519 0 : }
520 0 : }
521 2041974 : }
522 24 : }
523 2041998 : prev = Some(next_key.next());
524 : }
525 24 : stats.read_lock_held_compute_holes_micros = stats.read_lock_held_key_sort_micros.till_now();
526 24 : drop_rlock(guard);
527 24 : stats.read_lock_drop_micros = stats.read_lock_held_compute_holes_micros.till_now();
528 24 : let mut holes = heap.into_vec();
529 24 : holes.sort_unstable_by_key(|hole| hole.key_range.start);
530 24 : let mut next_hole = 0; // index of next hole in holes vector
531 24 :
532 24 : // This iterator walks through all key-value pairs from all the layers
533 24 : // we're compacting, in key, LSN order.
534 24 : let all_values_iter = all_keys.iter();
535 24 :
536 24 : // This iterator walks through all keys and is needed to calculate size used by each key
537 24 : let mut all_keys_iter = all_keys
538 24 : .iter()
539 2041998 : .map(|DeltaEntry { key, lsn, size, .. }| (*key, *lsn, *size))
540 2041974 : .coalesce(|mut prev, cur| {
541 2041974 : // Coalesce entries that belong to the same key.
542 2041974 : // This ensures that compaction doesn't put them
543 2041974 : // into different layer files.
544 2041974 : // Still limit this by the target file size,
545 2041974 : // so that we keep the size of the files in
546 2041974 : // check.
547 2041974 : if prev.0 == cur.0 && prev.2 < target_file_size {
548 20000 : prev.2 += cur.2;
549 20000 : Ok(prev)
550 : } else {
551 2021974 : Err((prev, cur))
552 : }
553 2041974 : });
554 24 :
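        // Illustrative example of the coalescing above (hypothetical sizes, target_file_size = 100):
        // (K, lsn1, 60), (K, lsn2, 70), (K, lsn3, 10) becomes (K, lsn1, 130), (K, lsn3, 10): the
        // first merge happens while the accumulated size (60) is still below the target, but the
        // merged entry (130) is not extended any further.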
555 24 : // Merge the contents of all the input delta layers into a new set
556 24 : // of delta layers, based on the current partitioning.
557 24 : //
558 24 : // We split the new delta layers on the key dimension. We iterate through the key space and, for each key, check whether adding the next key to the current output layer we're building would cause the layer to become too large. If so, flush the current output layer and start a new one.
559 24 : // It's possible that there is a single key with so many page versions that storing all of them in a single layer file
560 24 : // would be too large. In that case, we also split on the LSN dimension.
561 24 : //
562 24 : // LSN
563 24 : // ^
564 24 : // |
565 24 : // | +-----------+ +--+--+--+--+
566 24 : // | | | | | | | |
567 24 : // | +-----------+ | | | | |
568 24 : // | | | | | | | |
569 24 : // | +-----------+ ==> | | | | |
570 24 : // | | | | | | | |
571 24 : // | +-----------+ | | | | |
572 24 : // | | | | | | | |
573 24 : // | +-----------+ +--+--+--+--+
574 24 : // |
575 24 : // +--------------> key
576 24 : //
577 24 : //
578 24 : // If one key (X) has a lot of page versions:
579 24 : //
580 24 : // LSN
581 24 : // ^
582 24 : // | (X)
583 24 : // | +-----------+ +--+--+--+--+
584 24 : // | | | | | | | |
585 24 : // | +-----------+ | | +--+ |
586 24 : // | | | | | | | |
587 24 : // | +-----------+ ==> | | | | |
588 24 : // | | | | | +--+ |
589 24 : // | +-----------+ | | | | |
590 24 : // | | | | | | | |
591 24 : // | +-----------+ +--+--+--+--+
592 24 : // |
593 24 : // +--------------> key
594 24 : // TODO: this actually divides the layers into fixed-size chunks, not
595 24 : // based on the partitioning.
596 24 : //
597 24 : // TODO: we should also opportunistically materialize and
598 24 : // garbage collect what we can.
599 24 : let mut new_layers = Vec::new();
600 24 : let mut prev_key: Option<Key> = None;
601 24 : let mut writer: Option<DeltaLayerWriter> = None;
602 24 : let mut key_values_total_size = 0u64;
603 24 : let mut dup_start_lsn: Lsn = Lsn::INVALID; // start LSN of layer containing values of the single key
604 24 : let mut dup_end_lsn: Lsn = Lsn::INVALID; // end LSN of layer containing values of the single key
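        // Illustrative example (hypothetical LSNs): if a single key X has so many versions that the
        // current output layer fills up partway through them, dup_start_lsn..dup_end_lsn bounds a
        // layer holding only X, e.g. X at 0/10..0/40 followed by another dup layer starting at 0/40,
        // until the remaining versions (up to lsn_range.end) have been written out.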
605 :
606 : for &DeltaEntry {
607 2041998 : key, lsn, ref val, ..
608 2042022 : } in all_values_iter
609 : {
610 2041998 : let value = val.load(ctx).await?;
611 2041998 : let same_key = prev_key.map_or(false, |prev_key| prev_key == key);
612 2041998 : // We need to check key boundaries once we reach the next key or the end of a layer holding the same key
613 2041998 : if !same_key || lsn == dup_end_lsn {
614 2021998 : let mut next_key_size = 0u64;
615 2021998 : let is_dup_layer = dup_end_lsn.is_valid();
616 2021998 : dup_start_lsn = Lsn::INVALID;
617 2021998 : if !same_key {
618 2021998 : dup_end_lsn = Lsn::INVALID;
619 2021998 : }
620 : // Determine size occupied by this key. We stop at next key or when size becomes larger than target_file_size
621 2021998 : for (next_key, next_lsn, next_size) in all_keys_iter.by_ref() {
622 2021998 : next_key_size = next_size;
623 2021998 : if key != next_key {
624 2021974 : if dup_end_lsn.is_valid() {
625 0 : // We are writing a segment with duplicates:
626 0 : // place all remaining values of this key in a separate segment
627 0 : dup_start_lsn = dup_end_lsn; // the new segment starts where the old one stops
628 0 : dup_end_lsn = lsn_range.end; // there are no more values of this key till end of LSN range
629 2021974 : }
630 2021974 : break;
631 24 : }
632 24 : key_values_total_size += next_size;
633 24 : // Check if it is time to split the segment: if the total key size is larger than the target file size.
634 24 : // We need to avoid generating empty segments if next_size > target_file_size.
635 24 : if key_values_total_size > target_file_size && lsn != next_lsn {
636 : // Split the key between multiple layers: such a layer can contain only a single key
637 0 : dup_start_lsn = if dup_end_lsn.is_valid() {
638 0 : dup_end_lsn // new segment with duplicates starts where old one stops
639 : } else {
640 0 : lsn // start with the first LSN for this key
641 : };
642 0 : dup_end_lsn = next_lsn; // upper LSN boundary is exclusive
643 0 : break;
644 24 : }
645 : }
646 : // Handle the case when the loop reaches the last key: in this case dup_end_lsn is valid but dup_start_lsn is not set.
647 2021998 : if dup_end_lsn.is_valid() && !dup_start_lsn.is_valid() {
648 0 : dup_start_lsn = dup_end_lsn;
649 0 : dup_end_lsn = lsn_range.end;
650 2021998 : }
651 2021998 : if writer.is_some() {
652 2021974 : let written_size = writer.as_mut().unwrap().size();
653 2021974 : let contains_hole =
654 2021974 : next_hole < holes.len() && key >= holes[next_hole].key_range.end;
655 : // check if this key causes the layer to overflow or crosses a hole...
656 2021974 : if is_dup_layer
657 2021974 : || dup_end_lsn.is_valid()
658 2021974 : || written_size + key_values_total_size > target_file_size
659 2021774 : || contains_hole
660 : {
661 : // ... if so, flush previous layer and prepare to write new one
662 200 : new_layers.push(
663 200 : writer
664 200 : .take()
665 200 : .unwrap()
666 200 : .finish(prev_key.unwrap().next(), self, ctx)
667 509 : .await?,
668 : );
669 200 : writer = None;
670 200 :
671 200 : if contains_hole {
672 0 : // skip hole
673 0 : next_hole += 1;
674 200 : }
675 2021774 : }
676 24 : }
677 : // Remember the size of the key value because at the next iteration we will access the next item
678 2021998 : key_values_total_size = next_key_size;
679 20000 : }
680 2041998 : fail_point!("delta-layer-writer-fail-before-finish", |_| {
681 0 : Err(CompactionError::Other(anyhow::anyhow!(
682 0 : "failpoint delta-layer-writer-fail-before-finish"
683 0 : )))
684 2041998 : });
685 :
686 2041998 : if !self.shard_identity.is_key_disposable(&key) {
687 2041998 : if writer.is_none() {
688 224 : // Create writer if not initiaized yet
689 224 : writer = Some(
690 : DeltaLayerWriter::new(
691 224 : self.conf,
692 224 : self.timeline_id,
693 224 : self.tenant_shard_id,
694 224 : key,
695 224 : if dup_end_lsn.is_valid() {
696 : // this is a layer containing slice of values of the same key
697 0 : debug!("Create new dup layer {}..{}", dup_start_lsn, dup_end_lsn);
698 0 : dup_start_lsn..dup_end_lsn
699 : } else {
700 224 : debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end);
701 224 : lsn_range.clone()
702 : },
703 : )
704 112 : .await?,
705 : );
706 2041774 : }
707 :
708 2041998 : writer
709 2041998 : .as_mut()
710 2041998 : .unwrap()
711 2041998 : .put_value(key, lsn, value, ctx)
712 1555 : .await?;
713 : } else {
714 0 : debug!(
715 0 : "Dropping key {} during compaction (it belongs on shard {:?})",
716 0 : key,
717 0 : self.shard_identity.get_shard_number(&key)
718 : );
719 : }
720 :
721 2041998 : if !new_layers.is_empty() {
722 19786 : fail_point!("after-timeline-compacted-first-L1");
723 2022212 : }
724 :
725 2041998 : prev_key = Some(key);
726 : }
727 24 : if let Some(writer) = writer {
728 1957 : new_layers.push(writer.finish(prev_key.unwrap().next(), self, ctx).await?);
729 0 : }
730 :
731 : // Sync layers
732 24 : if !new_layers.is_empty() {
733 : // Print a warning if the created layer is larger than double the target size
734 : // Add two pages for potential overhead. This should in theory be already
735 : // accounted for in the target calculation, but for very small targets,
736 : // we still might easily hit the limit otherwise.
737 24 : let warn_limit = target_file_size * 2 + page_cache::PAGE_SZ as u64 * 2;
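        // For example (assuming the usual 8 KiB page size and a 128 MiB target file size): the
        // warning threshold is 256 MiB plus 16 KiB of slack.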
738 224 : for layer in new_layers.iter() {
739 224 : if layer.layer_desc().file_size > warn_limit {
740 0 : warn!(
741 : %layer,
742 0 : "created delta file of size {} larger than double of target of {target_file_size}", layer.layer_desc().file_size
743 : );
744 224 : }
745 : }
746 :
747 : // The writer.finish() above already did the fsync of the inodes.
748 : // We just need to fsync the directory in which these inodes are linked,
749 : // which we know to be the timeline directory.
750 : //
751 : // We use fatal_err() below because after writer.finish() returns with success,
752 : // the in-memory state of the filesystem already has the layer file in its final place,
753 : // and subsequent pageserver code could think it's durable while it really isn't.
754 24 : let timeline_dir = VirtualFile::open(
755 24 : &self
756 24 : .conf
757 24 : .timeline_path(&self.tenant_shard_id, &self.timeline_id),
758 24 : )
759 12 : .await
760 24 : .fatal_err("VirtualFile::open for timeline dir fsync");
761 24 : timeline_dir
762 24 : .sync_all()
763 12 : .await
764 24 : .fatal_err("VirtualFile::sync_all timeline dir");
765 0 : }
766 :
767 24 : stats.write_layer_files_micros = stats.read_lock_drop_micros.till_now();
768 24 : stats.new_deltas_count = Some(new_layers.len());
769 224 : stats.new_deltas_size = Some(new_layers.iter().map(|l| l.layer_desc().file_size).sum());
770 24 :
771 24 : match TryInto::<CompactLevel0Phase1Stats>::try_into(stats)
772 24 : .and_then(|stats| serde_json::to_string(&stats).context("serde_json::to_string"))
773 : {
774 24 : Ok(stats_json) => {
775 24 : info!(
776 0 : stats_json = stats_json.as_str(),
777 0 : "compact_level0_phase1 stats available"
778 : )
779 : }
780 0 : Err(e) => {
781 0 : warn!("compact_level0_phase1 stats failed to serialize: {:#}", e);
782 : }
783 : }
784 :
785 24 : Ok(CompactLevel0Phase1Result {
786 24 : new_layers,
787 24 : deltas_to_compact: deltas_to_compact
788 24 : .into_iter()
789 262 : .map(|x| x.drop_eviction_guard())
790 24 : .collect::<Vec<_>>(),
791 24 : })
792 330 : }
793 : }
794 :
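// A self-contained sketch (illustrative only, not used by the compaction code) of the bounded
// min-heap technique that compact_level0_phase1 uses to retain only the N largest holes: push every
// candidate and evict the smallest whenever the heap grows past N. A plain `usize` stands in for
// the real coverage-size metric of `Hole`.
#[allow(dead_code)]
fn n_largest_sketch(candidates: impl IntoIterator<Item = usize>, n: usize) -> Vec<usize> {
    use std::cmp::Reverse;
    // Reserve one extra slot, as the real code does: a candidate is pushed before eviction happens.
    let mut heap = std::collections::BinaryHeap::with_capacity(n + 1);
    for size in candidates {
        heap.push(Reverse(size));
        if heap.len() > n {
            heap.pop(); // drop the current smallest candidate
        }
    }
    heap.into_iter().map(|Reverse(size)| size).collect()
}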
795 : #[derive(Default)]
796 : struct CompactLevel0Phase1Result {
797 : new_layers: Vec<ResidentLayer>,
798 : deltas_to_compact: Vec<Layer>,
799 : }
800 :
801 : #[derive(Default)]
802 : struct CompactLevel0Phase1StatsBuilder {
803 : version: Option<u64>,
804 : tenant_id: Option<TenantShardId>,
805 : timeline_id: Option<TimelineId>,
806 : read_lock_acquisition_micros: DurationRecorder,
807 : read_lock_held_spawn_blocking_startup_micros: DurationRecorder,
808 : read_lock_held_key_sort_micros: DurationRecorder,
809 : read_lock_held_prerequisites_micros: DurationRecorder,
810 : read_lock_held_compute_holes_micros: DurationRecorder,
811 : read_lock_drop_micros: DurationRecorder,
812 : write_layer_files_micros: DurationRecorder,
813 : level0_deltas_count: Option<usize>,
814 : new_deltas_count: Option<usize>,
815 : new_deltas_size: Option<u64>,
816 : }
817 :
818 : #[derive(serde::Serialize)]
819 : struct CompactLevel0Phase1Stats {
820 : version: u64,
821 : tenant_id: TenantShardId,
822 : timeline_id: TimelineId,
823 : read_lock_acquisition_micros: RecordedDuration,
824 : read_lock_held_spawn_blocking_startup_micros: RecordedDuration,
825 : read_lock_held_key_sort_micros: RecordedDuration,
826 : read_lock_held_prerequisites_micros: RecordedDuration,
827 : read_lock_held_compute_holes_micros: RecordedDuration,
828 : read_lock_drop_micros: RecordedDuration,
829 : write_layer_files_micros: RecordedDuration,
830 : level0_deltas_count: usize,
831 : new_deltas_count: usize,
832 : new_deltas_size: u64,
833 : }
834 :
835 : impl TryFrom<CompactLevel0Phase1StatsBuilder> for CompactLevel0Phase1Stats {
836 : type Error = anyhow::Error;
837 :
838 24 : fn try_from(value: CompactLevel0Phase1StatsBuilder) -> Result<Self, Self::Error> {
839 24 : Ok(Self {
840 24 : version: value.version.ok_or_else(|| anyhow!("version not set"))?,
841 24 : tenant_id: value
842 24 : .tenant_id
843 24 : .ok_or_else(|| anyhow!("tenant_id not set"))?,
844 24 : timeline_id: value
845 24 : .timeline_id
846 24 : .ok_or_else(|| anyhow!("timeline_id not set"))?,
847 24 : read_lock_acquisition_micros: value
848 24 : .read_lock_acquisition_micros
849 24 : .into_recorded()
850 24 : .ok_or_else(|| anyhow!("read_lock_acquisition_micros not set"))?,
851 24 : read_lock_held_spawn_blocking_startup_micros: value
852 24 : .read_lock_held_spawn_blocking_startup_micros
853 24 : .into_recorded()
854 24 : .ok_or_else(|| anyhow!("read_lock_held_spawn_blocking_startup_micros not set"))?,
855 24 : read_lock_held_key_sort_micros: value
856 24 : .read_lock_held_key_sort_micros
857 24 : .into_recorded()
858 24 : .ok_or_else(|| anyhow!("read_lock_held_key_sort_micros not set"))?,
859 24 : read_lock_held_prerequisites_micros: value
860 24 : .read_lock_held_prerequisites_micros
861 24 : .into_recorded()
862 24 : .ok_or_else(|| anyhow!("read_lock_held_prerequisites_micros not set"))?,
863 24 : read_lock_held_compute_holes_micros: value
864 24 : .read_lock_held_compute_holes_micros
865 24 : .into_recorded()
866 24 : .ok_or_else(|| anyhow!("read_lock_held_compute_holes_micros not set"))?,
867 24 : read_lock_drop_micros: value
868 24 : .read_lock_drop_micros
869 24 : .into_recorded()
870 24 : .ok_or_else(|| anyhow!("read_lock_drop_micros not set"))?,
871 24 : write_layer_files_micros: value
872 24 : .write_layer_files_micros
873 24 : .into_recorded()
874 24 : .ok_or_else(|| anyhow!("write_layer_files_micros not set"))?,
875 24 : level0_deltas_count: value
876 24 : .level0_deltas_count
877 24 : .ok_or_else(|| anyhow!("level0_deltas_count not set"))?,
878 24 : new_deltas_count: value
879 24 : .new_deltas_count
880 24 : .ok_or_else(|| anyhow!("new_deltas_count not set"))?,
881 24 : new_deltas_size: value
882 24 : .new_deltas_size
883 24 : .ok_or_else(|| anyhow!("new_deltas_size not set"))?,
884 : })
885 24 : }
886 : }
887 :
888 : impl Timeline {
889 : /// Entry point for new tiered compaction algorithm.
890 : ///
891 : /// All the real work is in the implementation in the pageserver_compaction
892 : /// crate. The code here would apply to any algorithm implemented by the
893 : /// same interface, but tiered is the only one at the moment.
894 : ///
895 : /// TODO: cancellation
896 0 : pub(crate) async fn compact_tiered(
897 0 : self: &Arc<Self>,
898 0 : _cancel: &CancellationToken,
899 0 : ctx: &RequestContext,
900 0 : ) -> Result<(), CompactionError> {
901 0 : let fanout = self.get_compaction_threshold() as u64;
902 0 : let target_file_size = self.get_checkpoint_distance();
903 :
904 : // Find the top of the historical layers
905 0 : let end_lsn = {
906 0 : let guard = self.layers.read().await;
907 0 : let layers = guard.layer_map();
908 :
909 0 : let l0_deltas = layers.get_level0_deltas()?;
910 0 : drop(guard);
911 0 :
912 0 : // As an optimization, if we find that there are too few L0 layers,
913 0 : // bail out early. We know that the compaction algorithm would do
914 0 : // nothing in that case.
915 0 : if l0_deltas.len() < fanout as usize {
916 : // doesn't need compacting
917 0 : return Ok(());
918 0 : }
919 0 : l0_deltas.iter().map(|l| l.lsn_range.end).max().unwrap()
920 0 : };
921 0 :
922 0 : // Is the timeline being deleted?
923 0 : if self.is_stopping() {
924 0 : trace!("Dropping out of compaction on timeline shutdown");
925 0 : return Err(CompactionError::ShuttingDown);
926 0 : }
927 :
928 0 : let (dense_ks, _sparse_ks) = self.collect_keyspace(end_lsn, ctx).await?;
929 : // TODO(chi): ignore sparse_keyspace for now, compact it in the future.
930 0 : let mut adaptor = TimelineAdaptor::new(self, (end_lsn, dense_ks));
931 0 :
932 0 : pageserver_compaction::compact_tiered::compact_tiered(
933 0 : &mut adaptor,
934 0 : end_lsn,
935 0 : target_file_size,
936 0 : fanout,
937 0 : ctx,
938 0 : )
939 0 : .await?;
940 :
941 0 : adaptor.flush_updates().await?;
942 0 : Ok(())
943 0 : }
944 : }
945 :
946 : struct TimelineAdaptor {
947 : timeline: Arc<Timeline>,
948 :
949 : keyspace: (Lsn, KeySpace),
950 :
951 : new_deltas: Vec<ResidentLayer>,
952 : new_images: Vec<ResidentLayer>,
953 : layers_to_delete: Vec<Arc<PersistentLayerDesc>>,
954 : }
955 :
956 : impl TimelineAdaptor {
957 0 : pub fn new(timeline: &Arc<Timeline>, keyspace: (Lsn, KeySpace)) -> Self {
958 0 : Self {
959 0 : timeline: timeline.clone(),
960 0 : keyspace,
961 0 : new_images: Vec::new(),
962 0 : new_deltas: Vec::new(),
963 0 : layers_to_delete: Vec::new(),
964 0 : }
965 0 : }
966 :
967 0 : pub async fn flush_updates(&mut self) -> anyhow::Result<()> {
968 0 : let layers_to_delete = {
969 0 : let guard = self.timeline.layers.read().await;
970 0 : self.layers_to_delete
971 0 : .iter()
972 0 : .map(|x| guard.get_from_desc(x))
973 0 : .collect::<Vec<Layer>>()
974 0 : };
975 0 : self.timeline
976 0 : .finish_compact_batch(&self.new_deltas, &self.new_images, &layers_to_delete)
977 0 : .await?;
978 :
979 0 : self.timeline
980 0 : .upload_new_image_layers(std::mem::take(&mut self.new_images))?;
981 :
982 0 : self.new_deltas.clear();
983 0 : self.layers_to_delete.clear();
984 0 : Ok(())
985 0 : }
986 : }
987 :
988 : #[derive(Clone)]
989 : struct ResidentDeltaLayer(ResidentLayer);
990 : #[derive(Clone)]
991 : struct ResidentImageLayer(ResidentLayer);
992 :
993 : impl CompactionJobExecutor for TimelineAdaptor {
994 : type Key = crate::repository::Key;
995 :
996 : type Layer = OwnArc<PersistentLayerDesc>;
997 : type DeltaLayer = ResidentDeltaLayer;
998 : type ImageLayer = ResidentImageLayer;
999 :
1000 : type RequestContext = crate::context::RequestContext;
1001 :
1002 0 : fn get_shard_identity(&self) -> &ShardIdentity {
1003 0 : self.timeline.get_shard_identity()
1004 0 : }
1005 :
1006 0 : async fn get_layers(
1007 0 : &mut self,
1008 0 : key_range: &Range<Key>,
1009 0 : lsn_range: &Range<Lsn>,
1010 0 : _ctx: &RequestContext,
1011 0 : ) -> anyhow::Result<Vec<OwnArc<PersistentLayerDesc>>> {
1012 0 : self.flush_updates().await?;
1013 :
1014 0 : let guard = self.timeline.layers.read().await;
1015 0 : let layer_map = guard.layer_map();
1016 0 :
1017 0 : let result = layer_map
1018 0 : .iter_historic_layers()
1019 0 : .filter(|l| {
1020 0 : overlaps_with(&l.lsn_range, lsn_range) && overlaps_with(&l.key_range, key_range)
1021 0 : })
1022 0 : .map(OwnArc)
1023 0 : .collect();
1024 0 : Ok(result)
1025 0 : }
1026 :
1027 0 : async fn get_keyspace(
1028 0 : &mut self,
1029 0 : key_range: &Range<Key>,
1030 0 : lsn: Lsn,
1031 0 : _ctx: &RequestContext,
1032 0 : ) -> anyhow::Result<Vec<Range<Key>>> {
1033 0 : if lsn == self.keyspace.0 {
1034 0 : Ok(pageserver_compaction::helpers::intersect_keyspace(
1035 0 : &self.keyspace.1.ranges,
1036 0 : key_range,
1037 0 : ))
1038 : } else {
1039 : // The current compaction implementation only ever requests the key space
1040 : // at the compaction end LSN.
1041 0 : anyhow::bail!("keyspace not available for requested lsn");
1042 : }
1043 0 : }
1044 :
1045 0 : async fn downcast_delta_layer(
1046 0 : &self,
1047 0 : layer: &OwnArc<PersistentLayerDesc>,
1048 0 : ) -> anyhow::Result<Option<ResidentDeltaLayer>> {
1049 0 : // this is a lot more complex than a simple downcast...
1050 0 : if layer.is_delta() {
1051 0 : let l = {
1052 0 : let guard = self.timeline.layers.read().await;
1053 0 : guard.get_from_desc(layer)
1054 : };
1055 0 : let result = l.download_and_keep_resident().await?;
1056 :
1057 0 : Ok(Some(ResidentDeltaLayer(result)))
1058 : } else {
1059 0 : Ok(None)
1060 : }
1061 0 : }
1062 :
1063 0 : async fn create_image(
1064 0 : &mut self,
1065 0 : lsn: Lsn,
1066 0 : key_range: &Range<Key>,
1067 0 : ctx: &RequestContext,
1068 0 : ) -> anyhow::Result<()> {
1069 0 : Ok(self.create_image_impl(lsn, key_range, ctx).await?)
1070 0 : }
1071 :
1072 0 : async fn create_delta(
1073 0 : &mut self,
1074 0 : lsn_range: &Range<Lsn>,
1075 0 : key_range: &Range<Key>,
1076 0 : input_layers: &[ResidentDeltaLayer],
1077 0 : ctx: &RequestContext,
1078 0 : ) -> anyhow::Result<()> {
1079 0 : debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end);
1080 :
1081 0 : let mut all_entries = Vec::new();
1082 0 : for dl in input_layers.iter() {
1083 0 : all_entries.extend(dl.load_keys(ctx).await?);
1084 : }
1085 :
1086 : // The current stdlib sorting implementation is designed in a way that makes it
1087 : // particularly fast when the slice is made up of sorted sub-ranges.
1088 0 : all_entries.sort_by_key(|DeltaEntry { key, lsn, .. }| (*key, *lsn));
1089 :
1090 0 : let mut writer = DeltaLayerWriter::new(
1091 0 : self.timeline.conf,
1092 0 : self.timeline.timeline_id,
1093 0 : self.timeline.tenant_shard_id,
1094 0 : key_range.start,
1095 0 : lsn_range.clone(),
1096 0 : )
1097 0 : .await?;
1098 :
1099 0 : let mut dup_values = 0;
1100 0 :
1101 0 : // This iterator walks through all key-value pairs from all the layers
1102 0 : // we're compacting, in key, LSN order.
1103 0 : let mut prev: Option<(Key, Lsn)> = None;
1104 : for &DeltaEntry {
1105 0 : key, lsn, ref val, ..
1106 0 : } in all_entries.iter()
1107 : {
1108 0 : if prev == Some((key, lsn)) {
1109 : // This is a duplicate. Skip it.
1110 : //
1111 : // It can happen if compaction is interrupted after writing some
1112 : // layers but not all, and we are compacting the range again.
1113 : // The calculations in the algorithm assume that there are no
1114 : // duplicates, so the math on targeted file size is likely off,
1115 : // and we will create smaller files than expected.
1116 0 : dup_values += 1;
1117 0 : continue;
1118 0 : }
1119 :
1120 0 : let value = val.load(ctx).await?;
1121 :
1122 0 : writer.put_value(key, lsn, value, ctx).await?;
1123 :
1124 0 : prev = Some((key, lsn));
1125 : }
1126 :
1127 0 : if dup_values > 0 {
1128 0 : warn!("delta layer created with {} duplicate values", dup_values);
1129 0 : }
1130 :
1131 0 : fail_point!("delta-layer-writer-fail-before-finish", |_| {
1132 0 : Err(anyhow::anyhow!(
1133 0 : "failpoint delta-layer-writer-fail-before-finish"
1134 0 : ))
1135 0 : });
1136 :
1137 0 : let new_delta_layer = writer
1138 0 : .finish(prev.unwrap().0.next(), &self.timeline, ctx)
1139 0 : .await?;
1140 :
1141 0 : self.new_deltas.push(new_delta_layer);
1142 0 : Ok(())
1143 0 : }
1144 :
1145 0 : async fn delete_layer(
1146 0 : &mut self,
1147 0 : layer: &OwnArc<PersistentLayerDesc>,
1148 0 : _ctx: &RequestContext,
1149 0 : ) -> anyhow::Result<()> {
1150 0 : self.layers_to_delete.push(layer.clone().0);
1151 0 : Ok(())
1152 0 : }
1153 : }
1154 :
1155 : impl TimelineAdaptor {
1156 0 : async fn create_image_impl(
1157 0 : &mut self,
1158 0 : lsn: Lsn,
1159 0 : key_range: &Range<Key>,
1160 0 : ctx: &RequestContext,
1161 0 : ) -> Result<(), PageReconstructError> {
1162 0 : let timer = self.timeline.metrics.create_images_time_histo.start_timer();
1163 :
1164 0 : let mut image_layer_writer = ImageLayerWriter::new(
1165 0 : self.timeline.conf,
1166 0 : self.timeline.timeline_id,
1167 0 : self.timeline.tenant_shard_id,
1168 0 : key_range,
1169 0 : lsn,
1170 0 : )
1171 0 : .await?;
1172 :
1173 0 : fail_point!("image-layer-writer-fail-before-finish", |_| {
1174 0 : Err(PageReconstructError::Other(anyhow::anyhow!(
1175 0 : "failpoint image-layer-writer-fail-before-finish"
1176 0 : )))
1177 0 : });
1178 0 : let keyspace_ranges = self.get_keyspace(key_range, lsn, ctx).await?;
1179 0 : for range in &keyspace_ranges {
1180 0 : let mut key = range.start;
1181 0 : while key < range.end {
1182 0 : let img = match self.timeline.get(key, lsn, ctx).await {
1183 0 : Ok(img) => img,
1184 0 : Err(err) => {
1185 0 : // If we fail to reconstruct a VM or FSM page, we can zero the
1186 0 : // page without losing any actual user data. That seems better
1187 0 : // than failing repeatedly and getting stuck.
1188 0 : //
1189 0 : // We had a bug at one point, where we truncated the FSM and VM
1190 0 : // in the pageserver, but Postgres didn't know about that
1191 0 : // and continued to generate incremental WAL records for pages
1192 0 : // that didn't exist in the pageserver. Trying to replay those
1193 0 : // WAL records failed to find the previous image of the page.
1194 0 : // This special case allows us to recover from that situation.
1195 0 : // See https://github.com/neondatabase/neon/issues/2601.
1196 0 : //
1197 0 : // Unfortunately we cannot do this for the main fork, or for
1198 0 : // any metadata keys, as that would lead to actual data
1199 0 : // loss.
1200 0 : if is_rel_fsm_block_key(key) || is_rel_vm_block_key(key) {
1201 0 : warn!("could not reconstruct FSM or VM key {key}, filling with zeros: {err:?}");
1202 0 : ZERO_PAGE.clone()
1203 : } else {
1204 0 : return Err(err);
1205 : }
1206 : }
1207 : };
1208 0 : image_layer_writer.put_image(key, img, ctx).await?;
1209 0 : key = key.next();
1210 : }
1211 : }
1212 0 : let image_layer = image_layer_writer.finish(&self.timeline, ctx).await?;
1213 :
1214 0 : self.new_images.push(image_layer);
1215 0 :
1216 0 : timer.stop_and_record();
1217 0 :
1218 0 : Ok(())
1219 0 : }
1220 : }
1221 :
1222 : impl CompactionRequestContext for crate::context::RequestContext {}
1223 :
1224 : #[derive(Debug, Clone)]
1225 : pub struct OwnArc<T>(pub Arc<T>);
1226 :
1227 : impl<T> Deref for OwnArc<T> {
1228 : type Target = <Arc<T> as Deref>::Target;
1229 0 : fn deref(&self) -> &Self::Target {
1230 0 : &self.0
1231 0 : }
1232 : }
1233 :
1234 : impl<T> AsRef<T> for OwnArc<T> {
1235 0 : fn as_ref(&self) -> &T {
1236 0 : self.0.as_ref()
1237 0 : }
1238 : }
1239 :
1240 : impl CompactionLayer<Key> for OwnArc<PersistentLayerDesc> {
1241 0 : fn key_range(&self) -> &Range<Key> {
1242 0 : &self.key_range
1243 0 : }
1244 0 : fn lsn_range(&self) -> &Range<Lsn> {
1245 0 : &self.lsn_range
1246 0 : }
1247 0 : fn file_size(&self) -> u64 {
1248 0 : self.file_size
1249 0 : }
1250 0 : fn short_id(&self) -> std::string::String {
1251 0 : self.as_ref().short_id().to_string()
1252 0 : }
1253 0 : fn is_delta(&self) -> bool {
1254 0 : self.as_ref().is_delta()
1255 0 : }
1256 : }
1257 :
1258 : impl CompactionLayer<Key> for OwnArc<DeltaLayer> {
1259 0 : fn key_range(&self) -> &Range<Key> {
1260 0 : &self.layer_desc().key_range
1261 0 : }
1262 0 : fn lsn_range(&self) -> &Range<Lsn> {
1263 0 : &self.layer_desc().lsn_range
1264 0 : }
1265 0 : fn file_size(&self) -> u64 {
1266 0 : self.layer_desc().file_size
1267 0 : }
1268 0 : fn short_id(&self) -> std::string::String {
1269 0 : self.layer_desc().short_id().to_string()
1270 0 : }
1271 0 : fn is_delta(&self) -> bool {
1272 0 : true
1273 0 : }
1274 : }
1275 :
1276 : use crate::tenant::timeline::DeltaEntry;
1277 :
1278 : impl CompactionLayer<Key> for ResidentDeltaLayer {
1279 0 : fn key_range(&self) -> &Range<Key> {
1280 0 : &self.0.layer_desc().key_range
1281 0 : }
1282 0 : fn lsn_range(&self) -> &Range<Lsn> {
1283 0 : &self.0.layer_desc().lsn_range
1284 0 : }
1285 0 : fn file_size(&self) -> u64 {
1286 0 : self.0.layer_desc().file_size
1287 0 : }
1288 0 : fn short_id(&self) -> std::string::String {
1289 0 : self.0.layer_desc().short_id().to_string()
1290 0 : }
1291 0 : fn is_delta(&self) -> bool {
1292 0 : true
1293 0 : }
1294 : }
1295 :
1296 : impl CompactionDeltaLayer<TimelineAdaptor> for ResidentDeltaLayer {
1297 : type DeltaEntry<'a> = DeltaEntry<'a>;
1298 :
1299 0 : async fn load_keys<'a>(&self, ctx: &RequestContext) -> anyhow::Result<Vec<DeltaEntry<'_>>> {
1300 0 : self.0.load_keys(ctx).await
1301 0 : }
1302 : }
1303 :
1304 : impl CompactionLayer<Key> for ResidentImageLayer {
1305 0 : fn key_range(&self) -> &Range<Key> {
1306 0 : &self.0.layer_desc().key_range
1307 0 : }
1308 0 : fn lsn_range(&self) -> &Range<Lsn> {
1309 0 : &self.0.layer_desc().lsn_range
1310 0 : }
1311 0 : fn file_size(&self) -> u64 {
1312 0 : self.0.layer_desc().file_size
1313 0 : }
1314 0 : fn short_id(&self) -> std::string::String {
1315 0 : self.0.layer_desc().short_id().to_string()
1316 0 : }
1317 0 : fn is_delta(&self) -> bool {
1318 0 : false
1319 0 : }
1320 : }
1321 : impl CompactionImageLayer<TimelineAdaptor> for ResidentImageLayer {}