Line data Source code
1 : //! New compaction implementation. The algorithm itself is implemented in the
2 : //! compaction crate. This file implements the callbacks and structs that allow
3 : //! the algorithm to drive the process.
4 : //!
5 : //! The old legacy algorithm is implemented directly in `timeline.rs`.
6 :
7 : use std::collections::{BinaryHeap, HashMap, HashSet, VecDeque};
8 : use std::ops::{Deref, Range};
9 : use std::sync::Arc;
10 : use std::time::{Duration, Instant};
11 :
12 : use super::layer_manager::LayerManager;
13 : use super::{
14 : CompactFlags, CompactOptions, CompactionError, CreateImageLayersError, DurationRecorder,
15 : GetVectoredError, ImageLayerCreationMode, LastImageLayerCreationStatus, RecordedDuration,
16 : Timeline,
17 : };
18 :
19 : use crate::tenant::timeline::DeltaEntry;
20 : use crate::walredo::RedoAttemptType;
21 : use anyhow::{Context, anyhow};
22 : use bytes::Bytes;
23 : use enumset::EnumSet;
24 : use fail::fail_point;
25 : use futures::FutureExt;
26 : use itertools::Itertools;
27 : use once_cell::sync::Lazy;
28 : use pageserver_api::config::tenant_conf_defaults::DEFAULT_CHECKPOINT_DISTANCE;
29 : use pageserver_api::key::{KEY_SIZE, Key};
30 : use pageserver_api::keyspace::{KeySpace, ShardedRange};
31 : use pageserver_api::models::{CompactInfoResponse, CompactKeyRange};
32 : use pageserver_api::record::NeonWalRecord;
33 : use pageserver_api::shard::{ShardCount, ShardIdentity, TenantShardId};
34 : use pageserver_api::value::Value;
35 : use pageserver_compaction::helpers::{fully_contains, overlaps_with};
36 : use pageserver_compaction::interface::*;
37 : use serde::Serialize;
38 : use tokio::sync::{OwnedSemaphorePermit, Semaphore};
39 : use tokio_util::sync::CancellationToken;
40 : use tracing::{Instrument, debug, error, info, info_span, trace, warn};
41 : use utils::critical;
42 : use utils::id::TimelineId;
43 : use utils::lsn::Lsn;
44 :
45 : use crate::context::{AccessStatsBehavior, RequestContext, RequestContextBuilder};
46 : use crate::page_cache;
47 : use crate::statvfs::Statvfs;
48 : use crate::tenant::checks::check_valid_layermap;
49 : use crate::tenant::gc_block::GcBlock;
50 : use crate::tenant::layer_map::LayerMap;
51 : use crate::tenant::remote_timeline_client::WaitCompletionError;
52 : use crate::tenant::remote_timeline_client::index::GcCompactionState;
53 : use crate::tenant::storage_layer::batch_split_writer::{
54 : BatchWriterResult, SplitDeltaLayerWriter, SplitImageLayerWriter,
55 : };
56 : use crate::tenant::storage_layer::filter_iterator::FilterIterator;
57 : use crate::tenant::storage_layer::merge_iterator::MergeIterator;
58 : use crate::tenant::storage_layer::{
59 : AsLayerDesc, LayerVisibilityHint, PersistentLayerDesc, PersistentLayerKey,
60 : ValueReconstructState,
61 : };
62 : use crate::tenant::tasks::log_compaction_error;
63 : use crate::tenant::timeline::{
64 : DeltaLayerWriter, ImageLayerCreationOutcome, ImageLayerWriter, IoConcurrency, Layer,
65 : ResidentLayer, drop_rlock,
66 : };
67 : use crate::tenant::{DeltaLayer, MaybeOffloaded};
68 : use crate::virtual_file::{MaybeFatalIo, VirtualFile};
69 :
70 : /// Maximum number of deltas before generating an image layer in bottom-most compaction.
71 : const COMPACTION_DELTA_THRESHOLD: usize = 5;
72 :
73 : /// Ratio of shard-local pages below which we trigger shard ancestor layer rewrites. 0.3 means that
74 : /// <= 30% of layer pages must belong to the descendant shard to rewrite the layer.
75 : ///
76 : /// We choose a value < 0.5 to avoid rewriting all visible layers every time we do a power-of-two
77 : /// shard split, which gets expensive for large tenants.
78 : const ANCESTOR_COMPACTION_REWRITE_THRESHOLD: f64 = 0.3;
79 :
80 : #[derive(Default, Debug, Clone, Copy, Hash, PartialEq, Eq, Serialize)]
81 : pub struct GcCompactionJobId(pub usize);
82 :
83 : impl std::fmt::Display for GcCompactionJobId {
84 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
85 0 : write!(f, "{}", self.0)
86 0 : }
87 : }
88 :
89 : pub struct GcCompactionCombinedSettings {
90 : pub gc_compaction_enabled: bool,
91 : pub gc_compaction_verification: bool,
92 : pub gc_compaction_initial_threshold_kb: u64,
93 : pub gc_compaction_ratio_percent: u64,
94 : }
95 :
96 : #[derive(Debug, Clone)]
97 : pub enum GcCompactionQueueItem {
98 : MetaJob {
99 : /// Compaction options
100 : options: CompactOptions,
101 : /// Whether the compaction is triggered automatically (determines whether we need to update L2 LSN)
102 : auto: bool,
103 : },
104 : SubCompactionJob(CompactOptions),
105 : Notify(GcCompactionJobId, Option<Lsn>),
106 : }
107 :
108 : /// Statistics for gc-compaction meta jobs, which contains several sub compaction jobs.
109 : #[derive(Debug, Clone, Serialize, Default)]
110 : pub struct GcCompactionMetaStatistics {
111 : /// The total number of sub compaction jobs.
112 : pub total_sub_compaction_jobs: usize,
113 : /// The total number of sub compaction jobs that failed.
114 : pub failed_sub_compaction_jobs: usize,
115 : /// The total number of sub compaction jobs that succeeded.
116 : pub succeeded_sub_compaction_jobs: usize,
117 : /// The layer size before compaction.
118 : pub before_compaction_layer_size: u64,
119 : /// The layer size after compaction.
120 : pub after_compaction_layer_size: u64,
121 : /// The start time of the meta job.
122 : pub start_time: Option<chrono::DateTime<chrono::Utc>>,
123 : /// The end time of the meta job.
124 : pub end_time: Option<chrono::DateTime<chrono::Utc>>,
125 : /// The duration of the meta job.
126 : pub duration_secs: f64,
127 : /// The id of the meta job.
128 : pub meta_job_id: GcCompactionJobId,
129 : /// The LSN below which the layers are compacted, used to compute the statistics.
130 : pub below_lsn: Lsn,
131 : /// The retention ratio of the meta job (after_compaction_layer_size / before_compaction_layer_size)
132 : pub retention_ratio: f64,
133 : }
134 :
135 : impl GcCompactionMetaStatistics {
136 0 : fn finalize(&mut self) {
137 0 : let end_time = chrono::Utc::now();
138 0 : if let Some(start_time) = self.start_time {
139 0 : if end_time > start_time {
140 0 : let delta = end_time - start_time;
141 0 : if let Ok(std_dur) = delta.to_std() {
142 0 : self.duration_secs = std_dur.as_secs_f64();
143 0 : }
144 0 : }
145 0 : }
146 0 : self.retention_ratio = self.after_compaction_layer_size as f64
147 0 : / (self.before_compaction_layer_size as f64 + 1.0);
148 0 : self.end_time = Some(end_time);
149 0 : }
150 : }
151 :
152 : impl GcCompactionQueueItem {
153 0 : pub fn into_compact_info_resp(
154 0 : self,
155 0 : id: GcCompactionJobId,
156 0 : running: bool,
157 0 : ) -> Option<CompactInfoResponse> {
158 0 : match self {
159 0 : GcCompactionQueueItem::MetaJob { options, .. } => Some(CompactInfoResponse {
160 0 : compact_key_range: options.compact_key_range,
161 0 : compact_lsn_range: options.compact_lsn_range,
162 0 : sub_compaction: options.sub_compaction,
163 0 : running,
164 0 : job_id: id.0,
165 0 : }),
166 0 : GcCompactionQueueItem::SubCompactionJob(options) => Some(CompactInfoResponse {
167 0 : compact_key_range: options.compact_key_range,
168 0 : compact_lsn_range: options.compact_lsn_range,
169 0 : sub_compaction: options.sub_compaction,
170 0 : running,
171 0 : job_id: id.0,
172 0 : }),
173 0 : GcCompactionQueueItem::Notify(_, _) => None,
174 : }
175 0 : }
176 : }
177 :
178 : #[derive(Default)]
179 : struct GcCompactionGuardItems {
180 : notify: Option<tokio::sync::oneshot::Sender<()>>,
181 : permit: Option<OwnedSemaphorePermit>,
182 : }
183 :
184 : struct GcCompactionQueueInner {
185 : running: Option<(GcCompactionJobId, GcCompactionQueueItem)>,
186 : queued: VecDeque<(GcCompactionJobId, GcCompactionQueueItem)>,
187 : guards: HashMap<GcCompactionJobId, GcCompactionGuardItems>,
188 : last_id: GcCompactionJobId,
189 : meta_statistics: Option<GcCompactionMetaStatistics>,
190 : }
191 :
192 : impl GcCompactionQueueInner {
193 0 : fn next_id(&mut self) -> GcCompactionJobId {
194 0 : let id = self.last_id;
195 0 : self.last_id = GcCompactionJobId(id.0 + 1);
196 0 : id
197 0 : }
198 : }
199 :
200 : /// A structure to store gc_compaction jobs.
201 : pub struct GcCompactionQueue {
202 : /// All items in the queue, and the currently-running job.
203 : inner: std::sync::Mutex<GcCompactionQueueInner>,
204 : /// Ensure only one thread is consuming the queue.
205 : consumer_lock: tokio::sync::Mutex<()>,
206 : }
207 :
208 0 : static CONCURRENT_GC_COMPACTION_TASKS: Lazy<Arc<Semaphore>> = Lazy::new(|| {
209 0 : // Only allow two timelines on one pageserver to run gc compaction at a time.
210 0 : Arc::new(Semaphore::new(2))
211 0 : });
212 :
213 : impl GcCompactionQueue {
214 0 : pub fn new() -> Self {
215 0 : GcCompactionQueue {
216 0 : inner: std::sync::Mutex::new(GcCompactionQueueInner {
217 0 : running: None,
218 0 : queued: VecDeque::new(),
219 0 : guards: HashMap::new(),
220 0 : last_id: GcCompactionJobId(0),
221 0 : meta_statistics: None,
222 0 : }),
223 0 : consumer_lock: tokio::sync::Mutex::new(()),
224 0 : }
225 0 : }
226 :
227 0 : pub fn cancel_scheduled(&self) {
228 0 : let mut guard = self.inner.lock().unwrap();
229 0 : guard.queued.clear();
230 0 : // TODO: if there is a running job, we should keep the gc guard. However, currently, the cancel
231 0 : // API is only used for testing purposes, so we can drop everything here.
232 0 : guard.guards.clear();
233 0 : }
234 :
235 : /// Schedule a manual compaction job.
236 0 : pub fn schedule_manual_compaction(
237 0 : &self,
238 0 : options: CompactOptions,
239 0 : notify: Option<tokio::sync::oneshot::Sender<()>>,
240 0 : ) -> GcCompactionJobId {
241 0 : let mut guard = self.inner.lock().unwrap();
242 0 : let id = guard.next_id();
243 0 : guard.queued.push_back((
244 0 : id,
245 0 : GcCompactionQueueItem::MetaJob {
246 0 : options,
247 0 : auto: false,
248 0 : },
249 0 : ));
250 0 : guard.guards.entry(id).or_default().notify = notify;
251 0 : info!("scheduled compaction job id={}", id);
252 0 : id
253 0 : }
254 :
255 : /// Schedule an auto compaction job.
256 0 : fn schedule_auto_compaction(
257 0 : &self,
258 0 : options: CompactOptions,
259 0 : permit: OwnedSemaphorePermit,
260 0 : ) -> GcCompactionJobId {
261 0 : let mut guard = self.inner.lock().unwrap();
262 0 : let id = guard.next_id();
263 0 : guard.queued.push_back((
264 0 : id,
265 0 : GcCompactionQueueItem::MetaJob {
266 0 : options,
267 0 : auto: true,
268 0 : },
269 0 : ));
270 0 : guard.guards.entry(id).or_default().permit = Some(permit);
271 0 : id
272 0 : }
273 :
274 : /// Trigger an auto compaction.
275 0 : pub async fn trigger_auto_compaction(
276 0 : &self,
277 0 : timeline: &Arc<Timeline>,
278 0 : ) -> Result<(), CompactionError> {
279 0 : let GcCompactionCombinedSettings {
280 0 : gc_compaction_enabled,
281 0 : gc_compaction_initial_threshold_kb,
282 0 : gc_compaction_ratio_percent,
283 0 : ..
284 0 : } = timeline.get_gc_compaction_settings();
285 0 : if !gc_compaction_enabled {
286 0 : return Ok(());
287 0 : }
288 0 : if self.remaining_jobs_num() > 0 {
289 : // Only schedule auto compaction when the queue is empty
290 0 : return Ok(());
291 0 : }
292 0 : if timeline.ancestor_timeline().is_some() {
293 : // Do not trigger auto compaction for child timelines. We haven't tested
294 : // it enough in staging yet.
295 0 : return Ok(());
296 0 : }
297 0 : if timeline.get_gc_compaction_watermark() == Lsn::INVALID {
298 : // If the gc watermark is not set, we don't need to trigger auto compaction.
299 : // This check is the same as in `gc_compaction_split_jobs` but we don't log
300 : // here and we can also skip the computation of the trigger condition earlier.
301 0 : return Ok(());
302 0 : }
303 :
304 0 : let Ok(permit) = CONCURRENT_GC_COMPACTION_TASKS.clone().try_acquire_owned() else {
305 : // Only allow one compaction run at a time. TODO: As we do `try_acquire_owned`, we cannot ensure
306 : // the fairness of the lock across timelines. We should listen for both `acquire` and `l0_compaction_trigger`
307 : // to ensure the fairness while avoid starving other tasks.
308 0 : return Ok(());
309 : };
310 :
311 0 : let gc_compaction_state = timeline.get_gc_compaction_state();
312 0 : let l2_lsn = gc_compaction_state
313 0 : .map(|x| x.last_completed_lsn)
314 0 : .unwrap_or(Lsn::INVALID);
315 :
316 0 : let layers = {
317 0 : let guard = timeline.layers.read().await;
318 0 : let layer_map = guard.layer_map()?;
319 0 : layer_map.iter_historic_layers().collect_vec()
320 0 : };
321 0 : let mut l2_size: u64 = 0;
322 0 : let mut l1_size = 0;
323 0 : let gc_cutoff = *timeline.get_applied_gc_cutoff_lsn();
324 0 : for layer in layers {
325 0 : if layer.lsn_range.start <= l2_lsn {
326 0 : l2_size += layer.file_size();
327 0 : } else if layer.lsn_range.start <= gc_cutoff {
328 0 : l1_size += layer.file_size();
329 0 : }
330 : }
331 :
332 0 : fn trigger_compaction(
333 0 : l1_size: u64,
334 0 : l2_size: u64,
335 0 : gc_compaction_initial_threshold_kb: u64,
336 0 : gc_compaction_ratio_percent: u64,
337 0 : ) -> bool {
338 : const AUTO_TRIGGER_LIMIT: u64 = 150 * 1024 * 1024 * 1024; // 150GB
339 0 : if l1_size + l2_size >= AUTO_TRIGGER_LIMIT {
340 : // Do not auto-trigger when physical size >= 150GB
341 0 : return false;
342 0 : }
343 0 : // initial trigger
344 0 : if l2_size == 0 && l1_size >= gc_compaction_initial_threshold_kb * 1024 {
345 0 : info!(
346 0 : "trigger auto-compaction because l1_size={} >= gc_compaction_initial_threshold_kb={}",
347 : l1_size, gc_compaction_initial_threshold_kb
348 : );
349 0 : return true;
350 0 : }
351 0 : // size ratio trigger
352 0 : if l2_size == 0 {
353 0 : return false;
354 0 : }
355 0 : if l1_size as f64 / l2_size as f64 >= (gc_compaction_ratio_percent as f64 / 100.0) {
356 0 : info!(
357 0 : "trigger auto-compaction because l1_size={} / l2_size={} > gc_compaction_ratio_percent={}",
358 : l1_size, l2_size, gc_compaction_ratio_percent
359 : );
360 0 : return true;
361 0 : }
362 0 : false
363 0 : }
364 :
365 0 : if trigger_compaction(
366 0 : l1_size,
367 0 : l2_size,
368 0 : gc_compaction_initial_threshold_kb,
369 0 : gc_compaction_ratio_percent,
370 0 : ) {
371 0 : self.schedule_auto_compaction(
372 0 : CompactOptions {
373 0 : flags: {
374 0 : let mut flags = EnumSet::new();
375 0 : flags |= CompactFlags::EnhancedGcBottomMostCompaction;
376 0 : if timeline.get_compaction_l0_first() {
377 0 : flags |= CompactFlags::YieldForL0;
378 0 : }
379 0 : flags
380 0 : },
381 0 : sub_compaction: true,
382 0 : // Only auto-trigger gc-compaction over the data keyspace due to concerns in
383 0 : // https://github.com/neondatabase/neon/issues/11318.
384 0 : compact_key_range: Some(CompactKeyRange {
385 0 : start: Key::MIN,
386 0 : end: Key::metadata_key_range().start,
387 0 : }),
388 0 : compact_lsn_range: None,
389 0 : sub_compaction_max_job_size_mb: None,
390 0 : },
391 0 : permit,
392 0 : );
393 0 : info!(
394 0 : "scheduled auto gc-compaction: l1_size={}, l2_size={}, l2_lsn={}, gc_cutoff={}",
395 : l1_size, l2_size, l2_lsn, gc_cutoff
396 : );
397 : } else {
398 0 : debug!(
399 0 : "did not trigger auto gc-compaction: l1_size={}, l2_size={}, l2_lsn={}, gc_cutoff={}",
400 : l1_size, l2_size, l2_lsn, gc_cutoff
401 : );
402 : }
403 0 : Ok(())
404 0 : }
405 :
406 0 : async fn collect_layer_below_lsn(
407 0 : &self,
408 0 : timeline: &Arc<Timeline>,
409 0 : lsn: Lsn,
410 0 : ) -> Result<u64, CompactionError> {
411 0 : let guard = timeline.layers.read().await;
412 0 : let layer_map = guard.layer_map()?;
413 0 : let layers = layer_map.iter_historic_layers().collect_vec();
414 0 : let mut size = 0;
415 0 : for layer in layers {
416 0 : if layer.lsn_range.start <= lsn {
417 0 : size += layer.file_size();
418 0 : }
419 : }
420 0 : Ok(size)
421 0 : }
422 :
423 : /// Notify the caller the job has finished and unblock GC.
424 0 : fn notify_and_unblock(&self, id: GcCompactionJobId) {
425 0 : info!("compaction job id={} finished", id);
426 0 : let mut guard = self.inner.lock().unwrap();
427 0 : if let Some(items) = guard.guards.remove(&id) {
428 0 : if let Some(tx) = items.notify {
429 0 : let _ = tx.send(());
430 0 : }
431 0 : }
432 0 : if let Some(ref meta_statistics) = guard.meta_statistics {
433 0 : if meta_statistics.meta_job_id == id {
434 0 : if let Ok(stats) = serde_json::to_string(&meta_statistics) {
435 0 : info!(
436 0 : "gc-compaction meta statistics for job id = {}: {}",
437 : id, stats
438 : );
439 0 : }
440 0 : }
441 0 : }
442 0 : }
443 :
444 0 : fn clear_running_job(&self) {
445 0 : let mut guard = self.inner.lock().unwrap();
446 0 : guard.running = None;
447 0 : }
448 :
449 0 : async fn handle_sub_compaction(
450 0 : &self,
451 0 : id: GcCompactionJobId,
452 0 : options: CompactOptions,
453 0 : timeline: &Arc<Timeline>,
454 0 : auto: bool,
455 0 : ) -> Result<(), CompactionError> {
456 0 : info!(
457 0 : "running scheduled enhanced gc bottom-most compaction with sub-compaction, splitting compaction jobs"
458 : );
459 0 : let res = timeline
460 0 : .gc_compaction_split_jobs(
461 0 : GcCompactJob::from_compact_options(options.clone()),
462 0 : options.sub_compaction_max_job_size_mb,
463 0 : )
464 0 : .await;
465 0 : let jobs = match res {
466 0 : Ok(jobs) => jobs,
467 0 : Err(err) => {
468 0 : warn!("cannot split gc-compaction jobs: {}, unblocked gc", err);
469 0 : self.notify_and_unblock(id);
470 0 : return Err(err);
471 : }
472 : };
473 0 : if jobs.is_empty() {
474 0 : info!("no jobs to run, skipping scheduled compaction task");
475 0 : self.notify_and_unblock(id);
476 : } else {
477 0 : let jobs_len = jobs.len();
478 0 : let mut pending_tasks = Vec::new();
479 0 : // gc-compaction might pick more layers or fewer layers to compact. The L2 LSN does not need to be accurate.
480 0 : // And therefore, we simply assume the maximum LSN of all jobs is the expected L2 LSN.
481 0 : let expected_l2_lsn = jobs
482 0 : .iter()
483 0 : .map(|job| job.compact_lsn_range.end)
484 0 : .max()
485 0 : .unwrap();
486 0 : for job in jobs {
487 : // Unfortunately we need to convert the `GcCompactJob` back to `CompactionOptions`
488 : // until we do further refactors to allow directly call `compact_with_gc`.
489 0 : let mut flags: EnumSet<CompactFlags> = EnumSet::default();
490 0 : flags |= CompactFlags::EnhancedGcBottomMostCompaction;
491 0 : if job.dry_run {
492 0 : flags |= CompactFlags::DryRun;
493 0 : }
494 0 : if options.flags.contains(CompactFlags::YieldForL0) {
495 0 : flags |= CompactFlags::YieldForL0;
496 0 : }
497 0 : let options = CompactOptions {
498 0 : flags,
499 0 : sub_compaction: false,
500 0 : compact_key_range: Some(job.compact_key_range.into()),
501 0 : compact_lsn_range: Some(job.compact_lsn_range.into()),
502 0 : sub_compaction_max_job_size_mb: None,
503 0 : };
504 0 : pending_tasks.push(GcCompactionQueueItem::SubCompactionJob(options));
505 : }
506 :
507 0 : if !auto {
508 0 : pending_tasks.push(GcCompactionQueueItem::Notify(id, None));
509 0 : } else {
510 0 : pending_tasks.push(GcCompactionQueueItem::Notify(id, Some(expected_l2_lsn)));
511 0 : }
512 :
513 0 : let layer_size = self
514 0 : .collect_layer_below_lsn(timeline, expected_l2_lsn)
515 0 : .await?;
516 :
517 : {
518 0 : let mut guard = self.inner.lock().unwrap();
519 0 : let mut tasks = Vec::new();
520 0 : for task in pending_tasks {
521 0 : let id = guard.next_id();
522 0 : tasks.push((id, task));
523 0 : }
524 0 : tasks.reverse();
525 0 : for item in tasks {
526 0 : guard.queued.push_front(item);
527 0 : }
528 0 : guard.meta_statistics = Some(GcCompactionMetaStatistics {
529 0 : meta_job_id: id,
530 0 : start_time: Some(chrono::Utc::now()),
531 0 : before_compaction_layer_size: layer_size,
532 0 : below_lsn: expected_l2_lsn,
533 0 : total_sub_compaction_jobs: jobs_len,
534 0 : ..Default::default()
535 0 : });
536 0 : }
537 0 :
538 0 : info!(
539 0 : "scheduled enhanced gc bottom-most compaction with sub-compaction, split into {} jobs",
540 : jobs_len
541 : );
542 : }
543 0 : Ok(())
544 0 : }
545 :
546 : /// Take a job from the queue and process it. Returns if there are still pending tasks.
547 0 : pub async fn iteration(
548 0 : &self,
549 0 : cancel: &CancellationToken,
550 0 : ctx: &RequestContext,
551 0 : gc_block: &GcBlock,
552 0 : timeline: &Arc<Timeline>,
553 0 : ) -> Result<CompactionOutcome, CompactionError> {
554 0 : let res = self.iteration_inner(cancel, ctx, gc_block, timeline).await;
555 0 : if let Err(err) = &res {
556 0 : log_compaction_error(err, None, cancel.is_cancelled(), true);
557 0 : }
558 0 : match res {
559 0 : Ok(res) => Ok(res),
560 0 : Err(CompactionError::ShuttingDown) => Err(CompactionError::ShuttingDown),
561 : Err(_) => {
562 : // There are some cases where traditional gc might collect some layer
563 : // files causing gc-compaction cannot read the full history of the key.
564 : // This needs to be resolved in the long-term by improving the compaction
565 : // process. For now, let's simply avoid such errors triggering the
566 : // circuit breaker.
567 0 : Ok(CompactionOutcome::Skipped)
568 : }
569 : }
570 0 : }
571 :
572 0 : async fn iteration_inner(
573 0 : &self,
574 0 : cancel: &CancellationToken,
575 0 : ctx: &RequestContext,
576 0 : gc_block: &GcBlock,
577 0 : timeline: &Arc<Timeline>,
578 0 : ) -> Result<CompactionOutcome, CompactionError> {
579 0 : let Ok(_one_op_at_a_time_guard) = self.consumer_lock.try_lock() else {
580 0 : return Err(CompactionError::AlreadyRunning(
581 0 : "cannot run gc-compaction because another gc-compaction is running. This should not happen because we only call this function from the gc-compaction queue.",
582 0 : ));
583 : };
584 : let has_pending_tasks;
585 0 : let mut yield_for_l0 = false;
586 0 : let Some((id, item)) = ({
587 0 : let mut guard = self.inner.lock().unwrap();
588 0 : if let Some((id, item)) = guard.queued.pop_front() {
589 0 : guard.running = Some((id, item.clone()));
590 0 : has_pending_tasks = !guard.queued.is_empty();
591 0 : Some((id, item))
592 : } else {
593 0 : has_pending_tasks = false;
594 0 : None
595 : }
596 : }) else {
597 0 : self.trigger_auto_compaction(timeline).await?;
598 : // Always yield after triggering auto-compaction. Gc-compaction is a low-priority task and we
599 : // have not implemented preemption mechanism yet. We always want to yield it to more important
600 : // tasks if there is one.
601 0 : return Ok(CompactionOutcome::Done);
602 : };
603 0 : match item {
604 0 : GcCompactionQueueItem::MetaJob { options, auto } => {
605 0 : if !options
606 0 : .flags
607 0 : .contains(CompactFlags::EnhancedGcBottomMostCompaction)
608 : {
609 0 : warn!(
610 0 : "ignoring scheduled compaction task: scheduled task must be gc compaction: {:?}",
611 : options
612 : );
613 0 : } else if options.sub_compaction {
614 0 : info!(
615 0 : "running scheduled enhanced gc bottom-most compaction with sub-compaction, splitting compaction jobs"
616 : );
617 0 : self.handle_sub_compaction(id, options, timeline, auto)
618 0 : .await?;
619 : } else {
620 : // Auto compaction always enables sub-compaction so we don't need to handle update_l2_lsn
621 : // in this branch.
622 0 : let _gc_guard = match gc_block.start().await {
623 0 : Ok(guard) => guard,
624 0 : Err(e) => {
625 0 : self.notify_and_unblock(id);
626 0 : self.clear_running_job();
627 0 : return Err(CompactionError::Other(anyhow!(
628 0 : "cannot run gc-compaction because gc is blocked: {}",
629 0 : e
630 0 : )));
631 : }
632 : };
633 0 : let res = timeline.compact_with_options(cancel, options, ctx).await;
634 0 : let compaction_result = match res {
635 0 : Ok(res) => res,
636 0 : Err(err) => {
637 0 : warn!(%err, "failed to run gc-compaction");
638 0 : self.notify_and_unblock(id);
639 0 : self.clear_running_job();
640 0 : return Err(err);
641 : }
642 : };
643 0 : if compaction_result == CompactionOutcome::YieldForL0 {
644 0 : yield_for_l0 = true;
645 0 : }
646 : }
647 : }
648 0 : GcCompactionQueueItem::SubCompactionJob(options) => {
649 : // TODO: error handling, clear the queue if any task fails?
650 0 : let _gc_guard = match gc_block.start().await {
651 0 : Ok(guard) => guard,
652 0 : Err(e) => {
653 0 : self.clear_running_job();
654 0 : return Err(CompactionError::Other(anyhow!(
655 0 : "cannot run gc-compaction because gc is blocked: {}",
656 0 : e
657 0 : )));
658 : }
659 : };
660 0 : let res = timeline.compact_with_options(cancel, options, ctx).await;
661 0 : let compaction_result = match res {
662 0 : Ok(res) => res,
663 0 : Err(err) => {
664 0 : warn!(%err, "failed to run gc-compaction subcompaction job");
665 0 : self.clear_running_job();
666 0 : let mut guard = self.inner.lock().unwrap();
667 0 : if let Some(ref mut meta_statistics) = guard.meta_statistics {
668 0 : meta_statistics.failed_sub_compaction_jobs += 1;
669 0 : }
670 0 : return Err(err);
671 : }
672 : };
673 0 : if compaction_result == CompactionOutcome::YieldForL0 {
674 0 : // We will permenantly give up a task if we yield for L0 compaction: the preempted subcompaction job won't be running
675 0 : // again. This ensures that we don't keep doing duplicated work within gc-compaction. Not directly returning here because
676 0 : // we need to clean things up before returning from the function.
677 0 : yield_for_l0 = true;
678 0 : }
679 : {
680 0 : let mut guard = self.inner.lock().unwrap();
681 0 : if let Some(ref mut meta_statistics) = guard.meta_statistics {
682 0 : meta_statistics.succeeded_sub_compaction_jobs += 1;
683 0 : }
684 : }
685 : }
686 0 : GcCompactionQueueItem::Notify(id, l2_lsn) => {
687 0 : let below_lsn = {
688 0 : let mut guard = self.inner.lock().unwrap();
689 0 : if let Some(ref mut meta_statistics) = guard.meta_statistics {
690 0 : meta_statistics.below_lsn
691 : } else {
692 0 : Lsn::INVALID
693 : }
694 : };
695 0 : let layer_size = if below_lsn != Lsn::INVALID {
696 0 : self.collect_layer_below_lsn(timeline, below_lsn).await?
697 : } else {
698 0 : 0
699 : };
700 : {
701 0 : let mut guard = self.inner.lock().unwrap();
702 0 : if let Some(ref mut meta_statistics) = guard.meta_statistics {
703 0 : meta_statistics.after_compaction_layer_size = layer_size;
704 0 : meta_statistics.finalize();
705 0 : }
706 : }
707 0 : self.notify_and_unblock(id);
708 0 : if let Some(l2_lsn) = l2_lsn {
709 0 : let current_l2_lsn = timeline
710 0 : .get_gc_compaction_state()
711 0 : .map(|x| x.last_completed_lsn)
712 0 : .unwrap_or(Lsn::INVALID);
713 0 : if l2_lsn >= current_l2_lsn {
714 0 : info!("l2_lsn updated to {}", l2_lsn);
715 0 : timeline
716 0 : .update_gc_compaction_state(GcCompactionState {
717 0 : last_completed_lsn: l2_lsn,
718 0 : })
719 0 : .map_err(CompactionError::Other)?;
720 : } else {
721 0 : warn!(
722 0 : "l2_lsn updated to {} but it is less than the current l2_lsn {}",
723 : l2_lsn, current_l2_lsn
724 : );
725 : }
726 0 : }
727 : }
728 : }
729 0 : self.clear_running_job();
730 0 : Ok(if yield_for_l0 {
731 0 : tracing::info!("give up gc-compaction: yield for L0 compaction");
732 0 : CompactionOutcome::YieldForL0
733 0 : } else if has_pending_tasks {
734 0 : CompactionOutcome::Pending
735 : } else {
736 0 : CompactionOutcome::Done
737 : })
738 0 : }
739 :
740 : #[allow(clippy::type_complexity)]
741 0 : pub fn remaining_jobs(
742 0 : &self,
743 0 : ) -> (
744 0 : Option<(GcCompactionJobId, GcCompactionQueueItem)>,
745 0 : VecDeque<(GcCompactionJobId, GcCompactionQueueItem)>,
746 0 : ) {
747 0 : let guard = self.inner.lock().unwrap();
748 0 : (guard.running.clone(), guard.queued.clone())
749 0 : }
750 :
751 0 : pub fn remaining_jobs_num(&self) -> usize {
752 0 : let guard = self.inner.lock().unwrap();
753 0 : guard.queued.len() + if guard.running.is_some() { 1 } else { 0 }
754 0 : }
755 : }
756 :
757 : /// A job description for the gc-compaction job. This structure describes the rectangle range that the job will
758 : /// process. The exact layers that need to be compacted/rewritten will be generated when `compact_with_gc` gets
759 : /// called.
760 : #[derive(Debug, Clone)]
761 : pub(crate) struct GcCompactJob {
762 : pub dry_run: bool,
763 : /// The key range to be compacted. The compaction algorithm will only regenerate key-value pairs within this range
764 : /// [left inclusive, right exclusive), and other pairs will be rewritten into new files if necessary.
765 : pub compact_key_range: Range<Key>,
766 : /// The LSN range to be compacted. The compaction algorithm will use this range to determine the layers to be
767 : /// selected for the compaction, and it does not guarantee the generated layers will have exactly the same LSN range
768 : /// as specified here. The true range being compacted is `min_lsn/max_lsn` in [`GcCompactionJobDescription`].
769 : /// min_lsn will always <= the lower bound specified here, and max_lsn will always >= the upper bound specified here.
770 : pub compact_lsn_range: Range<Lsn>,
771 : }
772 :
773 : impl GcCompactJob {
774 336 : pub fn from_compact_options(options: CompactOptions) -> Self {
775 336 : GcCompactJob {
776 336 : dry_run: options.flags.contains(CompactFlags::DryRun),
777 336 : compact_key_range: options
778 336 : .compact_key_range
779 336 : .map(|x| x.into())
780 336 : .unwrap_or(Key::MIN..Key::MAX),
781 336 : compact_lsn_range: options
782 336 : .compact_lsn_range
783 336 : .map(|x| x.into())
784 336 : .unwrap_or(Lsn::INVALID..Lsn::MAX),
785 336 : }
786 336 : }
787 : }
788 :
789 : /// A job description for the gc-compaction job. This structure is generated when `compact_with_gc` is called
790 : /// and contains the exact layers we want to compact.
791 : pub struct GcCompactionJobDescription {
792 : /// All layers to read in the compaction job
793 : selected_layers: Vec<Layer>,
794 : /// GC cutoff of the job. This is the lowest LSN that will be accessed by the read/GC path and we need to
795 : /// keep all deltas <= this LSN or generate an image == this LSN.
796 : gc_cutoff: Lsn,
797 : /// LSNs to retain for the job. Read path will use this LSN so we need to keep deltas <= this LSN or
798 : /// generate an image == this LSN.
799 : retain_lsns_below_horizon: Vec<Lsn>,
800 : /// Maximum layer LSN processed in this compaction, that is max(end_lsn of layers). Exclusive. All data
801 : /// \>= this LSN will be kept and will not be rewritten.
802 : max_layer_lsn: Lsn,
803 : /// Minimum layer LSN processed in this compaction, that is min(start_lsn of layers). Inclusive.
804 : /// All access below (strict lower than `<`) this LSN will be routed through the normal read path instead of
805 : /// k-merge within gc-compaction.
806 : min_layer_lsn: Lsn,
807 : /// Only compact layers overlapping with this range.
808 : compaction_key_range: Range<Key>,
809 : /// When partial compaction is enabled, these layers need to be rewritten to ensure no overlap.
810 : /// This field is here solely for debugging. The field will not be read once the compaction
811 : /// description is generated.
812 : rewrite_layers: Vec<Arc<PersistentLayerDesc>>,
813 : }
814 :
815 : /// The result of bottom-most compaction for a single key at each LSN.
816 : #[derive(Debug)]
817 : #[cfg_attr(test, derive(PartialEq))]
818 : pub struct KeyLogAtLsn(pub Vec<(Lsn, Value)>);
819 :
820 : /// The result of bottom-most compaction.
821 : #[derive(Debug)]
822 : #[cfg_attr(test, derive(PartialEq))]
823 : pub(crate) struct KeyHistoryRetention {
824 : /// Stores logs to reconstruct the value at the given LSN, that is to say, logs <= LSN or image == LSN.
825 : pub(crate) below_horizon: Vec<(Lsn, KeyLogAtLsn)>,
826 : /// Stores logs to reconstruct the value at any LSN above the horizon, that is to say, log > LSN.
827 : pub(crate) above_horizon: KeyLogAtLsn,
828 : }
829 :
830 : impl KeyHistoryRetention {
831 : /// Hack: skip delta layer if we need to produce a layer of a same key-lsn.
832 : ///
833 : /// This can happen if we have removed some deltas in "the middle" of some existing layer's key-lsn-range.
834 : /// For example, consider the case where a single delta with range [0x10,0x50) exists.
835 : /// And we have branches at LSN 0x10, 0x20, 0x30.
836 : /// Then we delete branch @ 0x20.
837 : /// Bottom-most compaction may now delete the delta [0x20,0x30).
838 : /// And that wouldnt' change the shape of the layer.
839 : ///
840 : /// Note that bottom-most-gc-compaction never _adds_ new data in that case, only removes.
841 : ///
842 : /// `discard_key` will only be called when the writer reaches its target (instead of for every key), so it's fine to grab a lock inside.
843 444 : async fn discard_key(key: &PersistentLayerKey, tline: &Arc<Timeline>, dry_run: bool) -> bool {
844 444 : if dry_run {
845 0 : return true;
846 444 : }
847 444 : if LayerMap::is_l0(&key.key_range, key.is_delta) {
848 : // gc-compaction should not produce L0 deltas, otherwise it will break the layer order.
849 : // We should ignore such layers.
850 0 : return true;
851 444 : }
852 : let layer_generation;
853 : {
854 444 : let guard = tline.layers.read().await;
855 444 : if !guard.contains_key(key) {
856 312 : return false;
857 132 : }
858 132 : layer_generation = guard.get_from_key(key).metadata().generation;
859 132 : }
860 132 : if layer_generation == tline.generation {
861 132 : info!(
862 : key=%key,
863 : ?layer_generation,
864 0 : "discard layer due to duplicated layer key in the same generation",
865 : );
866 132 : true
867 : } else {
868 0 : false
869 : }
870 444 : }
871 :
872 : /// Pipe a history of a single key to the writers.
873 : ///
874 : /// If `image_writer` is none, the images will be placed into the delta layers.
875 : /// The delta writer will contain all images and deltas (below and above the horizon) except the bottom-most images.
876 : #[allow(clippy::too_many_arguments)]
877 3828 : async fn pipe_to(
878 3828 : self,
879 3828 : key: Key,
880 3828 : delta_writer: &mut SplitDeltaLayerWriter<'_>,
881 3828 : mut image_writer: Option<&mut SplitImageLayerWriter<'_>>,
882 3828 : stat: &mut CompactionStatistics,
883 3828 : ctx: &RequestContext,
884 3828 : ) -> anyhow::Result<()> {
885 3828 : let mut first_batch = true;
886 12264 : for (cutoff_lsn, KeyLogAtLsn(logs)) in self.below_horizon {
887 8436 : if first_batch {
888 3828 : if logs.len() == 1 && logs[0].1.is_image() {
889 3600 : let Value::Image(img) = &logs[0].1 else {
890 0 : unreachable!()
891 : };
892 3600 : stat.produce_image_key(img);
893 3600 : if let Some(image_writer) = image_writer.as_mut() {
894 3600 : image_writer.put_image(key, img.clone(), ctx).await?;
895 : } else {
896 0 : delta_writer
897 0 : .put_value(key, cutoff_lsn, Value::Image(img.clone()), ctx)
898 0 : .await?;
899 : }
900 : } else {
901 396 : for (lsn, val) in logs {
902 168 : stat.produce_key(&val);
903 168 : delta_writer.put_value(key, lsn, val, ctx).await?;
904 : }
905 : }
906 3828 : first_batch = false;
907 : } else {
908 5304 : for (lsn, val) in logs {
909 696 : stat.produce_key(&val);
910 696 : delta_writer.put_value(key, lsn, val, ctx).await?;
911 : }
912 : }
913 : }
914 3828 : let KeyLogAtLsn(above_horizon_logs) = self.above_horizon;
915 4176 : for (lsn, val) in above_horizon_logs {
916 348 : stat.produce_key(&val);
917 348 : delta_writer.put_value(key, lsn, val, ctx).await?;
918 : }
919 3828 : Ok(())
920 3828 : }
921 :
922 : /// Verify if every key in the retention is readable by replaying the logs.
923 3876 : async fn verify(
924 3876 : &self,
925 3876 : key: Key,
926 3876 : base_img_from_ancestor: &Option<(Key, Lsn, Bytes)>,
927 3876 : full_history: &[(Key, Lsn, Value)],
928 3876 : tline: &Arc<Timeline>,
929 3876 : ) -> anyhow::Result<()> {
930 : // Usually the min_lsn should be the first record but we do a full iteration to be safe.
931 5496 : let Some(min_lsn) = full_history.iter().map(|(_, lsn, _)| *lsn).min() else {
932 : // This should never happen b/c if we don't have any history of a key, we won't even do `generate_key_retention`.
933 0 : return Ok(());
934 : };
935 5496 : let Some(max_lsn) = full_history.iter().map(|(_, lsn, _)| *lsn).max() else {
936 : // This should never happen b/c if we don't have any history of a key, we won't even do `generate_key_retention`.
937 0 : return Ok(());
938 : };
939 3876 : let mut base_img = base_img_from_ancestor
940 3876 : .as_ref()
941 3876 : .map(|(_, lsn, img)| (*lsn, img));
942 3876 : let mut history = Vec::new();
943 :
944 12324 : async fn collect_and_verify(
945 12324 : key: Key,
946 12324 : lsn: Lsn,
947 12324 : base_img: &Option<(Lsn, &Bytes)>,
948 12324 : history: &[(Lsn, &NeonWalRecord)],
949 12324 : tline: &Arc<Timeline>,
950 12324 : skip_empty: bool,
951 12324 : ) -> anyhow::Result<()> {
952 12324 : if base_img.is_none() && history.is_empty() {
953 0 : if skip_empty {
954 0 : return Ok(());
955 0 : }
956 0 : anyhow::bail!("verification failed: key {} has no history at {}", key, lsn);
957 12324 : };
958 12324 :
959 12324 : let mut records = history
960 12324 : .iter()
961 12324 : .map(|(lsn, val)| (*lsn, (*val).clone()))
962 12324 : .collect::<Vec<_>>();
963 12324 :
964 12324 : // WAL redo requires records in the reverse LSN order
965 12324 : records.reverse();
966 12324 : let data = ValueReconstructState {
967 12324 : img: base_img.as_ref().map(|(lsn, img)| (*lsn, (*img).clone())),
968 12324 : records,
969 12324 : };
970 12324 :
971 12324 : tline
972 12324 : .reconstruct_value(key, lsn, data, RedoAttemptType::GcCompaction)
973 12324 : .await
974 12324 : .with_context(|| format!("verification failed for key {} at lsn {}", key, lsn))?;
975 :
976 12324 : Ok(())
977 12324 : }
978 :
979 12432 : for (retain_lsn, KeyLogAtLsn(logs)) in &self.below_horizon {
980 13152 : for (lsn, val) in logs {
981 912 : match val {
982 3684 : Value::Image(img) => {
983 3684 : base_img = Some((*lsn, img));
984 3684 : history.clear();
985 3684 : }
986 912 : Value::WalRecord(rec) if val.will_init() => {
987 0 : base_img = None;
988 0 : history.clear();
989 0 : history.push((*lsn, rec));
990 0 : }
991 912 : Value::WalRecord(rec) => {
992 912 : history.push((*lsn, rec));
993 912 : }
994 : }
995 : }
996 8556 : if *retain_lsn >= min_lsn {
997 : // Only verify after the key appears in the full history for the first time.
998 :
999 : // We don't modify history: in theory, we could replace the history with a single
1000 : // image as in `generate_key_retention` to make redos at later LSNs faster. But we
1001 : // want to verify everything as if they are read from the real layer map.
1002 8388 : collect_and_verify(key, *retain_lsn, &base_img, &history, tline, false)
1003 8388 : .await
1004 8388 : .context("below horizon retain_lsn")?;
1005 168 : }
1006 : }
1007 :
1008 4320 : for (lsn, val) in &self.above_horizon.0 {
1009 384 : match val {
1010 60 : Value::Image(img) => {
1011 60 : // Above the GC horizon, we verify every time we see an image.
1012 60 : collect_and_verify(key, *lsn, &base_img, &history, tline, true)
1013 60 : .await
1014 60 : .context("above horizon full image")?;
1015 60 : base_img = Some((*lsn, img));
1016 60 : history.clear();
1017 : }
1018 384 : Value::WalRecord(rec) if val.will_init() => {
1019 0 : // Above the GC horizon, we verify every time we see an init record.
1020 0 : collect_and_verify(key, *lsn, &base_img, &history, tline, true)
1021 0 : .await
1022 0 : .context("above horizon init record")?;
1023 0 : base_img = None;
1024 0 : history.clear();
1025 0 : history.push((*lsn, rec));
1026 : }
1027 384 : Value::WalRecord(rec) => {
1028 384 : history.push((*lsn, rec));
1029 384 : }
1030 : }
1031 : }
1032 : // Ensure the latest record is readable.
1033 3876 : collect_and_verify(key, max_lsn, &base_img, &history, tline, false)
1034 3876 : .await
1035 3876 : .context("latest record")?;
1036 3876 : Ok(())
1037 3876 : }
1038 : }
1039 :
1040 : #[derive(Debug, Serialize, Default)]
1041 : struct CompactionStatisticsNumSize {
1042 : num: u64,
1043 : size: u64,
1044 : }
1045 :
1046 : #[derive(Debug, Serialize, Default)]
1047 : pub struct CompactionStatistics {
1048 : /// Delta layer visited (maybe compressed, physical size)
1049 : delta_layer_visited: CompactionStatisticsNumSize,
1050 : /// Image layer visited (maybe compressed, physical size)
1051 : image_layer_visited: CompactionStatisticsNumSize,
1052 : /// Delta layer produced (maybe compressed, physical size)
1053 : delta_layer_produced: CompactionStatisticsNumSize,
1054 : /// Image layer produced (maybe compressed, physical size)
1055 : image_layer_produced: CompactionStatisticsNumSize,
1056 : /// Delta layer discarded (maybe compressed, physical size of the layer being discarded instead of the original layer)
1057 : delta_layer_discarded: CompactionStatisticsNumSize,
1058 : /// Image layer discarded (maybe compressed, physical size of the layer being discarded instead of the original layer)
1059 : image_layer_discarded: CompactionStatisticsNumSize,
1060 : num_unique_keys_visited: usize,
1061 : /// Delta visited (uncompressed, original size)
1062 : wal_keys_visited: CompactionStatisticsNumSize,
1063 : /// Image visited (uncompressed, original size)
1064 : image_keys_visited: CompactionStatisticsNumSize,
1065 : /// Delta produced (uncompressed, original size)
1066 : wal_produced: CompactionStatisticsNumSize,
1067 : /// Image produced (uncompressed, original size)
1068 : image_produced: CompactionStatisticsNumSize,
1069 :
1070 : // Time spent in each phase
1071 : time_acquire_lock_secs: f64,
1072 : time_analyze_secs: f64,
1073 : time_download_layer_secs: f64,
1074 : time_to_first_kv_pair_secs: f64,
1075 : time_main_loop_secs: f64,
1076 : time_final_phase_secs: f64,
1077 : time_total_secs: f64,
1078 :
1079 : // Summary
1080 : /// Ratio of the key-value size after/before gc-compaction.
1081 : uncompressed_retention_ratio: f64,
1082 : /// Ratio of the physical size after/before gc-compaction.
1083 : compressed_retention_ratio: f64,
1084 : }
1085 :
1086 : impl CompactionStatistics {
1087 6408 : fn estimated_size_of_value(val: &Value) -> usize {
1088 2628 : match val {
1089 3780 : Value::Image(img) => img.len(),
1090 0 : Value::WalRecord(NeonWalRecord::Postgres { rec, .. }) => rec.len(),
1091 2628 : _ => std::mem::size_of::<NeonWalRecord>(),
1092 : }
1093 6408 : }
1094 10068 : fn estimated_size_of_key() -> usize {
1095 10068 : KEY_SIZE // TODO: distinguish image layer and delta layer (count LSN in delta layer)
1096 10068 : }
1097 528 : fn visit_delta_layer(&mut self, size: u64) {
1098 528 : self.delta_layer_visited.num += 1;
1099 528 : self.delta_layer_visited.size += size;
1100 528 : }
1101 420 : fn visit_image_layer(&mut self, size: u64) {
1102 420 : self.image_layer_visited.num += 1;
1103 420 : self.image_layer_visited.size += size;
1104 420 : }
1105 3840 : fn on_unique_key_visited(&mut self) {
1106 3840 : self.num_unique_keys_visited += 1;
1107 3840 : }
1108 1476 : fn visit_wal_key(&mut self, val: &Value) {
1109 1476 : self.wal_keys_visited.num += 1;
1110 1476 : self.wal_keys_visited.size +=
1111 1476 : Self::estimated_size_of_value(val) as u64 + Self::estimated_size_of_key() as u64;
1112 1476 : }
1113 3780 : fn visit_image_key(&mut self, val: &Value) {
1114 3780 : self.image_keys_visited.num += 1;
1115 3780 : self.image_keys_visited.size +=
1116 3780 : Self::estimated_size_of_value(val) as u64 + Self::estimated_size_of_key() as u64;
1117 3780 : }
1118 1212 : fn produce_key(&mut self, val: &Value) {
1119 1212 : match val {
1120 60 : Value::Image(img) => self.produce_image_key(img),
1121 1152 : Value::WalRecord(_) => self.produce_wal_key(val),
1122 : }
1123 1212 : }
1124 1152 : fn produce_wal_key(&mut self, val: &Value) {
1125 1152 : self.wal_produced.num += 1;
1126 1152 : self.wal_produced.size +=
1127 1152 : Self::estimated_size_of_value(val) as u64 + Self::estimated_size_of_key() as u64;
1128 1152 : }
1129 3660 : fn produce_image_key(&mut self, val: &Bytes) {
1130 3660 : self.image_produced.num += 1;
1131 3660 : self.image_produced.size += val.len() as u64 + Self::estimated_size_of_key() as u64;
1132 3660 : }
1133 84 : fn discard_delta_layer(&mut self, original_size: u64) {
1134 84 : self.delta_layer_discarded.num += 1;
1135 84 : self.delta_layer_discarded.size += original_size;
1136 84 : }
1137 48 : fn discard_image_layer(&mut self, original_size: u64) {
1138 48 : self.image_layer_discarded.num += 1;
1139 48 : self.image_layer_discarded.size += original_size;
1140 48 : }
1141 144 : fn produce_delta_layer(&mut self, size: u64) {
1142 144 : self.delta_layer_produced.num += 1;
1143 144 : self.delta_layer_produced.size += size;
1144 144 : }
1145 180 : fn produce_image_layer(&mut self, size: u64) {
1146 180 : self.image_layer_produced.num += 1;
1147 180 : self.image_layer_produced.size += size;
1148 180 : }
1149 312 : fn finalize(&mut self) {
1150 312 : let original_key_value_size = self.image_keys_visited.size + self.wal_keys_visited.size;
1151 312 : let produced_key_value_size = self.image_produced.size + self.wal_produced.size;
1152 312 : self.uncompressed_retention_ratio =
1153 312 : produced_key_value_size as f64 / (original_key_value_size as f64 + 1.0); // avoid div by 0
1154 312 : let original_physical_size = self.image_layer_visited.size + self.delta_layer_visited.size;
1155 312 : let produced_physical_size = self.image_layer_produced.size
1156 312 : + self.delta_layer_produced.size
1157 312 : + self.image_layer_discarded.size
1158 312 : + self.delta_layer_discarded.size; // Also include the discarded layers to make the ratio accurate
1159 312 : self.compressed_retention_ratio =
1160 312 : produced_physical_size as f64 / (original_physical_size as f64 + 1.0); // avoid div by 0
1161 312 : }
1162 : }
1163 :
1164 : #[derive(Default, Debug, Clone, Copy, PartialEq, Eq)]
1165 : pub enum CompactionOutcome {
1166 : #[default]
1167 : /// No layers need to be compacted after this round. Compaction doesn't need
1168 : /// to be immediately scheduled.
1169 : Done,
1170 : /// Still has pending layers to be compacted after this round. Ideally, the scheduler
1171 : /// should immediately schedule another compaction.
1172 : Pending,
1173 : /// A timeline needs L0 compaction. Yield and schedule an immediate L0 compaction pass (only
1174 : /// guaranteed when `compaction_l0_first` is enabled).
1175 : YieldForL0,
1176 : /// Compaction was skipped, because the timeline is ineligible for compaction.
1177 : Skipped,
1178 : }
1179 :
1180 : impl Timeline {
1181 : /// TODO: cancellation
1182 : ///
1183 : /// Returns whether the compaction has pending tasks.
1184 2184 : pub(crate) async fn compact_legacy(
1185 2184 : self: &Arc<Self>,
1186 2184 : cancel: &CancellationToken,
1187 2184 : options: CompactOptions,
1188 2184 : ctx: &RequestContext,
1189 2184 : ) -> Result<CompactionOutcome, CompactionError> {
1190 2184 : if options
1191 2184 : .flags
1192 2184 : .contains(CompactFlags::EnhancedGcBottomMostCompaction)
1193 : {
1194 0 : self.compact_with_gc(cancel, options, ctx).await?;
1195 0 : return Ok(CompactionOutcome::Done);
1196 2184 : }
1197 2184 :
1198 2184 : if options.flags.contains(CompactFlags::DryRun) {
1199 0 : return Err(CompactionError::Other(anyhow!(
1200 0 : "dry-run mode is not supported for legacy compaction for now"
1201 0 : )));
1202 2184 : }
1203 2184 :
1204 2184 : if options.compact_key_range.is_some() || options.compact_lsn_range.is_some() {
1205 : // maybe useful in the future? could implement this at some point
1206 0 : return Err(CompactionError::Other(anyhow!(
1207 0 : "compaction range is not supported for legacy compaction for now"
1208 0 : )));
1209 2184 : }
1210 2184 :
1211 2184 : // High level strategy for compaction / image creation:
1212 2184 : //
1213 2184 : // 1. First, do a L0 compaction to ensure we move the L0
1214 2184 : // layers into the historic layer map get flat levels of
1215 2184 : // layers. If we did not compact all L0 layers, we will
1216 2184 : // prioritize compacting the timeline again and not do
1217 2184 : // any of the compactions below.
1218 2184 : //
1219 2184 : // 2. Then, calculate the desired "partitioning" of the
1220 2184 : // currently in-use key space. The goal is to partition the
1221 2184 : // key space into roughly fixed-size chunks, but also take into
1222 2184 : // account any existing image layers, and try to align the
1223 2184 : // chunk boundaries with the existing image layers to avoid
1224 2184 : // too much churn. Also try to align chunk boundaries with
1225 2184 : // relation boundaries. In principle, we don't know about
1226 2184 : // relation boundaries here, we just deal with key-value
1227 2184 : // pairs, and the code in pgdatadir_mapping.rs knows how to
1228 2184 : // map relations into key-value pairs. But in practice we know
1229 2184 : // that 'field6' is the block number, and the fields 1-5
1230 2184 : // identify a relation. This is just an optimization,
1231 2184 : // though.
1232 2184 : //
1233 2184 : // 3. Once we know the partitioning, for each partition,
1234 2184 : // decide if it's time to create a new image layer. The
1235 2184 : // criteria is: there has been too much "churn" since the last
1236 2184 : // image layer? The "churn" is fuzzy concept, it's a
1237 2184 : // combination of too many delta files, or too much WAL in
1238 2184 : // total in the delta file. Or perhaps: if creating an image
1239 2184 : // file would allow to delete some older files.
1240 2184 : //
1241 2184 : // 4. In the end, if the tenant gets auto-sharded, we will run
1242 2184 : // a shard-ancestor compaction.
1243 2184 :
1244 2184 : // Is the timeline being deleted?
1245 2184 : if self.is_stopping() {
1246 0 : trace!("Dropping out of compaction on timeline shutdown");
1247 0 : return Err(CompactionError::ShuttingDown);
1248 2184 : }
1249 2184 :
1250 2184 : let target_file_size = self.get_checkpoint_distance();
1251 :
1252 : // Define partitioning schema if needed
1253 :
1254 : // 1. L0 Compact
1255 2184 : let l0_outcome = {
1256 2184 : let timer = self.metrics.compact_time_histo.start_timer();
1257 2184 : let l0_outcome = self
1258 2184 : .compact_level0(
1259 2184 : target_file_size,
1260 2184 : options.flags.contains(CompactFlags::ForceL0Compaction),
1261 2184 : ctx,
1262 2184 : )
1263 2184 : .await?;
1264 2184 : timer.stop_and_record();
1265 2184 : l0_outcome
1266 2184 : };
1267 2184 :
1268 2184 : if options.flags.contains(CompactFlags::OnlyL0Compaction) {
1269 0 : return Ok(l0_outcome);
1270 2184 : }
1271 2184 :
1272 2184 : // Yield if we have pending L0 compaction. The scheduler will do another pass.
1273 2184 : if (l0_outcome == CompactionOutcome::Pending || l0_outcome == CompactionOutcome::YieldForL0)
1274 0 : && options.flags.contains(CompactFlags::YieldForL0)
1275 : {
1276 0 : info!("image/ancestor compaction yielding for L0 compaction");
1277 0 : return Ok(CompactionOutcome::YieldForL0);
1278 2184 : }
1279 2184 :
1280 2184 : // 2. Repartition and create image layers if necessary
1281 2184 : match self
1282 2184 : .repartition(
1283 2184 : self.get_last_record_lsn(),
1284 2184 : self.get_compaction_target_size(),
1285 2184 : options.flags,
1286 2184 : ctx,
1287 2184 : )
1288 2184 : .await
1289 : {
1290 2184 : Ok(((dense_partitioning, sparse_partitioning), lsn)) => {
1291 2184 : // Disables access_stats updates, so that the files we read remain candidates for eviction after we're done with them
1292 2184 : let image_ctx = RequestContextBuilder::from(ctx)
1293 2184 : .access_stats_behavior(AccessStatsBehavior::Skip)
1294 2184 : .attached_child();
1295 2184 :
1296 2184 : let mut partitioning = dense_partitioning;
1297 2184 : partitioning
1298 2184 : .parts
1299 2184 : .extend(sparse_partitioning.into_dense().parts);
1300 :
1301 : // 3. Create new image layers for partitions that have been modified "enough".
1302 2184 : let (image_layers, outcome) = self
1303 2184 : .create_image_layers(
1304 2184 : &partitioning,
1305 2184 : lsn,
1306 2184 : if options
1307 2184 : .flags
1308 2184 : .contains(CompactFlags::ForceImageLayerCreation)
1309 : {
1310 84 : ImageLayerCreationMode::Force
1311 : } else {
1312 2100 : ImageLayerCreationMode::Try
1313 : },
1314 2184 : &image_ctx,
1315 2184 : self.last_image_layer_creation_status
1316 2184 : .load()
1317 2184 : .as_ref()
1318 2184 : .clone(),
1319 2184 : options.flags.contains(CompactFlags::YieldForL0),
1320 2184 : )
1321 2184 : .await
1322 2184 : .inspect_err(|err| {
1323 : if let CreateImageLayersError::GetVectoredError(
1324 : GetVectoredError::MissingKey(_),
1325 0 : ) = err
1326 : {
1327 0 : critical!("missing key during compaction: {err:?}");
1328 0 : }
1329 2184 : })?;
1330 :
1331 2184 : self.last_image_layer_creation_status
1332 2184 : .store(Arc::new(outcome.clone()));
1333 2184 :
1334 2184 : self.upload_new_image_layers(image_layers)?;
1335 2184 : if let LastImageLayerCreationStatus::Incomplete { .. } = outcome {
1336 : // Yield and do not do any other kind of compaction.
1337 0 : info!(
1338 0 : "skipping shard ancestor compaction due to pending image layer generation tasks (preempted by L0 compaction)."
1339 : );
1340 0 : return Ok(CompactionOutcome::YieldForL0);
1341 2184 : }
1342 : }
1343 :
1344 : // Suppress errors when cancelled.
1345 0 : Err(_) if self.cancel.is_cancelled() => {}
1346 0 : Err(err) if err.is_cancel() => {}
1347 :
1348 : // Alert on critical errors that indicate data corruption.
1349 0 : Err(err) if err.is_critical() => {
1350 0 : critical!("could not compact, repartitioning keyspace failed: {err:?}");
1351 : }
1352 :
1353 : // Log other errors. No partitioning? This is normal, if the timeline was just created
1354 : // as an empty timeline. Also in unit tests, when we use the timeline as a simple
1355 : // key-value store, ignoring the datadir layout. Log the error but continue.
1356 0 : Err(err) => error!("could not compact, repartitioning keyspace failed: {err:?}"),
1357 : };
1358 :
1359 2184 : let partition_count = self.partitioning.read().0.0.parts.len();
1360 2184 :
1361 2184 : // 4. Shard ancestor compaction
1362 2184 : if self.get_compaction_shard_ancestor() && self.shard_identity.count >= ShardCount::new(2) {
1363 : // Limit the number of layer rewrites to the number of partitions: this means its
1364 : // runtime should be comparable to a full round of image layer creations, rather than
1365 : // being potentially much longer.
1366 0 : let rewrite_max = partition_count;
1367 :
1368 0 : let outcome = self
1369 0 : .compact_shard_ancestors(
1370 0 : rewrite_max,
1371 0 : options.flags.contains(CompactFlags::YieldForL0),
1372 0 : ctx,
1373 0 : )
1374 0 : .await?;
1375 0 : match outcome {
1376 0 : CompactionOutcome::Pending | CompactionOutcome::YieldForL0 => return Ok(outcome),
1377 0 : CompactionOutcome::Done | CompactionOutcome::Skipped => {}
1378 : }
1379 2184 : }
1380 :
1381 2184 : Ok(CompactionOutcome::Done)
1382 2184 : }
1383 :
1384 : /// Check for layers that are elegible to be rewritten:
1385 : /// - Shard splitting: After a shard split, ancestor layers beyond pitr_interval, so that
1386 : /// we don't indefinitely retain keys in this shard that aren't needed.
1387 : /// - For future use: layers beyond pitr_interval that are in formats we would
1388 : /// rather not maintain compatibility with indefinitely.
1389 : ///
1390 : /// Note: this phase may read and write many gigabytes of data: use rewrite_max to bound
1391 : /// how much work it will try to do in each compaction pass.
1392 0 : async fn compact_shard_ancestors(
1393 0 : self: &Arc<Self>,
1394 0 : rewrite_max: usize,
1395 0 : yield_for_l0: bool,
1396 0 : ctx: &RequestContext,
1397 0 : ) -> Result<CompactionOutcome, CompactionError> {
1398 0 : let mut outcome = CompactionOutcome::Done;
1399 0 : let mut drop_layers = Vec::new();
1400 0 : let mut layers_to_rewrite: Vec<Layer> = Vec::new();
1401 0 :
1402 0 : // We will use the Lsn cutoff of the last GC as a threshold for rewriting layers: if a
1403 0 : // layer is behind this Lsn, it indicates that the layer is being retained beyond the
1404 0 : // pitr_interval, for example because a branchpoint references it.
1405 0 : //
1406 0 : // Holding this read guard also blocks [`Self::gc_timeline`] from entering while we
1407 0 : // are rewriting layers.
1408 0 : let latest_gc_cutoff = self.get_applied_gc_cutoff_lsn();
1409 0 : let pitr_cutoff = self.gc_info.read().unwrap().cutoffs.time;
1410 :
1411 0 : let layers = self.layers.read().await;
1412 0 : let layers_iter = layers.layer_map()?.iter_historic_layers();
1413 0 : let (layers_total, mut layers_checked) = (layers_iter.len(), 0);
1414 0 : for layer_desc in layers_iter {
1415 0 : layers_checked += 1;
1416 0 : let layer = layers.get_from_desc(&layer_desc);
1417 0 : if layer.metadata().shard.shard_count == self.shard_identity.count {
1418 : // This layer does not belong to a historic ancestor, no need to re-image it.
1419 0 : continue;
1420 0 : }
1421 0 :
1422 0 : // This layer was created on an ancestor shard: check if it contains any data for this shard.
1423 0 : let sharded_range = ShardedRange::new(layer_desc.get_key_range(), &self.shard_identity);
1424 0 : let layer_local_page_count = sharded_range.page_count();
1425 0 : let layer_raw_page_count = ShardedRange::raw_size(&layer_desc.get_key_range());
1426 0 : if layer_local_page_count == 0 {
1427 : // This ancestral layer only covers keys that belong to other shards.
1428 : // We include the full metadata in the log: if we had some critical bug that caused
1429 : // us to incorrectly drop layers, this would simplify manually debugging + reinstating those layers.
1430 0 : debug!(%layer, old_metadata=?layer.metadata(),
1431 0 : "dropping layer after shard split, contains no keys for this shard",
1432 : );
1433 :
1434 0 : if cfg!(debug_assertions) {
1435 : // Expensive, exhaustive check of keys in this layer: this guards against ShardedRange's calculations being
1436 : // wrong. If ShardedRange claims the local page count is zero, then no keys in this layer
1437 : // should be !is_key_disposable()
1438 : // TODO: exclude sparse keyspace from this check, otherwise it will infinitely loop.
1439 0 : let range = layer_desc.get_key_range();
1440 0 : let mut key = range.start;
1441 0 : while key < range.end {
1442 0 : debug_assert!(self.shard_identity.is_key_disposable(&key));
1443 0 : key = key.next();
1444 : }
1445 0 : }
1446 :
1447 0 : drop_layers.push(layer);
1448 0 : continue;
1449 0 : } else if layer_local_page_count != u32::MAX
1450 0 : && layer_local_page_count == layer_raw_page_count
1451 : {
1452 0 : debug!(%layer,
1453 0 : "layer is entirely shard local ({} keys), no need to filter it",
1454 : layer_local_page_count
1455 : );
1456 0 : continue;
1457 0 : }
1458 0 :
1459 0 : // Only rewrite a layer if we can reclaim significant space.
1460 0 : if layer_local_page_count != u32::MAX
1461 0 : && layer_local_page_count as f64 / layer_raw_page_count as f64
1462 0 : <= ANCESTOR_COMPACTION_REWRITE_THRESHOLD
1463 : {
1464 0 : debug!(%layer,
1465 0 : "layer has a large share of local pages \
1466 0 : ({layer_local_page_count}/{layer_raw_page_count} > \
1467 0 : {ANCESTOR_COMPACTION_REWRITE_THRESHOLD}), not rewriting",
1468 : );
1469 0 : }
1470 :
1471 : // Don't bother re-writing a layer if it is within the PITR window: it will age-out eventually
1472 : // without incurring the I/O cost of a rewrite.
1473 0 : if layer_desc.get_lsn_range().end >= *latest_gc_cutoff {
1474 0 : debug!(%layer, "Skipping rewrite of layer still in GC window ({} >= {})",
1475 0 : layer_desc.get_lsn_range().end, *latest_gc_cutoff);
1476 0 : continue;
1477 0 : }
1478 0 :
1479 0 : // We do not yet implement rewrite of delta layers.
1480 0 : if layer_desc.is_delta() {
1481 0 : debug!(%layer, "Skipping rewrite of delta layer");
1482 0 : continue;
1483 0 : }
1484 0 :
1485 0 : // We don't bother rewriting layers that aren't visible, since these won't be needed by
1486 0 : // reads and will likely be garbage collected soon.
1487 0 : if layer.visibility() != LayerVisibilityHint::Visible {
1488 0 : debug!(%layer, "Skipping rewrite of invisible layer");
1489 0 : continue;
1490 0 : }
1491 0 :
1492 0 : // Only rewrite layers if their generations differ. This guarantees:
1493 0 : // - that local rewrite is safe, as local layer paths will differ between existing layer and rewritten one
1494 0 : // - that the layer is persistent in remote storage, as we only see old-generation'd layer via loading from remote storage
1495 0 : if layer.metadata().generation == self.generation {
1496 0 : debug!(%layer, "Skipping rewrite, is not from old generation");
1497 0 : continue;
1498 0 : }
1499 0 :
1500 0 : if layers_to_rewrite.len() >= rewrite_max {
1501 0 : debug!(%layer, "Will rewrite layer on a future compaction, already rewrote {}",
1502 0 : layers_to_rewrite.len()
1503 : );
1504 0 : outcome = CompactionOutcome::Pending;
1505 0 : break;
1506 0 : }
1507 0 :
1508 0 : // Fall through: all our conditions for doing a rewrite passed.
1509 0 : layers_to_rewrite.push(layer);
1510 : }
1511 :
1512 : // Drop read lock on layer map before we start doing time-consuming I/O.
1513 0 : drop(layers);
1514 0 :
1515 0 : // Drop out early if there's nothing to do.
1516 0 : if layers_to_rewrite.is_empty() && drop_layers.is_empty() {
1517 0 : return Ok(CompactionOutcome::Done);
1518 0 : }
1519 0 :
1520 0 : info!(
1521 0 : "starting shard ancestor compaction, rewriting {} layers and dropping {} layers, \
1522 0 : checked {layers_checked}/{layers_total} layers \
1523 0 : (latest_gc_cutoff={} pitr_cutoff={})",
1524 0 : layers_to_rewrite.len(),
1525 0 : drop_layers.len(),
1526 0 : *latest_gc_cutoff,
1527 : pitr_cutoff,
1528 : );
1529 0 : let started = Instant::now();
1530 0 :
1531 0 : let mut replace_image_layers = Vec::new();
1532 :
1533 0 : for layer in layers_to_rewrite {
1534 0 : if self.cancel.is_cancelled() {
1535 0 : return Err(CompactionError::ShuttingDown);
1536 0 : }
1537 0 :
1538 0 : info!(layer=%layer, "rewriting layer after shard split");
1539 0 : let mut image_layer_writer = ImageLayerWriter::new(
1540 0 : self.conf,
1541 0 : self.timeline_id,
1542 0 : self.tenant_shard_id,
1543 0 : &layer.layer_desc().key_range,
1544 0 : layer.layer_desc().image_layer_lsn(),
1545 0 : &self.gate,
1546 0 : self.cancel.clone(),
1547 0 : ctx,
1548 0 : )
1549 0 : .await
1550 0 : .map_err(CompactionError::Other)?;
1551 :
1552 : // Safety of layer rewrites:
1553 : // - We are writing to a different local file path than we are reading from, so the old Layer
1554 : // cannot interfere with the new one.
1555 : // - In the page cache, contents for a particular VirtualFile are stored with a file_id that
1556 : // is different for two layers with the same name (in `ImageLayerInner::new` we always
1557 : // acquire a fresh id from [`crate::page_cache::next_file_id`]. So readers do not risk
1558 : // reading the index from one layer file, and then data blocks from the rewritten layer file.
1559 : // - Any readers that have a reference to the old layer will keep it alive until they are done
1560 : // with it. If they are trying to promote from remote storage, that will fail, but this is the same
1561 : // as for compaction generally: compaction is allowed to delete layers that readers might be trying to use.
1562 : // - We do not run concurrently with other kinds of compaction, so the only layer map writes we race with are:
1563 : // - GC, which at worst witnesses us "undelete" a layer that they just deleted.
1564 : // - ingestion, which only inserts layers, therefore cannot collide with us.
1565 0 : let resident = layer.download_and_keep_resident(ctx).await?;
1566 :
1567 0 : let keys_written = resident
1568 0 : .filter(&self.shard_identity, &mut image_layer_writer, ctx)
1569 0 : .await?;
1570 :
1571 0 : if keys_written > 0 {
1572 0 : let (desc, path) = image_layer_writer
1573 0 : .finish(ctx)
1574 0 : .await
1575 0 : .map_err(CompactionError::Other)?;
1576 0 : let new_layer = Layer::finish_creating(self.conf, self, desc, &path)
1577 0 : .map_err(CompactionError::Other)?;
1578 0 : info!(layer=%new_layer, "rewrote layer, {} -> {} bytes",
1579 0 : layer.metadata().file_size,
1580 0 : new_layer.metadata().file_size);
1581 :
1582 0 : replace_image_layers.push((layer, new_layer));
1583 0 : } else {
1584 0 : // Drop the old layer. Usually for this case we would already have noticed that
1585 0 : // the layer has no data for us with the ShardedRange check above, but
1586 0 : drop_layers.push(layer);
1587 0 : }
1588 :
1589 : // Yield for L0 compaction if necessary, but make sure we update the layer map below
1590 : // with the work we've already done.
1591 0 : if yield_for_l0
1592 0 : && self
1593 0 : .l0_compaction_trigger
1594 0 : .notified()
1595 0 : .now_or_never()
1596 0 : .is_some()
1597 : {
1598 0 : info!("shard ancestor compaction yielding for L0 compaction");
1599 0 : outcome = CompactionOutcome::YieldForL0;
1600 0 : break;
1601 0 : }
1602 : }
1603 :
1604 0 : for layer in &drop_layers {
1605 0 : info!(%layer, old_metadata=?layer.metadata(),
1606 0 : "dropping layer after shard split (no keys for this shard)",
1607 : );
1608 : }
1609 :
1610 : // At this point, we have replaced local layer files with their rewritten form, but not yet uploaded
1611 : // metadata to reflect that. If we restart here, the replaced layer files will look invalid (size mismatch
1612 : // to remote index) and be removed. This is inefficient but safe.
1613 0 : fail::fail_point!("compact-shard-ancestors-localonly");
1614 0 :
1615 0 : // Update the LayerMap so that readers will use the new layers, and enqueue it for writing to remote storage
1616 0 : self.rewrite_layers(replace_image_layers, drop_layers)
1617 0 : .await?;
1618 :
1619 0 : fail::fail_point!("compact-shard-ancestors-enqueued");
1620 0 :
1621 0 : // We wait for all uploads to complete before finishing this compaction stage. This is not
1622 0 : // necessary for correctness, but it simplifies testing, and avoids proceeding with another
1623 0 : // Timeline's compaction while this timeline's uploads may be generating lots of disk I/O
1624 0 : // load.
1625 0 : if outcome != CompactionOutcome::YieldForL0 {
1626 0 : info!("shard ancestor compaction waiting for uploads");
1627 0 : tokio::select! {
1628 0 : result = self.remote_client.wait_completion() => match result {
1629 0 : Ok(()) => {},
1630 0 : Err(WaitCompletionError::NotInitialized(ni)) => return Err(CompactionError::from(ni)),
1631 : Err(WaitCompletionError::UploadQueueShutDownOrStopped) => {
1632 0 : return Err(CompactionError::ShuttingDown);
1633 : }
1634 : },
1635 : // Don't wait if there's L0 compaction to do. We don't need to update the outcome
1636 : // here, because we've already done the actual work.
1637 0 : _ = self.l0_compaction_trigger.notified(), if yield_for_l0 => {},
1638 : }
1639 0 : }
1640 :
1641 0 : info!(
1642 0 : "shard ancestor compaction done in {:.3}s{}",
1643 0 : started.elapsed().as_secs_f64(),
1644 0 : match outcome {
1645 : CompactionOutcome::Pending =>
1646 0 : format!(", with pending work (rewrite_max={rewrite_max})"),
1647 0 : CompactionOutcome::YieldForL0 => String::from(", yielding for L0 compaction"),
1648 0 : CompactionOutcome::Skipped | CompactionOutcome::Done => String::new(),
1649 : }
1650 : );
1651 :
1652 0 : fail::fail_point!("compact-shard-ancestors-persistent");
1653 0 :
1654 0 : Ok(outcome)
1655 0 : }
1656 :
1657 : /// Update the LayerVisibilityHint of layers covered by image layers, based on whether there is
1658 : /// an image layer between them and the most recent readable LSN (branch point or tip of timeline). The
1659 : /// purpose of the visibility hint is to record which layers need to be available to service reads.
1660 : ///
1661 : /// The result may be used as an input to eviction and secondary downloads to de-prioritize layers
1662 : /// that we know won't be needed for reads.
1663 1440 : pub(crate) async fn update_layer_visibility(
1664 1440 : &self,
1665 1440 : ) -> Result<(), super::layer_manager::Shutdown> {
1666 1440 : let head_lsn = self.get_last_record_lsn();
1667 :
1668 : // We will sweep through layers in reverse-LSN order. We only do historic layers. L0 deltas
1669 : // are implicitly left visible, because LayerVisibilityHint's default is Visible, and we never modify it here.
1670 : // Note that L0 deltas _can_ be covered by image layers, but we consider them 'visible' because we anticipate that
1671 : // they will be subject to L0->L1 compaction in the near future.
1672 1440 : let layer_manager = self.layers.read().await;
1673 1440 : let layer_map = layer_manager.layer_map()?;
1674 :
1675 1440 : let readable_points = {
1676 1440 : let children = self.gc_info.read().unwrap().retain_lsns.clone();
1677 1440 :
1678 1440 : let mut readable_points = Vec::with_capacity(children.len() + 1);
1679 1440 : for (child_lsn, _child_timeline_id, is_offloaded) in &children {
1680 0 : if *is_offloaded == MaybeOffloaded::Yes {
1681 0 : continue;
1682 0 : }
1683 0 : readable_points.push(*child_lsn);
1684 : }
1685 1440 : readable_points.push(head_lsn);
1686 1440 : readable_points
1687 1440 : };
1688 1440 :
1689 1440 : let (layer_visibility, covered) = layer_map.get_visibility(readable_points);
1690 3624 : for (layer_desc, visibility) in layer_visibility {
1691 2184 : // FIXME: a more efficiency bulk zip() through the layers rather than NlogN getting each one
1692 2184 : let layer = layer_manager.get_from_desc(&layer_desc);
1693 2184 : layer.set_visibility(visibility);
1694 2184 : }
1695 :
1696 : // TODO: publish our covered KeySpace to our parent, so that when they update their visibility, they can
1697 : // avoid assuming that everything at a branch point is visible.
1698 1440 : drop(covered);
1699 1440 : Ok(())
1700 1440 : }
1701 :
1702 : /// Collect a bunch of Level 0 layer files, and compact and reshuffle them as
1703 : /// as Level 1 files. Returns whether the L0 layers are fully compacted.
1704 2184 : async fn compact_level0(
1705 2184 : self: &Arc<Self>,
1706 2184 : target_file_size: u64,
1707 2184 : force_compaction_ignore_threshold: bool,
1708 2184 : ctx: &RequestContext,
1709 2184 : ) -> Result<CompactionOutcome, CompactionError> {
1710 : let CompactLevel0Phase1Result {
1711 2184 : new_layers,
1712 2184 : deltas_to_compact,
1713 2184 : outcome,
1714 : } = {
1715 2184 : let phase1_span = info_span!("compact_level0_phase1");
1716 2184 : let ctx = ctx.attached_child();
1717 2184 : let mut stats = CompactLevel0Phase1StatsBuilder {
1718 2184 : version: Some(2),
1719 2184 : tenant_id: Some(self.tenant_shard_id),
1720 2184 : timeline_id: Some(self.timeline_id),
1721 2184 : ..Default::default()
1722 2184 : };
1723 2184 :
1724 2184 : let begin = tokio::time::Instant::now();
1725 2184 : let phase1_layers_locked = self.layers.read().await;
1726 2184 : let now = tokio::time::Instant::now();
1727 2184 : stats.read_lock_acquisition_micros =
1728 2184 : DurationRecorder::Recorded(RecordedDuration(now - begin), now);
1729 2184 : self.compact_level0_phase1(
1730 2184 : phase1_layers_locked,
1731 2184 : stats,
1732 2184 : target_file_size,
1733 2184 : force_compaction_ignore_threshold,
1734 2184 : &ctx,
1735 2184 : )
1736 2184 : .instrument(phase1_span)
1737 2184 : .await?
1738 : };
1739 :
1740 2184 : if new_layers.is_empty() && deltas_to_compact.is_empty() {
1741 : // nothing to do
1742 2016 : return Ok(CompactionOutcome::Done);
1743 168 : }
1744 168 :
1745 168 : self.finish_compact_batch(&new_layers, &Vec::new(), &deltas_to_compact)
1746 168 : .await?;
1747 168 : Ok(outcome)
1748 2184 : }
1749 :
1750 : /// Level0 files first phase of compaction, explained in the [`Self::compact_legacy`] comment.
1751 2184 : async fn compact_level0_phase1<'a>(
1752 2184 : self: &'a Arc<Self>,
1753 2184 : guard: tokio::sync::RwLockReadGuard<'a, LayerManager>,
1754 2184 : mut stats: CompactLevel0Phase1StatsBuilder,
1755 2184 : target_file_size: u64,
1756 2184 : force_compaction_ignore_threshold: bool,
1757 2184 : ctx: &RequestContext,
1758 2184 : ) -> Result<CompactLevel0Phase1Result, CompactionError> {
1759 2184 : stats.read_lock_held_spawn_blocking_startup_micros =
1760 2184 : stats.read_lock_acquisition_micros.till_now(); // set by caller
1761 2184 : let layers = guard.layer_map()?;
1762 2184 : let level0_deltas = layers.level0_deltas();
1763 2184 : stats.level0_deltas_count = Some(level0_deltas.len());
1764 2184 :
1765 2184 : // Only compact if enough layers have accumulated.
1766 2184 : let threshold = self.get_compaction_threshold();
1767 2184 : if level0_deltas.is_empty() || level0_deltas.len() < threshold {
1768 2016 : if force_compaction_ignore_threshold {
1769 0 : if !level0_deltas.is_empty() {
1770 0 : info!(
1771 0 : level0_deltas = level0_deltas.len(),
1772 0 : threshold, "too few deltas to compact, but forcing compaction"
1773 : );
1774 : } else {
1775 0 : info!(
1776 0 : level0_deltas = level0_deltas.len(),
1777 0 : threshold, "too few deltas to compact, cannot force compaction"
1778 : );
1779 0 : return Ok(CompactLevel0Phase1Result::default());
1780 : }
1781 : } else {
1782 2016 : debug!(
1783 0 : level0_deltas = level0_deltas.len(),
1784 0 : threshold, "too few deltas to compact"
1785 : );
1786 2016 : return Ok(CompactLevel0Phase1Result::default());
1787 : }
1788 168 : }
1789 :
1790 168 : let mut level0_deltas = level0_deltas
1791 168 : .iter()
1792 2412 : .map(|x| guard.get_from_desc(x))
1793 168 : .collect::<Vec<_>>();
1794 168 :
1795 168 : // Gather the files to compact in this iteration.
1796 168 : //
1797 168 : // Start with the oldest Level 0 delta file, and collect any other
1798 168 : // level 0 files that form a contiguous sequence, such that the end
1799 168 : // LSN of previous file matches the start LSN of the next file.
1800 168 : //
1801 168 : // Note that if the files don't form such a sequence, we might
1802 168 : // "compact" just a single file. That's a bit pointless, but it allows
1803 168 : // us to get rid of the level 0 file, and compact the other files on
1804 168 : // the next iteration. This could probably made smarter, but such
1805 168 : // "gaps" in the sequence of level 0 files should only happen in case
1806 168 : // of a crash, partial download from cloud storage, or something like
1807 168 : // that, so it's not a big deal in practice.
1808 4488 : level0_deltas.sort_by_key(|l| l.layer_desc().lsn_range.start);
1809 168 : let mut level0_deltas_iter = level0_deltas.iter();
1810 168 :
1811 168 : let first_level0_delta = level0_deltas_iter.next().unwrap();
1812 168 : let mut prev_lsn_end = first_level0_delta.layer_desc().lsn_range.end;
1813 168 : let mut deltas_to_compact = Vec::with_capacity(level0_deltas.len());
1814 168 :
1815 168 : // Accumulate the size of layers in `deltas_to_compact`
1816 168 : let mut deltas_to_compact_bytes = 0;
1817 168 :
1818 168 : // Under normal circumstances, we will accumulate up to compaction_upper_limit L0s of size
1819 168 : // checkpoint_distance each. To avoid edge cases using extra system resources, bound our
1820 168 : // work in this function to only operate on this much delta data at once.
1821 168 : //
1822 168 : // In general, compaction_threshold should be <= compaction_upper_limit, but in case that
1823 168 : // the constraint is not respected, we use the larger of the two.
1824 168 : let delta_size_limit = std::cmp::max(
1825 168 : self.get_compaction_upper_limit(),
1826 168 : self.get_compaction_threshold(),
1827 168 : ) as u64
1828 168 : * std::cmp::max(self.get_checkpoint_distance(), DEFAULT_CHECKPOINT_DISTANCE);
1829 168 :
1830 168 : let mut fully_compacted = true;
1831 168 :
1832 168 : deltas_to_compact.push(first_level0_delta.download_and_keep_resident(ctx).await?);
1833 2412 : for l in level0_deltas_iter {
1834 2244 : let lsn_range = &l.layer_desc().lsn_range;
1835 2244 :
1836 2244 : if lsn_range.start != prev_lsn_end {
1837 0 : break;
1838 2244 : }
1839 2244 : deltas_to_compact.push(l.download_and_keep_resident(ctx).await?);
1840 2244 : deltas_to_compact_bytes += l.metadata().file_size;
1841 2244 : prev_lsn_end = lsn_range.end;
1842 2244 :
1843 2244 : if deltas_to_compact_bytes >= delta_size_limit {
1844 0 : info!(
1845 0 : l0_deltas_selected = deltas_to_compact.len(),
1846 0 : l0_deltas_total = level0_deltas.len(),
1847 0 : "L0 compaction picker hit max delta layer size limit: {}",
1848 : delta_size_limit
1849 : );
1850 0 : fully_compacted = false;
1851 0 :
1852 0 : // Proceed with compaction, but only a subset of L0s
1853 0 : break;
1854 2244 : }
1855 : }
1856 168 : let lsn_range = Range {
1857 168 : start: deltas_to_compact
1858 168 : .first()
1859 168 : .unwrap()
1860 168 : .layer_desc()
1861 168 : .lsn_range
1862 168 : .start,
1863 168 : end: deltas_to_compact.last().unwrap().layer_desc().lsn_range.end,
1864 168 : };
1865 168 :
1866 168 : info!(
1867 0 : "Starting Level0 compaction in LSN range {}-{} for {} layers ({} deltas in total)",
1868 0 : lsn_range.start,
1869 0 : lsn_range.end,
1870 0 : deltas_to_compact.len(),
1871 0 : level0_deltas.len()
1872 : );
1873 :
1874 2412 : for l in deltas_to_compact.iter() {
1875 2412 : info!("compact includes {l}");
1876 : }
1877 :
1878 : // We don't need the original list of layers anymore. Drop it so that
1879 : // we don't accidentally use it later in the function.
1880 168 : drop(level0_deltas);
1881 168 :
1882 168 : stats.read_lock_held_prerequisites_micros = stats
1883 168 : .read_lock_held_spawn_blocking_startup_micros
1884 168 : .till_now();
1885 :
1886 : // TODO: replace with streaming k-merge
1887 168 : let all_keys = {
1888 168 : let mut all_keys = Vec::new();
1889 2412 : for l in deltas_to_compact.iter() {
1890 2412 : if self.cancel.is_cancelled() {
1891 0 : return Err(CompactionError::ShuttingDown);
1892 2412 : }
1893 2412 : let delta = l.get_as_delta(ctx).await.map_err(CompactionError::Other)?;
1894 2412 : let keys = delta
1895 2412 : .index_entries(ctx)
1896 2412 : .await
1897 2412 : .map_err(CompactionError::Other)?;
1898 2412 : all_keys.extend(keys);
1899 : }
1900 : // The current stdlib sorting implementation is designed in a way where it is
1901 : // particularly fast where the slice is made up of sorted sub-ranges.
1902 26542600 : all_keys.sort_by_key(|DeltaEntry { key, lsn, .. }| (*key, *lsn));
1903 168 : all_keys
1904 168 : };
1905 168 :
1906 168 : stats.read_lock_held_key_sort_micros = stats.read_lock_held_prerequisites_micros.till_now();
1907 :
1908 : // Determine N largest holes where N is number of compacted layers. The vec is sorted by key range start.
1909 : //
1910 : // A hole is a key range for which this compaction doesn't have any WAL records.
1911 : // Our goal in this compaction iteration is to avoid creating L1s that, in terms of their key range,
1912 : // cover the hole, but actually don't contain any WAL records for that key range.
1913 : // The reason is that the mere stack of L1s (`count_deltas`) triggers image layer creation (`create_image_layers`).
1914 : // That image layer creation would be useless for a hole range covered by L1s that don't contain any WAL records.
1915 : //
1916 : // The algorithm chooses holes as follows.
1917 : // - Slide a 2-window over the keys in key orde to get the hole range (=distance between two keys).
1918 : // - Filter: min threshold on range length
1919 : // - Rank: by coverage size (=number of image layers required to reconstruct each key in the range for which we have any data)
1920 : //
1921 : // For more details, intuition, and some ASCII art see https://github.com/neondatabase/neon/pull/3597#discussion_r1112704451
1922 : #[derive(PartialEq, Eq)]
1923 : struct Hole {
1924 : key_range: Range<Key>,
1925 : coverage_size: usize,
1926 : }
1927 168 : let holes: Vec<Hole> = {
1928 : use std::cmp::Ordering;
1929 : impl Ord for Hole {
1930 0 : fn cmp(&self, other: &Self) -> Ordering {
1931 0 : self.coverage_size.cmp(&other.coverage_size).reverse()
1932 0 : }
1933 : }
1934 : impl PartialOrd for Hole {
1935 0 : fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
1936 0 : Some(self.cmp(other))
1937 0 : }
1938 : }
1939 168 : let max_holes = deltas_to_compact.len();
1940 168 : let last_record_lsn = self.get_last_record_lsn();
1941 168 : let min_hole_range = (target_file_size / page_cache::PAGE_SZ as u64) as i128;
1942 168 : let min_hole_coverage_size = 3; // TODO: something more flexible?
1943 168 : // min-heap (reserve space for one more element added before eviction)
1944 168 : let mut heap: BinaryHeap<Hole> = BinaryHeap::with_capacity(max_holes + 1);
1945 168 : let mut prev: Option<Key> = None;
1946 :
1947 12384228 : for &DeltaEntry { key: next_key, .. } in all_keys.iter() {
1948 12384228 : if let Some(prev_key) = prev {
1949 : // just first fast filter, do not create hole entries for metadata keys. The last hole in the
1950 : // compaction is the gap between data key and metadata keys.
1951 12384060 : if next_key.to_i128() - prev_key.to_i128() >= min_hole_range
1952 0 : && !Key::is_metadata_key(&prev_key)
1953 : {
1954 0 : let key_range = prev_key..next_key;
1955 0 : // Measuring hole by just subtraction of i128 representation of key range boundaries
1956 0 : // has not so much sense, because largest holes will corresponds field1/field2 changes.
1957 0 : // But we are mostly interested to eliminate holes which cause generation of excessive image layers.
1958 0 : // That is why it is better to measure size of hole as number of covering image layers.
1959 0 : let coverage_size =
1960 0 : layers.image_coverage(&key_range, last_record_lsn).len();
1961 0 : if coverage_size >= min_hole_coverage_size {
1962 0 : heap.push(Hole {
1963 0 : key_range,
1964 0 : coverage_size,
1965 0 : });
1966 0 : if heap.len() > max_holes {
1967 0 : heap.pop(); // remove smallest hole
1968 0 : }
1969 0 : }
1970 12384060 : }
1971 168 : }
1972 12384228 : prev = Some(next_key.next());
1973 : }
1974 168 : let mut holes = heap.into_vec();
1975 168 : holes.sort_unstable_by_key(|hole| hole.key_range.start);
1976 168 : holes
1977 168 : };
1978 168 : stats.read_lock_held_compute_holes_micros = stats.read_lock_held_key_sort_micros.till_now();
1979 168 : drop_rlock(guard);
1980 168 :
1981 168 : if self.cancel.is_cancelled() {
1982 0 : return Err(CompactionError::ShuttingDown);
1983 168 : }
1984 168 :
1985 168 : stats.read_lock_drop_micros = stats.read_lock_held_compute_holes_micros.till_now();
1986 :
1987 : // This iterator walks through all key-value pairs from all the layers
1988 : // we're compacting, in key, LSN order.
1989 : // If there's both a Value::Image and Value::WalRecord for the same (key,lsn),
1990 : // then the Value::Image is ordered before Value::WalRecord.
1991 168 : let mut all_values_iter = {
1992 168 : let mut deltas = Vec::with_capacity(deltas_to_compact.len());
1993 2412 : for l in deltas_to_compact.iter() {
1994 2412 : let l = l.get_as_delta(ctx).await.map_err(CompactionError::Other)?;
1995 2412 : deltas.push(l);
1996 : }
1997 168 : MergeIterator::create(&deltas, &[], ctx)
1998 168 : };
1999 168 :
2000 168 : // This iterator walks through all keys and is needed to calculate size used by each key
2001 168 : let mut all_keys_iter = all_keys
2002 168 : .iter()
2003 12384228 : .map(|DeltaEntry { key, lsn, size, .. }| (*key, *lsn, *size))
2004 12384060 : .coalesce(|mut prev, cur| {
2005 12384060 : // Coalesce keys that belong to the same key pair.
2006 12384060 : // This ensures that compaction doesn't put them
2007 12384060 : // into different layer files.
2008 12384060 : // Still limit this by the target file size,
2009 12384060 : // so that we keep the size of the files in
2010 12384060 : // check.
2011 12384060 : if prev.0 == cur.0 && prev.2 < target_file_size {
2012 240228 : prev.2 += cur.2;
2013 240228 : Ok(prev)
2014 : } else {
2015 12143832 : Err((prev, cur))
2016 : }
2017 12384060 : });
2018 168 :
2019 168 : // Merge the contents of all the input delta layers into a new set
2020 168 : // of delta layers, based on the current partitioning.
2021 168 : //
2022 168 : // We split the new delta layers on the key dimension. We iterate through the key space, and for each key, check if including the next key to the current output layer we're building would cause the layer to become too large. If so, dump the current output layer and start new one.
2023 168 : // It's possible that there is a single key with so many page versions that storing all of them in a single layer file
2024 168 : // would be too large. In that case, we also split on the LSN dimension.
2025 168 : //
2026 168 : // LSN
2027 168 : // ^
2028 168 : // |
2029 168 : // | +-----------+ +--+--+--+--+
2030 168 : // | | | | | | | |
2031 168 : // | +-----------+ | | | | |
2032 168 : // | | | | | | | |
2033 168 : // | +-----------+ ==> | | | | |
2034 168 : // | | | | | | | |
2035 168 : // | +-----------+ | | | | |
2036 168 : // | | | | | | | |
2037 168 : // | +-----------+ +--+--+--+--+
2038 168 : // |
2039 168 : // +--------------> key
2040 168 : //
2041 168 : //
2042 168 : // If one key (X) has a lot of page versions:
2043 168 : //
2044 168 : // LSN
2045 168 : // ^
2046 168 : // | (X)
2047 168 : // | +-----------+ +--+--+--+--+
2048 168 : // | | | | | | | |
2049 168 : // | +-----------+ | | +--+ |
2050 168 : // | | | | | | | |
2051 168 : // | +-----------+ ==> | | | | |
2052 168 : // | | | | | +--+ |
2053 168 : // | +-----------+ | | | | |
2054 168 : // | | | | | | | |
2055 168 : // | +-----------+ +--+--+--+--+
2056 168 : // |
2057 168 : // +--------------> key
2058 168 : // TODO: this actually divides the layers into fixed-size chunks, not
2059 168 : // based on the partitioning.
2060 168 : //
2061 168 : // TODO: we should also opportunistically materialize and
2062 168 : // garbage collect what we can.
2063 168 : let mut new_layers = Vec::new();
2064 168 : let mut prev_key: Option<Key> = None;
2065 168 : let mut writer: Option<DeltaLayerWriter> = None;
2066 168 : let mut key_values_total_size = 0u64;
2067 168 : let mut dup_start_lsn: Lsn = Lsn::INVALID; // start LSN of layer containing values of the single key
2068 168 : let mut dup_end_lsn: Lsn = Lsn::INVALID; // end LSN of layer containing values of the single key
2069 168 : let mut next_hole = 0; // index of next hole in holes vector
2070 168 :
2071 168 : let mut keys = 0;
2072 :
2073 12384396 : while let Some((key, lsn, value)) = all_values_iter
2074 12384396 : .next()
2075 12384396 : .await
2076 12384396 : .map_err(CompactionError::Other)?
2077 : {
2078 12384228 : keys += 1;
2079 12384228 :
2080 12384228 : if keys % 32_768 == 0 && self.cancel.is_cancelled() {
2081 : // avoid hitting the cancellation token on every key. in benches, we end up
2082 : // shuffling an order of million keys per layer, this means we'll check it
2083 : // around tens of times per layer.
2084 0 : return Err(CompactionError::ShuttingDown);
2085 12384228 : }
2086 12384228 :
2087 12384228 : let same_key = prev_key == Some(key);
2088 12384228 : // We need to check key boundaries once we reach next key or end of layer with the same key
2089 12384228 : if !same_key || lsn == dup_end_lsn {
2090 12144000 : let mut next_key_size = 0u64;
2091 12144000 : let is_dup_layer = dup_end_lsn.is_valid();
2092 12144000 : dup_start_lsn = Lsn::INVALID;
2093 12144000 : if !same_key {
2094 12144000 : dup_end_lsn = Lsn::INVALID;
2095 12144000 : }
2096 : // Determine size occupied by this key. We stop at next key or when size becomes larger than target_file_size
2097 12144000 : for (next_key, next_lsn, next_size) in all_keys_iter.by_ref() {
2098 12144000 : next_key_size = next_size;
2099 12144000 : if key != next_key {
2100 12143832 : if dup_end_lsn.is_valid() {
2101 0 : // We are writting segment with duplicates:
2102 0 : // place all remaining values of this key in separate segment
2103 0 : dup_start_lsn = dup_end_lsn; // new segments starts where old stops
2104 0 : dup_end_lsn = lsn_range.end; // there are no more values of this key till end of LSN range
2105 12143832 : }
2106 12143832 : break;
2107 168 : }
2108 168 : key_values_total_size += next_size;
2109 168 : // Check if it is time to split segment: if total keys size is larger than target file size.
2110 168 : // We need to avoid generation of empty segments if next_size > target_file_size.
2111 168 : if key_values_total_size > target_file_size && lsn != next_lsn {
2112 : // Split key between multiple layers: such layer can contain only single key
2113 0 : dup_start_lsn = if dup_end_lsn.is_valid() {
2114 0 : dup_end_lsn // new segment with duplicates starts where old one stops
2115 : } else {
2116 0 : lsn // start with the first LSN for this key
2117 : };
2118 0 : dup_end_lsn = next_lsn; // upper LSN boundary is exclusive
2119 0 : break;
2120 168 : }
2121 : }
2122 : // handle case when loop reaches last key: in this case dup_end is non-zero but dup_start is not set.
2123 12144000 : if dup_end_lsn.is_valid() && !dup_start_lsn.is_valid() {
2124 0 : dup_start_lsn = dup_end_lsn;
2125 0 : dup_end_lsn = lsn_range.end;
2126 12144000 : }
2127 12144000 : if writer.is_some() {
2128 12143832 : let written_size = writer.as_mut().unwrap().size();
2129 12143832 : let contains_hole =
2130 12143832 : next_hole < holes.len() && key >= holes[next_hole].key_range.end;
2131 : // check if key cause layer overflow or contains hole...
2132 12143832 : if is_dup_layer
2133 12143832 : || dup_end_lsn.is_valid()
2134 12143832 : || written_size + key_values_total_size > target_file_size
2135 12142152 : || contains_hole
2136 : {
2137 : // ... if so, flush previous layer and prepare to write new one
2138 1680 : let (desc, path) = writer
2139 1680 : .take()
2140 1680 : .unwrap()
2141 1680 : .finish(prev_key.unwrap().next(), ctx)
2142 1680 : .await
2143 1680 : .map_err(CompactionError::Other)?;
2144 1680 : let new_delta = Layer::finish_creating(self.conf, self, desc, &path)
2145 1680 : .map_err(CompactionError::Other)?;
2146 :
2147 1680 : new_layers.push(new_delta);
2148 1680 : writer = None;
2149 1680 :
2150 1680 : if contains_hole {
2151 0 : // skip hole
2152 0 : next_hole += 1;
2153 1680 : }
2154 12142152 : }
2155 168 : }
2156 : // Remember size of key value because at next iteration we will access next item
2157 12144000 : key_values_total_size = next_key_size;
2158 240228 : }
2159 12384228 : fail_point!("delta-layer-writer-fail-before-finish", |_| {
2160 0 : Err(CompactionError::Other(anyhow::anyhow!(
2161 0 : "failpoint delta-layer-writer-fail-before-finish"
2162 0 : )))
2163 12384228 : });
2164 :
2165 12384228 : if !self.shard_identity.is_key_disposable(&key) {
2166 12384228 : if writer.is_none() {
2167 1848 : if self.cancel.is_cancelled() {
2168 : // to be somewhat responsive to cancellation, check for each new layer
2169 0 : return Err(CompactionError::ShuttingDown);
2170 1848 : }
2171 : // Create writer if not initiaized yet
2172 1848 : writer = Some(
2173 : DeltaLayerWriter::new(
2174 1848 : self.conf,
2175 1848 : self.timeline_id,
2176 1848 : self.tenant_shard_id,
2177 1848 : key,
2178 1848 : if dup_end_lsn.is_valid() {
2179 : // this is a layer containing slice of values of the same key
2180 0 : debug!("Create new dup layer {}..{}", dup_start_lsn, dup_end_lsn);
2181 0 : dup_start_lsn..dup_end_lsn
2182 : } else {
2183 1848 : debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end);
2184 1848 : lsn_range.clone()
2185 : },
2186 1848 : &self.gate,
2187 1848 : self.cancel.clone(),
2188 1848 : ctx,
2189 1848 : )
2190 1848 : .await
2191 1848 : .map_err(CompactionError::Other)?,
2192 : );
2193 :
2194 1848 : keys = 0;
2195 12382380 : }
2196 :
2197 12384228 : writer
2198 12384228 : .as_mut()
2199 12384228 : .unwrap()
2200 12384228 : .put_value(key, lsn, value, ctx)
2201 12384228 : .await
2202 12384228 : .map_err(CompactionError::Other)?;
2203 : } else {
2204 0 : let owner = self.shard_identity.get_shard_number(&key);
2205 0 :
2206 0 : // This happens after a shard split, when we're compacting an L0 created by our parent shard
2207 0 : debug!("dropping key {key} during compaction (it belongs on shard {owner})");
2208 : }
2209 :
2210 12384228 : if !new_layers.is_empty() {
2211 118716 : fail_point!("after-timeline-compacted-first-L1");
2212 12265512 : }
2213 :
2214 12384228 : prev_key = Some(key);
2215 : }
2216 168 : if let Some(writer) = writer {
2217 168 : let (desc, path) = writer
2218 168 : .finish(prev_key.unwrap().next(), ctx)
2219 168 : .await
2220 168 : .map_err(CompactionError::Other)?;
2221 168 : let new_delta = Layer::finish_creating(self.conf, self, desc, &path)
2222 168 : .map_err(CompactionError::Other)?;
2223 168 : new_layers.push(new_delta);
2224 0 : }
2225 :
2226 : // Sync layers
2227 168 : if !new_layers.is_empty() {
2228 : // Print a warning if the created layer is larger than double the target size
2229 : // Add two pages for potential overhead. This should in theory be already
2230 : // accounted for in the target calculation, but for very small targets,
2231 : // we still might easily hit the limit otherwise.
2232 168 : let warn_limit = target_file_size * 2 + page_cache::PAGE_SZ as u64 * 2;
2233 1848 : for layer in new_layers.iter() {
2234 1848 : if layer.layer_desc().file_size > warn_limit {
2235 0 : warn!(
2236 : %layer,
2237 0 : "created delta file of size {} larger than double of target of {target_file_size}", layer.layer_desc().file_size
2238 : );
2239 1848 : }
2240 : }
2241 :
2242 : // The writer.finish() above already did the fsync of the inodes.
2243 : // We just need to fsync the directory in which these inodes are linked,
2244 : // which we know to be the timeline directory.
2245 : //
2246 : // We use fatal_err() below because the after writer.finish() returns with success,
2247 : // the in-memory state of the filesystem already has the layer file in its final place,
2248 : // and subsequent pageserver code could think it's durable while it really isn't.
2249 168 : let timeline_dir = VirtualFile::open(
2250 168 : &self
2251 168 : .conf
2252 168 : .timeline_path(&self.tenant_shard_id, &self.timeline_id),
2253 168 : ctx,
2254 168 : )
2255 168 : .await
2256 168 : .fatal_err("VirtualFile::open for timeline dir fsync");
2257 168 : timeline_dir
2258 168 : .sync_all()
2259 168 : .await
2260 168 : .fatal_err("VirtualFile::sync_all timeline dir");
2261 0 : }
2262 :
2263 168 : stats.write_layer_files_micros = stats.read_lock_drop_micros.till_now();
2264 168 : stats.new_deltas_count = Some(new_layers.len());
2265 1848 : stats.new_deltas_size = Some(new_layers.iter().map(|l| l.layer_desc().file_size).sum());
2266 168 :
2267 168 : match TryInto::<CompactLevel0Phase1Stats>::try_into(stats)
2268 168 : .and_then(|stats| serde_json::to_string(&stats).context("serde_json::to_string"))
2269 : {
2270 168 : Ok(stats_json) => {
2271 168 : info!(
2272 0 : stats_json = stats_json.as_str(),
2273 0 : "compact_level0_phase1 stats available"
2274 : )
2275 : }
2276 0 : Err(e) => {
2277 0 : warn!("compact_level0_phase1 stats failed to serialize: {:#}", e);
2278 : }
2279 : }
2280 :
2281 : // Without this, rustc complains about deltas_to_compact still
2282 : // being borrowed when we `.into_iter()` below.
2283 168 : drop(all_values_iter);
2284 168 :
2285 168 : Ok(CompactLevel0Phase1Result {
2286 168 : new_layers,
2287 168 : deltas_to_compact: deltas_to_compact
2288 168 : .into_iter()
2289 2412 : .map(|x| x.drop_eviction_guard())
2290 168 : .collect::<Vec<_>>(),
2291 168 : outcome: if fully_compacted {
2292 168 : CompactionOutcome::Done
2293 : } else {
2294 0 : CompactionOutcome::Pending
2295 : },
2296 : })
2297 2184 : }
2298 : }
2299 :
2300 : #[derive(Default)]
2301 : struct CompactLevel0Phase1Result {
2302 : new_layers: Vec<ResidentLayer>,
2303 : deltas_to_compact: Vec<Layer>,
2304 : // Whether we have included all L0 layers, or selected only part of them due to the
2305 : // L0 compaction size limit.
2306 : outcome: CompactionOutcome,
2307 : }
2308 :
2309 : #[derive(Default)]
2310 : struct CompactLevel0Phase1StatsBuilder {
2311 : version: Option<u64>,
2312 : tenant_id: Option<TenantShardId>,
2313 : timeline_id: Option<TimelineId>,
2314 : read_lock_acquisition_micros: DurationRecorder,
2315 : read_lock_held_spawn_blocking_startup_micros: DurationRecorder,
2316 : read_lock_held_key_sort_micros: DurationRecorder,
2317 : read_lock_held_prerequisites_micros: DurationRecorder,
2318 : read_lock_held_compute_holes_micros: DurationRecorder,
2319 : read_lock_drop_micros: DurationRecorder,
2320 : write_layer_files_micros: DurationRecorder,
2321 : level0_deltas_count: Option<usize>,
2322 : new_deltas_count: Option<usize>,
2323 : new_deltas_size: Option<u64>,
2324 : }
2325 :
2326 : #[derive(serde::Serialize)]
2327 : struct CompactLevel0Phase1Stats {
2328 : version: u64,
2329 : tenant_id: TenantShardId,
2330 : timeline_id: TimelineId,
2331 : read_lock_acquisition_micros: RecordedDuration,
2332 : read_lock_held_spawn_blocking_startup_micros: RecordedDuration,
2333 : read_lock_held_key_sort_micros: RecordedDuration,
2334 : read_lock_held_prerequisites_micros: RecordedDuration,
2335 : read_lock_held_compute_holes_micros: RecordedDuration,
2336 : read_lock_drop_micros: RecordedDuration,
2337 : write_layer_files_micros: RecordedDuration,
2338 : level0_deltas_count: usize,
2339 : new_deltas_count: usize,
2340 : new_deltas_size: u64,
2341 : }
2342 :
2343 : impl TryFrom<CompactLevel0Phase1StatsBuilder> for CompactLevel0Phase1Stats {
2344 : type Error = anyhow::Error;
2345 :
2346 168 : fn try_from(value: CompactLevel0Phase1StatsBuilder) -> Result<Self, Self::Error> {
2347 168 : Ok(Self {
2348 168 : version: value.version.ok_or_else(|| anyhow!("version not set"))?,
2349 168 : tenant_id: value
2350 168 : .tenant_id
2351 168 : .ok_or_else(|| anyhow!("tenant_id not set"))?,
2352 168 : timeline_id: value
2353 168 : .timeline_id
2354 168 : .ok_or_else(|| anyhow!("timeline_id not set"))?,
2355 168 : read_lock_acquisition_micros: value
2356 168 : .read_lock_acquisition_micros
2357 168 : .into_recorded()
2358 168 : .ok_or_else(|| anyhow!("read_lock_acquisition_micros not set"))?,
2359 168 : read_lock_held_spawn_blocking_startup_micros: value
2360 168 : .read_lock_held_spawn_blocking_startup_micros
2361 168 : .into_recorded()
2362 168 : .ok_or_else(|| anyhow!("read_lock_held_spawn_blocking_startup_micros not set"))?,
2363 168 : read_lock_held_key_sort_micros: value
2364 168 : .read_lock_held_key_sort_micros
2365 168 : .into_recorded()
2366 168 : .ok_or_else(|| anyhow!("read_lock_held_key_sort_micros not set"))?,
2367 168 : read_lock_held_prerequisites_micros: value
2368 168 : .read_lock_held_prerequisites_micros
2369 168 : .into_recorded()
2370 168 : .ok_or_else(|| anyhow!("read_lock_held_prerequisites_micros not set"))?,
2371 168 : read_lock_held_compute_holes_micros: value
2372 168 : .read_lock_held_compute_holes_micros
2373 168 : .into_recorded()
2374 168 : .ok_or_else(|| anyhow!("read_lock_held_compute_holes_micros not set"))?,
2375 168 : read_lock_drop_micros: value
2376 168 : .read_lock_drop_micros
2377 168 : .into_recorded()
2378 168 : .ok_or_else(|| anyhow!("read_lock_drop_micros not set"))?,
2379 168 : write_layer_files_micros: value
2380 168 : .write_layer_files_micros
2381 168 : .into_recorded()
2382 168 : .ok_or_else(|| anyhow!("write_layer_files_micros not set"))?,
2383 168 : level0_deltas_count: value
2384 168 : .level0_deltas_count
2385 168 : .ok_or_else(|| anyhow!("level0_deltas_count not set"))?,
2386 168 : new_deltas_count: value
2387 168 : .new_deltas_count
2388 168 : .ok_or_else(|| anyhow!("new_deltas_count not set"))?,
2389 168 : new_deltas_size: value
2390 168 : .new_deltas_size
2391 168 : .ok_or_else(|| anyhow!("new_deltas_size not set"))?,
2392 : })
2393 168 : }
2394 : }
2395 :
2396 : impl Timeline {
2397 : /// Entry point for new tiered compaction algorithm.
2398 : ///
2399 : /// All the real work is in the implementation in the pageserver_compaction
2400 : /// crate. The code here would apply to any algorithm implemented by the
2401 : /// same interface, but tiered is the only one at the moment.
2402 : ///
2403 : /// TODO: cancellation
2404 0 : pub(crate) async fn compact_tiered(
2405 0 : self: &Arc<Self>,
2406 0 : _cancel: &CancellationToken,
2407 0 : ctx: &RequestContext,
2408 0 : ) -> Result<(), CompactionError> {
2409 0 : let fanout = self.get_compaction_threshold() as u64;
2410 0 : let target_file_size = self.get_checkpoint_distance();
2411 :
2412 : // Find the top of the historical layers
2413 0 : let end_lsn = {
2414 0 : let guard = self.layers.read().await;
2415 0 : let layers = guard.layer_map()?;
2416 :
2417 0 : let l0_deltas = layers.level0_deltas();
2418 0 :
2419 0 : // As an optimization, if we find that there are too few L0 layers,
2420 0 : // bail out early. We know that the compaction algorithm would do
2421 0 : // nothing in that case.
2422 0 : if l0_deltas.len() < fanout as usize {
2423 : // doesn't need compacting
2424 0 : return Ok(());
2425 0 : }
2426 0 : l0_deltas.iter().map(|l| l.lsn_range.end).max().unwrap()
2427 0 : };
2428 0 :
2429 0 : // Is the timeline being deleted?
2430 0 : if self.is_stopping() {
2431 0 : trace!("Dropping out of compaction on timeline shutdown");
2432 0 : return Err(CompactionError::ShuttingDown);
2433 0 : }
2434 :
2435 0 : let (dense_ks, _sparse_ks) = self.collect_keyspace(end_lsn, ctx).await?;
2436 : // TODO(chi): ignore sparse_keyspace for now, compact it in the future.
2437 0 : let mut adaptor = TimelineAdaptor::new(self, (end_lsn, dense_ks));
2438 0 :
2439 0 : pageserver_compaction::compact_tiered::compact_tiered(
2440 0 : &mut adaptor,
2441 0 : end_lsn,
2442 0 : target_file_size,
2443 0 : fanout,
2444 0 : ctx,
2445 0 : )
2446 0 : .await
2447 : // TODO: compact_tiered needs to return CompactionError
2448 0 : .map_err(CompactionError::Other)?;
2449 :
2450 0 : adaptor.flush_updates().await?;
2451 0 : Ok(())
2452 0 : }
2453 :
2454 : /// Take a list of images and deltas, produce images and deltas according to GC horizon and retain_lsns.
2455 : ///
2456 : /// It takes a key, the values of the key within the compaction process, a GC horizon, and all retain_lsns below the horizon.
2457 : /// For now, it requires the `accumulated_values` contains the full history of the key (i.e., the key with the lowest LSN is
2458 : /// an image or a WAL not requiring a base image). This restriction will be removed once we implement gc-compaction on branch.
2459 : ///
2460 : /// The function returns the deltas and the base image that need to be placed at each of the retain LSN. For example, we have:
2461 : ///
2462 : /// A@0x10, +B@0x20, +C@0x30, +D@0x40, +E@0x50, +F@0x60
2463 : /// horizon = 0x50, retain_lsn = 0x20, 0x40, delta_threshold=3
2464 : ///
2465 : /// The function will produce:
2466 : ///
2467 : /// ```plain
2468 : /// 0x20(retain_lsn) -> img=AB@0x20 always produce a single image below the lowest retain LSN
2469 : /// 0x40(retain_lsn) -> deltas=[+C@0x30, +D@0x40] two deltas since the last base image, keeping the deltas
2470 : /// 0x50(horizon) -> deltas=[ABCDE@0x50] three deltas since the last base image, generate an image but put it in the delta
2471 : /// above_horizon -> deltas=[+F@0x60] full history above the horizon
2472 : /// ```
2473 : ///
2474 : /// Note that `accumulated_values` must be sorted by LSN and should belong to a single key.
2475 : #[allow(clippy::too_many_arguments)]
2476 3888 : pub(crate) async fn generate_key_retention(
2477 3888 : self: &Arc<Timeline>,
2478 3888 : key: Key,
2479 3888 : full_history: &[(Key, Lsn, Value)],
2480 3888 : horizon: Lsn,
2481 3888 : retain_lsn_below_horizon: &[Lsn],
2482 3888 : delta_threshold_cnt: usize,
2483 3888 : base_img_from_ancestor: Option<(Key, Lsn, Bytes)>,
2484 3888 : verification: bool,
2485 3888 : ) -> anyhow::Result<KeyHistoryRetention> {
2486 : // Pre-checks for the invariants
2487 :
2488 3888 : let debug_mode = cfg!(debug_assertions) || cfg!(feature = "testing");
2489 :
2490 3888 : if debug_mode {
2491 9432 : for (log_key, _, _) in full_history {
2492 5544 : assert_eq!(log_key, &key, "mismatched key");
2493 : }
2494 3888 : for i in 1..full_history.len() {
2495 1656 : assert!(full_history[i - 1].1 <= full_history[i].1, "unordered LSN");
2496 1656 : if full_history[i - 1].1 == full_history[i].1 {
2497 0 : assert!(
2498 0 : matches!(full_history[i - 1].2, Value::Image(_)),
2499 0 : "unordered delta/image, or duplicated delta"
2500 : );
2501 1656 : }
2502 : }
2503 : // There was an assertion for no base image that checks if the first
2504 : // record in the history is `will_init` before, but it was removed.
2505 : // This is explained in the test cases for generate_key_retention.
2506 : // Search "incomplete history" for more information.
2507 8568 : for lsn in retain_lsn_below_horizon {
2508 4680 : assert!(lsn < &horizon, "retain lsn must be below horizon")
2509 : }
2510 3888 : for i in 1..retain_lsn_below_horizon.len() {
2511 2136 : assert!(
2512 2136 : retain_lsn_below_horizon[i - 1] <= retain_lsn_below_horizon[i],
2513 0 : "unordered LSN"
2514 : );
2515 : }
2516 0 : }
2517 3888 : let has_ancestor = base_img_from_ancestor.is_some();
2518 : // Step 1: split history into len(retain_lsn_below_horizon) + 2 buckets, where the last bucket is for all deltas above the horizon,
2519 : // and the second-to-last bucket is for the horizon. Each bucket contains lsn_last_bucket < deltas <= lsn_this_bucket.
2520 3888 : let (mut split_history, lsn_split_points) = {
2521 3888 : let mut split_history = Vec::new();
2522 3888 : split_history.resize_with(retain_lsn_below_horizon.len() + 2, Vec::new);
2523 3888 : let mut lsn_split_points = Vec::with_capacity(retain_lsn_below_horizon.len() + 1);
2524 8568 : for lsn in retain_lsn_below_horizon {
2525 4680 : lsn_split_points.push(*lsn);
2526 4680 : }
2527 3888 : lsn_split_points.push(horizon);
2528 3888 : let mut current_idx = 0;
2529 9432 : for item @ (_, lsn, _) in full_history {
2530 7008 : while current_idx < lsn_split_points.len() && *lsn > lsn_split_points[current_idx] {
2531 1464 : current_idx += 1;
2532 1464 : }
2533 5544 : split_history[current_idx].push(item);
2534 : }
2535 3888 : (split_history, lsn_split_points)
2536 : };
2537 : // Step 2: filter out duplicated records due to the k-merge of image/delta layers
2538 16344 : for split_for_lsn in &mut split_history {
2539 12456 : let mut prev_lsn = None;
2540 12456 : let mut new_split_for_lsn = Vec::with_capacity(split_for_lsn.len());
2541 12456 : for record @ (_, lsn, _) in std::mem::take(split_for_lsn) {
2542 5544 : if let Some(prev_lsn) = &prev_lsn {
2543 744 : if *prev_lsn == lsn {
2544 : // The case that we have an LSN with both data from the delta layer and the image layer. As
2545 : // `ValueWrapper` ensures that an image is ordered before a delta at the same LSN, we simply
2546 : // drop this delta and keep the image.
2547 : //
2548 : // For example, we have delta layer key1@0x10, key1@0x20, and image layer key1@0x10, we will
2549 : // keep the image for key1@0x10 and the delta for key1@0x20. key1@0x10 delta will be simply
2550 : // dropped.
2551 : //
2552 : // TODO: in case we have both delta + images for a given LSN and it does not exceed the delta
2553 : // threshold, we could have kept delta instead to save space. This is an optimization for the future.
2554 0 : continue;
2555 744 : }
2556 4800 : }
2557 5544 : prev_lsn = Some(lsn);
2558 5544 : new_split_for_lsn.push(record);
2559 : }
2560 12456 : *split_for_lsn = new_split_for_lsn;
2561 : }
2562 : // Step 3: generate images when necessary
2563 3888 : let mut retention = Vec::with_capacity(split_history.len());
2564 3888 : let mut records_since_last_image = 0;
2565 3888 : let batch_cnt = split_history.len();
2566 3888 : assert!(
2567 3888 : batch_cnt >= 2,
2568 0 : "should have at least below + above horizon batches"
2569 : );
2570 3888 : let mut replay_history: Vec<(Key, Lsn, Value)> = Vec::new();
2571 3888 : if let Some((key, lsn, ref img)) = base_img_from_ancestor {
2572 252 : replay_history.push((key, lsn, Value::Image(img.clone())));
2573 3636 : }
2574 :
2575 : /// Generate debug information for the replay history
2576 0 : fn generate_history_trace(replay_history: &[(Key, Lsn, Value)]) -> String {
2577 : use std::fmt::Write;
2578 0 : let mut output = String::new();
2579 0 : if let Some((key, _, _)) = replay_history.first() {
2580 0 : write!(output, "key={} ", key).unwrap();
2581 0 : let mut cnt = 0;
2582 0 : for (_, lsn, val) in replay_history {
2583 0 : if val.is_image() {
2584 0 : write!(output, "i@{} ", lsn).unwrap();
2585 0 : } else if val.will_init() {
2586 0 : write!(output, "di@{} ", lsn).unwrap();
2587 0 : } else {
2588 0 : write!(output, "d@{} ", lsn).unwrap();
2589 0 : }
2590 0 : cnt += 1;
2591 0 : if cnt >= 128 {
2592 0 : write!(output, "... and more").unwrap();
2593 0 : break;
2594 0 : }
2595 : }
2596 0 : } else {
2597 0 : write!(output, "<no history>").unwrap();
2598 0 : }
2599 0 : output
2600 0 : }
2601 :
2602 0 : fn generate_debug_trace(
2603 0 : replay_history: Option<&[(Key, Lsn, Value)]>,
2604 0 : full_history: &[(Key, Lsn, Value)],
2605 0 : lsns: &[Lsn],
2606 0 : horizon: Lsn,
2607 0 : ) -> String {
2608 : use std::fmt::Write;
2609 0 : let mut output = String::new();
2610 0 : if let Some(replay_history) = replay_history {
2611 0 : writeln!(
2612 0 : output,
2613 0 : "replay_history: {}",
2614 0 : generate_history_trace(replay_history)
2615 0 : )
2616 0 : .unwrap();
2617 0 : } else {
2618 0 : writeln!(output, "replay_history: <disabled>",).unwrap();
2619 0 : }
2620 0 : writeln!(
2621 0 : output,
2622 0 : "full_history: {}",
2623 0 : generate_history_trace(full_history)
2624 0 : )
2625 0 : .unwrap();
2626 0 : writeln!(
2627 0 : output,
2628 0 : "when processing: [{}] horizon={}",
2629 0 : lsns.iter().map(|l| format!("{l}")).join(","),
2630 0 : horizon
2631 0 : )
2632 0 : .unwrap();
2633 0 : output
2634 0 : }
2635 :
2636 3888 : let mut key_exists = false;
2637 12444 : for (i, split_for_lsn) in split_history.into_iter().enumerate() {
2638 : // TODO: there could be image keys inside the splits, and we can compute records_since_last_image accordingly.
2639 12444 : records_since_last_image += split_for_lsn.len();
2640 : // Whether to produce an image into the final layer files
2641 12444 : let produce_image = if i == 0 && !has_ancestor {
2642 : // We always generate images for the first batch (below horizon / lowest retain_lsn)
2643 3636 : true
2644 8808 : } else if i == batch_cnt - 1 {
2645 : // Do not generate images for the last batch (above horizon)
2646 3876 : false
2647 4932 : } else if records_since_last_image == 0 {
2648 3864 : false
2649 1068 : } else if records_since_last_image >= delta_threshold_cnt {
2650 : // Generate images when there are too many records
2651 36 : true
2652 : } else {
2653 1032 : false
2654 : };
2655 12444 : replay_history.extend(split_for_lsn.iter().map(|x| (*x).clone()));
2656 : // Only retain the items after the last image record
2657 15324 : for idx in (0..replay_history.len()).rev() {
2658 15324 : if replay_history[idx].2.will_init() {
2659 12444 : replay_history = replay_history[idx..].to_vec();
2660 12444 : break;
2661 2880 : }
2662 : }
2663 12444 : if replay_history.is_empty() && !key_exists {
2664 : // The key does not exist at earlier LSN, we can skip this iteration.
2665 0 : retention.push(Vec::new());
2666 0 : continue;
2667 12444 : } else {
2668 12444 : key_exists = true;
2669 12444 : }
2670 12444 : let Some((_, _, val)) = replay_history.first() else {
2671 0 : unreachable!("replay history should not be empty once it exists")
2672 : };
2673 12444 : if !val.will_init() {
2674 0 : return Err(anyhow::anyhow!("invalid history, no base image")).with_context(|| {
2675 0 : generate_debug_trace(
2676 0 : Some(&replay_history),
2677 0 : full_history,
2678 0 : retain_lsn_below_horizon,
2679 0 : horizon,
2680 0 : )
2681 0 : });
2682 12444 : }
2683 : // Whether to reconstruct the image. In debug mode, we will generate an image
2684 : // at every retain_lsn to ensure data is not corrupted, but we won't put the
2685 : // image into the final layer.
2686 12444 : let img_and_lsn = if produce_image {
2687 3672 : records_since_last_image = 0;
2688 3672 : let replay_history_for_debug = if debug_mode {
2689 3672 : Some(replay_history.clone())
2690 : } else {
2691 0 : None
2692 : };
2693 3672 : let replay_history_for_debug_ref = replay_history_for_debug.as_deref();
2694 3672 : let history = std::mem::take(&mut replay_history);
2695 3672 : let mut img = None;
2696 3672 : let mut records = Vec::with_capacity(history.len());
2697 3672 : if let (_, lsn, Value::Image(val)) = history.first().as_ref().unwrap() {
2698 3540 : img = Some((*lsn, val.clone()));
2699 3540 : for (_, lsn, val) in history.into_iter().skip(1) {
2700 240 : let Value::WalRecord(rec) = val else {
2701 0 : return Err(anyhow::anyhow!(
2702 0 : "invalid record, first record is image, expect walrecords"
2703 0 : ))
2704 0 : .with_context(|| {
2705 0 : generate_debug_trace(
2706 0 : replay_history_for_debug_ref,
2707 0 : full_history,
2708 0 : retain_lsn_below_horizon,
2709 0 : horizon,
2710 0 : )
2711 0 : });
2712 : };
2713 240 : records.push((lsn, rec));
2714 : }
2715 : } else {
2716 216 : for (_, lsn, val) in history.into_iter() {
2717 216 : let Value::WalRecord(rec) = val else {
2718 0 : return Err(anyhow::anyhow!("invalid record, first record is walrecord, expect rest are walrecord"))
2719 0 : .with_context(|| generate_debug_trace(
2720 0 : replay_history_for_debug_ref,
2721 0 : full_history,
2722 0 : retain_lsn_below_horizon,
2723 0 : horizon,
2724 0 : ));
2725 : };
2726 216 : records.push((lsn, rec));
2727 : }
2728 : }
2729 : // WAL redo requires records in the reverse LSN order
2730 3672 : records.reverse();
2731 3672 : let state = ValueReconstructState { img, records };
2732 : // last batch does not generate image so i is always in range, unless we force generate
2733 : // an image during testing
2734 3672 : let request_lsn = if i >= lsn_split_points.len() {
2735 0 : Lsn::MAX
2736 : } else {
2737 3672 : lsn_split_points[i]
2738 : };
2739 3672 : let img = self
2740 3672 : .reconstruct_value(key, request_lsn, state, RedoAttemptType::GcCompaction)
2741 3672 : .await?;
2742 3660 : Some((request_lsn, img))
2743 : } else {
2744 8772 : None
2745 : };
2746 12432 : if produce_image {
2747 3660 : let (request_lsn, img) = img_and_lsn.unwrap();
2748 3660 : replay_history.push((key, request_lsn, Value::Image(img.clone())));
2749 3660 : retention.push(vec![(request_lsn, Value::Image(img))]);
2750 8772 : } else {
2751 8772 : let deltas = split_for_lsn
2752 8772 : .iter()
2753 8772 : .map(|(_, lsn, value)| (*lsn, value.clone()))
2754 8772 : .collect_vec();
2755 8772 : retention.push(deltas);
2756 8772 : }
2757 : }
2758 3876 : let mut result = Vec::with_capacity(retention.len());
2759 3876 : assert_eq!(retention.len(), lsn_split_points.len() + 1);
2760 12432 : for (idx, logs) in retention.into_iter().enumerate() {
2761 12432 : if idx == lsn_split_points.len() {
2762 3876 : let retention = KeyHistoryRetention {
2763 3876 : below_horizon: result,
2764 3876 : above_horizon: KeyLogAtLsn(logs),
2765 3876 : };
2766 3876 : if verification {
2767 3876 : retention
2768 3876 : .verify(key, &base_img_from_ancestor, full_history, self)
2769 3876 : .await?;
2770 0 : }
2771 3876 : return Ok(retention);
2772 8556 : } else {
2773 8556 : result.push((lsn_split_points[idx], KeyLogAtLsn(logs)));
2774 8556 : }
2775 : }
2776 0 : unreachable!("key retention is empty")
2777 3888 : }
2778 :
2779 : /// Check how much space is left on the disk
2780 324 : async fn check_available_space(self: &Arc<Self>) -> anyhow::Result<u64> {
2781 324 : let tenants_dir = self.conf.tenants_path();
2782 :
2783 324 : let stat = Statvfs::get(&tenants_dir, None)
2784 324 : .context("statvfs failed, presumably directory got unlinked")?;
2785 :
2786 324 : let (avail_bytes, _) = stat.get_avail_total_bytes();
2787 324 :
2788 324 : Ok(avail_bytes)
2789 324 : }
2790 :
2791 : /// Check if the compaction can proceed safely without running out of space. We assume the size
2792 : /// upper bound of the produced files of a compaction job is the same as all layers involved in
2793 : /// the compaction. Therefore, we need `2 * layers_to_be_compacted_size` at least to do a
2794 : /// compaction.
2795 324 : async fn check_compaction_space(
2796 324 : self: &Arc<Self>,
2797 324 : layer_selection: &[Layer],
2798 324 : ) -> Result<(), CompactionError> {
2799 324 : let available_space = self
2800 324 : .check_available_space()
2801 324 : .await
2802 324 : .map_err(CompactionError::Other)?;
2803 324 : let mut remote_layer_size = 0;
2804 324 : let mut all_layer_size = 0;
2805 1272 : for layer in layer_selection {
2806 948 : let needs_download = layer
2807 948 : .needs_download()
2808 948 : .await
2809 948 : .context("failed to check if layer needs download")
2810 948 : .map_err(CompactionError::Other)?;
2811 948 : if needs_download.is_some() {
2812 0 : remote_layer_size += layer.layer_desc().file_size;
2813 948 : }
2814 948 : all_layer_size += layer.layer_desc().file_size;
2815 : }
2816 324 : let allocated_space = (available_space as f64 * 0.8) as u64; /* reserve 20% space for other tasks */
2817 324 : if all_layer_size /* space needed for newly-generated file */ + remote_layer_size /* space for downloading layers */ > allocated_space
2818 : {
2819 0 : return Err(CompactionError::Other(anyhow!(
2820 0 : "not enough space for compaction: available_space={}, allocated_space={}, all_layer_size={}, remote_layer_size={}, required_space={}",
2821 0 : available_space,
2822 0 : allocated_space,
2823 0 : all_layer_size,
2824 0 : remote_layer_size,
2825 0 : all_layer_size + remote_layer_size
2826 0 : )));
2827 324 : }
2828 324 : Ok(())
2829 324 : }
2830 :
2831 : /// Get a watermark for gc-compaction, that is the lowest LSN that we can use as the `gc_horizon` for
2832 : /// the compaction algorithm. It is min(space_cutoff, time_cutoff, latest_gc_cutoff, standby_horizon).
2833 : /// Leases and retain_lsns are considered in the gc-compaction job itself so we don't need to account for them
2834 : /// here.
2835 336 : pub(crate) fn get_gc_compaction_watermark(self: &Arc<Self>) -> Lsn {
2836 336 : let gc_cutoff_lsn = {
2837 336 : let gc_info = self.gc_info.read().unwrap();
2838 336 : gc_info.min_cutoff()
2839 336 : };
2840 336 :
2841 336 : // TODO: standby horizon should use leases so we don't really need to consider it here.
2842 336 : // let watermark = watermark.min(self.standby_horizon.load());
2843 336 :
2844 336 : // TODO: ensure the child branches will not use anything below the watermark, or consider
2845 336 : // them when computing the watermark.
2846 336 : gc_cutoff_lsn.min(*self.get_applied_gc_cutoff_lsn())
2847 336 : }
2848 :
2849 : /// Split a gc-compaction job into multiple compaction jobs. The split is based on the key range and the estimated size of the compaction job.
2850 : /// The function returns a list of compaction jobs that can be executed separately. If the upper bound of the compact LSN
2851 : /// range is not specified, we will use the latest gc_cutoff as the upper bound, so that all jobs in the jobset acts
2852 : /// like a full compaction of the specified keyspace.
2853 0 : pub(crate) async fn gc_compaction_split_jobs(
2854 0 : self: &Arc<Self>,
2855 0 : job: GcCompactJob,
2856 0 : sub_compaction_max_job_size_mb: Option<u64>,
2857 0 : ) -> Result<Vec<GcCompactJob>, CompactionError> {
2858 0 : let compact_below_lsn = if job.compact_lsn_range.end != Lsn::MAX {
2859 0 : job.compact_lsn_range.end
2860 : } else {
2861 0 : self.get_gc_compaction_watermark()
2862 : };
2863 :
2864 0 : if compact_below_lsn == Lsn::INVALID {
2865 0 : tracing::warn!(
2866 0 : "no layers to compact with gc: gc_cutoff not generated yet, skipping gc bottom-most compaction"
2867 : );
2868 0 : return Ok(vec![]);
2869 0 : }
2870 :
2871 : // Split compaction job to about 4GB each
2872 : const GC_COMPACT_MAX_SIZE_MB: u64 = 4 * 1024;
2873 0 : let sub_compaction_max_job_size_mb =
2874 0 : sub_compaction_max_job_size_mb.unwrap_or(GC_COMPACT_MAX_SIZE_MB);
2875 0 :
2876 0 : let mut compact_jobs = Vec::<GcCompactJob>::new();
2877 0 : // For now, we simply use the key partitioning information; we should do a more fine-grained partitioning
2878 0 : // by estimating the amount of files read for a compaction job. We should also partition on LSN.
2879 0 : let ((dense_ks, sparse_ks), _) = self.partitioning.read().as_ref().clone();
2880 : // Truncate the key range to be within user specified compaction range.
2881 0 : fn truncate_to(
2882 0 : source_start: &Key,
2883 0 : source_end: &Key,
2884 0 : target_start: &Key,
2885 0 : target_end: &Key,
2886 0 : ) -> Option<(Key, Key)> {
2887 0 : let start = source_start.max(target_start);
2888 0 : let end = source_end.min(target_end);
2889 0 : if start < end {
2890 0 : Some((*start, *end))
2891 : } else {
2892 0 : None
2893 : }
2894 0 : }
2895 0 : let mut split_key_ranges = Vec::new();
2896 0 : let ranges = dense_ks
2897 0 : .parts
2898 0 : .iter()
2899 0 : .map(|partition| partition.ranges.iter())
2900 0 : .chain(sparse_ks.parts.iter().map(|x| x.0.ranges.iter()))
2901 0 : .flatten()
2902 0 : .cloned()
2903 0 : .collect_vec();
2904 0 : for range in ranges.iter() {
2905 0 : let Some((start, end)) = truncate_to(
2906 0 : &range.start,
2907 0 : &range.end,
2908 0 : &job.compact_key_range.start,
2909 0 : &job.compact_key_range.end,
2910 0 : ) else {
2911 0 : continue;
2912 : };
2913 0 : split_key_ranges.push((start, end));
2914 : }
2915 0 : split_key_ranges.sort();
2916 0 : let all_layers = {
2917 0 : let guard = self.layers.read().await;
2918 0 : let layer_map = guard.layer_map()?;
2919 0 : layer_map.iter_historic_layers().collect_vec()
2920 0 : };
2921 0 : let mut current_start = None;
2922 0 : let ranges_num = split_key_ranges.len();
2923 0 : for (idx, (start, end)) in split_key_ranges.into_iter().enumerate() {
2924 0 : if current_start.is_none() {
2925 0 : current_start = Some(start);
2926 0 : }
2927 0 : let start = current_start.unwrap();
2928 0 : if start >= end {
2929 : // We have already processed this partition.
2930 0 : continue;
2931 0 : }
2932 0 : let overlapping_layers = {
2933 0 : let mut desc = Vec::new();
2934 0 : for layer in all_layers.iter() {
2935 0 : if overlaps_with(&layer.get_key_range(), &(start..end))
2936 0 : && layer.get_lsn_range().start <= compact_below_lsn
2937 0 : {
2938 0 : desc.push(layer.clone());
2939 0 : }
2940 : }
2941 0 : desc
2942 0 : };
2943 0 : let total_size = overlapping_layers.iter().map(|x| x.file_size).sum::<u64>();
2944 0 : if total_size > sub_compaction_max_job_size_mb * 1024 * 1024 || ranges_num == idx + 1 {
2945 : // Try to extend the compaction range so that we include at least one full layer file.
2946 0 : let extended_end = overlapping_layers
2947 0 : .iter()
2948 0 : .map(|layer| layer.key_range.end)
2949 0 : .min();
2950 : // It is possible that the search range does not contain any layer files when we reach the end of the loop.
2951 : // In this case, we simply use the specified key range end.
2952 0 : let end = if let Some(extended_end) = extended_end {
2953 0 : extended_end.max(end)
2954 : } else {
2955 0 : end
2956 : };
2957 0 : let end = if ranges_num == idx + 1 {
2958 : // extend the compaction range to the end of the key range if it's the last partition
2959 0 : end.max(job.compact_key_range.end)
2960 : } else {
2961 0 : end
2962 : };
2963 0 : if total_size == 0 && !compact_jobs.is_empty() {
2964 0 : info!(
2965 0 : "splitting compaction job: {}..{}, estimated_size={}, extending the previous job",
2966 : start, end, total_size
2967 : );
2968 0 : compact_jobs.last_mut().unwrap().compact_key_range.end = end;
2969 0 : current_start = Some(end);
2970 : } else {
2971 0 : info!(
2972 0 : "splitting compaction job: {}..{}, estimated_size={}",
2973 : start, end, total_size
2974 : );
2975 0 : compact_jobs.push(GcCompactJob {
2976 0 : dry_run: job.dry_run,
2977 0 : compact_key_range: start..end,
2978 0 : compact_lsn_range: job.compact_lsn_range.start..compact_below_lsn,
2979 0 : });
2980 0 : current_start = Some(end);
2981 : }
2982 0 : }
2983 : }
2984 0 : Ok(compact_jobs)
2985 0 : }
2986 :
2987 : /// An experimental compaction building block that combines compaction with garbage collection.
2988 : ///
2989 : /// The current implementation picks all delta + image layers that are below or intersecting with
2990 : /// the GC horizon without considering retain_lsns. Then, it does a full compaction over all these delta
2991 : /// layers and image layers, which generates image layers on the gc horizon, drop deltas below gc horizon,
2992 : /// and create delta layers with all deltas >= gc horizon.
2993 : ///
2994 : /// If `options.compact_range` is provided, it will only compact the keys within the range, aka partial compaction.
2995 : /// Partial compaction will read and process all layers overlapping with the key range, even if it might
2996 : /// contain extra keys. After the gc-compaction phase completes, delta layers that are not fully contained
2997 : /// within the key range will be rewritten to ensure they do not overlap with the delta layers. Providing
2998 : /// Key::MIN..Key..MAX to the function indicates a full compaction, though technically, `Key::MAX` is not
2999 : /// part of the range.
3000 : ///
3001 : /// If `options.compact_lsn_range.end` is provided, the compaction will only compact layers below or intersect with
3002 : /// the LSN. Otherwise, it will use the gc cutoff by default.
3003 336 : pub(crate) async fn compact_with_gc(
3004 336 : self: &Arc<Self>,
3005 336 : cancel: &CancellationToken,
3006 336 : options: CompactOptions,
3007 336 : ctx: &RequestContext,
3008 336 : ) -> Result<CompactionOutcome, CompactionError> {
3009 336 : let sub_compaction = options.sub_compaction;
3010 336 : let job = GcCompactJob::from_compact_options(options.clone());
3011 336 : let yield_for_l0 = options.flags.contains(CompactFlags::YieldForL0);
3012 336 : if sub_compaction {
3013 0 : info!(
3014 0 : "running enhanced gc bottom-most compaction with sub-compaction, splitting compaction jobs"
3015 : );
3016 0 : let jobs = self
3017 0 : .gc_compaction_split_jobs(job, options.sub_compaction_max_job_size_mb)
3018 0 : .await?;
3019 0 : let jobs_len = jobs.len();
3020 0 : for (idx, job) in jobs.into_iter().enumerate() {
3021 0 : info!(
3022 0 : "running enhanced gc bottom-most compaction, sub-compaction {}/{}",
3023 0 : idx + 1,
3024 : jobs_len
3025 : );
3026 0 : self.compact_with_gc_inner(cancel, job, ctx, yield_for_l0)
3027 0 : .await?;
3028 : }
3029 0 : if jobs_len == 0 {
3030 0 : info!("no jobs to run, skipping gc bottom-most compaction");
3031 0 : }
3032 0 : return Ok(CompactionOutcome::Done);
3033 336 : }
3034 336 : self.compact_with_gc_inner(cancel, job, ctx, yield_for_l0)
3035 336 : .await
3036 336 : }
3037 :
3038 336 : async fn compact_with_gc_inner(
3039 336 : self: &Arc<Self>,
3040 336 : cancel: &CancellationToken,
3041 336 : job: GcCompactJob,
3042 336 : ctx: &RequestContext,
3043 336 : yield_for_l0: bool,
3044 336 : ) -> Result<CompactionOutcome, CompactionError> {
3045 336 : // Block other compaction/GC tasks from running for now. GC-compaction could run along
3046 336 : // with legacy compaction tasks in the future. Always ensure the lock order is compaction -> gc.
3047 336 : // Note that we already acquired the compaction lock when the outer `compact` function gets called.
3048 336 :
3049 336 : let timer = Instant::now();
3050 336 : let begin_timer = timer;
3051 336 :
3052 336 : let gc_lock = async {
3053 336 : tokio::select! {
3054 336 : guard = self.gc_lock.lock() => Ok(guard),
3055 336 : _ = cancel.cancelled() => Err(CompactionError::ShuttingDown),
3056 : }
3057 336 : };
3058 :
3059 336 : let time_acquire_lock = timer.elapsed();
3060 336 : let timer = Instant::now();
3061 :
3062 336 : let gc_lock = crate::timed(
3063 336 : gc_lock,
3064 336 : "acquires gc lock",
3065 336 : std::time::Duration::from_secs(5),
3066 336 : )
3067 336 : .await?;
3068 :
3069 336 : let dry_run = job.dry_run;
3070 336 : let compact_key_range = job.compact_key_range;
3071 336 : let compact_lsn_range = job.compact_lsn_range;
3072 :
3073 336 : let debug_mode = cfg!(debug_assertions) || cfg!(feature = "testing");
3074 :
3075 336 : info!(
3076 0 : "running enhanced gc bottom-most compaction, dry_run={dry_run}, compact_key_range={}..{}, compact_lsn_range={}..{}",
3077 : compact_key_range.start,
3078 : compact_key_range.end,
3079 : compact_lsn_range.start,
3080 : compact_lsn_range.end
3081 : );
3082 :
3083 336 : scopeguard::defer! {
3084 336 : info!("done enhanced gc bottom-most compaction");
3085 336 : };
3086 336 :
3087 336 : let mut stat = CompactionStatistics::default();
3088 :
3089 : // Step 0: pick all delta layers + image layers below/intersect with the GC horizon.
3090 : // The layer selection has the following properties:
3091 : // 1. If a layer is in the selection, all layers below it are in the selection.
3092 : // 2. Inferred from (1), for each key in the layer selection, the value can be reconstructed only with the layers in the layer selection.
3093 324 : let job_desc = {
3094 336 : let guard = self.layers.read().await;
3095 336 : let layers = guard.layer_map()?;
3096 336 : let gc_info = self.gc_info.read().unwrap();
3097 336 : let mut retain_lsns_below_horizon = Vec::new();
3098 336 : let gc_cutoff = {
3099 : // Currently, gc-compaction only kicks in after the legacy gc has updated the gc_cutoff.
3100 : // Therefore, it can only clean up data that cannot be cleaned up with legacy gc, instead of
3101 : // cleaning everything that theoritically it could. In the future, it should use `self.gc_info`
3102 : // to get the truth data.
3103 336 : let real_gc_cutoff = self.get_gc_compaction_watermark();
3104 : // The compaction algorithm will keep all keys above the gc_cutoff while keeping only necessary keys below the gc_cutoff for
3105 : // each of the retain_lsn. Therefore, if the user-provided `compact_lsn_range.end` is larger than the real gc cutoff, we will use
3106 : // the real cutoff.
3107 336 : let mut gc_cutoff = if compact_lsn_range.end == Lsn::MAX {
3108 300 : if real_gc_cutoff == Lsn::INVALID {
3109 : // If the gc_cutoff is not generated yet, we should not compact anything.
3110 0 : tracing::warn!(
3111 0 : "no layers to compact with gc: gc_cutoff not generated yet, skipping gc bottom-most compaction"
3112 : );
3113 0 : return Ok(CompactionOutcome::Skipped);
3114 300 : }
3115 300 : real_gc_cutoff
3116 : } else {
3117 36 : compact_lsn_range.end
3118 : };
3119 336 : if gc_cutoff > real_gc_cutoff {
3120 24 : warn!(
3121 0 : "provided compact_lsn_range.end={} is larger than the real_gc_cutoff={}, using the real gc cutoff",
3122 : gc_cutoff, real_gc_cutoff
3123 : );
3124 24 : gc_cutoff = real_gc_cutoff;
3125 312 : }
3126 336 : gc_cutoff
3127 : };
3128 420 : for (lsn, _timeline_id, _is_offloaded) in &gc_info.retain_lsns {
3129 420 : if lsn < &gc_cutoff {
3130 420 : retain_lsns_below_horizon.push(*lsn);
3131 420 : }
3132 : }
3133 336 : for lsn in gc_info.leases.keys() {
3134 0 : if lsn < &gc_cutoff {
3135 0 : retain_lsns_below_horizon.push(*lsn);
3136 0 : }
3137 : }
3138 336 : let mut selected_layers: Vec<Layer> = Vec::new();
3139 336 : drop(gc_info);
3140 : // Firstly, pick all the layers intersect or below the gc_cutoff, get the largest LSN in the selected layers.
3141 336 : let Some(max_layer_lsn) = layers
3142 336 : .iter_historic_layers()
3143 1500 : .filter(|desc| desc.get_lsn_range().start <= gc_cutoff)
3144 1284 : .map(|desc| desc.get_lsn_range().end)
3145 336 : .max()
3146 : else {
3147 0 : info!(
3148 0 : "no layers to compact with gc: no historic layers below gc_cutoff, gc_cutoff={}",
3149 : gc_cutoff
3150 : );
3151 0 : return Ok(CompactionOutcome::Done);
3152 : };
3153 : // Next, if the user specifies compact_lsn_range.start, we need to filter some layers out. All the layers (strictly) below
3154 : // the min_layer_lsn computed as below will be filtered out and the data will be accessed using the normal read path, as if
3155 : // it is a branch.
3156 336 : let Some(min_layer_lsn) = layers
3157 336 : .iter_historic_layers()
3158 1500 : .filter(|desc| {
3159 1500 : if compact_lsn_range.start == Lsn::INVALID {
3160 1224 : true // select all layers below if start == Lsn(0)
3161 : } else {
3162 276 : desc.get_lsn_range().end > compact_lsn_range.start // strictly larger than compact_above_lsn
3163 : }
3164 1500 : })
3165 1392 : .map(|desc| desc.get_lsn_range().start)
3166 336 : .min()
3167 : else {
3168 0 : info!(
3169 0 : "no layers to compact with gc: no historic layers above compact_above_lsn, compact_above_lsn={}",
3170 : compact_lsn_range.end
3171 : );
3172 0 : return Ok(CompactionOutcome::Done);
3173 : };
3174 : // Then, pick all the layers that are below the max_layer_lsn. This is to ensure we can pick all single-key
3175 : // layers to compact.
3176 336 : let mut rewrite_layers = Vec::new();
3177 1500 : for desc in layers.iter_historic_layers() {
3178 1500 : if desc.get_lsn_range().end <= max_layer_lsn
3179 1284 : && desc.get_lsn_range().start >= min_layer_lsn
3180 1176 : && overlaps_with(&desc.get_key_range(), &compact_key_range)
3181 : {
3182 : // If the layer overlaps with the compaction key range, we need to read it to obtain all keys within the range,
3183 : // even if it might contain extra keys
3184 948 : selected_layers.push(guard.get_from_desc(&desc));
3185 948 : // If the layer is not fully contained within the key range, we need to rewrite it if it's a delta layer (it's fine
3186 948 : // to overlap image layers)
3187 948 : if desc.is_delta() && !fully_contains(&compact_key_range, &desc.get_key_range())
3188 12 : {
3189 12 : rewrite_layers.push(desc);
3190 936 : }
3191 552 : }
3192 : }
3193 336 : if selected_layers.is_empty() {
3194 12 : info!(
3195 0 : "no layers to compact with gc: no layers within the key range, gc_cutoff={}, key_range={}..{}",
3196 : gc_cutoff, compact_key_range.start, compact_key_range.end
3197 : );
3198 12 : return Ok(CompactionOutcome::Done);
3199 324 : }
3200 324 : retain_lsns_below_horizon.sort();
3201 324 : GcCompactionJobDescription {
3202 324 : selected_layers,
3203 324 : gc_cutoff,
3204 324 : retain_lsns_below_horizon,
3205 324 : min_layer_lsn,
3206 324 : max_layer_lsn,
3207 324 : compaction_key_range: compact_key_range,
3208 324 : rewrite_layers,
3209 324 : }
3210 : };
3211 324 : let (has_data_below, lowest_retain_lsn) = if compact_lsn_range.start != Lsn::INVALID {
3212 : // If we only compact above some LSN, we should get the history from the current branch below the specified LSN.
3213 : // We use job_desc.min_layer_lsn as if it's the lowest branch point.
3214 48 : (true, job_desc.min_layer_lsn)
3215 276 : } else if self.ancestor_timeline.is_some() {
3216 : // In theory, we can also use min_layer_lsn here, but using ancestor LSN makes sure the delta layers cover the
3217 : // LSN ranges all the way to the ancestor timeline.
3218 12 : (true, self.ancestor_lsn)
3219 : } else {
3220 264 : let res = job_desc
3221 264 : .retain_lsns_below_horizon
3222 264 : .first()
3223 264 : .copied()
3224 264 : .unwrap_or(job_desc.gc_cutoff);
3225 264 : if debug_mode {
3226 264 : assert_eq!(
3227 264 : res,
3228 264 : job_desc
3229 264 : .retain_lsns_below_horizon
3230 264 : .iter()
3231 264 : .min()
3232 264 : .copied()
3233 264 : .unwrap_or(job_desc.gc_cutoff)
3234 264 : );
3235 0 : }
3236 264 : (false, res)
3237 : };
3238 :
3239 324 : let verification = self.get_gc_compaction_settings().gc_compaction_verification;
3240 324 :
3241 324 : info!(
3242 0 : "picked {} layers for compaction ({} layers need rewriting) with max_layer_lsn={} min_layer_lsn={} gc_cutoff={} lowest_retain_lsn={}, key_range={}..{}, has_data_below={}",
3243 0 : job_desc.selected_layers.len(),
3244 0 : job_desc.rewrite_layers.len(),
3245 : job_desc.max_layer_lsn,
3246 : job_desc.min_layer_lsn,
3247 : job_desc.gc_cutoff,
3248 : lowest_retain_lsn,
3249 : job_desc.compaction_key_range.start,
3250 : job_desc.compaction_key_range.end,
3251 : has_data_below,
3252 : );
3253 :
3254 324 : let time_analyze = timer.elapsed();
3255 324 : let timer = Instant::now();
3256 :
3257 1272 : for layer in &job_desc.selected_layers {
3258 948 : debug!("read layer: {}", layer.layer_desc().key());
3259 : }
3260 336 : for layer in &job_desc.rewrite_layers {
3261 12 : debug!("rewrite layer: {}", layer.key());
3262 : }
3263 :
3264 324 : self.check_compaction_space(&job_desc.selected_layers)
3265 324 : .await?;
3266 :
3267 : // Generate statistics for the compaction
3268 1272 : for layer in &job_desc.selected_layers {
3269 948 : let desc = layer.layer_desc();
3270 948 : if desc.is_delta() {
3271 528 : stat.visit_delta_layer(desc.file_size());
3272 528 : } else {
3273 420 : stat.visit_image_layer(desc.file_size());
3274 420 : }
3275 : }
3276 :
3277 : // Step 1: construct a k-merge iterator over all layers.
3278 : // Also, verify if the layer map can be split by drawing a horizontal line at every LSN start/end split point.
3279 324 : let layer_names = job_desc
3280 324 : .selected_layers
3281 324 : .iter()
3282 948 : .map(|layer| layer.layer_desc().layer_name())
3283 324 : .collect_vec();
3284 324 : if let Some(err) = check_valid_layermap(&layer_names) {
3285 0 : return Err(CompactionError::Other(anyhow!(
3286 0 : "gc-compaction layer map check failed because {}, cannot proceed with compaction due to potential data loss",
3287 0 : err
3288 0 : )));
3289 324 : }
3290 324 : // The maximum LSN we are processing in this compaction loop
3291 324 : let end_lsn = job_desc
3292 324 : .selected_layers
3293 324 : .iter()
3294 948 : .map(|l| l.layer_desc().lsn_range.end)
3295 324 : .max()
3296 324 : .unwrap();
3297 324 : let mut delta_layers = Vec::new();
3298 324 : let mut image_layers = Vec::new();
3299 324 : let mut downloaded_layers = Vec::new();
3300 324 : let mut total_downloaded_size = 0;
3301 324 : let mut total_layer_size = 0;
3302 1272 : for layer in &job_desc.selected_layers {
3303 948 : if layer
3304 948 : .needs_download()
3305 948 : .await
3306 948 : .context("failed to check if layer needs download")
3307 948 : .map_err(CompactionError::Other)?
3308 948 : .is_some()
3309 0 : {
3310 0 : total_downloaded_size += layer.layer_desc().file_size;
3311 948 : }
3312 948 : total_layer_size += layer.layer_desc().file_size;
3313 948 : if cancel.is_cancelled() {
3314 0 : return Err(CompactionError::ShuttingDown);
3315 948 : }
3316 948 : let should_yield = yield_for_l0
3317 0 : && self
3318 0 : .l0_compaction_trigger
3319 0 : .notified()
3320 0 : .now_or_never()
3321 0 : .is_some();
3322 948 : if should_yield {
3323 0 : tracing::info!("preempt gc-compaction when downloading layers: too many L0 layers");
3324 0 : return Ok(CompactionOutcome::YieldForL0);
3325 948 : }
3326 948 : let resident_layer = layer
3327 948 : .download_and_keep_resident(ctx)
3328 948 : .await
3329 948 : .context("failed to download and keep resident layer")
3330 948 : .map_err(CompactionError::Other)?;
3331 948 : downloaded_layers.push(resident_layer);
3332 : }
3333 324 : info!(
3334 0 : "finish downloading layers, downloaded={}, total={}, ratio={:.2}",
3335 0 : total_downloaded_size,
3336 0 : total_layer_size,
3337 0 : total_downloaded_size as f64 / total_layer_size as f64
3338 : );
3339 1272 : for resident_layer in &downloaded_layers {
3340 948 : if resident_layer.layer_desc().is_delta() {
3341 528 : let layer = resident_layer
3342 528 : .get_as_delta(ctx)
3343 528 : .await
3344 528 : .context("failed to get delta layer")
3345 528 : .map_err(CompactionError::Other)?;
3346 528 : delta_layers.push(layer);
3347 : } else {
3348 420 : let layer = resident_layer
3349 420 : .get_as_image(ctx)
3350 420 : .await
3351 420 : .context("failed to get image layer")
3352 420 : .map_err(CompactionError::Other)?;
3353 420 : image_layers.push(layer);
3354 : }
3355 : }
3356 324 : let (dense_ks, sparse_ks) = self
3357 324 : .collect_gc_compaction_keyspace()
3358 324 : .await
3359 324 : .context("failed to collect gc compaction keyspace")
3360 324 : .map_err(CompactionError::Other)?;
3361 324 : let mut merge_iter = FilterIterator::create(
3362 324 : MergeIterator::create(&delta_layers, &image_layers, ctx),
3363 324 : dense_ks,
3364 324 : sparse_ks,
3365 324 : )
3366 324 : .context("failed to create filter iterator")
3367 324 : .map_err(CompactionError::Other)?;
3368 :
3369 324 : let time_download_layer = timer.elapsed();
3370 324 : let mut timer = Instant::now();
3371 324 :
3372 324 : // Step 2: Produce images+deltas.
3373 324 : let mut accumulated_values = Vec::new();
3374 324 : let mut last_key: Option<Key> = None;
3375 :
3376 : // Only create image layers when there is no ancestor branches. TODO: create covering image layer
3377 : // when some condition meet.
3378 324 : let mut image_layer_writer = if !has_data_below {
3379 : Some(
3380 264 : SplitImageLayerWriter::new(
3381 264 : self.conf,
3382 264 : self.timeline_id,
3383 264 : self.tenant_shard_id,
3384 264 : job_desc.compaction_key_range.start,
3385 264 : lowest_retain_lsn,
3386 264 : self.get_compaction_target_size(),
3387 264 : &self.gate,
3388 264 : self.cancel.clone(),
3389 264 : ctx,
3390 264 : )
3391 264 : .await
3392 264 : .context("failed to create image layer writer")
3393 264 : .map_err(CompactionError::Other)?,
3394 : )
3395 : } else {
3396 60 : None
3397 : };
3398 :
3399 324 : let mut delta_layer_writer = SplitDeltaLayerWriter::new(
3400 324 : self.conf,
3401 324 : self.timeline_id,
3402 324 : self.tenant_shard_id,
3403 324 : lowest_retain_lsn..end_lsn,
3404 324 : self.get_compaction_target_size(),
3405 324 : &self.gate,
3406 324 : self.cancel.clone(),
3407 324 : )
3408 324 : .await
3409 324 : .context("failed to create delta layer writer")
3410 324 : .map_err(CompactionError::Other)?;
3411 :
3412 : #[derive(Default)]
3413 : struct RewritingLayers {
3414 : before: Option<DeltaLayerWriter>,
3415 : after: Option<DeltaLayerWriter>,
3416 : }
3417 324 : let mut delta_layer_rewriters = HashMap::<Arc<PersistentLayerKey>, RewritingLayers>::new();
3418 :
3419 : /// When compacting not at a bottom range (=`[0,X)`) of the root branch, we "have data below" (`has_data_below=true`).
3420 : /// The two cases are compaction in ancestor branches and when `compact_lsn_range.start` is set.
3421 : /// In those cases, we need to pull up data from below the LSN range we're compaction.
3422 : ///
3423 : /// This function unifies the cases so that later code doesn't have to think about it.
3424 : ///
3425 : /// Currently, we always get the ancestor image for each key in the child branch no matter whether the image
3426 : /// is needed for reconstruction. This should be fixed in the future.
3427 : ///
3428 : /// Furthermore, we should do vectored get instead of a single get, or better, use k-merge for ancestor
3429 : /// images.
3430 3840 : async fn get_ancestor_image(
3431 3840 : this_tline: &Arc<Timeline>,
3432 3840 : key: Key,
3433 3840 : ctx: &RequestContext,
3434 3840 : has_data_below: bool,
3435 3840 : history_lsn_point: Lsn,
3436 3840 : ) -> anyhow::Result<Option<(Key, Lsn, Bytes)>> {
3437 3840 : if !has_data_below {
3438 3612 : return Ok(None);
3439 228 : };
3440 : // This function is implemented as a get of the current timeline at ancestor LSN, therefore reusing
3441 : // as much existing code as possible.
3442 228 : let img = this_tline.get(key, history_lsn_point, ctx).await?;
3443 228 : Ok(Some((key, history_lsn_point, img)))
3444 3840 : }
3445 :
3446 : // Actually, we can decide not to write to the image layer at all at this point because
3447 : // the key and LSN range are determined. However, to keep things simple here, we still
3448 : // create this writer, and discard the writer in the end.
3449 324 : let mut time_to_first_kv_pair = None;
3450 :
3451 5952 : while let Some(((key, lsn, val), desc)) = merge_iter
3452 5952 : .next_with_trace()
3453 5952 : .await
3454 5952 : .context("failed to get next key-value pair")
3455 5952 : .map_err(CompactionError::Other)?
3456 : {
3457 5640 : if time_to_first_kv_pair.is_none() {
3458 324 : time_to_first_kv_pair = Some(timer.elapsed());
3459 324 : timer = Instant::now();
3460 5316 : }
3461 :
3462 5640 : if cancel.is_cancelled() {
3463 0 : return Err(CompactionError::ShuttingDown);
3464 5640 : }
3465 :
3466 5640 : let should_yield = yield_for_l0
3467 0 : && self
3468 0 : .l0_compaction_trigger
3469 0 : .notified()
3470 0 : .now_or_never()
3471 0 : .is_some();
3472 5640 : if should_yield {
3473 0 : tracing::info!("preempt gc-compaction in the main loop: too many L0 layers");
3474 0 : return Ok(CompactionOutcome::YieldForL0);
3475 5640 : }
3476 5640 : if self.shard_identity.is_key_disposable(&key) {
3477 : // If this shard does not need to store this key, simply skip it.
3478 : //
3479 : // This is not handled in the filter iterator because shard is determined by hash.
3480 : // Therefore, it does not give us any performance benefit to do things like skip
3481 : // a whole layer file as handling key spaces (ranges).
3482 0 : if cfg!(debug_assertions) {
3483 0 : let shard = self.shard_identity.shard_index();
3484 0 : let owner = self.shard_identity.get_shard_number(&key);
3485 0 : panic!("key {key} does not belong on shard {shard}, owned by {owner}");
3486 0 : }
3487 0 : continue;
3488 5640 : }
3489 5640 : if !job_desc.compaction_key_range.contains(&key) {
3490 384 : if !desc.is_delta {
3491 360 : continue;
3492 24 : }
3493 24 : let rewriter = delta_layer_rewriters.entry(desc.clone()).or_default();
3494 24 : let rewriter = if key < job_desc.compaction_key_range.start {
3495 0 : if rewriter.before.is_none() {
3496 0 : rewriter.before = Some(
3497 0 : DeltaLayerWriter::new(
3498 0 : self.conf,
3499 0 : self.timeline_id,
3500 0 : self.tenant_shard_id,
3501 0 : desc.key_range.start,
3502 0 : desc.lsn_range.clone(),
3503 0 : &self.gate,
3504 0 : self.cancel.clone(),
3505 0 : ctx,
3506 0 : )
3507 0 : .await
3508 0 : .context("failed to create delta layer writer")
3509 0 : .map_err(CompactionError::Other)?,
3510 : );
3511 0 : }
3512 0 : rewriter.before.as_mut().unwrap()
3513 24 : } else if key >= job_desc.compaction_key_range.end {
3514 24 : if rewriter.after.is_none() {
3515 12 : rewriter.after = Some(
3516 12 : DeltaLayerWriter::new(
3517 12 : self.conf,
3518 12 : self.timeline_id,
3519 12 : self.tenant_shard_id,
3520 12 : job_desc.compaction_key_range.end,
3521 12 : desc.lsn_range.clone(),
3522 12 : &self.gate,
3523 12 : self.cancel.clone(),
3524 12 : ctx,
3525 12 : )
3526 12 : .await
3527 12 : .context("failed to create delta layer writer")
3528 12 : .map_err(CompactionError::Other)?,
3529 : );
3530 12 : }
3531 24 : rewriter.after.as_mut().unwrap()
3532 : } else {
3533 0 : unreachable!()
3534 : };
3535 24 : rewriter
3536 24 : .put_value(key, lsn, val, ctx)
3537 24 : .await
3538 24 : .context("failed to put value")
3539 24 : .map_err(CompactionError::Other)?;
3540 24 : continue;
3541 5256 : }
3542 5256 : match val {
3543 3780 : Value::Image(_) => stat.visit_image_key(&val),
3544 1476 : Value::WalRecord(_) => stat.visit_wal_key(&val),
3545 : }
3546 5256 : if last_key.is_none() || last_key.as_ref() == Some(&key) {
3547 1728 : if last_key.is_none() {
3548 324 : last_key = Some(key);
3549 1404 : }
3550 1728 : accumulated_values.push((key, lsn, val));
3551 : } else {
3552 3528 : let last_key: &mut Key = last_key.as_mut().unwrap();
3553 3528 : stat.on_unique_key_visited(); // TODO: adjust statistics for partial compaction
3554 3528 : let retention = self
3555 3528 : .generate_key_retention(
3556 3528 : *last_key,
3557 3528 : &accumulated_values,
3558 3528 : job_desc.gc_cutoff,
3559 3528 : &job_desc.retain_lsns_below_horizon,
3560 3528 : COMPACTION_DELTA_THRESHOLD,
3561 3528 : get_ancestor_image(self, *last_key, ctx, has_data_below, lowest_retain_lsn)
3562 3528 : .await
3563 3528 : .context("failed to get ancestor image")
3564 3528 : .map_err(CompactionError::Other)?,
3565 3528 : verification,
3566 3528 : )
3567 3528 : .await
3568 3528 : .context("failed to generate key retention")
3569 3528 : .map_err(CompactionError::Other)?;
3570 3516 : retention
3571 3516 : .pipe_to(
3572 3516 : *last_key,
3573 3516 : &mut delta_layer_writer,
3574 3516 : image_layer_writer.as_mut(),
3575 3516 : &mut stat,
3576 3516 : ctx,
3577 3516 : )
3578 3516 : .await
3579 3516 : .context("failed to pipe to delta layer writer")
3580 3516 : .map_err(CompactionError::Other)?;
3581 3516 : accumulated_values.clear();
3582 3516 : *last_key = key;
3583 3516 : accumulated_values.push((key, lsn, val));
3584 : }
3585 : }
3586 :
3587 : // TODO: move the below part to the loop body
3588 312 : let Some(last_key) = last_key else {
3589 0 : return Err(CompactionError::Other(anyhow!(
3590 0 : "no keys produced during compaction"
3591 0 : )));
3592 : };
3593 312 : stat.on_unique_key_visited();
3594 :
3595 312 : let retention = self
3596 312 : .generate_key_retention(
3597 312 : last_key,
3598 312 : &accumulated_values,
3599 312 : job_desc.gc_cutoff,
3600 312 : &job_desc.retain_lsns_below_horizon,
3601 312 : COMPACTION_DELTA_THRESHOLD,
3602 312 : get_ancestor_image(self, last_key, ctx, has_data_below, lowest_retain_lsn)
3603 312 : .await
3604 312 : .context("failed to get ancestor image")
3605 312 : .map_err(CompactionError::Other)?,
3606 312 : verification,
3607 312 : )
3608 312 : .await
3609 312 : .context("failed to generate key retention")
3610 312 : .map_err(CompactionError::Other)?;
3611 312 : retention
3612 312 : .pipe_to(
3613 312 : last_key,
3614 312 : &mut delta_layer_writer,
3615 312 : image_layer_writer.as_mut(),
3616 312 : &mut stat,
3617 312 : ctx,
3618 312 : )
3619 312 : .await
3620 312 : .context("failed to pipe to delta layer writer")
3621 312 : .map_err(CompactionError::Other)?;
3622 : // end: move the above part to the loop body
3623 :
3624 312 : let time_main_loop = timer.elapsed();
3625 312 : let timer = Instant::now();
3626 312 :
3627 312 : let mut rewrote_delta_layers = Vec::new();
3628 324 : for (key, writers) in delta_layer_rewriters {
3629 12 : if let Some(delta_writer_before) = writers.before {
3630 0 : let (desc, path) = delta_writer_before
3631 0 : .finish(job_desc.compaction_key_range.start, ctx)
3632 0 : .await
3633 0 : .context("failed to finish delta layer writer")
3634 0 : .map_err(CompactionError::Other)?;
3635 0 : let layer = Layer::finish_creating(self.conf, self, desc, &path)
3636 0 : .context("failed to finish creating delta layer")
3637 0 : .map_err(CompactionError::Other)?;
3638 0 : rewrote_delta_layers.push(layer);
3639 12 : }
3640 12 : if let Some(delta_writer_after) = writers.after {
3641 12 : let (desc, path) = delta_writer_after
3642 12 : .finish(key.key_range.end, ctx)
3643 12 : .await
3644 12 : .context("failed to finish delta layer writer")
3645 12 : .map_err(CompactionError::Other)?;
3646 12 : let layer = Layer::finish_creating(self.conf, self, desc, &path)
3647 12 : .context("failed to finish creating delta layer")
3648 12 : .map_err(CompactionError::Other)?;
3649 12 : rewrote_delta_layers.push(layer);
3650 0 : }
3651 : }
3652 :
3653 444 : let discard = |key: &PersistentLayerKey| {
3654 444 : let key = key.clone();
3655 444 : async move { KeyHistoryRetention::discard_key(&key, self, dry_run).await }
3656 444 : };
3657 :
3658 312 : let produced_image_layers = if let Some(writer) = image_layer_writer {
3659 252 : if !dry_run {
3660 228 : let end_key = job_desc.compaction_key_range.end;
3661 228 : writer
3662 228 : .finish_with_discard_fn(self, ctx, end_key, discard)
3663 228 : .await
3664 228 : .context("failed to finish image layer writer")
3665 228 : .map_err(CompactionError::Other)?
3666 : } else {
3667 24 : drop(writer);
3668 24 : Vec::new()
3669 : }
3670 : } else {
3671 60 : Vec::new()
3672 : };
3673 :
3674 312 : let produced_delta_layers = if !dry_run {
3675 288 : delta_layer_writer
3676 288 : .finish_with_discard_fn(self, ctx, discard)
3677 288 : .await
3678 288 : .context("failed to finish delta layer writer")
3679 288 : .map_err(CompactionError::Other)?
3680 : } else {
3681 24 : drop(delta_layer_writer);
3682 24 : Vec::new()
3683 : };
3684 :
3685 : // TODO: make image/delta/rewrote_delta layers generation atomic. At this point, we already generated resident layers, and if
3686 : // compaction is cancelled at this point, we might have some layers that are not cleaned up.
3687 312 : let mut compact_to = Vec::new();
3688 312 : let mut keep_layers = HashSet::new();
3689 312 : let produced_delta_layers_len = produced_delta_layers.len();
3690 312 : let produced_image_layers_len = produced_image_layers.len();
3691 312 :
3692 312 : let layer_selection_by_key = job_desc
3693 312 : .selected_layers
3694 312 : .iter()
3695 912 : .map(|l| (l.layer_desc().key(), l.layer_desc().clone()))
3696 312 : .collect::<HashMap<_, _>>();
3697 :
3698 528 : for action in produced_delta_layers {
3699 216 : match action {
3700 132 : BatchWriterResult::Produced(layer) => {
3701 132 : if cfg!(debug_assertions) {
3702 132 : info!("produced delta layer: {}", layer.layer_desc().key());
3703 0 : }
3704 132 : stat.produce_delta_layer(layer.layer_desc().file_size());
3705 132 : compact_to.push(layer);
3706 : }
3707 84 : BatchWriterResult::Discarded(l) => {
3708 84 : if cfg!(debug_assertions) {
3709 84 : info!("discarded delta layer: {}", l);
3710 0 : }
3711 84 : if let Some(layer_desc) = layer_selection_by_key.get(&l) {
3712 84 : stat.discard_delta_layer(layer_desc.file_size());
3713 84 : } else {
3714 0 : tracing::warn!(
3715 0 : "discarded delta layer not in layer_selection: {}, produced a layer outside of the compaction key range?",
3716 : l
3717 : );
3718 0 : stat.discard_delta_layer(0);
3719 : }
3720 84 : keep_layers.insert(l);
3721 : }
3722 : }
3723 : }
3724 324 : for layer in &rewrote_delta_layers {
3725 12 : debug!(
3726 0 : "produced rewritten delta layer: {}",
3727 0 : layer.layer_desc().key()
3728 : );
3729 : // For now, we include rewritten delta layer size in the "produce_delta_layer". We could
3730 : // make it a separate statistics in the future.
3731 12 : stat.produce_delta_layer(layer.layer_desc().file_size());
3732 : }
3733 312 : compact_to.extend(rewrote_delta_layers);
3734 540 : for action in produced_image_layers {
3735 228 : match action {
3736 180 : BatchWriterResult::Produced(layer) => {
3737 180 : debug!("produced image layer: {}", layer.layer_desc().key());
3738 180 : stat.produce_image_layer(layer.layer_desc().file_size());
3739 180 : compact_to.push(layer);
3740 : }
3741 48 : BatchWriterResult::Discarded(l) => {
3742 48 : debug!("discarded image layer: {}", l);
3743 48 : if let Some(layer_desc) = layer_selection_by_key.get(&l) {
3744 48 : stat.discard_image_layer(layer_desc.file_size());
3745 48 : } else {
3746 0 : tracing::warn!(
3747 0 : "discarded image layer not in layer_selection: {}, produced a layer outside of the compaction key range?",
3748 : l
3749 : );
3750 0 : stat.discard_image_layer(0);
3751 : }
3752 48 : keep_layers.insert(l);
3753 : }
3754 : }
3755 : }
3756 :
3757 312 : let mut layer_selection = job_desc.selected_layers;
3758 :
3759 : // Partial compaction might select more data than it processes, e.g., if
3760 : // the compaction_key_range only partially overlaps:
3761 : //
3762 : // [---compaction_key_range---]
3763 : // [---A----][----B----][----C----][----D----]
3764 : //
3765 : // For delta layers, we will rewrite the layers so that it is cut exactly at
3766 : // the compaction key range, so we can always discard them. However, for image
3767 : // layers, as we do not rewrite them for now, we need to handle them differently.
3768 : // Assume image layers A, B, C, D are all in the `layer_selection`.
3769 : //
3770 : // The created image layers contain whatever is needed from B, C, and from
3771 : // `----]` of A, and from `[---` of D.
3772 : //
3773 : // In contrast, `[---A` and `D----]` have not been processed, so, we must
3774 : // keep that data.
3775 : //
3776 : // The solution for now is to keep A and D completely if they are image layers.
3777 : // (layer_selection is what we'll remove from the layer map, so, retain what
3778 : // is _not_ fully covered by compaction_key_range).
3779 1224 : for layer in &layer_selection {
3780 912 : if !layer.layer_desc().is_delta() {
3781 396 : if !overlaps_with(
3782 396 : &layer.layer_desc().key_range,
3783 396 : &job_desc.compaction_key_range,
3784 396 : ) {
3785 0 : return Err(CompactionError::Other(anyhow!(
3786 0 : "violated constraint: image layer outside of compaction key range"
3787 0 : )));
3788 396 : }
3789 396 : if !fully_contains(
3790 396 : &job_desc.compaction_key_range,
3791 396 : &layer.layer_desc().key_range,
3792 396 : ) {
3793 48 : keep_layers.insert(layer.layer_desc().key());
3794 348 : }
3795 516 : }
3796 : }
3797 :
3798 912 : layer_selection.retain(|x| !keep_layers.contains(&x.layer_desc().key()));
3799 312 :
3800 312 : let time_final_phase = timer.elapsed();
3801 312 :
3802 312 : stat.time_final_phase_secs = time_final_phase.as_secs_f64();
3803 312 : stat.time_to_first_kv_pair_secs = time_to_first_kv_pair
3804 312 : .unwrap_or(Duration::ZERO)
3805 312 : .as_secs_f64();
3806 312 : stat.time_main_loop_secs = time_main_loop.as_secs_f64();
3807 312 : stat.time_acquire_lock_secs = time_acquire_lock.as_secs_f64();
3808 312 : stat.time_download_layer_secs = time_download_layer.as_secs_f64();
3809 312 : stat.time_analyze_secs = time_analyze.as_secs_f64();
3810 312 : stat.time_total_secs = begin_timer.elapsed().as_secs_f64();
3811 312 : stat.finalize();
3812 312 :
3813 312 : info!(
3814 0 : "gc-compaction statistics: {}",
3815 0 : serde_json::to_string(&stat)
3816 0 : .context("failed to serialize gc-compaction statistics")
3817 0 : .map_err(CompactionError::Other)?
3818 : );
3819 :
3820 312 : if dry_run {
3821 24 : return Ok(CompactionOutcome::Done);
3822 288 : }
3823 288 :
3824 288 : info!(
3825 0 : "produced {} delta layers and {} image layers, {} layers are kept",
3826 0 : produced_delta_layers_len,
3827 0 : produced_image_layers_len,
3828 0 : keep_layers.len()
3829 : );
3830 :
3831 : // Step 3: Place back to the layer map.
3832 :
3833 : // First, do a sanity check to ensure the newly-created layer map does not contain overlaps.
3834 288 : let all_layers = {
3835 288 : let guard = self.layers.read().await;
3836 288 : let layer_map = guard.layer_map()?;
3837 288 : layer_map.iter_historic_layers().collect_vec()
3838 288 : };
3839 288 :
3840 288 : let mut final_layers = all_layers
3841 288 : .iter()
3842 1284 : .map(|layer| layer.layer_name())
3843 288 : .collect::<HashSet<_>>();
3844 912 : for layer in &layer_selection {
3845 624 : final_layers.remove(&layer.layer_desc().layer_name());
3846 624 : }
3847 612 : for layer in &compact_to {
3848 324 : final_layers.insert(layer.layer_desc().layer_name());
3849 324 : }
3850 288 : let final_layers = final_layers.into_iter().collect_vec();
3851 :
3852 : // TODO: move this check before we call `finish` on image layer writers. However, this will require us to get the layer name before we finish
3853 : // the writer, so potentially, we will need a function like `ImageLayerBatchWriter::get_all_pending_layer_keys` to get all the keys that are
3854 : // in the writer before finalizing the persistent layers. Now we would leave some dangling layers on the disk if the check fails.
3855 288 : if let Some(err) = check_valid_layermap(&final_layers) {
3856 0 : return Err(CompactionError::Other(anyhow!(
3857 0 : "gc-compaction layer map check failed after compaction because {}, compaction result not applied to the layer map due to potential data loss",
3858 0 : err
3859 0 : )));
3860 288 : }
3861 :
3862 : // Between the sanity check and this compaction update, there could be new layers being flushed, but it should be fine because we only
3863 : // operate on L1 layers.
3864 : {
3865 : // Gc-compaction will rewrite the history of a key. This could happen in two ways:
3866 : //
3867 : // 1. We create an image layer to replace all the deltas below the compact LSN. In this case, assume
3868 : // we have 2 delta layers A and B, both below the compact LSN. We create an image layer I to replace
3869 : // A and B at the compact LSN. If the read path finishes reading A, yields, and now we update the layer
3870 : // map, the read path then cannot find any keys below A, reporting a missing key error, while the key
3871 : // now gets stored in I at the compact LSN.
3872 : //
3873 : // --------------- ---------------
3874 : // delta1@LSN20 image1@LSN20
3875 : // --------------- (read path collects delta@LSN20, => --------------- (read path cannot find anything
3876 : // delta1@LSN10 yields) below LSN 20)
3877 : // ---------------
3878 : //
3879 : // 2. We create a delta layer to replace all the deltas below the compact LSN, and in the delta layers,
3880 : // we combines the history of a key into a single image. For example, we have deltas at LSN 1, 2, 3, 4,
3881 : // Assume one delta layer contains LSN 1, 2, 3 and the other contains LSN 4.
3882 : //
3883 : // We let gc-compaction combine delta 2, 3, 4 into an image at LSN 4, which produces a delta layer that
3884 : // contains the delta at LSN 1, the image at LSN 4. If the read path finishes reading the original delta
3885 : // layer containing 4, yields, and we update the layer map to put the delta layer.
3886 : //
3887 : // --------------- ---------------
3888 : // delta1@LSN4 image1@LSN4
3889 : // --------------- (read path collects delta@LSN4, => --------------- (read path collects LSN4 and LSN1,
3890 : // delta1@LSN1-3 yields) delta1@LSN1 which is an invalid history)
3891 : // --------------- ---------------
3892 : //
3893 : // Therefore, the gc-compaction layer update operation should wait for all ongoing reads, block all pending reads,
3894 : // and only allow reads to continue after the update is finished.
3895 :
3896 288 : let update_guard = self.gc_compaction_layer_update_lock.write().await;
3897 : // Acquiring the update guard ensures current read operations end and new read operations are blocked.
3898 : // TODO: can we use `latest_gc_cutoff` Rcu to achieve the same effect?
3899 288 : let mut guard = self.layers.write().await;
3900 288 : guard
3901 288 : .open_mut()?
3902 288 : .finish_gc_compaction(&layer_selection, &compact_to, &self.metrics);
3903 288 : drop(update_guard); // Allow new reads to start ONLY after we finished updating the layer map.
3904 288 : };
3905 288 :
3906 288 : // Schedule an index-only upload to update the `latest_gc_cutoff` in the index_part.json.
3907 288 : // Otherwise, after restart, the index_part only contains the old `latest_gc_cutoff` and
3908 288 : // find_gc_cutoffs will try accessing things below the cutoff. TODO: ideally, this should
3909 288 : // be batched into `schedule_compaction_update`.
3910 288 : let disk_consistent_lsn = self.disk_consistent_lsn.load();
3911 288 : self.schedule_uploads(disk_consistent_lsn, None)
3912 288 : .context("failed to schedule uploads")
3913 288 : .map_err(CompactionError::Other)?;
3914 : // If a layer gets rewritten throughout gc-compaction, we need to keep that layer only in `compact_to` instead
3915 : // of `compact_from`.
3916 288 : let compact_from = {
3917 288 : let mut compact_from = Vec::new();
3918 288 : let mut compact_to_set = HashMap::new();
3919 612 : for layer in &compact_to {
3920 324 : compact_to_set.insert(layer.layer_desc().key(), layer);
3921 324 : }
3922 912 : for layer in &layer_selection {
3923 624 : if let Some(to) = compact_to_set.get(&layer.layer_desc().key()) {
3924 0 : tracing::info!(
3925 0 : "skipping delete {} because found same layer key at different generation {}",
3926 : layer,
3927 : to
3928 : );
3929 624 : } else {
3930 624 : compact_from.push(layer.clone());
3931 624 : }
3932 : }
3933 288 : compact_from
3934 288 : };
3935 288 : self.remote_client
3936 288 : .schedule_compaction_update(&compact_from, &compact_to)?;
3937 :
3938 288 : drop(gc_lock);
3939 288 :
3940 288 : Ok(CompactionOutcome::Done)
3941 336 : }
3942 : }
3943 :
3944 : struct TimelineAdaptor {
3945 : timeline: Arc<Timeline>,
3946 :
3947 : keyspace: (Lsn, KeySpace),
3948 :
3949 : new_deltas: Vec<ResidentLayer>,
3950 : new_images: Vec<ResidentLayer>,
3951 : layers_to_delete: Vec<Arc<PersistentLayerDesc>>,
3952 : }
3953 :
3954 : impl TimelineAdaptor {
3955 0 : pub fn new(timeline: &Arc<Timeline>, keyspace: (Lsn, KeySpace)) -> Self {
3956 0 : Self {
3957 0 : timeline: timeline.clone(),
3958 0 : keyspace,
3959 0 : new_images: Vec::new(),
3960 0 : new_deltas: Vec::new(),
3961 0 : layers_to_delete: Vec::new(),
3962 0 : }
3963 0 : }
3964 :
3965 0 : pub async fn flush_updates(&mut self) -> Result<(), CompactionError> {
3966 0 : let layers_to_delete = {
3967 0 : let guard = self.timeline.layers.read().await;
3968 0 : self.layers_to_delete
3969 0 : .iter()
3970 0 : .map(|x| guard.get_from_desc(x))
3971 0 : .collect::<Vec<Layer>>()
3972 0 : };
3973 0 : self.timeline
3974 0 : .finish_compact_batch(&self.new_deltas, &self.new_images, &layers_to_delete)
3975 0 : .await?;
3976 :
3977 0 : self.timeline
3978 0 : .upload_new_image_layers(std::mem::take(&mut self.new_images))?;
3979 :
3980 0 : self.new_deltas.clear();
3981 0 : self.layers_to_delete.clear();
3982 0 : Ok(())
3983 0 : }
3984 : }
3985 :
3986 : #[derive(Clone)]
3987 : struct ResidentDeltaLayer(ResidentLayer);
3988 : #[derive(Clone)]
3989 : struct ResidentImageLayer(ResidentLayer);
3990 :
3991 : impl CompactionJobExecutor for TimelineAdaptor {
3992 : type Key = pageserver_api::key::Key;
3993 :
3994 : type Layer = OwnArc<PersistentLayerDesc>;
3995 : type DeltaLayer = ResidentDeltaLayer;
3996 : type ImageLayer = ResidentImageLayer;
3997 :
3998 : type RequestContext = crate::context::RequestContext;
3999 :
4000 0 : fn get_shard_identity(&self) -> &ShardIdentity {
4001 0 : self.timeline.get_shard_identity()
4002 0 : }
4003 :
4004 0 : async fn get_layers(
4005 0 : &mut self,
4006 0 : key_range: &Range<Key>,
4007 0 : lsn_range: &Range<Lsn>,
4008 0 : _ctx: &RequestContext,
4009 0 : ) -> anyhow::Result<Vec<OwnArc<PersistentLayerDesc>>> {
4010 0 : self.flush_updates().await?;
4011 :
4012 0 : let guard = self.timeline.layers.read().await;
4013 0 : let layer_map = guard.layer_map()?;
4014 :
4015 0 : let result = layer_map
4016 0 : .iter_historic_layers()
4017 0 : .filter(|l| {
4018 0 : overlaps_with(&l.lsn_range, lsn_range) && overlaps_with(&l.key_range, key_range)
4019 0 : })
4020 0 : .map(OwnArc)
4021 0 : .collect();
4022 0 : Ok(result)
4023 0 : }
4024 :
4025 0 : async fn get_keyspace(
4026 0 : &mut self,
4027 0 : key_range: &Range<Key>,
4028 0 : lsn: Lsn,
4029 0 : _ctx: &RequestContext,
4030 0 : ) -> anyhow::Result<Vec<Range<Key>>> {
4031 0 : if lsn == self.keyspace.0 {
4032 0 : Ok(pageserver_compaction::helpers::intersect_keyspace(
4033 0 : &self.keyspace.1.ranges,
4034 0 : key_range,
4035 0 : ))
4036 : } else {
4037 : // The current compaction implementation only ever requests the key space
4038 : // at the compaction end LSN.
4039 0 : anyhow::bail!("keyspace not available for requested lsn");
4040 : }
4041 0 : }
4042 :
4043 0 : async fn downcast_delta_layer(
4044 0 : &self,
4045 0 : layer: &OwnArc<PersistentLayerDesc>,
4046 0 : ctx: &RequestContext,
4047 0 : ) -> anyhow::Result<Option<ResidentDeltaLayer>> {
4048 0 : // this is a lot more complex than a simple downcast...
4049 0 : if layer.is_delta() {
4050 0 : let l = {
4051 0 : let guard = self.timeline.layers.read().await;
4052 0 : guard.get_from_desc(layer)
4053 : };
4054 0 : let result = l.download_and_keep_resident(ctx).await?;
4055 :
4056 0 : Ok(Some(ResidentDeltaLayer(result)))
4057 : } else {
4058 0 : Ok(None)
4059 : }
4060 0 : }
4061 :
4062 0 : async fn create_image(
4063 0 : &mut self,
4064 0 : lsn: Lsn,
4065 0 : key_range: &Range<Key>,
4066 0 : ctx: &RequestContext,
4067 0 : ) -> anyhow::Result<()> {
4068 0 : Ok(self.create_image_impl(lsn, key_range, ctx).await?)
4069 0 : }
4070 :
4071 0 : async fn create_delta(
4072 0 : &mut self,
4073 0 : lsn_range: &Range<Lsn>,
4074 0 : key_range: &Range<Key>,
4075 0 : input_layers: &[ResidentDeltaLayer],
4076 0 : ctx: &RequestContext,
4077 0 : ) -> anyhow::Result<()> {
4078 0 : debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end);
4079 :
4080 0 : let mut all_entries = Vec::new();
4081 0 : for dl in input_layers.iter() {
4082 0 : all_entries.extend(dl.load_keys(ctx).await?);
4083 : }
4084 :
4085 : // The current stdlib sorting implementation is designed in a way where it is
4086 : // particularly fast where the slice is made up of sorted sub-ranges.
4087 0 : all_entries.sort_by_key(|DeltaEntry { key, lsn, .. }| (*key, *lsn));
4088 :
4089 0 : let mut writer = DeltaLayerWriter::new(
4090 0 : self.timeline.conf,
4091 0 : self.timeline.timeline_id,
4092 0 : self.timeline.tenant_shard_id,
4093 0 : key_range.start,
4094 0 : lsn_range.clone(),
4095 0 : &self.timeline.gate,
4096 0 : self.timeline.cancel.clone(),
4097 0 : ctx,
4098 0 : )
4099 0 : .await?;
4100 :
4101 0 : let mut dup_values = 0;
4102 0 :
4103 0 : // This iterator walks through all key-value pairs from all the layers
4104 0 : // we're compacting, in key, LSN order.
4105 0 : let mut prev: Option<(Key, Lsn)> = None;
4106 : for &DeltaEntry {
4107 0 : key, lsn, ref val, ..
4108 0 : } in all_entries.iter()
4109 : {
4110 0 : if prev == Some((key, lsn)) {
4111 : // This is a duplicate. Skip it.
4112 : //
4113 : // It can happen if compaction is interrupted after writing some
4114 : // layers but not all, and we are compacting the range again.
4115 : // The calculations in the algorithm assume that there are no
4116 : // duplicates, so the math on targeted file size is likely off,
4117 : // and we will create smaller files than expected.
4118 0 : dup_values += 1;
4119 0 : continue;
4120 0 : }
4121 :
4122 0 : let value = val.load(ctx).await?;
4123 :
4124 0 : writer.put_value(key, lsn, value, ctx).await?;
4125 :
4126 0 : prev = Some((key, lsn));
4127 : }
4128 :
4129 0 : if dup_values > 0 {
4130 0 : warn!("delta layer created with {} duplicate values", dup_values);
4131 0 : }
4132 :
4133 0 : fail_point!("delta-layer-writer-fail-before-finish", |_| {
4134 0 : Err(anyhow::anyhow!(
4135 0 : "failpoint delta-layer-writer-fail-before-finish"
4136 0 : ))
4137 0 : });
4138 :
4139 0 : let (desc, path) = writer.finish(prev.unwrap().0.next(), ctx).await?;
4140 0 : let new_delta_layer =
4141 0 : Layer::finish_creating(self.timeline.conf, &self.timeline, desc, &path)?;
4142 :
4143 0 : self.new_deltas.push(new_delta_layer);
4144 0 : Ok(())
4145 0 : }
4146 :
4147 0 : async fn delete_layer(
4148 0 : &mut self,
4149 0 : layer: &OwnArc<PersistentLayerDesc>,
4150 0 : _ctx: &RequestContext,
4151 0 : ) -> anyhow::Result<()> {
4152 0 : self.layers_to_delete.push(layer.clone().0);
4153 0 : Ok(())
4154 0 : }
4155 : }
4156 :
4157 : impl TimelineAdaptor {
4158 0 : async fn create_image_impl(
4159 0 : &mut self,
4160 0 : lsn: Lsn,
4161 0 : key_range: &Range<Key>,
4162 0 : ctx: &RequestContext,
4163 0 : ) -> Result<(), CreateImageLayersError> {
4164 0 : let timer = self.timeline.metrics.create_images_time_histo.start_timer();
4165 :
4166 0 : let image_layer_writer = ImageLayerWriter::new(
4167 0 : self.timeline.conf,
4168 0 : self.timeline.timeline_id,
4169 0 : self.timeline.tenant_shard_id,
4170 0 : key_range,
4171 0 : lsn,
4172 0 : &self.timeline.gate,
4173 0 : self.timeline.cancel.clone(),
4174 0 : ctx,
4175 0 : )
4176 0 : .await?;
4177 :
4178 0 : fail_point!("image-layer-writer-fail-before-finish", |_| {
4179 0 : Err(CreateImageLayersError::Other(anyhow::anyhow!(
4180 0 : "failpoint image-layer-writer-fail-before-finish"
4181 0 : )))
4182 0 : });
4183 :
4184 0 : let keyspace = KeySpace {
4185 0 : ranges: self.get_keyspace(key_range, lsn, ctx).await?,
4186 : };
4187 : // TODO set proper (stateful) start. The create_image_layer_for_rel_blocks function mostly
4188 0 : let outcome = self
4189 0 : .timeline
4190 0 : .create_image_layer_for_rel_blocks(
4191 0 : &keyspace,
4192 0 : image_layer_writer,
4193 0 : lsn,
4194 0 : ctx,
4195 0 : key_range.clone(),
4196 0 : IoConcurrency::sequential(),
4197 0 : )
4198 0 : .await?;
4199 :
4200 : if let ImageLayerCreationOutcome::Generated {
4201 0 : unfinished_image_layer,
4202 0 : } = outcome
4203 : {
4204 0 : let (desc, path) = unfinished_image_layer.finish(ctx).await?;
4205 0 : let image_layer =
4206 0 : Layer::finish_creating(self.timeline.conf, &self.timeline, desc, &path)?;
4207 0 : self.new_images.push(image_layer);
4208 0 : }
4209 :
4210 0 : timer.stop_and_record();
4211 0 :
4212 0 : Ok(())
4213 0 : }
4214 : }
4215 :
4216 : impl CompactionRequestContext for crate::context::RequestContext {}
4217 :
4218 : #[derive(Debug, Clone)]
4219 : pub struct OwnArc<T>(pub Arc<T>);
4220 :
4221 : impl<T> Deref for OwnArc<T> {
4222 : type Target = <Arc<T> as Deref>::Target;
4223 0 : fn deref(&self) -> &Self::Target {
4224 0 : &self.0
4225 0 : }
4226 : }
4227 :
4228 : impl<T> AsRef<T> for OwnArc<T> {
4229 0 : fn as_ref(&self) -> &T {
4230 0 : self.0.as_ref()
4231 0 : }
4232 : }
4233 :
4234 : impl CompactionLayer<Key> for OwnArc<PersistentLayerDesc> {
4235 0 : fn key_range(&self) -> &Range<Key> {
4236 0 : &self.key_range
4237 0 : }
4238 0 : fn lsn_range(&self) -> &Range<Lsn> {
4239 0 : &self.lsn_range
4240 0 : }
4241 0 : fn file_size(&self) -> u64 {
4242 0 : self.file_size
4243 0 : }
4244 0 : fn short_id(&self) -> std::string::String {
4245 0 : self.as_ref().short_id().to_string()
4246 0 : }
4247 0 : fn is_delta(&self) -> bool {
4248 0 : self.as_ref().is_delta()
4249 0 : }
4250 : }
4251 :
4252 : impl CompactionLayer<Key> for OwnArc<DeltaLayer> {
4253 0 : fn key_range(&self) -> &Range<Key> {
4254 0 : &self.layer_desc().key_range
4255 0 : }
4256 0 : fn lsn_range(&self) -> &Range<Lsn> {
4257 0 : &self.layer_desc().lsn_range
4258 0 : }
4259 0 : fn file_size(&self) -> u64 {
4260 0 : self.layer_desc().file_size
4261 0 : }
4262 0 : fn short_id(&self) -> std::string::String {
4263 0 : self.layer_desc().short_id().to_string()
4264 0 : }
4265 0 : fn is_delta(&self) -> bool {
4266 0 : true
4267 0 : }
4268 : }
4269 :
4270 : impl CompactionLayer<Key> for ResidentDeltaLayer {
4271 0 : fn key_range(&self) -> &Range<Key> {
4272 0 : &self.0.layer_desc().key_range
4273 0 : }
4274 0 : fn lsn_range(&self) -> &Range<Lsn> {
4275 0 : &self.0.layer_desc().lsn_range
4276 0 : }
4277 0 : fn file_size(&self) -> u64 {
4278 0 : self.0.layer_desc().file_size
4279 0 : }
4280 0 : fn short_id(&self) -> std::string::String {
4281 0 : self.0.layer_desc().short_id().to_string()
4282 0 : }
4283 0 : fn is_delta(&self) -> bool {
4284 0 : true
4285 0 : }
4286 : }
4287 :
4288 : impl CompactionDeltaLayer<TimelineAdaptor> for ResidentDeltaLayer {
4289 : type DeltaEntry<'a> = DeltaEntry<'a>;
4290 :
4291 0 : async fn load_keys(&self, ctx: &RequestContext) -> anyhow::Result<Vec<DeltaEntry<'_>>> {
4292 0 : self.0.get_as_delta(ctx).await?.index_entries(ctx).await
4293 0 : }
4294 : }
4295 :
4296 : impl CompactionLayer<Key> for ResidentImageLayer {
4297 0 : fn key_range(&self) -> &Range<Key> {
4298 0 : &self.0.layer_desc().key_range
4299 0 : }
4300 0 : fn lsn_range(&self) -> &Range<Lsn> {
4301 0 : &self.0.layer_desc().lsn_range
4302 0 : }
4303 0 : fn file_size(&self) -> u64 {
4304 0 : self.0.layer_desc().file_size
4305 0 : }
4306 0 : fn short_id(&self) -> std::string::String {
4307 0 : self.0.layer_desc().short_id().to_string()
4308 0 : }
4309 0 : fn is_delta(&self) -> bool {
4310 0 : false
4311 0 : }
4312 : }
4313 : impl CompactionImageLayer<TimelineAdaptor> for ResidentImageLayer {}
|