Line data Source code
1 : //! New compaction implementation. The algorithm itself is implemented in the
2 : //! compaction crate. This file implements the callbacks and structs that allow
3 : //! the algorithm to drive the process.
4 : //!
5 : //! The legacy algorithm is implemented directly in `timeline.rs`.
6 :
7 : use std::collections::BinaryHeap;
8 : use std::ops::{Deref, Range};
9 : use std::sync::Arc;
10 :
11 : use super::layer_manager::LayerManager;
12 : use super::{CompactFlags, DurationRecorder, ImageLayerCreationMode, RecordedDuration, Timeline};
13 :
14 : use anyhow::{anyhow, Context};
15 : use enumset::EnumSet;
16 : use fail::fail_point;
17 : use itertools::Itertools;
18 : use pageserver_api::keyspace::ShardedRange;
19 : use pageserver_api::shard::{ShardCount, ShardIdentity, TenantShardId};
20 : use tokio_util::sync::CancellationToken;
21 : use tracing::{debug, info, info_span, trace, warn, Instrument};
22 : use utils::id::TimelineId;
23 :
24 : use crate::context::{AccessStatsBehavior, RequestContext, RequestContextBuilder};
25 : use crate::tenant::storage_layer::{AsLayerDesc, PersistentLayerDesc};
26 : use crate::tenant::timeline::{drop_rlock, is_rel_fsm_block_key, is_rel_vm_block_key, Hole};
27 : use crate::tenant::timeline::{DeltaLayerWriter, ImageLayerWriter};
28 : use crate::tenant::timeline::{Layer, ResidentLayer};
29 : use crate::tenant::DeltaLayer;
30 : use crate::tenant::PageReconstructError;
31 : use crate::virtual_file::{MaybeFatalIo, VirtualFile};
32 : use crate::{page_cache, ZERO_PAGE};
33 :
34 : use crate::keyspace::KeySpace;
35 : use crate::repository::Key;
36 :
37 : use utils::lsn::Lsn;
38 :
39 : use pageserver_compaction::helpers::overlaps_with;
40 : use pageserver_compaction::interface::*;
41 :
42 : use super::CompactionError;
43 :
44 : impl Timeline {
45 : /// TODO: cancellation
46 366 : pub(crate) async fn compact_legacy(
47 366 : self: &Arc<Self>,
48 366 : _cancel: &CancellationToken,
49 366 : flags: EnumSet<CompactFlags>,
50 366 : ctx: &RequestContext,
51 366 : ) -> Result<(), CompactionError> {
52 366 : // High level strategy for compaction / image creation:
53 366 : //
54 366 : // 1. First, calculate the desired "partitioning" of the
55 366 : // currently in-use key space. The goal is to partition the
56 366 : // key space into roughly fixed-size chunks, but also take into
57 366 : // account any existing image layers, and try to align the
58 366 : // chunk boundaries with the existing image layers to avoid
59 366 : // too much churn. Also try to align chunk boundaries with
60 366 : // relation boundaries. In principle, we don't know about
61 366 : // relation boundaries here, we just deal with key-value
62 366 : // pairs, and the code in pgdatadir_mapping.rs knows how to
63 366 : // map relations into key-value pairs. But in practice we know
64 366 : // that 'field6' is the block number, and the fields 1-5
65 366 : // identify a relation. This is just an optimization,
66 366 : // though.
67 366 : //
68 366 : // 2. Once we know the partitioning, for each partition,
69 366 : // decide if it's time to create a new image layer. The
70 366 : // criteria is: there has been too much "churn" since the last
71 366 : // image layer? The "churn" is fuzzy concept, it's a
72 366 : // combination of too many delta files, or too much WAL in
73 366 : // total in the delta file. Or perhaps: if creating an image
74 366 : // file would allow to delete some older files.
75 366 : //
76 366 : // 3. After that, we compact all level0 delta files if there
77 366 : // are too many of them. While compacting, we also garbage
78 366 : // collect any page versions that are no longer needed because
79 366 : // of the new image layers we created in step 2.
80 366 : //
81 366 : // TODO: This high level strategy hasn't been implemented yet.
82 366 : // Below are functions compact_level0() and create_image_layers()
83 366 : // but they are a bit ad hoc and don't quite work as explained
84 366 : // above. Rewrite it.
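// A minimal, self-contained sketch of the chunking idea in step 1 above,
// using plain u64 "keys" instead of the real Key type. It is illustrative
// only: `chunk_keyspace`, `target`, and `image_boundaries` are hypothetical
// names, and this is not the actual repartition() logic.
#[allow(dead_code)]
fn chunk_keyspace(
    keyspace: Range<u64>,
    target: u64,
    image_boundaries: &[u64],
) -> Vec<Range<u64>> {
    assert!(target > 0);
    let mut parts = Vec::new();
    let mut start = keyspace.start;
    while start < keyspace.end {
        let ideal = start.saturating_add(target).min(keyspace.end);
        // Snap to a nearby existing image layer boundary, if any, to avoid
        // churn when image layers are recreated.
        let end = image_boundaries
            .iter()
            .copied()
            .filter(|&b| b > start && b <= keyspace.end)
            .min_by_key(|&b| b.abs_diff(ideal))
            .filter(|&b| b.abs_diff(ideal) <= target / 4)
            .unwrap_or(ideal);
        parts.push(start..end);
        start = end;
    }
    parts
}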
85 366 :
86 366 : // Is the timeline being deleted?
87 366 : if self.is_stopping() {
88 0 : trace!("Dropping out of compaction on timeline shutdown");
89 0 : return Err(CompactionError::ShuttingDown);
90 366 : }
91 366 :
92 366 : let target_file_size = self.get_checkpoint_distance();
93 :
94 : // Define partitioning schema if needed
95 :
96 : // FIXME: the match should only cover repartitioning, not the next steps
97 366 : let partition_count = match self
98 366 : .repartition(
99 366 : self.get_last_record_lsn(),
100 366 : self.get_compaction_target_size(),
101 366 : flags,
102 366 : ctx,
103 366 : )
104 13419 : .await
105 : {
106 366 : Ok(((dense_partitioning, sparse_partitioning), lsn)) => {
107 366 : // Disables access_stats updates, so that the files we read remain candidates for eviction after we're done with them
108 366 : let image_ctx = RequestContextBuilder::extend(ctx)
109 366 : .access_stats_behavior(AccessStatsBehavior::Skip)
110 366 : .build();
111 366 :
112 366 : // 2. Compact
113 366 : let timer = self.metrics.compact_time_histo.start_timer();
114 42316 : self.compact_level0(target_file_size, ctx).await?;
115 366 : timer.stop_and_record();
116 366 :
117 366 : // 3. Create new image layers for partitions that have been modified
118 366 : // "enough".
119 366 : let mut partitioning = dense_partitioning;
120 366 : partitioning
121 366 : .parts
122 366 : .extend(sparse_partitioning.into_dense().parts);
123 366 : let image_layers = self
124 366 : .create_image_layers(
125 366 : &partitioning,
126 366 : lsn,
127 366 : if flags.contains(CompactFlags::ForceImageLayerCreation) {
128 16 : ImageLayerCreationMode::Force
129 : } else {
130 350 : ImageLayerCreationMode::Try
131 : },
132 366 : &image_ctx,
133 : )
134 14689 : .await
135 366 : .map_err(anyhow::Error::from)?;
136 :
137 366 : self.upload_new_image_layers(image_layers)?;
138 366 : partitioning.parts.len()
139 : }
140 0 : Err(err) => {
141 0 : // no partitioning? This is normal, if the timeline was just created
142 0 : // as an empty timeline. Also in unit tests, when we use the timeline
143 0 : // as a simple key-value store, ignoring the datadir layout. Log the
144 0 : // error but continue.
145 0 : //
146 0 : // Suppress error when it's due to cancellation
147 0 : if !self.cancel.is_cancelled() {
148 0 : tracing::error!("could not compact, repartitioning keyspace failed: {err:?}");
149 0 : }
150 0 : 1
151 : }
152 : };
153 :
154 366 : if self.shard_identity.count >= ShardCount::new(2) {
155 : // Limit the number of layer rewrites to the number of partitions: this means its
156 : // runtime should be comparable to a full round of image layer creations, rather than
157 : // being potentially much longer.
158 0 : let rewrite_max = partition_count;
159 0 :
160 0 : self.compact_shard_ancestors(rewrite_max, ctx).await?;
161 366 : }
162 :
163 366 : Ok(())
164 366 : }
165 :
166 : /// Check for layers that are eligible to be rewritten:
167 : /// - Shard splitting: after a shard split, rewrite ancestor layers beyond pitr_interval, so that
168 : /// we don't indefinitely retain keys in this shard that aren't needed.
169 : /// - For future use: layers beyond pitr_interval that are in formats we would
170 : /// rather not maintain compatibility with indefinitely.
171 : ///
172 : /// Note: this phase may read and write many gigabytes of data: use rewrite_max to bound
173 : /// how much work it will try to do in each compaction pass.
174 0 : async fn compact_shard_ancestors(
175 0 : self: &Arc<Self>,
176 0 : rewrite_max: usize,
177 0 : _ctx: &RequestContext,
178 0 : ) -> anyhow::Result<()> {
179 0 : let mut drop_layers = Vec::new();
180 0 : let layers_to_rewrite: Vec<Layer> = Vec::new();
181 0 :
182 0 : // We will use the PITR cutoff as a condition for rewriting layers.
183 0 : let pitr_cutoff = self.gc_info.read().unwrap().cutoffs.pitr;
184 :
185 0 : let layers = self.layers.read().await;
186 0 : for layer_desc in layers.layer_map().iter_historic_layers() {
187 0 : let layer = layers.get_from_desc(&layer_desc);
188 0 : if layer.metadata().shard.shard_count == self.shard_identity.count {
189 : // This layer does not belong to a historic ancestor, no need to re-image it.
190 0 : continue;
191 0 : }
192 0 :
193 0 : // This layer was created on an ancestor shard: check if it contains any data for this shard.
194 0 : let sharded_range = ShardedRange::new(layer_desc.get_key_range(), &self.shard_identity);
195 0 : let layer_local_page_count = sharded_range.page_count();
196 0 : let layer_raw_page_count = ShardedRange::raw_size(&layer_desc.get_key_range());
197 0 : if layer_local_page_count == 0 {
198 : // This ancestral layer only covers keys that belong to other shards.
199 : // We include the full metadata in the log: if we had some critical bug that caused
200 : // us to incorrectly drop layers, this would simplify manual debugging and reinstating of those layers.
201 0 : info!(%layer, old_metadata=?layer.metadata(),
202 0 : "dropping layer after shard split, contains no keys for this shard.",
203 : );
204 :
205 0 : if cfg!(debug_assertions) {
206 : // Expensive, exhaustive check of keys in this layer: this guards against ShardedRange's calculations being
207 : // wrong. If ShardedRange claims the local page count is zero, then no keys in this layer
208 : // should be !is_key_disposable()
209 0 : let range = layer_desc.get_key_range();
210 0 : let mut key = range.start;
211 0 : while key < range.end {
212 0 : debug_assert!(self.shard_identity.is_key_disposable(&key));
213 0 : key = key.next();
214 : }
215 0 : }
216 :
217 0 : drop_layers.push(layer);
218 0 : continue;
219 0 : } else if layer_local_page_count != u32::MAX
220 0 : && layer_local_page_count == layer_raw_page_count
221 : {
222 0 : debug!(%layer,
223 0 : "layer is entirely shard local ({} keys), no need to filter it",
224 : layer_local_page_count
225 : );
226 0 : continue;
227 0 : }
228 0 :
229 0 : // Don't bother re-writing a layer unless it will at least halve its size
230 0 : if layer_local_page_count != u32::MAX
231 0 : && layer_local_page_count > layer_raw_page_count / 2
232 : {
233 0 : debug!(%layer,
234 0 : "layer is already mostly local ({}/{}), not rewriting",
235 : layer_local_page_count,
236 : layer_raw_page_count
237 : );
238 0 : }
239 :
240 : // Don't bother re-writing a layer if it is within the PITR window: it will age-out eventually
241 : // without incurring the I/O cost of a rewrite.
242 0 : if layer_desc.get_lsn_range().end >= pitr_cutoff {
243 0 : debug!(%layer, "Skipping rewrite of layer still in PITR window ({} >= {})",
244 0 : layer_desc.get_lsn_range().end, pitr_cutoff);
245 0 : continue;
246 0 : }
247 0 :
248 0 : if layer_desc.is_delta() {
249 : // We do not yet implement rewrite of delta layers
250 0 : debug!(%layer, "Skipping rewrite of delta layer");
251 0 : continue;
252 0 : }
253 0 :
254 0 : // Only rewrite layers if they would have different remote paths: either they belong to this
255 0 : // shard but an old generation, or they belonged to another shard. This also implicitly
256 0 : // guarantees that the layer is persistent in remote storage (as only remote persistent
257 0 : // layers are carried across shard splits, any local-only layer would be in the current generation)
258 0 : if layer.metadata().generation == self.generation
259 0 : && layer.metadata().shard.shard_count == self.shard_identity.count
260 : {
261 0 : debug!(%layer, "Skipping rewrite, is not from old generation");
262 0 : continue;
263 0 : }
264 0 :
265 0 : if layers_to_rewrite.len() >= rewrite_max {
266 0 : tracing::info!(%layer, "Will rewrite layer on a future compaction, already rewrote {}",
267 0 : layers_to_rewrite.len()
268 : );
269 0 : continue;
270 0 : }
271 0 :
272 0 : // Fall through: all our conditions for doing a rewrite passed.
273 0 : // TODO: implement rewriting
274 0 : tracing::debug!(%layer, "Would rewrite layer");
275 : }
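// Illustrative sketch (hypothetical helper, never called) of the per-layer
// decision made by the loop above, reduced to plain inputs and following the
// intent stated in the comments. The real loop also handles the u32::MAX
// sentinel and the rewrite_max budget.
#[allow(dead_code)]
enum AncestorLayerAction {
    Drop,
    Keep,
    Rewrite,
}
#[allow(dead_code)]
fn classify_ancestor_layer(
    local_pages: u32,
    raw_pages: u32,
    within_pitr_window: bool,
    is_delta: bool,
    same_shard_and_generation: bool,
) -> AncestorLayerAction {
    if local_pages == 0 {
        // No keys for this shard at all: the layer can simply be dropped.
        AncestorLayerAction::Drop
    } else if local_pages == raw_pages // already fully shard-local
        || local_pages > raw_pages / 2 // a rewrite would not halve the size
        || within_pitr_window // will age out on its own
        || is_delta // delta layer rewrite is not implemented yet
        || same_shard_and_generation // the remote path would not change
    {
        AncestorLayerAction::Keep
    } else {
        AncestorLayerAction::Rewrite
    }
}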
276 :
277 : // Drop the layers read lock: we will acquire it for write in [`Self::rewrite_layers`]
278 0 : drop(layers);
279 0 :
280 0 : // TODO: collect layers to rewrite
281 0 : let replace_layers = Vec::new();
282 0 :
283 0 : // Update the LayerMap so that readers will use the new layers, and enqueue it for writing to remote storage
284 0 : self.rewrite_layers(replace_layers, drop_layers).await?;
285 :
286 : // We wait for all uploads to complete before finishing this compaction stage. This is not
287 : // necessary for correctness, but it simplifies testing, and avoids proceeding with another
288 : // Timeline's compaction while this timeline's uploads may be generating lots of disk I/O
289 : // load.
290 0 : self.remote_client.wait_completion().await?;
291 :
292 0 : Ok(())
293 0 : }
294 :
295 : /// Collect a bunch of Level 0 layer files, and compact and reshuffle them
296 : /// as Level 1 files.
297 366 : async fn compact_level0(
298 366 : self: &Arc<Self>,
299 366 : target_file_size: u64,
300 366 : ctx: &RequestContext,
301 366 : ) -> Result<(), CompactionError> {
302 : let CompactLevel0Phase1Result {
303 366 : new_layers,
304 366 : deltas_to_compact,
305 : } = {
306 366 : let phase1_span = info_span!("compact_level0_phase1");
307 366 : let ctx = ctx.attached_child();
308 366 : let mut stats = CompactLevel0Phase1StatsBuilder {
309 366 : version: Some(2),
310 366 : tenant_id: Some(self.tenant_shard_id),
311 366 : timeline_id: Some(self.timeline_id),
312 366 : ..Default::default()
313 366 : };
314 366 :
315 366 : let begin = tokio::time::Instant::now();
316 366 : let phase1_layers_locked = Arc::clone(&self.layers).read_owned().await;
317 366 : let now = tokio::time::Instant::now();
318 366 : stats.read_lock_acquisition_micros =
319 366 : DurationRecorder::Recorded(RecordedDuration(now - begin), now);
320 366 : self.compact_level0_phase1(phase1_layers_locked, stats, target_file_size, &ctx)
321 366 : .instrument(phase1_span)
322 42315 : .await?
323 : };
324 :
325 366 : if new_layers.is_empty() && deltas_to_compact.is_empty() {
326 : // nothing to do
327 338 : return Ok(());
328 28 : }
329 28 :
330 28 : self.finish_compact_batch(&new_layers, &Vec::new(), &deltas_to_compact)
331 0 : .await?;
332 28 : Ok(())
333 366 : }
334 :
335 : /// First phase of Level 0 compaction, explained in the [`Self::compact_legacy`] comment.
336 366 : async fn compact_level0_phase1(
337 366 : self: &Arc<Self>,
338 366 : guard: tokio::sync::OwnedRwLockReadGuard<LayerManager>,
339 366 : mut stats: CompactLevel0Phase1StatsBuilder,
340 366 : target_file_size: u64,
341 366 : ctx: &RequestContext,
342 366 : ) -> Result<CompactLevel0Phase1Result, CompactionError> {
343 366 : stats.read_lock_held_spawn_blocking_startup_micros =
344 366 : stats.read_lock_acquisition_micros.till_now(); // set by caller
345 366 : let layers = guard.layer_map();
346 366 : let level0_deltas = layers.get_level0_deltas()?;
347 366 : let mut level0_deltas = level0_deltas
348 366 : .into_iter()
349 1542 : .map(|x| guard.get_from_desc(&x))
350 366 : .collect_vec();
351 366 : stats.level0_deltas_count = Some(level0_deltas.len());
352 366 : // Only compact if enough layers have accumulated.
353 366 : let threshold = self.get_compaction_threshold();
354 366 : if level0_deltas.is_empty() || level0_deltas.len() < threshold {
355 338 : debug!(
356 0 : level0_deltas = level0_deltas.len(),
357 0 : threshold, "too few deltas to compact"
358 : );
359 338 : return Ok(CompactLevel0Phase1Result::default());
360 28 : }
361 28 :
362 28 : // This failpoint is used together with the `test_duplicate_layers` integration test.
363 28 : // It makes the compaction return exactly the same layers as it was given as input.
364 28 : // We want to ensure that this will not cause any problem when updating the layer map
365 28 : // after the compaction is finished.
366 28 : //
367 28 : // Currently, there are two rare edge cases that can cause duplicated layers to be
368 28 : // inserted:
369 28 : // 1. The compaction job is interrupted or does not finish successfully. Assume we have files 1, 2, 3, 4, which
370 28 : // are compacted to 5, but the pageserver is shut down; the next time we start the pageserver we will get a layer
371 28 : // map containing 1, 2, 3, 4, and 5, where 5 has the same content as 4. If we trigger L0 compaction again at this
372 28 : // point, it is likely that we will get a file 6 which has the same content and key range as 5,
373 28 : // and this causes an overwrite. This is acceptable because the content is the same, and we should do a
374 28 : // layer replace instead of the normal remove / upload process.
375 28 : // 2. The input workload pattern creates exactly n files that are sorted, non-overlapping, and of target file
376 28 : // size. Compaction will likely create the same set of n files afterwards.
377 28 : //
378 28 : // This failpoint covers a superset of both cases.
379 28 : if cfg!(feature = "testing") {
380 28 : let active = (|| {
381 28 : ::fail::fail_point!("compact-level0-phase1-return-same", |_| true);
382 28 : false
383 28 : })();
384 28 :
385 28 : if active {
386 0 : let mut new_layers = Vec::with_capacity(level0_deltas.len());
387 0 : for delta in &level0_deltas {
388 : // we are just faking these layers as being produced again for this failpoint
389 0 : new_layers.push(
390 0 : delta
391 0 : .download_and_keep_resident()
392 0 : .await
393 0 : .context("download layer for failpoint")?,
394 : );
395 : }
396 0 : tracing::info!("compact-level0-phase1-return-same"); // so that we can check if we hit the failpoint
397 0 : return Ok(CompactLevel0Phase1Result {
398 0 : new_layers,
399 0 : deltas_to_compact: level0_deltas,
400 0 : });
401 28 : }
402 0 : }
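// Illustrative sketch (hypothetical helper, never called): how a test built
// with the failpoints feature could activate the failpoint above through the
// `fail` crate's programmatic API. The actual integration test may configure
// failpoints differently, e.g. through the pageserver's management API.
#[allow(dead_code)]
fn activate_return_same_failpoint_for_test() {
    // Make the failpoint closure above return `true`, so that phase 1 hands
    // back exactly the input layers.
    fail::cfg("compact-level0-phase1-return-same", "return").unwrap();
    // ... run compaction and assert that the layer map update tolerates the
    // duplicate layers ...
    fail::remove("compact-level0-phase1-return-same");
}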
403 :
404 : // Gather the files to compact in this iteration.
405 : //
406 : // Start with the oldest Level 0 delta file, and collect any other
407 : // level 0 files that form a contiguous sequence, such that the end
408 : // LSN of previous file matches the start LSN of the next file.
409 : //
410 : // Note that if the files don't form such a sequence, we might
411 : // "compact" just a single file. That's a bit pointless, but it allows
412 : // us to get rid of the level 0 file, and compact the other files on
413 : // the next iteration. This could probably be made smarter, but such
414 : // "gaps" in the sequence of level 0 files should only happen in case
415 : // of a crash, partial download from cloud storage, or something like
416 : // that, so it's not a big deal in practice.
417 588 : level0_deltas.sort_by_key(|l| l.layer_desc().lsn_range.start);
418 28 : let mut level0_deltas_iter = level0_deltas.iter();
419 28 :
420 28 : let first_level0_delta = level0_deltas_iter.next().unwrap();
421 28 : let mut prev_lsn_end = first_level0_delta.layer_desc().lsn_range.end;
422 28 : let mut deltas_to_compact = Vec::with_capacity(level0_deltas.len());
423 28 :
424 28 : deltas_to_compact.push(first_level0_delta.download_and_keep_resident().await?);
425 322 : for l in level0_deltas_iter {
426 294 : let lsn_range = &l.layer_desc().lsn_range;
427 294 :
428 294 : if lsn_range.start != prev_lsn_end {
429 0 : break;
430 294 : }
431 294 : deltas_to_compact.push(l.download_and_keep_resident().await?);
432 294 : prev_lsn_end = lsn_range.end;
433 : }
434 28 : let lsn_range = Range {
435 28 : start: deltas_to_compact
436 28 : .first()
437 28 : .unwrap()
438 28 : .layer_desc()
439 28 : .lsn_range
440 28 : .start,
441 28 : end: deltas_to_compact.last().unwrap().layer_desc().lsn_range.end,
442 28 : };
443 28 :
444 28 : info!(
445 0 : "Starting Level0 compaction in LSN range {}-{} for {} layers ({} deltas in total)",
446 0 : lsn_range.start,
447 0 : lsn_range.end,
448 0 : deltas_to_compact.len(),
449 0 : level0_deltas.len()
450 : );
451 :
452 322 : for l in deltas_to_compact.iter() {
453 322 : info!("compact includes {l}");
454 : }
455 :
456 : // We don't need the original list of layers anymore. Drop it so that
457 : // we don't accidentally use it later in the function.
458 28 : drop(level0_deltas);
459 28 :
460 28 : stats.read_lock_held_prerequisites_micros = stats
461 28 : .read_lock_held_spawn_blocking_startup_micros
462 28 : .till_now();
463 28 :
464 28 : // Determine the N largest holes, where N is the number of compacted layers.
465 28 : let max_holes = deltas_to_compact.len();
466 28 : let last_record_lsn = self.get_last_record_lsn();
467 28 : let min_hole_range = (target_file_size / page_cache::PAGE_SZ as u64) as i128;
468 28 : let min_hole_coverage_size = 3; // TODO: something more flexible?
469 28 :
470 28 : // min-heap (reserve space for one more element added before eviction)
471 28 : let mut heap: BinaryHeap<Hole> = BinaryHeap::with_capacity(max_holes + 1);
472 28 : let mut prev: Option<Key> = None;
473 28 :
474 28 : let mut all_keys = Vec::new();
475 :
476 322 : for l in deltas_to_compact.iter() {
477 2390 : all_keys.extend(l.load_keys(ctx).await?);
478 : }
479 :
480 : // FIXME: should spawn_blocking the rest of this function
481 :
482 : // The current stdlib sorting implementation is designed in a way where it is
483 : // particularly fast when the slice is made up of sorted sub-ranges.
484 4431762 : all_keys.sort_by_key(|DeltaEntry { key, lsn, .. }| (*key, *lsn));
485 28 :
486 28 : stats.read_lock_held_key_sort_micros = stats.read_lock_held_prerequisites_micros.till_now();
487 :
488 2064038 : for &DeltaEntry { key: next_key, .. } in all_keys.iter() {
489 2064038 : if let Some(prev_key) = prev {
490 : // Just a first fast filter: do not create hole entries for metadata keys. The last hole in the
491 : // compaction would be the gap between the data keys and the metadata keys.
492 2064010 : if next_key.to_i128() - prev_key.to_i128() >= min_hole_range
493 0 : && !Key::is_metadata_key(&prev_key)
494 : {
495 0 : let key_range = prev_key..next_key;
496 0 : // Measuring a hole simply as the difference of the i128 representations of its key range boundaries
497 0 : // makes little sense, because the largest holes would correspond to field1/field2 changes.
498 0 : // But we are mostly interested in eliminating holes which cause the generation of excessive image layers.
499 0 : // That is why it is better to measure the size of a hole as the number of image layers covering it.
500 0 : let coverage_size = layers.image_coverage(&key_range, last_record_lsn).len();
501 0 : if coverage_size >= min_hole_coverage_size {
502 0 : heap.push(Hole {
503 0 : key_range,
504 0 : coverage_size,
505 0 : });
506 0 : if heap.len() > max_holes {
507 0 : heap.pop(); // remove smallest hole
508 0 : }
509 0 : }
510 2064010 : }
511 28 : }
512 2064038 : prev = Some(next_key.next());
513 : }
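// Illustrative sketch (hypothetical helper, never called) of the bounded
// min-heap pattern used in the loop above: keep only the `n` largest values
// by evicting the smallest element whenever the heap grows past `n`. The
// real code keys the heap on Hole::coverage_size instead of a plain number.
#[allow(dead_code)]
fn n_largest(values: impl IntoIterator<Item = u64>, n: usize) -> Vec<u64> {
    use std::cmp::Reverse;
    // `Reverse` turns the std max-heap into a min-heap, so the smallest
    // retained value is always at the top and cheap to evict.
    let mut heap: BinaryHeap<Reverse<u64>> = BinaryHeap::with_capacity(n + 1);
    for v in values {
        heap.push(Reverse(v));
        if heap.len() > n {
            heap.pop(); // drop the smallest of the n + 1 candidates
        }
    }
    heap.into_iter().map(|Reverse(v)| v).collect()
}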
514 28 : stats.read_lock_held_compute_holes_micros = stats.read_lock_held_key_sort_micros.till_now();
515 28 : drop_rlock(guard);
516 28 : stats.read_lock_drop_micros = stats.read_lock_held_compute_holes_micros.till_now();
517 28 : let mut holes = heap.into_vec();
518 28 : holes.sort_unstable_by_key(|hole| hole.key_range.start);
519 28 : let mut next_hole = 0; // index of next hole in holes vector
520 28 :
521 28 : // This iterator walks through all key-value pairs from all the layers
522 28 : // we're compacting, in key, LSN order.
523 28 : let all_values_iter = all_keys.iter();
524 28 :
525 28 : // This iterator walks through all keys and is needed to calculate size used by each key
526 28 : let mut all_keys_iter = all_keys
527 28 : .iter()
528 2064038 : .map(|DeltaEntry { key, lsn, size, .. }| (*key, *lsn, *size))
529 2064010 : .coalesce(|mut prev, cur| {
530 2064010 : // Coalesce entries that belong to the same key.
531 2064010 : // This ensures that compaction doesn't put them
532 2064010 : // into different layer files.
533 2064010 : // Still limit this by the target file size,
534 2064010 : // so that we keep the size of the files in
535 2064010 : // check.
536 2064010 : if prev.0 == cur.0 && prev.2 < target_file_size {
537 40038 : prev.2 += cur.2;
538 40038 : Ok(prev)
539 : } else {
540 2023972 : Err((prev, cur))
541 : }
542 2064010 : });
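// Illustrative sketch (hypothetical helper, never called) of the coalescing
// step above, reduced to (key, size) pairs: adjacent entries for the same key
// are merged until their combined size reaches `target`, so that one key's
// versions are not split across output layers unnecessarily.
#[allow(dead_code)]
fn coalesce_same_key(entries: Vec<(u32, u64)>, target: u64) -> Vec<(u32, u64)> {
    entries
        .into_iter()
        .coalesce(|prev, cur| {
            if prev.0 == cur.0 && prev.1 < target {
                Ok((prev.0, prev.1 + cur.1)) // same key: accumulate the size
            } else {
                Err((prev, cur)) // different key, or already big enough: keep both
            }
        })
        .collect()
}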
543 28 :
544 28 : // Merge the contents of all the input delta layers into a new set
545 28 : // of delta layers, based on the current partitioning.
546 28 : //
547 28 : // We split the new delta layers on the key dimension. We iterate through the key space and, for each key, check whether including the next key in the current output layer would make the layer too large. If so, flush the current output layer and start a new one.
548 28 : // It's possible that there is a single key with so many page versions that storing all of them in a single layer file
549 28 : // would be too large. In that case, we also split on the LSN dimension.
550 28 : //
551 28 : // LSN
552 28 : // ^
553 28 : // |
554 28 : // | +-----------+ +--+--+--+--+
555 28 : // | | | | | | | |
556 28 : // | +-----------+ | | | | |
557 28 : // | | | | | | | |
558 28 : // | +-----------+ ==> | | | | |
559 28 : // | | | | | | | |
560 28 : // | +-----------+ | | | | |
561 28 : // | | | | | | | |
562 28 : // | +-----------+ +--+--+--+--+
563 28 : // |
564 28 : // +--------------> key
565 28 : //
566 28 : //
567 28 : // If one key (X) has a lot of page versions:
568 28 : //
569 28 : // LSN
570 28 : // ^
571 28 : // | (X)
572 28 : // | +-----------+ +--+--+--+--+
573 28 : // | | | | | | | |
574 28 : // | +-----------+ | | +--+ |
575 28 : // | | | | | | | |
576 28 : // | +-----------+ ==> | | | | |
577 28 : // | | | | | +--+ |
578 28 : // | +-----------+ | | | | |
579 28 : // | | | | | | | |
580 28 : // | +-----------+ +--+--+--+--+
581 28 : // |
582 28 : // +--------------> key
583 28 : // TODO: this actually divides the layers into fixed-size chunks, not
584 28 : // based on the partitioning.
585 28 : //
586 28 : // TODO: we should also opportunistically materialize and
587 28 : // garbage collect what we can.
588 28 : let mut new_layers = Vec::new();
589 28 : let mut prev_key: Option<Key> = None;
590 28 : let mut writer: Option<DeltaLayerWriter> = None;
591 28 : let mut key_values_total_size = 0u64;
592 28 : let mut dup_start_lsn: Lsn = Lsn::INVALID; // start LSN of layer containing values of the single key
593 28 : let mut dup_end_lsn: Lsn = Lsn::INVALID; // end LSN of layer containing values of the single key
594 :
595 : for &DeltaEntry {
596 2064038 : key, lsn, ref val, ..
597 2064066 : } in all_values_iter
598 : {
599 2064038 : let value = val.load(ctx).await?;
600 2064038 : let same_key = prev_key.map_or(false, |prev_key| prev_key == key);
601 2064038 : // We need to check key boundaries once we reach next key or end of layer with the same key
602 2064038 : if !same_key || lsn == dup_end_lsn {
603 2024000 : let mut next_key_size = 0u64;
604 2024000 : let is_dup_layer = dup_end_lsn.is_valid();
605 2024000 : dup_start_lsn = Lsn::INVALID;
606 2024000 : if !same_key {
607 2024000 : dup_end_lsn = Lsn::INVALID;
608 2024000 : }
609 : // Determine size occupied by this key. We stop at next key or when size becomes larger than target_file_size
610 2024000 : for (next_key, next_lsn, next_size) in all_keys_iter.by_ref() {
611 2024000 : next_key_size = next_size;
612 2024000 : if key != next_key {
613 2023972 : if dup_end_lsn.is_valid() {
614 0 : // We are writing a segment with duplicates:
615 0 : // place all remaining values of this key in a separate segment
616 0 : dup_start_lsn = dup_end_lsn; // the new segment starts where the old one stops
617 0 : dup_end_lsn = lsn_range.end; // there are no more values of this key until the end of the LSN range
618 2023972 : }
619 2023972 : break;
620 28 : }
621 28 : key_values_total_size += next_size;
622 28 : // Check if it is time to split the segment: is the total size of this key's values larger than the target file size?
623 28 : // We need to avoid generating empty segments if next_size > target_file_size.
624 28 : if key_values_total_size > target_file_size && lsn != next_lsn {
625 : // Split the key between multiple layers: such a layer can contain only a single key
626 0 : dup_start_lsn = if dup_end_lsn.is_valid() {
627 0 : dup_end_lsn // new segment with duplicates starts where old one stops
628 : } else {
629 0 : lsn // start with the first LSN for this key
630 : };
631 0 : dup_end_lsn = next_lsn; // upper LSN boundary is exclusive
632 0 : break;
633 28 : }
634 : }
635 : // Handle the case when the loop reaches the last key: dup_end is set but dup_start is not.
636 2024000 : if dup_end_lsn.is_valid() && !dup_start_lsn.is_valid() {
637 0 : dup_start_lsn = dup_end_lsn;
638 0 : dup_end_lsn = lsn_range.end;
639 2024000 : }
640 2024000 : if writer.is_some() {
641 2023972 : let written_size = writer.as_mut().unwrap().size();
642 2023972 : let contains_hole =
643 2023972 : next_hole < holes.len() && key >= holes[next_hole].key_range.end;
644 : // check if the key would cause the layer to overflow or to span a hole...
645 2023972 : if is_dup_layer
646 2023972 : || dup_end_lsn.is_valid()
647 2023972 : || written_size + key_values_total_size > target_file_size
648 2023772 : || contains_hole
649 : {
650 : // ... if so, flush the previous layer and prepare to write a new one
651 200 : new_layers.push(
652 200 : writer
653 200 : .take()
654 200 : .unwrap()
655 200 : .finish(prev_key.unwrap().next(), self, ctx)
656 509 : .await?,
657 : );
658 200 : writer = None;
659 200 :
660 200 : if contains_hole {
661 0 : // skip hole
662 0 : next_hole += 1;
663 200 : }
664 2023772 : }
665 28 : }
666 : // Remember size of key value because at next iteration we will access next item
667 2024000 : key_values_total_size = next_key_size;
668 40038 : }
669 2064038 : fail_point!("delta-layer-writer-fail-before-finish", |_| {
670 0 : Err(CompactionError::Other(anyhow::anyhow!(
671 0 : "failpoint delta-layer-writer-fail-before-finish"
672 0 : )))
673 2064038 : });
674 :
675 2064038 : if !self.shard_identity.is_key_disposable(&key) {
676 2064038 : if writer.is_none() {
677 228 : // Create writer if not initialized yet
678 228 : writer = Some(
679 : DeltaLayerWriter::new(
680 228 : self.conf,
681 228 : self.timeline_id,
682 228 : self.tenant_shard_id,
683 228 : key,
684 228 : if dup_end_lsn.is_valid() {
685 : // this is a layer containing a slice of values of the same key
686 0 : debug!("Create new dup layer {}..{}", dup_start_lsn, dup_end_lsn);
687 0 : dup_start_lsn..dup_end_lsn
688 : } else {
689 228 : debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end);
690 228 : lsn_range.clone()
691 : },
692 228 : ctx,
693 : )
694 114 : .await?,
695 : );
696 2063810 : }
697 :
698 2064038 : writer
699 2064038 : .as_mut()
700 2064038 : .unwrap()
701 2064038 : .put_value(key, lsn, value, ctx)
702 1570 : .await?;
703 : } else {
704 0 : debug!(
705 0 : "Dropping key {} during compaction (it belongs on shard {:?})",
706 0 : key,
707 0 : self.shard_identity.get_shard_number(&key)
708 : );
709 : }
710 :
711 2064038 : if !new_layers.is_empty() {
712 19786 : fail_point!("after-timeline-compacted-first-L1");
713 2044252 : }
714 :
715 2064038 : prev_key = Some(key);
716 : }
717 28 : if let Some(writer) = writer {
718 1992 : new_layers.push(writer.finish(prev_key.unwrap().next(), self, ctx).await?);
719 0 : }
720 :
721 : // Sync layers
722 28 : if !new_layers.is_empty() {
723 : // Print a warning if the created layer is larger than double the target size
724 : // Add two pages for potential overhead. This should in theory be already
725 : // accounted for in the target calculation, but for very small targets,
726 : // we still might easily hit the limit otherwise.
727 28 : let warn_limit = target_file_size * 2 + page_cache::PAGE_SZ as u64 * 2;
728 228 : for layer in new_layers.iter() {
729 228 : if layer.layer_desc().file_size > warn_limit {
730 0 : warn!(
731 : %layer,
732 0 : "created delta file of size {} larger than double of target of {target_file_size}", layer.layer_desc().file_size
733 : );
734 228 : }
735 : }
736 :
737 : // The writer.finish() above already did the fsync of the inodes.
738 : // We just need to fsync the directory in which these inodes are linked,
739 : // which we know to be the timeline directory.
740 : //
741 : // We use fatal_err() below because, after writer.finish() returns with success,
742 : // the in-memory state of the filesystem already has the layer file in its final place,
743 : // and subsequent pageserver code could think it's durable while it really isn't.
744 28 : let timeline_dir = VirtualFile::open(
745 28 : &self
746 28 : .conf
747 28 : .timeline_path(&self.tenant_shard_id, &self.timeline_id),
748 28 : ctx,
749 28 : )
750 14 : .await
751 28 : .fatal_err("VirtualFile::open for timeline dir fsync");
752 28 : timeline_dir
753 28 : .sync_all()
754 14 : .await
755 28 : .fatal_err("VirtualFile::sync_all timeline dir");
756 0 : }
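// Illustrative sketch (hypothetical helper, never called) of the same
// durability rule using only the standard library: after creating or renaming
// files, the *directory* must be fsynced as well, otherwise the directory
// entries may be lost on crash even though the file contents were flushed.
// Opening a directory like this works on Unix; the real code above goes
// through VirtualFile instead.
#[allow(dead_code)]
fn fsync_dir(dir: &std::path::Path) -> std::io::Result<()> {
    std::fs::File::open(dir)?.sync_all()
}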
757 :
758 28 : stats.write_layer_files_micros = stats.read_lock_drop_micros.till_now();
759 28 : stats.new_deltas_count = Some(new_layers.len());
760 228 : stats.new_deltas_size = Some(new_layers.iter().map(|l| l.layer_desc().file_size).sum());
761 28 :
762 28 : match TryInto::<CompactLevel0Phase1Stats>::try_into(stats)
763 28 : .and_then(|stats| serde_json::to_string(&stats).context("serde_json::to_string"))
764 : {
765 28 : Ok(stats_json) => {
766 28 : info!(
767 0 : stats_json = stats_json.as_str(),
768 0 : "compact_level0_phase1 stats available"
769 : )
770 : }
771 0 : Err(e) => {
772 0 : warn!("compact_level0_phase1 stats failed to serialize: {:#}", e);
773 : }
774 : }
775 :
776 28 : Ok(CompactLevel0Phase1Result {
777 28 : new_layers,
778 28 : deltas_to_compact: deltas_to_compact
779 28 : .into_iter()
780 322 : .map(|x| x.drop_eviction_guard())
781 28 : .collect::<Vec<_>>(),
782 28 : })
783 366 : }
784 : }
785 :
786 : #[derive(Default)]
787 : struct CompactLevel0Phase1Result {
788 : new_layers: Vec<ResidentLayer>,
789 : deltas_to_compact: Vec<Layer>,
790 : }
791 :
792 : #[derive(Default)]
793 : struct CompactLevel0Phase1StatsBuilder {
794 : version: Option<u64>,
795 : tenant_id: Option<TenantShardId>,
796 : timeline_id: Option<TimelineId>,
797 : read_lock_acquisition_micros: DurationRecorder,
798 : read_lock_held_spawn_blocking_startup_micros: DurationRecorder,
799 : read_lock_held_key_sort_micros: DurationRecorder,
800 : read_lock_held_prerequisites_micros: DurationRecorder,
801 : read_lock_held_compute_holes_micros: DurationRecorder,
802 : read_lock_drop_micros: DurationRecorder,
803 : write_layer_files_micros: DurationRecorder,
804 : level0_deltas_count: Option<usize>,
805 : new_deltas_count: Option<usize>,
806 : new_deltas_size: Option<u64>,
807 : }
808 :
809 : #[derive(serde::Serialize)]
810 : struct CompactLevel0Phase1Stats {
811 : version: u64,
812 : tenant_id: TenantShardId,
813 : timeline_id: TimelineId,
814 : read_lock_acquisition_micros: RecordedDuration,
815 : read_lock_held_spawn_blocking_startup_micros: RecordedDuration,
816 : read_lock_held_key_sort_micros: RecordedDuration,
817 : read_lock_held_prerequisites_micros: RecordedDuration,
818 : read_lock_held_compute_holes_micros: RecordedDuration,
819 : read_lock_drop_micros: RecordedDuration,
820 : write_layer_files_micros: RecordedDuration,
821 : level0_deltas_count: usize,
822 : new_deltas_count: usize,
823 : new_deltas_size: u64,
824 : }
825 :
826 : impl TryFrom<CompactLevel0Phase1StatsBuilder> for CompactLevel0Phase1Stats {
827 : type Error = anyhow::Error;
828 :
829 28 : fn try_from(value: CompactLevel0Phase1StatsBuilder) -> Result<Self, Self::Error> {
830 28 : Ok(Self {
831 28 : version: value.version.ok_or_else(|| anyhow!("version not set"))?,
832 28 : tenant_id: value
833 28 : .tenant_id
834 28 : .ok_or_else(|| anyhow!("tenant_id not set"))?,
835 28 : timeline_id: value
836 28 : .timeline_id
837 28 : .ok_or_else(|| anyhow!("timeline_id not set"))?,
838 28 : read_lock_acquisition_micros: value
839 28 : .read_lock_acquisition_micros
840 28 : .into_recorded()
841 28 : .ok_or_else(|| anyhow!("read_lock_acquisition_micros not set"))?,
842 28 : read_lock_held_spawn_blocking_startup_micros: value
843 28 : .read_lock_held_spawn_blocking_startup_micros
844 28 : .into_recorded()
845 28 : .ok_or_else(|| anyhow!("read_lock_held_spawn_blocking_startup_micros not set"))?,
846 28 : read_lock_held_key_sort_micros: value
847 28 : .read_lock_held_key_sort_micros
848 28 : .into_recorded()
849 28 : .ok_or_else(|| anyhow!("read_lock_held_key_sort_micros not set"))?,
850 28 : read_lock_held_prerequisites_micros: value
851 28 : .read_lock_held_prerequisites_micros
852 28 : .into_recorded()
853 28 : .ok_or_else(|| anyhow!("read_lock_held_prerequisites_micros not set"))?,
854 28 : read_lock_held_compute_holes_micros: value
855 28 : .read_lock_held_compute_holes_micros
856 28 : .into_recorded()
857 28 : .ok_or_else(|| anyhow!("read_lock_held_compute_holes_micros not set"))?,
858 28 : read_lock_drop_micros: value
859 28 : .read_lock_drop_micros
860 28 : .into_recorded()
861 28 : .ok_or_else(|| anyhow!("read_lock_drop_micros not set"))?,
862 28 : write_layer_files_micros: value
863 28 : .write_layer_files_micros
864 28 : .into_recorded()
865 28 : .ok_or_else(|| anyhow!("write_layer_files_micros not set"))?,
866 28 : level0_deltas_count: value
867 28 : .level0_deltas_count
868 28 : .ok_or_else(|| anyhow!("level0_deltas_count not set"))?,
869 28 : new_deltas_count: value
870 28 : .new_deltas_count
871 28 : .ok_or_else(|| anyhow!("new_deltas_count not set"))?,
872 28 : new_deltas_size: value
873 28 : .new_deltas_size
874 28 : .ok_or_else(|| anyhow!("new_deltas_size not set"))?,
875 : })
876 28 : }
877 : }
878 :
879 : impl Timeline {
880 : /// Entry point for new tiered compaction algorithm.
881 : ///
882 : /// All the real work is in the implementation in the pageserver_compaction
883 : /// crate. The code here would apply to any algorithm implemented by the
884 : /// same interface, but tiered is the only one at the moment.
885 : ///
886 : /// TODO: cancellation
887 0 : pub(crate) async fn compact_tiered(
888 0 : self: &Arc<Self>,
889 0 : _cancel: &CancellationToken,
890 0 : ctx: &RequestContext,
891 0 : ) -> Result<(), CompactionError> {
892 0 : let fanout = self.get_compaction_threshold() as u64;
893 0 : let target_file_size = self.get_checkpoint_distance();
894 :
895 : // Find the top of the historical layers
896 0 : let end_lsn = {
897 0 : let guard = self.layers.read().await;
898 0 : let layers = guard.layer_map();
899 :
900 0 : let l0_deltas = layers.get_level0_deltas()?;
901 0 : drop(guard);
902 0 :
903 0 : // As an optimization, if we find that there are too few L0 layers,
904 0 : // bail out early. We know that the compaction algorithm would do
905 0 : // nothing in that case.
906 0 : if l0_deltas.len() < fanout as usize {
907 : // doesn't need compacting
908 0 : return Ok(());
909 0 : }
910 0 : l0_deltas.iter().map(|l| l.lsn_range.end).max().unwrap()
911 0 : };
912 0 :
913 0 : // Is the timeline being deleted?
914 0 : if self.is_stopping() {
915 0 : trace!("Dropping out of compaction on timeline shutdown");
916 0 : return Err(CompactionError::ShuttingDown);
917 0 : }
918 :
919 0 : let (dense_ks, _sparse_ks) = self.collect_keyspace(end_lsn, ctx).await?;
920 : // TODO(chi): ignore sparse_keyspace for now, compact it in the future.
921 0 : let mut adaptor = TimelineAdaptor::new(self, (end_lsn, dense_ks));
922 0 :
923 0 : pageserver_compaction::compact_tiered::compact_tiered(
924 0 : &mut adaptor,
925 0 : end_lsn,
926 0 : target_file_size,
927 0 : fanout,
928 0 : ctx,
929 0 : )
930 0 : .await?;
931 :
932 0 : adaptor.flush_updates().await?;
933 0 : Ok(())
934 0 : }
935 : }
936 :
937 : struct TimelineAdaptor {
938 : timeline: Arc<Timeline>,
939 :
940 : keyspace: (Lsn, KeySpace),
941 :
942 : new_deltas: Vec<ResidentLayer>,
943 : new_images: Vec<ResidentLayer>,
944 : layers_to_delete: Vec<Arc<PersistentLayerDesc>>,
945 : }
946 :
947 : impl TimelineAdaptor {
948 0 : pub fn new(timeline: &Arc<Timeline>, keyspace: (Lsn, KeySpace)) -> Self {
949 0 : Self {
950 0 : timeline: timeline.clone(),
951 0 : keyspace,
952 0 : new_images: Vec::new(),
953 0 : new_deltas: Vec::new(),
954 0 : layers_to_delete: Vec::new(),
955 0 : }
956 0 : }
957 :
958 0 : pub async fn flush_updates(&mut self) -> anyhow::Result<()> {
959 0 : let layers_to_delete = {
960 0 : let guard = self.timeline.layers.read().await;
961 0 : self.layers_to_delete
962 0 : .iter()
963 0 : .map(|x| guard.get_from_desc(x))
964 0 : .collect::<Vec<Layer>>()
965 0 : };
966 0 : self.timeline
967 0 : .finish_compact_batch(&self.new_deltas, &self.new_images, &layers_to_delete)
968 0 : .await?;
969 :
970 0 : self.timeline
971 0 : .upload_new_image_layers(std::mem::take(&mut self.new_images))?;
972 :
973 0 : self.new_deltas.clear();
974 0 : self.layers_to_delete.clear();
975 0 : Ok(())
976 0 : }
977 : }
978 :
979 : #[derive(Clone)]
980 : struct ResidentDeltaLayer(ResidentLayer);
981 : #[derive(Clone)]
982 : struct ResidentImageLayer(ResidentLayer);
983 :
984 : impl CompactionJobExecutor for TimelineAdaptor {
985 : type Key = crate::repository::Key;
986 :
987 : type Layer = OwnArc<PersistentLayerDesc>;
988 : type DeltaLayer = ResidentDeltaLayer;
989 : type ImageLayer = ResidentImageLayer;
990 :
991 : type RequestContext = crate::context::RequestContext;
992 :
993 0 : fn get_shard_identity(&self) -> &ShardIdentity {
994 0 : self.timeline.get_shard_identity()
995 0 : }
996 :
997 0 : async fn get_layers(
998 0 : &mut self,
999 0 : key_range: &Range<Key>,
1000 0 : lsn_range: &Range<Lsn>,
1001 0 : _ctx: &RequestContext,
1002 0 : ) -> anyhow::Result<Vec<OwnArc<PersistentLayerDesc>>> {
1003 0 : self.flush_updates().await?;
1004 :
1005 0 : let guard = self.timeline.layers.read().await;
1006 0 : let layer_map = guard.layer_map();
1007 0 :
1008 0 : let result = layer_map
1009 0 : .iter_historic_layers()
1010 0 : .filter(|l| {
1011 0 : overlaps_with(&l.lsn_range, lsn_range) && overlaps_with(&l.key_range, key_range)
1012 0 : })
1013 0 : .map(OwnArc)
1014 0 : .collect();
1015 0 : Ok(result)
1016 0 : }
1017 :
1018 0 : async fn get_keyspace(
1019 0 : &mut self,
1020 0 : key_range: &Range<Key>,
1021 0 : lsn: Lsn,
1022 0 : _ctx: &RequestContext,
1023 0 : ) -> anyhow::Result<Vec<Range<Key>>> {
1024 0 : if lsn == self.keyspace.0 {
1025 0 : Ok(pageserver_compaction::helpers::intersect_keyspace(
1026 0 : &self.keyspace.1.ranges,
1027 0 : key_range,
1028 0 : ))
1029 : } else {
1030 : // The current compaction implementation only ever requests the key space
1031 : // at the compaction end LSN.
1032 0 : anyhow::bail!("keyspace not available for requested lsn");
1033 : }
1034 0 : }
1035 :
1036 0 : async fn downcast_delta_layer(
1037 0 : &self,
1038 0 : layer: &OwnArc<PersistentLayerDesc>,
1039 0 : ) -> anyhow::Result<Option<ResidentDeltaLayer>> {
1040 0 : // this is a lot more complex than a simple downcast...
1041 0 : if layer.is_delta() {
1042 0 : let l = {
1043 0 : let guard = self.timeline.layers.read().await;
1044 0 : guard.get_from_desc(layer)
1045 : };
1046 0 : let result = l.download_and_keep_resident().await?;
1047 :
1048 0 : Ok(Some(ResidentDeltaLayer(result)))
1049 : } else {
1050 0 : Ok(None)
1051 : }
1052 0 : }
1053 :
1054 0 : async fn create_image(
1055 0 : &mut self,
1056 0 : lsn: Lsn,
1057 0 : key_range: &Range<Key>,
1058 0 : ctx: &RequestContext,
1059 0 : ) -> anyhow::Result<()> {
1060 0 : Ok(self.create_image_impl(lsn, key_range, ctx).await?)
1061 0 : }
1062 :
1063 0 : async fn create_delta(
1064 0 : &mut self,
1065 0 : lsn_range: &Range<Lsn>,
1066 0 : key_range: &Range<Key>,
1067 0 : input_layers: &[ResidentDeltaLayer],
1068 0 : ctx: &RequestContext,
1069 0 : ) -> anyhow::Result<()> {
1070 0 : debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end);
1071 :
1072 0 : let mut all_entries = Vec::new();
1073 0 : for dl in input_layers.iter() {
1074 0 : all_entries.extend(dl.load_keys(ctx).await?);
1075 : }
1076 :
1077 : // The current stdlib sorting implementation is designed in a way where it is
1078 : // particularly fast when the slice is made up of sorted sub-ranges.
1079 0 : all_entries.sort_by_key(|DeltaEntry { key, lsn, .. }| (*key, *lsn));
1080 :
1081 0 : let mut writer = DeltaLayerWriter::new(
1082 0 : self.timeline.conf,
1083 0 : self.timeline.timeline_id,
1084 0 : self.timeline.tenant_shard_id,
1085 0 : key_range.start,
1086 0 : lsn_range.clone(),
1087 0 : ctx,
1088 0 : )
1089 0 : .await?;
1090 :
1091 0 : let mut dup_values = 0;
1092 0 :
1093 0 : // This iterator walks through all key-value pairs from all the layers
1094 0 : // we're compacting, in key, LSN order.
1095 0 : let mut prev: Option<(Key, Lsn)> = None;
1096 : for &DeltaEntry {
1097 0 : key, lsn, ref val, ..
1098 0 : } in all_entries.iter()
1099 : {
1100 0 : if prev == Some((key, lsn)) {
1101 : // This is a duplicate. Skip it.
1102 : //
1103 : // It can happen if compaction is interrupted after writing some
1104 : // layers but not all, and we are compacting the range again.
1105 : // The calculations in the algorithm assume that there are no
1106 : // duplicates, so the math on targeted file size is likely off,
1107 : // and we will create smaller files than expected.
1108 0 : dup_values += 1;
1109 0 : continue;
1110 0 : }
1111 :
1112 0 : let value = val.load(ctx).await?;
1113 :
1114 0 : writer.put_value(key, lsn, value, ctx).await?;
1115 :
1116 0 : prev = Some((key, lsn));
1117 : }
1118 :
1119 0 : if dup_values > 0 {
1120 0 : warn!("delta layer created with {} duplicate values", dup_values);
1121 0 : }
1122 :
1123 0 : fail_point!("delta-layer-writer-fail-before-finish", |_| {
1124 0 : Err(anyhow::anyhow!(
1125 0 : "failpoint delta-layer-writer-fail-before-finish"
1126 0 : ))
1127 0 : });
1128 :
1129 0 : let new_delta_layer = writer
1130 0 : .finish(prev.unwrap().0.next(), &self.timeline, ctx)
1131 0 : .await?;
1132 :
1133 0 : self.new_deltas.push(new_delta_layer);
1134 0 : Ok(())
1135 0 : }
1136 :
1137 0 : async fn delete_layer(
1138 0 : &mut self,
1139 0 : layer: &OwnArc<PersistentLayerDesc>,
1140 0 : _ctx: &RequestContext,
1141 0 : ) -> anyhow::Result<()> {
1142 0 : self.layers_to_delete.push(layer.clone().0);
1143 0 : Ok(())
1144 0 : }
1145 : }
1146 :
1147 : impl TimelineAdaptor {
1148 0 : async fn create_image_impl(
1149 0 : &mut self,
1150 0 : lsn: Lsn,
1151 0 : key_range: &Range<Key>,
1152 0 : ctx: &RequestContext,
1153 0 : ) -> Result<(), PageReconstructError> {
1154 0 : let timer = self.timeline.metrics.create_images_time_histo.start_timer();
1155 :
1156 0 : let mut image_layer_writer = ImageLayerWriter::new(
1157 0 : self.timeline.conf,
1158 0 : self.timeline.timeline_id,
1159 0 : self.timeline.tenant_shard_id,
1160 0 : key_range,
1161 0 : lsn,
1162 0 : ctx,
1163 0 : )
1164 0 : .await?;
1165 :
1166 0 : fail_point!("image-layer-writer-fail-before-finish", |_| {
1167 0 : Err(PageReconstructError::Other(anyhow::anyhow!(
1168 0 : "failpoint image-layer-writer-fail-before-finish"
1169 0 : )))
1170 0 : });
1171 0 : let keyspace_ranges = self.get_keyspace(key_range, lsn, ctx).await?;
1172 0 : for range in &keyspace_ranges {
1173 0 : let mut key = range.start;
1174 0 : while key < range.end {
1175 0 : let img = match self.timeline.get(key, lsn, ctx).await {
1176 0 : Ok(img) => img,
1177 0 : Err(err) => {
1178 0 : // If we fail to reconstruct a VM or FSM page, we can zero the
1179 0 : // page without losing any actual user data. That seems better
1180 0 : // than failing repeatedly and getting stuck.
1181 0 : //
1182 0 : // We had a bug at one point, where we truncated the FSM and VM
1183 0 : // in the pageserver, but Postgres didn't know about that
1184 0 : // and continued to generate incremental WAL records for pages
1185 0 : // that didn't exist in the pageserver. Trying to replay those
1186 0 : // WAL records failed to find the previous image of the page.
1187 0 : // This special case allows us to recover from that situation.
1188 0 : // See https://github.com/neondatabase/neon/issues/2601.
1189 0 : //
1190 0 : // Unfortunately we cannot do this for the main fork, or for
1191 0 : // any metadata keys, as that would lead to actual data
1192 0 : // loss.
1193 0 : if is_rel_fsm_block_key(key) || is_rel_vm_block_key(key) {
1194 0 : warn!("could not reconstruct FSM or VM key {key}, filling with zeros: {err:?}");
1195 0 : ZERO_PAGE.clone()
1196 : } else {
1197 0 : return Err(err);
1198 : }
1199 : }
1200 : };
1201 0 : image_layer_writer.put_image(key, img, ctx).await?;
1202 0 : key = key.next();
1203 : }
1204 : }
1205 0 : let image_layer = image_layer_writer.finish(&self.timeline, ctx).await?;
1206 :
1207 0 : self.new_images.push(image_layer);
1208 0 :
1209 0 : timer.stop_and_record();
1210 0 :
1211 0 : Ok(())
1212 0 : }
1213 : }
1214 :
1215 : impl CompactionRequestContext for crate::context::RequestContext {}
1216 :
1217 : #[derive(Debug, Clone)]
1218 : pub struct OwnArc<T>(pub Arc<T>);
1219 :
1220 : impl<T> Deref for OwnArc<T> {
1221 : type Target = <Arc<T> as Deref>::Target;
1222 0 : fn deref(&self) -> &Self::Target {
1223 0 : &self.0
1224 0 : }
1225 : }
1226 :
1227 : impl<T> AsRef<T> for OwnArc<T> {
1228 0 : fn as_ref(&self) -> &T {
1229 0 : self.0.as_ref()
1230 0 : }
1231 : }
1232 :
1233 : impl CompactionLayer<Key> for OwnArc<PersistentLayerDesc> {
1234 0 : fn key_range(&self) -> &Range<Key> {
1235 0 : &self.key_range
1236 0 : }
1237 0 : fn lsn_range(&self) -> &Range<Lsn> {
1238 0 : &self.lsn_range
1239 0 : }
1240 0 : fn file_size(&self) -> u64 {
1241 0 : self.file_size
1242 0 : }
1243 0 : fn short_id(&self) -> std::string::String {
1244 0 : self.as_ref().short_id().to_string()
1245 0 : }
1246 0 : fn is_delta(&self) -> bool {
1247 0 : self.as_ref().is_delta()
1248 0 : }
1249 : }
1250 :
1251 : impl CompactionLayer<Key> for OwnArc<DeltaLayer> {
1252 0 : fn key_range(&self) -> &Range<Key> {
1253 0 : &self.layer_desc().key_range
1254 0 : }
1255 0 : fn lsn_range(&self) -> &Range<Lsn> {
1256 0 : &self.layer_desc().lsn_range
1257 0 : }
1258 0 : fn file_size(&self) -> u64 {
1259 0 : self.layer_desc().file_size
1260 0 : }
1261 0 : fn short_id(&self) -> std::string::String {
1262 0 : self.layer_desc().short_id().to_string()
1263 0 : }
1264 0 : fn is_delta(&self) -> bool {
1265 0 : true
1266 0 : }
1267 : }
1268 :
1269 : use crate::tenant::timeline::DeltaEntry;
1270 :
1271 : impl CompactionLayer<Key> for ResidentDeltaLayer {
1272 0 : fn key_range(&self) -> &Range<Key> {
1273 0 : &self.0.layer_desc().key_range
1274 0 : }
1275 0 : fn lsn_range(&self) -> &Range<Lsn> {
1276 0 : &self.0.layer_desc().lsn_range
1277 0 : }
1278 0 : fn file_size(&self) -> u64 {
1279 0 : self.0.layer_desc().file_size
1280 0 : }
1281 0 : fn short_id(&self) -> std::string::String {
1282 0 : self.0.layer_desc().short_id().to_string()
1283 0 : }
1284 0 : fn is_delta(&self) -> bool {
1285 0 : true
1286 0 : }
1287 : }
1288 :
1289 : impl CompactionDeltaLayer<TimelineAdaptor> for ResidentDeltaLayer {
1290 : type DeltaEntry<'a> = DeltaEntry<'a>;
1291 :
1292 0 : async fn load_keys<'a>(&self, ctx: &RequestContext) -> anyhow::Result<Vec<DeltaEntry<'_>>> {
1293 0 : self.0.load_keys(ctx).await
1294 0 : }
1295 : }
1296 :
1297 : impl CompactionLayer<Key> for ResidentImageLayer {
1298 0 : fn key_range(&self) -> &Range<Key> {
1299 0 : &self.0.layer_desc().key_range
1300 0 : }
1301 0 : fn lsn_range(&self) -> &Range<Lsn> {
1302 0 : &self.0.layer_desc().lsn_range
1303 0 : }
1304 0 : fn file_size(&self) -> u64 {
1305 0 : self.0.layer_desc().file_size
1306 0 : }
1307 0 : fn short_id(&self) -> std::string::String {
1308 0 : self.0.layer_desc().short_id().to_string()
1309 0 : }
1310 0 : fn is_delta(&self) -> bool {
1311 0 : false
1312 0 : }
1313 : }
1314 : impl CompactionImageLayer<TimelineAdaptor> for ResidentImageLayer {}