Line data Source code
1 : //! New compaction implementation. The algorithm itself is implemented in the
2 : //! compaction crate. This file implements the callbacks and structs that allow
3 : //! the algorithm to drive the process.
4 : //!
5 : //! The old legacy algorithm is implemented directly in `timeline.rs`.
6 :
7 : use std::collections::{BinaryHeap, HashSet};
8 : use std::ops::{Deref, Range};
9 : use std::sync::Arc;
10 :
11 : use super::layer_manager::LayerManager;
12 : use super::{
13 : CompactFlags, CreateImageLayersError, DurationRecorder, ImageLayerCreationMode,
14 : RecordedDuration, Timeline,
15 : };
16 :
17 : use anyhow::{anyhow, Context};
18 : use bytes::Bytes;
19 : use enumset::EnumSet;
20 : use fail::fail_point;
21 : use itertools::Itertools;
22 : use pageserver_api::key::KEY_SIZE;
23 : use pageserver_api::keyspace::ShardedRange;
24 : use pageserver_api::shard::{ShardCount, ShardIdentity, TenantShardId};
25 : use serde::Serialize;
26 : use tokio_util::sync::CancellationToken;
27 : use tracing::{debug, info, info_span, trace, warn, Instrument};
28 : use utils::id::TimelineId;
29 :
30 : use crate::context::{AccessStatsBehavior, RequestContext, RequestContextBuilder};
31 : use crate::page_cache;
32 : use crate::tenant::config::defaults::{DEFAULT_CHECKPOINT_DISTANCE, DEFAULT_COMPACTION_THRESHOLD};
33 : use crate::tenant::remote_timeline_client::WaitCompletionError;
34 : use crate::tenant::storage_layer::merge_iterator::MergeIterator;
35 : use crate::tenant::storage_layer::{
36 : AsLayerDesc, PersistentLayerDesc, PersistentLayerKey, ValueReconstructState,
37 : };
38 : use crate::tenant::timeline::ImageLayerCreationOutcome;
39 : use crate::tenant::timeline::{drop_rlock, DeltaLayerWriter, ImageLayerWriter};
40 : use crate::tenant::timeline::{Layer, ResidentLayer};
41 : use crate::tenant::DeltaLayer;
42 : use crate::virtual_file::{MaybeFatalIo, VirtualFile};
43 :
44 : use crate::keyspace::KeySpace;
45 : use crate::repository::{Key, Value};
46 : use crate::walrecord::NeonWalRecord;
47 :
48 : use utils::lsn::Lsn;
49 :
50 : use pageserver_compaction::helpers::overlaps_with;
51 : use pageserver_compaction::interface::*;
52 :
53 : use super::CompactionError;
54 :
55 : /// Maximum number of deltas before generating an image layer in bottom-most compaction.
56 : const COMPACTION_DELTA_THRESHOLD: usize = 5;
57 :
58 : /// The result of bottom-most compaction for a single key at each LSN.
59 : #[derive(Debug)]
60 : #[cfg_attr(test, derive(PartialEq))]
61 : pub struct KeyLogAtLsn(pub Vec<(Lsn, Value)>);
62 :
63 : /// The result of bottom-most compaction.
64 : #[derive(Debug)]
65 : #[cfg_attr(test, derive(PartialEq))]
66 : pub(crate) struct KeyHistoryRetention {
67 : /// Stores logs to reconstruct the value at the given LSN, that is to say, logs <= LSN or image == LSN.
68 : pub(crate) below_horizon: Vec<(Lsn, KeyLogAtLsn)>,
69 : /// Stores logs to reconstruct the value at any LSN above the horizon, that is to say, log > LSN.
70 : pub(crate) above_horizon: KeyLogAtLsn,
71 : }
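// A minimal, hedged illustration (not part of the compaction flow) of how one key's history
// is split by this struct, assuming a single retain_lsn cutoff below the GC horizon. The
// LSNs and payloads below are made up for the example.
#[cfg(test)]
#[allow(dead_code)]
fn example_key_history_retention() -> KeyHistoryRetention {
    KeyHistoryRetention {
        // At the cutoff (e.g. a branch point at 0x20), the value is reconstructable from a
        // full image taken at 0x10, so anything older than 0x10 can be dropped.
        below_horizon: vec![(
            Lsn(0x20),
            KeyLogAtLsn(vec![(Lsn(0x10), Value::Image(Bytes::from_static(b"image@0x10")))]),
        )],
        // WAL above the horizon is kept as-is and replayed on top of the image at read time.
        above_horizon: KeyLogAtLsn(vec![(
            Lsn(0x30),
            Value::WalRecord(NeonWalRecord::Postgres {
                will_init: false,
                rec: Bytes::from_static(b"wal@0x30"),
            }),
        )]),
    }
}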
72 :
73 : impl KeyHistoryRetention {
74 338 : async fn pipe_to(
75 338 : self,
76 338 : key: Key,
77 338 : delta_writer: &mut Vec<(Key, Lsn, Value)>,
78 338 : mut image_writer: Option<&mut ImageLayerWriter>,
79 338 : stat: &mut CompactionStatistics,
80 338 : ctx: &RequestContext,
81 338 : ) -> anyhow::Result<()> {
82 338 : let mut first_batch = true;
83 1050 : for (cutoff_lsn, KeyLogAtLsn(logs)) in self.below_horizon {
84 712 : if first_batch {
85 338 : if logs.len() == 1 && logs[0].1.is_image() {
86 324 : let Value::Image(img) = &logs[0].1 else {
87 0 : unreachable!()
88 : };
89 324 : stat.produce_image_key(img);
90 324 : if let Some(image_writer) = image_writer.as_mut() {
91 328 : image_writer.put_image(key, img.clone(), ctx).await?;
92 0 : } else {
93 0 : delta_writer.push((key, cutoff_lsn, Value::Image(img.clone())));
94 0 : }
95 : } else {
96 28 : for (lsn, val) in logs {
97 14 : stat.produce_key(&val);
98 14 : delta_writer.push((key, lsn, val));
99 14 : }
100 : }
101 338 : first_batch = false;
102 : } else {
103 438 : for (lsn, val) in logs {
104 64 : stat.produce_key(&val);
105 64 : delta_writer.push((key, lsn, val));
106 64 : }
107 : }
108 : }
109 338 : let KeyLogAtLsn(above_horizon_logs) = self.above_horizon;
110 352 : for (lsn, val) in above_horizon_logs {
111 14 : stat.produce_key(&val);
112 14 : delta_writer.push((key, lsn, val));
113 14 : }
114 338 : Ok(())
115 338 : }
116 : }
117 :
118 : #[derive(Debug, Serialize, Default)]
119 : struct CompactionStatisticsNumSize {
120 : num: u64,
121 : size: u64,
122 : }
123 :
124 : #[derive(Debug, Serialize, Default)]
125 : pub struct CompactionStatistics {
126 : delta_layer_visited: CompactionStatisticsNumSize,
127 : image_layer_visited: CompactionStatisticsNumSize,
128 : delta_layer_produced: CompactionStatisticsNumSize,
129 : image_layer_produced: CompactionStatisticsNumSize,
130 : num_delta_layer_discarded: usize,
131 : num_image_layer_discarded: usize,
132 : num_unique_keys_visited: usize,
133 : wal_keys_visited: CompactionStatisticsNumSize,
134 : image_keys_visited: CompactionStatisticsNumSize,
135 : wal_produced: CompactionStatisticsNumSize,
136 : image_produced: CompactionStatisticsNumSize,
137 : }
138 :
139 : impl CompactionStatistics {
140 530 : fn estimated_size_of_value(val: &Value) -> usize {
141 194 : match val {
142 336 : Value::Image(img) => img.len(),
143 0 : Value::WalRecord(NeonWalRecord::Postgres { rec, .. }) => rec.len(),
144 194 : _ => std::mem::size_of::<NeonWalRecord>(),
145 : }
146 530 : }
147 856 : fn estimated_size_of_key() -> usize {
148 856 : KEY_SIZE // TODO: distinguish image layer and delta layer (count LSN in delta layer)
149 856 : }
150 54 : fn visit_delta_layer(&mut self, size: u64) {
151 54 : self.delta_layer_visited.num += 1;
152 54 : self.delta_layer_visited.size += size;
153 54 : }
154 26 : fn visit_image_layer(&mut self, size: u64) {
155 26 : self.image_layer_visited.num += 1;
156 26 : self.image_layer_visited.size += size;
157 26 : }
158 338 : fn on_unique_key_visited(&mut self) {
159 338 : self.num_unique_keys_visited += 1;
160 338 : }
161 104 : fn visit_wal_key(&mut self, val: &Value) {
162 104 : self.wal_keys_visited.num += 1;
163 104 : self.wal_keys_visited.size +=
164 104 : Self::estimated_size_of_value(val) as u64 + Self::estimated_size_of_key() as u64;
165 104 : }
166 336 : fn visit_image_key(&mut self, val: &Value) {
167 336 : self.image_keys_visited.num += 1;
168 336 : self.image_keys_visited.size +=
169 336 : Self::estimated_size_of_value(val) as u64 + Self::estimated_size_of_key() as u64;
170 336 : }
171 92 : fn produce_key(&mut self, val: &Value) {
172 92 : match val {
173 2 : Value::Image(img) => self.produce_image_key(img),
174 90 : Value::WalRecord(_) => self.produce_wal_key(val),
175 : }
176 92 : }
177 90 : fn produce_wal_key(&mut self, val: &Value) {
178 90 : self.wal_produced.num += 1;
179 90 : self.wal_produced.size +=
180 90 : Self::estimated_size_of_value(val) as u64 + Self::estimated_size_of_key() as u64;
181 90 : }
182 326 : fn produce_image_key(&mut self, val: &Bytes) {
183 326 : self.image_produced.num += 1;
184 326 : self.image_produced.size += val.len() as u64 + Self::estimated_size_of_key() as u64;
185 326 : }
186 26 : fn discard_delta_layer(&mut self) {
187 26 : self.num_delta_layer_discarded += 1;
188 26 : }
189 6 : fn discard_image_layer(&mut self) {
190 6 : self.num_image_layer_discarded += 1;
191 6 : }
192 32 : fn produce_delta_layer(&mut self, size: u64) {
193 32 : self.delta_layer_produced.num += 1;
194 32 : self.delta_layer_produced.size += size;
195 32 : }
196 12 : fn produce_image_layer(&mut self, size: u64) {
197 12 : self.image_layer_produced.num += 1;
198 12 : self.image_layer_produced.size += size;
199 12 : }
200 : }
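// A small, hedged sketch of how these counters are driven while scanning the merged key
// stream in bottom-most compaction: each (key, lsn, value) visited is tallied as an image
// key or a WAL key, and each distinct key is counted once. The payloads are arbitrary.
#[cfg(test)]
#[allow(dead_code)]
fn example_compaction_statistics() {
    let mut stat = CompactionStatistics::default();
    let img = Value::Image(Bytes::from_static(b"page image"));
    let rec = Value::WalRecord(NeonWalRecord::Postgres {
        will_init: false,
        rec: Bytes::from_static(b"wal record"),
    });
    stat.on_unique_key_visited();
    stat.visit_image_key(&img);
    stat.visit_wal_key(&rec);
    assert_eq!(stat.num_unique_keys_visited, 1);
    assert_eq!(stat.image_keys_visited.num, 1);
    assert_eq!(stat.wal_keys_visited.num, 1);
    // The sizes are estimates: the value length (or size_of::<NeonWalRecord>() for
    // non-Postgres records) plus KEY_SIZE per key, per estimated_size_of_value/_key above.
}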
201 :
202 : impl Timeline {
203 : /// TODO: cancellation
204 : ///
205 : /// Returns whether the compaction has pending tasks.
206 364 : pub(crate) async fn compact_legacy(
207 364 : self: &Arc<Self>,
208 364 : cancel: &CancellationToken,
209 364 : flags: EnumSet<CompactFlags>,
210 364 : ctx: &RequestContext,
211 364 : ) -> Result<bool, CompactionError> {
212 364 : if flags.contains(CompactFlags::EnhancedGcBottomMostCompaction) {
213 0 : self.compact_with_gc(cancel, flags, ctx)
214 0 : .await
215 0 : .map_err(CompactionError::Other)?;
216 0 : return Ok(false);
217 364 : }
218 364 :
219 364 : if flags.contains(CompactFlags::DryRun) {
220 0 : return Err(CompactionError::Other(anyhow!(
221 0 : "dry-run mode is not supported for legacy compaction for now"
222 0 : )));
223 364 : }
224 364 :
225 364 : // High level strategy for compaction / image creation:
226 364 : //
227 364 : // 1. First, calculate the desired "partitioning" of the
228 364 : // currently in-use key space. The goal is to partition the
229 364 : // key space into roughly fixed-size chunks, but also take into
230 364 : // account any existing image layers, and try to align the
231 364 : // chunk boundaries with the existing image layers to avoid
232 364 : // too much churn. Also try to align chunk boundaries with
233 364 : // relation boundaries. In principle, we don't know about
234 364 : // relation boundaries here, we just deal with key-value
235 364 : // pairs, and the code in pgdatadir_mapping.rs knows how to
236 364 : // map relations into key-value pairs. But in practice we know
237 364 : // that 'field6' is the block number, and the fields 1-5
238 364 : // identify a relation. This is just an optimization,
239 364 : // though.
240 364 : //
241 364 : // 2. Once we know the partitioning, for each partition,
242 364 : // decide if it's time to create a new image layer. The
243 364 : // criteria is: there has been too much "churn" since the last
244 364 : // image layer? The "churn" is fuzzy concept, it's a
245 364 : // combination of too many delta files, or too much WAL in
246 364 : // total in the delta file. Or perhaps: if creating an image
247 364 : // file would allow to delete some older files.
248 364 : //
249 364 : // 3. After that, we compact all level0 delta files if there
250 364 : // are too many of them. While compacting, we also garbage
251 364 : // collect any page versions that are no longer needed because
252 364 : // of the new image layers we created in step 2.
253 364 : //
254 364 : // TODO: This high level strategy hasn't been implemented yet.
255 364 : // Below are functions compact_level0() and create_image_layers()
256 364 : // but they are a bit ad hoc and don't quite work like it's explained
257 364 : // above. Rewrite it.
258 364 :
259 364 : // Is the timeline being deleted?
260 364 : if self.is_stopping() {
261 0 : trace!("Dropping out of compaction on timeline shutdown");
262 0 : return Err(CompactionError::ShuttingDown);
263 364 : }
264 364 :
265 364 : let target_file_size = self.get_checkpoint_distance();
266 :
267 : // Define partitioning schema if needed
268 :
269 : // FIXME: the match should only cover repartitioning, not the next steps
270 364 : let (partition_count, has_pending_tasks) = match self
271 364 : .repartition(
272 364 : self.get_last_record_lsn(),
273 364 : self.get_compaction_target_size(),
274 364 : flags,
275 364 : ctx,
276 364 : )
277 16406 : .await
278 : {
279 364 : Ok(((dense_partitioning, sparse_partitioning), lsn)) => {
280 364 : // Disables access_stats updates, so that the files we read remain candidates for eviction after we're done with them
281 364 : let image_ctx = RequestContextBuilder::extend(ctx)
282 364 : .access_stats_behavior(AccessStatsBehavior::Skip)
283 364 : .build();
284 364 :
285 364 : // 2. Compact
286 364 : let timer = self.metrics.compact_time_histo.start_timer();
287 45831 : let fully_compacted = self.compact_level0(target_file_size, ctx).await?;
288 364 : timer.stop_and_record();
289 364 :
290 364 : let mut partitioning = dense_partitioning;
291 364 : partitioning
292 364 : .parts
293 364 : .extend(sparse_partitioning.into_dense().parts);
294 364 :
295 364 : // 3. Create new image layers for partitions that have been modified
296 364 : // "enough". Skip image layer creation if L0 compaction cannot keep up.
297 364 : if fully_compacted {
298 364 : let image_layers = self
299 364 : .create_image_layers(
300 364 : &partitioning,
301 364 : lsn,
302 364 : if flags.contains(CompactFlags::ForceImageLayerCreation) {
303 14 : ImageLayerCreationMode::Force
304 : } else {
305 350 : ImageLayerCreationMode::Try
306 : },
307 364 : &image_ctx,
308 : )
309 14517 : .await?;
310 :
311 364 : self.upload_new_image_layers(image_layers)?;
312 : } else {
313 0 : info!("skipping image layer generation due to L0 compaction did not include all layers.");
314 : }
315 364 : (partitioning.parts.len(), !fully_compacted)
316 : }
317 0 : Err(err) => {
318 0 : // no partitioning? This is normal, if the timeline was just created
319 0 : // as an empty timeline. Also in unit tests, when we use the timeline
320 0 : // as a simple key-value store, ignoring the datadir layout. Log the
321 0 : // error but continue.
322 0 : //
323 0 : // Suppress error when it's due to cancellation
324 0 : if !self.cancel.is_cancelled() {
325 0 : tracing::error!("could not compact, repartitioning keyspace failed: {err:?}");
326 0 : }
327 0 : (1, false)
328 : }
329 : };
330 :
331 364 : if self.shard_identity.count >= ShardCount::new(2) {
332 : // Limit the number of layer rewrites to the number of partitions: this means its
333 : // runtime should be comparable to a full round of image layer creations, rather than
334 : // being potentially much longer.
335 0 : let rewrite_max = partition_count;
336 0 :
337 0 : self.compact_shard_ancestors(rewrite_max, ctx).await?;
338 364 : }
339 :
340 364 : Ok(has_pending_tasks)
341 364 : }
342 :
343 : /// Check for layers that are eligible to be rewritten:
344 : /// - Shard splitting: after a shard split, rewrite ancestor layers beyond pitr_interval, so that
345 : /// we don't indefinitely retain keys in this shard that aren't needed.
346 : /// - For future use: layers beyond pitr_interval that are in formats we would
347 : /// rather not maintain compatibility with indefinitely.
348 : ///
349 : /// Note: this phase may read and write many gigabytes of data: use rewrite_max to bound
350 : /// how much work it will try to do in each compaction pass.
351 0 : async fn compact_shard_ancestors(
352 0 : self: &Arc<Self>,
353 0 : rewrite_max: usize,
354 0 : ctx: &RequestContext,
355 0 : ) -> Result<(), CompactionError> {
356 0 : let mut drop_layers = Vec::new();
357 0 : let mut layers_to_rewrite: Vec<Layer> = Vec::new();
358 0 :
359 0 : // We will use the Lsn cutoff of the last GC as a threshold for rewriting layers: if a
360 0 : // layer is behind this Lsn, it indicates that the layer is being retained beyond the
361 0 : // pitr_interval, for example because a branchpoint references it.
362 0 : //
363 0 : // Holding this read guard also blocks [`Self::gc_timeline`] from entering while we
364 0 : // are rewriting layers.
365 0 : let latest_gc_cutoff = self.get_latest_gc_cutoff_lsn();
366 0 :
367 0 : tracing::info!(
368 0 : "latest_gc_cutoff: {}, pitr cutoff {}",
369 0 : *latest_gc_cutoff,
370 0 : self.gc_info.read().unwrap().cutoffs.time
371 : );
372 :
373 0 : let layers = self.layers.read().await;
374 0 : for layer_desc in layers.layer_map()?.iter_historic_layers() {
375 0 : let layer = layers.get_from_desc(&layer_desc);
376 0 : if layer.metadata().shard.shard_count == self.shard_identity.count {
377 : // This layer does not belong to a historic ancestor, no need to re-image it.
378 0 : continue;
379 0 : }
380 0 :
381 0 : // This layer was created on an ancestor shard: check if it contains any data for this shard.
382 0 : let sharded_range = ShardedRange::new(layer_desc.get_key_range(), &self.shard_identity);
383 0 : let layer_local_page_count = sharded_range.page_count();
384 0 : let layer_raw_page_count = ShardedRange::raw_size(&layer_desc.get_key_range());
385 0 : if layer_local_page_count == 0 {
386 : // This ancestral layer only covers keys that belong to other shards.
387 : // We include the full metadata in the log: if we had some critical bug that caused
388 : // us to incorrectly drop layers, this would simplify manual debugging + reinstating those layers.
389 0 : info!(%layer, old_metadata=?layer.metadata(),
390 0 : "dropping layer after shard split, contains no keys for this shard.",
391 : );
392 :
393 0 : if cfg!(debug_assertions) {
394 : // Expensive, exhaustive check of keys in this layer: this guards against ShardedRange's calculations being
395 : // wrong. If ShardedRange claims the local page count is zero, then no keys in this layer
396 : // should be !is_key_disposable()
397 0 : let range = layer_desc.get_key_range();
398 0 : let mut key = range.start;
399 0 : while key < range.end {
400 0 : debug_assert!(self.shard_identity.is_key_disposable(&key));
401 0 : key = key.next();
402 : }
403 0 : }
404 :
405 0 : drop_layers.push(layer);
406 0 : continue;
407 0 : } else if layer_local_page_count != u32::MAX
408 0 : && layer_local_page_count == layer_raw_page_count
409 : {
410 0 : debug!(%layer,
411 0 : "layer is entirely shard local ({} keys), no need to filter it",
412 : layer_local_page_count
413 : );
414 0 : continue;
415 0 : }
416 0 :
417 0 : // Don't bother re-writing a layer unless it will at least halve its size
418 0 : if layer_local_page_count != u32::MAX
419 0 : && layer_local_page_count > layer_raw_page_count / 2
420 : {
421 0 : debug!(%layer,
422 0 : "layer is already mostly local ({}/{}), not rewriting",
423 : layer_local_page_count,
424 : layer_raw_page_count
425 : );
426 0 : }
427 :
428 : // Don't bother re-writing a layer if it is within the PITR window: it will age-out eventually
429 : // without incurring the I/O cost of a rewrite.
430 0 : if layer_desc.get_lsn_range().end >= *latest_gc_cutoff {
431 0 : debug!(%layer, "Skipping rewrite of layer still in GC window ({} >= {})",
432 0 : layer_desc.get_lsn_range().end, *latest_gc_cutoff);
433 0 : continue;
434 0 : }
435 0 :
436 0 : if layer_desc.is_delta() {
437 : // We do not yet implement rewrite of delta layers
438 0 : debug!(%layer, "Skipping rewrite of delta layer");
439 0 : continue;
440 0 : }
441 0 :
442 0 : // Only rewrite layers if their generations differ. This guarantees:
443 0 : // - that local rewrite is safe, as local layer paths will differ between existing layer and rewritten one
444 0 : // - that the layer is persistent in remote storage, as we only see old-generation'd layer via loading from remote storage
445 0 : if layer.metadata().generation == self.generation {
446 0 : debug!(%layer, "Skipping rewrite, is not from old generation");
447 0 : continue;
448 0 : }
449 0 :
450 0 : if layers_to_rewrite.len() >= rewrite_max {
451 0 : tracing::info!(%layer, "Will rewrite layer on a future compaction, already rewrote {}",
452 0 : layers_to_rewrite.len()
453 : );
454 0 : continue;
455 0 : }
456 0 :
457 0 : // Fall through: all our conditions for doing a rewrite passed.
458 0 : layers_to_rewrite.push(layer);
459 : }
460 :
461 : // Drop read lock on layer map before we start doing time-consuming I/O
462 0 : drop(layers);
463 0 :
464 0 : let mut replace_image_layers = Vec::new();
465 :
466 0 : for layer in layers_to_rewrite {
467 0 : tracing::info!(layer=%layer, "Rewriting layer after shard split...");
468 0 : let mut image_layer_writer = ImageLayerWriter::new(
469 0 : self.conf,
470 0 : self.timeline_id,
471 0 : self.tenant_shard_id,
472 0 : &layer.layer_desc().key_range,
473 0 : layer.layer_desc().image_layer_lsn(),
474 0 : ctx,
475 0 : )
476 0 : .await
477 0 : .map_err(CompactionError::Other)?;
478 :
479 : // Safety of layer rewrites:
480 : // - We are writing to a different local file path than we are reading from, so the old Layer
481 : // cannot interfere with the new one.
482 : // - In the page cache, contents for a particular VirtualFile are stored with a file_id that
483 : // is different for two layers with the same name (in `ImageLayerInner::new` we always
484 : // acquire a fresh id from [`crate::page_cache::next_file_id`]. So readers do not risk
485 : // reading the index from one layer file, and then data blocks from the rewritten layer file.
486 : // - Any readers that have a reference to the old layer will keep it alive until they are done
487 : // with it. If they are trying to promote from remote storage, that will fail, but this is the same
488 : // as for compaction generally: compaction is allowed to delete layers that readers might be trying to use.
489 : // - We do not run concurrently with other kinds of compaction, so the only layer map writes we race with are:
490 : // - GC, which at worst witnesses us "undelete" a layer that they just deleted.
491 : // - ingestion, which only inserts layers, therefore cannot collide with us.
492 0 : let resident = layer.download_and_keep_resident().await?;
493 :
494 0 : let keys_written = resident
495 0 : .filter(&self.shard_identity, &mut image_layer_writer, ctx)
496 0 : .await?;
497 :
498 0 : if keys_written > 0 {
499 0 : let new_layer = image_layer_writer
500 0 : .finish(self, ctx)
501 0 : .await
502 0 : .map_err(CompactionError::Other)?;
503 0 : tracing::info!(layer=%new_layer, "Rewrote layer, {} -> {} bytes",
504 0 : layer.metadata().file_size,
505 0 : new_layer.metadata().file_size);
506 :
507 0 : replace_image_layers.push((layer, new_layer));
508 0 : } else {
509 0 : // Drop the old layer. Usually for this case we would already have noticed that
510 0 : // the layer has no data for us with the ShardedRange check above, but that check is only an estimate, so the rewrite can still turn out to produce no keys for this shard.
511 0 : drop_layers.push(layer);
512 0 : }
513 : }
514 :
515 : // At this point, we have replaced local layer files with their rewritten form, but not yet uploaded
516 : // metadata to reflect that. If we restart here, the replaced layer files will look invalid (size mismatch
517 : // to remote index) and be removed. This is inefficient but safe.
518 : fail::fail_point!("compact-shard-ancestors-localonly");
519 :
520 : // Update the LayerMap so that readers will use the new layers, and enqueue it for writing to remote storage
521 0 : self.rewrite_layers(replace_image_layers, drop_layers)
522 0 : .await?;
523 :
524 : fail::fail_point!("compact-shard-ancestors-enqueued");
525 :
526 : // We wait for all uploads to complete before finishing this compaction stage. This is not
527 : // necessary for correctness, but it simplifies testing, and avoids proceeding with another
528 : // Timeline's compaction while this timeline's uploads may be generating lots of disk I/O
529 : // load.
530 0 : match self.remote_client.wait_completion().await {
531 0 : Ok(()) => (),
532 0 : Err(WaitCompletionError::NotInitialized(ni)) => return Err(CompactionError::from(ni)),
533 : Err(WaitCompletionError::UploadQueueShutDownOrStopped) => {
534 0 : return Err(CompactionError::ShuttingDown)
535 : }
536 : }
537 :
538 : fail::fail_point!("compact-shard-ancestors-persistent");
539 :
540 0 : Ok(())
541 0 : }
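    // A condensed, hedged restatement of the per-layer filtering intent in the loop above as a
    // pure predicate. The `same_generation` flag stands in for the
    // `layer.metadata().generation == self.generation` comparison; all parameters are
    // illustrative simplifications, not the real call signature used by compact_shard_ancestors.
    #[allow(dead_code)]
    fn is_ancestor_layer_rewrite_candidate(
        layer_local_page_count: u32,
        layer_raw_page_count: u32,
        layer_lsn_end: Lsn,
        latest_gc_cutoff: Lsn,
        is_delta: bool,
        same_generation: bool,
    ) -> bool {
        // No local data at all: the layer is dropped rather than rewritten.
        if layer_local_page_count == 0 {
            return false;
        }
        // Already entirely (or mostly) shard-local: a rewrite would not reclaim enough space.
        if layer_local_page_count != u32::MAX
            && layer_local_page_count > layer_raw_page_count / 2
        {
            return false;
        }
        // Still inside the PITR window: the layer will age out on its own without a rewrite.
        if layer_lsn_end >= latest_gc_cutoff {
            return false;
        }
        // Delta layer rewrites are not implemented yet, and same-generation layers are skipped
        // so a rewritten file cannot collide with the existing local path.
        !is_delta && !same_generation
    }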
542 :
543 : /// Update the LayerVisibilityHint of layers covered by image layers, based on whether there is
544 : /// an image layer between them and the most recent readable LSN (branch point or tip of timeline). The
545 : /// purpose of the visibility hint is to record which layers need to be available to service reads.
546 : ///
547 : /// The result may be used as an input to eviction and secondary downloads to de-prioritize layers
548 : /// that we know won't be needed for reads.
549 538 : pub(super) async fn update_layer_visibility(
550 538 : &self,
551 538 : ) -> Result<(), super::layer_manager::Shutdown> {
552 538 : let head_lsn = self.get_last_record_lsn();
553 :
554 : // We will sweep through layers in reverse-LSN order. We only do historic layers. L0 deltas
555 : // are implicitly left visible, because LayerVisibilityHint's default is Visible, and we never modify it here.
556 : // Note that L0 deltas _can_ be covered by image layers, but we consider them 'visible' because we anticipate that
557 : // they will be subject to L0->L1 compaction in the near future.
558 538 : let layer_manager = self.layers.read().await;
559 538 : let layer_map = layer_manager.layer_map()?;
560 :
561 538 : let readable_points = {
562 538 : let children = self.gc_info.read().unwrap().retain_lsns.clone();
563 538 :
564 538 : let mut readable_points = Vec::with_capacity(children.len() + 1);
565 538 : for (child_lsn, _child_timeline_id) in &children {
566 0 : readable_points.push(*child_lsn);
567 0 : }
568 538 : readable_points.push(head_lsn);
569 538 : readable_points
570 538 : };
571 538 :
572 538 : let (layer_visibility, covered) = layer_map.get_visibility(readable_points);
573 3040 : for (layer_desc, visibility) in layer_visibility {
574 2502 : // FIXME: a more efficiency bulk zip() through the layers rather than NlogN getting each one
575 2502 : let layer = layer_manager.get_from_desc(&layer_desc);
576 2502 : layer.set_visibility(visibility);
577 2502 : }
578 :
579 : // TODO: publish our covered KeySpace to our parent, so that when they update their visibility, they can
580 : // avoid assuming that everything at a branch point is visible.
581 538 : drop(covered);
582 538 : Ok(())
583 538 : }
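    // A tiny restatement of how the set of readable points is assembled above, with plain LSN
    // slices standing in for the gc_info bookkeeping: every child branch point plus the tip of
    // the timeline is a point readers may start from, so layers must stay visible for all of them.
    #[allow(dead_code)]
    fn readable_points_sketch(child_branch_points: &[Lsn], head_lsn: Lsn) -> Vec<Lsn> {
        let mut readable_points = Vec::with_capacity(child_branch_points.len() + 1);
        readable_points.extend_from_slice(child_branch_points);
        readable_points.push(head_lsn);
        readable_points
    }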
584 :
585 : /// Collect a bunch of Level 0 layer files, and compact and reshuffle them
586 : /// as Level 1 files. Returns whether the L0 layers are fully compacted.
587 364 : async fn compact_level0(
588 364 : self: &Arc<Self>,
589 364 : target_file_size: u64,
590 364 : ctx: &RequestContext,
591 364 : ) -> Result<bool, CompactionError> {
592 : let CompactLevel0Phase1Result {
593 364 : new_layers,
594 364 : deltas_to_compact,
595 364 : fully_compacted,
596 : } = {
597 364 : let phase1_span = info_span!("compact_level0_phase1");
598 364 : let ctx = ctx.attached_child();
599 364 : let mut stats = CompactLevel0Phase1StatsBuilder {
600 364 : version: Some(2),
601 364 : tenant_id: Some(self.tenant_shard_id),
602 364 : timeline_id: Some(self.timeline_id),
603 364 : ..Default::default()
604 364 : };
605 364 :
606 364 : let begin = tokio::time::Instant::now();
607 364 : let phase1_layers_locked = self.layers.read().await;
608 364 : let now = tokio::time::Instant::now();
609 364 : stats.read_lock_acquisition_micros =
610 364 : DurationRecorder::Recorded(RecordedDuration(now - begin), now);
611 364 : self.compact_level0_phase1(phase1_layers_locked, stats, target_file_size, &ctx)
612 364 : .instrument(phase1_span)
613 45831 : .await?
614 : };
615 :
616 364 : if new_layers.is_empty() && deltas_to_compact.is_empty() {
617 : // nothing to do
618 336 : return Ok(true);
619 28 : }
620 28 :
621 28 : self.finish_compact_batch(&new_layers, &Vec::new(), &deltas_to_compact)
622 0 : .await?;
623 28 : Ok(fully_compacted)
624 364 : }
625 :
626 : /// Level0 files first phase of compaction, explained in the [`Self::compact_legacy`] comment.
627 364 : async fn compact_level0_phase1<'a>(
628 364 : self: &'a Arc<Self>,
629 364 : guard: tokio::sync::RwLockReadGuard<'a, LayerManager>,
630 364 : mut stats: CompactLevel0Phase1StatsBuilder,
631 364 : target_file_size: u64,
632 364 : ctx: &RequestContext,
633 364 : ) -> Result<CompactLevel0Phase1Result, CompactionError> {
634 364 : stats.read_lock_held_spawn_blocking_startup_micros =
635 364 : stats.read_lock_acquisition_micros.till_now(); // set by caller
636 364 : let layers = guard.layer_map()?;
637 364 : let level0_deltas = layers.level0_deltas();
638 364 : stats.level0_deltas_count = Some(level0_deltas.len());
639 364 :
640 364 : // Only compact if enough layers have accumulated.
641 364 : let threshold = self.get_compaction_threshold();
642 364 : if level0_deltas.is_empty() || level0_deltas.len() < threshold {
643 336 : debug!(
644 0 : level0_deltas = level0_deltas.len(),
645 0 : threshold, "too few deltas to compact"
646 : );
647 336 : return Ok(CompactLevel0Phase1Result::default());
648 28 : }
649 28 :
650 28 : let mut level0_deltas = level0_deltas
651 28 : .iter()
652 402 : .map(|x| guard.get_from_desc(x))
653 28 : .collect::<Vec<_>>();
654 28 :
655 28 : // Gather the files to compact in this iteration.
656 28 : //
657 28 : // Start with the oldest Level 0 delta file, and collect any other
658 28 : // level 0 files that form a contiguous sequence, such that the end
659 28 : // LSN of previous file matches the start LSN of the next file.
660 28 : //
661 28 : // Note that if the files don't form such a sequence, we might
662 28 : // "compact" just a single file. That's a bit pointless, but it allows
663 28 : // us to get rid of the level 0 file, and compact the other files on
664 28 : // the next iteration. This could probably be made smarter, but such
665 28 : // "gaps" in the sequence of level 0 files should only happen in case
666 28 : // of a crash, partial download from cloud storage, or something like
667 28 : // that, so it's not a big deal in practice.
668 748 : level0_deltas.sort_by_key(|l| l.layer_desc().lsn_range.start);
669 28 : let mut level0_deltas_iter = level0_deltas.iter();
670 28 :
671 28 : let first_level0_delta = level0_deltas_iter.next().unwrap();
672 28 : let mut prev_lsn_end = first_level0_delta.layer_desc().lsn_range.end;
673 28 : let mut deltas_to_compact = Vec::with_capacity(level0_deltas.len());
674 28 :
675 28 : // Accumulate the size of layers in `deltas_to_compact`
676 28 : let mut deltas_to_compact_bytes = 0;
677 28 :
678 28 : // Under normal circumstances, we will accumulate up to compaction_interval L0s of size
679 28 : // checkpoint_distance each. To avoid edge cases using extra system resources, bound our
680 28 : // work in this function to only operate on this much delta data at once.
681 28 : //
682 28 : // Take the max of the configured value & the default, so that tests that configure tiny values
683 28 : // can still use a sensible amount of memory, but if a deployed system configures bigger values we
684 28 : // still let them compact a full stack of L0s in one go.
685 28 : let delta_size_limit = std::cmp::max(
686 28 : self.get_compaction_threshold(),
687 28 : DEFAULT_COMPACTION_THRESHOLD,
688 28 : ) as u64
689 28 : * std::cmp::max(self.get_checkpoint_distance(), DEFAULT_CHECKPOINT_DISTANCE);
690 28 :
691 28 : let mut fully_compacted = true;
692 28 :
693 28 : deltas_to_compact.push(first_level0_delta.download_and_keep_resident().await?);
694 402 : for l in level0_deltas_iter {
695 374 : let lsn_range = &l.layer_desc().lsn_range;
696 374 :
697 374 : if lsn_range.start != prev_lsn_end {
698 0 : break;
699 374 : }
700 374 : deltas_to_compact.push(l.download_and_keep_resident().await?);
701 374 : deltas_to_compact_bytes += l.metadata().file_size;
702 374 : prev_lsn_end = lsn_range.end;
703 374 :
704 374 : if deltas_to_compact_bytes >= delta_size_limit {
705 0 : info!(
706 0 : l0_deltas_selected = deltas_to_compact.len(),
707 0 : l0_deltas_total = level0_deltas.len(),
708 0 : "L0 compaction picker hit max delta layer size limit: {}",
709 : delta_size_limit
710 : );
711 0 : fully_compacted = false;
712 0 :
713 0 : // Proceed with compaction, but only a subset of L0s
714 0 : break;
715 374 : }
716 : }
717 28 : let lsn_range = Range {
718 28 : start: deltas_to_compact
719 28 : .first()
720 28 : .unwrap()
721 28 : .layer_desc()
722 28 : .lsn_range
723 28 : .start,
724 28 : end: deltas_to_compact.last().unwrap().layer_desc().lsn_range.end,
725 28 : };
726 28 :
727 28 : info!(
728 0 : "Starting Level0 compaction in LSN range {}-{} for {} layers ({} deltas in total)",
729 0 : lsn_range.start,
730 0 : lsn_range.end,
731 0 : deltas_to_compact.len(),
732 0 : level0_deltas.len()
733 : );
734 :
735 402 : for l in deltas_to_compact.iter() {
736 402 : info!("compact includes {l}");
737 : }
738 :
739 : // We don't need the original list of layers anymore. Drop it so that
740 : // we don't accidentally use it later in the function.
741 28 : drop(level0_deltas);
742 28 :
743 28 : stats.read_lock_held_prerequisites_micros = stats
744 28 : .read_lock_held_spawn_blocking_startup_micros
745 28 : .till_now();
746 :
747 : // TODO: replace with streaming k-merge
748 28 : let all_keys = {
749 28 : let mut all_keys = Vec::new();
750 402 : for l in deltas_to_compact.iter() {
751 402 : if self.cancel.is_cancelled() {
752 0 : return Err(CompactionError::ShuttingDown);
753 402 : }
754 2364 : all_keys.extend(l.load_keys(ctx).await.map_err(CompactionError::Other)?);
755 : }
756 : // The current stdlib sorting implementation is designed in a way where it is
757 : // particularly fast when the slice is made up of sorted sub-ranges.
758 4431736 : all_keys.sort_by_key(|DeltaEntry { key, lsn, .. }| (*key, *lsn));
759 28 : all_keys
760 28 : };
761 28 :
762 28 : stats.read_lock_held_key_sort_micros = stats.read_lock_held_prerequisites_micros.till_now();
763 :
764 : // Determine N largest holes where N is number of compacted layers. The vec is sorted by key range start.
765 : //
766 : // A hole is a key range for which this compaction doesn't have any WAL records.
767 : // Our goal in this compaction iteration is to avoid creating L1s that, in terms of their key range,
768 : // cover the hole, but actually don't contain any WAL records for that key range.
769 : // The reason is that the mere stack of L1s (`count_deltas`) triggers image layer creation (`create_image_layers`).
770 : // That image layer creation would be useless for a hole range covered by L1s that don't contain any WAL records.
771 : //
772 : // The algorithm chooses holes as follows.
773 : // - Slide a 2-element window over the keys in key order to get the hole range (= distance between two keys).
774 : // - Filter: min threshold on range length
775 : // - Rank: by coverage size (=number of image layers required to reconstruct each key in the range for which we have any data)
776 : //
777 : // For more details, intuition, and some ASCII art see https://github.com/neondatabase/neon/pull/3597#discussion_r1112704451
778 : #[derive(PartialEq, Eq)]
779 : struct Hole {
780 : key_range: Range<Key>,
781 : coverage_size: usize,
782 : }
783 28 : let holes: Vec<Hole> = {
784 : use std::cmp::Ordering;
785 : impl Ord for Hole {
786 0 : fn cmp(&self, other: &Self) -> Ordering {
787 0 : self.coverage_size.cmp(&other.coverage_size).reverse()
788 0 : }
789 : }
790 : impl PartialOrd for Hole {
791 0 : fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
792 0 : Some(self.cmp(other))
793 0 : }
794 : }
795 28 : let max_holes = deltas_to_compact.len();
796 28 : let last_record_lsn = self.get_last_record_lsn();
797 28 : let min_hole_range = (target_file_size / page_cache::PAGE_SZ as u64) as i128;
798 28 : let min_hole_coverage_size = 3; // TODO: something more flexible?
799 28 : // min-heap (reserve space for one more element added before eviction)
800 28 : let mut heap: BinaryHeap<Hole> = BinaryHeap::with_capacity(max_holes + 1);
801 28 : let mut prev: Option<Key> = None;
802 :
803 2064038 : for &DeltaEntry { key: next_key, .. } in all_keys.iter() {
804 2064038 : if let Some(prev_key) = prev {
805 : // Just a first, fast filter: do not create hole entries for metadata keys. The last hole in the
806 : // compaction is the gap between the data keys and the metadata keys.
807 2064010 : if next_key.to_i128() - prev_key.to_i128() >= min_hole_range
808 0 : && !Key::is_metadata_key(&prev_key)
809 : {
810 0 : let key_range = prev_key..next_key;
811 0 : // Measuring a hole by simply subtracting the i128 representations of the key range boundaries
812 0 : // doesn't make much sense, because the largest holes would correspond to field1/field2 changes.
813 0 : // But we are mostly interested in eliminating holes that cause generation of excessive image layers.
814 0 : // That is why it is better to measure the size of a hole as the number of covering image layers.
815 0 : let coverage_size =
816 0 : layers.image_coverage(&key_range, last_record_lsn).len();
817 0 : if coverage_size >= min_hole_coverage_size {
818 0 : heap.push(Hole {
819 0 : key_range,
820 0 : coverage_size,
821 0 : });
822 0 : if heap.len() > max_holes {
823 0 : heap.pop(); // remove smallest hole
824 0 : }
825 0 : }
826 2064010 : }
827 28 : }
828 2064038 : prev = Some(next_key.next());
829 : }
830 28 : let mut holes = heap.into_vec();
831 28 : holes.sort_unstable_by_key(|hole| hole.key_range.start);
832 28 : holes
833 28 : };
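        // A self-contained sketch of the hole selection computed above, assuming keys are already
        // sorted ascending and using plain i128 key representations plus a caller-supplied
        // `coverage_size` callback in place of the real layer map (the heap in the real code is
        // just a more efficient way to keep the top entries). It keeps the `max_holes` gaps with
        // the largest coverage whose width is at least `min_hole_range`, returned in key order.
        #[allow(dead_code)]
        fn select_holes_sketch(
            sorted_keys: &[i128],
            min_hole_range: i128,
            min_hole_coverage_size: usize,
            max_holes: usize,
            coverage_size: impl Fn(i128, i128) -> usize,
        ) -> Vec<(i128, i128)> {
            let mut candidates = Vec::new();
            for w in sorted_keys.windows(2) {
                let (prev_key, next_key) = (w[0], w[1]);
                if next_key - prev_key >= min_hole_range {
                    let coverage = coverage_size(prev_key, next_key);
                    if coverage >= min_hole_coverage_size {
                        candidates.push((coverage, (prev_key, next_key)));
                    }
                }
            }
            // Rank by coverage (descending), keep the top `max_holes`, then restore key order.
            candidates.sort_by(|a, b| b.0.cmp(&a.0));
            candidates.truncate(max_holes);
            let mut holes: Vec<(i128, i128)> = candidates.into_iter().map(|(_, hole)| hole).collect();
            holes.sort_by_key(|hole| hole.0);
            holes
        }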
834 28 : stats.read_lock_held_compute_holes_micros = stats.read_lock_held_key_sort_micros.till_now();
835 28 : drop_rlock(guard);
836 28 :
837 28 : if self.cancel.is_cancelled() {
838 0 : return Err(CompactionError::ShuttingDown);
839 28 : }
840 28 :
841 28 : stats.read_lock_drop_micros = stats.read_lock_held_compute_holes_micros.till_now();
842 :
843 : // This iterator walks through all key-value pairs from all the layers
844 : // we're compacting, in key, LSN order.
845 : // If there's both a Value::Image and Value::WalRecord for the same (key,lsn),
846 : // then the Value::Image is ordered before Value::WalRecord.
847 : //
848 : // TODO(https://github.com/neondatabase/neon/issues/8184): remove the page cached blob_io
849 : // option and validation code once we've reached confidence.
850 : enum AllValuesIter<'a> {
851 : PageCachedBlobIo {
852 : all_keys_iter: VecIter<'a>,
853 : },
854 : StreamingKmergeBypassingPageCache {
855 : merge_iter: MergeIterator<'a>,
856 : },
857 : ValidatingStreamingKmergeBypassingPageCache {
858 : mode: CompactL0BypassPageCacheValidation,
859 : merge_iter: MergeIterator<'a>,
860 : all_keys_iter: VecIter<'a>,
861 : },
862 : }
863 : type VecIter<'a> = std::slice::Iter<'a, DeltaEntry<'a>>; // TODO: distinguished lifetimes
864 : impl AllValuesIter<'_> {
865 2064066 : async fn next_all_keys_iter(
866 2064066 : iter: &mut VecIter<'_>,
867 2064066 : ctx: &RequestContext,
868 2064066 : ) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
869 : let Some(DeltaEntry {
870 2064038 : key,
871 2064038 : lsn,
872 2064038 : val: value_ref,
873 : ..
874 2064066 : }) = iter.next()
875 : else {
876 28 : return Ok(None);
877 : };
878 2064038 : let value = value_ref.load(ctx).await?;
879 2064038 : Ok(Some((*key, *lsn, value)))
880 2064066 : }
881 2064066 : async fn next(
882 2064066 : &mut self,
883 2064066 : ctx: &RequestContext,
884 2064066 : ) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
885 2064066 : match self {
886 0 : AllValuesIter::PageCachedBlobIo { all_keys_iter: iter } => {
887 0 : Self::next_all_keys_iter(iter, ctx).await
888 : }
889 0 : AllValuesIter::StreamingKmergeBypassingPageCache { merge_iter } => merge_iter.next().await,
890 2064066 : AllValuesIter::ValidatingStreamingKmergeBypassingPageCache { mode, merge_iter, all_keys_iter } => async {
891 : // advance both iterators
892 2064066 : let all_keys_iter_item = Self::next_all_keys_iter(all_keys_iter, ctx).await;
893 2064066 : let merge_iter_item = merge_iter.next().await;
894 : // compare results & log warnings as needed
895 : macro_rules! rate_limited_warn {
896 : ($($arg:tt)*) => {{
897 : if cfg!(debug_assertions) || cfg!(feature = "testing") {
898 : warn!($($arg)*);
899 : panic!("CompactL0BypassPageCacheValidation failure, check logs");
900 : }
901 : use once_cell::sync::Lazy;
902 : use utils::rate_limit::RateLimit;
903 : use std::sync::Mutex;
904 : use std::time::Duration;
905 : static LOGGED: Lazy<Mutex<RateLimit>> =
906 0 : Lazy::new(|| Mutex::new(RateLimit::new(Duration::from_secs(10))));
907 : let mut rate_limit = LOGGED.lock().unwrap();
908 0 : rate_limit.call(|| {
909 0 : warn!($($arg)*);
910 0 : });
911 : }}
912 : }
913 2064066 : match (&all_keys_iter_item, &merge_iter_item) {
914 0 : (Err(_), Err(_)) => {
915 0 : // don't bother asserting equivality of the errors
916 0 : }
917 0 : (Err(all_keys), Ok(merge)) => {
918 0 : rate_limited_warn!(?merge, "all_keys_iter returned an error where merge did not: {all_keys:?}");
919 0 : },
920 0 : (Ok(all_keys), Err(merge)) => {
921 0 : rate_limited_warn!(?all_keys, "merge returned an error where all_keys_iter did not: {merge:?}");
922 0 : },
923 28 : (Ok(None), Ok(None)) => { }
924 0 : (Ok(Some(all_keys)), Ok(None)) => {
925 0 : rate_limited_warn!(?all_keys, "merge returned None where all_keys_iter returned Some");
926 0 : }
927 0 : (Ok(None), Ok(Some(merge))) => {
928 0 : rate_limited_warn!(?merge, "all_keys_iter returned None where merge returned Some");
929 0 : }
930 2064038 : (Ok(Some((all_keys_key, all_keys_lsn, all_keys_value))), Ok(Some((merge_key, merge_lsn, merge_value)))) => {
931 2064038 : match mode {
932 : // TODO: in this mode, we still load the value from disk for both iterators, even though we only need the all_keys_iter one
933 : CompactL0BypassPageCacheValidation::KeyLsn => {
934 0 : let all_keys = (all_keys_key, all_keys_lsn);
935 0 : let merge = (merge_key, merge_lsn);
936 0 : if all_keys != merge {
937 0 : rate_limited_warn!(?all_keys, ?merge, "merge returned a different (Key,LSN) than all_keys_iter");
938 0 : }
939 : }
940 : CompactL0BypassPageCacheValidation::KeyLsnValue => {
941 2064038 : let all_keys = (all_keys_key, all_keys_lsn, all_keys_value);
942 2064038 : let merge = (merge_key, merge_lsn, merge_value);
943 2064038 : if all_keys != merge {
944 0 : rate_limited_warn!(?all_keys, ?merge, "merge returned a different (Key,LSN,Value) than all_keys_iter");
945 2064038 : }
946 : }
947 : }
948 : }
949 : }
950 : // in case of mismatch, trust the legacy all_keys_iter_item
951 2064066 : all_keys_iter_item
952 2064066 : }.instrument(info_span!("next")).await
953 : }
954 2064066 : }
955 : }
956 28 : let mut all_values_iter = match &self.conf.compact_level0_phase1_value_access {
957 0 : CompactL0Phase1ValueAccess::PageCachedBlobIo => AllValuesIter::PageCachedBlobIo {
958 0 : all_keys_iter: all_keys.iter(),
959 0 : },
960 28 : CompactL0Phase1ValueAccess::StreamingKmerge { validate } => {
961 28 : let merge_iter = {
962 28 : let mut deltas = Vec::with_capacity(deltas_to_compact.len());
963 402 : for l in deltas_to_compact.iter() {
964 402 : let l = l.get_as_delta(ctx).await.map_err(CompactionError::Other)?;
965 402 : deltas.push(l);
966 : }
967 28 : MergeIterator::create(&deltas, &[], ctx)
968 28 : };
969 28 : match validate {
970 0 : None => AllValuesIter::StreamingKmergeBypassingPageCache { merge_iter },
971 28 : Some(validate) => AllValuesIter::ValidatingStreamingKmergeBypassingPageCache {
972 28 : mode: validate.clone(),
973 28 : merge_iter,
974 28 : all_keys_iter: all_keys.iter(),
975 28 : },
976 : }
977 : }
978 : };
979 :
980 : // This iterator walks through all keys and is needed to calculate size used by each key
981 28 : let mut all_keys_iter = all_keys
982 28 : .iter()
983 2064038 : .map(|DeltaEntry { key, lsn, size, .. }| (*key, *lsn, *size))
984 2064010 : .coalesce(|mut prev, cur| {
985 2064010 : // Coalesce entries that belong to the same key.
986 2064010 : // This ensures that compaction doesn't put them
987 2064010 : // into different layer files.
988 2064010 : // Still limit this by the target file size,
989 2064010 : // so that we keep the size of the files in
990 2064010 : // check.
991 2064010 : if prev.0 == cur.0 && prev.2 < target_file_size {
992 40038 : prev.2 += cur.2;
993 40038 : Ok(prev)
994 : } else {
995 2023972 : Err((prev, cur))
996 : }
997 2064010 : });
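        // A standalone sketch of the coalescing rule used just above, with plain (key, lsn, size)
        // tuples (the u8 keys and sizes are arbitrary): consecutive entries for the same key are
        // merged while the accumulated size is still below the target, so a key is not split
        // across output layers unless it alone exceeds the target.
        #[allow(dead_code)]
        fn coalesce_rule_sketch() {
            let target_file_size = 100u64;
            let entries = [(1u8, 10u64, 60u64), (1, 20, 60), (1, 30, 60), (2, 10, 5)];
            let coalesced: Vec<(u8, u64, u64)> = entries
                .iter()
                .copied()
                .coalesce(|mut prev, cur| {
                    if prev.0 == cur.0 && prev.2 < target_file_size {
                        prev.2 += cur.2;
                        Ok(prev)
                    } else {
                        Err((prev, cur))
                    }
                })
                .collect();
            // The first two entries for key 1 merge (accumulated size 120); that is now over the
            // target, so the third entry starts a new run, and key 2 is a different key anyway.
            assert_eq!(coalesced, vec![(1, 10, 120), (1, 30, 60), (2, 10, 5)]);
        }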
998 28 :
999 28 : // Merge the contents of all the input delta layers into a new set
1000 28 : // of delta layers, based on the current partitioning.
1001 28 : //
1002 28 : // We split the new delta layers on the key dimension. We iterate through the key space, and for each key, check whether including the next key in the current output layer would cause the layer to become too large. If so, dump the current output layer and start a new one.
1003 28 : // It's possible that there is a single key with so many page versions that storing all of them in a single layer file
1004 28 : // would be too large. In that case, we also split on the LSN dimension.
1005 28 : //
1006 28 : // LSN
1007 28 : // ^
1008 28 : // |
1009 28 : // | +-----------+ +--+--+--+--+
1010 28 : // | | | | | | | |
1011 28 : // | +-----------+ | | | | |
1012 28 : // | | | | | | | |
1013 28 : // | +-----------+ ==> | | | | |
1014 28 : // | | | | | | | |
1015 28 : // | +-----------+ | | | | |
1016 28 : // | | | | | | | |
1017 28 : // | +-----------+ +--+--+--+--+
1018 28 : // |
1019 28 : // +--------------> key
1020 28 : //
1021 28 : //
1022 28 : // If one key (X) has a lot of page versions:
1023 28 : //
1024 28 : // LSN
1025 28 : // ^
1026 28 : // | (X)
1027 28 : // | +-----------+ +--+--+--+--+
1028 28 : // | | | | | | | |
1029 28 : // | +-----------+ | | +--+ |
1030 28 : // | | | | | | | |
1031 28 : // | +-----------+ ==> | | | | |
1032 28 : // | | | | | +--+ |
1033 28 : // | +-----------+ | | | | |
1034 28 : // | | | | | | | |
1035 28 : // | +-----------+ +--+--+--+--+
1036 28 : // |
1037 28 : // +--------------> key
1038 28 : // TODO: this actually divides the layers into fixed-size chunks, not
1039 28 : // based on the partitioning.
1040 28 : //
1041 28 : // TODO: we should also opportunistically materialize and
1042 28 : // garbage collect what we can.
1043 28 : let mut new_layers = Vec::new();
1044 28 : let mut prev_key: Option<Key> = None;
1045 28 : let mut writer: Option<DeltaLayerWriter> = None;
1046 28 : let mut key_values_total_size = 0u64;
1047 28 : let mut dup_start_lsn: Lsn = Lsn::INVALID; // start LSN of layer containing values of the single key
1048 28 : let mut dup_end_lsn: Lsn = Lsn::INVALID; // end LSN of layer containing values of the single key
1049 28 : let mut next_hole = 0; // index of next hole in holes vector
1050 28 :
1051 28 : let mut keys = 0;
1052 :
1053 2064066 : while let Some((key, lsn, value)) = all_values_iter
1054 2064066 : .next(ctx)
1055 39353 : .await
1056 2064066 : .map_err(CompactionError::Other)?
1057 : {
1058 2064038 : keys += 1;
1059 2064038 :
1060 2064038 : if keys % 32_768 == 0 && self.cancel.is_cancelled() {
1061 : // avoid hitting the cancellation token on every key. In benches, we end up
1062 : // shuffling on the order of a million keys per layer; this means we'll check it
1063 : // around tens of times per layer.
1064 0 : return Err(CompactionError::ShuttingDown);
1065 2064038 : }
1066 2064038 :
1067 2064038 : let same_key = prev_key.map_or(false, |prev_key| prev_key == key);
1068 2064038 : // We need to check key boundaries once we reach next key or end of layer with the same key
1069 2064038 : if !same_key || lsn == dup_end_lsn {
1070 2024000 : let mut next_key_size = 0u64;
1071 2024000 : let is_dup_layer = dup_end_lsn.is_valid();
1072 2024000 : dup_start_lsn = Lsn::INVALID;
1073 2024000 : if !same_key {
1074 2024000 : dup_end_lsn = Lsn::INVALID;
1075 2024000 : }
1076 : // Determine size occupied by this key. We stop at next key or when size becomes larger than target_file_size
1077 2024000 : for (next_key, next_lsn, next_size) in all_keys_iter.by_ref() {
1078 2024000 : next_key_size = next_size;
1079 2024000 : if key != next_key {
1080 2023972 : if dup_end_lsn.is_valid() {
1081 0 : // We are writing a segment with duplicates:
1082 0 : // place all remaining values of this key in a separate segment
1083 0 : dup_start_lsn = dup_end_lsn; // the new segment starts where the old one stops
1084 0 : dup_end_lsn = lsn_range.end; // there are no more values of this key till end of LSN range
1085 2023972 : }
1086 2023972 : break;
1087 28 : }
1088 28 : key_values_total_size += next_size;
1089 28 : // Check if it is time to split the segment: if the total key size is larger than the target file size.
1090 28 : // We need to avoid generating empty segments if next_size > target_file_size.
1091 28 : if key_values_total_size > target_file_size && lsn != next_lsn {
1092 : // Split key between multiple layers: such layer can contain only single key
1093 0 : dup_start_lsn = if dup_end_lsn.is_valid() {
1094 0 : dup_end_lsn // new segment with duplicates starts where old one stops
1095 : } else {
1096 0 : lsn // start with the first LSN for this key
1097 : };
1098 0 : dup_end_lsn = next_lsn; // upper LSN boundary is exclusive
1099 0 : break;
1100 28 : }
1101 : }
1102 : // Handle the case when the loop reaches the last key: in this case dup_end is valid but dup_start is not set.
1103 2024000 : if dup_end_lsn.is_valid() && !dup_start_lsn.is_valid() {
1104 0 : dup_start_lsn = dup_end_lsn;
1105 0 : dup_end_lsn = lsn_range.end;
1106 2024000 : }
1107 2024000 : if writer.is_some() {
1108 2023972 : let written_size = writer.as_mut().unwrap().size();
1109 2023972 : let contains_hole =
1110 2023972 : next_hole < holes.len() && key >= holes[next_hole].key_range.end;
1111 : // check if this key causes the layer to overflow or crosses a hole...
1112 2023972 : if is_dup_layer
1113 2023972 : || dup_end_lsn.is_valid()
1114 2023972 : || written_size + key_values_total_size > target_file_size
1115 2023692 : || contains_hole
1116 : {
1117 : // ... if so, flush previous layer and prepare to write new one
1118 280 : let (desc, path) = writer
1119 280 : .take()
1120 280 : .unwrap()
1121 280 : .finish(prev_key.unwrap().next(), ctx)
1122 720 : .await
1123 280 : .map_err(CompactionError::Other)?;
1124 280 : let new_delta = Layer::finish_creating(self.conf, self, desc, &path)
1125 280 : .map_err(CompactionError::Other)?;
1126 :
1127 280 : new_layers.push(new_delta);
1128 280 : writer = None;
1129 280 :
1130 280 : if contains_hole {
1131 0 : // skip hole
1132 0 : next_hole += 1;
1133 280 : }
1134 2023692 : }
1135 28 : }
1136 : // Remember the size of the key value, because at the next iteration we will access the next item
1137 2024000 : key_values_total_size = next_key_size;
1138 40038 : }
1139 2064038 : fail_point!("delta-layer-writer-fail-before-finish", |_| {
1140 0 : Err(CompactionError::Other(anyhow::anyhow!(
1141 0 : "failpoint delta-layer-writer-fail-before-finish"
1142 0 : )))
1143 2064038 : });
1144 :
1145 2064038 : if !self.shard_identity.is_key_disposable(&key) {
1146 2064038 : if writer.is_none() {
1147 308 : if self.cancel.is_cancelled() {
1148 : // to be somewhat responsive to cancellation, check for each new layer
1149 0 : return Err(CompactionError::ShuttingDown);
1150 308 : }
1151 : // Create a writer if not initialized yet
1152 308 : writer = Some(
1153 : DeltaLayerWriter::new(
1154 308 : self.conf,
1155 308 : self.timeline_id,
1156 308 : self.tenant_shard_id,
1157 308 : key,
1158 308 : if dup_end_lsn.is_valid() {
1159 : // this is a layer containing a slice of values of the same key
1160 0 : debug!("Create new dup layer {}..{}", dup_start_lsn, dup_end_lsn);
1161 0 : dup_start_lsn..dup_end_lsn
1162 : } else {
1163 308 : debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end);
1164 308 : lsn_range.clone()
1165 : },
1166 308 : ctx,
1167 : )
1168 154 : .await
1169 308 : .map_err(CompactionError::Other)?,
1170 : );
1171 :
1172 308 : keys = 0;
1173 2063730 : }
1174 :
1175 2064038 : writer
1176 2064038 : .as_mut()
1177 2064038 : .unwrap()
1178 2064038 : .put_value(key, lsn, value, ctx)
1179 1220 : .await
1180 2064038 : .map_err(CompactionError::Other)?;
1181 : } else {
1182 0 : debug!(
1183 0 : "Dropping key {} during compaction (it belongs on shard {:?})",
1184 0 : key,
1185 0 : self.shard_identity.get_shard_number(&key)
1186 : );
1187 : }
1188 :
1189 2064038 : if !new_layers.is_empty() {
1190 19786 : fail_point!("after-timeline-compacted-first-L1");
1191 2044252 : }
1192 :
1193 2064038 : prev_key = Some(key);
1194 : }
1195 28 : if let Some(writer) = writer {
1196 28 : let (desc, path) = writer
1197 28 : .finish(prev_key.unwrap().next(), ctx)
1198 1992 : .await
1199 28 : .map_err(CompactionError::Other)?;
1200 28 : let new_delta = Layer::finish_creating(self.conf, self, desc, &path)
1201 28 : .map_err(CompactionError::Other)?;
1202 28 : new_layers.push(new_delta);
1203 0 : }
1204 :
1205 : // Sync layers
1206 28 : if !new_layers.is_empty() {
1207 : // Print a warning if the created layer is larger than double the target size
1208 : // Add two pages for potential overhead. This should in theory be already
1209 : // accounted for in the target calculation, but for very small targets,
1210 : // we still might easily hit the limit otherwise.
1211 28 : let warn_limit = target_file_size * 2 + page_cache::PAGE_SZ as u64 * 2;
1212 308 : for layer in new_layers.iter() {
1213 308 : if layer.layer_desc().file_size > warn_limit {
1214 0 : warn!(
1215 : %layer,
1216 0 : "created delta file of size {} larger than double of target of {target_file_size}", layer.layer_desc().file_size
1217 : );
1218 308 : }
1219 : }
1220 :
1221 : // The writer.finish() above already did the fsync of the inodes.
1222 : // We just need to fsync the directory in which these inodes are linked,
1223 : // which we know to be the timeline directory.
1224 : //
1225 : // We use fatal_err() below because after writer.finish() returns with success,
1226 : // the in-memory state of the filesystem already has the layer file in its final place,
1227 : // and subsequent pageserver code could think it's durable while it really isn't.
1228 28 : let timeline_dir = VirtualFile::open(
1229 28 : &self
1230 28 : .conf
1231 28 : .timeline_path(&self.tenant_shard_id, &self.timeline_id),
1232 28 : ctx,
1233 28 : )
1234 14 : .await
1235 28 : .fatal_err("VirtualFile::open for timeline dir fsync");
1236 28 : timeline_dir
1237 28 : .sync_all()
1238 14 : .await
1239 28 : .fatal_err("VirtualFile::sync_all timeline dir");
1240 0 : }
1241 :
1242 28 : stats.write_layer_files_micros = stats.read_lock_drop_micros.till_now();
1243 28 : stats.new_deltas_count = Some(new_layers.len());
1244 308 : stats.new_deltas_size = Some(new_layers.iter().map(|l| l.layer_desc().file_size).sum());
1245 28 :
1246 28 : match TryInto::<CompactLevel0Phase1Stats>::try_into(stats)
1247 28 : .and_then(|stats| serde_json::to_string(&stats).context("serde_json::to_string"))
1248 : {
1249 28 : Ok(stats_json) => {
1250 28 : info!(
1251 0 : stats_json = stats_json.as_str(),
1252 0 : "compact_level0_phase1 stats available"
1253 : )
1254 : }
1255 0 : Err(e) => {
1256 0 : warn!("compact_level0_phase1 stats failed to serialize: {:#}", e);
1257 : }
1258 : }
1259 :
1260 : // Without this, rustc complains about deltas_to_compact still
1261 : // being borrowed when we `.into_iter()` below.
1262 28 : drop(all_values_iter);
1263 28 :
1264 28 : Ok(CompactLevel0Phase1Result {
1265 28 : new_layers,
1266 28 : deltas_to_compact: deltas_to_compact
1267 28 : .into_iter()
1268 402 : .map(|x| x.drop_eviction_guard())
1269 28 : .collect::<Vec<_>>(),
1270 28 : fully_compacted,
1271 28 : })
1272 364 : }
1273 : }
1274 :
1275 : #[derive(Default)]
1276 : struct CompactLevel0Phase1Result {
1277 : new_layers: Vec<ResidentLayer>,
1278 : deltas_to_compact: Vec<Layer>,
1279 : // Whether we have included all L0 layers, or selected only part of them due to the
1280 : // L0 compaction size limit.
1281 : fully_compacted: bool,
1282 : }
1283 :
1284 : #[derive(Default)]
1285 : struct CompactLevel0Phase1StatsBuilder {
1286 : version: Option<u64>,
1287 : tenant_id: Option<TenantShardId>,
1288 : timeline_id: Option<TimelineId>,
1289 : read_lock_acquisition_micros: DurationRecorder,
1290 : read_lock_held_spawn_blocking_startup_micros: DurationRecorder,
1291 : read_lock_held_key_sort_micros: DurationRecorder,
1292 : read_lock_held_prerequisites_micros: DurationRecorder,
1293 : read_lock_held_compute_holes_micros: DurationRecorder,
1294 : read_lock_drop_micros: DurationRecorder,
1295 : write_layer_files_micros: DurationRecorder,
1296 : level0_deltas_count: Option<usize>,
1297 : new_deltas_count: Option<usize>,
1298 : new_deltas_size: Option<u64>,
1299 : }
1300 :
1301 : #[derive(serde::Serialize)]
1302 : struct CompactLevel0Phase1Stats {
1303 : version: u64,
1304 : tenant_id: TenantShardId,
1305 : timeline_id: TimelineId,
1306 : read_lock_acquisition_micros: RecordedDuration,
1307 : read_lock_held_spawn_blocking_startup_micros: RecordedDuration,
1308 : read_lock_held_key_sort_micros: RecordedDuration,
1309 : read_lock_held_prerequisites_micros: RecordedDuration,
1310 : read_lock_held_compute_holes_micros: RecordedDuration,
1311 : read_lock_drop_micros: RecordedDuration,
1312 : write_layer_files_micros: RecordedDuration,
1313 : level0_deltas_count: usize,
1314 : new_deltas_count: usize,
1315 : new_deltas_size: u64,
1316 : }
1317 :
1318 : impl TryFrom<CompactLevel0Phase1StatsBuilder> for CompactLevel0Phase1Stats {
1319 : type Error = anyhow::Error;
1320 :
1321 28 : fn try_from(value: CompactLevel0Phase1StatsBuilder) -> Result<Self, Self::Error> {
1322 28 : Ok(Self {
1323 28 : version: value.version.ok_or_else(|| anyhow!("version not set"))?,
1324 28 : tenant_id: value
1325 28 : .tenant_id
1326 28 : .ok_or_else(|| anyhow!("tenant_id not set"))?,
1327 28 : timeline_id: value
1328 28 : .timeline_id
1329 28 : .ok_or_else(|| anyhow!("timeline_id not set"))?,
1330 28 : read_lock_acquisition_micros: value
1331 28 : .read_lock_acquisition_micros
1332 28 : .into_recorded()
1333 28 : .ok_or_else(|| anyhow!("read_lock_acquisition_micros not set"))?,
1334 28 : read_lock_held_spawn_blocking_startup_micros: value
1335 28 : .read_lock_held_spawn_blocking_startup_micros
1336 28 : .into_recorded()
1337 28 : .ok_or_else(|| anyhow!("read_lock_held_spawn_blocking_startup_micros not set"))?,
1338 28 : read_lock_held_key_sort_micros: value
1339 28 : .read_lock_held_key_sort_micros
1340 28 : .into_recorded()
1341 28 : .ok_or_else(|| anyhow!("read_lock_held_key_sort_micros not set"))?,
1342 28 : read_lock_held_prerequisites_micros: value
1343 28 : .read_lock_held_prerequisites_micros
1344 28 : .into_recorded()
1345 28 : .ok_or_else(|| anyhow!("read_lock_held_prerequisites_micros not set"))?,
1346 28 : read_lock_held_compute_holes_micros: value
1347 28 : .read_lock_held_compute_holes_micros
1348 28 : .into_recorded()
1349 28 : .ok_or_else(|| anyhow!("read_lock_held_compute_holes_micros not set"))?,
1350 28 : read_lock_drop_micros: value
1351 28 : .read_lock_drop_micros
1352 28 : .into_recorded()
1353 28 : .ok_or_else(|| anyhow!("read_lock_drop_micros not set"))?,
1354 28 : write_layer_files_micros: value
1355 28 : .write_layer_files_micros
1356 28 : .into_recorded()
1357 28 : .ok_or_else(|| anyhow!("write_layer_files_micros not set"))?,
1358 28 : level0_deltas_count: value
1359 28 : .level0_deltas_count
1360 28 : .ok_or_else(|| anyhow!("level0_deltas_count not set"))?,
1361 28 : new_deltas_count: value
1362 28 : .new_deltas_count
1363 28 : .ok_or_else(|| anyhow!("new_deltas_count not set"))?,
1364 28 : new_deltas_size: value
1365 28 : .new_deltas_size
1366 28 : .ok_or_else(|| anyhow!("new_deltas_size not set"))?,
1367 : })
1368 28 : }
1369 : }
1370 :
1371 0 : #[derive(Debug, PartialEq, Eq, Clone, serde::Deserialize, serde::Serialize)]
1372 : #[serde(tag = "mode", rename_all = "kebab-case", deny_unknown_fields)]
1373 : pub enum CompactL0Phase1ValueAccess {
1374 : /// The old way.
1375 : PageCachedBlobIo,
1376 : /// The new way.
1377 : StreamingKmerge {
1378 : /// If set, we run both the old way and the new way, validate that
1379 : /// they are identical (=> [`CompactL0BypassPageCacheValidation`]),
1380 : /// and if the validation fails,
1381 : /// - in tests: fail the test with a panic, or
1382 : /// - in prod: log a rate-limited warning and use the old way's results.
1383 : ///
1384 : /// If not set, we only run the new way and trust its results.
1385 : validate: Option<CompactL0BypassPageCacheValidation>,
1386 : },
1387 : }
1388 :
1389 : /// See [`CompactL0Phase1ValueAccess::StreamingKmerge`].
1390 0 : #[derive(Debug, PartialEq, Eq, Clone, serde::Deserialize, serde::Serialize)]
1391 : #[serde(rename_all = "kebab-case")]
1392 : pub enum CompactL0BypassPageCacheValidation {
1393 : /// Validate that the series of (key, lsn) pairs are the same.
1394 : KeyLsn,
1395 : /// Validate that the entire output of old and new way is identical.
1396 : KeyLsnValue,
1397 : }
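// A minimal sketch (hypothetical test module, not part of the original file) of how the two
// enums above round-trip through serde with the kebab-case, internally-tagged representation
// declared on them; serde_json is already a dependency of this crate.
#[cfg(test)]
mod value_access_serde_sketch {
    use super::*;

    #[test]
    fn deserialize_value_access_modes() {
        // The old way: a unit variant, identified by its "mode" tag only.
        let old: CompactL0Phase1ValueAccess =
            serde_json::from_str(r#"{ "mode": "page-cached-blob-io" }"#).unwrap();
        assert_eq!(old, CompactL0Phase1ValueAccess::PageCachedBlobIo);

        // The new way, with full key/lsn/value validation enabled.
        let new: CompactL0Phase1ValueAccess = serde_json::from_str(
            r#"{ "mode": "streaming-kmerge", "validate": "key-lsn-value" }"#,
        )
        .unwrap();
        assert_eq!(
            new,
            CompactL0Phase1ValueAccess::StreamingKmerge {
                validate: Some(CompactL0BypassPageCacheValidation::KeyLsnValue)
            }
        );
    }
}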
1398 :
1399 : impl Default for CompactL0Phase1ValueAccess {
1400 212 : fn default() -> Self {
1401 212 : CompactL0Phase1ValueAccess::StreamingKmerge {
1402 212 : // TODO(https://github.com/neondatabase/neon/issues/8184): change to None once confident
1403 212 : validate: Some(CompactL0BypassPageCacheValidation::KeyLsnValue),
1404 212 : }
1405 212 : }
1406 : }
1407 :
1408 : impl Timeline {
1409 : /// Entry point for new tiered compaction algorithm.
1410 : ///
1411 : /// All the real work is in the implementation in the pageserver_compaction
1412 : /// crate. The code here would apply to any algorithm implementing the
1413 : /// same interface, but tiered is the only one at the moment.
1414 : ///
1415 : /// TODO: cancellation
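/// A hypothetical invocation (sketch only; `timeline`, `ctx`, and the cancellation token are
/// assumed to come from the surrounding compaction task):
///
/// ```ignore
/// let cancel = CancellationToken::new();
/// timeline.compact_tiered(&cancel, &ctx).await?;
/// ```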
1416 0 : pub(crate) async fn compact_tiered(
1417 0 : self: &Arc<Self>,
1418 0 : _cancel: &CancellationToken,
1419 0 : ctx: &RequestContext,
1420 0 : ) -> Result<(), CompactionError> {
1421 0 : let fanout = self.get_compaction_threshold() as u64;
1422 0 : let target_file_size = self.get_checkpoint_distance();
1423 :
1424 : // Find the top of the historical layers
1425 0 : let end_lsn = {
1426 0 : let guard = self.layers.read().await;
1427 0 : let layers = guard.layer_map()?;
1428 :
1429 0 : let l0_deltas = layers.level0_deltas();
1430 0 :
1431 0 : // As an optimization, if we find that there are too few L0 layers,
1432 0 : // bail out early. We know that the compaction algorithm would do
1433 0 : // nothing in that case.
1434 0 : if l0_deltas.len() < fanout as usize {
1435 : // doesn't need compacting
1436 0 : return Ok(());
1437 0 : }
1438 0 : l0_deltas.iter().map(|l| l.lsn_range.end).max().unwrap()
1439 0 : };
1440 0 :
1441 0 : // Is the timeline being deleted?
1442 0 : if self.is_stopping() {
1443 0 : trace!("Dropping out of compaction on timeline shutdown");
1444 0 : return Err(CompactionError::ShuttingDown);
1445 0 : }
1446 :
1447 0 : let (dense_ks, _sparse_ks) = self.collect_keyspace(end_lsn, ctx).await?;
1448 : // TODO(chi): ignore sparse_keyspace for now, compact it in the future.
1449 0 : let mut adaptor = TimelineAdaptor::new(self, (end_lsn, dense_ks));
1450 0 :
1451 0 : pageserver_compaction::compact_tiered::compact_tiered(
1452 0 : &mut adaptor,
1453 0 : end_lsn,
1454 0 : target_file_size,
1455 0 : fanout,
1456 0 : ctx,
1457 0 : )
1458 0 : .await
1459 : // TODO: compact_tiered needs to return CompactionError
1460 0 : .map_err(CompactionError::Other)?;
1461 :
1462 0 : adaptor.flush_updates().await?;
1463 0 : Ok(())
1464 0 : }
1465 :
1466 : /// Take a list of images and deltas, produce images and deltas according to GC horizon and retain_lsns.
1467 : ///
1468 : /// It takes a key, the values of the key within the compaction process, a GC horizon, and all retain_lsns below the horizon.
1469 : /// For now, it requires that `accumulated_values` contains the full history of the key (i.e., the key with the lowest LSN is
1470 : /// an image or a WAL not requiring a base image). This restriction will be removed once we implement gc-compaction on branch.
1471 : ///
1472 : /// The function returns the deltas and the base image that need to be placed at each of the retain LSNs. For example, we have:
1473 : ///
1474 : /// A@0x10, +B@0x20, +C@0x30, +D@0x40, +E@0x50, +F@0x60
1475 : /// horizon = 0x50, retain_lsn = 0x20, 0x40, delta_threshold=3
1476 : ///
1477 : /// The function will produce:
1478 : ///
1479 : /// ```plain
1480 : /// 0x20(retain_lsn) -> img=AB@0x20 always produce a single image below the lowest retain LSN
1481 : /// 0x40(retain_lsn) -> deltas=[+C@0x30, +D@0x40] two deltas since the last base image, keeping the deltas
1482 : /// 0x50(horizon) -> deltas=[ABCDE@0x50] three deltas since the last base image, generate an image but put it in the delta
1483 : /// above_horizon -> deltas=[+F@0x60] full history above the horizon
1484 : /// ```
1485 : ///
1486 : /// Note that `accumulated_values` must be sorted by LSN and should belong to a single key.
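/// A minimal illustrative call (sketch with hypothetical LSNs, mirroring the example above;
/// `tline` is assumed to be an `Arc<Timeline>` and `full_history` a pre-collected history):
///
/// ```ignore
/// let retention = tline
///     .generate_key_retention(
///         key,
///         &full_history,           // [(Key, Lsn, Value)], sorted by LSN
///         Lsn(0x50),               // horizon
///         &[Lsn(0x20), Lsn(0x40)], // retain_lsns below the horizon
///         3,                       // delta_threshold_cnt
///         None,                    // no base image from an ancestor
///     )
///     .await?;
/// // `retention.below_horizon` has one entry per retain_lsn plus one for the horizon itself;
/// // `retention.above_horizon` keeps the full history above the horizon.
/// ```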
1487 346 : pub(crate) async fn generate_key_retention(
1488 346 : self: &Arc<Timeline>,
1489 346 : key: Key,
1490 346 : full_history: &[(Key, Lsn, Value)],
1491 346 : horizon: Lsn,
1492 346 : retain_lsn_below_horizon: &[Lsn],
1493 346 : delta_threshold_cnt: usize,
1494 346 : base_img_from_ancestor: Option<(Key, Lsn, Bytes)>,
1495 346 : ) -> anyhow::Result<KeyHistoryRetention> {
1496 346 : // Pre-checks for the invariants
1497 346 : if cfg!(debug_assertions) {
1498 836 : for (log_key, _, _) in full_history {
1499 490 : assert_eq!(log_key, &key, "mismatched key");
1500 : }
1501 346 : for i in 1..full_history.len() {
1502 144 : assert!(full_history[i - 1].1 <= full_history[i].1, "unordered LSN");
1503 144 : if full_history[i - 1].1 == full_history[i].1 {
1504 0 : assert!(
1505 0 : matches!(full_history[i - 1].2, Value::Image(_)),
1506 0 : "unordered delta/image, or duplicated delta"
1507 : );
1508 144 : }
1509 : }
1510 : // There used to be an assertion (for the no-base-image case) that checked whether
1511 : // the first record in the history is `will_init`, but it was removed.
1512 : // This is explained in the test cases for generate_key_retention.
1513 : // Search "incomplete history" for more information.
1514 732 : for lsn in retain_lsn_below_horizon {
1515 386 : assert!(lsn < &horizon, "retain lsn must be below horizon")
1516 : }
1517 346 : for i in 1..retain_lsn_below_horizon.len() {
1518 186 : assert!(
1519 186 : retain_lsn_below_horizon[i - 1] <= retain_lsn_below_horizon[i],
1520 0 : "unordered LSN"
1521 : );
1522 : }
1523 0 : }
1524 346 : let has_ancestor = base_img_from_ancestor.is_some();
1525 : // Step 1: split history into len(retain_lsn_below_horizon) + 2 buckets, where the last bucket is for all deltas above the horizon,
1526 : // and the second-to-last bucket is for the horizon. Each bucket contains lsn_last_bucket < deltas <= lsn_this_bucket.
1527 346 : let (mut split_history, lsn_split_points) = {
1528 346 : let mut split_history = Vec::new();
1529 346 : split_history.resize_with(retain_lsn_below_horizon.len() + 2, Vec::new);
1530 346 : let mut lsn_split_points = Vec::with_capacity(retain_lsn_below_horizon.len() + 1);
1531 732 : for lsn in retain_lsn_below_horizon {
1532 386 : lsn_split_points.push(*lsn);
1533 386 : }
1534 346 : lsn_split_points.push(horizon);
1535 346 : let mut current_idx = 0;
1536 836 : for item @ (_, lsn, _) in full_history {
1537 598 : while current_idx < lsn_split_points.len() && *lsn > lsn_split_points[current_idx] {
1538 108 : current_idx += 1;
1539 108 : }
1540 490 : split_history[current_idx].push(item);
1541 : }
1542 346 : (split_history, lsn_split_points)
1543 : };
1544 : // Step 2: filter out duplicated records due to the k-merge of image/delta layers
1545 1424 : for split_for_lsn in &mut split_history {
1546 1078 : let mut prev_lsn = None;
1547 1078 : let mut new_split_for_lsn = Vec::with_capacity(split_for_lsn.len());
1548 1078 : for record @ (_, lsn, _) in std::mem::take(split_for_lsn) {
1549 490 : if let Some(prev_lsn) = &prev_lsn {
1550 60 : if *prev_lsn == lsn {
1551 : // The case that we have an LSN with both data from the delta layer and the image layer. As
1552 : // `ValueWrapper` ensures that an image is ordered before a delta at the same LSN, we simply
1553 : // drop this delta and keep the image.
1554 : //
1555 : // For example, we have delta layer key1@0x10, key1@0x20, and image layer key1@0x10, we will
1556 : // keep the image for key1@0x10 and the delta for key1@0x20. key1@0x10 delta will be simply
1557 : // dropped.
1558 : //
1559 : // TODO: if we have both a delta and an image for a given LSN and the delta threshold is not
1560 : // exceeded, we could keep the delta instead to save space. This is an optimization for the future.
1561 0 : continue;
1562 60 : }
1563 430 : }
1564 490 : prev_lsn = Some(lsn);
1565 490 : new_split_for_lsn.push(record);
1566 : }
1567 1078 : *split_for_lsn = new_split_for_lsn;
1568 : }
1569 : // Step 3: generate images when necessary
1570 346 : let mut retention = Vec::with_capacity(split_history.len());
1571 346 : let mut records_since_last_image = 0;
1572 346 : let batch_cnt = split_history.len();
1573 346 : assert!(
1574 346 : batch_cnt >= 2,
1575 0 : "should have at least below + above horizon batches"
1576 : );
1577 346 : let mut replay_history: Vec<(Key, Lsn, Value)> = Vec::new();
1578 346 : if let Some((key, lsn, img)) = base_img_from_ancestor {
1579 18 : replay_history.push((key, lsn, Value::Image(img)));
1580 328 : }
1581 :
1582 : /// Generate debug information for the replay history
1583 0 : fn generate_history_trace(replay_history: &[(Key, Lsn, Value)]) -> String {
1584 0 : use std::fmt::Write;
1585 0 : let mut output = String::new();
1586 0 : if let Some((key, _, _)) = replay_history.first() {
1587 0 : write!(output, "key={} ", key).unwrap();
1588 0 : let mut cnt = 0;
1589 0 : for (_, lsn, val) in replay_history {
1590 0 : if val.is_image() {
1591 0 : write!(output, "i@{} ", lsn).unwrap();
1592 0 : } else if val.will_init() {
1593 0 : write!(output, "di@{} ", lsn).unwrap();
1594 0 : } else {
1595 0 : write!(output, "d@{} ", lsn).unwrap();
1596 0 : }
1597 0 : cnt += 1;
1598 0 : if cnt >= 128 {
1599 0 : write!(output, "... and more").unwrap();
1600 0 : break;
1601 0 : }
1602 : }
1603 0 : } else {
1604 0 : write!(output, "<no history>").unwrap();
1605 0 : }
1606 0 : output
1607 0 : }
1608 :
1609 0 : fn generate_debug_trace(
1610 0 : replay_history: Option<&[(Key, Lsn, Value)]>,
1611 0 : full_history: &[(Key, Lsn, Value)],
1612 0 : lsns: &[Lsn],
1613 0 : horizon: Lsn,
1614 0 : ) -> String {
1615 0 : use std::fmt::Write;
1616 0 : let mut output = String::new();
1617 0 : if let Some(replay_history) = replay_history {
1618 0 : writeln!(
1619 0 : output,
1620 0 : "replay_history: {}",
1621 0 : generate_history_trace(replay_history)
1622 0 : )
1623 0 : .unwrap();
1624 0 : } else {
1625 0 : writeln!(output, "replay_history: <disabled>",).unwrap();
1626 0 : }
1627 0 : writeln!(
1628 0 : output,
1629 0 : "full_history: {}",
1630 0 : generate_history_trace(full_history)
1631 0 : )
1632 0 : .unwrap();
1633 0 : writeln!(
1634 0 : output,
1635 0 : "when processing: [{}] horizon={}",
1636 0 : lsns.iter().map(|l| format!("{l}")).join(","),
1637 0 : horizon
1638 0 : )
1639 0 : .unwrap();
1640 0 : output
1641 0 : }
1642 :
1643 1078 : for (i, split_for_lsn) in split_history.into_iter().enumerate() {
1644 : // TODO: there could be image keys inside the splits, and we can compute records_since_last_image accordingly.
1645 1078 : records_since_last_image += split_for_lsn.len();
1646 1078 : let generate_image = if i == 0 && !has_ancestor {
1647 : // We always generate images for the first batch (below horizon / lowest retain_lsn)
1648 328 : true
1649 750 : } else if i == batch_cnt - 1 {
1650 : // Do not generate images for the last batch (above horizon)
1651 346 : false
1652 404 : } else if records_since_last_image >= delta_threshold_cnt {
1653 : // Generate images when there are too many records
1654 6 : true
1655 : } else {
1656 398 : false
1657 : };
1658 1078 : replay_history.extend(split_for_lsn.iter().map(|x| (*x).clone()));
1659 : // Only retain the items after the last image record
1660 1346 : for idx in (0..replay_history.len()).rev() {
1661 1346 : if replay_history[idx].2.will_init() {
1662 1078 : replay_history = replay_history[idx..].to_vec();
1663 1078 : break;
1664 268 : }
1665 : }
1666 1078 : if let Some((_, _, val)) = replay_history.first() {
1667 1078 : if !val.will_init() {
1668 0 : return Err(anyhow::anyhow!("invalid history, no base image")).with_context(
1669 0 : || {
1670 0 : generate_debug_trace(
1671 0 : Some(&replay_history),
1672 0 : full_history,
1673 0 : retain_lsn_below_horizon,
1674 0 : horizon,
1675 0 : )
1676 0 : },
1677 0 : );
1678 1078 : }
1679 0 : }
1680 1078 : if generate_image && records_since_last_image > 0 {
1681 334 : records_since_last_image = 0;
1682 334 : let replay_history_for_debug = if cfg!(debug_assertions) {
1683 334 : Some(replay_history.clone())
1684 : } else {
1685 0 : None
1686 : };
1687 334 : let replay_history_for_debug_ref = replay_history_for_debug.as_deref();
1688 334 : let history = std::mem::take(&mut replay_history);
1689 334 : let mut img = None;
1690 334 : let mut records = Vec::with_capacity(history.len());
1691 334 : if let (_, lsn, Value::Image(val)) = history.first().as_ref().unwrap() {
1692 334 : img = Some((*lsn, val.clone()));
1693 334 : for (_, lsn, val) in history.into_iter().skip(1) {
1694 34 : let Value::WalRecord(rec) = val else {
1695 0 : return Err(anyhow::anyhow!(
1696 0 : "invalid record, first record is image, expect walrecords"
1697 0 : ))
1698 0 : .with_context(|| {
1699 0 : generate_debug_trace(
1700 0 : replay_history_for_debug_ref,
1701 0 : full_history,
1702 0 : retain_lsn_below_horizon,
1703 0 : horizon,
1704 0 : )
1705 0 : });
1706 : };
1707 34 : records.push((lsn, rec));
1708 : }
1709 : } else {
1710 0 : for (_, lsn, val) in history.into_iter() {
1711 0 : let Value::WalRecord(rec) = val else {
1712 0 : return Err(anyhow::anyhow!("invalid record, first record is walrecord, expect rest are walrecord"))
1713 0 : .with_context(|| generate_debug_trace(
1714 0 : replay_history_for_debug_ref,
1715 0 : full_history,
1716 0 : retain_lsn_below_horizon,
1717 0 : horizon,
1718 0 : ));
1719 : };
1720 0 : records.push((lsn, rec));
1721 : }
1722 : }
1723 334 : records.reverse();
1724 334 : let state = ValueReconstructState { img, records };
1725 334 : let request_lsn = lsn_split_points[i]; // last batch does not generate image so i is always in range
1726 334 : let img = self.reconstruct_value(key, request_lsn, state).await?;
1727 334 : replay_history.push((key, request_lsn, Value::Image(img.clone())));
1728 334 : retention.push(vec![(request_lsn, Value::Image(img))]);
1729 744 : } else {
1730 744 : let deltas = split_for_lsn
1731 744 : .iter()
1732 744 : .map(|(_, lsn, value)| (*lsn, value.clone()))
1733 744 : .collect_vec();
1734 744 : retention.push(deltas);
1735 744 : }
1736 : }
1737 346 : let mut result = Vec::with_capacity(retention.len());
1738 346 : assert_eq!(retention.len(), lsn_split_points.len() + 1);
1739 1078 : for (idx, logs) in retention.into_iter().enumerate() {
1740 1078 : if idx == lsn_split_points.len() {
1741 346 : return Ok(KeyHistoryRetention {
1742 346 : below_horizon: result,
1743 346 : above_horizon: KeyLogAtLsn(logs),
1744 346 : });
1745 732 : } else {
1746 732 : result.push((lsn_split_points[idx], KeyLogAtLsn(logs)));
1747 732 : }
1748 : }
1749 0 : unreachable!("key retention is empty")
1750 346 : }
1751 :
1752 : /// An experimental compaction building block that combines compaction with garbage collection.
1753 : ///
1754 : /// The current implementation picks all delta + image layers that are below or intersecting with
1755 : /// the GC horizon without considering retain_lsns. Then, it does a full compaction over all these delta
1756 : /// layers and image layers, which generates image layers at the GC horizon, drops deltas below the GC horizon,
1757 : /// and creates delta layers with all deltas >= the GC horizon.
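/// A hypothetical dry-run invocation (sketch only; `timeline`, `ctx`, and the cancellation
/// token are assumed to come from the surrounding compaction task):
///
/// ```ignore
/// let cancel = CancellationToken::new();
/// timeline
///     .compact_with_gc(&cancel, EnumSet::only(CompactFlags::DryRun), &ctx)
///     .await?;
/// ```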
1758 20 : pub(crate) async fn compact_with_gc(
1759 20 : self: &Arc<Self>,
1760 20 : cancel: &CancellationToken,
1761 20 : flags: EnumSet<CompactFlags>,
1762 20 : ctx: &RequestContext,
1763 20 : ) -> anyhow::Result<()> {
1764 20 : use std::collections::BTreeSet;
1765 20 :
1766 20 : // Block other compaction/GC tasks from running for now. GC-compaction could run along
1767 20 : // with legacy compaction tasks in the future. Always ensure the lock order is compaction -> gc.
1768 20 : // Note that we already acquired the compaction lock when the outer `compact` function gets called.
1769 20 :
1770 20 : let gc_lock = async {
1771 : tokio::select! {
1772 : guard = self.gc_lock.lock() => Ok(guard),
1773 : // TODO: refactor to CompactionError to correctly pass cancelled error
1774 : _ = cancel.cancelled() => Err(anyhow!("cancelled")),
1775 : }
1776 20 : };
1777 :
1778 20 : let gc_lock = crate::timed(
1779 20 : gc_lock,
1780 20 : "acquires gc lock",
1781 20 : std::time::Duration::from_secs(5),
1782 20 : )
1783 0 : .await?;
1784 :
1785 20 : let dry_run = flags.contains(CompactFlags::DryRun);
1786 20 :
1787 20 : info!("running enhanced gc bottom-most compaction, dry_run={dry_run}");
1788 :
1789 : scopeguard::defer! {
1790 : info!("done enhanced gc bottom-most compaction");
1791 : };
1792 :
1793 20 : let mut stat = CompactionStatistics::default();
1794 :
1795 : // Step 0: pick all delta layers + image layers below/intersect with the GC horizon.
1796 : // The layer selection has the following properties:
1797 : // 1. If a layer is in the selection, all layers below it are in the selection.
1798 : // 2. Inferred from (1), for each key in the layer selection, the value can be reconstructed only with the layers in the layer selection.
1799 20 : let (layer_selection, gc_cutoff, retain_lsns_below_horizon) = {
1800 20 : let guard = self.layers.read().await;
1801 20 : let layers = guard.layer_map()?;
1802 20 : let gc_info = self.gc_info.read().unwrap();
1803 20 : let mut retain_lsns_below_horizon = Vec::new();
1804 20 : let gc_cutoff = gc_info.cutoffs.select_min();
1805 22 : for (lsn, _timeline_id) in &gc_info.retain_lsns {
1806 22 : if lsn < &gc_cutoff {
1807 22 : retain_lsns_below_horizon.push(*lsn);
1808 22 : }
1809 : }
1810 20 : for lsn in gc_info.leases.keys() {
1811 0 : if lsn < &gc_cutoff {
1812 0 : retain_lsns_below_horizon.push(*lsn);
1813 0 : }
1814 : }
1815 20 : let mut selected_layers = Vec::new();
1816 20 : drop(gc_info);
1817 98 : for desc in layers.iter_historic_layers() {
1818 98 : if desc.get_lsn_range().start <= gc_cutoff {
1819 80 : selected_layers.push(guard.get_from_desc(&desc));
1820 80 : }
1821 : }
1822 20 : retain_lsns_below_horizon.sort();
1823 20 : (selected_layers, gc_cutoff, retain_lsns_below_horizon)
1824 : };
1825 20 : let lowest_retain_lsn = if self.ancestor_timeline.is_some() {
1826 2 : Lsn(self.ancestor_lsn.0 + 1)
1827 : } else {
1828 18 : let res = retain_lsns_below_horizon
1829 18 : .first()
1830 18 : .copied()
1831 18 : .unwrap_or(gc_cutoff);
1832 18 : if cfg!(debug_assertions) {
1833 18 : assert_eq!(
1834 18 : res,
1835 18 : retain_lsns_below_horizon
1836 18 : .iter()
1837 18 : .min()
1838 18 : .copied()
1839 18 : .unwrap_or(gc_cutoff)
1840 18 : );
1841 0 : }
1842 18 : res
1843 : };
1844 20 : info!(
1845 0 : "picked {} layers for compaction with gc_cutoff={} lowest_retain_lsn={}",
1846 0 : layer_selection.len(),
1847 : gc_cutoff,
1848 : lowest_retain_lsn
1849 : );
1850 : // Step 1: download the selected layers and construct a k-merge iterator over them.
1851 : // Also, collect the layer information to decide when to split the new delta layers.
1852 20 : let mut downloaded_layers = Vec::new();
1853 20 : let mut delta_split_points = BTreeSet::new();
1854 100 : for layer in &layer_selection {
1855 80 : let resident_layer = layer.download_and_keep_resident().await?;
1856 80 : downloaded_layers.push(resident_layer);
1857 80 :
1858 80 : let desc = layer.layer_desc();
1859 80 : if desc.is_delta() {
1860 54 : // TODO: would it be correct to record split points only for deltas intersecting the GC horizon
1861 54 : // (excluding those fully below/above it), so that we avoid producing too many small delta layers?
1862 54 : let key_range = desc.get_key_range();
1863 54 : delta_split_points.insert(key_range.start);
1864 54 : delta_split_points.insert(key_range.end);
1865 54 : stat.visit_delta_layer(desc.file_size());
1866 54 : } else {
1867 26 : stat.visit_image_layer(desc.file_size());
1868 26 : }
1869 : }
1870 20 : let mut delta_layers = Vec::new();
1871 20 : let mut image_layers = Vec::new();
1872 100 : for resident_layer in &downloaded_layers {
1873 80 : if resident_layer.layer_desc().is_delta() {
1874 54 : let layer = resident_layer.get_as_delta(ctx).await?;
1875 54 : delta_layers.push(layer);
1876 26 : } else {
1877 26 : let layer = resident_layer.get_as_image(ctx).await?;
1878 26 : image_layers.push(layer);
1879 : }
1880 : }
1881 20 : let mut merge_iter = MergeIterator::create(&delta_layers, &image_layers, ctx);
1882 20 : // Step 2: Produce images+deltas. TODO: ensure newly-produced deltas do not overlap with other deltas.
1883 20 : // Data of the same key.
1884 20 : let mut accumulated_values = Vec::new();
1885 20 : let mut last_key: Option<Key> = None;
1886 20 :
1887 20 : enum FlushDeltaResult {
1888 20 : /// Create a new resident layer
1889 20 : CreateResidentLayer(ResidentLayer),
1890 20 : /// Keep an original delta layer
1891 20 : KeepLayer(PersistentLayerKey),
1892 20 : }
1893 20 :
1894 20 : #[allow(clippy::too_many_arguments)]
1895 338 : async fn flush_deltas(
1896 338 : deltas: &mut Vec<(Key, Lsn, crate::repository::Value)>,
1897 338 : last_key: Key,
1898 338 : delta_split_points: &[Key],
1899 338 : current_delta_split_point: &mut usize,
1900 338 : tline: &Arc<Timeline>,
1901 338 : lowest_retain_lsn: Lsn,
1902 338 : ctx: &RequestContext,
1903 338 : stats: &mut CompactionStatistics,
1904 338 : dry_run: bool,
1905 338 : last_batch: bool,
1906 338 : ) -> anyhow::Result<Option<FlushDeltaResult>> {
1907 338 : // Check if we need to split the delta layer. We split at the original delta layer boundary to avoid
1908 338 : // overlapping layers.
1909 338 : //
1910 338 : // If we have a structure like this:
1911 338 : //
1912 338 : // | Delta 1 | | Delta 4 |
1913 338 : // |---------| Delta 2 |---------|
1914 338 : // | Delta 3 | | Delta 5 |
1915 338 : //
1916 338 : // And we choose to compact delta 2+3+5. We will get an overlapping delta layer with delta 1+4.
1917 338 : // A simple solution here is to split the delta layers using the original boundary, while this
1918 338 : // might produce a lot of small layers. This should be improved and fixed in the future.
1919 338 : let mut need_split = false;
1920 428 : while *current_delta_split_point < delta_split_points.len()
1921 362 : && last_key >= delta_split_points[*current_delta_split_point]
1922 90 : {
1923 90 : *current_delta_split_point += 1;
1924 90 : need_split = true;
1925 90 : }
1926 338 : if !need_split && !last_batch {
1927 232 : return Ok(None);
1928 106 : }
1929 106 : let deltas: Vec<(Key, Lsn, Value)> = std::mem::take(deltas);
1930 106 : if deltas.is_empty() {
1931 48 : return Ok(None);
1932 58 : }
1933 92 : let end_lsn = deltas.iter().map(|(_, lsn, _)| lsn).max().copied().unwrap() + 1;
1934 58 : let delta_key = PersistentLayerKey {
1935 58 : key_range: {
1936 58 : let key_start = deltas.first().unwrap().0;
1937 58 : let key_end = deltas.last().unwrap().0.next();
1938 58 : key_start..key_end
1939 58 : },
1940 58 : lsn_range: lowest_retain_lsn..end_lsn,
1941 58 : is_delta: true,
1942 58 : };
1943 20 : {
1944 20 : // Hack: skip the delta layer if we would produce a layer with the same key-lsn range as an existing one.
1945 20 : //
1946 20 : // This can happen if we have removed some deltas in "the middle" of some existing layer's key-lsn-range.
1947 20 : // For example, consider the case where a single delta with range [0x10,0x50) exists.
1948 20 : // And we have branches at LSN 0x10, 0x20, 0x30.
1949 20 : // Then we delete branch @ 0x20.
1950 20 : // Bottom-most compaction may now delete the delta [0x20,0x30).
1951 20 : // And that wouldnt' change the shape of the layer.
1952 20 : //
1953 20 : // Note that bottom-most-gc-compaction never _adds_ new data in that case, only removes.
1954 20 : // That's why it's safe to skip.
1955 58 : let guard = tline.layers.read().await;
1956 20 :
1957 58 : if guard.contains_key(&delta_key) {
1958 26 : let layer_generation = guard.get_from_key(&delta_key).metadata().generation;
1959 26 : drop(guard);
1960 26 : if layer_generation == tline.generation {
1961 26 : stats.discard_delta_layer();
1962 26 : // TODO: depending on whether we design this compaction process to run along with
1963 26 : // other compactions, there could be layer map modifications after we drop the
1964 26 : // layer guard, and if that creates a duplicate layer key, we will still error
1965 26 : // out in the end.
1966 26 : info!(
1967 20 : key=%delta_key,
1968 20 : ?layer_generation,
1969 20 : "discard delta layer due to duplicated layer in the same generation"
1970 20 : );
1971 26 : return Ok(Some(FlushDeltaResult::KeepLayer(delta_key)));
1972 20 : }
1973 32 : }
1974 20 : }
1975 20 :
1976 32 : let mut delta_layer_writer = DeltaLayerWriter::new(
1977 32 : tline.conf,
1978 32 : tline.timeline_id,
1979 32 : tline.tenant_shard_id,
1980 32 : delta_key.key_range.start,
1981 32 : lowest_retain_lsn..end_lsn,
1982 32 : ctx,
1983 32 : )
1984 20 : .await?;
1985 90 : for (key, lsn, val) in deltas {
1986 58 : delta_layer_writer.put_value(key, lsn, val, ctx).await?;
1987 20 : }
1988 20 :
1989 32 : stats.produce_delta_layer(delta_layer_writer.size());
1990 32 : if dry_run {
1991 20 : return Ok(None);
1992 24 : }
1993 20 :
1994 24 : let (desc, path) = delta_layer_writer
1995 24 : .finish(delta_key.key_range.end, ctx)
1996 62 : .await?;
1997 24 : let delta_layer = Layer::finish_creating(tline.conf, tline, desc, &path)?;
1998 24 : Ok(Some(FlushDeltaResult::CreateResidentLayer(delta_layer)))
1999 338 : }
2000 20 :
2001 20 : // Hack the key range to be min..(max-1). Otherwise, the image layer will be
2002 20 : // interpreted as an L0 delta layer.
2003 20 : let hack_image_layer_range = {
2004 20 : let mut end_key = Key::MAX;
2005 20 : end_key.field6 -= 1;
2006 20 : Key::MIN..end_key
2007 : };
2008 :
2009 : // Only create image layers when there are no ancestor branches. TODO: create a covering image layer
2010 : // when certain conditions are met.
2011 20 : let mut image_layer_writer = if self.ancestor_timeline.is_none() {
2012 : Some(
2013 18 : ImageLayerWriter::new(
2014 18 : self.conf,
2015 18 : self.timeline_id,
2016 18 : self.tenant_shard_id,
2017 18 : &hack_image_layer_range, // covers the full key range
2018 18 : lowest_retain_lsn,
2019 18 : ctx,
2020 18 : )
2021 9 : .await?,
2022 : )
2023 : } else {
2024 2 : None
2025 : };
2026 :
2027 : /// Returns None if there is no ancestor branch. Throws an error when the key is not found.
2028 : ///
2029 : /// Currently, we always get the ancestor image for each key in the child branch, regardless of whether the
2030 : /// image is needed for reconstruction. This should be fixed in the future.
2031 : ///
2032 : /// Furthermore, we should do vectored get instead of a single get, or better, use k-merge for ancestor
2033 : /// images.
2034 338 : async fn get_ancestor_image(
2035 338 : tline: &Arc<Timeline>,
2036 338 : key: Key,
2037 338 : ctx: &RequestContext,
2038 338 : ) -> anyhow::Result<Option<(Key, Lsn, Bytes)>> {
2039 338 : if tline.ancestor_timeline.is_none() {
2040 324 : return Ok(None);
2041 14 : };
2042 : // This function is implemented as a get of the current timeline at ancestor LSN, therefore reusing
2043 : // as much existing code as possible.
2044 14 : let img = tline.get(key, tline.ancestor_lsn, ctx).await?;
2045 14 : Ok(Some((key, tline.ancestor_lsn, img)))
2046 338 : }
2047 20 : let image_layer_key = PersistentLayerKey {
2048 20 : key_range: hack_image_layer_range,
2049 20 : lsn_range: PersistentLayerDesc::image_layer_lsn_range(lowest_retain_lsn),
2050 20 : is_delta: false,
2051 20 : };
2052 :
2053 : // Like with delta layers, it can happen that we re-produce an already existing image layer.
2054 : // This could happen when a user triggers force compaction and image generation. In this case,
2055 : // it's always safe to rewrite the layer.
2056 20 : let discard_image_layer = {
2057 20 : let guard = self.layers.read().await;
2058 20 : if guard.contains_key(&image_layer_key) {
2059 6 : let layer_generation = guard.get_from_key(&image_layer_key).metadata().generation;
2060 6 : drop(guard);
2061 6 : if layer_generation == self.generation {
2062 : // TODO: depending on whether we design this compaction process to run along with
2063 : // other compactions, there could be layer map modifications after we drop the
2064 : // layer guard, and if that creates a duplicate layer key, we will still error
2065 : // out in the end.
2066 6 : info!(
2067 : key=%image_layer_key,
2068 : ?layer_generation,
2069 0 : "discard image layer due to duplicated layer key in the same generation",
2070 : );
2071 6 : true
2072 : } else {
2073 0 : false
2074 : }
2075 : } else {
2076 14 : false
2077 : }
2078 : };
2079 :
2080 : // Actually, we could decide not to write the image layer at all at this point, because its
2081 : // key and LSN range are already determined. However, to keep things simple here, we still
2082 : // create the writer and discard it at the end.
2083 :
2084 20 : let mut delta_values = Vec::new();
2085 20 : let delta_split_points = delta_split_points.into_iter().collect_vec();
2086 20 : let mut current_delta_split_point = 0;
2087 20 : let mut delta_layers = Vec::new();
2088 460 : while let Some((key, lsn, val)) = merge_iter.next().await? {
2089 440 : if cancel.is_cancelled() {
2090 0 : return Err(anyhow!("cancelled")); // TODO: refactor to CompactionError and pass cancel error
2091 440 : }
2092 440 : match val {
2093 336 : Value::Image(_) => stat.visit_image_key(&val),
2094 104 : Value::WalRecord(_) => stat.visit_wal_key(&val),
2095 : }
2096 440 : if last_key.is_none() || last_key.as_ref() == Some(&key) {
2097 122 : if last_key.is_none() {
2098 20 : last_key = Some(key);
2099 102 : }
2100 122 : accumulated_values.push((key, lsn, val));
2101 : } else {
2102 318 : let last_key = last_key.as_mut().unwrap();
2103 318 : stat.on_unique_key_visited();
2104 318 : let retention = self
2105 318 : .generate_key_retention(
2106 318 : *last_key,
2107 318 : &accumulated_values,
2108 318 : gc_cutoff,
2109 318 : &retain_lsns_below_horizon,
2110 318 : COMPACTION_DELTA_THRESHOLD,
2111 318 : get_ancestor_image(self, *last_key, ctx).await?,
2112 : )
2113 0 : .await?;
2114 : // Put the image into the image layer. Currently we have a single big layer for the compaction.
2115 318 : retention
2116 318 : .pipe_to(
2117 318 : *last_key,
2118 318 : &mut delta_values,
2119 318 : image_layer_writer.as_mut(),
2120 318 : &mut stat,
2121 318 : ctx,
2122 318 : )
2123 310 : .await?;
2124 318 : delta_layers.extend(
2125 318 : flush_deltas(
2126 318 : &mut delta_values,
2127 318 : *last_key,
2128 318 : &delta_split_points,
2129 318 : &mut current_delta_split_point,
2130 318 : self,
2131 318 : lowest_retain_lsn,
2132 318 : ctx,
2133 318 : &mut stat,
2134 318 : dry_run,
2135 318 : false,
2136 318 : )
2137 71 : .await?,
2138 : );
2139 318 : accumulated_values.clear();
2140 318 : *last_key = key;
2141 318 : accumulated_values.push((key, lsn, val));
2142 : }
2143 : }
2144 :
2145 20 : let last_key = last_key.expect("no keys produced during compaction");
2146 20 : // TODO: move this part to the loop body
2147 20 : stat.on_unique_key_visited();
2148 20 : let retention = self
2149 20 : .generate_key_retention(
2150 20 : last_key,
2151 20 : &accumulated_values,
2152 20 : gc_cutoff,
2153 20 : &retain_lsns_below_horizon,
2154 20 : COMPACTION_DELTA_THRESHOLD,
2155 20 : get_ancestor_image(self, last_key, ctx).await?,
2156 : )
2157 0 : .await?;
2158 : // Put the image into the image layer. Currently we have a single big layer for the compaction.
2159 20 : retention
2160 20 : .pipe_to(
2161 20 : last_key,
2162 20 : &mut delta_values,
2163 20 : image_layer_writer.as_mut(),
2164 20 : &mut stat,
2165 20 : ctx,
2166 20 : )
2167 18 : .await?;
2168 20 : delta_layers.extend(
2169 20 : flush_deltas(
2170 20 : &mut delta_values,
2171 20 : last_key,
2172 20 : &delta_split_points,
2173 20 : &mut current_delta_split_point,
2174 20 : self,
2175 20 : lowest_retain_lsn,
2176 20 : ctx,
2177 20 : &mut stat,
2178 20 : dry_run,
2179 20 : true,
2180 20 : )
2181 7 : .await?,
2182 : );
2183 20 : assert!(delta_values.is_empty(), "unprocessed keys");
2184 :
2185 20 : let image_layer = if discard_image_layer {
2186 6 : stat.discard_image_layer();
2187 6 : None
2188 14 : } else if let Some(writer) = image_layer_writer {
2189 12 : stat.produce_image_layer(writer.size());
2190 12 : if !dry_run {
2191 20 : Some(writer.finish(self, ctx).await?)
2192 : } else {
2193 2 : None
2194 : }
2195 : } else {
2196 2 : None
2197 : };
2198 :
2199 20 : info!(
2200 0 : "gc-compaction statistics: {}",
2201 0 : serde_json::to_string(&stat)?
2202 : );
2203 :
2204 20 : if dry_run {
2205 2 : return Ok(());
2206 18 : }
2207 18 :
2208 18 : info!(
2209 0 : "produced {} delta layers and {} image layers",
2210 0 : delta_layers.len(),
2211 0 : if image_layer.is_some() { 1 } else { 0 }
2212 : );
2213 18 : let mut compact_to = Vec::new();
2214 18 : let mut keep_layers = HashSet::new();
2215 68 : for action in delta_layers {
2216 50 : match action {
2217 24 : FlushDeltaResult::CreateResidentLayer(layer) => {
2218 24 : compact_to.push(layer);
2219 24 : }
2220 26 : FlushDeltaResult::KeepLayer(l) => {
2221 26 : keep_layers.insert(l);
2222 26 : }
2223 : }
2224 : }
2225 18 : if discard_image_layer {
2226 6 : keep_layers.insert(image_layer_key);
2227 12 : }
2228 18 : let mut layer_selection = layer_selection;
2229 72 : layer_selection.retain(|x| !keep_layers.contains(&x.layer_desc().key()));
2230 18 : compact_to.extend(image_layer);
2231 :
2232 : // Step 3: Place back to the layer map.
2233 : {
2234 18 : let mut guard = self.layers.write().await;
2235 18 : guard
2236 18 : .open_mut()?
2237 18 : .finish_gc_compaction(&layer_selection, &compact_to, &self.metrics)
2238 18 : };
2239 18 : self.remote_client
2240 18 : .schedule_compaction_update(&layer_selection, &compact_to)?;
2241 :
2242 18 : drop(gc_lock);
2243 18 :
2244 18 : Ok(())
2245 20 : }
2246 : }
2247 :
2248 : struct TimelineAdaptor {
2249 : timeline: Arc<Timeline>,
2250 :
2251 : keyspace: (Lsn, KeySpace),
2252 :
2253 : new_deltas: Vec<ResidentLayer>,
2254 : new_images: Vec<ResidentLayer>,
2255 : layers_to_delete: Vec<Arc<PersistentLayerDesc>>,
2256 : }
2257 :
2258 : impl TimelineAdaptor {
2259 0 : pub fn new(timeline: &Arc<Timeline>, keyspace: (Lsn, KeySpace)) -> Self {
2260 0 : Self {
2261 0 : timeline: timeline.clone(),
2262 0 : keyspace,
2263 0 : new_images: Vec::new(),
2264 0 : new_deltas: Vec::new(),
2265 0 : layers_to_delete: Vec::new(),
2266 0 : }
2267 0 : }
2268 :
2269 0 : pub async fn flush_updates(&mut self) -> Result<(), CompactionError> {
2270 0 : let layers_to_delete = {
2271 0 : let guard = self.timeline.layers.read().await;
2272 0 : self.layers_to_delete
2273 0 : .iter()
2274 0 : .map(|x| guard.get_from_desc(x))
2275 0 : .collect::<Vec<Layer>>()
2276 0 : };
2277 0 : self.timeline
2278 0 : .finish_compact_batch(&self.new_deltas, &self.new_images, &layers_to_delete)
2279 0 : .await?;
2280 :
2281 0 : self.timeline
2282 0 : .upload_new_image_layers(std::mem::take(&mut self.new_images))?;
2283 :
2284 0 : self.new_deltas.clear();
2285 0 : self.layers_to_delete.clear();
2286 0 : Ok(())
2287 0 : }
2288 : }
2289 :
2290 : #[derive(Clone)]
2291 : struct ResidentDeltaLayer(ResidentLayer);
2292 : #[derive(Clone)]
2293 : struct ResidentImageLayer(ResidentLayer);
2294 :
2295 : impl CompactionJobExecutor for TimelineAdaptor {
2296 : type Key = crate::repository::Key;
2297 :
2298 : type Layer = OwnArc<PersistentLayerDesc>;
2299 : type DeltaLayer = ResidentDeltaLayer;
2300 : type ImageLayer = ResidentImageLayer;
2301 :
2302 : type RequestContext = crate::context::RequestContext;
2303 :
2304 0 : fn get_shard_identity(&self) -> &ShardIdentity {
2305 0 : self.timeline.get_shard_identity()
2306 0 : }
2307 :
2308 0 : async fn get_layers(
2309 0 : &mut self,
2310 0 : key_range: &Range<Key>,
2311 0 : lsn_range: &Range<Lsn>,
2312 0 : _ctx: &RequestContext,
2313 0 : ) -> anyhow::Result<Vec<OwnArc<PersistentLayerDesc>>> {
2314 0 : self.flush_updates().await?;
2315 :
2316 0 : let guard = self.timeline.layers.read().await;
2317 0 : let layer_map = guard.layer_map()?;
2318 :
2319 0 : let result = layer_map
2320 0 : .iter_historic_layers()
2321 0 : .filter(|l| {
2322 0 : overlaps_with(&l.lsn_range, lsn_range) && overlaps_with(&l.key_range, key_range)
2323 0 : })
2324 0 : .map(OwnArc)
2325 0 : .collect();
2326 0 : Ok(result)
2327 0 : }
2328 :
2329 0 : async fn get_keyspace(
2330 0 : &mut self,
2331 0 : key_range: &Range<Key>,
2332 0 : lsn: Lsn,
2333 0 : _ctx: &RequestContext,
2334 0 : ) -> anyhow::Result<Vec<Range<Key>>> {
2335 0 : if lsn == self.keyspace.0 {
2336 0 : Ok(pageserver_compaction::helpers::intersect_keyspace(
2337 0 : &self.keyspace.1.ranges,
2338 0 : key_range,
2339 0 : ))
2340 : } else {
2341 : // The current compaction implementation only ever requests the key space
2342 : // at the compaction end LSN.
2343 0 : anyhow::bail!("keyspace not available for requested lsn");
2344 : }
2345 0 : }
2346 :
2347 0 : async fn downcast_delta_layer(
2348 0 : &self,
2349 0 : layer: &OwnArc<PersistentLayerDesc>,
2350 0 : ) -> anyhow::Result<Option<ResidentDeltaLayer>> {
2351 0 : // this is a lot more complex than a simple downcast...
2352 0 : if layer.is_delta() {
2353 0 : let l = {
2354 0 : let guard = self.timeline.layers.read().await;
2355 0 : guard.get_from_desc(layer)
2356 : };
2357 0 : let result = l.download_and_keep_resident().await?;
2358 :
2359 0 : Ok(Some(ResidentDeltaLayer(result)))
2360 : } else {
2361 0 : Ok(None)
2362 : }
2363 0 : }
2364 :
2365 0 : async fn create_image(
2366 0 : &mut self,
2367 0 : lsn: Lsn,
2368 0 : key_range: &Range<Key>,
2369 0 : ctx: &RequestContext,
2370 0 : ) -> anyhow::Result<()> {
2371 0 : Ok(self.create_image_impl(lsn, key_range, ctx).await?)
2372 0 : }
2373 :
2374 0 : async fn create_delta(
2375 0 : &mut self,
2376 0 : lsn_range: &Range<Lsn>,
2377 0 : key_range: &Range<Key>,
2378 0 : input_layers: &[ResidentDeltaLayer],
2379 0 : ctx: &RequestContext,
2380 0 : ) -> anyhow::Result<()> {
2381 0 : debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end);
2382 :
2383 0 : let mut all_entries = Vec::new();
2384 0 : for dl in input_layers.iter() {
2385 0 : all_entries.extend(dl.load_keys(ctx).await?);
2386 : }
2387 :
2388 : // The current stdlib sorting implementation is designed in a way where it is
2389 : // particularly fast where the slice is made up of sorted sub-ranges.
2390 0 : all_entries.sort_by_key(|DeltaEntry { key, lsn, .. }| (*key, *lsn));
2391 :
2392 0 : let mut writer = DeltaLayerWriter::new(
2393 0 : self.timeline.conf,
2394 0 : self.timeline.timeline_id,
2395 0 : self.timeline.tenant_shard_id,
2396 0 : key_range.start,
2397 0 : lsn_range.clone(),
2398 0 : ctx,
2399 0 : )
2400 0 : .await?;
2401 :
2402 0 : let mut dup_values = 0;
2403 0 :
2404 0 : // This iterator walks through all key-value pairs from all the layers
2405 0 : // we're compacting, in key, LSN order.
2406 0 : let mut prev: Option<(Key, Lsn)> = None;
2407 : for &DeltaEntry {
2408 0 : key, lsn, ref val, ..
2409 0 : } in all_entries.iter()
2410 : {
2411 0 : if prev == Some((key, lsn)) {
2412 : // This is a duplicate. Skip it.
2413 : //
2414 : // It can happen if compaction is interrupted after writing some
2415 : // layers but not all, and we are compacting the range again.
2416 : // The calculations in the algorithm assume that there are no
2417 : // duplicates, so the math on targeted file size is likely off,
2418 : // and we will create smaller files than expected.
2419 0 : dup_values += 1;
2420 0 : continue;
2421 0 : }
2422 :
2423 0 : let value = val.load(ctx).await?;
2424 :
2425 0 : writer.put_value(key, lsn, value, ctx).await?;
2426 :
2427 0 : prev = Some((key, lsn));
2428 : }
2429 :
2430 0 : if dup_values > 0 {
2431 0 : warn!("delta layer created with {} duplicate values", dup_values);
2432 0 : }
2433 :
2434 0 : fail_point!("delta-layer-writer-fail-before-finish", |_| {
2435 0 : Err(anyhow::anyhow!(
2436 0 : "failpoint delta-layer-writer-fail-before-finish"
2437 0 : ))
2438 0 : });
2439 :
2440 0 : let (desc, path) = writer.finish(prev.unwrap().0.next(), ctx).await?;
2441 0 : let new_delta_layer =
2442 0 : Layer::finish_creating(self.timeline.conf, &self.timeline, desc, &path)?;
2443 :
2444 0 : self.new_deltas.push(new_delta_layer);
2445 0 : Ok(())
2446 0 : }
2447 :
2448 0 : async fn delete_layer(
2449 0 : &mut self,
2450 0 : layer: &OwnArc<PersistentLayerDesc>,
2451 0 : _ctx: &RequestContext,
2452 0 : ) -> anyhow::Result<()> {
2453 0 : self.layers_to_delete.push(layer.clone().0);
2454 0 : Ok(())
2455 0 : }
2456 : }
2457 :
2458 : impl TimelineAdaptor {
2459 0 : async fn create_image_impl(
2460 0 : &mut self,
2461 0 : lsn: Lsn,
2462 0 : key_range: &Range<Key>,
2463 0 : ctx: &RequestContext,
2464 0 : ) -> Result<(), CreateImageLayersError> {
2465 0 : let timer = self.timeline.metrics.create_images_time_histo.start_timer();
2466 :
2467 0 : let image_layer_writer = ImageLayerWriter::new(
2468 0 : self.timeline.conf,
2469 0 : self.timeline.timeline_id,
2470 0 : self.timeline.tenant_shard_id,
2471 0 : key_range,
2472 0 : lsn,
2473 0 : ctx,
2474 0 : )
2475 0 : .await?;
2476 :
2477 0 : fail_point!("image-layer-writer-fail-before-finish", |_| {
2478 0 : Err(CreateImageLayersError::Other(anyhow::anyhow!(
2479 0 : "failpoint image-layer-writer-fail-before-finish"
2480 0 : )))
2481 0 : });
2482 :
2483 0 : let keyspace = KeySpace {
2484 0 : ranges: self.get_keyspace(key_range, lsn, ctx).await?,
2485 : };
2486 : // TODO set proper (stateful) start. The create_image_layer_for_rel_blocks function mostly
2487 0 : let start = Key::MIN;
2488 : let ImageLayerCreationOutcome {
2489 0 : image,
2490 : next_start_key: _,
2491 0 : } = self
2492 0 : .timeline
2493 0 : .create_image_layer_for_rel_blocks(
2494 0 : &keyspace,
2495 0 : image_layer_writer,
2496 0 : lsn,
2497 0 : ctx,
2498 0 : key_range.clone(),
2499 0 : start,
2500 0 : )
2501 0 : .await?;
2502 :
2503 0 : if let Some(image_layer) = image {
2504 0 : self.new_images.push(image_layer);
2505 0 : }
2506 :
2507 0 : timer.stop_and_record();
2508 0 :
2509 0 : Ok(())
2510 0 : }
2511 : }
2512 :
2513 : impl CompactionRequestContext for crate::context::RequestContext {}
2514 :
2515 : #[derive(Debug, Clone)]
2516 : pub struct OwnArc<T>(pub Arc<T>);
2517 :
2518 : impl<T> Deref for OwnArc<T> {
2519 : type Target = <Arc<T> as Deref>::Target;
2520 0 : fn deref(&self) -> &Self::Target {
2521 0 : &self.0
2522 0 : }
2523 : }
2524 :
2525 : impl<T> AsRef<T> for OwnArc<T> {
2526 0 : fn as_ref(&self) -> &T {
2527 0 : self.0.as_ref()
2528 0 : }
2529 : }
2530 :
2531 : impl CompactionLayer<Key> for OwnArc<PersistentLayerDesc> {
2532 0 : fn key_range(&self) -> &Range<Key> {
2533 0 : &self.key_range
2534 0 : }
2535 0 : fn lsn_range(&self) -> &Range<Lsn> {
2536 0 : &self.lsn_range
2537 0 : }
2538 0 : fn file_size(&self) -> u64 {
2539 0 : self.file_size
2540 0 : }
2541 0 : fn short_id(&self) -> std::string::String {
2542 0 : self.as_ref().short_id().to_string()
2543 0 : }
2544 0 : fn is_delta(&self) -> bool {
2545 0 : self.as_ref().is_delta()
2546 0 : }
2547 : }
2548 :
2549 : impl CompactionLayer<Key> for OwnArc<DeltaLayer> {
2550 0 : fn key_range(&self) -> &Range<Key> {
2551 0 : &self.layer_desc().key_range
2552 0 : }
2553 0 : fn lsn_range(&self) -> &Range<Lsn> {
2554 0 : &self.layer_desc().lsn_range
2555 0 : }
2556 0 : fn file_size(&self) -> u64 {
2557 0 : self.layer_desc().file_size
2558 0 : }
2559 0 : fn short_id(&self) -> std::string::String {
2560 0 : self.layer_desc().short_id().to_string()
2561 0 : }
2562 0 : fn is_delta(&self) -> bool {
2563 0 : true
2564 0 : }
2565 : }
2566 :
2567 : use crate::tenant::timeline::DeltaEntry;
2568 :
2569 : impl CompactionLayer<Key> for ResidentDeltaLayer {
2570 0 : fn key_range(&self) -> &Range<Key> {
2571 0 : &self.0.layer_desc().key_range
2572 0 : }
2573 0 : fn lsn_range(&self) -> &Range<Lsn> {
2574 0 : &self.0.layer_desc().lsn_range
2575 0 : }
2576 0 : fn file_size(&self) -> u64 {
2577 0 : self.0.layer_desc().file_size
2578 0 : }
2579 0 : fn short_id(&self) -> std::string::String {
2580 0 : self.0.layer_desc().short_id().to_string()
2581 0 : }
2582 0 : fn is_delta(&self) -> bool {
2583 0 : true
2584 0 : }
2585 : }
2586 :
2587 : impl CompactionDeltaLayer<TimelineAdaptor> for ResidentDeltaLayer {
2588 : type DeltaEntry<'a> = DeltaEntry<'a>;
2589 :
2590 0 : async fn load_keys<'a>(&self, ctx: &RequestContext) -> anyhow::Result<Vec<DeltaEntry<'_>>> {
2591 0 : self.0.load_keys(ctx).await
2592 0 : }
2593 : }
2594 :
2595 : impl CompactionLayer<Key> for ResidentImageLayer {
2596 0 : fn key_range(&self) -> &Range<Key> {
2597 0 : &self.0.layer_desc().key_range
2598 0 : }
2599 0 : fn lsn_range(&self) -> &Range<Lsn> {
2600 0 : &self.0.layer_desc().lsn_range
2601 0 : }
2602 0 : fn file_size(&self) -> u64 {
2603 0 : self.0.layer_desc().file_size
2604 0 : }
2605 0 : fn short_id(&self) -> std::string::String {
2606 0 : self.0.layer_desc().short_id().to_string()
2607 0 : }
2608 0 : fn is_delta(&self) -> bool {
2609 0 : false
2610 0 : }
2611 : }
2612 : impl CompactionImageLayer<TimelineAdaptor> for ResidentImageLayer {}