Line data Source code
1 : //! New compaction implementation. The algorithm itself is implemented in the
2 : //! compaction crate. This file implements the callbacks and structs that allow
3 : //! the algorithm to drive the process.
4 : //!
5 : //! The old legacy algorithm is implemented directly in `timeline.rs`.
6 :
7 : use std::collections::{BinaryHeap, HashMap, HashSet, VecDeque};
8 : use std::ops::{Deref, Range};
9 : use std::sync::Arc;
10 : use std::time::{Duration, Instant};
11 :
12 : use super::layer_manager::LayerManager;
13 : use super::{
14 : CompactFlags, CompactOptions, CompactionError, CreateImageLayersError, DurationRecorder,
15 : GetVectoredError, ImageLayerCreationMode, LastImageLayerCreationStatus, RecordedDuration,
16 : Timeline,
17 : };
18 :
19 : use crate::tenant::timeline::DeltaEntry;
20 : use crate::walredo::RedoAttemptType;
21 : use anyhow::{Context, anyhow};
22 : use bytes::Bytes;
23 : use enumset::EnumSet;
24 : use fail::fail_point;
25 : use futures::FutureExt;
26 : use itertools::Itertools;
27 : use once_cell::sync::Lazy;
28 : use pageserver_api::config::tenant_conf_defaults::DEFAULT_CHECKPOINT_DISTANCE;
29 : use pageserver_api::key::{KEY_SIZE, Key};
30 : use pageserver_api::keyspace::{KeySpace, ShardedRange};
31 : use pageserver_api::models::{CompactInfoResponse, CompactKeyRange};
32 : use pageserver_api::record::NeonWalRecord;
33 : use pageserver_api::shard::{ShardCount, ShardIdentity, TenantShardId};
34 : use pageserver_api::value::Value;
35 : use pageserver_compaction::helpers::{fully_contains, overlaps_with};
36 : use pageserver_compaction::interface::*;
37 : use serde::Serialize;
38 : use tokio::sync::{OwnedSemaphorePermit, Semaphore};
39 : use tokio_util::sync::CancellationToken;
40 : use tracing::{Instrument, debug, error, info, info_span, trace, warn};
41 : use utils::critical;
42 : use utils::id::TimelineId;
43 : use utils::lsn::Lsn;
44 :
45 : use crate::context::{AccessStatsBehavior, RequestContext, RequestContextBuilder};
46 : use crate::page_cache;
47 : use crate::statvfs::Statvfs;
48 : use crate::tenant::checks::check_valid_layermap;
49 : use crate::tenant::gc_block::GcBlock;
50 : use crate::tenant::layer_map::LayerMap;
51 : use crate::tenant::remote_timeline_client::WaitCompletionError;
52 : use crate::tenant::remote_timeline_client::index::GcCompactionState;
53 : use crate::tenant::storage_layer::batch_split_writer::{
54 : BatchWriterResult, SplitDeltaLayerWriter, SplitImageLayerWriter,
55 : };
56 : use crate::tenant::storage_layer::filter_iterator::FilterIterator;
57 : use crate::tenant::storage_layer::merge_iterator::MergeIterator;
58 : use crate::tenant::storage_layer::{
59 : AsLayerDesc, LayerVisibilityHint, PersistentLayerDesc, PersistentLayerKey,
60 : ValueReconstructState,
61 : };
62 : use crate::tenant::tasks::log_compaction_error;
63 : use crate::tenant::timeline::{
64 : DeltaLayerWriter, ImageLayerCreationOutcome, ImageLayerWriter, IoConcurrency, Layer,
65 : ResidentLayer, drop_rlock,
66 : };
67 : use crate::tenant::{DeltaLayer, MaybeOffloaded};
68 : use crate::virtual_file::{MaybeFatalIo, VirtualFile};
69 :
70 : /// Maximum number of deltas before generating an image layer in bottom-most compaction.
71 : const COMPACTION_DELTA_THRESHOLD: usize = 5;
72 :
73 : /// Ratio of shard-local pages below which we trigger shard ancestor layer rewrites. 0.3 means that
74 : /// <= 30% of layer pages must belong to the descendant shard to rewrite the layer.
75 : ///
76 : /// We choose a value < 0.5 to avoid rewriting all visible layers every time we do a power-of-two
77 : /// shard split, which gets expensive for large tenants.
78 : const ANCESTOR_COMPACTION_REWRITE_THRESHOLD: f64 = 0.3;
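
// A minimal sketch of how this threshold is applied (editor's illustration,
// not part of the production logic; `local` and `raw` stand for the per-layer
// page counts computed in `compact_shard_ancestors` below, and the ratio check
// is only one of several conditions evaluated there):
//
//     fn worth_rewriting(local: u32, raw: u32) -> bool {
//         local != u32::MAX
//             && (local as f64 / raw as f64) <= ANCESTOR_COMPACTION_REWRITE_THRESHOLD
//     }
//
//     assert!(worth_rewriting(2_000, 10_000));  // 0.2 <= 0.3: rewrite the layer
//     assert!(!worth_rewriting(6_000, 10_000)); // 0.6 >  0.3: let it age out of the PITR window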
79 :
80 : #[derive(Default, Debug, Clone, Copy, Hash, PartialEq, Eq, Serialize)]
81 : pub struct GcCompactionJobId(pub usize);
82 :
83 : impl std::fmt::Display for GcCompactionJobId {
84 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
85 0 : write!(f, "{}", self.0)
86 0 : }
87 : }
88 :
89 : pub struct GcCompactionCombinedSettings {
90 : pub gc_compaction_enabled: bool,
91 : pub gc_compaction_verification: bool,
92 : pub gc_compaction_initial_threshold_kb: u64,
93 : pub gc_compaction_ratio_percent: u64,
94 : }
95 :
96 : #[derive(Debug, Clone)]
97 : pub enum GcCompactionQueueItem {
98 : MetaJob {
99 : /// Compaction options
100 : options: CompactOptions,
101 : /// Whether the compaction is triggered automatically (determines whether we need to update L2 LSN)
102 : auto: bool,
103 : },
104 : SubCompactionJob(CompactOptions),
105 : Notify(GcCompactionJobId, Option<Lsn>),
106 : }
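
// How these variants relate at runtime (editor's sketch, assuming a `MetaJob`
// with `sub_compaction` enabled that splits into two jobs): when the meta job
// is popped, `handle_sub_compaction` expands it and pushes the pieces back to
// the front of the queue, preserving order:
//
//     [SubCompactionJob(job_1), SubCompactionJob(job_2), Notify(id, Some(expected_l2_lsn))]
//
// The trailing `Notify` carries the expected L2 LSN only for auto-triggered
// meta jobs; manually scheduled ones end with `Notify(id, None)`.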
107 :
108 : /// Statistics for a gc-compaction meta job, which contains several sub-compaction jobs.
109 : #[derive(Debug, Clone, Serialize, Default)]
110 : pub struct GcCompactionMetaStatistics {
111 : /// The total number of sub compaction jobs.
112 : pub total_sub_compaction_jobs: usize,
113 : /// The total number of sub compaction jobs that failed.
114 : pub failed_sub_compaction_jobs: usize,
115 : /// The total number of sub compaction jobs that succeeded.
116 : pub succeeded_sub_compaction_jobs: usize,
117 : /// The layer size before compaction.
118 : pub before_compaction_layer_size: u64,
119 : /// The layer size after compaction.
120 : pub after_compaction_layer_size: u64,
121 : /// The start time of the meta job.
122 : pub start_time: Option<chrono::DateTime<chrono::Utc>>,
123 : /// The end time of the meta job.
124 : pub end_time: Option<chrono::DateTime<chrono::Utc>>,
125 : /// The duration of the meta job.
126 : pub duration_secs: f64,
127 : /// The id of the meta job.
128 : pub meta_job_id: GcCompactionJobId,
129 : /// The LSN below which the layers are compacted, used to compute the statistics.
130 : pub below_lsn: Lsn,
131 : /// The retention ratio of the meta job (after_compaction_layer_size / before_compaction_layer_size)
132 : pub retention_ratio: f64,
133 : }
134 :
135 : impl GcCompactionMetaStatistics {
136 0 : fn finalize(&mut self) {
137 0 : let end_time = chrono::Utc::now();
138 0 : if let Some(start_time) = self.start_time {
139 0 : if end_time > start_time {
140 0 : let delta = end_time - start_time;
141 0 : if let Ok(std_dur) = delta.to_std() {
142 0 : self.duration_secs = std_dur.as_secs_f64();
143 0 : }
144 0 : }
145 0 : }
146 0 : self.retention_ratio = self.after_compaction_layer_size as f64
147 0 : / (self.before_compaction_layer_size as f64 + 1.0);
148 0 : self.end_time = Some(end_time);
149 0 : }
150 : }
151 :
152 : impl GcCompactionQueueItem {
153 0 : pub fn into_compact_info_resp(
154 0 : self,
155 0 : id: GcCompactionJobId,
156 0 : running: bool,
157 0 : ) -> Option<CompactInfoResponse> {
158 0 : match self {
159 0 : GcCompactionQueueItem::MetaJob { options, .. } => Some(CompactInfoResponse {
160 0 : compact_key_range: options.compact_key_range,
161 0 : compact_lsn_range: options.compact_lsn_range,
162 0 : sub_compaction: options.sub_compaction,
163 0 : running,
164 0 : job_id: id.0,
165 0 : }),
166 0 : GcCompactionQueueItem::SubCompactionJob(options) => Some(CompactInfoResponse {
167 0 : compact_key_range: options.compact_key_range,
168 0 : compact_lsn_range: options.compact_lsn_range,
169 0 : sub_compaction: options.sub_compaction,
170 0 : running,
171 0 : job_id: id.0,
172 0 : }),
173 0 : GcCompactionQueueItem::Notify(_, _) => None,
174 : }
175 0 : }
176 : }
177 :
178 : #[derive(Default)]
179 : struct GcCompactionGuardItems {
180 : notify: Option<tokio::sync::oneshot::Sender<()>>,
181 : permit: Option<OwnedSemaphorePermit>,
182 : }
183 :
184 : struct GcCompactionQueueInner {
185 : running: Option<(GcCompactionJobId, GcCompactionQueueItem)>,
186 : queued: VecDeque<(GcCompactionJobId, GcCompactionQueueItem)>,
187 : guards: HashMap<GcCompactionJobId, GcCompactionGuardItems>,
188 : last_id: GcCompactionJobId,
189 : meta_statistics: Option<GcCompactionMetaStatistics>,
190 : }
191 :
192 : impl GcCompactionQueueInner {
193 0 : fn next_id(&mut self) -> GcCompactionJobId {
194 0 : let id = self.last_id;
195 0 : self.last_id = GcCompactionJobId(id.0 + 1);
196 0 : id
197 0 : }
198 : }
199 :
200 : /// A structure to store gc_compaction jobs.
201 : pub struct GcCompactionQueue {
202 : /// All items in the queue, and the currently-running job.
203 : inner: std::sync::Mutex<GcCompactionQueueInner>,
204 : /// Ensure only one thread is consuming the queue.
205 : consumer_lock: tokio::sync::Mutex<()>,
206 : }
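
// A typical way to drive the queue (editor's sketch; error handling and task
// setup are omitted, and `options`, `cancel`, `ctx`, `gc_block` and `timeline`
// are assumed to be provided by the caller):
//
//     let queue = GcCompactionQueue::new();
//     let (tx, rx) = tokio::sync::oneshot::channel();
//     let _job_id = queue.schedule_manual_compaction(options, Some(tx));
//     while queue.iteration(&cancel, &ctx, &gc_block, &timeline).await? == CompactionOutcome::Pending {}
//     rx.await.ok(); // resolves once `notify_and_unblock` runs for this job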
207 :
208 0 : static CONCURRENT_GC_COMPACTION_TASKS: Lazy<Arc<Semaphore>> = Lazy::new(|| {
209 0 : // Only allow two timelines on one pageserver to run gc compaction at a time.
210 0 : Arc::new(Semaphore::new(2))
211 0 : });
212 :
213 : impl GcCompactionQueue {
214 0 : pub fn new() -> Self {
215 0 : GcCompactionQueue {
216 0 : inner: std::sync::Mutex::new(GcCompactionQueueInner {
217 0 : running: None,
218 0 : queued: VecDeque::new(),
219 0 : guards: HashMap::new(),
220 0 : last_id: GcCompactionJobId(0),
221 0 : meta_statistics: None,
222 0 : }),
223 0 : consumer_lock: tokio::sync::Mutex::new(()),
224 0 : }
225 0 : }
226 :
227 0 : pub fn cancel_scheduled(&self) {
228 0 : let mut guard = self.inner.lock().unwrap();
229 0 : guard.queued.clear();
230 0 : // TODO: if there is a running job, we should keep the gc guard. However, currently, the cancel
231 0 : // API is only used for testing purposes, so we can drop everything here.
232 0 : guard.guards.clear();
233 0 : }
234 :
235 : /// Schedule a manual compaction job.
236 0 : pub fn schedule_manual_compaction(
237 0 : &self,
238 0 : options: CompactOptions,
239 0 : notify: Option<tokio::sync::oneshot::Sender<()>>,
240 0 : ) -> GcCompactionJobId {
241 0 : let mut guard = self.inner.lock().unwrap();
242 0 : let id = guard.next_id();
243 0 : guard.queued.push_back((
244 0 : id,
245 0 : GcCompactionQueueItem::MetaJob {
246 0 : options,
247 0 : auto: false,
248 0 : },
249 0 : ));
250 0 : guard.guards.entry(id).or_default().notify = notify;
251 0 : info!("scheduled compaction job id={}", id);
252 0 : id
253 0 : }
254 :
255 : /// Schedule an auto compaction job.
256 0 : fn schedule_auto_compaction(
257 0 : &self,
258 0 : options: CompactOptions,
259 0 : permit: OwnedSemaphorePermit,
260 0 : ) -> GcCompactionJobId {
261 0 : let mut guard = self.inner.lock().unwrap();
262 0 : let id = guard.next_id();
263 0 : guard.queued.push_back((
264 0 : id,
265 0 : GcCompactionQueueItem::MetaJob {
266 0 : options,
267 0 : auto: true,
268 0 : },
269 0 : ));
270 0 : guard.guards.entry(id).or_default().permit = Some(permit);
271 0 : id
272 0 : }
273 :
274 : /// Trigger an auto compaction.
275 0 : pub async fn trigger_auto_compaction(
276 0 : &self,
277 0 : timeline: &Arc<Timeline>,
278 0 : ) -> Result<(), CompactionError> {
279 0 : let GcCompactionCombinedSettings {
280 0 : gc_compaction_enabled,
281 0 : gc_compaction_initial_threshold_kb,
282 0 : gc_compaction_ratio_percent,
283 0 : ..
284 0 : } = timeline.get_gc_compaction_settings();
285 0 : if !gc_compaction_enabled {
286 0 : return Ok(());
287 0 : }
288 0 : if self.remaining_jobs_num() > 0 {
289 : // Only schedule auto compaction when the queue is empty
290 0 : return Ok(());
291 0 : }
292 0 : if timeline.ancestor_timeline().is_some() {
293 : // Do not trigger auto compaction for child timelines. We haven't tested
294 : // it enough in staging yet.
295 0 : return Ok(());
296 0 : }
297 0 : if timeline.get_gc_compaction_watermark() == Lsn::INVALID {
298 : // If the gc watermark is not set, we don't need to trigger auto compaction.
299 : // This check is the same as in `gc_compaction_split_jobs` but we don't log
300 : // here and we can also skip the computation of the trigger condition earlier.
301 0 : return Ok(());
302 0 : }
303 :
304 0 : let Ok(permit) = CONCURRENT_GC_COMPACTION_TASKS.clone().try_acquire_owned() else {
305 : // Only allow one compaction run at a time. TODO: As we do `try_acquire_owned`, we cannot ensure
306 : // the fairness of the lock across timelines. We should listen for both `acquire` and `l0_compaction_trigger`
307 : // to ensure fairness while avoiding starvation of other tasks.
308 0 : return Ok(());
309 : };
310 :
311 0 : let gc_compaction_state = timeline.get_gc_compaction_state();
312 0 : let l2_lsn = gc_compaction_state
313 0 : .map(|x| x.last_completed_lsn)
314 0 : .unwrap_or(Lsn::INVALID);
315 :
316 0 : let layers = {
317 0 : let guard = timeline.layers.read().await;
318 0 : let layer_map = guard.layer_map()?;
319 0 : layer_map.iter_historic_layers().collect_vec()
320 0 : };
321 0 : let mut l2_size: u64 = 0;
322 0 : let mut l1_size = 0;
323 0 : let gc_cutoff = *timeline.get_applied_gc_cutoff_lsn();
324 0 : for layer in layers {
325 0 : if layer.lsn_range.start <= l2_lsn {
326 0 : l2_size += layer.file_size();
327 0 : } else if layer.lsn_range.start <= gc_cutoff {
328 0 : l1_size += layer.file_size();
329 0 : }
330 : }
331 :
332 0 : fn trigger_compaction(
333 0 : l1_size: u64,
334 0 : l2_size: u64,
335 0 : gc_compaction_initial_threshold_kb: u64,
336 0 : gc_compaction_ratio_percent: u64,
337 0 : ) -> bool {
338 : const AUTO_TRIGGER_LIMIT: u64 = 150 * 1024 * 1024 * 1024; // 150GB
339 0 : if l1_size + l2_size >= AUTO_TRIGGER_LIMIT {
340 : // Do not auto-trigger when physical size >= 150GB
341 0 : return false;
342 0 : }
343 0 : // initial trigger
344 0 : if l2_size == 0 && l1_size >= gc_compaction_initial_threshold_kb * 1024 {
345 0 : info!(
346 0 : "trigger auto-compaction because l1_size={} >= gc_compaction_initial_threshold_kb={}",
347 : l1_size, gc_compaction_initial_threshold_kb
348 : );
349 0 : return true;
350 0 : }
351 0 : // size ratio trigger
352 0 : if l2_size == 0 {
353 0 : return false;
354 0 : }
355 0 : if l1_size as f64 / l2_size as f64 >= (gc_compaction_ratio_percent as f64 / 100.0) {
356 0 : info!(
357 0 : "trigger auto-compaction because l1_size={} / l2_size={} > gc_compaction_ratio_percent={}",
358 : l1_size, l2_size, gc_compaction_ratio_percent
359 : );
360 0 : return true;
361 0 : }
362 0 : false
363 0 : }
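
// Worked example of the two triggers above (editor's illustration; the sizes
// are arbitrary). With gc_compaction_initial_threshold_kb = 1_048_576 (1GiB)
// and gc_compaction_ratio_percent = 100:
//
//     // initial trigger: no L2 yet and L1 has grown past 1GiB
//     assert!(trigger_compaction(2 << 30, 0, 1_048_576, 100));
//     // ratio trigger: L1 is at least as large as L2 (ratio 1.0 >= 100%)
//     assert!(trigger_compaction(10 << 30, 10 << 30, 1_048_576, 100));
//     // below both thresholds: do not trigger
//     assert!(!trigger_compaction(512 << 20, 0, 1_048_576, 100));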
364 :
365 0 : if trigger_compaction(
366 0 : l1_size,
367 0 : l2_size,
368 0 : gc_compaction_initial_threshold_kb,
369 0 : gc_compaction_ratio_percent,
370 0 : ) {
371 0 : self.schedule_auto_compaction(
372 0 : CompactOptions {
373 0 : flags: {
374 0 : let mut flags = EnumSet::new();
375 0 : flags |= CompactFlags::EnhancedGcBottomMostCompaction;
376 0 : if timeline.get_compaction_l0_first() {
377 0 : flags |= CompactFlags::YieldForL0;
378 0 : }
379 0 : flags
380 0 : },
381 0 : sub_compaction: true,
382 0 : // Only auto-trigger gc-compaction over the data keyspace due to concerns in
383 0 : // https://github.com/neondatabase/neon/issues/11318.
384 0 : compact_key_range: Some(CompactKeyRange {
385 0 : start: Key::MIN,
386 0 : end: Key::metadata_key_range().start,
387 0 : }),
388 0 : compact_lsn_range: None,
389 0 : sub_compaction_max_job_size_mb: None,
390 0 : },
391 0 : permit,
392 0 : );
393 0 : info!(
394 0 : "scheduled auto gc-compaction: l1_size={}, l2_size={}, l2_lsn={}, gc_cutoff={}",
395 : l1_size, l2_size, l2_lsn, gc_cutoff
396 : );
397 : } else {
398 0 : debug!(
399 0 : "did not trigger auto gc-compaction: l1_size={}, l2_size={}, l2_lsn={}, gc_cutoff={}",
400 : l1_size, l2_size, l2_lsn, gc_cutoff
401 : );
402 : }
403 0 : Ok(())
404 0 : }
405 :
406 0 : async fn collect_layer_below_lsn(
407 0 : &self,
408 0 : timeline: &Arc<Timeline>,
409 0 : lsn: Lsn,
410 0 : ) -> Result<u64, CompactionError> {
411 0 : let guard = timeline.layers.read().await;
412 0 : let layer_map = guard.layer_map()?;
413 0 : let layers = layer_map.iter_historic_layers().collect_vec();
414 0 : let mut size = 0;
415 0 : for layer in layers {
416 0 : if layer.lsn_range.start <= lsn {
417 0 : size += layer.file_size();
418 0 : }
419 : }
420 0 : Ok(size)
421 0 : }
422 :
423 : /// Notify the caller the job has finished and unblock GC.
424 0 : fn notify_and_unblock(&self, id: GcCompactionJobId) {
425 0 : info!("compaction job id={} finished", id);
426 0 : let mut guard = self.inner.lock().unwrap();
427 0 : if let Some(items) = guard.guards.remove(&id) {
428 0 : if let Some(tx) = items.notify {
429 0 : let _ = tx.send(());
430 0 : }
431 0 : }
432 0 : if let Some(ref meta_statistics) = guard.meta_statistics {
433 0 : if meta_statistics.meta_job_id == id {
434 0 : if let Ok(stats) = serde_json::to_string(&meta_statistics) {
435 0 : info!(
436 0 : "gc-compaction meta statistics for job id = {}: {}",
437 : id, stats
438 : );
439 0 : }
440 0 : }
441 0 : }
442 0 : }
443 :
444 0 : fn clear_running_job(&self) {
445 0 : let mut guard = self.inner.lock().unwrap();
446 0 : guard.running = None;
447 0 : }
448 :
449 0 : async fn handle_sub_compaction(
450 0 : &self,
451 0 : id: GcCompactionJobId,
452 0 : options: CompactOptions,
453 0 : timeline: &Arc<Timeline>,
454 0 : auto: bool,
455 0 : ) -> Result<(), CompactionError> {
456 0 : info!(
457 0 : "running scheduled enhanced gc bottom-most compaction with sub-compaction, splitting compaction jobs"
458 : );
459 0 : let res = timeline
460 0 : .gc_compaction_split_jobs(
461 0 : GcCompactJob::from_compact_options(options.clone()),
462 0 : options.sub_compaction_max_job_size_mb,
463 0 : )
464 0 : .await;
465 0 : let jobs = match res {
466 0 : Ok(jobs) => jobs,
467 0 : Err(err) => {
468 0 : warn!("cannot split gc-compaction jobs: {}, unblocked gc", err);
469 0 : self.notify_and_unblock(id);
470 0 : return Err(err);
471 : }
472 : };
473 0 : if jobs.is_empty() {
474 0 : info!("no jobs to run, skipping scheduled compaction task");
475 0 : self.notify_and_unblock(id);
476 : } else {
477 0 : let jobs_len = jobs.len();
478 0 : let mut pending_tasks = Vec::new();
479 0 : // gc-compaction might pick more layers or fewer layers to compact. The L2 LSN does not need to be accurate.
480 0 : // Therefore, we simply assume the maximum LSN of all jobs is the expected L2 LSN.
481 0 : let expected_l2_lsn = jobs
482 0 : .iter()
483 0 : .map(|job| job.compact_lsn_range.end)
484 0 : .max()
485 0 : .unwrap();
486 0 : for job in jobs {
487 : // Unfortunately we need to convert the `GcCompactJob` back to `CompactionOptions`
488 : // until we do further refactors to allow directly calling `compact_with_gc`.
489 0 : let mut flags: EnumSet<CompactFlags> = EnumSet::default();
490 0 : flags |= CompactFlags::EnhancedGcBottomMostCompaction;
491 0 : if job.dry_run {
492 0 : flags |= CompactFlags::DryRun;
493 0 : }
494 0 : if options.flags.contains(CompactFlags::YieldForL0) {
495 0 : flags |= CompactFlags::YieldForL0;
496 0 : }
497 0 : let options = CompactOptions {
498 0 : flags,
499 0 : sub_compaction: false,
500 0 : compact_key_range: Some(job.compact_key_range.into()),
501 0 : compact_lsn_range: Some(job.compact_lsn_range.into()),
502 0 : sub_compaction_max_job_size_mb: None,
503 0 : };
504 0 : pending_tasks.push(GcCompactionQueueItem::SubCompactionJob(options));
505 : }
506 :
507 0 : if !auto {
508 0 : pending_tasks.push(GcCompactionQueueItem::Notify(id, None));
509 0 : } else {
510 0 : pending_tasks.push(GcCompactionQueueItem::Notify(id, Some(expected_l2_lsn)));
511 0 : }
512 :
513 0 : let layer_size = self
514 0 : .collect_layer_below_lsn(timeline, expected_l2_lsn)
515 0 : .await?;
516 :
517 : {
518 0 : let mut guard = self.inner.lock().unwrap();
519 0 : let mut tasks = Vec::new();
520 0 : for task in pending_tasks {
521 0 : let id = guard.next_id();
522 0 : tasks.push((id, task));
523 0 : }
524 0 : tasks.reverse();
525 0 : for item in tasks {
526 0 : guard.queued.push_front(item);
527 0 : }
528 0 : guard.meta_statistics = Some(GcCompactionMetaStatistics {
529 0 : meta_job_id: id,
530 0 : start_time: Some(chrono::Utc::now()),
531 0 : before_compaction_layer_size: layer_size,
532 0 : below_lsn: expected_l2_lsn,
533 0 : total_sub_compaction_jobs: jobs_len,
534 0 : ..Default::default()
535 0 : });
536 0 : }
537 0 :
538 0 : info!(
539 0 : "scheduled enhanced gc bottom-most compaction with sub-compaction, split into {} jobs",
540 : jobs_len
541 : );
542 : }
543 0 : Ok(())
544 0 : }
545 :
546 : /// Take a job from the queue and process it. Returns whether there are still pending tasks.
547 0 : pub async fn iteration(
548 0 : &self,
549 0 : cancel: &CancellationToken,
550 0 : ctx: &RequestContext,
551 0 : gc_block: &GcBlock,
552 0 : timeline: &Arc<Timeline>,
553 0 : ) -> Result<CompactionOutcome, CompactionError> {
554 0 : let res = self.iteration_inner(cancel, ctx, gc_block, timeline).await;
555 0 : if let Err(err) = &res {
556 0 : log_compaction_error(err, None, cancel.is_cancelled(), true);
557 0 : }
558 0 : match res {
559 0 : Ok(res) => Ok(res),
560 0 : Err(CompactionError::ShuttingDown) => Err(CompactionError::ShuttingDown),
561 : Err(_) => {
562 : // There are some cases where traditional gc might collect some layer
563 : // files, leaving gc-compaction unable to read the full history of the key.
564 : // This needs to be resolved in the long-term by improving the compaction
565 : // process. For now, let's simply avoid such errors triggering the
566 : // circuit breaker.
567 0 : Ok(CompactionOutcome::Skipped)
568 : }
569 : }
570 0 : }
571 :
572 0 : async fn iteration_inner(
573 0 : &self,
574 0 : cancel: &CancellationToken,
575 0 : ctx: &RequestContext,
576 0 : gc_block: &GcBlock,
577 0 : timeline: &Arc<Timeline>,
578 0 : ) -> Result<CompactionOutcome, CompactionError> {
579 0 : let Ok(_one_op_at_a_time_guard) = self.consumer_lock.try_lock() else {
580 0 : return Err(CompactionError::AlreadyRunning(
581 0 : "cannot run gc-compaction because another gc-compaction is running. This should not happen because we only call this function from the gc-compaction queue.",
582 0 : ));
583 : };
584 : let has_pending_tasks;
585 0 : let mut yield_for_l0 = false;
586 0 : let Some((id, item)) = ({
587 0 : let mut guard = self.inner.lock().unwrap();
588 0 : if let Some((id, item)) = guard.queued.pop_front() {
589 0 : guard.running = Some((id, item.clone()));
590 0 : has_pending_tasks = !guard.queued.is_empty();
591 0 : Some((id, item))
592 : } else {
593 0 : has_pending_tasks = false;
594 0 : None
595 : }
596 : }) else {
597 0 : self.trigger_auto_compaction(timeline).await?;
598 : // Always yield after triggering auto-compaction. Gc-compaction is a low-priority task and we
599 : // have not implemented a preemption mechanism yet. We always want to yield to more important
600 : // tasks if there are any.
601 0 : return Ok(CompactionOutcome::Done);
602 : };
603 0 : match item {
604 0 : GcCompactionQueueItem::MetaJob { options, auto } => {
605 0 : if !options
606 0 : .flags
607 0 : .contains(CompactFlags::EnhancedGcBottomMostCompaction)
608 : {
609 0 : warn!(
610 0 : "ignoring scheduled compaction task: scheduled task must be gc compaction: {:?}",
611 : options
612 : );
613 0 : } else if options.sub_compaction {
614 0 : info!(
615 0 : "running scheduled enhanced gc bottom-most compaction with sub-compaction, splitting compaction jobs"
616 : );
617 0 : self.handle_sub_compaction(id, options, timeline, auto)
618 0 : .await?;
619 : } else {
620 : // Auto compaction always enables sub-compaction so we don't need to handle update_l2_lsn
621 : // in this branch.
622 0 : let _gc_guard = match gc_block.start().await {
623 0 : Ok(guard) => guard,
624 0 : Err(e) => {
625 0 : self.notify_and_unblock(id);
626 0 : self.clear_running_job();
627 0 : return Err(CompactionError::Other(anyhow!(
628 0 : "cannot run gc-compaction because gc is blocked: {}",
629 0 : e
630 0 : )));
631 : }
632 : };
633 0 : let res = timeline.compact_with_options(cancel, options, ctx).await;
634 0 : let compaction_result = match res {
635 0 : Ok(res) => res,
636 0 : Err(err) => {
637 0 : warn!(%err, "failed to run gc-compaction");
638 0 : self.notify_and_unblock(id);
639 0 : self.clear_running_job();
640 0 : return Err(err);
641 : }
642 : };
643 0 : if compaction_result == CompactionOutcome::YieldForL0 {
644 0 : yield_for_l0 = true;
645 0 : }
646 : }
647 : }
648 0 : GcCompactionQueueItem::SubCompactionJob(options) => {
649 : // TODO: error handling, clear the queue if any task fails?
650 0 : let _gc_guard = match gc_block.start().await {
651 0 : Ok(guard) => guard,
652 0 : Err(e) => {
653 0 : self.clear_running_job();
654 0 : return Err(CompactionError::Other(anyhow!(
655 0 : "cannot run gc-compaction because gc is blocked: {}",
656 0 : e
657 0 : )));
658 : }
659 : };
660 0 : let res = timeline.compact_with_options(cancel, options, ctx).await;
661 0 : let compaction_result = match res {
662 0 : Ok(res) => res,
663 0 : Err(err) => {
664 0 : warn!(%err, "failed to run gc-compaction subcompaction job");
665 0 : self.clear_running_job();
666 0 : let mut guard = self.inner.lock().unwrap();
667 0 : if let Some(ref mut meta_statistics) = guard.meta_statistics {
668 0 : meta_statistics.failed_sub_compaction_jobs += 1;
669 0 : }
670 0 : return Err(err);
671 : }
672 : };
673 0 : if compaction_result == CompactionOutcome::YieldForL0 {
674 0 : // We will permanently give up a task if we yield for L0 compaction: the preempted subcompaction job won't be running
675 0 : // again. This ensures that we don't keep doing duplicated work within gc-compaction. We do not return directly here because
676 0 : // we need to clean things up before returning from the function.
677 0 : yield_for_l0 = true;
678 0 : }
679 : {
680 0 : let mut guard = self.inner.lock().unwrap();
681 0 : if let Some(ref mut meta_statistics) = guard.meta_statistics {
682 0 : meta_statistics.succeeded_sub_compaction_jobs += 1;
683 0 : }
684 : }
685 : }
686 0 : GcCompactionQueueItem::Notify(id, l2_lsn) => {
687 0 : let below_lsn = {
688 0 : let mut guard = self.inner.lock().unwrap();
689 0 : if let Some(ref mut meta_statistics) = guard.meta_statistics {
690 0 : meta_statistics.below_lsn
691 : } else {
692 0 : Lsn::INVALID
693 : }
694 : };
695 0 : let layer_size = if below_lsn != Lsn::INVALID {
696 0 : self.collect_layer_below_lsn(timeline, below_lsn).await?
697 : } else {
698 0 : 0
699 : };
700 : {
701 0 : let mut guard = self.inner.lock().unwrap();
702 0 : if let Some(ref mut meta_statistics) = guard.meta_statistics {
703 0 : meta_statistics.after_compaction_layer_size = layer_size;
704 0 : meta_statistics.finalize();
705 0 : }
706 : }
707 0 : self.notify_and_unblock(id);
708 0 : if let Some(l2_lsn) = l2_lsn {
709 0 : let current_l2_lsn = timeline
710 0 : .get_gc_compaction_state()
711 0 : .map(|x| x.last_completed_lsn)
712 0 : .unwrap_or(Lsn::INVALID);
713 0 : if l2_lsn >= current_l2_lsn {
714 0 : info!("l2_lsn updated to {}", l2_lsn);
715 0 : timeline
716 0 : .update_gc_compaction_state(GcCompactionState {
717 0 : last_completed_lsn: l2_lsn,
718 0 : })
719 0 : .map_err(CompactionError::Other)?;
720 : } else {
721 0 : warn!(
722 0 : "l2_lsn updated to {} but it is less than the current l2_lsn {}",
723 : l2_lsn, current_l2_lsn
724 : );
725 : }
726 0 : }
727 : }
728 : }
729 0 : self.clear_running_job();
730 0 : Ok(if yield_for_l0 {
731 0 : tracing::info!("give up gc-compaction: yield for L0 compaction");
732 0 : CompactionOutcome::YieldForL0
733 0 : } else if has_pending_tasks {
734 0 : CompactionOutcome::Pending
735 : } else {
736 0 : CompactionOutcome::Done
737 : })
738 0 : }
739 :
740 : #[allow(clippy::type_complexity)]
741 0 : pub fn remaining_jobs(
742 0 : &self,
743 0 : ) -> (
744 0 : Option<(GcCompactionJobId, GcCompactionQueueItem)>,
745 0 : VecDeque<(GcCompactionJobId, GcCompactionQueueItem)>,
746 0 : ) {
747 0 : let guard = self.inner.lock().unwrap();
748 0 : (guard.running.clone(), guard.queued.clone())
749 0 : }
750 :
751 0 : pub fn remaining_jobs_num(&self) -> usize {
752 0 : let guard = self.inner.lock().unwrap();
753 0 : guard.queued.len() + if guard.running.is_some() { 1 } else { 0 }
754 0 : }
755 : }
756 :
757 : /// A job description for the gc-compaction job. This structure describes the rectangle range that the job will
758 : /// process. The exact layers that need to be compacted/rewritten will be generated when `compact_with_gc` gets
759 : /// called.
760 : #[derive(Debug, Clone)]
761 : pub(crate) struct GcCompactJob {
762 : pub dry_run: bool,
763 : /// The key range to be compacted. The compaction algorithm will only regenerate key-value pairs within this range
764 : /// [left inclusive, right exclusive), and other pairs will be rewritten into new files if necessary.
765 : pub compact_key_range: Range<Key>,
766 : /// The LSN range to be compacted. The compaction algorithm will use this range to determine the layers to be
767 : /// selected for the compaction, and it does not guarantee the generated layers will have exactly the same LSN range
768 : /// as specified here. The true range being compacted is `min_lsn/max_lsn` in [`GcCompactionJobDescription`].
769 : /// min_lsn will always be <= the lower bound specified here, and max_lsn will always be >= the upper bound specified here.
770 : pub compact_lsn_range: Range<Lsn>,
771 : }
772 :
773 : impl GcCompactJob {
774 28 : pub fn from_compact_options(options: CompactOptions) -> Self {
775 28 : GcCompactJob {
776 28 : dry_run: options.flags.contains(CompactFlags::DryRun),
777 28 : compact_key_range: options
778 28 : .compact_key_range
779 28 : .map(|x| x.into())
780 28 : .unwrap_or(Key::MIN..Key::MAX),
781 28 : compact_lsn_range: options
782 28 : .compact_lsn_range
783 28 : .map(|x| x.into())
784 28 : .unwrap_or(Lsn::INVALID..Lsn::MAX),
785 28 : }
786 28 : }
787 : }
788 :
789 : /// A job description for the gc-compaction job. This structure is generated when `compact_with_gc` is called
790 : /// and contains the exact layers we want to compact.
791 : pub struct GcCompactionJobDescription {
792 : /// All layers to read in the compaction job
793 : selected_layers: Vec<Layer>,
794 : /// GC cutoff of the job. This is the lowest LSN that will be accessed by the read/GC path and we need to
795 : /// keep all deltas <= this LSN or generate an image == this LSN.
796 : gc_cutoff: Lsn,
797 : /// LSNs to retain for the job. Read path will use this LSN so we need to keep deltas <= this LSN or
798 : /// generate an image == this LSN.
799 : retain_lsns_below_horizon: Vec<Lsn>,
800 : /// Maximum layer LSN processed in this compaction, that is max(end_lsn of layers). Exclusive. All data
801 : /// \>= this LSN will be kept and will not be rewritten.
802 : max_layer_lsn: Lsn,
803 : /// Minimum layer LSN processed in this compaction, that is min(start_lsn of layers). Inclusive.
804 : /// All access below (strictly lower than, i.e. `<`) this LSN will be routed through the normal read path instead of
805 : /// k-merge within gc-compaction.
806 : min_layer_lsn: Lsn,
807 : /// Only compact layers overlapping with this range.
808 : compaction_key_range: Range<Key>,
809 : /// When partial compaction is enabled, these layers need to be rewritten to ensure no overlap.
810 : /// This field is here solely for debugging. The field will not be read once the compaction
811 : /// description is generated.
812 : rewrite_layers: Vec<Arc<PersistentLayerDesc>>,
813 : }
814 :
815 : /// The result of bottom-most compaction for a single key at each LSN.
816 : #[derive(Debug)]
817 : #[cfg_attr(test, derive(PartialEq))]
818 : pub struct KeyLogAtLsn(pub Vec<(Lsn, Value)>);
819 :
820 : /// The result of bottom-most compaction.
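///
/// For example (editor's sketch, not taken from a real run): with
/// `retain_lsns_below_horizon = [0x20, 0x30]` and a GC horizon of 0x40, a key
/// whose history spans 0x10..0x60 could be retained roughly as
///
/// ```text
/// below_horizon: [
///     (0x20, [ (0x20, image) ]),   // history <= 0x20 squashed into one image
///     (0x30, [ (0x28, delta) ]),   // deltas needed to reach 0x30
///     (0x40, [ (0x38, delta) ]),   // deltas needed to reach the horizon
/// ]
/// above_horizon: [ (0x48, delta), (0x58, delta) ]  // kept as-is
/// ```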
821 : #[derive(Debug)]
822 : #[cfg_attr(test, derive(PartialEq))]
823 : pub(crate) struct KeyHistoryRetention {
824 : /// Stores logs to reconstruct the value at the given LSN, that is to say, logs <= LSN or image == LSN.
825 : pub(crate) below_horizon: Vec<(Lsn, KeyLogAtLsn)>,
826 : /// Stores logs to reconstruct the value at any LSN above the horizon, that is to say, log > LSN.
827 : pub(crate) above_horizon: KeyLogAtLsn,
828 : }
829 :
830 : impl KeyHistoryRetention {
831 : /// Hack: skip the delta layer if we need to produce a layer with the same key-lsn range.
832 : ///
833 : /// This can happen if we have removed some deltas in "the middle" of some existing layer's key-lsn-range.
834 : /// For example, consider the case where a single delta with range [0x10,0x50) exists.
835 : /// And we have branches at LSN 0x10, 0x20, 0x30.
836 : /// Then we delete branch @ 0x20.
837 : /// Bottom-most compaction may now delete the delta [0x20,0x30).
838 : /// And that wouldn't change the shape of the layer.
839 : ///
840 : /// Note that bottom-most-gc-compaction never _adds_ new data in that case, only removes.
841 : ///
842 : /// `discard_key` will only be called when the writer reaches its target (instead of for every key), so it's fine to grab a lock inside.
843 37 : async fn discard_key(key: &PersistentLayerKey, tline: &Arc<Timeline>, dry_run: bool) -> bool {
844 37 : if dry_run {
845 0 : return true;
846 37 : }
847 37 : if LayerMap::is_l0(&key.key_range, key.is_delta) {
848 : // gc-compaction should not produce L0 deltas, otherwise it will break the layer order.
849 : // We should ignore such layers.
850 0 : return true;
851 37 : }
852 : let layer_generation;
853 : {
854 37 : let guard = tline.layers.read().await;
855 37 : if !guard.contains_key(key) {
856 26 : return false;
857 11 : }
858 11 : layer_generation = guard.get_from_key(key).metadata().generation;
859 11 : }
860 11 : if layer_generation == tline.generation {
861 11 : info!(
862 : key=%key,
863 : ?layer_generation,
864 0 : "discard layer due to duplicated layer key in the same generation",
865 : );
866 11 : true
867 : } else {
868 0 : false
869 : }
870 37 : }
871 :
872 : /// Pipe a history of a single key to the writers.
873 : ///
874 : /// If `image_writer` is none, the images will be placed into the delta layers.
875 : /// The delta writer will contain all images and deltas (below and above the horizon) except the bottom-most images.
876 : #[allow(clippy::too_many_arguments)]
877 319 : async fn pipe_to(
878 319 : self,
879 319 : key: Key,
880 319 : delta_writer: &mut SplitDeltaLayerWriter<'_>,
881 319 : mut image_writer: Option<&mut SplitImageLayerWriter<'_>>,
882 319 : stat: &mut CompactionStatistics,
883 319 : ctx: &RequestContext,
884 319 : ) -> anyhow::Result<()> {
885 319 : let mut first_batch = true;
886 1022 : for (cutoff_lsn, KeyLogAtLsn(logs)) in self.below_horizon {
887 703 : if first_batch {
888 319 : if logs.len() == 1 && logs[0].1.is_image() {
889 300 : let Value::Image(img) = &logs[0].1 else {
890 0 : unreachable!()
891 : };
892 300 : stat.produce_image_key(img);
893 300 : if let Some(image_writer) = image_writer.as_mut() {
894 300 : image_writer.put_image(key, img.clone(), ctx).await?;
895 : } else {
896 0 : delta_writer
897 0 : .put_value(key, cutoff_lsn, Value::Image(img.clone()), ctx)
898 0 : .await?;
899 : }
900 : } else {
901 33 : for (lsn, val) in logs {
902 14 : stat.produce_key(&val);
903 14 : delta_writer.put_value(key, lsn, val, ctx).await?;
904 : }
905 : }
906 319 : first_batch = false;
907 : } else {
908 442 : for (lsn, val) in logs {
909 58 : stat.produce_key(&val);
910 58 : delta_writer.put_value(key, lsn, val, ctx).await?;
911 : }
912 : }
913 : }
914 319 : let KeyLogAtLsn(above_horizon_logs) = self.above_horizon;
915 348 : for (lsn, val) in above_horizon_logs {
916 29 : stat.produce_key(&val);
917 29 : delta_writer.put_value(key, lsn, val, ctx).await?;
918 : }
919 319 : Ok(())
920 319 : }
921 :
922 : /// Verify if every key in the retention is readable by replaying the logs.
923 323 : async fn verify(
924 323 : &self,
925 323 : key: Key,
926 323 : base_img_from_ancestor: &Option<(Key, Lsn, Bytes)>,
927 323 : full_history: &[(Key, Lsn, Value)],
928 323 : tline: &Arc<Timeline>,
929 323 : ) -> anyhow::Result<()> {
930 : // Usually the min_lsn should be the first record but we do a full iteration to be safe.
931 458 : let Some(min_lsn) = full_history.iter().map(|(_, lsn, _)| *lsn).min() else {
932 : // This should never happen b/c if we don't have any history of a key, we won't even do `generate_key_retention`.
933 0 : return Ok(());
934 : };
935 458 : let Some(max_lsn) = full_history.iter().map(|(_, lsn, _)| *lsn).max() else {
936 : // This should never happen b/c if we don't have any history of a key, we won't even do `generate_key_retention`.
937 0 : return Ok(());
938 : };
939 323 : let mut base_img = base_img_from_ancestor
940 323 : .as_ref()
941 323 : .map(|(_, lsn, img)| (*lsn, img));
942 323 : let mut history = Vec::new();
943 :
944 1027 : async fn collect_and_verify(
945 1027 : key: Key,
946 1027 : lsn: Lsn,
947 1027 : base_img: &Option<(Lsn, &Bytes)>,
948 1027 : history: &[(Lsn, &NeonWalRecord)],
949 1027 : tline: &Arc<Timeline>,
950 1027 : skip_empty: bool,
951 1027 : ) -> anyhow::Result<()> {
952 1027 : if base_img.is_none() && history.is_empty() {
953 0 : if skip_empty {
954 0 : return Ok(());
955 0 : }
956 0 : anyhow::bail!("verification failed: key {} has no history at {}", key, lsn);
957 1027 : };
958 1027 :
959 1027 : let mut records = history
960 1027 : .iter()
961 1027 : .map(|(lsn, val)| (*lsn, (*val).clone()))
962 1027 : .collect::<Vec<_>>();
963 1027 :
964 1027 : // WAL redo requires records in the reverse LSN order
965 1027 : records.reverse();
966 1027 : let data = ValueReconstructState {
967 1027 : img: base_img.as_ref().map(|(lsn, img)| (*lsn, (*img).clone())),
968 1027 : records,
969 1027 : };
970 1027 :
971 1027 : tline
972 1027 : .reconstruct_value(key, lsn, data, RedoAttemptType::GcCompaction)
973 1027 : .await
974 1027 : .with_context(|| format!("verification failed for key {} at lsn {}", key, lsn))?;
975 :
976 1027 : Ok(())
977 1027 : }
978 :
979 1036 : for (retain_lsn, KeyLogAtLsn(logs)) in &self.below_horizon {
980 1096 : for (lsn, val) in logs {
981 76 : match val {
982 307 : Value::Image(img) => {
983 307 : base_img = Some((*lsn, img));
984 307 : history.clear();
985 307 : }
986 76 : Value::WalRecord(rec) if val.will_init() => {
987 0 : base_img = None;
988 0 : history.clear();
989 0 : history.push((*lsn, rec));
990 0 : }
991 76 : Value::WalRecord(rec) => {
992 76 : history.push((*lsn, rec));
993 76 : }
994 : }
995 : }
996 713 : if *retain_lsn >= min_lsn {
997 : // Only verify after the key appears in the full history for the first time.
998 :
999 : // We don't modify history: in theory, we could replace the history with a single
1000 : // image as in `generate_key_retention` to make redos at later LSNs faster. But we
1001 : // want to verify everything as if they are read from the real layer map.
1002 699 : collect_and_verify(key, *retain_lsn, &base_img, &history, tline, false)
1003 699 : .await
1004 699 : .context("below horizon retain_lsn")?;
1005 14 : }
1006 : }
1007 :
1008 360 : for (lsn, val) in &self.above_horizon.0 {
1009 32 : match val {
1010 5 : Value::Image(img) => {
1011 5 : // Above the GC horizon, we verify every time we see an image.
1012 5 : collect_and_verify(key, *lsn, &base_img, &history, tline, true)
1013 5 : .await
1014 5 : .context("above horizon full image")?;
1015 5 : base_img = Some((*lsn, img));
1016 5 : history.clear();
1017 : }
1018 32 : Value::WalRecord(rec) if val.will_init() => {
1019 0 : // Above the GC horizon, we verify every time we see an init record.
1020 0 : collect_and_verify(key, *lsn, &base_img, &history, tline, true)
1021 0 : .await
1022 0 : .context("above horizon init record")?;
1023 0 : base_img = None;
1024 0 : history.clear();
1025 0 : history.push((*lsn, rec));
1026 : }
1027 32 : Value::WalRecord(rec) => {
1028 32 : history.push((*lsn, rec));
1029 32 : }
1030 : }
1031 : }
1032 : // Ensure the latest record is readable.
1033 323 : collect_and_verify(key, max_lsn, &base_img, &history, tline, false)
1034 323 : .await
1035 323 : .context("latest record")?;
1036 323 : Ok(())
1037 323 : }
1038 : }
1039 :
1040 : #[derive(Debug, Serialize, Default)]
1041 : struct CompactionStatisticsNumSize {
1042 : num: u64,
1043 : size: u64,
1044 : }
1045 :
1046 : #[derive(Debug, Serialize, Default)]
1047 : pub struct CompactionStatistics {
1048 : /// Delta layer visited (maybe compressed, physical size)
1049 : delta_layer_visited: CompactionStatisticsNumSize,
1050 : /// Image layer visited (maybe compressed, physical size)
1051 : image_layer_visited: CompactionStatisticsNumSize,
1052 : /// Delta layer produced (maybe compressed, physical size)
1053 : delta_layer_produced: CompactionStatisticsNumSize,
1054 : /// Image layer produced (maybe compressed, physical size)
1055 : image_layer_produced: CompactionStatisticsNumSize,
1056 : /// Delta layer discarded (maybe compressed, physical size of the layer being discarded instead of the original layer)
1057 : delta_layer_discarded: CompactionStatisticsNumSize,
1058 : /// Image layer discarded (maybe compressed, physical size of the layer being discarded instead of the original layer)
1059 : image_layer_discarded: CompactionStatisticsNumSize,
1060 : num_unique_keys_visited: usize,
1061 : /// Delta visited (uncompressed, original size)
1062 : wal_keys_visited: CompactionStatisticsNumSize,
1063 : /// Image visited (uncompressed, original size)
1064 : image_keys_visited: CompactionStatisticsNumSize,
1065 : /// Delta produced (uncompressed, original size)
1066 : wal_produced: CompactionStatisticsNumSize,
1067 : /// Image produced (uncompressed, original size)
1068 : image_produced: CompactionStatisticsNumSize,
1069 :
1070 : // Time spent in each phase
1071 : time_acquire_lock_secs: f64,
1072 : time_analyze_secs: f64,
1073 : time_download_layer_secs: f64,
1074 : time_to_first_kv_pair_secs: f64,
1075 : time_main_loop_secs: f64,
1076 : time_final_phase_secs: f64,
1077 : time_total_secs: f64,
1078 :
1079 : // Summary
1080 : /// Ratio of the key-value size after/before gc-compaction.
1081 : uncompressed_retention_ratio: f64,
1082 : /// Ratio of the physical size after/before gc-compaction.
1083 : compressed_retention_ratio: f64,
1084 : }
1085 :
1086 : impl CompactionStatistics {
1087 534 : fn estimated_size_of_value(val: &Value) -> usize {
1088 219 : match val {
1089 315 : Value::Image(img) => img.len(),
1090 0 : Value::WalRecord(NeonWalRecord::Postgres { rec, .. }) => rec.len(),
1091 219 : _ => std::mem::size_of::<NeonWalRecord>(),
1092 : }
1093 534 : }
1094 839 : fn estimated_size_of_key() -> usize {
1095 839 : KEY_SIZE // TODO: distinguish image layer and delta layer (count LSN in delta layer)
1096 839 : }
1097 44 : fn visit_delta_layer(&mut self, size: u64) {
1098 44 : self.delta_layer_visited.num += 1;
1099 44 : self.delta_layer_visited.size += size;
1100 44 : }
1101 35 : fn visit_image_layer(&mut self, size: u64) {
1102 35 : self.image_layer_visited.num += 1;
1103 35 : self.image_layer_visited.size += size;
1104 35 : }
1105 320 : fn on_unique_key_visited(&mut self) {
1106 320 : self.num_unique_keys_visited += 1;
1107 320 : }
1108 123 : fn visit_wal_key(&mut self, val: &Value) {
1109 123 : self.wal_keys_visited.num += 1;
1110 123 : self.wal_keys_visited.size +=
1111 123 : Self::estimated_size_of_value(val) as u64 + Self::estimated_size_of_key() as u64;
1112 123 : }
1113 315 : fn visit_image_key(&mut self, val: &Value) {
1114 315 : self.image_keys_visited.num += 1;
1115 315 : self.image_keys_visited.size +=
1116 315 : Self::estimated_size_of_value(val) as u64 + Self::estimated_size_of_key() as u64;
1117 315 : }
1118 101 : fn produce_key(&mut self, val: &Value) {
1119 101 : match val {
1120 5 : Value::Image(img) => self.produce_image_key(img),
1121 96 : Value::WalRecord(_) => self.produce_wal_key(val),
1122 : }
1123 101 : }
1124 96 : fn produce_wal_key(&mut self, val: &Value) {
1125 96 : self.wal_produced.num += 1;
1126 96 : self.wal_produced.size +=
1127 96 : Self::estimated_size_of_value(val) as u64 + Self::estimated_size_of_key() as u64;
1128 96 : }
1129 305 : fn produce_image_key(&mut self, val: &Bytes) {
1130 305 : self.image_produced.num += 1;
1131 305 : self.image_produced.size += val.len() as u64 + Self::estimated_size_of_key() as u64;
1132 305 : }
1133 7 : fn discard_delta_layer(&mut self, original_size: u64) {
1134 7 : self.delta_layer_discarded.num += 1;
1135 7 : self.delta_layer_discarded.size += original_size;
1136 7 : }
1137 4 : fn discard_image_layer(&mut self, original_size: u64) {
1138 4 : self.image_layer_discarded.num += 1;
1139 4 : self.image_layer_discarded.size += original_size;
1140 4 : }
1141 12 : fn produce_delta_layer(&mut self, size: u64) {
1142 12 : self.delta_layer_produced.num += 1;
1143 12 : self.delta_layer_produced.size += size;
1144 12 : }
1145 15 : fn produce_image_layer(&mut self, size: u64) {
1146 15 : self.image_layer_produced.num += 1;
1147 15 : self.image_layer_produced.size += size;
1148 15 : }
1149 26 : fn finalize(&mut self) {
1150 26 : let original_key_value_size = self.image_keys_visited.size + self.wal_keys_visited.size;
1151 26 : let produced_key_value_size = self.image_produced.size + self.wal_produced.size;
1152 26 : self.uncompressed_retention_ratio =
1153 26 : produced_key_value_size as f64 / (original_key_value_size as f64 + 1.0); // avoid div by 0
1154 26 : let original_physical_size = self.image_layer_visited.size + self.delta_layer_visited.size;
1155 26 : let produced_physical_size = self.image_layer_produced.size
1156 26 : + self.delta_layer_produced.size
1157 26 : + self.image_layer_discarded.size
1158 26 : + self.delta_layer_discarded.size; // Also include the discarded layers to make the ratio accurate
1159 26 : self.compressed_retention_ratio =
1160 26 : produced_physical_size as f64 / (original_physical_size as f64 + 1.0); // avoid div by 0
1161 26 : }
1162 : }
1163 :
1164 : #[derive(Default, Debug, Clone, Copy, PartialEq, Eq)]
1165 : pub enum CompactionOutcome {
1166 : #[default]
1167 : /// No layers need to be compacted after this round. Compaction doesn't need
1168 : /// to be immediately scheduled.
1169 : Done,
1170 : /// Still has pending layers to be compacted after this round. Ideally, the scheduler
1171 : /// should immediately schedule another compaction.
1172 : Pending,
1173 : /// A timeline needs L0 compaction. Yield and schedule an immediate L0 compaction pass (only
1174 : /// guaranteed when `compaction_l0_first` is enabled).
1175 : YieldForL0,
1176 : /// Compaction was skipped, because the timeline is ineligible for compaction.
1177 : Skipped,
1178 : }
1179 :
1180 : impl Timeline {
1181 : /// TODO: cancellation
1182 : ///
1183 : /// Returns whether the compaction has pending tasks.
1184 182 : pub(crate) async fn compact_legacy(
1185 182 : self: &Arc<Self>,
1186 182 : cancel: &CancellationToken,
1187 182 : options: CompactOptions,
1188 182 : ctx: &RequestContext,
1189 182 : ) -> Result<CompactionOutcome, CompactionError> {
1190 182 : if options
1191 182 : .flags
1192 182 : .contains(CompactFlags::EnhancedGcBottomMostCompaction)
1193 : {
1194 0 : self.compact_with_gc(cancel, options, ctx).await?;
1195 0 : return Ok(CompactionOutcome::Done);
1196 182 : }
1197 182 :
1198 182 : if options.flags.contains(CompactFlags::DryRun) {
1199 0 : return Err(CompactionError::Other(anyhow!(
1200 0 : "dry-run mode is not supported for legacy compaction for now"
1201 0 : )));
1202 182 : }
1203 182 :
1204 182 : if options.compact_key_range.is_some() || options.compact_lsn_range.is_some() {
1205 : // maybe useful in the future? could implement this at some point
1206 0 : return Err(CompactionError::Other(anyhow!(
1207 0 : "compaction range is not supported for legacy compaction for now"
1208 0 : )));
1209 182 : }
1210 182 :
1211 182 : // High level strategy for compaction / image creation:
1212 182 : //
1213 182 : // 1. First, do an L0 compaction to ensure we move the L0
1214 182 : // layers into the historic layer map and get flat levels of
1215 182 : // layers. If we did not compact all L0 layers, we will
1216 182 : // prioritize compacting the timeline again and not do
1217 182 : // any of the compactions below.
1218 182 : //
1219 182 : // 2. Then, calculate the desired "partitioning" of the
1220 182 : // currently in-use key space. The goal is to partition the
1221 182 : // key space into roughly fixed-size chunks, but also take into
1222 182 : // account any existing image layers, and try to align the
1223 182 : // chunk boundaries with the existing image layers to avoid
1224 182 : // too much churn. Also try to align chunk boundaries with
1225 182 : // relation boundaries. In principle, we don't know about
1226 182 : // relation boundaries here, we just deal with key-value
1227 182 : // pairs, and the code in pgdatadir_mapping.rs knows how to
1228 182 : // map relations into key-value pairs. But in practice we know
1229 182 : // that 'field6' is the block number, and the fields 1-5
1230 182 : // identify a relation. This is just an optimization,
1231 182 : // though.
1232 182 : //
1233 182 : // 3. Once we know the partitioning, for each partition,
1234 182 : // decide if it's time to create a new image layer. The
1235 182 : // criteria is: there has been too much "churn" since the last
1236 182 : // image layer? The "churn" is fuzzy concept, it's a
1237 182 : // combination of too many delta files, or too much WAL in
1238 182 : // total in the delta file. Or perhaps: if creating an image
1239 182 : // file would allow to delete some older files.
1240 182 : //
1241 182 : // 4. In the end, if the tenant gets auto-sharded, we will run
1242 182 : // a shard-ancestor compaction.
1243 182 :
1244 182 : // Is the timeline being deleted?
1245 182 : if self.is_stopping() {
1246 0 : trace!("Dropping out of compaction on timeline shutdown");
1247 0 : return Err(CompactionError::ShuttingDown);
1248 182 : }
1249 182 :
1250 182 : let target_file_size = self.get_checkpoint_distance();
1251 :
1252 : // Define partitioning schema if needed
1253 :
1254 : // 1. L0 Compact
1255 182 : let l0_outcome = {
1256 182 : let timer = self.metrics.compact_time_histo.start_timer();
1257 182 : let l0_outcome = self
1258 182 : .compact_level0(
1259 182 : target_file_size,
1260 182 : options.flags.contains(CompactFlags::ForceL0Compaction),
1261 182 : ctx,
1262 182 : )
1263 182 : .await?;
1264 182 : timer.stop_and_record();
1265 182 : l0_outcome
1266 182 : };
1267 182 :
1268 182 : if options.flags.contains(CompactFlags::OnlyL0Compaction) {
1269 0 : return Ok(l0_outcome);
1270 182 : }
1271 182 :
1272 182 : // Yield if we have pending L0 compaction. The scheduler will do another pass.
1273 182 : if (l0_outcome == CompactionOutcome::Pending || l0_outcome == CompactionOutcome::YieldForL0)
1274 0 : && options.flags.contains(CompactFlags::YieldForL0)
1275 : {
1276 0 : info!("image/ancestor compaction yielding for L0 compaction");
1277 0 : return Ok(CompactionOutcome::YieldForL0);
1278 182 : }
1279 182 :
1280 182 : let gc_cutoff = *self.applied_gc_cutoff_lsn.read();
1281 182 :
1282 182 : // 2. Repartition and create image layers if necessary
1283 182 : match self
1284 182 : .repartition(
1285 182 : self.get_last_record_lsn(),
1286 182 : self.get_compaction_target_size(),
1287 182 : options.flags,
1288 182 : ctx,
1289 182 : )
1290 182 : .await
1291 : {
1292 182 : Ok(((dense_partitioning, sparse_partitioning), lsn)) if lsn >= gc_cutoff => {
1293 182 : // Disables access_stats updates, so that the files we read remain candidates for eviction after we're done with them
1294 182 : let image_ctx = RequestContextBuilder::from(ctx)
1295 182 : .access_stats_behavior(AccessStatsBehavior::Skip)
1296 182 : .attached_child();
1297 182 :
1298 182 : let mut partitioning = dense_partitioning;
1299 182 : partitioning
1300 182 : .parts
1301 182 : .extend(sparse_partitioning.into_dense().parts);
1302 :
1303 : // 3. Create new image layers for partitions that have been modified "enough".
1304 182 : let (image_layers, outcome) = self
1305 182 : .create_image_layers(
1306 182 : &partitioning,
1307 182 : lsn,
1308 182 : if options
1309 182 : .flags
1310 182 : .contains(CompactFlags::ForceImageLayerCreation)
1311 : {
1312 7 : ImageLayerCreationMode::Force
1313 : } else {
1314 175 : ImageLayerCreationMode::Try
1315 : },
1316 182 : &image_ctx,
1317 182 : self.last_image_layer_creation_status
1318 182 : .load()
1319 182 : .as_ref()
1320 182 : .clone(),
1321 182 : options.flags.contains(CompactFlags::YieldForL0),
1322 182 : )
1323 182 : .await
1324 182 : .inspect_err(|err| {
1325 : if let CreateImageLayersError::GetVectoredError(
1326 : GetVectoredError::MissingKey(_),
1327 0 : ) = err
1328 : {
1329 0 : critical!("missing key during compaction: {err:?}");
1330 0 : }
1331 182 : })?;
1332 :
1333 182 : self.last_image_layer_creation_status
1334 182 : .store(Arc::new(outcome.clone()));
1335 182 :
1336 182 : self.upload_new_image_layers(image_layers)?;
1337 182 : if let LastImageLayerCreationStatus::Incomplete { .. } = outcome {
1338 : // Yield and do not do any other kind of compaction.
1339 0 : info!(
1340 0 : "skipping shard ancestor compaction due to pending image layer generation tasks (preempted by L0 compaction)."
1341 : );
1342 0 : return Ok(CompactionOutcome::YieldForL0);
1343 182 : }
1344 : }
1345 :
1346 : Ok(_) => {
1347 0 : info!("skipping repartitioning due to image compaction LSN being below GC cutoff");
1348 : }
1349 :
1350 : // Suppress errors when cancelled.
1351 0 : Err(_) if self.cancel.is_cancelled() => {}
1352 0 : Err(err) if err.is_cancel() => {}
1353 :
1354 : // Alert on critical errors that indicate data corruption.
1355 0 : Err(err) if err.is_critical() => {
1356 0 : critical!("could not compact, repartitioning keyspace failed: {err:?}");
1357 : }
1358 :
1359 : // Log other errors. No partitioning? This is normal, if the timeline was just created
1360 : // as an empty timeline. Also in unit tests, when we use the timeline as a simple
1361 : // key-value store, ignoring the datadir layout. Log the error but continue.
1362 0 : Err(err) => error!("could not compact, repartitioning keyspace failed: {err:?}"),
1363 : };
1364 :
1365 182 : let partition_count = self.partitioning.read().0.0.parts.len();
1366 182 :
1367 182 : // 4. Shard ancestor compaction
1368 182 : if self.get_compaction_shard_ancestor() && self.shard_identity.count >= ShardCount::new(2) {
1369 : // Limit the number of layer rewrites to the number of partitions: this means its
1370 : // runtime should be comparable to a full round of image layer creations, rather than
1371 : // being potentially much longer.
1372 0 : let rewrite_max = partition_count;
1373 :
1374 0 : let outcome = self
1375 0 : .compact_shard_ancestors(
1376 0 : rewrite_max,
1377 0 : options.flags.contains(CompactFlags::YieldForL0),
1378 0 : ctx,
1379 0 : )
1380 0 : .await?;
1381 0 : match outcome {
1382 0 : CompactionOutcome::Pending | CompactionOutcome::YieldForL0 => return Ok(outcome),
1383 0 : CompactionOutcome::Done | CompactionOutcome::Skipped => {}
1384 : }
1385 182 : }
1386 :
1387 182 : Ok(CompactionOutcome::Done)
1388 182 : }
1389 :
1390 : /// Check for layers that are eligible to be rewritten:
1391 : /// - Shard splitting: after a shard split, rewrite ancestor layers once they fall beyond pitr_interval, so that
1392 : /// we don't indefinitely retain keys in this shard that aren't needed.
1393 : /// - For future use: layers beyond pitr_interval that are in formats we would
1394 : /// rather not maintain compatibility with indefinitely.
1395 : ///
1396 : /// Note: this phase may read and write many gigabytes of data: use rewrite_max to bound
1397 : /// how much work it will try to do in each compaction pass.
1398 0 : async fn compact_shard_ancestors(
1399 0 : self: &Arc<Self>,
1400 0 : rewrite_max: usize,
1401 0 : yield_for_l0: bool,
1402 0 : ctx: &RequestContext,
1403 0 : ) -> Result<CompactionOutcome, CompactionError> {
1404 0 : let mut outcome = CompactionOutcome::Done;
1405 0 : let mut drop_layers = Vec::new();
1406 0 : let mut layers_to_rewrite: Vec<Layer> = Vec::new();
1407 0 :
1408 0 : // We will use the Lsn cutoff of the last GC as a threshold for rewriting layers: if a
1409 0 : // layer is behind this Lsn, it indicates that the layer is being retained beyond the
1410 0 : // pitr_interval, for example because a branchpoint references it.
1411 0 : //
1412 0 : // Holding this read guard also blocks [`Self::gc_timeline`] from entering while we
1413 0 : // are rewriting layers.
1414 0 : let latest_gc_cutoff = self.get_applied_gc_cutoff_lsn();
1415 0 : let pitr_cutoff = self.gc_info.read().unwrap().cutoffs.time;
1416 :
1417 0 : let layers = self.layers.read().await;
1418 0 : let layers_iter = layers.layer_map()?.iter_historic_layers();
1419 0 : let (layers_total, mut layers_checked) = (layers_iter.len(), 0);
1420 0 : for layer_desc in layers_iter {
1421 0 : layers_checked += 1;
1422 0 : let layer = layers.get_from_desc(&layer_desc);
1423 0 : if layer.metadata().shard.shard_count == self.shard_identity.count {
1424 : // This layer does not belong to a historic ancestor, no need to re-image it.
1425 0 : continue;
1426 0 : }
1427 0 :
1428 0 : // This layer was created on an ancestor shard: check if it contains any data for this shard.
1429 0 : let sharded_range = ShardedRange::new(layer_desc.get_key_range(), &self.shard_identity);
1430 0 : let layer_local_page_count = sharded_range.page_count();
1431 0 : let layer_raw_page_count = ShardedRange::raw_size(&layer_desc.get_key_range());
1432 0 : if layer_local_page_count == 0 {
1433 : // This ancestral layer only covers keys that belong to other shards.
1434 : // We include the full metadata in the log: if we had some critical bug that caused
1435 :                 // us to incorrectly drop layers, this would simplify manual debugging and reinstatement of those layers.
1436 0 : debug!(%layer, old_metadata=?layer.metadata(),
1437 0 : "dropping layer after shard split, contains no keys for this shard",
1438 : );
1439 :
1440 0 : if cfg!(debug_assertions) {
1441 : // Expensive, exhaustive check of keys in this layer: this guards against ShardedRange's calculations being
1442 : // wrong. If ShardedRange claims the local page count is zero, then no keys in this layer
1443 : // should be !is_key_disposable()
1444 : // TODO: exclude sparse keyspace from this check, otherwise it will infinitely loop.
1445 0 : let range = layer_desc.get_key_range();
1446 0 : let mut key = range.start;
1447 0 : while key < range.end {
1448 0 : debug_assert!(self.shard_identity.is_key_disposable(&key));
1449 0 : key = key.next();
1450 : }
1451 0 : }
1452 :
1453 0 : drop_layers.push(layer);
1454 0 : continue;
1455 0 : } else if layer_local_page_count != u32::MAX
1456 0 : && layer_local_page_count == layer_raw_page_count
1457 : {
1458 0 : debug!(%layer,
1459 0 : "layer is entirely shard local ({} keys), no need to filter it",
1460 : layer_local_page_count
1461 : );
1462 0 : continue;
1463 0 : }
1464 0 :
1465 0 :         // Only rewrite a layer if doing so reclaims significant space; skip mostly shard-local layers.
1466 0 :         if layer_local_page_count != u32::MAX
1467 0 :             && layer_local_page_count as f64 / layer_raw_page_count as f64
1468 0 :                 > ANCESTOR_COMPACTION_REWRITE_THRESHOLD
1469   :         {
1470 0 :             debug!(%layer,
1471 0 :                 "layer has a large share of local pages \
1472 0 :                 ({layer_local_page_count}/{layer_raw_page_count} > \
1473 0 :                 {ANCESTOR_COMPACTION_REWRITE_THRESHOLD}), not rewriting");
1474   :             continue;
1475 0 :         }
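        :         // Illustrative arithmetic (hypothetical numbers): with a rewrite threshold of 0.3, an
        :         // ancestor layer spanning 1000 raw pages of which 200 (20%) belong to this shard is a
        :         // rewrite candidate, while one with 600 local pages (60%) is skipped and left to age out.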
1476 :
1477 : // Don't bother re-writing a layer if it is within the PITR window: it will age-out eventually
1478 :             // Don't bother re-writing a layer if it is within the PITR window: it will age out eventually
1479 0 : if layer_desc.get_lsn_range().end >= *latest_gc_cutoff {
1480 0 : debug!(%layer, "Skipping rewrite of layer still in GC window ({} >= {})",
1481 0 : layer_desc.get_lsn_range().end, *latest_gc_cutoff);
1482 0 : continue;
1483 0 : }
1484 0 :
1485 0 : // We do not yet implement rewrite of delta layers.
1486 0 : if layer_desc.is_delta() {
1487 0 : debug!(%layer, "Skipping rewrite of delta layer");
1488 0 : continue;
1489 0 : }
1490 0 :
1491 0 : // We don't bother rewriting layers that aren't visible, since these won't be needed by
1492 0 : // reads and will likely be garbage collected soon.
1493 0 : if layer.visibility() != LayerVisibilityHint::Visible {
1494 0 : debug!(%layer, "Skipping rewrite of invisible layer");
1495 0 : continue;
1496 0 : }
1497 0 :
1498 0 : // Only rewrite layers if their generations differ. This guarantees:
1499 0 : // - that local rewrite is safe, as local layer paths will differ between existing layer and rewritten one
1500 0 :             // - that the layer is persistent in remote storage, as we only see old-generation layers via loading from remote storage
1501 0 : if layer.metadata().generation == self.generation {
1502 0 : debug!(%layer, "Skipping rewrite, is not from old generation");
1503 0 : continue;
1504 0 : }
1505 0 :
1506 0 : if layers_to_rewrite.len() >= rewrite_max {
1507 0 :                 debug!(%layer, "Will rewrite layer on a future compaction, already picked {} layers this pass",
1508 0 : layers_to_rewrite.len()
1509 : );
1510 0 : outcome = CompactionOutcome::Pending;
1511 0 : break;
1512 0 : }
1513 0 :
1514 0 : // Fall through: all our conditions for doing a rewrite passed.
1515 0 : layers_to_rewrite.push(layer);
1516 : }
1517 :
1518 : // Drop read lock on layer map before we start doing time-consuming I/O.
1519 0 : drop(layers);
1520 0 :
1521 0 : // Drop out early if there's nothing to do.
1522 0 : if layers_to_rewrite.is_empty() && drop_layers.is_empty() {
1523 0 : return Ok(CompactionOutcome::Done);
1524 0 : }
1525 0 :
1526 0 : info!(
1527 0 : "starting shard ancestor compaction, rewriting {} layers and dropping {} layers, \
1528 0 : checked {layers_checked}/{layers_total} layers \
1529 0 : (latest_gc_cutoff={} pitr_cutoff={:?})",
1530 0 : layers_to_rewrite.len(),
1531 0 : drop_layers.len(),
1532 0 : *latest_gc_cutoff,
1533 : pitr_cutoff,
1534 : );
1535 0 : let started = Instant::now();
1536 0 :
1537 0 : let mut replace_image_layers = Vec::new();
1538 :
1539 0 : for layer in layers_to_rewrite {
1540 0 : if self.cancel.is_cancelled() {
1541 0 : return Err(CompactionError::ShuttingDown);
1542 0 : }
1543 0 :
1544 0 : info!(layer=%layer, "rewriting layer after shard split");
1545 0 : let mut image_layer_writer = ImageLayerWriter::new(
1546 0 : self.conf,
1547 0 : self.timeline_id,
1548 0 : self.tenant_shard_id,
1549 0 : &layer.layer_desc().key_range,
1550 0 : layer.layer_desc().image_layer_lsn(),
1551 0 : &self.gate,
1552 0 : self.cancel.clone(),
1553 0 : ctx,
1554 0 : )
1555 0 : .await
1556 0 : .map_err(CompactionError::Other)?;
1557 :
1558 : // Safety of layer rewrites:
1559 : // - We are writing to a different local file path than we are reading from, so the old Layer
1560 : // cannot interfere with the new one.
1561 : // - In the page cache, contents for a particular VirtualFile are stored with a file_id that
1562 : // is different for two layers with the same name (in `ImageLayerInner::new` we always
1563 : // acquire a fresh id from [`crate::page_cache::next_file_id`]. So readers do not risk
1564 : // reading the index from one layer file, and then data blocks from the rewritten layer file.
1565 : // - Any readers that have a reference to the old layer will keep it alive until they are done
1566 : // with it. If they are trying to promote from remote storage, that will fail, but this is the same
1567 : // as for compaction generally: compaction is allowed to delete layers that readers might be trying to use.
1568 : // - We do not run concurrently with other kinds of compaction, so the only layer map writes we race with are:
1569 : // - GC, which at worst witnesses us "undelete" a layer that they just deleted.
1570 : // - ingestion, which only inserts layers, therefore cannot collide with us.
1571 0 : let resident = layer.download_and_keep_resident(ctx).await?;
1572 :
1573 0 : let keys_written = resident
1574 0 : .filter(&self.shard_identity, &mut image_layer_writer, ctx)
1575 0 : .await?;
1576 :
1577 0 : if keys_written > 0 {
1578 0 : let (desc, path) = image_layer_writer
1579 0 : .finish(ctx)
1580 0 : .await
1581 0 : .map_err(CompactionError::Other)?;
1582 0 : let new_layer = Layer::finish_creating(self.conf, self, desc, &path)
1583 0 : .map_err(CompactionError::Other)?;
1584 0 : info!(layer=%new_layer, "rewrote layer, {} -> {} bytes",
1585 0 : layer.metadata().file_size,
1586 0 : new_layer.metadata().file_size);
1587 :
1588 0 : replace_image_layers.push((layer, new_layer));
1589 0 : } else {
1590 0 :                 // Drop the old layer. Usually we would already have noticed that the layer has no
1591 0 :                 // data for us via the ShardedRange check above, but that estimate can be imprecise.
1592 0 : drop_layers.push(layer);
1593 0 : }
1594 :
1595 : // Yield for L0 compaction if necessary, but make sure we update the layer map below
1596 : // with the work we've already done.
1597 0 : if yield_for_l0
1598 0 : && self
1599 0 : .l0_compaction_trigger
1600 0 : .notified()
1601 0 : .now_or_never()
1602 0 : .is_some()
1603 : {
1604 0 : info!("shard ancestor compaction yielding for L0 compaction");
1605 0 : outcome = CompactionOutcome::YieldForL0;
1606 0 : break;
1607 0 : }
1608 : }
1609 :
1610 0 : for layer in &drop_layers {
1611 0 : info!(%layer, old_metadata=?layer.metadata(),
1612 0 : "dropping layer after shard split (no keys for this shard)",
1613 : );
1614 : }
1615 :
1616 : // At this point, we have replaced local layer files with their rewritten form, but not yet uploaded
1617 : // metadata to reflect that. If we restart here, the replaced layer files will look invalid (size mismatch
1618 : // to remote index) and be removed. This is inefficient but safe.
1619 0 : fail::fail_point!("compact-shard-ancestors-localonly");
1620 0 :
1621 0 : // Update the LayerMap so that readers will use the new layers, and enqueue it for writing to remote storage
1622 0 : self.rewrite_layers(replace_image_layers, drop_layers)
1623 0 : .await?;
1624 :
1625 0 : fail::fail_point!("compact-shard-ancestors-enqueued");
1626 0 :
1627 0 : // We wait for all uploads to complete before finishing this compaction stage. This is not
1628 0 : // necessary for correctness, but it simplifies testing, and avoids proceeding with another
1629 0 : // Timeline's compaction while this timeline's uploads may be generating lots of disk I/O
1630 0 : // load.
1631 0 : if outcome != CompactionOutcome::YieldForL0 {
1632 0 : info!("shard ancestor compaction waiting for uploads");
1633 0 : tokio::select! {
1634 0 : result = self.remote_client.wait_completion() => match result {
1635 0 : Ok(()) => {},
1636 0 : Err(WaitCompletionError::NotInitialized(ni)) => return Err(CompactionError::from(ni)),
1637 : Err(WaitCompletionError::UploadQueueShutDownOrStopped) => {
1638 0 : return Err(CompactionError::ShuttingDown);
1639 : }
1640 : },
1641 : // Don't wait if there's L0 compaction to do. We don't need to update the outcome
1642 : // here, because we've already done the actual work.
1643 0 : _ = self.l0_compaction_trigger.notified(), if yield_for_l0 => {},
1644 : }
1645 0 : }
1646 :
1647 0 : info!(
1648 0 : "shard ancestor compaction done in {:.3}s{}",
1649 0 : started.elapsed().as_secs_f64(),
1650 0 : match outcome {
1651 : CompactionOutcome::Pending =>
1652 0 : format!(", with pending work (rewrite_max={rewrite_max})"),
1653 0 : CompactionOutcome::YieldForL0 => String::from(", yielding for L0 compaction"),
1654 0 : CompactionOutcome::Skipped | CompactionOutcome::Done => String::new(),
1655 : }
1656 : );
1657 :
1658 0 : fail::fail_point!("compact-shard-ancestors-persistent");
1659 0 :
1660 0 : Ok(outcome)
1661 0 : }
1662 :
1663 : /// Update the LayerVisibilityHint of layers covered by image layers, based on whether there is
1664 : /// an image layer between them and the most recent readable LSN (branch point or tip of timeline). The
1665 : /// purpose of the visibility hint is to record which layers need to be available to service reads.
1666 : ///
1667 : /// The result may be used as an input to eviction and secondary downloads to de-prioritize layers
1668 : /// that we know won't be needed for reads.
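        :     ///
        :     /// Illustrative example (made-up LSNs): with an image layer at 0x50 covering some key range
        :     /// and the timeline head at 0x60, an older delta layer for that range at 0x10..0x20 can be
        :     /// marked Covered; if a child branch point at 0x15 still reads through it, it stays Visible.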
1669 121 : pub(crate) async fn update_layer_visibility(
1670 121 : &self,
1671 121 : ) -> Result<(), super::layer_manager::Shutdown> {
1672 121 : let head_lsn = self.get_last_record_lsn();
1673 :
1674 : // We will sweep through layers in reverse-LSN order. We only do historic layers. L0 deltas
1675 : // are implicitly left visible, because LayerVisibilityHint's default is Visible, and we never modify it here.
1676 : // Note that L0 deltas _can_ be covered by image layers, but we consider them 'visible' because we anticipate that
1677 : // they will be subject to L0->L1 compaction in the near future.
1678 121 : let layer_manager = self.layers.read().await;
1679 121 : let layer_map = layer_manager.layer_map()?;
1680 :
1681 121 : let readable_points = {
1682 121 : let children = self.gc_info.read().unwrap().retain_lsns.clone();
1683 121 :
1684 121 : let mut readable_points = Vec::with_capacity(children.len() + 1);
1685 121 : for (child_lsn, _child_timeline_id, is_offloaded) in &children {
1686 0 : if *is_offloaded == MaybeOffloaded::Yes {
1687 0 : continue;
1688 0 : }
1689 0 : readable_points.push(*child_lsn);
1690 : }
1691 121 : readable_points.push(head_lsn);
1692 121 : readable_points
1693 121 : };
1694 121 :
1695 121 : let (layer_visibility, covered) = layer_map.get_visibility(readable_points);
1696 304 : for (layer_desc, visibility) in layer_visibility {
1697 183 :             // FIXME: a more efficient bulk zip() through the layers rather than N*log(N) lookups of each one
1698 183 : let layer = layer_manager.get_from_desc(&layer_desc);
1699 183 : layer.set_visibility(visibility);
1700 183 : }
1701 :
1702 : // TODO: publish our covered KeySpace to our parent, so that when they update their visibility, they can
1703 : // avoid assuming that everything at a branch point is visible.
1704 121 : drop(covered);
1705 121 : Ok(())
1706 121 : }
1707 :
1708 :     /// Collect a bunch of Level 0 layer files, and compact and reshuffle them
1709 :     /// as Level 1 files. Returns whether the L0 layers are fully compacted.
1710 182 : async fn compact_level0(
1711 182 : self: &Arc<Self>,
1712 182 : target_file_size: u64,
1713 182 : force_compaction_ignore_threshold: bool,
1714 182 : ctx: &RequestContext,
1715 182 : ) -> Result<CompactionOutcome, CompactionError> {
1716 : let CompactLevel0Phase1Result {
1717 182 : new_layers,
1718 182 : deltas_to_compact,
1719 182 : outcome,
1720 : } = {
1721 182 : let phase1_span = info_span!("compact_level0_phase1");
1722 182 : let ctx = ctx.attached_child();
1723 182 : let mut stats = CompactLevel0Phase1StatsBuilder {
1724 182 : version: Some(2),
1725 182 : tenant_id: Some(self.tenant_shard_id),
1726 182 : timeline_id: Some(self.timeline_id),
1727 182 : ..Default::default()
1728 182 : };
1729 182 :
1730 182 : let begin = tokio::time::Instant::now();
1731 182 : let phase1_layers_locked = self.layers.read().await;
1732 182 : let now = tokio::time::Instant::now();
1733 182 : stats.read_lock_acquisition_micros =
1734 182 : DurationRecorder::Recorded(RecordedDuration(now - begin), now);
1735 182 : self.compact_level0_phase1(
1736 182 : phase1_layers_locked,
1737 182 : stats,
1738 182 : target_file_size,
1739 182 : force_compaction_ignore_threshold,
1740 182 : &ctx,
1741 182 : )
1742 182 : .instrument(phase1_span)
1743 182 : .await?
1744 : };
1745 :
1746 182 : if new_layers.is_empty() && deltas_to_compact.is_empty() {
1747 : // nothing to do
1748 168 : return Ok(CompactionOutcome::Done);
1749 14 : }
1750 14 :
1751 14 : self.finish_compact_batch(&new_layers, &Vec::new(), &deltas_to_compact)
1752 14 : .await?;
1753 14 : Ok(outcome)
1754 182 : }
1755 :
1756 : /// Level0 files first phase of compaction, explained in the [`Self::compact_legacy`] comment.
1757 182 : async fn compact_level0_phase1<'a>(
1758 182 : self: &'a Arc<Self>,
1759 182 : guard: tokio::sync::RwLockReadGuard<'a, LayerManager>,
1760 182 : mut stats: CompactLevel0Phase1StatsBuilder,
1761 182 : target_file_size: u64,
1762 182 : force_compaction_ignore_threshold: bool,
1763 182 : ctx: &RequestContext,
1764 182 : ) -> Result<CompactLevel0Phase1Result, CompactionError> {
1765 182 : stats.read_lock_held_spawn_blocking_startup_micros =
1766 182 : stats.read_lock_acquisition_micros.till_now(); // set by caller
1767 182 : let layers = guard.layer_map()?;
1768 182 : let level0_deltas = layers.level0_deltas();
1769 182 : stats.level0_deltas_count = Some(level0_deltas.len());
1770 182 :
1771 182 : // Only compact if enough layers have accumulated.
1772 182 : let threshold = self.get_compaction_threshold();
1773 182 : if level0_deltas.is_empty() || level0_deltas.len() < threshold {
1774 168 : if force_compaction_ignore_threshold {
1775 0 : if !level0_deltas.is_empty() {
1776 0 : info!(
1777 0 : level0_deltas = level0_deltas.len(),
1778 0 : threshold, "too few deltas to compact, but forcing compaction"
1779 : );
1780 : } else {
1781 0 : info!(
1782 0 : level0_deltas = level0_deltas.len(),
1783 0 : threshold, "too few deltas to compact, cannot force compaction"
1784 : );
1785 0 : return Ok(CompactLevel0Phase1Result::default());
1786 : }
1787 : } else {
1788 168 : debug!(
1789 0 : level0_deltas = level0_deltas.len(),
1790 0 : threshold, "too few deltas to compact"
1791 : );
1792 168 : return Ok(CompactLevel0Phase1Result::default());
1793 : }
1794 14 : }
1795 :
1796 14 : let mut level0_deltas = level0_deltas
1797 14 : .iter()
1798 201 : .map(|x| guard.get_from_desc(x))
1799 14 : .collect::<Vec<_>>();
1800 14 :
1801 14 : // Gather the files to compact in this iteration.
1802 14 : //
1803 14 : // Start with the oldest Level 0 delta file, and collect any other
1804 14 : // level 0 files that form a contiguous sequence, such that the end
1805 14 : // LSN of previous file matches the start LSN of the next file.
1806 14 : //
1807 14 : // Note that if the files don't form such a sequence, we might
1808 14 : // "compact" just a single file. That's a bit pointless, but it allows
1809 14 : // us to get rid of the level 0 file, and compact the other files on
1810 14 : // the next iteration. This could probably made smarter, but such
1811 14 :         // the next iteration. This could probably be made smarter, but such
1812 14 : // of a crash, partial download from cloud storage, or something like
1813 14 : // that, so it's not a big deal in practice.
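        :         // For example (hypothetical LSN ranges): L0s covering 0x10..0x20, 0x20..0x30 and
        :         // 0x40..0x50 yield a contiguous run of the first two; 0x40..0x50 does not start where
        :         // 0x20..0x30 ends, so it is left for a later compaction iteration.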
1814 374 : level0_deltas.sort_by_key(|l| l.layer_desc().lsn_range.start);
1815 14 : let mut level0_deltas_iter = level0_deltas.iter();
1816 14 :
1817 14 : let first_level0_delta = level0_deltas_iter.next().unwrap();
1818 14 : let mut prev_lsn_end = first_level0_delta.layer_desc().lsn_range.end;
1819 14 : let mut deltas_to_compact = Vec::with_capacity(level0_deltas.len());
1820 14 :
1821 14 : // Accumulate the size of layers in `deltas_to_compact`
1822 14 : let mut deltas_to_compact_bytes = 0;
1823 14 :
1824 14 : // Under normal circumstances, we will accumulate up to compaction_upper_limit L0s of size
1825 14 : // checkpoint_distance each. To avoid edge cases using extra system resources, bound our
1826 14 : // work in this function to only operate on this much delta data at once.
1827 14 : //
1828 14 : // In general, compaction_threshold should be <= compaction_upper_limit, but in case that
1829 14 : // the constraint is not respected, we use the larger of the two.
1830 14 : let delta_size_limit = std::cmp::max(
1831 14 : self.get_compaction_upper_limit(),
1832 14 : self.get_compaction_threshold(),
1833 14 : ) as u64
1834 14 : * std::cmp::max(self.get_checkpoint_distance(), DEFAULT_CHECKPOINT_DISTANCE);
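        :         // E.g. with hypothetical settings compaction_upper_limit = 20 and
        :         // checkpoint_distance = 256 MiB, a single pass is bounded to roughly
        :         // 20 * 256 MiB = 5 GiB of L0 delta data.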
1835 14 :
1836 14 : let mut fully_compacted = true;
1837 14 :
1838 14 : deltas_to_compact.push(first_level0_delta.download_and_keep_resident(ctx).await?);
1839 201 : for l in level0_deltas_iter {
1840 187 : let lsn_range = &l.layer_desc().lsn_range;
1841 187 :
1842 187 : if lsn_range.start != prev_lsn_end {
1843 0 : break;
1844 187 : }
1845 187 : deltas_to_compact.push(l.download_and_keep_resident(ctx).await?);
1846 187 : deltas_to_compact_bytes += l.metadata().file_size;
1847 187 : prev_lsn_end = lsn_range.end;
1848 187 :
1849 187 : if deltas_to_compact_bytes >= delta_size_limit {
1850 0 : info!(
1851 0 : l0_deltas_selected = deltas_to_compact.len(),
1852 0 : l0_deltas_total = level0_deltas.len(),
1853 0 : "L0 compaction picker hit max delta layer size limit: {}",
1854 : delta_size_limit
1855 : );
1856 0 : fully_compacted = false;
1857 0 :
1858 0 : // Proceed with compaction, but only a subset of L0s
1859 0 : break;
1860 187 : }
1861 : }
1862 14 : let lsn_range = Range {
1863 14 : start: deltas_to_compact
1864 14 : .first()
1865 14 : .unwrap()
1866 14 : .layer_desc()
1867 14 : .lsn_range
1868 14 : .start,
1869 14 : end: deltas_to_compact.last().unwrap().layer_desc().lsn_range.end,
1870 14 : };
1871 14 :
1872 14 : info!(
1873 0 : "Starting Level0 compaction in LSN range {}-{} for {} layers ({} deltas in total)",
1874 0 : lsn_range.start,
1875 0 : lsn_range.end,
1876 0 : deltas_to_compact.len(),
1877 0 : level0_deltas.len()
1878 : );
1879 :
1880 201 : for l in deltas_to_compact.iter() {
1881 201 : info!("compact includes {l}");
1882 : }
1883 :
1884 : // We don't need the original list of layers anymore. Drop it so that
1885 : // we don't accidentally use it later in the function.
1886 14 : drop(level0_deltas);
1887 14 :
1888 14 : stats.read_lock_held_prerequisites_micros = stats
1889 14 : .read_lock_held_spawn_blocking_startup_micros
1890 14 : .till_now();
1891 :
1892 : // TODO: replace with streaming k-merge
1893 14 : let all_keys = {
1894 14 : let mut all_keys = Vec::new();
1895 201 : for l in deltas_to_compact.iter() {
1896 201 : if self.cancel.is_cancelled() {
1897 0 : return Err(CompactionError::ShuttingDown);
1898 201 : }
1899 201 : let delta = l.get_as_delta(ctx).await.map_err(CompactionError::Other)?;
1900 201 : let keys = delta
1901 201 : .index_entries(ctx)
1902 201 : .await
1903 201 : .map_err(CompactionError::Other)?;
1904 201 : all_keys.extend(keys);
1905 : }
1906 :             // The current stdlib sorting implementation is designed in a way that makes it
1907 :             // particularly fast when the slice is made up of sorted sub-ranges.
1908 2211890 : all_keys.sort_by_key(|DeltaEntry { key, lsn, .. }| (*key, *lsn));
1909 14 : all_keys
1910 14 : };
1911 14 :
1912 14 : stats.read_lock_held_key_sort_micros = stats.read_lock_held_prerequisites_micros.till_now();
1913 :
1914 : // Determine N largest holes where N is number of compacted layers. The vec is sorted by key range start.
1915 : //
1916 : // A hole is a key range for which this compaction doesn't have any WAL records.
1917 : // Our goal in this compaction iteration is to avoid creating L1s that, in terms of their key range,
1918 : // cover the hole, but actually don't contain any WAL records for that key range.
1919 : // The reason is that the mere stack of L1s (`count_deltas`) triggers image layer creation (`create_image_layers`).
1920 : // That image layer creation would be useless for a hole range covered by L1s that don't contain any WAL records.
1921 : //
1922 : // The algorithm chooses holes as follows.
1923 :         // - Slide a two-element window over the keys in key order to get the hole range (=distance between two keys).
1924 : // - Filter: min threshold on range length
1925 : // - Rank: by coverage size (=number of image layers required to reconstruct each key in the range for which we have any data)
1926 : //
1927 : // For more details, intuition, and some ASCII art see https://github.com/neondatabase/neon/pull/3597#discussion_r1112704451
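        :         // A minimal sketch (not part of this function; `candidate_holes` is illustrative) of the
        :         // top-N selection implemented below: `Hole`'s `Ord` reverses the comparison on
        :         // coverage_size, so the std max-heap behaves as a min-heap, and popping whenever the heap
        :         // exceeds `max_holes` keeps the `max_holes` holes with the largest coverage_size.
        :         //
        :         //     let mut heap: BinaryHeap<Hole> = BinaryHeap::with_capacity(max_holes + 1);
        :         //     for hole in candidate_holes {
        :         //         heap.push(hole);
        :         //         if heap.len() > max_holes {
        :         //             heap.pop(); // evicts the hole with the smallest coverage_size
        :         //         }
        :         //     }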
1928 : #[derive(PartialEq, Eq)]
1929 : struct Hole {
1930 : key_range: Range<Key>,
1931 : coverage_size: usize,
1932 : }
1933 14 : let holes: Vec<Hole> = {
1934 : use std::cmp::Ordering;
1935 : impl Ord for Hole {
1936 0 : fn cmp(&self, other: &Self) -> Ordering {
1937 0 : self.coverage_size.cmp(&other.coverage_size).reverse()
1938 0 : }
1939 : }
1940 : impl PartialOrd for Hole {
1941 0 : fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
1942 0 : Some(self.cmp(other))
1943 0 : }
1944 : }
1945 14 : let max_holes = deltas_to_compact.len();
1946 14 : let last_record_lsn = self.get_last_record_lsn();
1947 14 : let min_hole_range = (target_file_size / page_cache::PAGE_SZ as u64) as i128;
1948 14 : let min_hole_coverage_size = 3; // TODO: something more flexible?
1949 14 : // min-heap (reserve space for one more element added before eviction)
1950 14 : let mut heap: BinaryHeap<Hole> = BinaryHeap::with_capacity(max_holes + 1);
1951 14 : let mut prev: Option<Key> = None;
1952 :
1953 1032019 : for &DeltaEntry { key: next_key, .. } in all_keys.iter() {
1954 :                     // Just a first, fast filter: do not create hole entries for metadata keys. The last hole in the
1955 :                     // compaction would be the gap between the data keys and the metadata keys.
1956 : // compaction is the gap between data key and metadata keys.
1957 1032005 : if next_key.to_i128() - prev_key.to_i128() >= min_hole_range
1958 0 : && !Key::is_metadata_key(&prev_key)
1959 : {
1960 0 : let key_range = prev_key..next_key;
1961 0 :                         // Measuring a hole by simple subtraction of the i128 representations of the key range boundaries
1962 0 :                         // doesn't make much sense, because the largest holes would correspond to field1/field2 changes.
1963 0 :                         // We are mostly interested in eliminating holes that cause generation of excessive image layers.
1964 0 :                         // That is why it is better to measure the size of a hole as the number of covering image layers.
1965 0 : let coverage_size =
1966 0 : layers.image_coverage(&key_range, last_record_lsn).len();
1967 0 : if coverage_size >= min_hole_coverage_size {
1968 0 : heap.push(Hole {
1969 0 : key_range,
1970 0 : coverage_size,
1971 0 : });
1972 0 : if heap.len() > max_holes {
1973 0 : heap.pop(); // remove smallest hole
1974 0 : }
1975 0 : }
1976 1032005 : }
1977 14 : }
1978 1032019 : prev = Some(next_key.next());
1979 : }
1980 14 : let mut holes = heap.into_vec();
1981 14 : holes.sort_unstable_by_key(|hole| hole.key_range.start);
1982 14 : holes
1983 14 : };
1984 14 : stats.read_lock_held_compute_holes_micros = stats.read_lock_held_key_sort_micros.till_now();
1985 14 : drop_rlock(guard);
1986 14 :
1987 14 : if self.cancel.is_cancelled() {
1988 0 : return Err(CompactionError::ShuttingDown);
1989 14 : }
1990 14 :
1991 14 : stats.read_lock_drop_micros = stats.read_lock_held_compute_holes_micros.till_now();
1992 :
1993 : // This iterator walks through all key-value pairs from all the layers
1994 : // we're compacting, in key, LSN order.
1995 : // If there's both a Value::Image and Value::WalRecord for the same (key,lsn),
1996 : // then the Value::Image is ordered before Value::WalRecord.
1997 14 : let mut all_values_iter = {
1998 14 : let mut deltas = Vec::with_capacity(deltas_to_compact.len());
1999 201 : for l in deltas_to_compact.iter() {
2000 201 : let l = l.get_as_delta(ctx).await.map_err(CompactionError::Other)?;
2001 201 : deltas.push(l);
2002 : }
2003 14 : MergeIterator::create_with_options(
2004 14 : &deltas,
2005 14 : &[],
2006 14 : ctx,
2007 14 : 1024 * 8192, /* 8 MiB buffer per layer iterator */
2008 14 : 1024,
2009 14 : )
2010 14 : };
2011 14 :
2012 14 : // This iterator walks through all keys and is needed to calculate size used by each key
2013 14 : let mut all_keys_iter = all_keys
2014 14 : .iter()
2015 1032019 : .map(|DeltaEntry { key, lsn, size, .. }| (*key, *lsn, *size))
2016 1032005 : .coalesce(|mut prev, cur| {
2017 1032005 :                 // Coalesce adjacent entries that belong to the same key.
2018 1032005 : // This ensures that compaction doesn't put them
2019 1032005 : // into different layer files.
2020 1032005 : // Still limit this by the target file size,
2021 1032005 : // so that we keep the size of the files in
2022 1032005 : // check.
2023 1032005 : if prev.0 == cur.0 && prev.2 < target_file_size {
2024 20019 : prev.2 += cur.2;
2025 20019 : Ok(prev)
2026 : } else {
2027 1011986 : Err((prev, cur))
2028 : }
2029 1032005 : });
2030 14 :
2031 14 : // Merge the contents of all the input delta layers into a new set
2032 14 : // of delta layers, based on the current partitioning.
2033 14 :         // We split the new delta layers on the key dimension. We iterate through the key space, and for each key, check whether including the next key in the output layer we're currently building would make that layer too large. If so, flush the current output layer and start a new one.
2034 14 : // We split the new delta layers on the key dimension. We iterate through the key space, and for each key, check if including the next key to the current output layer we're building would cause the layer to become too large. If so, dump the current output layer and start new one.
2035 14 : // It's possible that there is a single key with so many page versions that storing all of them in a single layer file
2036 14 : // would be too large. In that case, we also split on the LSN dimension.
2037 14 : //
2038 14 : // LSN
2039 14 : // ^
2040 14 : // |
2041 14 : // | +-----------+ +--+--+--+--+
2042 14 : // | | | | | | | |
2043 14 : // | +-----------+ | | | | |
2044 14 : // | | | | | | | |
2045 14 : // | +-----------+ ==> | | | | |
2046 14 : // | | | | | | | |
2047 14 : // | +-----------+ | | | | |
2048 14 : // | | | | | | | |
2049 14 : // | +-----------+ +--+--+--+--+
2050 14 : // |
2051 14 : // +--------------> key
2052 14 : //
2053 14 : //
2054 14 : // If one key (X) has a lot of page versions:
2055 14 : //
2056 14 : // LSN
2057 14 : // ^
2058 14 : // | (X)
2059 14 : // | +-----------+ +--+--+--+--+
2060 14 : // | | | | | | | |
2061 14 : // | +-----------+ | | +--+ |
2062 14 : // | | | | | | | |
2063 14 : // | +-----------+ ==> | | | | |
2064 14 : // | | | | | +--+ |
2065 14 : // | +-----------+ | | | | |
2066 14 : // | | | | | | | |
2067 14 : // | +-----------+ +--+--+--+--+
2068 14 : // |
2069 14 : // +--------------> key
2070 14 : // TODO: this actually divides the layers into fixed-size chunks, not
2071 14 : // based on the partitioning.
2072 14 : //
2073 14 : // TODO: we should also opportunistically materialize and
2074 14 : // garbage collect what we can.
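        :         // E.g. with a hypothetical target_file_size of 128 MiB: keys are appended to the current
        :         // output delta until adding the next key would push it past 128 MiB, at which point the
        :         // layer is flushed; a single key whose versions alone exceed 128 MiB is split into
        :         // multiple layers along the LSN axis (the dup_start_lsn/dup_end_lsn handling below).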
2075 14 : let mut new_layers = Vec::new();
2076 14 : let mut prev_key: Option<Key> = None;
2077 14 : let mut writer: Option<DeltaLayerWriter> = None;
2078 14 : let mut key_values_total_size = 0u64;
2079 14 : let mut dup_start_lsn: Lsn = Lsn::INVALID; // start LSN of layer containing values of the single key
2080 14 : let mut dup_end_lsn: Lsn = Lsn::INVALID; // end LSN of layer containing values of the single key
2081 14 : let mut next_hole = 0; // index of next hole in holes vector
2082 14 :
2083 14 : let mut keys = 0;
2084 :
2085 1032033 : while let Some((key, lsn, value)) = all_values_iter
2086 1032033 : .next()
2087 1032033 : .await
2088 1032033 : .map_err(CompactionError::Other)?
2089 : {
2090 1032019 : keys += 1;
2091 1032019 :
2092 1032019 : if keys % 32_768 == 0 && self.cancel.is_cancelled() {
2093 : // avoid hitting the cancellation token on every key. in benches, we end up
2094 : // shuffling an order of million keys per layer, this means we'll check it
2095 : // around tens of times per layer.
2096 0 : return Err(CompactionError::ShuttingDown);
2097 1032019 : }
2098 1032019 :
2099 1032019 : let same_key = prev_key == Some(key);
2100 1032019 : // We need to check key boundaries once we reach next key or end of layer with the same key
2101 1032019 : if !same_key || lsn == dup_end_lsn {
2102 1012000 : let mut next_key_size = 0u64;
2103 1012000 : let is_dup_layer = dup_end_lsn.is_valid();
2104 1012000 : dup_start_lsn = Lsn::INVALID;
2105 1012000 : if !same_key {
2106 1012000 : dup_end_lsn = Lsn::INVALID;
2107 1012000 : }
2108 : // Determine size occupied by this key. We stop at next key or when size becomes larger than target_file_size
2109 1012000 : for (next_key, next_lsn, next_size) in all_keys_iter.by_ref() {
2110 1012000 : next_key_size = next_size;
2111 1012000 : if key != next_key {
2112 1011986 : if dup_end_lsn.is_valid() {
2113 0 :                         // We are writing a segment with duplicates:
2114 0 :                         // place all remaining values of this key in a separate segment
2115 0 :                         dup_start_lsn = dup_end_lsn; // the new segment starts where the old one stops
2116 0 : dup_end_lsn = lsn_range.end; // there are no more values of this key till end of LSN range
2117 1011986 : }
2118 1011986 : break;
2119 14 : }
2120 14 : key_values_total_size += next_size;
2121 14 : // Check if it is time to split segment: if total keys size is larger than target file size.
2122 14 : // We need to avoid generation of empty segments if next_size > target_file_size.
2123 14 : if key_values_total_size > target_file_size && lsn != next_lsn {
2124 : // Split key between multiple layers: such layer can contain only single key
2125 0 : dup_start_lsn = if dup_end_lsn.is_valid() {
2126 0 : dup_end_lsn // new segment with duplicates starts where old one stops
2127 : } else {
2128 0 : lsn // start with the first LSN for this key
2129 : };
2130 0 : dup_end_lsn = next_lsn; // upper LSN boundary is exclusive
2131 0 : break;
2132 14 : }
2133 : }
2134 : // handle case when loop reaches last key: in this case dup_end is non-zero but dup_start is not set.
2135 1012000 : if dup_end_lsn.is_valid() && !dup_start_lsn.is_valid() {
2136 0 : dup_start_lsn = dup_end_lsn;
2137 0 : dup_end_lsn = lsn_range.end;
2138 1012000 : }
2139 1012000 : if writer.is_some() {
2140 1011986 : let written_size = writer.as_mut().unwrap().size();
2141 1011986 : let contains_hole =
2142 1011986 : next_hole < holes.len() && key >= holes[next_hole].key_range.end;
2143 : // check if key cause layer overflow or contains hole...
2144 1011986 : if is_dup_layer
2145 1011986 : || dup_end_lsn.is_valid()
2146 1011986 : || written_size + key_values_total_size > target_file_size
2147 1011846 : || contains_hole
2148 : {
2149 : // ... if so, flush previous layer and prepare to write new one
2150 140 : let (desc, path) = writer
2151 140 : .take()
2152 140 : .unwrap()
2153 140 : .finish(prev_key.unwrap().next(), ctx)
2154 140 : .await
2155 140 : .map_err(CompactionError::Other)?;
2156 140 : let new_delta = Layer::finish_creating(self.conf, self, desc, &path)
2157 140 : .map_err(CompactionError::Other)?;
2158 :
2159 140 : new_layers.push(new_delta);
2160 140 : writer = None;
2161 140 :
2162 140 : if contains_hole {
2163 0 : // skip hole
2164 0 : next_hole += 1;
2165 140 : }
2166 1011846 : }
2167 14 : }
2168 : // Remember size of key value because at next iteration we will access next item
2169 1012000 : key_values_total_size = next_key_size;
2170 20019 : }
2171 1032019 : fail_point!("delta-layer-writer-fail-before-finish", |_| {
2172 0 : Err(CompactionError::Other(anyhow::anyhow!(
2173 0 : "failpoint delta-layer-writer-fail-before-finish"
2174 0 : )))
2175 1032019 : });
2176 :
2177 1032019 : if !self.shard_identity.is_key_disposable(&key) {
2178 1032019 : if writer.is_none() {
2179 154 : if self.cancel.is_cancelled() {
2180 : // to be somewhat responsive to cancellation, check for each new layer
2181 0 : return Err(CompactionError::ShuttingDown);
2182 154 : }
2183 :                     // Create the writer if it is not initialized yet
2184 154 : writer = Some(
2185 : DeltaLayerWriter::new(
2186 154 : self.conf,
2187 154 : self.timeline_id,
2188 154 : self.tenant_shard_id,
2189 154 : key,
2190 154 : if dup_end_lsn.is_valid() {
2191 : // this is a layer containing slice of values of the same key
2192 0 : debug!("Create new dup layer {}..{}", dup_start_lsn, dup_end_lsn);
2193 0 : dup_start_lsn..dup_end_lsn
2194 : } else {
2195 154 : debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end);
2196 154 : lsn_range.clone()
2197 : },
2198 154 : &self.gate,
2199 154 : self.cancel.clone(),
2200 154 : ctx,
2201 154 : )
2202 154 : .await
2203 154 : .map_err(CompactionError::Other)?,
2204 : );
2205 :
2206 154 : keys = 0;
2207 1031865 : }
2208 :
2209 1032019 : writer
2210 1032019 : .as_mut()
2211 1032019 : .unwrap()
2212 1032019 : .put_value(key, lsn, value, ctx)
2213 1032019 : .await?;
2214 : } else {
2215 0 : let owner = self.shard_identity.get_shard_number(&key);
2216 0 :
2217 0 : // This happens after a shard split, when we're compacting an L0 created by our parent shard
2218 0 : debug!("dropping key {key} during compaction (it belongs on shard {owner})");
2219 : }
2220 :
2221 1032019 : if !new_layers.is_empty() {
2222 9893 : fail_point!("after-timeline-compacted-first-L1");
2223 1022126 : }
2224 :
2225 1032019 : prev_key = Some(key);
2226 : }
2227 14 : if let Some(writer) = writer {
2228 14 : let (desc, path) = writer
2229 14 : .finish(prev_key.unwrap().next(), ctx)
2230 14 : .await
2231 14 : .map_err(CompactionError::Other)?;
2232 14 : let new_delta = Layer::finish_creating(self.conf, self, desc, &path)
2233 14 : .map_err(CompactionError::Other)?;
2234 14 : new_layers.push(new_delta);
2235 0 : }
2236 :
2237 : // Sync layers
2238 14 : if !new_layers.is_empty() {
2239 : // Print a warning if the created layer is larger than double the target size
2240 : // Add two pages for potential overhead. This should in theory be already
2241 : // accounted for in the target calculation, but for very small targets,
2242 : // we still might easily hit the limit otherwise.
2243 14 : let warn_limit = target_file_size * 2 + page_cache::PAGE_SZ as u64 * 2;
2244 154 : for layer in new_layers.iter() {
2245 154 : if layer.layer_desc().file_size > warn_limit {
2246 0 : warn!(
2247 : %layer,
2248 0 : "created delta file of size {} larger than double of target of {target_file_size}", layer.layer_desc().file_size
2249 : );
2250 154 : }
2251 : }
2252 :
2253 : // The writer.finish() above already did the fsync of the inodes.
2254 : // We just need to fsync the directory in which these inodes are linked,
2255 : // which we know to be the timeline directory.
2256 : //
2257 :             // We use fatal_err() below because after writer.finish() returns with success,
2258 : // the in-memory state of the filesystem already has the layer file in its final place,
2259 : // and subsequent pageserver code could think it's durable while it really isn't.
2260 14 : let timeline_dir = VirtualFile::open(
2261 14 : &self
2262 14 : .conf
2263 14 : .timeline_path(&self.tenant_shard_id, &self.timeline_id),
2264 14 : ctx,
2265 14 : )
2266 14 : .await
2267 14 : .fatal_err("VirtualFile::open for timeline dir fsync");
2268 14 : timeline_dir
2269 14 : .sync_all()
2270 14 : .await
2271 14 : .fatal_err("VirtualFile::sync_all timeline dir");
2272 0 : }
2273 :
2274 14 : stats.write_layer_files_micros = stats.read_lock_drop_micros.till_now();
2275 14 : stats.new_deltas_count = Some(new_layers.len());
2276 154 : stats.new_deltas_size = Some(new_layers.iter().map(|l| l.layer_desc().file_size).sum());
2277 14 :
2278 14 : match TryInto::<CompactLevel0Phase1Stats>::try_into(stats)
2279 14 : .and_then(|stats| serde_json::to_string(&stats).context("serde_json::to_string"))
2280 : {
2281 14 : Ok(stats_json) => {
2282 14 : info!(
2283 0 : stats_json = stats_json.as_str(),
2284 0 : "compact_level0_phase1 stats available"
2285 : )
2286 : }
2287 0 : Err(e) => {
2288 0 : warn!("compact_level0_phase1 stats failed to serialize: {:#}", e);
2289 : }
2290 : }
2291 :
2292 : // Without this, rustc complains about deltas_to_compact still
2293 : // being borrowed when we `.into_iter()` below.
2294 14 : drop(all_values_iter);
2295 14 :
2296 14 : Ok(CompactLevel0Phase1Result {
2297 14 : new_layers,
2298 14 : deltas_to_compact: deltas_to_compact
2299 14 : .into_iter()
2300 201 : .map(|x| x.drop_eviction_guard())
2301 14 : .collect::<Vec<_>>(),
2302 14 : outcome: if fully_compacted {
2303 14 : CompactionOutcome::Done
2304 : } else {
2305 0 : CompactionOutcome::Pending
2306 : },
2307 : })
2308 182 : }
2309 : }
2310 :
2311 : #[derive(Default)]
2312 : struct CompactLevel0Phase1Result {
2313 : new_layers: Vec<ResidentLayer>,
2314 : deltas_to_compact: Vec<Layer>,
2315 : // Whether we have included all L0 layers, or selected only part of them due to the
2316 : // L0 compaction size limit.
2317 : outcome: CompactionOutcome,
2318 : }
2319 :
2320 : #[derive(Default)]
2321 : struct CompactLevel0Phase1StatsBuilder {
2322 : version: Option<u64>,
2323 : tenant_id: Option<TenantShardId>,
2324 : timeline_id: Option<TimelineId>,
2325 : read_lock_acquisition_micros: DurationRecorder,
2326 : read_lock_held_spawn_blocking_startup_micros: DurationRecorder,
2327 : read_lock_held_key_sort_micros: DurationRecorder,
2328 : read_lock_held_prerequisites_micros: DurationRecorder,
2329 : read_lock_held_compute_holes_micros: DurationRecorder,
2330 : read_lock_drop_micros: DurationRecorder,
2331 : write_layer_files_micros: DurationRecorder,
2332 : level0_deltas_count: Option<usize>,
2333 : new_deltas_count: Option<usize>,
2334 : new_deltas_size: Option<u64>,
2335 : }
2336 :
2337 : #[derive(serde::Serialize)]
2338 : struct CompactLevel0Phase1Stats {
2339 : version: u64,
2340 : tenant_id: TenantShardId,
2341 : timeline_id: TimelineId,
2342 : read_lock_acquisition_micros: RecordedDuration,
2343 : read_lock_held_spawn_blocking_startup_micros: RecordedDuration,
2344 : read_lock_held_key_sort_micros: RecordedDuration,
2345 : read_lock_held_prerequisites_micros: RecordedDuration,
2346 : read_lock_held_compute_holes_micros: RecordedDuration,
2347 : read_lock_drop_micros: RecordedDuration,
2348 : write_layer_files_micros: RecordedDuration,
2349 : level0_deltas_count: usize,
2350 : new_deltas_count: usize,
2351 : new_deltas_size: u64,
2352 : }
2353 :
2354 : impl TryFrom<CompactLevel0Phase1StatsBuilder> for CompactLevel0Phase1Stats {
2355 : type Error = anyhow::Error;
2356 :
2357 14 : fn try_from(value: CompactLevel0Phase1StatsBuilder) -> Result<Self, Self::Error> {
2358 14 : Ok(Self {
2359 14 : version: value.version.ok_or_else(|| anyhow!("version not set"))?,
2360 14 : tenant_id: value
2361 14 : .tenant_id
2362 14 : .ok_or_else(|| anyhow!("tenant_id not set"))?,
2363 14 : timeline_id: value
2364 14 : .timeline_id
2365 14 : .ok_or_else(|| anyhow!("timeline_id not set"))?,
2366 14 : read_lock_acquisition_micros: value
2367 14 : .read_lock_acquisition_micros
2368 14 : .into_recorded()
2369 14 : .ok_or_else(|| anyhow!("read_lock_acquisition_micros not set"))?,
2370 14 : read_lock_held_spawn_blocking_startup_micros: value
2371 14 : .read_lock_held_spawn_blocking_startup_micros
2372 14 : .into_recorded()
2373 14 : .ok_or_else(|| anyhow!("read_lock_held_spawn_blocking_startup_micros not set"))?,
2374 14 : read_lock_held_key_sort_micros: value
2375 14 : .read_lock_held_key_sort_micros
2376 14 : .into_recorded()
2377 14 : .ok_or_else(|| anyhow!("read_lock_held_key_sort_micros not set"))?,
2378 14 : read_lock_held_prerequisites_micros: value
2379 14 : .read_lock_held_prerequisites_micros
2380 14 : .into_recorded()
2381 14 : .ok_or_else(|| anyhow!("read_lock_held_prerequisites_micros not set"))?,
2382 14 : read_lock_held_compute_holes_micros: value
2383 14 : .read_lock_held_compute_holes_micros
2384 14 : .into_recorded()
2385 14 : .ok_or_else(|| anyhow!("read_lock_held_compute_holes_micros not set"))?,
2386 14 : read_lock_drop_micros: value
2387 14 : .read_lock_drop_micros
2388 14 : .into_recorded()
2389 14 : .ok_or_else(|| anyhow!("read_lock_drop_micros not set"))?,
2390 14 : write_layer_files_micros: value
2391 14 : .write_layer_files_micros
2392 14 : .into_recorded()
2393 14 : .ok_or_else(|| anyhow!("write_layer_files_micros not set"))?,
2394 14 : level0_deltas_count: value
2395 14 : .level0_deltas_count
2396 14 : .ok_or_else(|| anyhow!("level0_deltas_count not set"))?,
2397 14 : new_deltas_count: value
2398 14 : .new_deltas_count
2399 14 : .ok_or_else(|| anyhow!("new_deltas_count not set"))?,
2400 14 : new_deltas_size: value
2401 14 : .new_deltas_size
2402 14 : .ok_or_else(|| anyhow!("new_deltas_size not set"))?,
2403 : })
2404 14 : }
2405 : }
2406 :
2407 : impl Timeline {
2408 : /// Entry point for new tiered compaction algorithm.
2409 : ///
2410 : /// All the real work is in the implementation in the pageserver_compaction
2411 : /// crate. The code here would apply to any algorithm implemented by the
2412 : /// same interface, but tiered is the only one at the moment.
2413 : ///
2414 : /// TODO: cancellation
2415 0 : pub(crate) async fn compact_tiered(
2416 0 : self: &Arc<Self>,
2417 0 : _cancel: &CancellationToken,
2418 0 : ctx: &RequestContext,
2419 0 : ) -> Result<(), CompactionError> {
2420 0 : let fanout = self.get_compaction_threshold() as u64;
2421 0 : let target_file_size = self.get_checkpoint_distance();
2422 :
2423 : // Find the top of the historical layers
2424 0 : let end_lsn = {
2425 0 : let guard = self.layers.read().await;
2426 0 : let layers = guard.layer_map()?;
2427 :
2428 0 : let l0_deltas = layers.level0_deltas();
2429 0 :
2430 0 : // As an optimization, if we find that there are too few L0 layers,
2431 0 : // bail out early. We know that the compaction algorithm would do
2432 0 : // nothing in that case.
2433 0 : if l0_deltas.len() < fanout as usize {
2434 : // doesn't need compacting
2435 0 : return Ok(());
2436 0 : }
2437 0 : l0_deltas.iter().map(|l| l.lsn_range.end).max().unwrap()
2438 0 : };
2439 0 :
2440 0 : // Is the timeline being deleted?
2441 0 : if self.is_stopping() {
2442 0 : trace!("Dropping out of compaction on timeline shutdown");
2443 0 : return Err(CompactionError::ShuttingDown);
2444 0 : }
2445 :
2446 0 : let (dense_ks, _sparse_ks) = self.collect_keyspace(end_lsn, ctx).await?;
2447 : // TODO(chi): ignore sparse_keyspace for now, compact it in the future.
2448 0 : let mut adaptor = TimelineAdaptor::new(self, (end_lsn, dense_ks));
2449 0 :
2450 0 : pageserver_compaction::compact_tiered::compact_tiered(
2451 0 : &mut adaptor,
2452 0 : end_lsn,
2453 0 : target_file_size,
2454 0 : fanout,
2455 0 : ctx,
2456 0 : )
2457 0 : .await
2458 : // TODO: compact_tiered needs to return CompactionError
2459 0 : .map_err(CompactionError::Other)?;
2460 :
2461 0 : adaptor.flush_updates().await?;
2462 0 : Ok(())
2463 0 : }
2464 :
2465 : /// Take a list of images and deltas, produce images and deltas according to GC horizon and retain_lsns.
2466 : ///
2467 : /// It takes a key, the values of the key within the compaction process, a GC horizon, and all retain_lsns below the horizon.
2468 :     /// For now, it requires that `accumulated_values` contain the full history of the key (i.e., the key with the lowest LSN is
2469 : /// an image or a WAL not requiring a base image). This restriction will be removed once we implement gc-compaction on branch.
2470 : ///
2471 : /// The function returns the deltas and the base image that need to be placed at each of the retain LSN. For example, we have:
2472 :     /// The function returns the deltas and the base image that need to be placed at each retain LSN. For example, given:
2473 : /// A@0x10, +B@0x20, +C@0x30, +D@0x40, +E@0x50, +F@0x60
2474 : /// horizon = 0x50, retain_lsn = 0x20, 0x40, delta_threshold=3
2475 : ///
2476 : /// The function will produce:
2477 : ///
2478 : /// ```plain
2479 : /// 0x20(retain_lsn) -> img=AB@0x20 always produce a single image below the lowest retain LSN
2480 : /// 0x40(retain_lsn) -> deltas=[+C@0x30, +D@0x40] two deltas since the last base image, keeping the deltas
2481 : /// 0x50(horizon) -> deltas=[ABCDE@0x50] three deltas since the last base image, generate an image but put it in the delta
2482 : /// above_horizon -> deltas=[+F@0x60] full history above the horizon
2483 : /// ```
2484 : ///
2485 : /// Note that `accumulated_values` must be sorted by LSN and should belong to a single key.
2486 : #[allow(clippy::too_many_arguments)]
2487 324 : pub(crate) async fn generate_key_retention(
2488 324 : self: &Arc<Timeline>,
2489 324 : key: Key,
2490 324 : full_history: &[(Key, Lsn, Value)],
2491 324 : horizon: Lsn,
2492 324 : retain_lsn_below_horizon: &[Lsn],
2493 324 : delta_threshold_cnt: usize,
2494 324 : base_img_from_ancestor: Option<(Key, Lsn, Bytes)>,
2495 324 : verification: bool,
2496 324 : ) -> anyhow::Result<KeyHistoryRetention> {
2497 : // Pre-checks for the invariants
2498 :
2499 324 : let debug_mode = cfg!(debug_assertions) || cfg!(feature = "testing");
2500 :
2501 324 : if debug_mode {
2502 786 : for (log_key, _, _) in full_history {
2503 462 : assert_eq!(log_key, &key, "mismatched key");
2504 : }
2505 324 : for i in 1..full_history.len() {
2506 138 : assert!(full_history[i - 1].1 <= full_history[i].1, "unordered LSN");
2507 138 : if full_history[i - 1].1 == full_history[i].1 {
2508 0 : assert!(
2509 0 : matches!(full_history[i - 1].2, Value::Image(_)),
2510 0 : "unordered delta/image, or duplicated delta"
2511 : );
2512 138 : }
2513 : }
2514 : // There was an assertion for no base image that checks if the first
2515 : // record in the history is `will_init` before, but it was removed.
2516 : // This is explained in the test cases for generate_key_retention.
2517 : // Search "incomplete history" for more information.
2518 714 : for lsn in retain_lsn_below_horizon {
2519 390 : assert!(lsn < &horizon, "retain lsn must be below horizon")
2520 : }
2521 324 : for i in 1..retain_lsn_below_horizon.len() {
2522 178 : assert!(
2523 178 : retain_lsn_below_horizon[i - 1] <= retain_lsn_below_horizon[i],
2524 0 : "unordered LSN"
2525 : );
2526 : }
2527 0 : }
2528 324 : let has_ancestor = base_img_from_ancestor.is_some();
2529 : // Step 1: split history into len(retain_lsn_below_horizon) + 2 buckets, where the last bucket is for all deltas above the horizon,
2530 : // and the second-to-last bucket is for the horizon. Each bucket contains lsn_last_bucket < deltas <= lsn_this_bucket.
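        :         // E.g. with retain_lsn_below_horizon = [0x20, 0x40] and horizon = 0x50, the split points
        :         // are [0x20, 0x40, 0x50] and the buckets hold records with lsn <= 0x20, 0x20 < lsn <= 0x40,
        :         // 0x40 < lsn <= 0x50, and lsn > 0x50, respectively.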
2531 324 : let (mut split_history, lsn_split_points) = {
2532 324 : let mut split_history = Vec::new();
2533 324 : split_history.resize_with(retain_lsn_below_horizon.len() + 2, Vec::new);
2534 324 : let mut lsn_split_points = Vec::with_capacity(retain_lsn_below_horizon.len() + 1);
2535 714 : for lsn in retain_lsn_below_horizon {
2536 390 : lsn_split_points.push(*lsn);
2537 390 : }
2538 324 : lsn_split_points.push(horizon);
2539 324 : let mut current_idx = 0;
2540 786 : for item @ (_, lsn, _) in full_history {
2541 584 : while current_idx < lsn_split_points.len() && *lsn > lsn_split_points[current_idx] {
2542 122 : current_idx += 1;
2543 122 : }
2544 462 : split_history[current_idx].push(item);
2545 : }
2546 324 : (split_history, lsn_split_points)
2547 : };
2548 : // Step 2: filter out duplicated records due to the k-merge of image/delta layers
2549 1362 : for split_for_lsn in &mut split_history {
2550 1038 : let mut prev_lsn = None;
2551 1038 : let mut new_split_for_lsn = Vec::with_capacity(split_for_lsn.len());
2552 1038 : for record @ (_, lsn, _) in std::mem::take(split_for_lsn) {
2553 462 : if let Some(prev_lsn) = &prev_lsn {
2554 62 : if *prev_lsn == lsn {
2555 : // The case that we have an LSN with both data from the delta layer and the image layer. As
2556 : // `ValueWrapper` ensures that an image is ordered before a delta at the same LSN, we simply
2557 : // drop this delta and keep the image.
2558 : //
2559 : // For example, we have delta layer key1@0x10, key1@0x20, and image layer key1@0x10, we will
2560 : // keep the image for key1@0x10 and the delta for key1@0x20. key1@0x10 delta will be simply
2561 : // dropped.
2562 : //
2563 : // TODO: in case we have both delta + images for a given LSN and it does not exceed the delta
2564 : // threshold, we could have kept delta instead to save space. This is an optimization for the future.
2565 0 : continue;
2566 62 : }
2567 400 : }
2568 462 : prev_lsn = Some(lsn);
2569 462 : new_split_for_lsn.push(record);
2570 : }
2571 1038 : *split_for_lsn = new_split_for_lsn;
2572 : }
2573 : // Step 3: generate images when necessary
2574 324 : let mut retention = Vec::with_capacity(split_history.len());
2575 324 : let mut records_since_last_image = 0;
2576 324 : let batch_cnt = split_history.len();
2577 324 : assert!(
2578 324 : batch_cnt >= 2,
2579 0 : "should have at least below + above horizon batches"
2580 : );
2581 324 : let mut replay_history: Vec<(Key, Lsn, Value)> = Vec::new();
2582 324 : if let Some((key, lsn, ref img)) = base_img_from_ancestor {
2583 21 : replay_history.push((key, lsn, Value::Image(img.clone())));
2584 303 : }
2585 :
2586 : /// Generate debug information for the replay history
2587 0 : fn generate_history_trace(replay_history: &[(Key, Lsn, Value)]) -> String {
2588 : use std::fmt::Write;
2589 0 : let mut output = String::new();
2590 0 : if let Some((key, _, _)) = replay_history.first() {
2591 0 : write!(output, "key={} ", key).unwrap();
2592 0 : let mut cnt = 0;
2593 0 : for (_, lsn, val) in replay_history {
2594 0 : if val.is_image() {
2595 0 : write!(output, "i@{} ", lsn).unwrap();
2596 0 : } else if val.will_init() {
2597 0 : write!(output, "di@{} ", lsn).unwrap();
2598 0 : } else {
2599 0 : write!(output, "d@{} ", lsn).unwrap();
2600 0 : }
2601 0 : cnt += 1;
2602 0 : if cnt >= 128 {
2603 0 : write!(output, "... and more").unwrap();
2604 0 : break;
2605 0 : }
2606 : }
2607 0 : } else {
2608 0 : write!(output, "<no history>").unwrap();
2609 0 : }
2610 0 : output
2611 0 : }
2612 :
2613 0 : fn generate_debug_trace(
2614 0 : replay_history: Option<&[(Key, Lsn, Value)]>,
2615 0 : full_history: &[(Key, Lsn, Value)],
2616 0 : lsns: &[Lsn],
2617 0 : horizon: Lsn,
2618 0 : ) -> String {
2619 : use std::fmt::Write;
2620 0 : let mut output = String::new();
2621 0 : if let Some(replay_history) = replay_history {
2622 0 : writeln!(
2623 0 : output,
2624 0 : "replay_history: {}",
2625 0 : generate_history_trace(replay_history)
2626 0 : )
2627 0 : .unwrap();
2628 0 : } else {
2629 0 : writeln!(output, "replay_history: <disabled>",).unwrap();
2630 0 : }
2631 0 : writeln!(
2632 0 : output,
2633 0 : "full_history: {}",
2634 0 : generate_history_trace(full_history)
2635 0 : )
2636 0 : .unwrap();
2637 0 : writeln!(
2638 0 : output,
2639 0 : "when processing: [{}] horizon={}",
2640 0 : lsns.iter().map(|l| format!("{l}")).join(","),
2641 0 : horizon
2642 0 : )
2643 0 : .unwrap();
2644 0 : output
2645 0 : }
2646 :
2647 324 : let mut key_exists = false;
2648 1037 : for (i, split_for_lsn) in split_history.into_iter().enumerate() {
2649 : // TODO: there could be image keys inside the splits, and we can compute records_since_last_image accordingly.
2650 1037 : records_since_last_image += split_for_lsn.len();
2651 : // Whether to produce an image into the final layer files
2652 1037 : let produce_image = if i == 0 && !has_ancestor {
2653 : // We always generate images for the first batch (below horizon / lowest retain_lsn)
2654 303 : true
2655 734 : } else if i == batch_cnt - 1 {
2656 : // Do not generate images for the last batch (above horizon)
2657 323 : false
2658 411 : } else if records_since_last_image == 0 {
2659 322 : false
2660 89 : } else if records_since_last_image >= delta_threshold_cnt {
2661 : // Generate images when there are too many records
2662 3 : true
2663 : } else {
2664 86 : false
2665 : };
2666 1037 : replay_history.extend(split_for_lsn.iter().map(|x| (*x).clone()));
2667 : // Only retain the items after the last image record
2668 1277 : for idx in (0..replay_history.len()).rev() {
2669 1277 : if replay_history[idx].2.will_init() {
2670 1037 : replay_history = replay_history[idx..].to_vec();
2671 1037 : break;
2672 240 : }
2673 : }
2674 1037 : if replay_history.is_empty() && !key_exists {
2675 : // The key does not exist at earlier LSN, we can skip this iteration.
2676 0 : retention.push(Vec::new());
2677 0 : continue;
2678 1037 : } else {
2679 1037 : key_exists = true;
2680 1037 : }
2681 1037 : let Some((_, _, val)) = replay_history.first() else {
2682 0 : unreachable!("replay history should not be empty once it exists")
2683 : };
2684 1037 : if !val.will_init() {
2685 0 : return Err(anyhow::anyhow!("invalid history, no base image")).with_context(|| {
2686 0 : generate_debug_trace(
2687 0 : Some(&replay_history),
2688 0 : full_history,
2689 0 : retain_lsn_below_horizon,
2690 0 : horizon,
2691 0 : )
2692 0 : });
2693 1037 : }
2694 : // Whether to reconstruct the image. In debug mode, we will generate an image
2695 : // at every retain_lsn to ensure data is not corrupted, but we won't put the
2696 : // image into the final layer.
2697 1037 : let img_and_lsn = if produce_image {
2698 306 : records_since_last_image = 0;
2699 306 : let replay_history_for_debug = if debug_mode {
2700 306 : Some(replay_history.clone())
2701 : } else {
2702 0 : None
2703 : };
2704 306 : let replay_history_for_debug_ref = replay_history_for_debug.as_deref();
2705 306 : let history = std::mem::take(&mut replay_history);
2706 306 : let mut img = None;
2707 306 : let mut records = Vec::with_capacity(history.len());
2708 306 : if let (_, lsn, Value::Image(val)) = history.first().as_ref().unwrap() {
2709 295 : img = Some((*lsn, val.clone()));
2710 295 : for (_, lsn, val) in history.into_iter().skip(1) {
2711 20 : let Value::WalRecord(rec) = val else {
2712 0 : return Err(anyhow::anyhow!(
2713 0 : "invalid record, first record is image, expect walrecords"
2714 0 : ))
2715 0 : .with_context(|| {
2716 0 : generate_debug_trace(
2717 0 : replay_history_for_debug_ref,
2718 0 : full_history,
2719 0 : retain_lsn_below_horizon,
2720 0 : horizon,
2721 0 : )
2722 0 : });
2723 : };
2724 20 : records.push((lsn, rec));
2725 : }
2726 : } else {
2727 18 : for (_, lsn, val) in history.into_iter() {
2728 18 : let Value::WalRecord(rec) = val else {
2729 0 : return Err(anyhow::anyhow!("invalid record, first record is walrecord, expect rest are walrecord"))
2730 0 : .with_context(|| generate_debug_trace(
2731 0 : replay_history_for_debug_ref,
2732 0 : full_history,
2733 0 : retain_lsn_below_horizon,
2734 0 : horizon,
2735 0 : ));
2736 : };
2737 18 : records.push((lsn, rec));
2738 : }
2739 : }
2740 : // WAL redo requires records in the reverse LSN order
2741 306 : records.reverse();
2742 306 : let state = ValueReconstructState { img, records };
2743 : // last batch does not generate image so i is always in range, unless we force generate
2744 : // an image during testing
2745 306 : let request_lsn = if i >= lsn_split_points.len() {
2746 0 : Lsn::MAX
2747 : } else {
2748 306 : lsn_split_points[i]
2749 : };
2750 306 : let img = self
2751 306 : .reconstruct_value(key, request_lsn, state, RedoAttemptType::GcCompaction)
2752 306 : .await?;
2753 305 : Some((request_lsn, img))
2754 : } else {
2755 731 : None
2756 : };
2757 1036 : if produce_image {
2758 305 : let (request_lsn, img) = img_and_lsn.unwrap();
2759 305 : replay_history.push((key, request_lsn, Value::Image(img.clone())));
2760 305 : retention.push(vec![(request_lsn, Value::Image(img))]);
2761 731 : } else {
2762 731 : let deltas = split_for_lsn
2763 731 : .iter()
2764 731 : .map(|(_, lsn, value)| (*lsn, value.clone()))
2765 731 : .collect_vec();
2766 731 : retention.push(deltas);
2767 731 : }
2768 : }
2769 323 : let mut result = Vec::with_capacity(retention.len());
2770 323 : assert_eq!(retention.len(), lsn_split_points.len() + 1);
2771 1036 : for (idx, logs) in retention.into_iter().enumerate() {
2772 1036 : if idx == lsn_split_points.len() {
2773 323 : let retention = KeyHistoryRetention {
2774 323 : below_horizon: result,
2775 323 : above_horizon: KeyLogAtLsn(logs),
2776 323 : };
2777 323 : if verification {
2778 323 : retention
2779 323 : .verify(key, &base_img_from_ancestor, full_history, self)
2780 323 : .await?;
2781 0 : }
2782 323 : return Ok(retention);
2783 713 : } else {
2784 713 : result.push((lsn_split_points[idx], KeyLogAtLsn(logs)));
2785 713 : }
2786 : }
2787 0 : unreachable!("key retention is empty")
2788 324 : }
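     :
     : // Illustrative note (added for clarity; not part of the original source): the loop above splits
     : // the retained history into `lsn_split_points.len() + 1` buckets. Assuming hypothetical split
     : // points [0/20, 0/30], the returned value has the shape:
     : //
     : //     KeyHistoryRetention {
     : //         below_horizon: vec![(Lsn(0x20), KeyLogAtLsn(logs_0)), (Lsn(0x30), KeyLogAtLsn(logs_1))],
     : //         above_horizon: KeyLogAtLsn(logs_2), // everything newer than the last split point
     : //     }
     : //
     : // where each `logs_i` is either a single reconstructed image (when `produce_image` was set for
     : // that bucket) or the deltas that were kept for it.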
2789 :
2790 : /// Check how much space is left on the disk
2791 27 : async fn check_available_space(self: &Arc<Self>) -> anyhow::Result<u64> {
2792 27 : let tenants_dir = self.conf.tenants_path();
2793 :
2794 27 : let stat = Statvfs::get(&tenants_dir, None)
2795 27 : .context("statvfs failed, presumably directory got unlinked")?;
2796 :
2797 27 : let (avail_bytes, _) = stat.get_avail_total_bytes();
2798 27 :
2799 27 : Ok(avail_bytes)
2800 27 : }
2801 :
2802 : /// Check if the compaction can proceed safely without running out of space. We assume the upper
2803 : /// bound on the size of the files produced by a compaction job is the total size of all layers
2804 : /// involved in the compaction. Therefore, we need at least `2 * layers_to_be_compacted_size` of
2805 : /// free space to do a compaction.
2806 27 : async fn check_compaction_space(
2807 27 : self: &Arc<Self>,
2808 27 : layer_selection: &[Layer],
2809 27 : ) -> Result<(), CompactionError> {
2810 27 : let available_space = self
2811 27 : .check_available_space()
2812 27 : .await
2813 27 : .map_err(CompactionError::Other)?;
2814 27 : let mut remote_layer_size = 0;
2815 27 : let mut all_layer_size = 0;
2816 106 : for layer in layer_selection {
2817 79 : let needs_download = layer
2818 79 : .needs_download()
2819 79 : .await
2820 79 : .context("failed to check if layer needs download")
2821 79 : .map_err(CompactionError::Other)?;
2822 79 : if needs_download.is_some() {
2823 0 : remote_layer_size += layer.layer_desc().file_size;
2824 79 : }
2825 79 : all_layer_size += layer.layer_desc().file_size;
2826 : }
2827 27 : let allocated_space = (available_space as f64 * 0.8) as u64; /* reserve 20% space for other tasks */
2828 27 : if all_layer_size /* space needed for newly-generated file */ + remote_layer_size /* space for downloading layers */ > allocated_space
2829 : {
2830 0 : return Err(CompactionError::Other(anyhow!(
2831 0 : "not enough space for compaction: available_space={}, allocated_space={}, all_layer_size={}, remote_layer_size={}, required_space={}",
2832 0 : available_space,
2833 0 : allocated_space,
2834 0 : all_layer_size,
2835 0 : remote_layer_size,
2836 0 : all_layer_size + remote_layer_size
2837 0 : )));
2838 27 : }
2839 27 : Ok(())
2840 27 : }
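     :
     : // Worked example of the check above (added for clarity; not part of the original source, and the
     : // numbers are made up): with `available_space` = 100 GiB, reserving 20% leaves
     : // `allocated_space` = 80 GiB. If the selected layers total `all_layer_size` = 50 GiB and 10 GiB
     : // of them still need downloading (`remote_layer_size`), the required space is
     : // 50 + 10 = 60 GiB <= 80 GiB, so the compaction is allowed to proceed.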
2841 :
2842 : /// Check to bail out of gc compaction early if it would use too much memory.
2843 27 : async fn check_memory_usage(
2844 27 : self: &Arc<Self>,
2845 27 : layer_selection: &[Layer],
2846 27 : ) -> Result<(), CompactionError> {
2847 27 : let mut estimated_memory_usage_mb = 0.0;
2848 27 : let mut num_image_layers = 0;
2849 27 : let mut num_delta_layers = 0;
2850 27 : let target_layer_size_bytes = 256 * 1024 * 1024;
2851 106 : for layer in layer_selection {
2852 79 : let layer_desc = layer.layer_desc();
2853 79 : if layer_desc.is_delta() {
2854 44 : // Delta layers have at most a 1MB buffer; use 3x to be safe (there are deltas as large as 16KB).
2855 44 : // Scale it by target_layer_size_bytes so that tests can pass (some tests, e.g.,
2856 44 : // `test_pageserver_gc_compaction_preempt`, use a 3MB layer size and we need to account for that).
2857 44 : estimated_memory_usage_mb +=
2858 44 : 3.0 * (layer_desc.file_size / target_layer_size_bytes) as f64;
2859 44 : num_delta_layers += 1;
2860 44 : } else {
2861 35 : // Image layers have at most a 1MB buffer, but the data might be compressed; assume a 5x compression ratio.
2862 35 : estimated_memory_usage_mb +=
2863 35 : 5.0 * (layer_desc.file_size / target_layer_size_bytes) as f64;
2864 35 : num_image_layers += 1;
2865 35 : }
2866 : }
2867 27 : if estimated_memory_usage_mb > 1024.0 {
2868 0 : return Err(CompactionError::Other(anyhow!(
2869 0 : "estimated memory usage is too high: {}MB, giving up compaction; num_image_layers={}, num_delta_layers={}",
2870 0 : estimated_memory_usage_mb,
2871 0 : num_image_layers,
2872 0 : num_delta_layers
2873 0 : )));
2874 27 : }
2875 27 : Ok(())
2876 27 : }
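     :
     : // Worked example of the heuristic above (added for clarity; not part of the original source, and
     : // the numbers are made up): with the 256MB `target_layer_size_bytes`, a 256MB delta layer
     : // contributes 3.0 * (256MB / 256MB) = 3 to `estimated_memory_usage_mb` and a 256MB image layer
     : // contributes 5. A selection of 100 such delta layers and 50 such image layers is estimated at
     : // 100 * 3 + 50 * 5 = 550 <= 1024, so the compaction is not rejected.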
2877 :
2878 : /// Get a watermark for gc-compaction, that is the lowest LSN that we can use as the `gc_horizon` for
2879 : /// the compaction algorithm. It is min(space_cutoff, time_cutoff, latest_gc_cutoff, standby_horizon).
2880 : /// Leases and retain_lsns are considered in the gc-compaction job itself so we don't need to account for them
2881 : /// here.
2882 28 : pub(crate) fn get_gc_compaction_watermark(self: &Arc<Self>) -> Lsn {
2883 28 : let gc_cutoff_lsn = {
2884 28 : let gc_info = self.gc_info.read().unwrap();
2885 28 : gc_info.min_cutoff()
2886 28 : };
2887 28 :
2888 28 : // TODO: standby horizon should use leases so we don't really need to consider it here.
2889 28 : // let watermark = watermark.min(self.standby_horizon.load());
2890 28 :
2891 28 : // TODO: ensure the child branches will not use anything below the watermark, or consider
2892 28 : // them when computing the watermark.
2893 28 : gc_cutoff_lsn.min(*self.get_applied_gc_cutoff_lsn())
2894 28 : }
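     :
     : // Illustrative note (added for clarity; not part of the original source, and the LSNs are made
     : // up): if `gc_info.min_cutoff()` returns 0/30 while the applied gc cutoff is 0/28, the watermark
     : // returned above is min(0/30, 0/28) = 0/28.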
2895 :
2896 : /// Split a gc-compaction job into multiple compaction jobs. The split is based on the key range and the estimated size of the compaction job.
2897 : /// The function returns a list of compaction jobs that can be executed separately. If the upper bound of the compact LSN
2898 : /// range is not specified, we will use the latest gc_cutoff as the upper bound, so that all jobs in the jobset act
2899 : /// like a full compaction of the specified keyspace.
2900 0 : pub(crate) async fn gc_compaction_split_jobs(
2901 0 : self: &Arc<Self>,
2902 0 : job: GcCompactJob,
2903 0 : sub_compaction_max_job_size_mb: Option<u64>,
2904 0 : ) -> Result<Vec<GcCompactJob>, CompactionError> {
2905 0 : let compact_below_lsn = if job.compact_lsn_range.end != Lsn::MAX {
2906 0 : job.compact_lsn_range.end
2907 : } else {
2908 0 : self.get_gc_compaction_watermark()
2909 : };
2910 :
2911 0 : if compact_below_lsn == Lsn::INVALID {
2912 0 : tracing::warn!(
2913 0 : "no layers to compact with gc: gc_cutoff not generated yet, skipping gc bottom-most compaction"
2914 : );
2915 0 : return Ok(vec![]);
2916 0 : }
2917 :
2918 : // Split the compaction job into sub-jobs of about 4GB each.
2919 : const GC_COMPACT_MAX_SIZE_MB: u64 = 4 * 1024;
2920 0 : let sub_compaction_max_job_size_mb =
2921 0 : sub_compaction_max_job_size_mb.unwrap_or(GC_COMPACT_MAX_SIZE_MB);
2922 0 :
2923 0 : let mut compact_jobs = Vec::<GcCompactJob>::new();
2924 0 : // For now, we simply use the key partitioning information; we should do more fine-grained partitioning
2925 0 : // by estimating the number of files read for a compaction job. We should also partition on LSN.
2926 0 : let ((dense_ks, sparse_ks), _) = self.partitioning.read().as_ref().clone();
2927 : // Truncate the key range to be within user specified compaction range.
2928 0 : fn truncate_to(
2929 0 : source_start: &Key,
2930 0 : source_end: &Key,
2931 0 : target_start: &Key,
2932 0 : target_end: &Key,
2933 0 : ) -> Option<(Key, Key)> {
2934 0 : let start = source_start.max(target_start);
2935 0 : let end = source_end.min(target_end);
2936 0 : if start < end {
2937 0 : Some((*start, *end))
2938 : } else {
2939 0 : None
2940 : }
2941 0 : }
2942 0 : let mut split_key_ranges = Vec::new();
2943 0 : let ranges = dense_ks
2944 0 : .parts
2945 0 : .iter()
2946 0 : .map(|partition| partition.ranges.iter())
2947 0 : .chain(sparse_ks.parts.iter().map(|x| x.0.ranges.iter()))
2948 0 : .flatten()
2949 0 : .cloned()
2950 0 : .collect_vec();
2951 0 : for range in ranges.iter() {
2952 0 : let Some((start, end)) = truncate_to(
2953 0 : &range.start,
2954 0 : &range.end,
2955 0 : &job.compact_key_range.start,
2956 0 : &job.compact_key_range.end,
2957 0 : ) else {
2958 0 : continue;
2959 : };
2960 0 : split_key_ranges.push((start, end));
2961 : }
2962 0 : split_key_ranges.sort();
2963 0 : let all_layers = {
2964 0 : let guard = self.layers.read().await;
2965 0 : let layer_map = guard.layer_map()?;
2966 0 : layer_map.iter_historic_layers().collect_vec()
2967 0 : };
2968 0 : let mut current_start = None;
2969 0 : let ranges_num = split_key_ranges.len();
2970 0 : for (idx, (start, end)) in split_key_ranges.into_iter().enumerate() {
2971 0 : if current_start.is_none() {
2972 0 : current_start = Some(start);
2973 0 : }
2974 0 : let start = current_start.unwrap();
2975 0 : if start >= end {
2976 : // We have already processed this partition.
2977 0 : continue;
2978 0 : }
2979 0 : let overlapping_layers = {
2980 0 : let mut desc = Vec::new();
2981 0 : for layer in all_layers.iter() {
2982 0 : if overlaps_with(&layer.get_key_range(), &(start..end))
2983 0 : && layer.get_lsn_range().start <= compact_below_lsn
2984 0 : {
2985 0 : desc.push(layer.clone());
2986 0 : }
2987 : }
2988 0 : desc
2989 0 : };
2990 0 : let total_size = overlapping_layers.iter().map(|x| x.file_size).sum::<u64>();
2991 0 : if total_size > sub_compaction_max_job_size_mb * 1024 * 1024 || ranges_num == idx + 1 {
2992 : // Try to extend the compaction range so that we include at least one full layer file.
2993 0 : let extended_end = overlapping_layers
2994 0 : .iter()
2995 0 : .map(|layer| layer.key_range.end)
2996 0 : .min();
2997 : // It is possible that the search range does not contain any layer files when we reach the end of the loop.
2998 : // In this case, we simply use the specified key range end.
2999 0 : let end = if let Some(extended_end) = extended_end {
3000 0 : extended_end.max(end)
3001 : } else {
3002 0 : end
3003 : };
3004 0 : let end = if ranges_num == idx + 1 {
3005 : // extend the compaction range to the end of the key range if it's the last partition
3006 0 : end.max(job.compact_key_range.end)
3007 : } else {
3008 0 : end
3009 : };
3010 0 : if total_size == 0 && !compact_jobs.is_empty() {
3011 0 : info!(
3012 0 : "splitting compaction job: {}..{}, estimated_size={}, extending the previous job",
3013 : start, end, total_size
3014 : );
3015 0 : compact_jobs.last_mut().unwrap().compact_key_range.end = end;
3016 0 : current_start = Some(end);
3017 : } else {
3018 0 : info!(
3019 0 : "splitting compaction job: {}..{}, estimated_size={}",
3020 : start, end, total_size
3021 : );
3022 0 : compact_jobs.push(GcCompactJob {
3023 0 : dry_run: job.dry_run,
3024 0 : compact_key_range: start..end,
3025 0 : compact_lsn_range: job.compact_lsn_range.start..compact_below_lsn,
3026 0 : });
3027 0 : current_start = Some(end);
3028 : }
3029 0 : }
3030 : }
3031 0 : Ok(compact_jobs)
3032 0 : }
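     :
     : // Rough sketch of the splitting logic above (added for clarity; not part of the original source,
     : // simplified pseudo-Rust with hypothetical helper names):
     : //
     : //     let mut job_start = first_partition_start;
     : //     for (idx, range) in partitions.iter().enumerate() {
     : //         let size = size_of_layers_overlapping(job_start..range.end);
     : //         if size > max_job_size_bytes || idx + 1 == partitions.len() {
     : //             let job_end = extend_to_layer_boundary(range.end);
     : //             emit_job(job_start..job_end);
     : //             job_start = job_end;
     : //         }
     : //     }
     : //
     : // i.e. key partitions are accumulated until the size of the overlapping layers exceeds the
     : // ~4GB default, and each job's end is extended so that it covers at least one full layer file.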
3033 :
3034 : /// An experimental compaction building block that combines compaction with garbage collection.
3035 : ///
3036 : /// The current implementation picks all delta + image layers that are below or intersecting with
3037 : /// the GC horizon without considering retain_lsns. Then, it does a full compaction over all these delta
3038 : /// layers and image layers, which generates image layers on the gc horizon, drop deltas below gc horizon,
3039 : /// and create delta layers with all deltas >= gc horizon.
3040 : ///
3041 : /// If `options.compact_range` is provided, it will only compact the keys within the range, aka partial compaction.
3042 : /// Partial compaction will read and process all layers overlapping with the key range, even if it might
3043 : /// contain extra keys. After the gc-compaction phase completes, delta layers that are not fully contained
3044 : /// within the key range will be rewritten to ensure they do not overlap with the newly produced layers. Providing
3045 : /// `Key::MIN..Key::MAX` to the function indicates a full compaction, though technically, `Key::MAX` is not
3046 : /// part of the range.
3047 : ///
3048 : /// If `options.compact_lsn_range.end` is provided, the compaction will only compact layers below or intersecting with
3049 : /// the LSN. Otherwise, it will use the gc cutoff by default.
3050 28 : pub(crate) async fn compact_with_gc(
3051 28 : self: &Arc<Self>,
3052 28 : cancel: &CancellationToken,
3053 28 : options: CompactOptions,
3054 28 : ctx: &RequestContext,
3055 28 : ) -> Result<CompactionOutcome, CompactionError> {
3056 28 : let sub_compaction = options.sub_compaction;
3057 28 : let job = GcCompactJob::from_compact_options(options.clone());
3058 28 : let yield_for_l0 = options.flags.contains(CompactFlags::YieldForL0);
3059 28 : if sub_compaction {
3060 0 : info!(
3061 0 : "running enhanced gc bottom-most compaction with sub-compaction, splitting compaction jobs"
3062 : );
3063 0 : let jobs = self
3064 0 : .gc_compaction_split_jobs(job, options.sub_compaction_max_job_size_mb)
3065 0 : .await?;
3066 0 : let jobs_len = jobs.len();
3067 0 : for (idx, job) in jobs.into_iter().enumerate() {
3068 0 : info!(
3069 0 : "running enhanced gc bottom-most compaction, sub-compaction {}/{}",
3070 0 : idx + 1,
3071 : jobs_len
3072 : );
3073 0 : self.compact_with_gc_inner(cancel, job, ctx, yield_for_l0)
3074 0 : .await?;
3075 : }
3076 0 : if jobs_len == 0 {
3077 0 : info!("no jobs to run, skipping gc bottom-most compaction");
3078 0 : }
3079 0 : return Ok(CompactionOutcome::Done);
3080 28 : }
3081 28 : self.compact_with_gc_inner(cancel, job, ctx, yield_for_l0)
3082 28 : .await
3083 28 : }
3084 :
3085 28 : async fn compact_with_gc_inner(
3086 28 : self: &Arc<Self>,
3087 28 : cancel: &CancellationToken,
3088 28 : job: GcCompactJob,
3089 28 : ctx: &RequestContext,
3090 28 : yield_for_l0: bool,
3091 28 : ) -> Result<CompactionOutcome, CompactionError> {
3092 28 : // Block other compaction/GC tasks from running for now. GC-compaction could run along
3093 28 : // with legacy compaction tasks in the future. Always ensure the lock order is compaction -> gc.
3094 28 : // Note that we already acquired the compaction lock when the outer `compact` function gets called.
3095 28 :
3096 28 : let timer = Instant::now();
3097 28 : let begin_timer = timer;
3098 28 :
3099 28 : let gc_lock = async {
3100 28 : tokio::select! {
3101 28 : guard = self.gc_lock.lock() => Ok(guard),
3102 28 : _ = cancel.cancelled() => Err(CompactionError::ShuttingDown),
3103 : }
3104 28 : };
3105 :
3106 28 : let time_acquire_lock = timer.elapsed();
3107 28 : let timer = Instant::now();
3108 :
3109 28 : let gc_lock = crate::timed(
3110 28 : gc_lock,
3111 28 : "acquires gc lock",
3112 28 : std::time::Duration::from_secs(5),
3113 28 : )
3114 28 : .await?;
3115 :
3116 28 : let dry_run = job.dry_run;
3117 28 : let compact_key_range = job.compact_key_range;
3118 28 : let compact_lsn_range = job.compact_lsn_range;
3119 :
3120 28 : let debug_mode = cfg!(debug_assertions) || cfg!(feature = "testing");
3121 :
3122 28 : info!(
3123 0 : "running enhanced gc bottom-most compaction, dry_run={dry_run}, compact_key_range={}..{}, compact_lsn_range={}..{}",
3124 : compact_key_range.start,
3125 : compact_key_range.end,
3126 : compact_lsn_range.start,
3127 : compact_lsn_range.end
3128 : );
3129 :
3130 28 : scopeguard::defer! {
3131 28 : info!("done enhanced gc bottom-most compaction");
3132 28 : };
3133 28 :
3134 28 : let mut stat = CompactionStatistics::default();
3135 :
3136 : // Step 0: pick all delta layers + image layers that are below or intersecting with the GC horizon.
3137 : // The layer selection has the following properties:
3138 : // 1. If a layer is in the selection, all layers below it are in the selection.
3139 : // 2. Inferred from (1): for each key in the layer selection, its value can be reconstructed using only the layers in the selection.
3140 27 : let job_desc = {
3141 28 : let guard = self.layers.read().await;
3142 28 : let layers = guard.layer_map()?;
3143 28 : let gc_info = self.gc_info.read().unwrap();
3144 28 : let mut retain_lsns_below_horizon = Vec::new();
3145 28 : let gc_cutoff = {
3146 : // Currently, gc-compaction only kicks in after the legacy gc has updated the gc_cutoff.
3147 : // Therefore, it can only clean up data that cannot be cleaned up with legacy gc, instead of
3148 : // cleaning everything it theoretically could. In the future, it should use `self.gc_info`
3149 : // as the source of truth.
3150 28 : let real_gc_cutoff = self.get_gc_compaction_watermark();
3151 : // The compaction algorithm will keep all keys above the gc_cutoff while keeping only necessary keys below the gc_cutoff for
3152 : // each of the retain_lsn. Therefore, if the user-provided `compact_lsn_range.end` is larger than the real gc cutoff, we will use
3153 : // the real cutoff.
3154 28 : let mut gc_cutoff = if compact_lsn_range.end == Lsn::MAX {
3155 25 : if real_gc_cutoff == Lsn::INVALID {
3156 : // If the gc_cutoff is not generated yet, we should not compact anything.
3157 0 : tracing::warn!(
3158 0 : "no layers to compact with gc: gc_cutoff not generated yet, skipping gc bottom-most compaction"
3159 : );
3160 0 : return Ok(CompactionOutcome::Skipped);
3161 25 : }
3162 25 : real_gc_cutoff
3163 : } else {
3164 3 : compact_lsn_range.end
3165 : };
3166 28 : if gc_cutoff > real_gc_cutoff {
3167 2 : warn!(
3168 0 : "provided compact_lsn_range.end={} is larger than the real_gc_cutoff={}, using the real gc cutoff",
3169 : gc_cutoff, real_gc_cutoff
3170 : );
3171 2 : gc_cutoff = real_gc_cutoff;
3172 26 : }
3173 28 : gc_cutoff
3174 : };
3175 35 : for (lsn, _timeline_id, _is_offloaded) in &gc_info.retain_lsns {
3176 35 : if lsn < &gc_cutoff {
3177 35 : retain_lsns_below_horizon.push(*lsn);
3178 35 : }
3179 : }
3180 28 : for lsn in gc_info.leases.keys() {
3181 0 : if lsn < &gc_cutoff {
3182 0 : retain_lsns_below_horizon.push(*lsn);
3183 0 : }
3184 : }
3185 28 : let mut selected_layers: Vec<Layer> = Vec::new();
3186 28 : drop(gc_info);
3187 : // First, pick all the layers that intersect with or are below the gc_cutoff, and get the largest LSN among the selected layers.
3188 28 : let Some(max_layer_lsn) = layers
3189 28 : .iter_historic_layers()
3190 125 : .filter(|desc| desc.get_lsn_range().start <= gc_cutoff)
3191 107 : .map(|desc| desc.get_lsn_range().end)
3192 28 : .max()
3193 : else {
3194 0 : info!(
3195 0 : "no layers to compact with gc: no historic layers below gc_cutoff, gc_cutoff={}",
3196 : gc_cutoff
3197 : );
3198 0 : return Ok(CompactionOutcome::Done);
3199 : };
3200 : // Next, if the user specifies compact_lsn_range.start, we need to filter some layers out. All the layers (strictly) below
3201 : // the min_layer_lsn computed below will be filtered out, and the data will be accessed using the normal read path, as if
3202 : // it were a branch.
3203 28 : let Some(min_layer_lsn) = layers
3204 28 : .iter_historic_layers()
3205 125 : .filter(|desc| {
3206 125 : if compact_lsn_range.start == Lsn::INVALID {
3207 102 : true // select all layers below if start == Lsn(0)
3208 : } else {
3209 23 : desc.get_lsn_range().end > compact_lsn_range.start // strictly larger than compact_above_lsn
3210 : }
3211 125 : })
3212 116 : .map(|desc| desc.get_lsn_range().start)
3213 28 : .min()
3214 : else {
3215 0 : info!(
3216 0 : "no layers to compact with gc: no historic layers above compact_above_lsn, compact_above_lsn={}",
3217 : compact_lsn_range.end
3218 : );
3219 0 : return Ok(CompactionOutcome::Done);
3220 : };
3221 : // Then, pick all the layers that are below the max_layer_lsn. This is to ensure we can pick all single-key
3222 : // layers to compact.
3223 28 : let mut rewrite_layers = Vec::new();
3224 125 : for desc in layers.iter_historic_layers() {
3225 125 : if desc.get_lsn_range().end <= max_layer_lsn
3226 107 : && desc.get_lsn_range().start >= min_layer_lsn
3227 98 : && overlaps_with(&desc.get_key_range(), &compact_key_range)
3228 : {
3229 : // If the layer overlaps with the compaction key range, we need to read it to obtain all keys within the range,
3230 : // even though it might contain extra keys.
3231 79 : selected_layers.push(guard.get_from_desc(&desc));
3232 79 : // If the layer is not fully contained within the key range, we need to rewrite it if it's a delta layer (it's fine
3233 79 : // to overlap image layers)
3234 79 : if desc.is_delta() && !fully_contains(&compact_key_range, &desc.get_key_range())
3235 1 : {
3236 1 : rewrite_layers.push(desc);
3237 78 : }
3238 46 : }
3239 : }
3240 28 : if selected_layers.is_empty() {
3241 1 : info!(
3242 0 : "no layers to compact with gc: no layers within the key range, gc_cutoff={}, key_range={}..{}",
3243 : gc_cutoff, compact_key_range.start, compact_key_range.end
3244 : );
3245 1 : return Ok(CompactionOutcome::Done);
3246 27 : }
3247 27 : retain_lsns_below_horizon.sort();
3248 27 : GcCompactionJobDescription {
3249 27 : selected_layers,
3250 27 : gc_cutoff,
3251 27 : retain_lsns_below_horizon,
3252 27 : min_layer_lsn,
3253 27 : max_layer_lsn,
3254 27 : compaction_key_range: compact_key_range,
3255 27 : rewrite_layers,
3256 27 : }
3257 : };
3258 27 : let (has_data_below, lowest_retain_lsn) = if compact_lsn_range.start != Lsn::INVALID {
3259 : // If we only compact above some LSN, we should get the history from the current branch below the specified LSN.
3260 : // We use job_desc.min_layer_lsn as if it's the lowest branch point.
3261 4 : (true, job_desc.min_layer_lsn)
3262 23 : } else if self.ancestor_timeline.is_some() {
3263 : // In theory, we can also use min_layer_lsn here, but using ancestor LSN makes sure the delta layers cover the
3264 : // LSN ranges all the way to the ancestor timeline.
3265 1 : (true, self.ancestor_lsn)
3266 : } else {
3267 22 : let res = job_desc
3268 22 : .retain_lsns_below_horizon
3269 22 : .first()
3270 22 : .copied()
3271 22 : .unwrap_or(job_desc.gc_cutoff);
3272 22 : if debug_mode {
3273 22 : assert_eq!(
3274 22 : res,
3275 22 : job_desc
3276 22 : .retain_lsns_below_horizon
3277 22 : .iter()
3278 22 : .min()
3279 22 : .copied()
3280 22 : .unwrap_or(job_desc.gc_cutoff)
3281 22 : );
3282 0 : }
3283 22 : (false, res)
3284 : };
3285 :
3286 27 : let verification = self.get_gc_compaction_settings().gc_compaction_verification;
3287 27 :
3288 27 : info!(
3289 0 : "picked {} layers for compaction ({} layers need rewriting) with max_layer_lsn={} min_layer_lsn={} gc_cutoff={} lowest_retain_lsn={}, key_range={}..{}, has_data_below={}",
3290 0 : job_desc.selected_layers.len(),
3291 0 : job_desc.rewrite_layers.len(),
3292 : job_desc.max_layer_lsn,
3293 : job_desc.min_layer_lsn,
3294 : job_desc.gc_cutoff,
3295 : lowest_retain_lsn,
3296 : job_desc.compaction_key_range.start,
3297 : job_desc.compaction_key_range.end,
3298 : has_data_below,
3299 : );
3300 :
3301 27 : let time_analyze = timer.elapsed();
3302 27 : let timer = Instant::now();
3303 :
3304 106 : for layer in &job_desc.selected_layers {
3305 79 : debug!("read layer: {}", layer.layer_desc().key());
3306 : }
3307 28 : for layer in &job_desc.rewrite_layers {
3308 1 : debug!("rewrite layer: {}", layer.key());
3309 : }
3310 :
3311 27 : self.check_compaction_space(&job_desc.selected_layers)
3312 27 : .await?;
3313 :
3314 27 : self.check_memory_usage(&job_desc.selected_layers).await?;
3315 27 : if job_desc.selected_layers.len() > 100
3316 0 : && job_desc.rewrite_layers.len() as f64 >= job_desc.selected_layers.len() as f64 * 0.7
3317 : {
3318 0 : return Err(CompactionError::Other(anyhow!(
3319 0 : "too many layers to rewrite: {} / {}, giving up compaction",
3320 0 : job_desc.rewrite_layers.len(),
3321 0 : job_desc.selected_layers.len()
3322 0 : )));
3323 27 : }
3324 :
3325 : // Generate statistics for the compaction
3326 106 : for layer in &job_desc.selected_layers {
3327 79 : let desc = layer.layer_desc();
3328 79 : if desc.is_delta() {
3329 44 : stat.visit_delta_layer(desc.file_size());
3330 44 : } else {
3331 35 : stat.visit_image_layer(desc.file_size());
3332 35 : }
3333 : }
3334 :
3335 : // Step 1: construct a k-merge iterator over all layers.
3336 : // Also, verify if the layer map can be split by drawing a horizontal line at every LSN start/end split point.
3337 27 : let layer_names = job_desc
3338 27 : .selected_layers
3339 27 : .iter()
3340 79 : .map(|layer| layer.layer_desc().layer_name())
3341 27 : .collect_vec();
3342 27 : if let Some(err) = check_valid_layermap(&layer_names) {
3343 0 : return Err(CompactionError::Other(anyhow!(
3344 0 : "gc-compaction layer map check failed because {}, cannot proceed with compaction due to potential data loss",
3345 0 : err
3346 0 : )));
3347 27 : }
3348 27 : // The maximum LSN we are processing in this compaction loop
3349 27 : let end_lsn = job_desc
3350 27 : .selected_layers
3351 27 : .iter()
3352 79 : .map(|l| l.layer_desc().lsn_range.end)
3353 27 : .max()
3354 27 : .unwrap();
3355 27 : let mut delta_layers = Vec::new();
3356 27 : let mut image_layers = Vec::new();
3357 27 : let mut downloaded_layers = Vec::new();
3358 27 : let mut total_downloaded_size = 0;
3359 27 : let mut total_layer_size = 0;
3360 106 : for layer in &job_desc.selected_layers {
3361 79 : if layer
3362 79 : .needs_download()
3363 79 : .await
3364 79 : .context("failed to check if layer needs download")
3365 79 : .map_err(CompactionError::Other)?
3366 79 : .is_some()
3367 0 : {
3368 0 : total_downloaded_size += layer.layer_desc().file_size;
3369 79 : }
3370 79 : total_layer_size += layer.layer_desc().file_size;
3371 79 : if cancel.is_cancelled() {
3372 0 : return Err(CompactionError::ShuttingDown);
3373 79 : }
3374 79 : let should_yield = yield_for_l0
3375 0 : && self
3376 0 : .l0_compaction_trigger
3377 0 : .notified()
3378 0 : .now_or_never()
3379 0 : .is_some();
3380 79 : if should_yield {
3381 0 : tracing::info!("preempt gc-compaction when downloading layers: too many L0 layers");
3382 0 : return Ok(CompactionOutcome::YieldForL0);
3383 79 : }
3384 79 : let resident_layer = layer
3385 79 : .download_and_keep_resident(ctx)
3386 79 : .await
3387 79 : .context("failed to download and keep resident layer")
3388 79 : .map_err(CompactionError::Other)?;
3389 79 : downloaded_layers.push(resident_layer);
3390 : }
3391 27 : info!(
3392 0 : "finish downloading layers, downloaded={}, total={}, ratio={:.2}",
3393 0 : total_downloaded_size,
3394 0 : total_layer_size,
3395 0 : total_downloaded_size as f64 / total_layer_size as f64
3396 : );
3397 106 : for resident_layer in &downloaded_layers {
3398 79 : if resident_layer.layer_desc().is_delta() {
3399 44 : let layer = resident_layer
3400 44 : .get_as_delta(ctx)
3401 44 : .await
3402 44 : .context("failed to get delta layer")
3403 44 : .map_err(CompactionError::Other)?;
3404 44 : delta_layers.push(layer);
3405 : } else {
3406 35 : let layer = resident_layer
3407 35 : .get_as_image(ctx)
3408 35 : .await
3409 35 : .context("failed to get image layer")
3410 35 : .map_err(CompactionError::Other)?;
3411 35 : image_layers.push(layer);
3412 : }
3413 : }
3414 27 : let (dense_ks, sparse_ks) = self
3415 27 : .collect_gc_compaction_keyspace()
3416 27 : .await
3417 27 : .context("failed to collect gc compaction keyspace")
3418 27 : .map_err(CompactionError::Other)?;
3419 27 : let mut merge_iter = FilterIterator::create(
3420 27 : MergeIterator::create_with_options(
3421 27 : &delta_layers,
3422 27 : &image_layers,
3423 27 : ctx,
3424 27 : 128 * 8192, /* 1MB buffer for each of the inner iterators */
3425 27 : 128,
3426 27 : ),
3427 27 : dense_ks,
3428 27 : sparse_ks,
3429 27 : )
3430 27 : .context("failed to create filter iterator")
3431 27 : .map_err(CompactionError::Other)?;
3432 :
3433 27 : let time_download_layer = timer.elapsed();
3434 27 : let mut timer = Instant::now();
3435 27 :
3436 27 : // Step 2: Produce images+deltas.
3437 27 : let mut accumulated_values = Vec::new();
3438 27 : let mut accumulated_values_estimated_size = 0;
3439 27 : let mut last_key: Option<Key> = None;
3440 :
3441 : // Only create image layers when there are no ancestor branches. TODO: create a covering image layer
3442 : // when certain conditions are met.
3443 27 : let mut image_layer_writer = if !has_data_below {
3444 : Some(
3445 22 : SplitImageLayerWriter::new(
3446 22 : self.conf,
3447 22 : self.timeline_id,
3448 22 : self.tenant_shard_id,
3449 22 : job_desc.compaction_key_range.start,
3450 22 : lowest_retain_lsn,
3451 22 : self.get_compaction_target_size(),
3452 22 : &self.gate,
3453 22 : self.cancel.clone(),
3454 22 : ctx,
3455 22 : )
3456 22 : .await
3457 22 : .context("failed to create image layer writer")
3458 22 : .map_err(CompactionError::Other)?,
3459 : )
3460 : } else {
3461 5 : None
3462 : };
3463 :
3464 27 : let mut delta_layer_writer = SplitDeltaLayerWriter::new(
3465 27 : self.conf,
3466 27 : self.timeline_id,
3467 27 : self.tenant_shard_id,
3468 27 : lowest_retain_lsn..end_lsn,
3469 27 : self.get_compaction_target_size(),
3470 27 : &self.gate,
3471 27 : self.cancel.clone(),
3472 27 : )
3473 27 : .await
3474 27 : .context("failed to create delta layer writer")
3475 27 : .map_err(CompactionError::Other)?;
3476 :
3477 : #[derive(Default)]
3478 : struct RewritingLayers {
3479 : before: Option<DeltaLayerWriter>,
3480 : after: Option<DeltaLayerWriter>,
3481 : }
3482 27 : let mut delta_layer_rewriters = HashMap::<Arc<PersistentLayerKey>, RewritingLayers>::new();
3483 :
3484 : /// When we are not compacting a bottom range (= `[0,X)`) of the root branch, we "have data below" (`has_data_below=true`).
3485 : /// This happens in two cases: compaction on ancestor branches, and when `compact_lsn_range.start` is set.
3486 : /// In those cases, we need to pull up data from below the LSN range we're compacting.
3487 : ///
3488 : /// This function unifies the cases so that later code doesn't have to think about it.
3489 : ///
3490 : /// Currently, we always get the ancestor image for each key in the child branch regardless of whether the image
3491 : /// is needed for reconstruction. This should be fixed in the future.
3492 : ///
3493 : /// Furthermore, we should do vectored get instead of a single get, or better, use k-merge for ancestor
3494 : /// images.
3495 320 : async fn get_ancestor_image(
3496 320 : this_tline: &Arc<Timeline>,
3497 320 : key: Key,
3498 320 : ctx: &RequestContext,
3499 320 : has_data_below: bool,
3500 320 : history_lsn_point: Lsn,
3501 320 : ) -> anyhow::Result<Option<(Key, Lsn, Bytes)>> {
3502 320 : if !has_data_below {
3503 301 : return Ok(None);
3504 19 : };
3505 : // This function is implemented as a get of the current timeline at ancestor LSN, therefore reusing
3506 : // as much existing code as possible.
3507 19 : let img = this_tline.get(key, history_lsn_point, ctx).await?;
3508 19 : Ok(Some((key, history_lsn_point, img)))
3509 320 : }
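     :
     : // Illustrative note (added for clarity; not part of the original source, and the LSN is made up):
     : // when compacting a child branch with `ancestor_lsn` = 0/30 (so `has_data_below` = true and
     : // `history_lsn_point` = 0/30), every visited key K gets a base image via
     : // `this_tline.get(K, Lsn(0x30), ctx)`; for a root-branch full compaction (`has_data_below` =
     : // false) no ancestor image is fetched and `None` is returned.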
3510 :
3511 : // Actually, we can decide not to write to the image layer at all at this point because
3512 : // the key and LSN range are determined. However, to keep things simple here, we still
3513 : // create this writer, and discard it at the end.
3514 27 : let mut time_to_first_kv_pair = None;
3515 :
3516 496 : while let Some(((key, lsn, val), desc)) = merge_iter
3517 496 : .next_with_trace()
3518 496 : .await
3519 496 : .context("failed to get next key-value pair")
3520 496 : .map_err(CompactionError::Other)?
3521 : {
3522 470 : if time_to_first_kv_pair.is_none() {
3523 27 : time_to_first_kv_pair = Some(timer.elapsed());
3524 27 : timer = Instant::now();
3525 443 : }
3526 :
3527 470 : if cancel.is_cancelled() {
3528 0 : return Err(CompactionError::ShuttingDown);
3529 470 : }
3530 :
3531 470 : let should_yield = yield_for_l0
3532 0 : && self
3533 0 : .l0_compaction_trigger
3534 0 : .notified()
3535 0 : .now_or_never()
3536 0 : .is_some();
3537 470 : if should_yield {
3538 0 : tracing::info!("preempt gc-compaction in the main loop: too many L0 layers");
3539 0 : return Ok(CompactionOutcome::YieldForL0);
3540 470 : }
3541 470 : if self.shard_identity.is_key_disposable(&key) {
3542 : // If this shard does not need to store this key, simply skip it.
3543 : //
3544 : // This is not handled in the filter iterator because the shard is determined by hash.
3545 : // Therefore, it does not give us any performance benefit to do things like skip
3546 : // a whole layer file, as we do when handling key spaces (ranges).
3547 0 : if cfg!(debug_assertions) {
3548 0 : let shard = self.shard_identity.shard_index();
3549 0 : let owner = self.shard_identity.get_shard_number(&key);
3550 0 : panic!("key {key} does not belong on shard {shard}, owned by {owner}");
3551 0 : }
3552 0 : continue;
3553 470 : }
3554 470 : if !job_desc.compaction_key_range.contains(&key) {
3555 32 : if !desc.is_delta {
3556 30 : continue;
3557 2 : }
3558 2 : let rewriter = delta_layer_rewriters.entry(desc.clone()).or_default();
3559 2 : let rewriter = if key < job_desc.compaction_key_range.start {
3560 0 : if rewriter.before.is_none() {
3561 0 : rewriter.before = Some(
3562 0 : DeltaLayerWriter::new(
3563 0 : self.conf,
3564 0 : self.timeline_id,
3565 0 : self.tenant_shard_id,
3566 0 : desc.key_range.start,
3567 0 : desc.lsn_range.clone(),
3568 0 : &self.gate,
3569 0 : self.cancel.clone(),
3570 0 : ctx,
3571 0 : )
3572 0 : .await
3573 0 : .context("failed to create delta layer writer")
3574 0 : .map_err(CompactionError::Other)?,
3575 : );
3576 0 : }
3577 0 : rewriter.before.as_mut().unwrap()
3578 2 : } else if key >= job_desc.compaction_key_range.end {
3579 2 : if rewriter.after.is_none() {
3580 1 : rewriter.after = Some(
3581 1 : DeltaLayerWriter::new(
3582 1 : self.conf,
3583 1 : self.timeline_id,
3584 1 : self.tenant_shard_id,
3585 1 : job_desc.compaction_key_range.end,
3586 1 : desc.lsn_range.clone(),
3587 1 : &self.gate,
3588 1 : self.cancel.clone(),
3589 1 : ctx,
3590 1 : )
3591 1 : .await
3592 1 : .context("failed to create delta layer writer")
3593 1 : .map_err(CompactionError::Other)?,
3594 : );
3595 1 : }
3596 2 : rewriter.after.as_mut().unwrap()
3597 : } else {
3598 0 : unreachable!()
3599 : };
3600 2 : rewriter
3601 2 : .put_value(key, lsn, val, ctx)
3602 2 : .await
3603 2 : .context("failed to put value")
3604 2 : .map_err(CompactionError::Other)?;
3605 2 : continue;
3606 438 : }
3607 438 : match val {
3608 315 : Value::Image(_) => stat.visit_image_key(&val),
3609 123 : Value::WalRecord(_) => stat.visit_wal_key(&val),
3610 : }
3611 438 : if last_key.is_none() || last_key.as_ref() == Some(&key) {
3612 144 : if last_key.is_none() {
3613 27 : last_key = Some(key);
3614 117 : }
3615 144 : accumulated_values_estimated_size += val.estimated_size();
3616 144 : accumulated_values.push((key, lsn, val));
3617 144 :
3618 144 : // Accumulated values should never exceed 512MB.
3619 144 : if accumulated_values_estimated_size >= 1024 * 1024 * 512 {
3620 0 : return Err(CompactionError::Other(anyhow!(
3621 0 : "too many values for a single key: {} for key {}, {} items",
3622 0 : accumulated_values_estimated_size,
3623 0 : key,
3624 0 : accumulated_values.len()
3625 0 : )));
3626 144 : }
3627 : } else {
3628 294 : let last_key: &mut Key = last_key.as_mut().unwrap();
3629 294 : stat.on_unique_key_visited(); // TODO: adjust statistics for partial compaction
3630 294 : let retention = self
3631 294 : .generate_key_retention(
3632 294 : *last_key,
3633 294 : &accumulated_values,
3634 294 : job_desc.gc_cutoff,
3635 294 : &job_desc.retain_lsns_below_horizon,
3636 294 : COMPACTION_DELTA_THRESHOLD,
3637 294 : get_ancestor_image(self, *last_key, ctx, has_data_below, lowest_retain_lsn)
3638 294 : .await
3639 294 : .context("failed to get ancestor image")
3640 294 : .map_err(CompactionError::Other)?,
3641 294 : verification,
3642 294 : )
3643 294 : .await
3644 294 : .context("failed to generate key retention")
3645 294 : .map_err(CompactionError::Other)?;
3646 293 : retention
3647 293 : .pipe_to(
3648 293 : *last_key,
3649 293 : &mut delta_layer_writer,
3650 293 : image_layer_writer.as_mut(),
3651 293 : &mut stat,
3652 293 : ctx,
3653 293 : )
3654 293 : .await
3655 293 : .context("failed to pipe to delta layer writer")
3656 293 : .map_err(CompactionError::Other)?;
3657 293 : accumulated_values.clear();
3658 293 : *last_key = key;
3659 293 : accumulated_values_estimated_size = val.estimated_size();
3660 293 : accumulated_values.push((key, lsn, val));
3661 : }
3662 : }
3663 :
3664 : // TODO: move the below part to the loop body
3665 26 : let Some(last_key) = last_key else {
3666 0 : return Err(CompactionError::Other(anyhow!(
3667 0 : "no keys produced during compaction"
3668 0 : )));
3669 : };
3670 26 : stat.on_unique_key_visited();
3671 :
3672 26 : let retention = self
3673 26 : .generate_key_retention(
3674 26 : last_key,
3675 26 : &accumulated_values,
3676 26 : job_desc.gc_cutoff,
3677 26 : &job_desc.retain_lsns_below_horizon,
3678 26 : COMPACTION_DELTA_THRESHOLD,
3679 26 : get_ancestor_image(self, last_key, ctx, has_data_below, lowest_retain_lsn)
3680 26 : .await
3681 26 : .context("failed to get ancestor image")
3682 26 : .map_err(CompactionError::Other)?,
3683 26 : verification,
3684 26 : )
3685 26 : .await
3686 26 : .context("failed to generate key retention")
3687 26 : .map_err(CompactionError::Other)?;
3688 26 : retention
3689 26 : .pipe_to(
3690 26 : last_key,
3691 26 : &mut delta_layer_writer,
3692 26 : image_layer_writer.as_mut(),
3693 26 : &mut stat,
3694 26 : ctx,
3695 26 : )
3696 26 : .await
3697 26 : .context("failed to pipe to delta layer writer")
3698 26 : .map_err(CompactionError::Other)?;
3699 : // end: move the above part to the loop body
3700 :
3701 26 : let time_main_loop = timer.elapsed();
3702 26 : let timer = Instant::now();
3703 26 :
3704 26 : let mut rewrote_delta_layers = Vec::new();
3705 27 : for (key, writers) in delta_layer_rewriters {
3706 1 : if let Some(delta_writer_before) = writers.before {
3707 0 : let (desc, path) = delta_writer_before
3708 0 : .finish(job_desc.compaction_key_range.start, ctx)
3709 0 : .await
3710 0 : .context("failed to finish delta layer writer")
3711 0 : .map_err(CompactionError::Other)?;
3712 0 : let layer = Layer::finish_creating(self.conf, self, desc, &path)
3713 0 : .context("failed to finish creating delta layer")
3714 0 : .map_err(CompactionError::Other)?;
3715 0 : rewrote_delta_layers.push(layer);
3716 1 : }
3717 1 : if let Some(delta_writer_after) = writers.after {
3718 1 : let (desc, path) = delta_writer_after
3719 1 : .finish(key.key_range.end, ctx)
3720 1 : .await
3721 1 : .context("failed to finish delta layer writer")
3722 1 : .map_err(CompactionError::Other)?;
3723 1 : let layer = Layer::finish_creating(self.conf, self, desc, &path)
3724 1 : .context("failed to finish creating delta layer")
3725 1 : .map_err(CompactionError::Other)?;
3726 1 : rewrote_delta_layers.push(layer);
3727 0 : }
3728 : }
3729 :
3730 37 : let discard = |key: &PersistentLayerKey| {
3731 37 : let key = key.clone();
3732 37 : async move { KeyHistoryRetention::discard_key(&key, self, dry_run).await }
3733 37 : };
3734 :
3735 26 : let produced_image_layers = if let Some(writer) = image_layer_writer {
3736 21 : if !dry_run {
3737 19 : let end_key = job_desc.compaction_key_range.end;
3738 19 : writer
3739 19 : .finish_with_discard_fn(self, ctx, end_key, discard)
3740 19 : .await
3741 19 : .context("failed to finish image layer writer")
3742 19 : .map_err(CompactionError::Other)?
3743 : } else {
3744 2 : drop(writer);
3745 2 : Vec::new()
3746 : }
3747 : } else {
3748 5 : Vec::new()
3749 : };
3750 :
3751 26 : let produced_delta_layers = if !dry_run {
3752 24 : delta_layer_writer
3753 24 : .finish_with_discard_fn(self, ctx, discard)
3754 24 : .await
3755 24 : .context("failed to finish delta layer writer")
3756 24 : .map_err(CompactionError::Other)?
3757 : } else {
3758 2 : drop(delta_layer_writer);
3759 2 : Vec::new()
3760 : };
3761 :
3762 : // TODO: make image/delta/rewrote_delta layers generation atomic. At this point, we already generated resident layers, and if
3763 : // compaction is cancelled at this point, we might have some layers that are not cleaned up.
3764 26 : let mut compact_to = Vec::new();
3765 26 : let mut keep_layers = HashSet::new();
3766 26 : let produced_delta_layers_len = produced_delta_layers.len();
3767 26 : let produced_image_layers_len = produced_image_layers.len();
3768 26 :
3769 26 : let layer_selection_by_key = job_desc
3770 26 : .selected_layers
3771 26 : .iter()
3772 76 : .map(|l| (l.layer_desc().key(), l.layer_desc().clone()))
3773 26 : .collect::<HashMap<_, _>>();
3774 :
3775 44 : for action in produced_delta_layers {
3776 18 : match action {
3777 11 : BatchWriterResult::Produced(layer) => {
3778 11 : if cfg!(debug_assertions) {
3779 11 : info!("produced delta layer: {}", layer.layer_desc().key());
3780 0 : }
3781 11 : stat.produce_delta_layer(layer.layer_desc().file_size());
3782 11 : compact_to.push(layer);
3783 : }
3784 7 : BatchWriterResult::Discarded(l) => {
3785 7 : if cfg!(debug_assertions) {
3786 7 : info!("discarded delta layer: {}", l);
3787 0 : }
3788 7 : if let Some(layer_desc) = layer_selection_by_key.get(&l) {
3789 7 : stat.discard_delta_layer(layer_desc.file_size());
3790 7 : } else {
3791 0 : tracing::warn!(
3792 0 : "discarded delta layer not in layer_selection: {}, produced a layer outside of the compaction key range?",
3793 : l
3794 : );
3795 0 : stat.discard_delta_layer(0);
3796 : }
3797 7 : keep_layers.insert(l);
3798 : }
3799 : }
3800 : }
3801 27 : for layer in &rewrote_delta_layers {
3802 1 : debug!(
3803 0 : "produced rewritten delta layer: {}",
3804 0 : layer.layer_desc().key()
3805 : );
3806 : // For now, we include rewritten delta layer size in the "produce_delta_layer". We could
3807 : // make it a separate statistic in the future.
3808 1 : stat.produce_delta_layer(layer.layer_desc().file_size());
3809 : }
3810 26 : compact_to.extend(rewrote_delta_layers);
3811 45 : for action in produced_image_layers {
3812 19 : match action {
3813 15 : BatchWriterResult::Produced(layer) => {
3814 15 : debug!("produced image layer: {}", layer.layer_desc().key());
3815 15 : stat.produce_image_layer(layer.layer_desc().file_size());
3816 15 : compact_to.push(layer);
3817 : }
3818 4 : BatchWriterResult::Discarded(l) => {
3819 4 : debug!("discarded image layer: {}", l);
3820 4 : if let Some(layer_desc) = layer_selection_by_key.get(&l) {
3821 4 : stat.discard_image_layer(layer_desc.file_size());
3822 4 : } else {
3823 0 : tracing::warn!(
3824 0 : "discarded image layer not in layer_selection: {}, produced a layer outside of the compaction key range?",
3825 : l
3826 : );
3827 0 : stat.discard_image_layer(0);
3828 : }
3829 4 : keep_layers.insert(l);
3830 : }
3831 : }
3832 : }
3833 :
3834 26 : let mut layer_selection = job_desc.selected_layers;
3835 :
3836 : // Partial compaction might select more data than it processes, e.g., if
3837 : // the compaction_key_range only partially overlaps the selected layers:
3838 : //
3839 : // [---compaction_key_range---]
3840 : // [---A----][----B----][----C----][----D----]
3841 : //
3842 : // For delta layers, we will rewrite the layers so that they are cut exactly at
3843 : // the compaction key range, so we can always discard them. However, for image
3844 : // layers, since we do not rewrite them for now, we need to handle them differently.
3845 : // Assume image layers A, B, C, D are all in the `layer_selection`.
3846 : //
3847 : // The created image layers contain whatever is needed from B, C, and from
3848 : // `----]` of A, and from `[---` of D.
3849 : //
3850 : // In contrast, `[---A` and `D----]` have not been processed, so, we must
3851 : // keep that data.
3852 : //
3853 : // The solution for now is to keep A and D completely if they are image layers.
3854 : // (layer_selection is what we'll remove from the layer map, so retain whatever
3855 : // is _not_ fully covered by compaction_key_range).
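     : //
     : // Worked example of the rule above (added for clarity; not part of the original source, and the
     : // key values are made up): with compaction_key_range = 10..30, an image layer covering 05..15 is
     : // not fully contained in the range, so its key is added to `keep_layers` below and the layer
     : // stays in the layer map, while an image layer covering 12..20 is fully contained and can be
     : // dropped because the newly produced image layers fully cover it.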
3856 102 : for layer in &layer_selection {
3857 76 : if !layer.layer_desc().is_delta() {
3858 33 : if !overlaps_with(
3859 33 : &layer.layer_desc().key_range,
3860 33 : &job_desc.compaction_key_range,
3861 33 : ) {
3862 0 : return Err(CompactionError::Other(anyhow!(
3863 0 : "violated constraint: image layer outside of compaction key range"
3864 0 : )));
3865 33 : }
3866 33 : if !fully_contains(
3867 33 : &job_desc.compaction_key_range,
3868 33 : &layer.layer_desc().key_range,
3869 33 : ) {
3870 4 : keep_layers.insert(layer.layer_desc().key());
3871 29 : }
3872 43 : }
3873 : }
3874 :
3875 76 : layer_selection.retain(|x| !keep_layers.contains(&x.layer_desc().key()));
3876 26 :
3877 26 : let time_final_phase = timer.elapsed();
3878 26 :
3879 26 : stat.time_final_phase_secs = time_final_phase.as_secs_f64();
3880 26 : stat.time_to_first_kv_pair_secs = time_to_first_kv_pair
3881 26 : .unwrap_or(Duration::ZERO)
3882 26 : .as_secs_f64();
3883 26 : stat.time_main_loop_secs = time_main_loop.as_secs_f64();
3884 26 : stat.time_acquire_lock_secs = time_acquire_lock.as_secs_f64();
3885 26 : stat.time_download_layer_secs = time_download_layer.as_secs_f64();
3886 26 : stat.time_analyze_secs = time_analyze.as_secs_f64();
3887 26 : stat.time_total_secs = begin_timer.elapsed().as_secs_f64();
3888 26 : stat.finalize();
3889 26 :
3890 26 : info!(
3891 0 : "gc-compaction statistics: {}",
3892 0 : serde_json::to_string(&stat)
3893 0 : .context("failed to serialize gc-compaction statistics")
3894 0 : .map_err(CompactionError::Other)?
3895 : );
3896 :
3897 26 : if dry_run {
3898 2 : return Ok(CompactionOutcome::Done);
3899 24 : }
3900 24 :
3901 24 : info!(
3902 0 : "produced {} delta layers and {} image layers, {} layers are kept",
3903 0 : produced_delta_layers_len,
3904 0 : produced_image_layers_len,
3905 0 : keep_layers.len()
3906 : );
3907 :
3908 : // Step 3: Place back to the layer map.
3909 :
3910 : // First, do a sanity check to ensure the newly-created layer map does not contain overlaps.
3911 24 : let all_layers = {
3912 24 : let guard = self.layers.read().await;
3913 24 : let layer_map = guard.layer_map()?;
3914 24 : layer_map.iter_historic_layers().collect_vec()
3915 24 : };
3916 24 :
3917 24 : let mut final_layers = all_layers
3918 24 : .iter()
3919 107 : .map(|layer| layer.layer_name())
3920 24 : .collect::<HashSet<_>>();
3921 76 : for layer in &layer_selection {
3922 52 : final_layers.remove(&layer.layer_desc().layer_name());
3923 52 : }
3924 51 : for layer in &compact_to {
3925 27 : final_layers.insert(layer.layer_desc().layer_name());
3926 27 : }
3927 24 : let final_layers = final_layers.into_iter().collect_vec();
3928 :
3929 : // TODO: move this check before we call `finish` on image layer writers. However, this will require us to get the layer name before we finish
3930 : // the writer, so potentially, we will need a function like `ImageLayerBatchWriter::get_all_pending_layer_keys` to get all the keys that are
3931 : // in the writer before finalizing the persistent layers. As it stands, we could leave some dangling layers on disk if the check fails.
3932 24 : if let Some(err) = check_valid_layermap(&final_layers) {
3933 0 : return Err(CompactionError::Other(anyhow!(
3934 0 : "gc-compaction layer map check failed after compaction because {}, compaction result not applied to the layer map due to potential data loss",
3935 0 : err
3936 0 : )));
3937 24 : }
3938 :
3939 : // Between the sanity check and this compaction update, there could be new layers being flushed, but it should be fine because we only
3940 : // operate on L1 layers.
3941 : {
3942 : // Gc-compaction will rewrite the history of a key. This could happen in two ways:
3943 : //
3944 : // 1. We create an image layer to replace all the deltas below the compact LSN. In this case, assume
3945 : // we have 2 delta layers A and B, both below the compact LSN. We create an image layer I to replace
3946 : // A and B at the compact LSN. If the read path finishes reading A, yields, and now we update the layer
3947 : // map, the read path then cannot find any keys below A, reporting a missing key error, while the key
3948 : // now gets stored in I at the compact LSN.
3949 : //
3950 : // --------------- ---------------
3951 : // delta1@LSN20 image1@LSN20
3952 : // --------------- (read path collects delta@LSN20, => --------------- (read path cannot find anything
3953 : // delta1@LSN10 yields) below LSN 20)
3954 : // ---------------
3955 : //
3956 : // 2. We create a delta layer to replace all the deltas below the compact LSN, and in the delta layers,
3957 : // we combine the history of a key into a single image. For example, suppose we have deltas at LSN 1, 2, 3, 4,
3958 : // and assume one delta layer contains LSN 1, 2, 3 and the other contains LSN 4.
3959 : //
3960 : // We let gc-compaction combine deltas 2, 3, 4 into an image at LSN 4, which produces a delta layer that
3961 : // contains the delta at LSN 1 and the image at LSN 4. Now suppose the read path finishes reading the original
3962 : // delta layer containing LSN 4, yields, and we update the layer map to install the new delta layer:
3963 : //
3964 : // --------------- ---------------
3965 : // delta1@LSN4 image1@LSN4
3966 : // --------------- (read path collects delta@LSN4, => --------------- (read path collects LSN4 and LSN1,
3967 : // delta1@LSN1-3 yields) delta1@LSN1 which is an invalid history)
3968 : // --------------- ---------------
3969 : //
3970 : // Therefore, the gc-compaction layer update operation should wait for all ongoing reads, block all pending reads,
3971 : // and only allow reads to continue after the update is finished.
3972 :
3973 24 : let update_guard = self.gc_compaction_layer_update_lock.write().await;
3974 : // Acquiring the update guard ensures current read operations end and new read operations are blocked.
3975 : // TODO: can we use `latest_gc_cutoff` Rcu to achieve the same effect?
3976 24 : let mut guard = self.layers.write().await;
3977 24 : guard
3978 24 : .open_mut()?
3979 24 : .finish_gc_compaction(&layer_selection, &compact_to, &self.metrics);
3980 24 : drop(update_guard); // Allow new reads to start ONLY after we finished updating the layer map.
3981 24 : };
3982 24 :
3983 24 : // Schedule an index-only upload to update the `latest_gc_cutoff` in the index_part.json.
3984 24 : // Otherwise, after restart, the index_part only contains the old `latest_gc_cutoff` and
3985 24 : // find_gc_cutoffs will try accessing things below the cutoff. TODO: ideally, this should
3986 24 : // be batched into `schedule_compaction_update`.
3987 24 : let disk_consistent_lsn = self.disk_consistent_lsn.load();
3988 24 : self.schedule_uploads(disk_consistent_lsn, None)
3989 24 : .context("failed to schedule uploads")
3990 24 : .map_err(CompactionError::Other)?;
3991 : // If a layer gets rewritten during gc-compaction, we need to keep that layer only in `compact_to` instead
3992 : // of `compact_from`.
3993 24 : let compact_from = {
3994 24 : let mut compact_from = Vec::new();
3995 24 : let mut compact_to_set = HashMap::new();
3996 51 : for layer in &compact_to {
3997 27 : compact_to_set.insert(layer.layer_desc().key(), layer);
3998 27 : }
3999 76 : for layer in &layer_selection {
4000 52 : if let Some(to) = compact_to_set.get(&layer.layer_desc().key()) {
4001 0 : tracing::info!(
4002 0 : "skipping delete {} because found same layer key at different generation {}",
4003 : layer,
4004 : to
4005 : );
4006 52 : } else {
4007 52 : compact_from.push(layer.clone());
4008 52 : }
4009 : }
4010 24 : compact_from
4011 24 : };
4012 24 : self.remote_client
4013 24 : .schedule_compaction_update(&compact_from, &compact_to)?;
4014 :
4015 24 : drop(gc_lock);
4016 24 :
4017 24 : Ok(CompactionOutcome::Done)
4018 28 : }
4019 : }
4020 :
4021 : struct TimelineAdaptor {
4022 : timeline: Arc<Timeline>,
4023 :
4024 : keyspace: (Lsn, KeySpace),
4025 :
4026 : new_deltas: Vec<ResidentLayer>,
4027 : new_images: Vec<ResidentLayer>,
4028 : layers_to_delete: Vec<Arc<PersistentLayerDesc>>,
4029 : }
4030 :
4031 : impl TimelineAdaptor {
4032 0 : pub fn new(timeline: &Arc<Timeline>, keyspace: (Lsn, KeySpace)) -> Self {
4033 0 : Self {
4034 0 : timeline: timeline.clone(),
4035 0 : keyspace,
4036 0 : new_images: Vec::new(),
4037 0 : new_deltas: Vec::new(),
4038 0 : layers_to_delete: Vec::new(),
4039 0 : }
4040 0 : }
4041 :
4042 0 : pub async fn flush_updates(&mut self) -> Result<(), CompactionError> {
4043 0 : let layers_to_delete = {
4044 0 : let guard = self.timeline.layers.read().await;
4045 0 : self.layers_to_delete
4046 0 : .iter()
4047 0 : .map(|x| guard.get_from_desc(x))
4048 0 : .collect::<Vec<Layer>>()
4049 0 : };
4050 0 : self.timeline
4051 0 : .finish_compact_batch(&self.new_deltas, &self.new_images, &layers_to_delete)
4052 0 : .await?;
4053 :
4054 0 : self.timeline
4055 0 : .upload_new_image_layers(std::mem::take(&mut self.new_images))?;
4056 :
4057 0 : self.new_deltas.clear();
4058 0 : self.layers_to_delete.clear();
4059 0 : Ok(())
4060 0 : }
4061 : }
4062 :
4063 : #[derive(Clone)]
4064 : struct ResidentDeltaLayer(ResidentLayer);
4065 : #[derive(Clone)]
4066 : struct ResidentImageLayer(ResidentLayer);
4067 :
4068 : impl CompactionJobExecutor for TimelineAdaptor {
4069 : type Key = pageserver_api::key::Key;
4070 :
4071 : type Layer = OwnArc<PersistentLayerDesc>;
4072 : type DeltaLayer = ResidentDeltaLayer;
4073 : type ImageLayer = ResidentImageLayer;
4074 :
4075 : type RequestContext = crate::context::RequestContext;
4076 :
4077 0 : fn get_shard_identity(&self) -> &ShardIdentity {
4078 0 : self.timeline.get_shard_identity()
4079 0 : }
4080 :
4081 0 : async fn get_layers(
4082 0 : &mut self,
4083 0 : key_range: &Range<Key>,
4084 0 : lsn_range: &Range<Lsn>,
4085 0 : _ctx: &RequestContext,
4086 0 : ) -> anyhow::Result<Vec<OwnArc<PersistentLayerDesc>>> {
4087 0 : self.flush_updates().await?;
4088 :
4089 0 : let guard = self.timeline.layers.read().await;
4090 0 : let layer_map = guard.layer_map()?;
4091 :
4092 0 : let result = layer_map
4093 0 : .iter_historic_layers()
4094 0 : .filter(|l| {
4095 0 : overlaps_with(&l.lsn_range, lsn_range) && overlaps_with(&l.key_range, key_range)
4096 0 : })
4097 0 : .map(OwnArc)
4098 0 : .collect();
4099 0 : Ok(result)
4100 0 : }
4101 :
4102 0 : async fn get_keyspace(
4103 0 : &mut self,
4104 0 : key_range: &Range<Key>,
4105 0 : lsn: Lsn,
4106 0 : _ctx: &RequestContext,
4107 0 : ) -> anyhow::Result<Vec<Range<Key>>> {
4108 0 : if lsn == self.keyspace.0 {
4109 0 : Ok(pageserver_compaction::helpers::intersect_keyspace(
4110 0 : &self.keyspace.1.ranges,
4111 0 : key_range,
4112 0 : ))
4113 : } else {
4114 : // The current compaction implementation only ever requests the key space
4115 : // at the compaction end LSN.
4116 0 : anyhow::bail!("keyspace not available for requested lsn");
4117 : }
4118 0 : }
4119 :
4120 0 : async fn downcast_delta_layer(
4121 0 : &self,
4122 0 : layer: &OwnArc<PersistentLayerDesc>,
4123 0 : ctx: &RequestContext,
4124 0 : ) -> anyhow::Result<Option<ResidentDeltaLayer>> {
4125 0 : // this is a lot more complex than a simple downcast...
4126 0 : if layer.is_delta() {
4127 0 : let l = {
4128 0 : let guard = self.timeline.layers.read().await;
4129 0 : guard.get_from_desc(layer)
4130 : };
4131 0 : let result = l.download_and_keep_resident(ctx).await?;
4132 :
4133 0 : Ok(Some(ResidentDeltaLayer(result)))
4134 : } else {
4135 0 : Ok(None)
4136 : }
4137 0 : }
4138 :
4139 0 : async fn create_image(
4140 0 : &mut self,
4141 0 : lsn: Lsn,
4142 0 : key_range: &Range<Key>,
4143 0 : ctx: &RequestContext,
4144 0 : ) -> anyhow::Result<()> {
4145 0 : Ok(self.create_image_impl(lsn, key_range, ctx).await?)
4146 0 : }
4147 :
4148 0 : async fn create_delta(
4149 0 : &mut self,
4150 0 : lsn_range: &Range<Lsn>,
4151 0 : key_range: &Range<Key>,
4152 0 : input_layers: &[ResidentDeltaLayer],
4153 0 : ctx: &RequestContext,
4154 0 : ) -> anyhow::Result<()> {
4155 0 : debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end);
4156 :
4157 0 : let mut all_entries = Vec::new();
4158 0 : for dl in input_layers.iter() {
4159 0 : all_entries.extend(dl.load_keys(ctx).await?);
4160 : }
4161 :
4162 : // The current stdlib sorting implementation is designed in a way where it is
4163 : // particularly fast where the slice is made up of sorted sub-ranges.
4164 0 : all_entries.sort_by_key(|DeltaEntry { key, lsn, .. }| (*key, *lsn));
4165 :
4166 0 : let mut writer = DeltaLayerWriter::new(
4167 0 : self.timeline.conf,
4168 0 : self.timeline.timeline_id,
4169 0 : self.timeline.tenant_shard_id,
4170 0 : key_range.start,
4171 0 : lsn_range.clone(),
4172 0 : &self.timeline.gate,
4173 0 : self.timeline.cancel.clone(),
4174 0 : ctx,
4175 0 : )
4176 0 : .await?;
4177 :
4178 0 : let mut dup_values = 0;
4179 0 :
4180 0 : // This iterator walks through all key-value pairs from all the layers
4181 0 : // we're compacting, in key, LSN order.
4182 0 : let mut prev: Option<(Key, Lsn)> = None;
4183 : for &DeltaEntry {
4184 0 : key, lsn, ref val, ..
4185 0 : } in all_entries.iter()
4186 : {
4187 0 : if prev == Some((key, lsn)) {
4188 : // This is a duplicate. Skip it.
4189 : //
4190 : // It can happen if compaction is interrupted after writing some
4191 : // layers but not all, and we are compacting the range again.
4192 : // The calculations in the algorithm assume that there are no
4193 : // duplicates, so the math on targeted file size is likely off,
4194 : // and we will create smaller files than expected.
4195 0 : dup_values += 1;
4196 0 : continue;
4197 0 : }
4198 :
4199 0 : let value = val.load(ctx).await?;
4200 :
4201 0 : writer.put_value(key, lsn, value, ctx).await?;
4202 :
4203 0 : prev = Some((key, lsn));
4204 : }
4205 :
4206 0 : if dup_values > 0 {
4207 0 : warn!("delta layer created with {} duplicate values", dup_values);
4208 0 : }
4209 :
4210 0 : fail_point!("delta-layer-writer-fail-before-finish", |_| {
4211 0 : Err(anyhow::anyhow!(
4212 0 : "failpoint delta-layer-writer-fail-before-finish"
4213 0 : ))
4214 0 : });
4215 :
4216 0 : let (desc, path) = writer.finish(prev.unwrap().0.next(), ctx).await?;
4217 0 : let new_delta_layer =
4218 0 : Layer::finish_creating(self.timeline.conf, &self.timeline, desc, &path)?;
4219 :
4220 0 : self.new_deltas.push(new_delta_layer);
4221 0 : Ok(())
4222 0 : }
4223 :
4224 0 : async fn delete_layer(
4225 0 : &mut self,
4226 0 : layer: &OwnArc<PersistentLayerDesc>,
4227 0 : _ctx: &RequestContext,
4228 0 : ) -> anyhow::Result<()> {
4229 0 : self.layers_to_delete.push(layer.clone().0);
4230 0 : Ok(())
4231 0 : }
4232 : }
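// The sketch below is not part of the original file: it restates the core of
// `create_delta` above with simplified, illustrative types (`SimpleEntry`, plain
// `u64` keys and LSNs are stand-ins, not pageserver types). Entries collected from
// all input delta layers are sorted by (key, lsn); exact (key, lsn) duplicates --
// which can appear when an interrupted compaction is retried over the same range --
// are skipped and counted. Note that the real code above finishes the writer at
// `prev.unwrap().0.next()`, i.e. it relies on at least one entry being present.
#[allow(dead_code)]
struct SimpleEntry {
    key: u64,
    lsn: u64,
    value: Vec<u8>,
}

#[allow(dead_code)]
fn sort_and_dedup(mut entries: Vec<SimpleEntry>) -> (Vec<SimpleEntry>, usize) {
    // The stdlib sort is adaptive, so concatenating per-layer runs that are already
    // sorted keeps this step cheap.
    entries.sort_by_key(|e| (e.key, e.lsn));

    let mut out = Vec::with_capacity(entries.len());
    let mut dup_values = 0;
    let mut prev: Option<(u64, u64)> = None;
    for e in entries {
        if prev == Some((e.key, e.lsn)) {
            // Duplicate (key, lsn) pair: keep only the first occurrence.
            dup_values += 1;
            continue;
        }
        prev = Some((e.key, e.lsn));
        out.push(e);
    }
    (out, dup_values)
}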
4233 :
4234 : impl TimelineAdaptor {
4235 0 : async fn create_image_impl(
4236 0 : &mut self,
4237 0 : lsn: Lsn,
4238 0 : key_range: &Range<Key>,
4239 0 : ctx: &RequestContext,
4240 0 : ) -> Result<(), CreateImageLayersError> {
4241 0 : let timer = self.timeline.metrics.create_images_time_histo.start_timer();
4242 :
4243 0 : let image_layer_writer = ImageLayerWriter::new(
4244 0 : self.timeline.conf,
4245 0 : self.timeline.timeline_id,
4246 0 : self.timeline.tenant_shard_id,
4247 0 : key_range,
4248 0 : lsn,
4249 0 : &self.timeline.gate,
4250 0 : self.timeline.cancel.clone(),
4251 0 : ctx,
4252 0 : )
4253 0 : .await?;
4254 :
4255 0 : fail_point!("image-layer-writer-fail-before-finish", |_| {
4256 0 : Err(CreateImageLayersError::Other(anyhow::anyhow!(
4257 0 : "failpoint image-layer-writer-fail-before-finish"
4258 0 : )))
4259 0 : });
4260 :
4261 0 : let keyspace = KeySpace {
4262 0 : ranges: self.get_keyspace(key_range, lsn, ctx).await?,
4263 : };
4264 :         // TODO: set a proper (stateful) start key for create_image_layer_for_rel_blocks.
4265 0 : let outcome = self
4266 0 : .timeline
4267 0 : .create_image_layer_for_rel_blocks(
4268 0 : &keyspace,
4269 0 : image_layer_writer,
4270 0 : lsn,
4271 0 : ctx,
4272 0 : key_range.clone(),
4273 0 : IoConcurrency::sequential(),
4274 0 : )
4275 0 : .await?;
4276 :
4277 : if let ImageLayerCreationOutcome::Generated {
4278 0 : unfinished_image_layer,
4279 0 : } = outcome
4280 : {
4281 0 : let (desc, path) = unfinished_image_layer.finish(ctx).await?;
4282 0 : let image_layer =
4283 0 : Layer::finish_creating(self.timeline.conf, &self.timeline, desc, &path)?;
4284 0 : self.new_images.push(image_layer);
4285 0 : }
4286 :
4287 0 : timer.stop_and_record();
4288 0 :
4289 0 : Ok(())
4290 0 : }
4291 : }
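// A hedged sketch (illustration only, with `u64` standing in for `Key`) of the keyspace
// intersection that `get_keyspace` and `create_image_impl` above rely on: clip a sorted,
// non-overlapping set of key ranges to a single query range. The real helper is
// `pageserver_compaction::helpers::intersect_keyspace`; `intersect_ranges` below is a
// hypothetical stand-in, not that function.
#[allow(dead_code)]
fn intersect_ranges(
    ranges: &[std::ops::Range<u64>],
    query: &std::ops::Range<u64>,
) -> Vec<std::ops::Range<u64>> {
    ranges
        .iter()
        .filter_map(|r| {
            // Clip each range to the query; drop it if the clipped range is empty.
            let start = r.start.max(query.start);
            let end = r.end.min(query.end);
            (start < end).then(|| start..end)
        })
        .collect()
}

#[cfg(test)]
mod intersect_ranges_sketch_tests {
    use super::intersect_ranges;

    #[test]
    fn clips_to_query_range() {
        let ranges = [0..10, 20..30, 40..50];
        assert_eq!(intersect_ranges(&ranges, &(5..45)), vec![5..10, 20..30, 40..45]);
    }
}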
4292 :
4293 : impl CompactionRequestContext for crate::context::RequestContext {}
4294 :
4295 : #[derive(Debug, Clone)]
4296 : pub struct OwnArc<T>(pub Arc<T>);
4297 :
4298 : impl<T> Deref for OwnArc<T> {
4299 : type Target = <Arc<T> as Deref>::Target;
4300 0 : fn deref(&self) -> &Self::Target {
4301 0 : &self.0
4302 0 : }
4303 : }
4304 :
4305 : impl<T> AsRef<T> for OwnArc<T> {
4306 0 : fn as_ref(&self) -> &T {
4307 0 : self.0.as_ref()
4308 0 : }
4309 : }
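// A small usage sketch (hypothetical test, not part of the original file) showing how
// the Deref/AsRef impls above make `OwnArc` transparent: methods of the wrapped value
// can be called directly on the wrapper, and the inner value can be borrowed without
// touching the Arc.
#[cfg(test)]
mod own_arc_usage_sketch {
    use std::sync::Arc;

    use super::OwnArc;

    #[test]
    fn deref_and_as_ref_pass_through() {
        let own = OwnArc(Arc::new(String::from("layer")));
        // Deref lets methods of the wrapped value be called on the wrapper directly.
        assert_eq!(own.len(), 5);
        // AsRef borrows the inner value without cloning the Arc.
        let inner: &String = own.as_ref();
        assert_eq!(inner.as_str(), "layer");
    }
}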
4310 :
4311 : impl CompactionLayer<Key> for OwnArc<PersistentLayerDesc> {
4312 0 : fn key_range(&self) -> &Range<Key> {
4313 0 : &self.key_range
4314 0 : }
4315 0 : fn lsn_range(&self) -> &Range<Lsn> {
4316 0 : &self.lsn_range
4317 0 : }
4318 0 : fn file_size(&self) -> u64 {
4319 0 : self.file_size
4320 0 : }
4321 0 : fn short_id(&self) -> std::string::String {
4322 0 : self.as_ref().short_id().to_string()
4323 0 : }
4324 0 : fn is_delta(&self) -> bool {
4325 0 : self.as_ref().is_delta()
4326 0 : }
4327 : }
4328 :
4329 : impl CompactionLayer<Key> for OwnArc<DeltaLayer> {
4330 0 : fn key_range(&self) -> &Range<Key> {
4331 0 : &self.layer_desc().key_range
4332 0 : }
4333 0 : fn lsn_range(&self) -> &Range<Lsn> {
4334 0 : &self.layer_desc().lsn_range
4335 0 : }
4336 0 : fn file_size(&self) -> u64 {
4337 0 : self.layer_desc().file_size
4338 0 : }
4339 0 : fn short_id(&self) -> std::string::String {
4340 0 : self.layer_desc().short_id().to_string()
4341 0 : }
4342 0 : fn is_delta(&self) -> bool {
4343 0 : true
4344 0 : }
4345 : }
4346 :
4347 : impl CompactionLayer<Key> for ResidentDeltaLayer {
4348 0 : fn key_range(&self) -> &Range<Key> {
4349 0 : &self.0.layer_desc().key_range
4350 0 : }
4351 0 : fn lsn_range(&self) -> &Range<Lsn> {
4352 0 : &self.0.layer_desc().lsn_range
4353 0 : }
4354 0 : fn file_size(&self) -> u64 {
4355 0 : self.0.layer_desc().file_size
4356 0 : }
4357 0 : fn short_id(&self) -> std::string::String {
4358 0 : self.0.layer_desc().short_id().to_string()
4359 0 : }
4360 0 : fn is_delta(&self) -> bool {
4361 0 : true
4362 0 : }
4363 : }
4364 :
4365 : impl CompactionDeltaLayer<TimelineAdaptor> for ResidentDeltaLayer {
4366 : type DeltaEntry<'a> = DeltaEntry<'a>;
4367 :
4368 0 : async fn load_keys(&self, ctx: &RequestContext) -> anyhow::Result<Vec<DeltaEntry<'_>>> {
4369 0 : self.0.get_as_delta(ctx).await?.index_entries(ctx).await
4370 0 : }
4371 : }
4372 :
4373 : impl CompactionLayer<Key> for ResidentImageLayer {
4374 0 : fn key_range(&self) -> &Range<Key> {
4375 0 : &self.0.layer_desc().key_range
4376 0 : }
4377 0 : fn lsn_range(&self) -> &Range<Lsn> {
4378 0 : &self.0.layer_desc().lsn_range
4379 0 : }
4380 0 : fn file_size(&self) -> u64 {
4381 0 : self.0.layer_desc().file_size
4382 0 : }
4383 0 : fn short_id(&self) -> std::string::String {
4384 0 : self.0.layer_desc().short_id().to_string()
4385 0 : }
4386 0 : fn is_delta(&self) -> bool {
4387 0 : false
4388 0 : }
4389 : }
4390 : impl CompactionImageLayer<TimelineAdaptor> for ResidentImageLayer {}
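// A hedged sketch of how impls like the ones above might be consumed: the compaction
// algorithm only needs the metadata exposed by `CompactionLayer` to describe and size a
// set of candidate layers, regardless of whether they are descriptors, resident deltas,
// or resident images. `describe_layers` is a hypothetical helper, not part of the
// compaction crate's interface.
#[allow(dead_code)]
fn describe_layers<L: CompactionLayer<Key>>(layers: &[L]) -> (u64, Vec<String>) {
    // Total bytes across the candidate set.
    let total_size: u64 = layers.iter().map(|l| l.file_size()).sum();
    // One human-readable line per layer: id, LSN range, and kind.
    let descriptions = layers
        .iter()
        .map(|l| {
            format!(
                "{} lsn {}..{} ({})",
                l.short_id(),
                l.lsn_range().start,
                l.lsn_range().end,
                if l.is_delta() { "delta" } else { "image" },
            )
        })
        .collect();
    (total_size, descriptions)
}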
|