Line data Source code
1 : //! New compaction implementation. The algorithm itself is implemented in the
2 : //! compaction crate. This file implements the callbacks and structs that allow
3 : //! the algorithm to drive the process.
4 : //!
5 : //! The legacy algorithm is implemented directly in `timeline.rs`.
6 :
7 : use std::collections::{BinaryHeap, HashMap, HashSet, VecDeque};
8 : use std::ops::{Deref, Range};
9 : use std::sync::Arc;
10 : use std::time::{Duration, Instant};
11 :
12 : use super::layer_manager::LayerManager;
13 : use super::{
14 : CompactFlags, CompactOptions, CompactionError, CreateImageLayersError, DurationRecorder,
15 : GetVectoredError, ImageLayerCreationMode, LastImageLayerCreationStatus, RecordedDuration,
16 : Timeline,
17 : };
18 :
19 : use crate::tenant::timeline::DeltaEntry;
20 : use crate::walredo::RedoAttemptType;
21 : use anyhow::{Context, anyhow};
22 : use bytes::Bytes;
23 : use enumset::EnumSet;
24 : use fail::fail_point;
25 : use futures::FutureExt;
26 : use itertools::Itertools;
27 : use once_cell::sync::Lazy;
28 : use pageserver_api::config::tenant_conf_defaults::DEFAULT_CHECKPOINT_DISTANCE;
29 : use pageserver_api::key::{KEY_SIZE, Key};
30 : use pageserver_api::keyspace::{KeySpace, ShardedRange};
31 : use pageserver_api::models::{CompactInfoResponse, CompactKeyRange};
32 : use pageserver_api::record::NeonWalRecord;
33 : use pageserver_api::shard::{ShardCount, ShardIdentity, TenantShardId};
34 : use pageserver_api::value::Value;
35 : use pageserver_compaction::helpers::{fully_contains, overlaps_with};
36 : use pageserver_compaction::interface::*;
37 : use serde::Serialize;
38 : use tokio::sync::{OwnedSemaphorePermit, Semaphore};
39 : use tokio_util::sync::CancellationToken;
40 : use tracing::{Instrument, debug, error, info, info_span, trace, warn};
41 : use utils::critical;
42 : use utils::id::TimelineId;
43 : use utils::lsn::Lsn;
44 :
45 : use crate::context::{AccessStatsBehavior, RequestContext, RequestContextBuilder};
46 : use crate::page_cache;
47 : use crate::statvfs::Statvfs;
48 : use crate::tenant::checks::check_valid_layermap;
49 : use crate::tenant::gc_block::GcBlock;
50 : use crate::tenant::layer_map::LayerMap;
51 : use crate::tenant::remote_timeline_client::WaitCompletionError;
52 : use crate::tenant::remote_timeline_client::index::GcCompactionState;
53 : use crate::tenant::storage_layer::batch_split_writer::{
54 : BatchWriterResult, SplitDeltaLayerWriter, SplitImageLayerWriter,
55 : };
56 : use crate::tenant::storage_layer::filter_iterator::FilterIterator;
57 : use crate::tenant::storage_layer::merge_iterator::MergeIterator;
58 : use crate::tenant::storage_layer::{
59 : AsLayerDesc, LayerVisibilityHint, PersistentLayerDesc, PersistentLayerKey,
60 : ValueReconstructState,
61 : };
62 : use crate::tenant::tasks::log_compaction_error;
63 : use crate::tenant::timeline::{
64 : DeltaLayerWriter, ImageLayerCreationOutcome, ImageLayerWriter, IoConcurrency, Layer,
65 : ResidentLayer, drop_rlock,
66 : };
67 : use crate::tenant::{DeltaLayer, MaybeOffloaded};
68 : use crate::virtual_file::{MaybeFatalIo, VirtualFile};
69 :
70 : /// Maximum number of deltas before generating an image layer in bottom-most compaction.
71 : const COMPACTION_DELTA_THRESHOLD: usize = 5;
72 :
73 : /// Ratio of shard-local pages below which we trigger shard ancestor layer rewrites. 0.3 means that
74 : /// <= 30% of layer pages must belong to the descendant shard to rewrite the layer.
75 : ///
76 : /// We choose a value < 0.5 to avoid rewriting all visible layers every time we do a power-of-two
77 : /// shard split, which gets expensive for large tenants.
78 : const ANCESTOR_COMPACTION_REWRITE_THRESHOLD: f64 = 0.3;
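
// A minimal sketch of the decision this threshold encodes, under the simplifying
// assumption that only the page counts matter (the real check in
// `compact_shard_ancestors` below also looks at generation, visibility and the GC
// window). `worth_rewriting` is an illustrative helper, not a pageserver function.
fn worth_rewriting(local_pages: u32, raw_pages: u32) -> bool {
    // u32::MAX appears as an "unknown page count" sentinel in the code below; treat
    // it as "don't rewrite" here.
    if local_pages == u32::MAX || raw_pages == 0 {
        return false;
    }
    // Rewrite only when at most 30% of the layer's pages belong to this shard, i.e.
    // when rewriting reclaims at least ~70% of the layer.
    local_pages as f64 / raw_pages as f64 <= 0.3
}

fn main() {
    assert!(worth_rewriting(25, 100)); // 25% local: worth rewriting
    assert!(!worth_rewriting(60, 100)); // 60% local: keep the ancestor layer for now
}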
79 :
80 : #[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
81 : pub struct GcCompactionJobId(pub usize);
82 :
83 : impl std::fmt::Display for GcCompactionJobId {
84 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
85 0 : write!(f, "{}", self.0)
86 0 : }
87 : }
88 :
89 : pub struct GcCompactionCombinedSettings {
90 : pub gc_compaction_enabled: bool,
91 : pub gc_compaction_verification: bool,
92 : pub gc_compaction_initial_threshold_kb: u64,
93 : pub gc_compaction_ratio_percent: u64,
94 : }
95 :
96 : #[derive(Debug, Clone)]
97 : pub enum GcCompactionQueueItem {
98 : MetaJob {
99 : /// Compaction options
100 : options: CompactOptions,
101 : /// Whether the compaction is triggered automatically (determines whether we need to update L2 LSN)
102 : auto: bool,
103 : },
104 : SubCompactionJob(CompactOptions),
105 : Notify(GcCompactionJobId, Option<Lsn>),
106 : }
107 :
108 : impl GcCompactionQueueItem {
109 0 : pub fn into_compact_info_resp(
110 0 : self,
111 0 : id: GcCompactionJobId,
112 0 : running: bool,
113 0 : ) -> Option<CompactInfoResponse> {
114 0 : match self {
115 0 : GcCompactionQueueItem::MetaJob { options, .. } => Some(CompactInfoResponse {
116 0 : compact_key_range: options.compact_key_range,
117 0 : compact_lsn_range: options.compact_lsn_range,
118 0 : sub_compaction: options.sub_compaction,
119 0 : running,
120 0 : job_id: id.0,
121 0 : }),
122 0 : GcCompactionQueueItem::SubCompactionJob(options) => Some(CompactInfoResponse {
123 0 : compact_key_range: options.compact_key_range,
124 0 : compact_lsn_range: options.compact_lsn_range,
125 0 : sub_compaction: options.sub_compaction,
126 0 : running,
127 0 : job_id: id.0,
128 0 : }),
129 0 : GcCompactionQueueItem::Notify(_, _) => None,
130 : }
131 0 : }
132 : }
133 :
134 : #[derive(Default)]
135 : struct GcCompactionGuardItems {
136 : notify: Option<tokio::sync::oneshot::Sender<()>>,
137 : permit: Option<OwnedSemaphorePermit>,
138 : }
139 :
140 : struct GcCompactionQueueInner {
141 : running: Option<(GcCompactionJobId, GcCompactionQueueItem)>,
142 : queued: VecDeque<(GcCompactionJobId, GcCompactionQueueItem)>,
143 : guards: HashMap<GcCompactionJobId, GcCompactionGuardItems>,
144 : last_id: GcCompactionJobId,
145 : }
146 :
147 : impl GcCompactionQueueInner {
148 0 : fn next_id(&mut self) -> GcCompactionJobId {
149 0 : let id = self.last_id;
150 0 : self.last_id = GcCompactionJobId(id.0 + 1);
151 0 : id
152 0 : }
153 : }
154 :
155 : /// A structure to store gc_compaction jobs.
156 : pub struct GcCompactionQueue {
157 : /// All items in the queue, and the currently-running job.
158 : inner: std::sync::Mutex<GcCompactionQueueInner>,
159 : /// Ensure only one thread is consuming the queue.
160 : consumer_lock: tokio::sync::Mutex<()>,
161 : }
162 :
163 0 : static CONCURRENT_GC_COMPACTION_TASKS: Lazy<Arc<Semaphore>> = Lazy::new(|| {
164 0 : // Only allow two timelines on one pageserver to run gc compaction at a time.
165 0 : Arc::new(Semaphore::new(2))
166 0 : });
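
// A self-contained sketch of the permit-gating pattern used by
// `trigger_auto_compaction` below: `try_acquire_owned` either hands out one of the
// two permits or fails immediately, in which case the caller skips this round. The
// loop and printouts are illustrative only; they are not pageserver code.
use std::sync::Arc;
use tokio::sync::Semaphore;

fn main() {
    let tasks = Arc::new(Semaphore::new(2)); // at most two concurrent gc-compactions
    let mut held = Vec::new();
    for timeline in 0..4 {
        match tasks.clone().try_acquire_owned() {
            Ok(permit) => {
                println!("timeline {timeline}: permit acquired, scheduling gc-compaction");
                held.push(permit); // hold the permit for the lifetime of the job
            }
            // No permit free: skip instead of waiting, as the trigger path does.
            Err(_) => println!("timeline {timeline}: skipped, two gc-compactions already running"),
        }
    }
}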
167 :
168 : impl GcCompactionQueue {
169 0 : pub fn new() -> Self {
170 0 : GcCompactionQueue {
171 0 : inner: std::sync::Mutex::new(GcCompactionQueueInner {
172 0 : running: None,
173 0 : queued: VecDeque::new(),
174 0 : guards: HashMap::new(),
175 0 : last_id: GcCompactionJobId(0),
176 0 : }),
177 0 : consumer_lock: tokio::sync::Mutex::new(()),
178 0 : }
179 0 : }
180 :
181 0 : pub fn cancel_scheduled(&self) {
182 0 : let mut guard = self.inner.lock().unwrap();
183 0 : guard.queued.clear();
184 0 : // TODO: if there is a running job, we should keep the gc guard. However, currently, the cancel
185 0 : // API is only used for testing purposes, so we can drop everything here.
186 0 : guard.guards.clear();
187 0 : }
188 :
189 : /// Schedule a manual compaction job.
190 0 : pub fn schedule_manual_compaction(
191 0 : &self,
192 0 : options: CompactOptions,
193 0 : notify: Option<tokio::sync::oneshot::Sender<()>>,
194 0 : ) -> GcCompactionJobId {
195 0 : let mut guard = self.inner.lock().unwrap();
196 0 : let id = guard.next_id();
197 0 : guard.queued.push_back((
198 0 : id,
199 0 : GcCompactionQueueItem::MetaJob {
200 0 : options,
201 0 : auto: false,
202 0 : },
203 0 : ));
204 0 : guard.guards.entry(id).or_default().notify = notify;
205 0 : info!("scheduled compaction job id={}", id);
206 0 : id
207 0 : }
208 :
209 : /// Schedule an auto compaction job.
210 0 : fn schedule_auto_compaction(
211 0 : &self,
212 0 : options: CompactOptions,
213 0 : permit: OwnedSemaphorePermit,
214 0 : ) -> GcCompactionJobId {
215 0 : let mut guard = self.inner.lock().unwrap();
216 0 : let id = guard.next_id();
217 0 : guard.queued.push_back((
218 0 : id,
219 0 : GcCompactionQueueItem::MetaJob {
220 0 : options,
221 0 : auto: true,
222 0 : },
223 0 : ));
224 0 : guard.guards.entry(id).or_default().permit = Some(permit);
225 0 : id
226 0 : }
227 :
228 : /// Trigger an auto compaction.
229 0 : pub async fn trigger_auto_compaction(
230 0 : &self,
231 0 : timeline: &Arc<Timeline>,
232 0 : ) -> Result<(), CompactionError> {
233 0 : let GcCompactionCombinedSettings {
234 0 : gc_compaction_enabled,
235 0 : gc_compaction_initial_threshold_kb,
236 0 : gc_compaction_ratio_percent,
237 0 : ..
238 0 : } = timeline.get_gc_compaction_settings();
239 0 : if !gc_compaction_enabled {
240 0 : return Ok(());
241 0 : }
242 0 : if self.remaining_jobs_num() > 0 {
243 : // Only schedule auto compaction when the queue is empty
244 0 : return Ok(());
245 0 : }
246 0 : if timeline.ancestor_timeline().is_some() {
247 : // Do not trigger auto compaction for child timelines. We haven't tested
248 : // it enough in staging yet.
249 0 : return Ok(());
250 0 : }
251 0 : if timeline.get_gc_compaction_watermark() == Lsn::INVALID {
252 : // If the gc watermark is not set, we don't need to trigger auto compaction.
253 : // This check is the same as in `gc_compaction_split_jobs` but we don't log
254 : // here and we can also skip the computation of the trigger condition earlier.
255 0 : return Ok(());
256 0 : }
257 :
258 0 : let Ok(permit) = CONCURRENT_GC_COMPACTION_TASKS.clone().try_acquire_owned() else {
259 : // Only allow one compaction run at a time. TODO: As we do `try_acquire_owned`, we cannot ensure
260 : // the fairness of the lock across timelines. We should listen for both `acquire` and `l0_compaction_trigger`
261 : // to ensure fairness while avoiding starvation of other tasks.
262 0 : return Ok(());
263 : };
264 :
265 0 : let gc_compaction_state = timeline.get_gc_compaction_state();
266 0 : let l2_lsn = gc_compaction_state
267 0 : .map(|x| x.last_completed_lsn)
268 0 : .unwrap_or(Lsn::INVALID);
269 :
270 0 : let layers = {
271 0 : let guard = timeline.layers.read().await;
272 0 : let layer_map = guard.layer_map()?;
273 0 : layer_map.iter_historic_layers().collect_vec()
274 0 : };
275 0 : let mut l2_size: u64 = 0;
276 0 : let mut l1_size = 0;
277 0 : let gc_cutoff = *timeline.get_applied_gc_cutoff_lsn();
278 0 : for layer in layers {
279 0 : if layer.lsn_range.start <= l2_lsn {
280 0 : l2_size += layer.file_size();
281 0 : } else if layer.lsn_range.start <= gc_cutoff {
282 0 : l1_size += layer.file_size();
283 0 : }
284 : }
285 :
286 0 : fn trigger_compaction(
287 0 : l1_size: u64,
288 0 : l2_size: u64,
289 0 : gc_compaction_initial_threshold_kb: u64,
290 0 : gc_compaction_ratio_percent: u64,
291 0 : ) -> bool {
292 : const AUTO_TRIGGER_LIMIT: u64 = 150 * 1024 * 1024 * 1024; // 150GB
293 0 : if l1_size + l2_size >= AUTO_TRIGGER_LIMIT {
294 : // Do not auto-trigger when physical size >= 150GB
295 0 : return false;
296 0 : }
297 0 : // initial trigger
298 0 : if l2_size == 0 && l1_size >= gc_compaction_initial_threshold_kb * 1024 {
299 0 : info!(
300 0 : "trigger auto-compaction because l1_size={} >= gc_compaction_initial_threshold_kb={}",
301 : l1_size, gc_compaction_initial_threshold_kb
302 : );
303 0 : return true;
304 0 : }
305 0 : // size ratio trigger
306 0 : if l2_size == 0 {
307 0 : return false;
308 0 : }
309 0 : if l1_size as f64 / l2_size as f64 >= (gc_compaction_ratio_percent as f64 / 100.0) {
310 0 : info!(
311 0 : "trigger auto-compaction because l1_size={} / l2_size={} > gc_compaction_ratio_percent={}",
312 : l1_size, l2_size, gc_compaction_ratio_percent
313 : );
314 0 : return true;
315 0 : }
316 0 : false
317 0 : }
318 :
319 0 : if trigger_compaction(
320 0 : l1_size,
321 0 : l2_size,
322 0 : gc_compaction_initial_threshold_kb,
323 0 : gc_compaction_ratio_percent,
324 0 : ) {
325 0 : self.schedule_auto_compaction(
326 0 : CompactOptions {
327 0 : flags: {
328 0 : let mut flags = EnumSet::new();
329 0 : flags |= CompactFlags::EnhancedGcBottomMostCompaction;
330 0 : if timeline.get_compaction_l0_first() {
331 0 : flags |= CompactFlags::YieldForL0;
332 0 : }
333 0 : flags
334 0 : },
335 0 : sub_compaction: true,
336 0 : // Only auto-trigger gc-compaction over the data keyspace due to concerns in
337 0 : // https://github.com/neondatabase/neon/issues/11318.
338 0 : compact_key_range: Some(CompactKeyRange {
339 0 : start: Key::MIN,
340 0 : end: Key::metadata_key_range().start,
341 0 : }),
342 0 : compact_lsn_range: None,
343 0 : sub_compaction_max_job_size_mb: None,
344 0 : },
345 0 : permit,
346 0 : );
347 0 : info!(
348 0 : "scheduled auto gc-compaction: l1_size={}, l2_size={}, l2_lsn={}, gc_cutoff={}",
349 : l1_size, l2_size, l2_lsn, gc_cutoff
350 : );
351 : } else {
352 0 : debug!(
353 0 : "did not trigger auto gc-compaction: l1_size={}, l2_size={}, l2_lsn={}, gc_cutoff={}",
354 : l1_size, l2_size, l2_lsn, gc_cutoff
355 : );
356 : }
357 0 : Ok(())
358 0 : }
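
// A self-contained restatement of the `trigger_compaction` decision above, with a
// worked example; the threshold values below are invented for illustration.
fn trigger_compaction(l1_size: u64, l2_size: u64, initial_threshold_kb: u64, ratio_percent: u64) -> bool {
    const AUTO_TRIGGER_LIMIT: u64 = 150 * 1024 * 1024 * 1024; // never auto-trigger above 150GB
    if l1_size + l2_size >= AUTO_TRIGGER_LIMIT {
        return false;
    }
    // Initial trigger: no L2 yet, and L1 alone has grown past the initial threshold.
    if l2_size == 0 {
        return l1_size >= initial_threshold_kb * 1024;
    }
    // Ratio trigger: L1 has grown to a large fraction of L2.
    l1_size as f64 / l2_size as f64 >= ratio_percent as f64 / 100.0
}

fn main() {
    // 10 GiB of L1, no L2, 5 GiB initial threshold -> initial trigger fires.
    assert!(trigger_compaction(10 << 30, 0, 5 << 20, 100));
    // 2 GiB of L1 over 10 GiB of L2 with a 100% ratio -> 20% < 100%, no trigger.
    assert!(!trigger_compaction(2 << 30, 10 << 30, 5 << 20, 100));
}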
359 :
360 : /// Notify the caller the job has finished and unblock GC.
361 0 : fn notify_and_unblock(&self, id: GcCompactionJobId) {
362 0 : info!("compaction job id={} finished", id);
363 0 : let mut guard = self.inner.lock().unwrap();
364 0 : if let Some(items) = guard.guards.remove(&id) {
365 0 : if let Some(tx) = items.notify {
366 0 : let _ = tx.send(());
367 0 : }
368 0 : }
369 0 : }
370 :
371 0 : fn clear_running_job(&self) {
372 0 : let mut guard = self.inner.lock().unwrap();
373 0 : guard.running = None;
374 0 : }
375 :
376 0 : async fn handle_sub_compaction(
377 0 : &self,
378 0 : id: GcCompactionJobId,
379 0 : options: CompactOptions,
380 0 : timeline: &Arc<Timeline>,
381 0 : auto: bool,
382 0 : ) -> Result<(), CompactionError> {
383 0 : info!(
384 0 : "running scheduled enhanced gc bottom-most compaction with sub-compaction, splitting compaction jobs"
385 : );
386 0 : let res = timeline
387 0 : .gc_compaction_split_jobs(
388 0 : GcCompactJob::from_compact_options(options.clone()),
389 0 : options.sub_compaction_max_job_size_mb,
390 0 : )
391 0 : .await;
392 0 : let jobs = match res {
393 0 : Ok(jobs) => jobs,
394 0 : Err(err) => {
395 0 : warn!("cannot split gc-compaction jobs: {}, unblocked gc", err);
396 0 : self.notify_and_unblock(id);
397 0 : return Err(err);
398 : }
399 : };
400 0 : if jobs.is_empty() {
401 0 : info!("no jobs to run, skipping scheduled compaction task");
402 0 : self.notify_and_unblock(id);
403 : } else {
404 0 : let jobs_len = jobs.len();
405 0 : let mut pending_tasks = Vec::new();
406 0 : // gc-compaction might pick more layers or fewer layers to compact. The L2 LSN does not need to be accurate.
407 0 : // And therefore, we simply assume the maximum LSN of all jobs is the expected L2 LSN.
408 0 : let expected_l2_lsn = jobs.iter().map(|job| job.compact_lsn_range.end).max();
409 0 : for job in jobs {
410 : // Unfortunately we need to convert the `GcCompactJob` back to `CompactOptions`
411 : // until we do further refactoring to allow calling `compact_with_gc` directly.
412 0 : let mut flags: EnumSet<CompactFlags> = EnumSet::default();
413 0 : flags |= CompactFlags::EnhancedGcBottomMostCompaction;
414 0 : if job.dry_run {
415 0 : flags |= CompactFlags::DryRun;
416 0 : }
417 0 : if options.flags.contains(CompactFlags::YieldForL0) {
418 0 : flags |= CompactFlags::YieldForL0;
419 0 : }
420 0 : let options = CompactOptions {
421 0 : flags,
422 0 : sub_compaction: false,
423 0 : compact_key_range: Some(job.compact_key_range.into()),
424 0 : compact_lsn_range: Some(job.compact_lsn_range.into()),
425 0 : sub_compaction_max_job_size_mb: None,
426 0 : };
427 0 : pending_tasks.push(GcCompactionQueueItem::SubCompactionJob(options));
428 : }
429 :
430 0 : if !auto {
431 0 : pending_tasks.push(GcCompactionQueueItem::Notify(id, None));
432 0 : } else {
433 0 : pending_tasks.push(GcCompactionQueueItem::Notify(id, expected_l2_lsn));
434 0 : }
435 :
436 : {
437 0 : let mut guard = self.inner.lock().unwrap();
438 0 : let mut tasks = Vec::new();
439 0 : for task in pending_tasks {
440 0 : let id = guard.next_id();
441 0 : tasks.push((id, task));
442 0 : }
443 0 : tasks.reverse();
444 0 : for item in tasks {
445 0 : guard.queued.push_front(item);
446 0 : }
447 : }
448 0 : info!(
449 0 : "scheduled enhanced gc bottom-most compaction with sub-compaction, split into {} jobs",
450 : jobs_len
451 : );
452 : }
453 0 : Ok(())
454 0 : }
455 :
456 : /// Take a job from the queue and process it. Returns whether there are still pending tasks.
457 0 : pub async fn iteration(
458 0 : &self,
459 0 : cancel: &CancellationToken,
460 0 : ctx: &RequestContext,
461 0 : gc_block: &GcBlock,
462 0 : timeline: &Arc<Timeline>,
463 0 : ) -> Result<CompactionOutcome, CompactionError> {
464 0 : let res = self.iteration_inner(cancel, ctx, gc_block, timeline).await;
465 0 : if let Err(err) = &res {
466 0 : log_compaction_error(err, None, cancel.is_cancelled(), true);
467 0 : }
468 0 : match res {
469 0 : Ok(res) => Ok(res),
470 0 : Err(CompactionError::ShuttingDown) => Err(CompactionError::ShuttingDown),
471 : Err(_) => {
472 : // There are some cases where traditional gc might collect some layer
473 : // files, leaving gc-compaction unable to read the full history of the key.
474 : // This needs to be resolved in the long-term by improving the compaction
475 : // process. For now, let's simply avoid such errors triggering the
476 : // circuit breaker.
477 0 : Ok(CompactionOutcome::Skipped)
478 : }
479 : }
480 0 : }
481 :
482 0 : async fn iteration_inner(
483 0 : &self,
484 0 : cancel: &CancellationToken,
485 0 : ctx: &RequestContext,
486 0 : gc_block: &GcBlock,
487 0 : timeline: &Arc<Timeline>,
488 0 : ) -> Result<CompactionOutcome, CompactionError> {
489 0 : let Ok(_one_op_at_a_time_guard) = self.consumer_lock.try_lock() else {
490 0 : return Err(CompactionError::AlreadyRunning(
491 0 : "cannot run gc-compaction because another gc-compaction is running. This should not happen because we only call this function from the gc-compaction queue.",
492 0 : ));
493 : };
494 : let has_pending_tasks;
495 0 : let mut yield_for_l0 = false;
496 0 : let Some((id, item)) = ({
497 0 : let mut guard = self.inner.lock().unwrap();
498 0 : if let Some((id, item)) = guard.queued.pop_front() {
499 0 : guard.running = Some((id, item.clone()));
500 0 : has_pending_tasks = !guard.queued.is_empty();
501 0 : Some((id, item))
502 : } else {
503 0 : has_pending_tasks = false;
504 0 : None
505 : }
506 : }) else {
507 0 : self.trigger_auto_compaction(timeline).await?;
508 : // Always yield after triggering auto-compaction. Gc-compaction is a low-priority task and we
509 : // have not implemented a preemption mechanism yet. We always want to yield to more important
510 : // tasks if there are any.
511 0 : return Ok(CompactionOutcome::Done);
512 : };
513 0 : match item {
514 0 : GcCompactionQueueItem::MetaJob { options, auto } => {
515 0 : if !options
516 0 : .flags
517 0 : .contains(CompactFlags::EnhancedGcBottomMostCompaction)
518 : {
519 0 : warn!(
520 0 : "ignoring scheduled compaction task: scheduled task must be gc compaction: {:?}",
521 : options
522 : );
523 0 : } else if options.sub_compaction {
524 0 : info!(
525 0 : "running scheduled enhanced gc bottom-most compaction with sub-compaction, splitting compaction jobs"
526 : );
527 0 : self.handle_sub_compaction(id, options, timeline, auto)
528 0 : .await?;
529 : } else {
530 : // Auto compaction always enables sub-compaction so we don't need to handle update_l2_lsn
531 : // in this branch.
532 0 : let _gc_guard = match gc_block.start().await {
533 0 : Ok(guard) => guard,
534 0 : Err(e) => {
535 0 : self.notify_and_unblock(id);
536 0 : self.clear_running_job();
537 0 : return Err(CompactionError::Other(anyhow!(
538 0 : "cannot run gc-compaction because gc is blocked: {}",
539 0 : e
540 0 : )));
541 : }
542 : };
543 0 : let res = timeline.compact_with_options(cancel, options, ctx).await;
544 0 : let compaction_result = match res {
545 0 : Ok(res) => res,
546 0 : Err(err) => {
547 0 : warn!(%err, "failed to run gc-compaction");
548 0 : self.notify_and_unblock(id);
549 0 : self.clear_running_job();
550 0 : return Err(err);
551 : }
552 : };
553 0 : if compaction_result == CompactionOutcome::YieldForL0 {
554 0 : yield_for_l0 = true;
555 0 : }
556 : }
557 : }
558 0 : GcCompactionQueueItem::SubCompactionJob(options) => {
559 : // TODO: error handling, clear the queue if any task fails?
560 0 : let _gc_guard = match gc_block.start().await {
561 0 : Ok(guard) => guard,
562 0 : Err(e) => {
563 0 : self.clear_running_job();
564 0 : return Err(CompactionError::Other(anyhow!(
565 0 : "cannot run gc-compaction because gc is blocked: {}",
566 0 : e
567 0 : )));
568 : }
569 : };
570 0 : let res = timeline.compact_with_options(cancel, options, ctx).await;
571 0 : let compaction_result = match res {
572 0 : Ok(res) => res,
573 0 : Err(err) => {
574 0 : warn!(%err, "failed to run gc-compaction subcompaction job");
575 0 : self.clear_running_job();
576 0 : return Err(err);
577 : }
578 : };
579 0 : if compaction_result == CompactionOutcome::YieldForL0 {
580 0 : // We permanently give up a task if we yield for L0 compaction: the preempted subcompaction job won't be run
581 0 : // again. This ensures that we don't keep doing duplicate work within gc-compaction. We don't return directly here because
582 0 : // we need to clean things up before returning from the function.
583 0 : yield_for_l0 = true;
584 0 : }
585 : }
586 0 : GcCompactionQueueItem::Notify(id, l2_lsn) => {
587 0 : self.notify_and_unblock(id);
588 0 : if let Some(l2_lsn) = l2_lsn {
589 0 : let current_l2_lsn = timeline
590 0 : .get_gc_compaction_state()
591 0 : .map(|x| x.last_completed_lsn)
592 0 : .unwrap_or(Lsn::INVALID);
593 0 : if l2_lsn >= current_l2_lsn {
594 0 : info!("l2_lsn updated to {}", l2_lsn);
595 0 : timeline
596 0 : .update_gc_compaction_state(GcCompactionState {
597 0 : last_completed_lsn: l2_lsn,
598 0 : })
599 0 : .map_err(CompactionError::Other)?;
600 : } else {
601 0 : warn!(
602 0 : "l2_lsn updated to {} but it is less than the current l2_lsn {}",
603 : l2_lsn, current_l2_lsn
604 : );
605 : }
606 0 : }
607 : }
608 : }
609 0 : self.clear_running_job();
610 0 : Ok(if yield_for_l0 {
611 0 : tracing::info!("give up gc-compaction: yield for L0 compaction");
612 0 : CompactionOutcome::YieldForL0
613 0 : } else if has_pending_tasks {
614 0 : CompactionOutcome::Pending
615 : } else {
616 0 : CompactionOutcome::Done
617 : })
618 0 : }
619 :
620 : #[allow(clippy::type_complexity)]
621 0 : pub fn remaining_jobs(
622 0 : &self,
623 0 : ) -> (
624 0 : Option<(GcCompactionJobId, GcCompactionQueueItem)>,
625 0 : VecDeque<(GcCompactionJobId, GcCompactionQueueItem)>,
626 0 : ) {
627 0 : let guard = self.inner.lock().unwrap();
628 0 : (guard.running.clone(), guard.queued.clone())
629 0 : }
630 :
631 0 : pub fn remaining_jobs_num(&self) -> usize {
632 0 : let guard = self.inner.lock().unwrap();
633 0 : guard.queued.len() + if guard.running.is_some() { 1 } else { 0 }
634 0 : }
635 : }
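
// A simplified, illustrative model of the scheduling flow implemented by
// `GcCompactionQueue` above: callers enqueue jobs and may register a completion
// notification, while a single consumer pops jobs one at a time in FIFO order. All
// types here are stand-ins (std mpsc instead of tokio oneshot), not the pageserver's.
use std::collections::VecDeque;
use std::sync::mpsc;

struct Job {
    id: usize,
    notify: Option<mpsc::Sender<()>>, // stands in for tokio::sync::oneshot::Sender
}

fn main() {
    let mut queue: VecDeque<Job> = VecDeque::new();
    let mut last_id = 0;

    // "schedule_manual_compaction": enqueue the job and keep a way to await completion.
    let (tx, rx) = mpsc::channel();
    last_id += 1;
    queue.push_back(Job { id: last_id, notify: Some(tx) });

    // "iteration": the single consumer takes one job per pass and notifies when done.
    while let Some(job) = queue.pop_front() {
        println!("running gc-compaction job id={}", job.id);
        if let Some(tx) = job.notify {
            let _ = tx.send(());
        }
    }
    assert!(rx.recv().is_ok(), "caller observes job completion");
}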
636 :
637 : /// A job description for the gc-compaction job. This structure describes the rectangular key/LSN range that the job will
638 : /// process. The exact layers that need to be compacted/rewritten will be generated when `compact_with_gc` gets
639 : /// called.
640 : #[derive(Debug, Clone)]
641 : pub(crate) struct GcCompactJob {
642 : pub dry_run: bool,
643 : /// The key range to be compacted. The compaction algorithm will only regenerate key-value pairs within this range
644 : /// [left inclusive, right exclusive), and other pairs will be rewritten into new files if necessary.
645 : pub compact_key_range: Range<Key>,
646 : /// The LSN range to be compacted. The compaction algorithm will use this range to determine the layers to be
647 : /// selected for the compaction, and it does not guarantee the generated layers will have exactly the same LSN range
648 : /// as specified here. The true range being compacted is `min_layer_lsn`/`max_layer_lsn` in [`GcCompactionJobDescription`]:
649 : /// `min_layer_lsn` will always be <= the lower bound specified here, and `max_layer_lsn` will always be >= the upper bound specified here.
650 : pub compact_lsn_range: Range<Lsn>,
651 : }
652 :
653 : impl GcCompactJob {
654 112 : pub fn from_compact_options(options: CompactOptions) -> Self {
655 112 : GcCompactJob {
656 112 : dry_run: options.flags.contains(CompactFlags::DryRun),
657 112 : compact_key_range: options
658 112 : .compact_key_range
659 112 : .map(|x| x.into())
660 112 : .unwrap_or(Key::MIN..Key::MAX),
661 112 : compact_lsn_range: options
662 112 : .compact_lsn_range
663 112 : .map(|x| x.into())
664 112 : .unwrap_or(Lsn::INVALID..Lsn::MAX),
665 112 : }
666 112 : }
667 : }
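
// An illustrative sketch of what the "rectangle" in a `GcCompactJob` above means: a
// job is a key range x LSN range, and layer selection later picks the layers that
// overlap that rectangle. Integer ranges stand in for `Key`/`Lsn`, and `overlaps` is
// a local helper, not the pageserver's.
use std::ops::Range;

struct LayerRect {
    key_range: Range<u64>,
    lsn_range: Range<u64>,
}

fn overlaps(a: &Range<u64>, b: &Range<u64>) -> bool {
    a.start < b.end && b.start < a.end
}

fn main() {
    let layers = vec![
        LayerRect { key_range: 0..100, lsn_range: 10..20 },
        LayerRect { key_range: 200..300, lsn_range: 10..20 },
    ];
    let job_keys = 0..150;
    let job_lsns = 0..u64::MAX;
    let selected: Vec<_> = layers
        .iter()
        .filter(|l| overlaps(&l.key_range, &job_keys) && overlaps(&l.lsn_range, &job_lsns))
        .collect();
    assert_eq!(selected.len(), 1); // only the first layer intersects the job rectangle
}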
668 :
669 : /// A job description for the gc-compaction job. This structure is generated when `compact_with_gc` is called
670 : /// and contains the exact layers we want to compact.
671 : pub struct GcCompactionJobDescription {
672 : /// All layers to read in the compaction job
673 : selected_layers: Vec<Layer>,
674 : /// GC cutoff of the job. This is the lowest LSN that will be accessed by the read/GC path and we need to
675 : /// keep all deltas <= this LSN or generate an image == this LSN.
676 : gc_cutoff: Lsn,
677 : /// LSNs to retain for the job. Read path will use this LSN so we need to keep deltas <= this LSN or
678 : /// generate an image == this LSN.
679 : retain_lsns_below_horizon: Vec<Lsn>,
680 : /// Maximum layer LSN processed in this compaction, that is max(end_lsn of layers). Exclusive. All data
681 : /// \>= this LSN will be kept and will not be rewritten.
682 : max_layer_lsn: Lsn,
683 : /// Minimum layer LSN processed in this compaction, that is min(start_lsn of layers). Inclusive.
684 : /// All access below (strict lower than `<`) this LSN will be routed through the normal read path instead of
685 : /// k-merge within gc-compaction.
686 : min_layer_lsn: Lsn,
687 : /// Only compact layers overlapping with this range.
688 : compaction_key_range: Range<Key>,
689 : /// When partial compaction is enabled, these layers need to be rewritten to ensure no overlap.
690 : /// This field is here solely for debugging. The field will not be read once the compaction
691 : /// description is generated.
692 : rewrite_layers: Vec<Arc<PersistentLayerDesc>>,
693 : }
694 :
695 : /// The result of bottom-most compaction for a single key at each LSN.
696 : #[derive(Debug)]
697 : #[cfg_attr(test, derive(PartialEq))]
698 : pub struct KeyLogAtLsn(pub Vec<(Lsn, Value)>);
699 :
700 : /// The result of bottom-most compaction.
701 : #[derive(Debug)]
702 : #[cfg_attr(test, derive(PartialEq))]
703 : pub(crate) struct KeyHistoryRetention {
704 : /// Stores logs to reconstruct the value at the given LSN, that is to say, logs <= LSN or image == LSN.
705 : pub(crate) below_horizon: Vec<(Lsn, KeyLogAtLsn)>,
706 : /// Stores logs to reconstruct the value at any LSN above the horizon, that is to say, log > LSN.
707 : pub(crate) above_horizon: KeyLogAtLsn,
708 : }
709 :
710 : impl KeyHistoryRetention {
711 : /// Hack: skip the delta layer if we need to produce a layer with the same key-lsn range.
712 : ///
713 : /// This can happen if we have removed some deltas in "the middle" of some existing layer's key-lsn-range.
714 : /// For example, consider the case where a single delta with range [0x10,0x50) exists.
715 : /// And we have branches at LSN 0x10, 0x20, 0x30.
716 : /// Then we delete branch @ 0x20.
717 : /// Bottom-most compaction may now delete the delta [0x20,0x30).
718 : /// And that wouldn't change the shape of the layer.
719 : ///
720 : /// Note that bottom-most-gc-compaction never _adds_ new data in that case, only removes.
721 : ///
722 : /// `discard_key` will only be called when the writer reaches its target (instead of for every key), so it's fine to grab a lock inside.
723 148 : async fn discard_key(key: &PersistentLayerKey, tline: &Arc<Timeline>, dry_run: bool) -> bool {
724 148 : if dry_run {
725 0 : return true;
726 148 : }
727 148 : if LayerMap::is_l0(&key.key_range, key.is_delta) {
728 : // gc-compaction should not produce L0 deltas, otherwise it will break the layer order.
729 : // We should ignore such layers.
730 0 : return true;
731 148 : }
732 : let layer_generation;
733 : {
734 148 : let guard = tline.layers.read().await;
735 148 : if !guard.contains_key(key) {
736 104 : return false;
737 44 : }
738 44 : layer_generation = guard.get_from_key(key).metadata().generation;
739 44 : }
740 44 : if layer_generation == tline.generation {
741 44 : info!(
742 : key=%key,
743 : ?layer_generation,
744 0 : "discard layer due to duplicated layer key in the same generation",
745 : );
746 44 : true
747 : } else {
748 0 : false
749 : }
750 148 : }
751 :
752 : /// Pipe a history of a single key to the writers.
753 : ///
754 : /// If `image_writer` is none, the images will be placed into the delta layers.
755 : /// The delta writer will contain all images and deltas (below and above the horizon) except the bottom-most images.
756 : #[allow(clippy::too_many_arguments)]
757 1276 : async fn pipe_to(
758 1276 : self,
759 1276 : key: Key,
760 1276 : delta_writer: &mut SplitDeltaLayerWriter<'_>,
761 1276 : mut image_writer: Option<&mut SplitImageLayerWriter<'_>>,
762 1276 : stat: &mut CompactionStatistics,
763 1276 : ctx: &RequestContext,
764 1276 : ) -> anyhow::Result<()> {
765 1276 : let mut first_batch = true;
766 4088 : for (cutoff_lsn, KeyLogAtLsn(logs)) in self.below_horizon {
767 2812 : if first_batch {
768 1276 : if logs.len() == 1 && logs[0].1.is_image() {
769 1200 : let Value::Image(img) = &logs[0].1 else {
770 0 : unreachable!()
771 : };
772 1200 : stat.produce_image_key(img);
773 1200 : if let Some(image_writer) = image_writer.as_mut() {
774 1200 : image_writer.put_image(key, img.clone(), ctx).await?;
775 : } else {
776 0 : delta_writer
777 0 : .put_value(key, cutoff_lsn, Value::Image(img.clone()), ctx)
778 0 : .await?;
779 : }
780 : } else {
781 132 : for (lsn, val) in logs {
782 56 : stat.produce_key(&val);
783 56 : delta_writer.put_value(key, lsn, val, ctx).await?;
784 : }
785 : }
786 1276 : first_batch = false;
787 : } else {
788 1768 : for (lsn, val) in logs {
789 232 : stat.produce_key(&val);
790 232 : delta_writer.put_value(key, lsn, val, ctx).await?;
791 : }
792 : }
793 : }
794 1276 : let KeyLogAtLsn(above_horizon_logs) = self.above_horizon;
795 1392 : for (lsn, val) in above_horizon_logs {
796 116 : stat.produce_key(&val);
797 116 : delta_writer.put_value(key, lsn, val, ctx).await?;
798 : }
799 1276 : Ok(())
800 1276 : }
801 :
802 : /// Verify that every key in the retention is readable by replaying the logs.
803 1292 : async fn verify(
804 1292 : &self,
805 1292 : key: Key,
806 1292 : base_img_from_ancestor: &Option<(Key, Lsn, Bytes)>,
807 1292 : full_history: &[(Key, Lsn, Value)],
808 1292 : tline: &Arc<Timeline>,
809 1292 : ) -> anyhow::Result<()> {
810 : // Usually the min_lsn should be the first record but we do a full iteration to be safe.
811 1832 : let Some(min_lsn) = full_history.iter().map(|(_, lsn, _)| *lsn).min() else {
812 : // This should never happen b/c if we don't have any history of a key, we won't even do `generate_key_retention`.
813 0 : return Ok(());
814 : };
815 1832 : let Some(max_lsn) = full_history.iter().map(|(_, lsn, _)| *lsn).max() else {
816 : // This should never happen b/c if we don't have any history of a key, we won't even do `generate_key_retention`.
817 0 : return Ok(());
818 : };
819 1292 : let mut base_img = base_img_from_ancestor
820 1292 : .as_ref()
821 1292 : .map(|(_, lsn, img)| (*lsn, img));
822 1292 : let mut history = Vec::new();
823 :
824 4108 : async fn collect_and_verify(
825 4108 : key: Key,
826 4108 : lsn: Lsn,
827 4108 : base_img: &Option<(Lsn, &Bytes)>,
828 4108 : history: &[(Lsn, &NeonWalRecord)],
829 4108 : tline: &Arc<Timeline>,
830 4108 : skip_empty: bool,
831 4108 : ) -> anyhow::Result<()> {
832 4108 : if base_img.is_none() && history.is_empty() {
833 0 : if skip_empty {
834 0 : return Ok(());
835 0 : }
836 0 : anyhow::bail!("verification failed: key {} has no history at {}", key, lsn);
837 4108 : };
838 4108 :
839 4108 : let mut records = history
840 4108 : .iter()
841 4108 : .map(|(lsn, val)| (*lsn, (*val).clone()))
842 4108 : .collect::<Vec<_>>();
843 4108 :
844 4108 : // WAL redo requires records in the reverse LSN order
845 4108 : records.reverse();
846 4108 : let data = ValueReconstructState {
847 4108 : img: base_img.as_ref().map(|(lsn, img)| (*lsn, (*img).clone())),
848 4108 : records,
849 4108 : };
850 4108 :
851 4108 : tline
852 4108 : .reconstruct_value(key, lsn, data, RedoAttemptType::GcCompaction)
853 4108 : .await
854 4108 : .with_context(|| format!("verification failed for key {} at lsn {}", key, lsn))?;
855 :
856 4108 : Ok(())
857 4108 : }
858 :
859 4144 : for (retain_lsn, KeyLogAtLsn(logs)) in &self.below_horizon {
860 4384 : for (lsn, val) in logs {
861 304 : match val {
862 1228 : Value::Image(img) => {
863 1228 : base_img = Some((*lsn, img));
864 1228 : history.clear();
865 1228 : }
866 304 : Value::WalRecord(rec) if val.will_init() => {
867 0 : base_img = None;
868 0 : history.clear();
869 0 : history.push((*lsn, rec));
870 0 : }
871 304 : Value::WalRecord(rec) => {
872 304 : history.push((*lsn, rec));
873 304 : }
874 : }
875 : }
876 2852 : if *retain_lsn >= min_lsn {
877 : // Only verify after the key appears in the full history for the first time.
878 :
879 : // We don't modify history: in theory, we could replace the history with a single
880 : // image as in `generate_key_retention` to make redos at later LSNs faster. But we
881 : // want to verify everything as if they are read from the real layer map.
882 2796 : collect_and_verify(key, *retain_lsn, &base_img, &history, tline, false)
883 2796 : .await
884 2796 : .context("below horizon retain_lsn")?;
885 56 : }
886 : }
887 :
888 1440 : for (lsn, val) in &self.above_horizon.0 {
889 128 : match val {
890 20 : Value::Image(img) => {
891 20 : // Above the GC horizon, we verify every time we see an image.
892 20 : collect_and_verify(key, *lsn, &base_img, &history, tline, true)
893 20 : .await
894 20 : .context("above horizon full image")?;
895 20 : base_img = Some((*lsn, img));
896 20 : history.clear();
897 : }
898 128 : Value::WalRecord(rec) if val.will_init() => {
899 0 : // Above the GC horizon, we verify every time we see an init record.
900 0 : collect_and_verify(key, *lsn, &base_img, &history, tline, true)
901 0 : .await
902 0 : .context("above horizon init record")?;
903 0 : base_img = None;
904 0 : history.clear();
905 0 : history.push((*lsn, rec));
906 : }
907 128 : Value::WalRecord(rec) => {
908 128 : history.push((*lsn, rec));
909 128 : }
910 : }
911 : }
912 : // Ensure the latest record is readable.
913 1292 : collect_and_verify(key, max_lsn, &base_img, &history, tline, false)
914 1292 : .await
915 1292 : .context("latest record")?;
916 1292 : Ok(())
917 1292 : }
918 : }
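
// A toy model of the ordering detail in `verify` above: history is accumulated in
// ascending LSN order while scanning, but the redo state wants records newest-first,
// hence the `records.reverse()` before reconstruction. Plain integer "wal records"
// stand in for `NeonWalRecord` here; this is not pageserver code.
fn main() {
    let base_img: i64 = 100; // image at some LSN below the history
    let mut history: Vec<(u64, i64)> = vec![(20, 5), (30, -3)]; // ascending LSN order

    // Mirror of `records.reverse()` in the code above: newest record first.
    history.reverse();
    assert_eq!(history.first().map(|(lsn, _)| *lsn), Some(30));

    // Replaying (oldest to newest again) yields the value the read path would return.
    let value = history.iter().rev().fold(base_img, |img, (_, delta)| img + delta);
    assert_eq!(value, 102);
}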
919 :
920 : #[derive(Debug, Serialize, Default)]
921 : struct CompactionStatisticsNumSize {
922 : num: u64,
923 : size: u64,
924 : }
925 :
926 : #[derive(Debug, Serialize, Default)]
927 : pub struct CompactionStatistics {
928 : /// Delta layer visited (maybe compressed, physical size)
929 : delta_layer_visited: CompactionStatisticsNumSize,
930 : /// Image layer visited (maybe compressed, physical size)
931 : image_layer_visited: CompactionStatisticsNumSize,
932 : /// Delta layer produced (maybe compressed, physical size)
933 : delta_layer_produced: CompactionStatisticsNumSize,
934 : /// Image layer produced (maybe compressed, physical size)
935 : image_layer_produced: CompactionStatisticsNumSize,
936 : /// Delta layer discarded (maybe compressed, physical size of the layer being discarded instead of the original layer)
937 : delta_layer_discarded: CompactionStatisticsNumSize,
938 : /// Image layer discarded (maybe compressed, physical size of the layer being discarded instead of the original layer)
939 : image_layer_discarded: CompactionStatisticsNumSize,
940 : num_unique_keys_visited: usize,
941 : /// Delta visited (uncompressed, original size)
942 : wal_keys_visited: CompactionStatisticsNumSize,
943 : /// Image visited (uncompressed, original size)
944 : image_keys_visited: CompactionStatisticsNumSize,
945 : /// Delta produced (uncompressed, original size)
946 : wal_produced: CompactionStatisticsNumSize,
947 : /// Image produced (uncompressed, original size)
948 : image_produced: CompactionStatisticsNumSize,
949 :
950 : // Time spent in each phase
951 : time_acquire_lock_secs: f64,
952 : time_analyze_secs: f64,
953 : time_download_layer_secs: f64,
954 : time_to_first_kv_pair_secs: f64,
955 : time_main_loop_secs: f64,
956 : time_final_phase_secs: f64,
957 : time_total_secs: f64,
958 :
959 : // Summary
960 : /// Ratio of the key-value size after/before gc-compaction.
961 : uncompressed_retention_ratio: f64,
962 : /// Ratio of the physical size after/before gc-compaction.
963 : compressed_retention_ratio: f64,
964 : }
965 :
966 : impl CompactionStatistics {
967 2136 : fn estimated_size_of_value(val: &Value) -> usize {
968 876 : match val {
969 1260 : Value::Image(img) => img.len(),
970 0 : Value::WalRecord(NeonWalRecord::Postgres { rec, .. }) => rec.len(),
971 876 : _ => std::mem::size_of::<NeonWalRecord>(),
972 : }
973 2136 : }
974 3356 : fn estimated_size_of_key() -> usize {
975 3356 : KEY_SIZE // TODO: distinguish image layer and delta layer (count LSN in delta layer)
976 3356 : }
977 176 : fn visit_delta_layer(&mut self, size: u64) {
978 176 : self.delta_layer_visited.num += 1;
979 176 : self.delta_layer_visited.size += size;
980 176 : }
981 140 : fn visit_image_layer(&mut self, size: u64) {
982 140 : self.image_layer_visited.num += 1;
983 140 : self.image_layer_visited.size += size;
984 140 : }
985 1280 : fn on_unique_key_visited(&mut self) {
986 1280 : self.num_unique_keys_visited += 1;
987 1280 : }
988 492 : fn visit_wal_key(&mut self, val: &Value) {
989 492 : self.wal_keys_visited.num += 1;
990 492 : self.wal_keys_visited.size +=
991 492 : Self::estimated_size_of_value(val) as u64 + Self::estimated_size_of_key() as u64;
992 492 : }
993 1260 : fn visit_image_key(&mut self, val: &Value) {
994 1260 : self.image_keys_visited.num += 1;
995 1260 : self.image_keys_visited.size +=
996 1260 : Self::estimated_size_of_value(val) as u64 + Self::estimated_size_of_key() as u64;
997 1260 : }
998 404 : fn produce_key(&mut self, val: &Value) {
999 404 : match val {
1000 20 : Value::Image(img) => self.produce_image_key(img),
1001 384 : Value::WalRecord(_) => self.produce_wal_key(val),
1002 : }
1003 404 : }
1004 384 : fn produce_wal_key(&mut self, val: &Value) {
1005 384 : self.wal_produced.num += 1;
1006 384 : self.wal_produced.size +=
1007 384 : Self::estimated_size_of_value(val) as u64 + Self::estimated_size_of_key() as u64;
1008 384 : }
1009 1220 : fn produce_image_key(&mut self, val: &Bytes) {
1010 1220 : self.image_produced.num += 1;
1011 1220 : self.image_produced.size += val.len() as u64 + Self::estimated_size_of_key() as u64;
1012 1220 : }
1013 28 : fn discard_delta_layer(&mut self, original_size: u64) {
1014 28 : self.delta_layer_discarded.num += 1;
1015 28 : self.delta_layer_discarded.size += original_size;
1016 28 : }
1017 16 : fn discard_image_layer(&mut self, original_size: u64) {
1018 16 : self.image_layer_discarded.num += 1;
1019 16 : self.image_layer_discarded.size += original_size;
1020 16 : }
1021 48 : fn produce_delta_layer(&mut self, size: u64) {
1022 48 : self.delta_layer_produced.num += 1;
1023 48 : self.delta_layer_produced.size += size;
1024 48 : }
1025 60 : fn produce_image_layer(&mut self, size: u64) {
1026 60 : self.image_layer_produced.num += 1;
1027 60 : self.image_layer_produced.size += size;
1028 60 : }
1029 104 : fn finalize(&mut self) {
1030 104 : let original_key_value_size = self.image_keys_visited.size + self.wal_keys_visited.size;
1031 104 : let produced_key_value_size = self.image_produced.size + self.wal_produced.size;
1032 104 : self.uncompressed_retention_ratio =
1033 104 : produced_key_value_size as f64 / (original_key_value_size as f64 + 1.0); // avoid div by 0
1034 104 : let original_physical_size = self.image_layer_visited.size + self.delta_layer_visited.size;
1035 104 : let produced_physical_size = self.image_layer_produced.size
1036 104 : + self.delta_layer_produced.size
1037 104 : + self.image_layer_discarded.size
1038 104 : + self.delta_layer_discarded.size; // Also include the discarded layers to make the ratio accurate
1039 104 : self.compressed_retention_ratio =
1040 104 : produced_physical_size as f64 / (original_physical_size as f64 + 1.0); // avoid div by 0
1041 104 : }
1042 : }
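
// A worked example of the two ratios computed by `finalize` above; the byte counts
// are invented. The `+ 1.0` only guards against dividing by zero for empty inputs.
fn main() {
    // Key-value (uncompressed) sizes, in bytes.
    let visited = 8_000_000_u64; // image_keys_visited.size + wal_keys_visited.size
    let produced = 2_000_000_u64; // image_produced.size + wal_produced.size
    let uncompressed_retention_ratio = produced as f64 / (visited as f64 + 1.0);
    assert!((uncompressed_retention_ratio - 0.25).abs() < 1e-6);

    // Physical (possibly compressed) layer sizes; discarded layers count as "kept".
    let layers_visited = 4_000_000_u64;
    let layers_produced_or_discarded = 1_000_000_u64;
    let compressed_retention_ratio =
        layers_produced_or_discarded as f64 / (layers_visited as f64 + 1.0);
    assert!(compressed_retention_ratio < 0.26); // gc-compaction kept ~25% of the bytes
}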
1043 :
1044 : #[derive(Default, Debug, Clone, Copy, PartialEq, Eq)]
1045 : pub enum CompactionOutcome {
1046 : #[default]
1047 : /// No layers need to be compacted after this round. Compaction doesn't need
1048 : /// to be immediately scheduled.
1049 : Done,
1050 : /// Still has pending layers to be compacted after this round. Ideally, the scheduler
1051 : /// should immediately schedule another compaction.
1052 : Pending,
1053 : /// A timeline needs L0 compaction. Yield and schedule an immediate L0 compaction pass (only
1054 : /// guaranteed when `compaction_l0_first` is enabled).
1055 : YieldForL0,
1056 : /// Compaction was skipped, because the timeline is ineligible for compaction.
1057 : Skipped,
1058 : }
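
// An illustrative sketch of how a scheduler might react to each `CompactionOutcome`
// variant above; the real scheduler lives elsewhere, and this loop uses a local copy
// of the enum purely as a stand-in.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Outcome {
    Done,
    Pending,
    YieldForL0,
    Skipped,
}

fn main() {
    let passes = [Outcome::Pending, Outcome::YieldForL0, Outcome::Done, Outcome::Skipped];
    for outcome in passes {
        match outcome {
            // More work queued: schedule another compaction pass immediately.
            Outcome::Pending => println!("re-run compaction without waiting"),
            // L0 debt detected: run an L0-only pass before anything else.
            Outcome::YieldForL0 => println!("run an immediate L0 compaction pass"),
            // Nothing to do or ineligible: wait for the next periodic tick.
            Outcome::Done | Outcome::Skipped => println!("sleep until the next compaction period"),
        }
    }
}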
1059 :
1060 : impl Timeline {
1061 : /// TODO: cancellation
1062 : ///
1063 : /// Returns whether the compaction has pending tasks.
1064 728 : pub(crate) async fn compact_legacy(
1065 728 : self: &Arc<Self>,
1066 728 : cancel: &CancellationToken,
1067 728 : options: CompactOptions,
1068 728 : ctx: &RequestContext,
1069 728 : ) -> Result<CompactionOutcome, CompactionError> {
1070 728 : if options
1071 728 : .flags
1072 728 : .contains(CompactFlags::EnhancedGcBottomMostCompaction)
1073 : {
1074 0 : self.compact_with_gc(cancel, options, ctx).await?;
1075 0 : return Ok(CompactionOutcome::Done);
1076 728 : }
1077 728 :
1078 728 : if options.flags.contains(CompactFlags::DryRun) {
1079 0 : return Err(CompactionError::Other(anyhow!(
1080 0 : "dry-run mode is not supported for legacy compaction for now"
1081 0 : )));
1082 728 : }
1083 728 :
1084 728 : if options.compact_key_range.is_some() || options.compact_lsn_range.is_some() {
1085 : // maybe useful in the future? could implement this at some point
1086 0 : return Err(CompactionError::Other(anyhow!(
1087 0 : "compaction range is not supported for legacy compaction for now"
1088 0 : )));
1089 728 : }
1090 728 :
1091 728 : // High level strategy for compaction / image creation:
1092 728 : //
1093 728 : // 1. First, do an L0 compaction to ensure we move the L0
1094 728 : // layers into the historic layer map and get flat levels of
1095 728 : // layers. If we did not compact all L0 layers, we will
1096 728 : // prioritize compacting the timeline again and not do
1097 728 : // any of the compactions below.
1098 728 : //
1099 728 : // 2. Then, calculate the desired "partitioning" of the
1100 728 : // currently in-use key space. The goal is to partition the
1101 728 : // key space into roughly fixed-size chunks, but also take into
1102 728 : // account any existing image layers, and try to align the
1103 728 : // chunk boundaries with the existing image layers to avoid
1104 728 : // too much churn. Also try to align chunk boundaries with
1105 728 : // relation boundaries. In principle, we don't know about
1106 728 : // relation boundaries here, we just deal with key-value
1107 728 : // pairs, and the code in pgdatadir_mapping.rs knows how to
1108 728 : // map relations into key-value pairs. But in practice we know
1109 728 : // that 'field6' is the block number, and the fields 1-5
1110 728 : // identify a relation. This is just an optimization,
1111 728 : // though.
1112 728 : //
1113 728 : // 3. Once we know the partitioning, for each partition,
1114 728 : // decide if it's time to create a new image layer. The
1115 728 : // criterion is: has there been too much "churn" since the last
1116 728 : // image layer? "Churn" is a fuzzy concept; it's a
1117 728 : // combination of too many delta files, or too much WAL in
1118 728 : // total in the delta files. Or perhaps: if creating an image
1119 728 : // file would allow us to delete some older files.
1120 728 : //
1121 728 : // 4. In the end, if the tenant gets auto-sharded, we will run
1122 728 : // a shard-ancestor compaction.
1123 728 :
1124 728 : // Is the timeline being deleted?
1125 728 : if self.is_stopping() {
1126 0 : trace!("Dropping out of compaction on timeline shutdown");
1127 0 : return Err(CompactionError::ShuttingDown);
1128 728 : }
1129 728 :
1130 728 : let target_file_size = self.get_checkpoint_distance();
1131 :
1132 : // Define partitioning schema if needed
1133 :
1134 : // 1. L0 Compact
1135 728 : let l0_outcome = {
1136 728 : let timer = self.metrics.compact_time_histo.start_timer();
1137 728 : let l0_outcome = self
1138 728 : .compact_level0(
1139 728 : target_file_size,
1140 728 : options.flags.contains(CompactFlags::ForceL0Compaction),
1141 728 : ctx,
1142 728 : )
1143 728 : .await?;
1144 728 : timer.stop_and_record();
1145 728 : l0_outcome
1146 728 : };
1147 728 :
1148 728 : if options.flags.contains(CompactFlags::OnlyL0Compaction) {
1149 0 : return Ok(l0_outcome);
1150 728 : }
1151 728 :
1152 728 : // Yield if we have pending L0 compaction. The scheduler will do another pass.
1153 728 : if (l0_outcome == CompactionOutcome::Pending || l0_outcome == CompactionOutcome::YieldForL0)
1154 0 : && options.flags.contains(CompactFlags::YieldForL0)
1155 : {
1156 0 : info!("image/ancestor compaction yielding for L0 compaction");
1157 0 : return Ok(CompactionOutcome::YieldForL0);
1158 728 : }
1159 728 :
1160 728 : // 2. Repartition and create image layers if necessary
1161 728 : match self
1162 728 : .repartition(
1163 728 : self.get_last_record_lsn(),
1164 728 : self.get_compaction_target_size(),
1165 728 : options.flags,
1166 728 : ctx,
1167 728 : )
1168 728 : .await
1169 : {
1170 728 : Ok(((dense_partitioning, sparse_partitioning), lsn)) => {
1171 728 : // Disables access_stats updates, so that the files we read remain candidates for eviction after we're done with them
1172 728 : let image_ctx = RequestContextBuilder::from(ctx)
1173 728 : .access_stats_behavior(AccessStatsBehavior::Skip)
1174 728 : .attached_child();
1175 728 :
1176 728 : let mut partitioning = dense_partitioning;
1177 728 : partitioning
1178 728 : .parts
1179 728 : .extend(sparse_partitioning.into_dense().parts);
1180 :
1181 : // 3. Create new image layers for partitions that have been modified "enough".
1182 728 : let (image_layers, outcome) = self
1183 728 : .create_image_layers(
1184 728 : &partitioning,
1185 728 : lsn,
1186 728 : if options
1187 728 : .flags
1188 728 : .contains(CompactFlags::ForceImageLayerCreation)
1189 : {
1190 28 : ImageLayerCreationMode::Force
1191 : } else {
1192 700 : ImageLayerCreationMode::Try
1193 : },
1194 728 : &image_ctx,
1195 728 : self.last_image_layer_creation_status
1196 728 : .load()
1197 728 : .as_ref()
1198 728 : .clone(),
1199 728 : options.flags.contains(CompactFlags::YieldForL0),
1200 728 : )
1201 728 : .await
1202 728 : .inspect_err(|err| {
1203 : if let CreateImageLayersError::GetVectoredError(
1204 : GetVectoredError::MissingKey(_),
1205 0 : ) = err
1206 : {
1207 0 : critical!("missing key during compaction: {err:?}");
1208 0 : }
1209 728 : })?;
1210 :
1211 728 : self.last_image_layer_creation_status
1212 728 : .store(Arc::new(outcome.clone()));
1213 728 :
1214 728 : self.upload_new_image_layers(image_layers)?;
1215 728 : if let LastImageLayerCreationStatus::Incomplete { .. } = outcome {
1216 : // Yield and do not do any other kind of compaction.
1217 0 : info!(
1218 0 : "skipping shard ancestor compaction due to pending image layer generation tasks (preempted by L0 compaction)."
1219 : );
1220 0 : return Ok(CompactionOutcome::YieldForL0);
1221 728 : }
1222 : }
1223 :
1224 : // Suppress errors when cancelled.
1225 0 : Err(_) if self.cancel.is_cancelled() => {}
1226 0 : Err(err) if err.is_cancel() => {}
1227 :
1228 : // Alert on critical errors that indicate data corruption.
1229 0 : Err(err) if err.is_critical() => {
1230 0 : critical!("could not compact, repartitioning keyspace failed: {err:?}");
1231 : }
1232 :
1233 : // Log other errors. No partitioning? This is normal, if the timeline was just created
1234 : // as an empty timeline. Also in unit tests, when we use the timeline as a simple
1235 : // key-value store, ignoring the datadir layout. Log the error but continue.
1236 0 : Err(err) => error!("could not compact, repartitioning keyspace failed: {err:?}"),
1237 : };
1238 :
1239 728 : let partition_count = self.partitioning.read().0.0.parts.len();
1240 728 :
1241 728 : // 4. Shard ancestor compaction
1242 728 : if self.get_compaction_shard_ancestor() && self.shard_identity.count >= ShardCount::new(2) {
1243 : // Limit the number of layer rewrites to the number of partitions: this means its
1244 : // runtime should be comparable to a full round of image layer creations, rather than
1245 : // being potentially much longer.
1246 0 : let rewrite_max = partition_count;
1247 :
1248 0 : let outcome = self
1249 0 : .compact_shard_ancestors(
1250 0 : rewrite_max,
1251 0 : options.flags.contains(CompactFlags::YieldForL0),
1252 0 : ctx,
1253 0 : )
1254 0 : .await?;
1255 0 : match outcome {
1256 0 : CompactionOutcome::Pending | CompactionOutcome::YieldForL0 => return Ok(outcome),
1257 0 : CompactionOutcome::Done | CompactionOutcome::Skipped => {}
1258 : }
1259 728 : }
1260 :
1261 728 : Ok(CompactionOutcome::Done)
1262 728 : }
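
// A condensed, illustrative restatement of the phase ordering described in the
// comment block of `compact_legacy` above; the function names here are placeholders,
// not the Timeline methods.
fn main() {
    // Phase 1: L0 compaction first; if L0 debt remains and yielding is allowed,
    // stop here so the scheduler can run another L0 pass.
    let l0_done = compact_l0();
    if !l0_done {
        println!("yield: L0 compaction still pending");
        return;
    }
    // Phases 2 + 3: repartition the key space, then create image layers for
    // partitions with enough churn.
    let partitions = repartition();
    create_image_layers(&partitions);
    // Phase 4: shard ancestor compaction, only relevant after a shard split.
    compact_shard_ancestors();
}

fn compact_l0() -> bool { true }
fn repartition() -> Vec<&'static str> { vec!["partition-0"] }
fn create_image_layers(parts: &[&str]) { println!("image layers for {} partitions", parts.len()); }
fn compact_shard_ancestors() { println!("shard ancestor compaction"); }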
1263 :
1264 : /// Check for layers that are eligible to be rewritten:
1265 : /// - Shard splitting: after a shard split, rewrite ancestor layers beyond pitr_interval, so that
1266 : /// we don't indefinitely retain keys in this shard that aren't needed.
1267 : /// - For future use: layers beyond pitr_interval that are in formats we would
1268 : /// rather not maintain compatibility with indefinitely.
1269 : ///
1270 : /// Note: this phase may read and write many gigabytes of data: use rewrite_max to bound
1271 : /// how much work it will try to do in each compaction pass.
1272 0 : async fn compact_shard_ancestors(
1273 0 : self: &Arc<Self>,
1274 0 : rewrite_max: usize,
1275 0 : yield_for_l0: bool,
1276 0 : ctx: &RequestContext,
1277 0 : ) -> Result<CompactionOutcome, CompactionError> {
1278 0 : let mut outcome = CompactionOutcome::Done;
1279 0 : let mut drop_layers = Vec::new();
1280 0 : let mut layers_to_rewrite: Vec<Layer> = Vec::new();
1281 0 :
1282 0 : // We will use the Lsn cutoff of the last GC as a threshold for rewriting layers: if a
1283 0 : // layer is behind this Lsn, it indicates that the layer is being retained beyond the
1284 0 : // pitr_interval, for example because a branchpoint references it.
1285 0 : //
1286 0 : // Holding this read guard also blocks [`Self::gc_timeline`] from entering while we
1287 0 : // are rewriting layers.
1288 0 : let latest_gc_cutoff = self.get_applied_gc_cutoff_lsn();
1289 0 : let pitr_cutoff = self.gc_info.read().unwrap().cutoffs.time;
1290 :
1291 0 : let layers = self.layers.read().await;
1292 0 : let layers_iter = layers.layer_map()?.iter_historic_layers();
1293 0 : let (layers_total, mut layers_checked) = (layers_iter.len(), 0);
1294 0 : for layer_desc in layers_iter {
1295 0 : layers_checked += 1;
1296 0 : let layer = layers.get_from_desc(&layer_desc);
1297 0 : if layer.metadata().shard.shard_count == self.shard_identity.count {
1298 : // This layer does not belong to a historic ancestor, no need to re-image it.
1299 0 : continue;
1300 0 : }
1301 0 :
1302 0 : // This layer was created on an ancestor shard: check if it contains any data for this shard.
1303 0 : let sharded_range = ShardedRange::new(layer_desc.get_key_range(), &self.shard_identity);
1304 0 : let layer_local_page_count = sharded_range.page_count();
1305 0 : let layer_raw_page_count = ShardedRange::raw_size(&layer_desc.get_key_range());
1306 0 : if layer_local_page_count == 0 {
1307 : // This ancestral layer only covers keys that belong to other shards.
1308 : // We include the full metadata in the log: if we had some critical bug that caused
1309 : // us to incorrectly drop layers, this would simplify manual debugging + reinstating those layers.
1310 0 : debug!(%layer, old_metadata=?layer.metadata(),
1311 0 : "dropping layer after shard split, contains no keys for this shard",
1312 : );
1313 :
1314 0 : if cfg!(debug_assertions) {
1315 : // Expensive, exhaustive check of keys in this layer: this guards against ShardedRange's calculations being
1316 : // wrong. If ShardedRange claims the local page count is zero, then no keys in this layer
1317 : // should be !is_key_disposable()
1318 : // TODO: exclude sparse keyspace from this check, otherwise it will infinitely loop.
1319 0 : let range = layer_desc.get_key_range();
1320 0 : let mut key = range.start;
1321 0 : while key < range.end {
1322 0 : debug_assert!(self.shard_identity.is_key_disposable(&key));
1323 0 : key = key.next();
1324 : }
1325 0 : }
1326 :
1327 0 : drop_layers.push(layer);
1328 0 : continue;
1329 0 : } else if layer_local_page_count != u32::MAX
1330 0 : && layer_local_page_count == layer_raw_page_count
1331 : {
1332 0 : debug!(%layer,
1333 0 : "layer is entirely shard local ({} keys), no need to filter it",
1334 : layer_local_page_count
1335 : );
1336 0 : continue;
1337 0 : }
1338 0 :
1339 0 : // Only rewrite a layer if we can reclaim significant space.
1340 0 : if layer_local_page_count != u32::MAX
1341 0 : && layer_local_page_count as f64 / layer_raw_page_count as f64
1342 0 : > ANCESTOR_COMPACTION_REWRITE_THRESHOLD
1343 : {
1344 0 : debug!(%layer,
1345 0 : "layer has a large share of local pages \
1346 0 : ({layer_local_page_count}/{layer_raw_page_count} > \
1347 0 : {ANCESTOR_COMPACTION_REWRITE_THRESHOLD}), not rewriting");
1348 : continue;
1349 0 : }
1350 :
1351 : // Don't bother re-writing a layer if it is within the PITR window: it will age-out eventually
1352 : // without incurring the I/O cost of a rewrite.
1353 0 : if layer_desc.get_lsn_range().end >= *latest_gc_cutoff {
1354 0 : debug!(%layer, "Skipping rewrite of layer still in GC window ({} >= {})",
1355 0 : layer_desc.get_lsn_range().end, *latest_gc_cutoff);
1356 0 : continue;
1357 0 : }
1358 0 :
1359 0 : // We do not yet implement rewrite of delta layers.
1360 0 : if layer_desc.is_delta() {
1361 0 : debug!(%layer, "Skipping rewrite of delta layer");
1362 0 : continue;
1363 0 : }
1364 0 :
1365 0 : // We don't bother rewriting layers that aren't visible, since these won't be needed by
1366 0 : // reads and will likely be garbage collected soon.
1367 0 : if layer.visibility() != LayerVisibilityHint::Visible {
1368 0 : debug!(%layer, "Skipping rewrite of invisible layer");
1369 0 : continue;
1370 0 : }
1371 0 :
1372 0 : // Only rewrite layers if their generations differ. This guarantees:
1373 0 : // - that local rewrite is safe, as local layer paths will differ between existing layer and rewritten one
1374 0 : // - that the layer is persistent in remote storage, as we only see old-generation'd layer via loading from remote storage
1375 0 : if layer.metadata().generation == self.generation {
1376 0 : debug!(%layer, "Skipping rewrite, is not from old generation");
1377 0 : continue;
1378 0 : }
1379 0 :
1380 0 : if layers_to_rewrite.len() >= rewrite_max {
1381 0 : debug!(%layer, "Will rewrite layer on a future compaction, already rewrote {}",
1382 0 : layers_to_rewrite.len()
1383 : );
1384 0 : outcome = CompactionOutcome::Pending;
1385 0 : break;
1386 0 : }
1387 0 :
1388 0 : // Fall through: all our conditions for doing a rewrite passed.
1389 0 : layers_to_rewrite.push(layer);
1390 : }
1391 :
1392 : // Drop read lock on layer map before we start doing time-consuming I/O.
1393 0 : drop(layers);
1394 0 :
1395 0 : // Drop out early if there's nothing to do.
1396 0 : if layers_to_rewrite.is_empty() && drop_layers.is_empty() {
1397 0 : return Ok(CompactionOutcome::Done);
1398 0 : }
1399 0 :
1400 0 : info!(
1401 0 : "starting shard ancestor compaction, rewriting {} layers and dropping {} layers, \
1402 0 : checked {layers_checked}/{layers_total} layers \
1403 0 : (latest_gc_cutoff={} pitr_cutoff={})",
1404 0 : layers_to_rewrite.len(),
1405 0 : drop_layers.len(),
1406 0 : *latest_gc_cutoff,
1407 : pitr_cutoff,
1408 : );
1409 0 : let started = Instant::now();
1410 0 :
1411 0 : let mut replace_image_layers = Vec::new();
1412 :
1413 0 : for layer in layers_to_rewrite {
1414 0 : if self.cancel.is_cancelled() {
1415 0 : return Err(CompactionError::ShuttingDown);
1416 0 : }
1417 0 :
1418 0 : info!(layer=%layer, "rewriting layer after shard split");
1419 0 : let mut image_layer_writer = ImageLayerWriter::new(
1420 0 : self.conf,
1421 0 : self.timeline_id,
1422 0 : self.tenant_shard_id,
1423 0 : &layer.layer_desc().key_range,
1424 0 : layer.layer_desc().image_layer_lsn(),
1425 0 : &self.gate,
1426 0 : self.cancel.clone(),
1427 0 : ctx,
1428 0 : )
1429 0 : .await
1430 0 : .map_err(CompactionError::Other)?;
1431 :
1432 : // Safety of layer rewrites:
1433 : // - We are writing to a different local file path than we are reading from, so the old Layer
1434 : // cannot interfere with the new one.
1435 : // - In the page cache, contents for a particular VirtualFile are stored with a file_id that
1436 : // is different for two layers with the same name (in `ImageLayerInner::new` we always
1437 : // acquire a fresh id from [`crate::page_cache::next_file_id`]). So readers do not risk
1438 : // reading the index from one layer file, and then data blocks from the rewritten layer file.
1439 : // - Any readers that have a reference to the old layer will keep it alive until they are done
1440 : // with it. If they are trying to promote from remote storage, that will fail, but this is the same
1441 : // as for compaction generally: compaction is allowed to delete layers that readers might be trying to use.
1442 : // - We do not run concurrently with other kinds of compaction, so the only layer map writes we race with are:
1443 : // - GC, which at worst witnesses us "undelete" a layer that they just deleted.
1444 : // - ingestion, which only inserts layers, therefore cannot collide with us.
1445 0 : let resident = layer.download_and_keep_resident(ctx).await?;
1446 :
1447 0 : let keys_written = resident
1448 0 : .filter(&self.shard_identity, &mut image_layer_writer, ctx)
1449 0 : .await?;
1450 :
1451 0 : if keys_written > 0 {
1452 0 : let (desc, path) = image_layer_writer
1453 0 : .finish(ctx)
1454 0 : .await
1455 0 : .map_err(CompactionError::Other)?;
1456 0 : let new_layer = Layer::finish_creating(self.conf, self, desc, &path)
1457 0 : .map_err(CompactionError::Other)?;
1458 0 : info!(layer=%new_layer, "rewrote layer, {} -> {} bytes",
1459 0 : layer.metadata().file_size,
1460 0 : new_layer.metadata().file_size);
1461 :
1462 0 : replace_image_layers.push((layer, new_layer));
1463 0 : } else {
1464 0 : // Drop the old layer. Usually for this case we would already have noticed that
1465 0 : // the layer has no data for us with the ShardedRange check above, but that check can be inconclusive (e.g. when the raw page count is unknown), so we also drop the layer here once the filter writes no keys.
1466 0 : drop_layers.push(layer);
1467 0 : }
1468 :
1469 : // Yield for L0 compaction if necessary, but make sure we update the layer map below
1470 : // with the work we've already done.
1471 0 : if yield_for_l0
1472 0 : && self
1473 0 : .l0_compaction_trigger
1474 0 : .notified()
1475 0 : .now_or_never()
1476 0 : .is_some()
1477 : {
1478 0 : info!("shard ancestor compaction yielding for L0 compaction");
1479 0 : outcome = CompactionOutcome::YieldForL0;
1480 0 : break;
1481 0 : }
1482 : }
1483 :
1484 0 : for layer in &drop_layers {
1485 0 : info!(%layer, old_metadata=?layer.metadata(),
1486 0 : "dropping layer after shard split (no keys for this shard)",
1487 : );
1488 : }
1489 :
1490 : // At this point, we have replaced local layer files with their rewritten form, but not yet uploaded
1491 : // metadata to reflect that. If we restart here, the replaced layer files will look invalid (size mismatch
1492 : // to remote index) and be removed. This is inefficient but safe.
1493 0 : fail::fail_point!("compact-shard-ancestors-localonly");
1494 0 :
1495 0 : // Update the LayerMap so that readers will use the new layers, and enqueue it for writing to remote storage
1496 0 : self.rewrite_layers(replace_image_layers, drop_layers)
1497 0 : .await?;
1498 :
1499 0 : fail::fail_point!("compact-shard-ancestors-enqueued");
1500 0 :
1501 0 : // We wait for all uploads to complete before finishing this compaction stage. This is not
1502 0 : // necessary for correctness, but it simplifies testing, and avoids proceeding with another
1503 0 : // Timeline's compaction while this timeline's uploads may be generating lots of disk I/O
1504 0 : // load.
1505 0 : if outcome != CompactionOutcome::YieldForL0 {
1506 0 : info!("shard ancestor compaction waiting for uploads");
1507 0 : tokio::select! {
1508 0 : result = self.remote_client.wait_completion() => match result {
1509 0 : Ok(()) => {},
1510 0 : Err(WaitCompletionError::NotInitialized(ni)) => return Err(CompactionError::from(ni)),
1511 : Err(WaitCompletionError::UploadQueueShutDownOrStopped) => {
1512 0 : return Err(CompactionError::ShuttingDown);
1513 : }
1514 : },
1515 : // Don't wait if there's L0 compaction to do. We don't need to update the outcome
1516 : // here, because we've already done the actual work.
1517 0 : _ = self.l0_compaction_trigger.notified(), if yield_for_l0 => {},
1518 : }
1519 0 : }
1520 :
1521 0 : info!(
1522 0 : "shard ancestor compaction done in {:.3}s{}",
1523 0 : started.elapsed().as_secs_f64(),
1524 0 : match outcome {
1525 : CompactionOutcome::Pending =>
1526 0 : format!(", with pending work (rewrite_max={rewrite_max})"),
1527 0 : CompactionOutcome::YieldForL0 => String::from(", yielding for L0 compaction"),
1528 0 : CompactionOutcome::Skipped | CompactionOutcome::Done => String::new(),
1529 : }
1530 : );
1531 :
1532 0 : fail::fail_point!("compact-shard-ancestors-persistent");
1533 0 :
1534 0 : Ok(outcome)
1535 0 : }
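
The eligibility checks above (entirely-local short-circuit, reclaimable-space threshold, GC window, delta/visibility/generation filters, and the `rewrite_max` cap) can be condensed into a single decision function. The following is a hedged, standalone sketch: the function, enum, and parameter names are illustrative only and not part of the pageserver API.

```rust
/// Hypothetical condensed form of the rewrite-eligibility checks in
/// `compact_shard_ancestors`; all names here are illustrative only.
#[derive(Debug, PartialEq)]
enum RewriteDecision {
    Skip,
    Rewrite,
    DeferToNextCompaction,
}

#[allow(clippy::too_many_arguments)]
fn rewrite_decision(
    local_pages: u32,
    raw_pages: u32,
    rewrite_threshold: f64, // e.g. ANCESTOR_COMPACTION_REWRITE_THRESHOLD
    in_gc_window: bool,     // lsn_range.end >= latest_gc_cutoff
    is_delta: bool,
    is_visible: bool,
    is_current_generation: bool,
    already_selected: usize,
    rewrite_max: usize,
) -> RewriteDecision {
    // Entirely shard-local layers need no filtering.
    if local_pages != u32::MAX && local_pages == raw_pages {
        return RewriteDecision::Skip;
    }
    // Only rewrite when enough space can be reclaimed.
    if local_pages != u32::MAX && local_pages as f64 / raw_pages as f64 > rewrite_threshold {
        return RewriteDecision::Skip;
    }
    // Layers still in the GC window will age out on their own; delta, invisible,
    // and current-generation layers are never rewritten here.
    if in_gc_window || is_delta || !is_visible || is_current_generation {
        return RewriteDecision::Skip;
    }
    // Respect the per-iteration cap; leftover work is reported separately.
    if already_selected >= rewrite_max {
        return RewriteDecision::DeferToNextCompaction;
    }
    RewriteDecision::Rewrite
}
```

Treating the per-iteration cap as a distinct outcome mirrors how the loop above reports `CompactionOutcome::Pending` when it leaves work for a later compaction pass.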
1536 :
1537 : /// Update the LayerVisibilityHint of layers covered by image layers, based on whether there is
1538 : /// an image layer between them and the most recent readable LSN (branch point or tip of timeline). The
1539 : /// purpose of the visibility hint is to record which layers need to be available to service reads.
1540 : ///
1541 : /// The result may be used as an input to eviction and secondary downloads to de-prioritize layers
1542 : /// that we know won't be needed for reads.
1543 480 : pub(crate) async fn update_layer_visibility(
1544 480 : &self,
1545 480 : ) -> Result<(), super::layer_manager::Shutdown> {
1546 480 : let head_lsn = self.get_last_record_lsn();
1547 :
1548 : // We will sweep through layers in reverse-LSN order. We only do historic layers. L0 deltas
1549 : // are implicitly left visible, because LayerVisibilityHint's default is Visible, and we never modify it here.
1550 : // Note that L0 deltas _can_ be covered by image layers, but we consider them 'visible' because we anticipate that
1551 : // they will be subject to L0->L1 compaction in the near future.
1552 480 : let layer_manager = self.layers.read().await;
1553 480 : let layer_map = layer_manager.layer_map()?;
1554 :
1555 480 : let readable_points = {
1556 480 : let children = self.gc_info.read().unwrap().retain_lsns.clone();
1557 480 :
1558 480 : let mut readable_points = Vec::with_capacity(children.len() + 1);
1559 480 : for (child_lsn, _child_timeline_id, is_offloaded) in &children {
1560 0 : if *is_offloaded == MaybeOffloaded::Yes {
1561 0 : continue;
1562 0 : }
1563 0 : readable_points.push(*child_lsn);
1564 : }
1565 480 : readable_points.push(head_lsn);
1566 480 : readable_points
1567 480 : };
1568 480 :
1569 480 : let (layer_visibility, covered) = layer_map.get_visibility(readable_points);
1570 1208 : for (layer_desc, visibility) in layer_visibility {
1571 728 : // FIXME: a more efficient bulk zip() through the layers rather than an N log N lookup to get each one
1572 728 : let layer = layer_manager.get_from_desc(&layer_desc);
1573 728 : layer.set_visibility(visibility);
1574 728 : }
1575 :
1576 : // TODO: publish our covered KeySpace to our parent, so that when they update their visibility, they can
1577 : // avoid assuming that everything at a branch point is visible.
1578 480 : drop(covered);
1579 480 : Ok(())
1580 480 : }
1581 :
1582 : /// Collect a bunch of Level 0 layer files, and compact and reshuffle them as
1583 : /// Level 1 files. Returns whether the L0 layers are fully compacted.
1584 728 : async fn compact_level0(
1585 728 : self: &Arc<Self>,
1586 728 : target_file_size: u64,
1587 728 : force_compaction_ignore_threshold: bool,
1588 728 : ctx: &RequestContext,
1589 728 : ) -> Result<CompactionOutcome, CompactionError> {
1590 : let CompactLevel0Phase1Result {
1591 728 : new_layers,
1592 728 : deltas_to_compact,
1593 728 : outcome,
1594 : } = {
1595 728 : let phase1_span = info_span!("compact_level0_phase1");
1596 728 : let ctx = ctx.attached_child();
1597 728 : let mut stats = CompactLevel0Phase1StatsBuilder {
1598 728 : version: Some(2),
1599 728 : tenant_id: Some(self.tenant_shard_id),
1600 728 : timeline_id: Some(self.timeline_id),
1601 728 : ..Default::default()
1602 728 : };
1603 728 :
1604 728 : let begin = tokio::time::Instant::now();
1605 728 : let phase1_layers_locked = self.layers.read().await;
1606 728 : let now = tokio::time::Instant::now();
1607 728 : stats.read_lock_acquisition_micros =
1608 728 : DurationRecorder::Recorded(RecordedDuration(now - begin), now);
1609 728 : self.compact_level0_phase1(
1610 728 : phase1_layers_locked,
1611 728 : stats,
1612 728 : target_file_size,
1613 728 : force_compaction_ignore_threshold,
1614 728 : &ctx,
1615 728 : )
1616 728 : .instrument(phase1_span)
1617 728 : .await?
1618 : };
1619 :
1620 728 : if new_layers.is_empty() && deltas_to_compact.is_empty() {
1621 : // nothing to do
1622 672 : return Ok(CompactionOutcome::Done);
1623 56 : }
1624 56 :
1625 56 : self.finish_compact_batch(&new_layers, &Vec::new(), &deltas_to_compact)
1626 56 : .await?;
1627 56 : Ok(outcome)
1628 728 : }
1629 :
1630 : /// First phase of Level 0 compaction, explained in the [`Self::compact_legacy`] comment.
1631 728 : async fn compact_level0_phase1<'a>(
1632 728 : self: &'a Arc<Self>,
1633 728 : guard: tokio::sync::RwLockReadGuard<'a, LayerManager>,
1634 728 : mut stats: CompactLevel0Phase1StatsBuilder,
1635 728 : target_file_size: u64,
1636 728 : force_compaction_ignore_threshold: bool,
1637 728 : ctx: &RequestContext,
1638 728 : ) -> Result<CompactLevel0Phase1Result, CompactionError> {
1639 728 : stats.read_lock_held_spawn_blocking_startup_micros =
1640 728 : stats.read_lock_acquisition_micros.till_now(); // set by caller
1641 728 : let layers = guard.layer_map()?;
1642 728 : let level0_deltas = layers.level0_deltas();
1643 728 : stats.level0_deltas_count = Some(level0_deltas.len());
1644 728 :
1645 728 : // Only compact if enough layers have accumulated.
1646 728 : let threshold = self.get_compaction_threshold();
1647 728 : if level0_deltas.is_empty() || level0_deltas.len() < threshold {
1648 672 : if force_compaction_ignore_threshold {
1649 0 : if !level0_deltas.is_empty() {
1650 0 : info!(
1651 0 : level0_deltas = level0_deltas.len(),
1652 0 : threshold, "too few deltas to compact, but forcing compaction"
1653 : );
1654 : } else {
1655 0 : info!(
1656 0 : level0_deltas = level0_deltas.len(),
1657 0 : threshold, "too few deltas to compact, cannot force compaction"
1658 : );
1659 0 : return Ok(CompactLevel0Phase1Result::default());
1660 : }
1661 : } else {
1662 672 : debug!(
1663 0 : level0_deltas = level0_deltas.len(),
1664 0 : threshold, "too few deltas to compact"
1665 : );
1666 672 : return Ok(CompactLevel0Phase1Result::default());
1667 : }
1668 56 : }
1669 :
1670 56 : let mut level0_deltas = level0_deltas
1671 56 : .iter()
1672 804 : .map(|x| guard.get_from_desc(x))
1673 56 : .collect::<Vec<_>>();
1674 56 :
1675 56 : // Gather the files to compact in this iteration.
1676 56 : //
1677 56 : // Start with the oldest Level 0 delta file, and collect any other
1678 56 : // level 0 files that form a contiguous sequence, such that the end
1679 56 : // LSN of previous file matches the start LSN of the next file.
1680 56 : //
1681 56 : // Note that if the files don't form such a sequence, we might
1682 56 : // "compact" just a single file. That's a bit pointless, but it allows
1683 56 : // us to get rid of the level 0 file, and compact the other files on
1684 56 : // the next iteration. This could probably made smarter, but such
1685 56 : // "gaps" in the sequence of level 0 files should only happen in case
1686 56 : // of a crash, partial download from cloud storage, or something like
1687 56 : // that, so it's not a big deal in practice.
1688 1496 : level0_deltas.sort_by_key(|l| l.layer_desc().lsn_range.start);
1689 56 : let mut level0_deltas_iter = level0_deltas.iter();
1690 56 :
1691 56 : let first_level0_delta = level0_deltas_iter.next().unwrap();
1692 56 : let mut prev_lsn_end = first_level0_delta.layer_desc().lsn_range.end;
1693 56 : let mut deltas_to_compact = Vec::with_capacity(level0_deltas.len());
1694 56 :
1695 56 : // Accumulate the size of layers in `deltas_to_compact`
1696 56 : let mut deltas_to_compact_bytes = 0;
1697 56 :
1698 56 : // Under normal circumstances, we will accumulate up to compaction_upper_limit L0s of size
1699 56 : // checkpoint_distance each. To avoid edge cases using extra system resources, bound our
1700 56 : // work in this function to only operate on this much delta data at once.
1701 56 : //
1702 56 : // In general, compaction_threshold should be <= compaction_upper_limit, but in case that
1703 56 : // the constraint is not respected, we use the larger of the two.
1704 56 : let delta_size_limit = std::cmp::max(
1705 56 : self.get_compaction_upper_limit(),
1706 56 : self.get_compaction_threshold(),
1707 56 : ) as u64
1708 56 : * std::cmp::max(self.get_checkpoint_distance(), DEFAULT_CHECKPOINT_DISTANCE);
1709 56 :
1710 56 : let mut fully_compacted = true;
1711 56 :
1712 56 : deltas_to_compact.push(first_level0_delta.download_and_keep_resident(ctx).await?);
1713 804 : for l in level0_deltas_iter {
1714 748 : let lsn_range = &l.layer_desc().lsn_range;
1715 748 :
1716 748 : if lsn_range.start != prev_lsn_end {
1717 0 : break;
1718 748 : }
1719 748 : deltas_to_compact.push(l.download_and_keep_resident(ctx).await?);
1720 748 : deltas_to_compact_bytes += l.metadata().file_size;
1721 748 : prev_lsn_end = lsn_range.end;
1722 748 :
1723 748 : if deltas_to_compact_bytes >= delta_size_limit {
1724 0 : info!(
1725 0 : l0_deltas_selected = deltas_to_compact.len(),
1726 0 : l0_deltas_total = level0_deltas.len(),
1727 0 : "L0 compaction picker hit max delta layer size limit: {}",
1728 : delta_size_limit
1729 : );
1730 0 : fully_compacted = false;
1731 0 :
1732 0 : // Proceed with compaction, but only a subset of L0s
1733 0 : break;
1734 748 : }
1735 : }
1736 56 : let lsn_range = Range {
1737 56 : start: deltas_to_compact
1738 56 : .first()
1739 56 : .unwrap()
1740 56 : .layer_desc()
1741 56 : .lsn_range
1742 56 : .start,
1743 56 : end: deltas_to_compact.last().unwrap().layer_desc().lsn_range.end,
1744 56 : };
1745 56 :
1746 56 : info!(
1747 0 : "Starting Level0 compaction in LSN range {}-{} for {} layers ({} deltas in total)",
1748 0 : lsn_range.start,
1749 0 : lsn_range.end,
1750 0 : deltas_to_compact.len(),
1751 0 : level0_deltas.len()
1752 : );
1753 :
1754 804 : for l in deltas_to_compact.iter() {
1755 804 : info!("compact includes {l}");
1756 : }
1757 :
1758 : // We don't need the original list of layers anymore. Drop it so that
1759 : // we don't accidentally use it later in the function.
1760 56 : drop(level0_deltas);
1761 56 :
1762 56 : stats.read_lock_held_prerequisites_micros = stats
1763 56 : .read_lock_held_spawn_blocking_startup_micros
1764 56 : .till_now();
1765 :
1766 : // TODO: replace with streaming k-merge
1767 56 : let all_keys = {
1768 56 : let mut all_keys = Vec::new();
1769 804 : for l in deltas_to_compact.iter() {
1770 804 : if self.cancel.is_cancelled() {
1771 0 : return Err(CompactionError::ShuttingDown);
1772 804 : }
1773 804 : let delta = l.get_as_delta(ctx).await.map_err(CompactionError::Other)?;
1774 804 : let keys = delta
1775 804 : .index_entries(ctx)
1776 804 : .await
1777 804 : .map_err(CompactionError::Other)?;
1778 804 : all_keys.extend(keys);
1779 : }
1780 : // The current stdlib sorting implementation is designed such that it is
1781 : // particularly fast when the slice is made up of sorted sub-ranges.
1782 8847610 : all_keys.sort_by_key(|DeltaEntry { key, lsn, .. }| (*key, *lsn));
1783 56 : all_keys
1784 56 : };
1785 56 :
1786 56 : stats.read_lock_held_key_sort_micros = stats.read_lock_held_prerequisites_micros.till_now();
1787 :
1788 : // Determine N largest holes where N is number of compacted layers. The vec is sorted by key range start.
1789 : //
1790 : // A hole is a key range for which this compaction doesn't have any WAL records.
1791 : // Our goal in this compaction iteration is to avoid creating L1s that, in terms of their key range,
1792 : // cover the hole, but actually don't contain any WAL records for that key range.
1793 : // The reason is that the mere stack of L1s (`count_deltas`) triggers image layer creation (`create_image_layers`).
1794 : // That image layer creation would be useless for a hole range covered by L1s that don't contain any WAL records.
1795 : //
1796 : // The algorithm chooses holes as follows.
1797 : // - Slide a two-key window over the keys in key order to get the hole range (= distance between two keys).
1798 : // - Filter: min threshold on range length
1799 : // - Rank: by coverage size (=number of image layers required to reconstruct each key in the range for which we have any data)
1800 : //
1801 : // For more details, intuition, and some ASCII art see https://github.com/neondatabase/neon/pull/3597#discussion_r1112704451
1802 : #[derive(PartialEq, Eq)]
1803 : struct Hole {
1804 : key_range: Range<Key>,
1805 : coverage_size: usize,
1806 : }
1807 56 : let holes: Vec<Hole> = {
1808 : use std::cmp::Ordering;
1809 : impl Ord for Hole {
1810 0 : fn cmp(&self, other: &Self) -> Ordering {
1811 0 : self.coverage_size.cmp(&other.coverage_size).reverse()
1812 0 : }
1813 : }
1814 : impl PartialOrd for Hole {
1815 0 : fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
1816 0 : Some(self.cmp(other))
1817 0 : }
1818 : }
1819 56 : let max_holes = deltas_to_compact.len();
1820 56 : let last_record_lsn = self.get_last_record_lsn();
1821 56 : let min_hole_range = (target_file_size / page_cache::PAGE_SZ as u64) as i128;
1822 56 : let min_hole_coverage_size = 3; // TODO: something more flexible?
1823 56 : // min-heap (reserve space for one more element added before eviction)
1824 56 : let mut heap: BinaryHeap<Hole> = BinaryHeap::with_capacity(max_holes + 1);
1825 56 : let mut prev: Option<Key> = None;
1826 :
1827 4128076 : for &DeltaEntry { key: next_key, .. } in all_keys.iter() {
1828 4128076 : if let Some(prev_key) = prev {
1829 : // Just a first, fast filter: do not create hole entries for metadata keys. The last hole in the
1830 : // compaction is the gap between the data keys and the metadata keys.
1831 4128020 : if next_key.to_i128() - prev_key.to_i128() >= min_hole_range
1832 0 : && !Key::is_metadata_key(&prev_key)
1833 : {
1834 0 : let key_range = prev_key..next_key;
1835 0 : // Measuring a hole by simply subtracting the i128 representations of the key range boundaries
1836 0 : // does not make much sense, because the largest such holes correspond to field1/field2 changes.
1837 0 : // We are mostly interested in eliminating holes which cause generation of excessive image layers.
1838 0 : // That is why it is better to measure the size of a hole as the number of covering image layers.
1839 0 : let coverage_size =
1840 0 : layers.image_coverage(&key_range, last_record_lsn).len();
1841 0 : if coverage_size >= min_hole_coverage_size {
1842 0 : heap.push(Hole {
1843 0 : key_range,
1844 0 : coverage_size,
1845 0 : });
1846 0 : if heap.len() > max_holes {
1847 0 : heap.pop(); // remove smallest hole
1848 0 : }
1849 0 : }
1850 4128020 : }
1851 56 : }
1852 4128076 : prev = Some(next_key.next());
1853 : }
1854 56 : let mut holes = heap.into_vec();
1855 56 : holes.sort_unstable_by_key(|hole| hole.key_range.start);
1856 56 : holes
1857 56 : };
1858 56 : stats.read_lock_held_compute_holes_micros = stats.read_lock_held_key_sort_micros.till_now();
1859 56 : drop_rlock(guard);
1860 56 :
1861 56 : if self.cancel.is_cancelled() {
1862 0 : return Err(CompactionError::ShuttingDown);
1863 56 : }
1864 56 :
1865 56 : stats.read_lock_drop_micros = stats.read_lock_held_compute_holes_micros.till_now();
1866 :
1867 : // This iterator walks through all key-value pairs from all the layers
1868 : // we're compacting, in key, LSN order.
1869 : // If there's both a Value::Image and Value::WalRecord for the same (key,lsn),
1870 : // then the Value::Image is ordered before Value::WalRecord.
1871 56 : let mut all_values_iter = {
1872 56 : let mut deltas = Vec::with_capacity(deltas_to_compact.len());
1873 804 : for l in deltas_to_compact.iter() {
1874 804 : let l = l.get_as_delta(ctx).await.map_err(CompactionError::Other)?;
1875 804 : deltas.push(l);
1876 : }
1877 56 : MergeIterator::create(&deltas, &[], ctx)
1878 56 : };
1879 56 :
1880 56 : // This iterator walks through all keys and is needed to calculate size used by each key
1881 56 : let mut all_keys_iter = all_keys
1882 56 : .iter()
1883 4128076 : .map(|DeltaEntry { key, lsn, size, .. }| (*key, *lsn, *size))
1884 4128020 : .coalesce(|mut prev, cur| {
1885 4128020 : // Coalesce entries that belong to the same key.
1886 4128020 : // This ensures that compaction doesn't put them
1887 4128020 : // into different layer files.
1888 4128020 : // Still limit this by the target file size,
1889 4128020 : // so that we keep the size of the files in
1890 4128020 : // check.
1891 4128020 : if prev.0 == cur.0 && prev.2 < target_file_size {
1892 80076 : prev.2 += cur.2;
1893 80076 : Ok(prev)
1894 : } else {
1895 4047944 : Err((prev, cur))
1896 : }
1897 4128020 : });
1898 56 :
1899 56 : // Merge the contents of all the input delta layers into a new set
1900 56 : // of delta layers, based on the current partitioning.
1901 56 : //
1902 56 : // We split the new delta layers on the key dimension. We iterate through the key space, and for each key, check if including the next key to the current output layer we're building would cause the layer to become too large. If so, dump the current output layer and start new one.
1903 56 : // It's possible that there is a single key with so many page versions that storing all of them in a single layer file
1904 56 : // would be too large. In that case, we also split on the LSN dimension.
1905 56 : //
1906 56 : // LSN
1907 56 : // ^
1908 56 : // |
1909 56 : // | +-----------+ +--+--+--+--+
1910 56 : // | | | | | | | |
1911 56 : // | +-----------+ | | | | |
1912 56 : // | | | | | | | |
1913 56 : // | +-----------+ ==> | | | | |
1914 56 : // | | | | | | | |
1915 56 : // | +-----------+ | | | | |
1916 56 : // | | | | | | | |
1917 56 : // | +-----------+ +--+--+--+--+
1918 56 : // |
1919 56 : // +--------------> key
1920 56 : //
1921 56 : //
1922 56 : // If one key (X) has a lot of page versions:
1923 56 : //
1924 56 : // LSN
1925 56 : // ^
1926 56 : // | (X)
1927 56 : // | +-----------+ +--+--+--+--+
1928 56 : // | | | | | | | |
1929 56 : // | +-----------+ | | +--+ |
1930 56 : // | | | | | | | |
1931 56 : // | +-----------+ ==> | | | | |
1932 56 : // | | | | | +--+ |
1933 56 : // | +-----------+ | | | | |
1934 56 : // | | | | | | | |
1935 56 : // | +-----------+ +--+--+--+--+
1936 56 : // |
1937 56 : // +--------------> key
1938 56 : // TODO: this actually divides the layers into fixed-size chunks, not
1939 56 : // based on the partitioning.
1940 56 : //
1941 56 : // TODO: we should also opportunistically materialize and
1942 56 : // garbage collect what we can.
1943 56 : let mut new_layers = Vec::new();
1944 56 : let mut prev_key: Option<Key> = None;
1945 56 : let mut writer: Option<DeltaLayerWriter> = None;
1946 56 : let mut key_values_total_size = 0u64;
1947 56 : let mut dup_start_lsn: Lsn = Lsn::INVALID; // start LSN of layer containing values of the single key
1948 56 : let mut dup_end_lsn: Lsn = Lsn::INVALID; // end LSN of layer containing values of the single key
1949 56 : let mut next_hole = 0; // index of next hole in holes vector
1950 56 :
1951 56 : let mut keys = 0;
1952 :
1953 4128132 : while let Some((key, lsn, value)) = all_values_iter
1954 4128132 : .next()
1955 4128132 : .await
1956 4128132 : .map_err(CompactionError::Other)?
1957 : {
1958 4128076 : keys += 1;
1959 4128076 :
1960 4128076 : if keys % 32_768 == 0 && self.cancel.is_cancelled() {
1961 : // Avoid hitting the cancellation token on every key. In benches, we end up
1962 : // shuffling on the order of a million keys per layer; this means we'll check it
1963 : // around tens of times per layer.
1964 0 : return Err(CompactionError::ShuttingDown);
1965 4128076 : }
1966 4128076 :
1967 4128076 : let same_key = prev_key == Some(key);
1968 4128076 : // We need to check key boundaries once we reach next key or end of layer with the same key
1969 4128076 : if !same_key || lsn == dup_end_lsn {
1970 4048000 : let mut next_key_size = 0u64;
1971 4048000 : let is_dup_layer = dup_end_lsn.is_valid();
1972 4048000 : dup_start_lsn = Lsn::INVALID;
1973 4048000 : if !same_key {
1974 4048000 : dup_end_lsn = Lsn::INVALID;
1975 4048000 : }
1976 : // Determine size occupied by this key. We stop at next key or when size becomes larger than target_file_size
1977 4048000 : for (next_key, next_lsn, next_size) in all_keys_iter.by_ref() {
1978 4048000 : next_key_size = next_size;
1979 4048000 : if key != next_key {
1980 4047944 : if dup_end_lsn.is_valid() {
1981 0 : // We are writing a segment with duplicates:
1982 0 : // place all remaining values of this key in a separate segment
1983 0 : dup_start_lsn = dup_end_lsn; // the new segment starts where the old one stops
1984 0 : dup_end_lsn = lsn_range.end; // there are no more values of this key until the end of the LSN range
1985 4047944 : }
1986 4047944 : break;
1987 56 : }
1988 56 : key_values_total_size += next_size;
1989 56 : // Check if it is time to split the segment: the total size of this key's values is larger than the target file size.
1990 56 : // We need to avoid generating empty segments if next_size > target_file_size.
1991 56 : if key_values_total_size > target_file_size && lsn != next_lsn {
1992 : // Split key between multiple layers: such layer can contain only single key
1993 0 : dup_start_lsn = if dup_end_lsn.is_valid() {
1994 0 : dup_end_lsn // new segment with duplicates starts where old one stops
1995 : } else {
1996 0 : lsn // start with the first LSN for this key
1997 : };
1998 0 : dup_end_lsn = next_lsn; // upper LSN boundary is exclusive
1999 0 : break;
2000 56 : }
2001 : }
2002 : // Handle the case where the loop reached the last key: dup_end_lsn is valid but dup_start_lsn is not set.
2003 4048000 : if dup_end_lsn.is_valid() && !dup_start_lsn.is_valid() {
2004 0 : dup_start_lsn = dup_end_lsn;
2005 0 : dup_end_lsn = lsn_range.end;
2006 4048000 : }
2007 4048000 : if writer.is_some() {
2008 4047944 : let written_size = writer.as_mut().unwrap().size();
2009 4047944 : let contains_hole =
2010 4047944 : next_hole < holes.len() && key >= holes[next_hole].key_range.end;
2011 : // Check if the key causes a layer overflow, or if the layer would contain a hole...
2012 4047944 : if is_dup_layer
2013 4047944 : || dup_end_lsn.is_valid()
2014 4047944 : || written_size + key_values_total_size > target_file_size
2015 4047384 : || contains_hole
2016 : {
2017 : // ... if so, flush the previous layer and prepare to write a new one
2018 560 : let (desc, path) = writer
2019 560 : .take()
2020 560 : .unwrap()
2021 560 : .finish(prev_key.unwrap().next(), ctx)
2022 560 : .await
2023 560 : .map_err(CompactionError::Other)?;
2024 560 : let new_delta = Layer::finish_creating(self.conf, self, desc, &path)
2025 560 : .map_err(CompactionError::Other)?;
2026 :
2027 560 : new_layers.push(new_delta);
2028 560 : writer = None;
2029 560 :
2030 560 : if contains_hole {
2031 0 : // skip hole
2032 0 : next_hole += 1;
2033 560 : }
2034 4047384 : }
2035 56 : }
2036 : // Remember the size of the key's value because at the next iteration we will access the next item
2037 4048000 : key_values_total_size = next_key_size;
2038 80076 : }
2039 4128076 : fail_point!("delta-layer-writer-fail-before-finish", |_| {
2040 0 : Err(CompactionError::Other(anyhow::anyhow!(
2041 0 : "failpoint delta-layer-writer-fail-before-finish"
2042 0 : )))
2043 4128076 : });
2044 :
2045 4128076 : if !self.shard_identity.is_key_disposable(&key) {
2046 4128076 : if writer.is_none() {
2047 616 : if self.cancel.is_cancelled() {
2048 : // to be somewhat responsive to cancellation, check for each new layer
2049 0 : return Err(CompactionError::ShuttingDown);
2050 616 : }
2051 : // Create the writer if not initialized yet
2052 616 : writer = Some(
2053 : DeltaLayerWriter::new(
2054 616 : self.conf,
2055 616 : self.timeline_id,
2056 616 : self.tenant_shard_id,
2057 616 : key,
2058 616 : if dup_end_lsn.is_valid() {
2059 : // this is a layer containing slice of values of the same key
2060 0 : debug!("Create new dup layer {}..{}", dup_start_lsn, dup_end_lsn);
2061 0 : dup_start_lsn..dup_end_lsn
2062 : } else {
2063 616 : debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end);
2064 616 : lsn_range.clone()
2065 : },
2066 616 : &self.gate,
2067 616 : self.cancel.clone(),
2068 616 : ctx,
2069 616 : )
2070 616 : .await
2071 616 : .map_err(CompactionError::Other)?,
2072 : );
2073 :
2074 616 : keys = 0;
2075 4127460 : }
2076 :
2077 4128076 : writer
2078 4128076 : .as_mut()
2079 4128076 : .unwrap()
2080 4128076 : .put_value(key, lsn, value, ctx)
2081 4128076 : .await
2082 4128076 : .map_err(CompactionError::Other)?;
2083 : } else {
2084 0 : let owner = self.shard_identity.get_shard_number(&key);
2085 0 :
2086 0 : // This happens after a shard split, when we're compacting an L0 created by our parent shard
2087 0 : debug!("dropping key {key} during compaction (it belongs on shard {owner})");
2088 : }
2089 :
2090 4128076 : if !new_layers.is_empty() {
2091 39572 : fail_point!("after-timeline-compacted-first-L1");
2092 4088504 : }
2093 :
2094 4128076 : prev_key = Some(key);
2095 : }
2096 56 : if let Some(writer) = writer {
2097 56 : let (desc, path) = writer
2098 56 : .finish(prev_key.unwrap().next(), ctx)
2099 56 : .await
2100 56 : .map_err(CompactionError::Other)?;
2101 56 : let new_delta = Layer::finish_creating(self.conf, self, desc, &path)
2102 56 : .map_err(CompactionError::Other)?;
2103 56 : new_layers.push(new_delta);
2104 0 : }
2105 :
2106 : // Sync layers
2107 56 : if !new_layers.is_empty() {
2108 : // Print a warning if the created layer is larger than double the target size
2109 : // Add two pages for potential overhead. This should in theory be already
2110 : // accounted for in the target calculation, but for very small targets,
2111 : // we still might easily hit the limit otherwise.
2112 56 : let warn_limit = target_file_size * 2 + page_cache::PAGE_SZ as u64 * 2;
2113 616 : for layer in new_layers.iter() {
2114 616 : if layer.layer_desc().file_size > warn_limit {
2115 0 : warn!(
2116 : %layer,
2117 0 : "created delta file of size {} larger than double of target of {target_file_size}", layer.layer_desc().file_size
2118 : );
2119 616 : }
2120 : }
2121 :
2122 : // The writer.finish() above already did the fsync of the inodes.
2123 : // We just need to fsync the directory in which these inodes are linked,
2124 : // which we know to be the timeline directory.
2125 : //
2126 : // We use fatal_err() below because after writer.finish() returns with success,
2127 : // the in-memory state of the filesystem already has the layer file in its final place,
2128 : // and subsequent pageserver code could think it's durable while it really isn't.
2129 56 : let timeline_dir = VirtualFile::open(
2130 56 : &self
2131 56 : .conf
2132 56 : .timeline_path(&self.tenant_shard_id, &self.timeline_id),
2133 56 : ctx,
2134 56 : )
2135 56 : .await
2136 56 : .fatal_err("VirtualFile::open for timeline dir fsync");
2137 56 : timeline_dir
2138 56 : .sync_all()
2139 56 : .await
2140 56 : .fatal_err("VirtualFile::sync_all timeline dir");
2141 0 : }
2142 :
2143 56 : stats.write_layer_files_micros = stats.read_lock_drop_micros.till_now();
2144 56 : stats.new_deltas_count = Some(new_layers.len());
2145 616 : stats.new_deltas_size = Some(new_layers.iter().map(|l| l.layer_desc().file_size).sum());
2146 56 :
2147 56 : match TryInto::<CompactLevel0Phase1Stats>::try_into(stats)
2148 56 : .and_then(|stats| serde_json::to_string(&stats).context("serde_json::to_string"))
2149 : {
2150 56 : Ok(stats_json) => {
2151 56 : info!(
2152 0 : stats_json = stats_json.as_str(),
2153 0 : "compact_level0_phase1 stats available"
2154 : )
2155 : }
2156 0 : Err(e) => {
2157 0 : warn!("compact_level0_phase1 stats failed to serialize: {:#}", e);
2158 : }
2159 : }
2160 :
2161 : // Without this, rustc complains about deltas_to_compact still
2162 : // being borrowed when we `.into_iter()` below.
2163 56 : drop(all_values_iter);
2164 56 :
2165 56 : Ok(CompactLevel0Phase1Result {
2166 56 : new_layers,
2167 56 : deltas_to_compact: deltas_to_compact
2168 56 : .into_iter()
2169 804 : .map(|x| x.drop_eviction_guard())
2170 56 : .collect::<Vec<_>>(),
2171 56 : outcome: if fully_compacted {
2172 56 : CompactionOutcome::Done
2173 : } else {
2174 0 : CompactionOutcome::Pending
2175 : },
2176 : })
2177 728 : }
2178 : }
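
To make the hole-selection heuristic in `compact_level0_phase1` easier to follow in isolation, here is a simplified, self-contained sketch. It assumes plain `i128` keys and takes the image-coverage lookup as a closure, so it illustrates the technique rather than reproducing the pageserver's own types.

```rust
use std::cmp::Ordering;
use std::collections::BinaryHeap;

/// Simplified stand-in for `Hole`: keys are `i128`s and the image-coverage
/// lookup is supplied by the caller (both are assumptions of this sketch).
struct Hole {
    key_range: std::ops::Range<i128>,
    coverage_size: usize,
}

impl PartialEq for Hole {
    fn eq(&self, other: &Self) -> bool {
        self.coverage_size == other.coverage_size
    }
}
impl Eq for Hole {}
impl Ord for Hole {
    // Reversed so the BinaryHeap pops the hole with the *smallest* coverage first.
    fn cmp(&self, other: &Self) -> Ordering {
        self.coverage_size.cmp(&other.coverage_size).reverse()
    }
}
impl PartialOrd for Hole {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

/// Keep at most `max_holes` holes, each at least `min_hole_range` wide and
/// covered by at least `min_coverage` image layers, sorted by range start.
fn find_holes(
    sorted_keys: &[i128],
    max_holes: usize,
    min_hole_range: i128,
    min_coverage: usize,
    coverage_of: impl Fn(&std::ops::Range<i128>) -> usize,
) -> Vec<Hole> {
    let mut heap: BinaryHeap<Hole> = BinaryHeap::with_capacity(max_holes + 1);
    let mut prev: Option<i128> = None;
    for &next_key in sorted_keys {
        if let Some(prev_key) = prev {
            // Slide a two-key window: the gap between consecutive keys is a candidate hole.
            if next_key - prev_key >= min_hole_range {
                let key_range = prev_key..next_key;
                let coverage_size = coverage_of(&key_range);
                if coverage_size >= min_coverage {
                    heap.push(Hole { key_range, coverage_size });
                    if heap.len() > max_holes {
                        heap.pop(); // drop the least-covered hole
                    }
                }
            }
        }
        prev = Some(next_key + 1);
    }
    let mut holes = heap.into_vec();
    holes.sort_unstable_by_key(|h| h.key_range.start);
    holes
}
```

In the real code, `coverage_of` corresponds to `layers.image_coverage(&key_range, last_record_lsn).len()`, `min_hole_range` is derived from `target_file_size / page_cache::PAGE_SZ`, and `max_holes` is the number of deltas being compacted.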
2179 :
2180 : #[derive(Default)]
2181 : struct CompactLevel0Phase1Result {
2182 : new_layers: Vec<ResidentLayer>,
2183 : deltas_to_compact: Vec<Layer>,
2184 : // Whether we have included all L0 layers, or selected only part of them due to the
2185 : // L0 compaction size limit.
2186 : outcome: CompactionOutcome,
2187 : }
2188 :
2189 : #[derive(Default)]
2190 : struct CompactLevel0Phase1StatsBuilder {
2191 : version: Option<u64>,
2192 : tenant_id: Option<TenantShardId>,
2193 : timeline_id: Option<TimelineId>,
2194 : read_lock_acquisition_micros: DurationRecorder,
2195 : read_lock_held_spawn_blocking_startup_micros: DurationRecorder,
2196 : read_lock_held_key_sort_micros: DurationRecorder,
2197 : read_lock_held_prerequisites_micros: DurationRecorder,
2198 : read_lock_held_compute_holes_micros: DurationRecorder,
2199 : read_lock_drop_micros: DurationRecorder,
2200 : write_layer_files_micros: DurationRecorder,
2201 : level0_deltas_count: Option<usize>,
2202 : new_deltas_count: Option<usize>,
2203 : new_deltas_size: Option<u64>,
2204 : }
2205 :
2206 : #[derive(serde::Serialize)]
2207 : struct CompactLevel0Phase1Stats {
2208 : version: u64,
2209 : tenant_id: TenantShardId,
2210 : timeline_id: TimelineId,
2211 : read_lock_acquisition_micros: RecordedDuration,
2212 : read_lock_held_spawn_blocking_startup_micros: RecordedDuration,
2213 : read_lock_held_key_sort_micros: RecordedDuration,
2214 : read_lock_held_prerequisites_micros: RecordedDuration,
2215 : read_lock_held_compute_holes_micros: RecordedDuration,
2216 : read_lock_drop_micros: RecordedDuration,
2217 : write_layer_files_micros: RecordedDuration,
2218 : level0_deltas_count: usize,
2219 : new_deltas_count: usize,
2220 : new_deltas_size: u64,
2221 : }
2222 :
2223 : impl TryFrom<CompactLevel0Phase1StatsBuilder> for CompactLevel0Phase1Stats {
2224 : type Error = anyhow::Error;
2225 :
2226 56 : fn try_from(value: CompactLevel0Phase1StatsBuilder) -> Result<Self, Self::Error> {
2227 56 : Ok(Self {
2228 56 : version: value.version.ok_or_else(|| anyhow!("version not set"))?,
2229 56 : tenant_id: value
2230 56 : .tenant_id
2231 56 : .ok_or_else(|| anyhow!("tenant_id not set"))?,
2232 56 : timeline_id: value
2233 56 : .timeline_id
2234 56 : .ok_or_else(|| anyhow!("timeline_id not set"))?,
2235 56 : read_lock_acquisition_micros: value
2236 56 : .read_lock_acquisition_micros
2237 56 : .into_recorded()
2238 56 : .ok_or_else(|| anyhow!("read_lock_acquisition_micros not set"))?,
2239 56 : read_lock_held_spawn_blocking_startup_micros: value
2240 56 : .read_lock_held_spawn_blocking_startup_micros
2241 56 : .into_recorded()
2242 56 : .ok_or_else(|| anyhow!("read_lock_held_spawn_blocking_startup_micros not set"))?,
2243 56 : read_lock_held_key_sort_micros: value
2244 56 : .read_lock_held_key_sort_micros
2245 56 : .into_recorded()
2246 56 : .ok_or_else(|| anyhow!("read_lock_held_key_sort_micros not set"))?,
2247 56 : read_lock_held_prerequisites_micros: value
2248 56 : .read_lock_held_prerequisites_micros
2249 56 : .into_recorded()
2250 56 : .ok_or_else(|| anyhow!("read_lock_held_prerequisites_micros not set"))?,
2251 56 : read_lock_held_compute_holes_micros: value
2252 56 : .read_lock_held_compute_holes_micros
2253 56 : .into_recorded()
2254 56 : .ok_or_else(|| anyhow!("read_lock_held_compute_holes_micros not set"))?,
2255 56 : read_lock_drop_micros: value
2256 56 : .read_lock_drop_micros
2257 56 : .into_recorded()
2258 56 : .ok_or_else(|| anyhow!("read_lock_drop_micros not set"))?,
2259 56 : write_layer_files_micros: value
2260 56 : .write_layer_files_micros
2261 56 : .into_recorded()
2262 56 : .ok_or_else(|| anyhow!("write_layer_files_micros not set"))?,
2263 56 : level0_deltas_count: value
2264 56 : .level0_deltas_count
2265 56 : .ok_or_else(|| anyhow!("level0_deltas_count not set"))?,
2266 56 : new_deltas_count: value
2267 56 : .new_deltas_count
2268 56 : .ok_or_else(|| anyhow!("new_deltas_count not set"))?,
2269 56 : new_deltas_size: value
2270 56 : .new_deltas_size
2271 56 : .ok_or_else(|| anyhow!("new_deltas_size not set"))?,
2272 : })
2273 56 : }
2274 : }
2275 :
2276 : impl Timeline {
2277 : /// Entry point for new tiered compaction algorithm.
2278 : ///
2279 : /// All the real work is in the implementation in the pageserver_compaction
2280 : /// crate. The code here would apply to any algorithm implemented by the
2281 : /// same interface, but tiered is the only one at the moment.
2282 : ///
2283 : /// TODO: cancellation
2284 0 : pub(crate) async fn compact_tiered(
2285 0 : self: &Arc<Self>,
2286 0 : _cancel: &CancellationToken,
2287 0 : ctx: &RequestContext,
2288 0 : ) -> Result<(), CompactionError> {
2289 0 : let fanout = self.get_compaction_threshold() as u64;
2290 0 : let target_file_size = self.get_checkpoint_distance();
2291 :
2292 : // Find the top of the historical layers
2293 0 : let end_lsn = {
2294 0 : let guard = self.layers.read().await;
2295 0 : let layers = guard.layer_map()?;
2296 :
2297 0 : let l0_deltas = layers.level0_deltas();
2298 0 :
2299 0 : // As an optimization, if we find that there are too few L0 layers,
2300 0 : // bail out early. We know that the compaction algorithm would do
2301 0 : // nothing in that case.
2302 0 : if l0_deltas.len() < fanout as usize {
2303 : // doesn't need compacting
2304 0 : return Ok(());
2305 0 : }
2306 0 : l0_deltas.iter().map(|l| l.lsn_range.end).max().unwrap()
2307 0 : };
2308 0 :
2309 0 : // Is the timeline being deleted?
2310 0 : if self.is_stopping() {
2311 0 : trace!("Dropping out of compaction on timeline shutdown");
2312 0 : return Err(CompactionError::ShuttingDown);
2313 0 : }
2314 :
2315 0 : let (dense_ks, _sparse_ks) = self.collect_keyspace(end_lsn, ctx).await?;
2316 : // TODO(chi): ignore sparse_keyspace for now, compact it in the future.
2317 0 : let mut adaptor = TimelineAdaptor::new(self, (end_lsn, dense_ks));
2318 0 :
2319 0 : pageserver_compaction::compact_tiered::compact_tiered(
2320 0 : &mut adaptor,
2321 0 : end_lsn,
2322 0 : target_file_size,
2323 0 : fanout,
2324 0 : ctx,
2325 0 : )
2326 0 : .await
2327 : // TODO: compact_tiered needs to return CompactionError
2328 0 : .map_err(CompactionError::Other)?;
2329 :
2330 0 : adaptor.flush_updates().await?;
2331 0 : Ok(())
2332 0 : }
2333 :
2334 : /// Take a list of images and deltas, produce images and deltas according to GC horizon and retain_lsns.
2335 : ///
2336 : /// It takes a key, the values of the key within the compaction process, a GC horizon, and all retain_lsns below the horizon.
2337 : /// For now, it requires the `accumulated_values` contains the full history of the key (i.e., the key with the lowest LSN is
2338 : /// an image or a WAL not requiring a base image). This restriction will be removed once we implement gc-compaction on branch.
2339 : ///
2340 : /// The function returns the deltas and the base image that need to be placed at each of the retain LSN. For example, we have:
2341 : ///
2342 : /// A@0x10, +B@0x20, +C@0x30, +D@0x40, +E@0x50, +F@0x60
2343 : /// horizon = 0x50, retain_lsn = 0x20, 0x40, delta_threshold=3
2344 : ///
2345 : /// The function will produce:
2346 : ///
2347 : /// ```plain
2348 : /// 0x20(retain_lsn) -> img=AB@0x20 always produce a single image below the lowest retain LSN
2349 : /// 0x40(retain_lsn) -> deltas=[+C@0x30, +D@0x40] two deltas since the last base image, keeping the deltas
2350 : /// 0x50(horizon) -> deltas=[ABCDE@0x50] three deltas since the last base image, generate an image but put it in the delta
2351 : /// above_horizon -> deltas=[+F@0x60] full history above the horizon
2352 : /// ```
2353 : ///
2354 : /// Note that `accumulated_values` must be sorted by LSN and should belong to a single key.
2355 : #[allow(clippy::too_many_arguments)]
2356 1296 : pub(crate) async fn generate_key_retention(
2357 1296 : self: &Arc<Timeline>,
2358 1296 : key: Key,
2359 1296 : full_history: &[(Key, Lsn, Value)],
2360 1296 : horizon: Lsn,
2361 1296 : retain_lsn_below_horizon: &[Lsn],
2362 1296 : delta_threshold_cnt: usize,
2363 1296 : base_img_from_ancestor: Option<(Key, Lsn, Bytes)>,
2364 1296 : verification: bool,
2365 1296 : ) -> anyhow::Result<KeyHistoryRetention> {
2366 : // Pre-checks for the invariants
2367 :
2368 1296 : let debug_mode = cfg!(debug_assertions) || cfg!(feature = "testing");
2369 :
2370 1296 : if debug_mode {
2371 3144 : for (log_key, _, _) in full_history {
2372 1848 : assert_eq!(log_key, &key, "mismatched key");
2373 : }
2374 1296 : for i in 1..full_history.len() {
2375 552 : assert!(full_history[i - 1].1 <= full_history[i].1, "unordered LSN");
2376 552 : if full_history[i - 1].1 == full_history[i].1 {
2377 0 : assert!(
2378 0 : matches!(full_history[i - 1].2, Value::Image(_)),
2379 0 : "unordered delta/image, or duplicated delta"
2380 : );
2381 552 : }
2382 : }
2383 : // There used to be an assertion, for the no-base-image case, that checked whether the first
2384 : // record in the history is `will_init`, but it was removed.
2385 : // This is explained in the test cases for generate_key_retention.
2386 : // Search "incomplete history" for more information.
2387 2856 : for lsn in retain_lsn_below_horizon {
2388 1560 : assert!(lsn < &horizon, "retain lsn must be below horizon")
2389 : }
2390 1296 : for i in 1..retain_lsn_below_horizon.len() {
2391 712 : assert!(
2392 712 : retain_lsn_below_horizon[i - 1] <= retain_lsn_below_horizon[i],
2393 0 : "unordered LSN"
2394 : );
2395 : }
2396 0 : }
2397 1296 : let has_ancestor = base_img_from_ancestor.is_some();
2398 : // Step 1: split history into len(retain_lsn_below_horizon) + 2 buckets, where the last bucket is for all deltas above the horizon,
2399 : // and the second-to-last bucket is for the horizon. Each bucket contains lsn_last_bucket < deltas <= lsn_this_bucket.
2400 1296 : let (mut split_history, lsn_split_points) = {
2401 1296 : let mut split_history = Vec::new();
2402 1296 : split_history.resize_with(retain_lsn_below_horizon.len() + 2, Vec::new);
2403 1296 : let mut lsn_split_points = Vec::with_capacity(retain_lsn_below_horizon.len() + 1);
2404 2856 : for lsn in retain_lsn_below_horizon {
2405 1560 : lsn_split_points.push(*lsn);
2406 1560 : }
2407 1296 : lsn_split_points.push(horizon);
2408 1296 : let mut current_idx = 0;
2409 3144 : for item @ (_, lsn, _) in full_history {
2410 2336 : while current_idx < lsn_split_points.len() && *lsn > lsn_split_points[current_idx] {
2411 488 : current_idx += 1;
2412 488 : }
2413 1848 : split_history[current_idx].push(item);
2414 : }
2415 1296 : (split_history, lsn_split_points)
2416 : };
2417 : // Step 2: filter out duplicated records due to the k-merge of image/delta layers
2418 5448 : for split_for_lsn in &mut split_history {
2419 4152 : let mut prev_lsn = None;
2420 4152 : let mut new_split_for_lsn = Vec::with_capacity(split_for_lsn.len());
2421 4152 : for record @ (_, lsn, _) in std::mem::take(split_for_lsn) {
2422 1848 : if let Some(prev_lsn) = &prev_lsn {
2423 248 : if *prev_lsn == lsn {
2424 : // The case that we have an LSN with both data from the delta layer and the image layer. As
2425 : // `ValueWrapper` ensures that an image is ordered before a delta at the same LSN, we simply
2426 : // drop this delta and keep the image.
2427 : //
2428 : // For example, we have delta layer key1@0x10, key1@0x20, and image layer key1@0x10, we will
2429 : // keep the image for key1@0x10 and the delta for key1@0x20. key1@0x10 delta will be simply
2430 : // dropped.
2431 : //
2432 : // TODO: in case we have both delta + images for a given LSN and it does not exceed the delta
2433 : // threshold, we could have kept delta instead to save space. This is an optimization for the future.
2434 0 : continue;
2435 248 : }
2436 1600 : }
2437 1848 : prev_lsn = Some(lsn);
2438 1848 : new_split_for_lsn.push(record);
2439 : }
2440 4152 : *split_for_lsn = new_split_for_lsn;
2441 : }
2442 : // Step 3: generate images when necessary
2443 1296 : let mut retention = Vec::with_capacity(split_history.len());
2444 1296 : let mut records_since_last_image = 0;
2445 1296 : let batch_cnt = split_history.len();
2446 1296 : assert!(
2447 1296 : batch_cnt >= 2,
2448 0 : "should have at least below + above horizon batches"
2449 : );
2450 1296 : let mut replay_history: Vec<(Key, Lsn, Value)> = Vec::new();
2451 1296 : if let Some((key, lsn, ref img)) = base_img_from_ancestor {
2452 84 : replay_history.push((key, lsn, Value::Image(img.clone())));
2453 1212 : }
2454 :
2455 : /// Generate debug information for the replay history
2456 0 : fn generate_history_trace(replay_history: &[(Key, Lsn, Value)]) -> String {
2457 : use std::fmt::Write;
2458 0 : let mut output = String::new();
2459 0 : if let Some((key, _, _)) = replay_history.first() {
2460 0 : write!(output, "key={} ", key).unwrap();
2461 0 : let mut cnt = 0;
2462 0 : for (_, lsn, val) in replay_history {
2463 0 : if val.is_image() {
2464 0 : write!(output, "i@{} ", lsn).unwrap();
2465 0 : } else if val.will_init() {
2466 0 : write!(output, "di@{} ", lsn).unwrap();
2467 0 : } else {
2468 0 : write!(output, "d@{} ", lsn).unwrap();
2469 0 : }
2470 0 : cnt += 1;
2471 0 : if cnt >= 128 {
2472 0 : write!(output, "... and more").unwrap();
2473 0 : break;
2474 0 : }
2475 : }
2476 0 : } else {
2477 0 : write!(output, "<no history>").unwrap();
2478 0 : }
2479 0 : output
2480 0 : }
2481 :
2482 0 : fn generate_debug_trace(
2483 0 : replay_history: Option<&[(Key, Lsn, Value)]>,
2484 0 : full_history: &[(Key, Lsn, Value)],
2485 0 : lsns: &[Lsn],
2486 0 : horizon: Lsn,
2487 0 : ) -> String {
2488 : use std::fmt::Write;
2489 0 : let mut output = String::new();
2490 0 : if let Some(replay_history) = replay_history {
2491 0 : writeln!(
2492 0 : output,
2493 0 : "replay_history: {}",
2494 0 : generate_history_trace(replay_history)
2495 0 : )
2496 0 : .unwrap();
2497 0 : } else {
2498 0 : writeln!(output, "replay_history: <disabled>",).unwrap();
2499 0 : }
2500 0 : writeln!(
2501 0 : output,
2502 0 : "full_history: {}",
2503 0 : generate_history_trace(full_history)
2504 0 : )
2505 0 : .unwrap();
2506 0 : writeln!(
2507 0 : output,
2508 0 : "when processing: [{}] horizon={}",
2509 0 : lsns.iter().map(|l| format!("{l}")).join(","),
2510 0 : horizon
2511 0 : )
2512 0 : .unwrap();
2513 0 : output
2514 0 : }
2515 :
2516 1296 : let mut key_exists = false;
2517 4148 : for (i, split_for_lsn) in split_history.into_iter().enumerate() {
2518 : // TODO: there could be image keys inside the splits, and we can compute records_since_last_image accordingly.
2519 4148 : records_since_last_image += split_for_lsn.len();
2520 : // Whether to produce an image into the final layer files
2521 4148 : let produce_image = if i == 0 && !has_ancestor {
2522 : // We always generate images for the first batch (below horizon / lowest retain_lsn)
2523 1212 : true
2524 2936 : } else if i == batch_cnt - 1 {
2525 : // Do not generate images for the last batch (above horizon)
2526 1292 : false
2527 1644 : } else if records_since_last_image == 0 {
2528 1288 : false
2529 356 : } else if records_since_last_image >= delta_threshold_cnt {
2530 : // Generate images when there are too many records
2531 12 : true
2532 : } else {
2533 344 : false
2534 : };
2535 4148 : replay_history.extend(split_for_lsn.iter().map(|x| (*x).clone()));
2536 : // Only retain the items after the last image record
2537 5108 : for idx in (0..replay_history.len()).rev() {
2538 5108 : if replay_history[idx].2.will_init() {
2539 4148 : replay_history = replay_history[idx..].to_vec();
2540 4148 : break;
2541 960 : }
2542 : }
2543 4148 : if replay_history.is_empty() && !key_exists {
2544 : // The key does not exist at earlier LSN, we can skip this iteration.
2545 0 : retention.push(Vec::new());
2546 0 : continue;
2547 4148 : } else {
2548 4148 : key_exists = true;
2549 4148 : }
2550 4148 : let Some((_, _, val)) = replay_history.first() else {
2551 0 : unreachable!("replay history should not be empty once it exists")
2552 : };
2553 4148 : if !val.will_init() {
2554 0 : return Err(anyhow::anyhow!("invalid history, no base image")).with_context(|| {
2555 0 : generate_debug_trace(
2556 0 : Some(&replay_history),
2557 0 : full_history,
2558 0 : retain_lsn_below_horizon,
2559 0 : horizon,
2560 0 : )
2561 0 : });
2562 4148 : }
2563 : // Whether to reconstruct the image. In debug mode, we will generate an image
2564 : // at every retain_lsn to ensure data is not corrupted, but we won't put the
2565 : // image into the final layer.
2566 4148 : let img_and_lsn = if produce_image {
2567 1224 : records_since_last_image = 0;
2568 1224 : let replay_history_for_debug = if debug_mode {
2569 1224 : Some(replay_history.clone())
2570 : } else {
2571 0 : None
2572 : };
2573 1224 : let replay_history_for_debug_ref = replay_history_for_debug.as_deref();
2574 1224 : let history = std::mem::take(&mut replay_history);
2575 1224 : let mut img = None;
2576 1224 : let mut records = Vec::with_capacity(history.len());
2577 1224 : if let (_, lsn, Value::Image(val)) = history.first().as_ref().unwrap() {
2578 1180 : img = Some((*lsn, val.clone()));
2579 1180 : for (_, lsn, val) in history.into_iter().skip(1) {
2580 80 : let Value::WalRecord(rec) = val else {
2581 0 : return Err(anyhow::anyhow!(
2582 0 : "invalid record, first record is image, expect walrecords"
2583 0 : ))
2584 0 : .with_context(|| {
2585 0 : generate_debug_trace(
2586 0 : replay_history_for_debug_ref,
2587 0 : full_history,
2588 0 : retain_lsn_below_horizon,
2589 0 : horizon,
2590 0 : )
2591 0 : });
2592 : };
2593 80 : records.push((lsn, rec));
2594 : }
2595 : } else {
2596 72 : for (_, lsn, val) in history.into_iter() {
2597 72 : let Value::WalRecord(rec) = val else {
2598 0 : return Err(anyhow::anyhow!("invalid record, first record is walrecord, expect rest are walrecord"))
2599 0 : .with_context(|| generate_debug_trace(
2600 0 : replay_history_for_debug_ref,
2601 0 : full_history,
2602 0 : retain_lsn_below_horizon,
2603 0 : horizon,
2604 0 : ));
2605 : };
2606 72 : records.push((lsn, rec));
2607 : }
2608 : }
2609 : // WAL redo requires records in the reverse LSN order
2610 1224 : records.reverse();
2611 1224 : let state = ValueReconstructState { img, records };
2612 : // last batch does not generate image so i is always in range, unless we force generate
2613 : // an image during testing
2614 1224 : let request_lsn = if i >= lsn_split_points.len() {
2615 0 : Lsn::MAX
2616 : } else {
2617 1224 : lsn_split_points[i]
2618 : };
2619 1224 : let img = self
2620 1224 : .reconstruct_value(key, request_lsn, state, RedoAttemptType::GcCompaction)
2621 1224 : .await?;
2622 1220 : Some((request_lsn, img))
2623 : } else {
2624 2924 : None
2625 : };
2626 4144 : if produce_image {
2627 1220 : let (request_lsn, img) = img_and_lsn.unwrap();
2628 1220 : replay_history.push((key, request_lsn, Value::Image(img.clone())));
2629 1220 : retention.push(vec![(request_lsn, Value::Image(img))]);
2630 2924 : } else {
2631 2924 : let deltas = split_for_lsn
2632 2924 : .iter()
2633 2924 : .map(|(_, lsn, value)| (*lsn, value.clone()))
2634 2924 : .collect_vec();
2635 2924 : retention.push(deltas);
2636 2924 : }
2637 : }
2638 1292 : let mut result = Vec::with_capacity(retention.len());
2639 1292 : assert_eq!(retention.len(), lsn_split_points.len() + 1);
2640 4144 : for (idx, logs) in retention.into_iter().enumerate() {
2641 4144 : if idx == lsn_split_points.len() {
2642 1292 : let retention = KeyHistoryRetention {
2643 1292 : below_horizon: result,
2644 1292 : above_horizon: KeyLogAtLsn(logs),
2645 1292 : };
2646 1292 : if verification {
2647 1292 : retention
2648 1292 : .verify(key, &base_img_from_ancestor, full_history, self)
2649 1292 : .await?;
2650 0 : }
2651 1292 : return Ok(retention);
2652 2852 : } else {
2653 2852 : result.push((lsn_split_points[idx], KeyLogAtLsn(logs)));
2654 2852 : }
2655 : }
2656 0 : unreachable!("key retention is empty")
2657 1296 : }
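
As a self-contained illustration of the bucketing step in `generate_key_retention` (step 1 above), the sketch below reproduces the doc-comment example with plain `u64` LSNs and string records; `split_history_by_retain_lsns` is a hypothetical helper, not part of this module.

```rust
/// Each record lands in the first bucket whose split point is >= its LSN;
/// everything above the horizon goes into a final "above horizon" bucket.
fn split_history_by_retain_lsns<T: Clone>(
    full_history: &[(u64, T)],         // (lsn, record), sorted by LSN
    retain_lsns_below_horizon: &[u64], // sorted, all < horizon
    horizon: u64,
) -> Vec<Vec<(u64, T)>> {
    let mut split_points: Vec<u64> = retain_lsns_below_horizon.to_vec();
    split_points.push(horizon);
    // One bucket per split point, plus one for records above the horizon.
    let mut buckets: Vec<Vec<(u64, T)>> = vec![Vec::new(); split_points.len() + 1];
    let mut current = 0;
    for (lsn, record) in full_history {
        while current < split_points.len() && *lsn > split_points[current] {
            current += 1;
        }
        buckets[current].push((*lsn, record.clone()));
    }
    buckets
}

fn main() {
    // Doc-comment example: A@0x10 .. +F@0x60, retain LSNs 0x20 and 0x40, horizon 0x50.
    let history = vec![
        (0x10, "A"), (0x20, "+B"), (0x30, "+C"), (0x40, "+D"), (0x50, "+E"), (0x60, "+F"),
    ];
    let buckets = split_history_by_retain_lsns(&history, &[0x20, 0x40], 0x50);
    assert_eq!(
        buckets,
        vec![
            vec![(0x10, "A"), (0x20, "+B")],  // <= 0x20 (lowest retain_lsn)
            vec![(0x30, "+C"), (0x40, "+D")], // <= 0x40
            vec![(0x50, "+E")],               // <= 0x50 (horizon)
            vec![(0x60, "+F")],               // above the horizon
        ]
    );
}
```

The real implementation then walks these buckets to decide, per bucket, whether to emit an image (always for the lowest retain LSN without an ancestor image, or once `delta_threshold_cnt` records have accumulated) or to keep the deltas as-is.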
2658 :
2659 : /// Check how much space is left on the disk
2660 108 : async fn check_available_space(self: &Arc<Self>) -> anyhow::Result<u64> {
2661 108 : let tenants_dir = self.conf.tenants_path();
2662 :
2663 108 : let stat = Statvfs::get(&tenants_dir, None)
2664 108 : .context("statvfs failed, presumably directory got unlinked")?;
2665 :
2666 108 : let (avail_bytes, _) = stat.get_avail_total_bytes();
2667 108 :
2668 108 : Ok(avail_bytes)
2669 108 : }
2670 :
2671 : /// Check if the compaction can proceed safely without running out of space. We assume the
2672 : /// upper bound on the size of the files produced by a compaction job is the same as the total
2673 : /// size of all layers involved in the compaction. Therefore, we need at least
2674 : /// `2 * layers_to_be_compacted_size` to do a compaction.
2675 108 : async fn check_compaction_space(
2676 108 : self: &Arc<Self>,
2677 108 : layer_selection: &[Layer],
2678 108 : ) -> Result<(), CompactionError> {
2679 108 : let available_space = self
2680 108 : .check_available_space()
2681 108 : .await
2682 108 : .map_err(CompactionError::Other)?;
2683 108 : let mut remote_layer_size = 0;
2684 108 : let mut all_layer_size = 0;
2685 424 : for layer in layer_selection {
2686 316 : let needs_download = layer
2687 316 : .needs_download()
2688 316 : .await
2689 316 : .context("failed to check if layer needs download")
2690 316 : .map_err(CompactionError::Other)?;
2691 316 : if needs_download.is_some() {
2692 0 : remote_layer_size += layer.layer_desc().file_size;
2693 316 : }
2694 316 : all_layer_size += layer.layer_desc().file_size;
2695 : }
2696 108 : let allocated_space = (available_space as f64 * 0.8) as u64; /* reserve 20% space for other tasks */
2697 108 : if all_layer_size /* space needed for newly-generated file */ + remote_layer_size /* space for downloading layers */ > allocated_space
2698 : {
2699 0 : return Err(CompactionError::Other(anyhow!(
2700 0 : "not enough space for compaction: available_space={}, allocated_space={}, all_layer_size={}, remote_layer_size={}, required_space={}",
2701 0 : available_space,
2702 0 : allocated_space,
2703 0 : all_layer_size,
2704 0 : remote_layer_size,
2705 0 : all_layer_size + remote_layer_size
2706 0 : )));
2707 108 : }
2708 108 : Ok(())
2709 108 : }
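
The space check above boils down to a single inequality: the job may proceed only if the input layer bytes (plus any bytes that still have to be downloaded) fit within 80% of the free space. A minimal sketch of just that arithmetic, with hypothetical names:

```rust
/// Returns true if a compaction whose input layers total `all_layer_size` bytes
/// (of which `remote_layer_size` bytes still need to be downloaded) fits into
/// the available disk space, keeping 20% in reserve for other tasks.
fn compaction_fits(available_space: u64, all_layer_size: u64, remote_layer_size: u64) -> bool {
    let allocated_space = (available_space as f64 * 0.8) as u64;
    all_layer_size + remote_layer_size <= allocated_space
}

fn main() {
    const GIB: u64 = 1024 * 1024 * 1024;
    // 100 GiB free, 30 GiB of input layers already local, nothing to download:
    // allocated = 80 GiB, required = 30 GiB, so the job may proceed.
    assert!(compaction_fits(100 * GIB, 30 * GIB, 0));
    // With only 10 GiB free, the same job is rejected.
    assert!(!compaction_fits(10 * GIB, 30 * GIB, 0));
}
```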
2710 :
2711 : /// Get a watermark for gc-compaction, that is the lowest LSN that we can use as the `gc_horizon` for
2712 : /// the compaction algorithm. It is min(space_cutoff, time_cutoff, latest_gc_cutoff, standby_horizon).
2713 : /// Leases and retain_lsns are considered in the gc-compaction job itself so we don't need to account for them
2714 : /// here.
2715 112 : pub(crate) fn get_gc_compaction_watermark(self: &Arc<Self>) -> Lsn {
2716 112 : let gc_cutoff_lsn = {
2717 112 : let gc_info = self.gc_info.read().unwrap();
2718 112 : gc_info.min_cutoff()
2719 112 : };
2720 112 :
2721 112 : // TODO: standby horizon should use leases so we don't really need to consider it here.
2722 112 : // let watermark = watermark.min(self.standby_horizon.load());
2723 112 :
2724 112 : // TODO: ensure the child branches will not use anything below the watermark, or consider
2725 112 : // them when computing the watermark.
2726 112 : gc_cutoff_lsn.min(*self.get_applied_gc_cutoff_lsn())
2727 112 : }
2728 :
2729 : /// Split a gc-compaction job into multiple compaction jobs. The split is based on the key range and the estimated size of the compaction job.
2730 : /// The function returns a list of compaction jobs that can be executed separately. If the upper bound of the compact LSN
2731 : /// range is not specified, we will use the latest gc_cutoff as the upper bound, so that all jobs in the job set act
2732 : /// like a full compaction of the specified keyspace.
2733 0 : pub(crate) async fn gc_compaction_split_jobs(
2734 0 : self: &Arc<Self>,
2735 0 : job: GcCompactJob,
2736 0 : sub_compaction_max_job_size_mb: Option<u64>,
2737 0 : ) -> Result<Vec<GcCompactJob>, CompactionError> {
2738 0 : let compact_below_lsn = if job.compact_lsn_range.end != Lsn::MAX {
2739 0 : job.compact_lsn_range.end
2740 : } else {
2741 0 : self.get_gc_compaction_watermark()
2742 : };
2743 :
2744 0 : if compact_below_lsn == Lsn::INVALID {
2745 0 : tracing::warn!(
2746 0 : "no layers to compact with gc: gc_cutoff not generated yet, skipping gc bottom-most compaction"
2747 : );
2748 0 : return Ok(vec![]);
2749 0 : }
2750 :
2751 : // Split the compaction job into sub-jobs of about 4GB each
2752 : const GC_COMPACT_MAX_SIZE_MB: u64 = 4 * 1024;
2753 0 : let sub_compaction_max_job_size_mb =
2754 0 : sub_compaction_max_job_size_mb.unwrap_or(GC_COMPACT_MAX_SIZE_MB);
2755 0 :
2756 0 : let mut compact_jobs = Vec::<GcCompactJob>::new();
2757 0 : // For now, we simply use the key partitioning information; we should do a more fine-grained partitioning
2758 0 : // by estimating the number of files read for a compaction job. We should also partition on LSN.
2759 0 : let ((dense_ks, sparse_ks), _) = self.partitioning.read().as_ref().clone();
2760 : // Truncate the key range to be within user specified compaction range.
2761 0 : fn truncate_to(
2762 0 : source_start: &Key,
2763 0 : source_end: &Key,
2764 0 : target_start: &Key,
2765 0 : target_end: &Key,
2766 0 : ) -> Option<(Key, Key)> {
2767 0 : let start = source_start.max(target_start);
2768 0 : let end = source_end.min(target_end);
2769 0 : if start < end {
2770 0 : Some((*start, *end))
2771 : } else {
2772 0 : None
2773 : }
2774 0 : }
2775 0 : let mut split_key_ranges = Vec::new();
2776 0 : let ranges = dense_ks
2777 0 : .parts
2778 0 : .iter()
2779 0 : .map(|partition| partition.ranges.iter())
2780 0 : .chain(sparse_ks.parts.iter().map(|x| x.0.ranges.iter()))
2781 0 : .flatten()
2782 0 : .cloned()
2783 0 : .collect_vec();
2784 0 : for range in ranges.iter() {
2785 0 : let Some((start, end)) = truncate_to(
2786 0 : &range.start,
2787 0 : &range.end,
2788 0 : &job.compact_key_range.start,
2789 0 : &job.compact_key_range.end,
2790 0 : ) else {
2791 0 : continue;
2792 : };
2793 0 : split_key_ranges.push((start, end));
2794 : }
2795 0 : split_key_ranges.sort();
2796 0 : let all_layers = {
2797 0 : let guard = self.layers.read().await;
2798 0 : let layer_map = guard.layer_map()?;
2799 0 : layer_map.iter_historic_layers().collect_vec()
2800 0 : };
2801 0 : let mut current_start = None;
2802 0 : let ranges_num = split_key_ranges.len();
2803 0 : for (idx, (start, end)) in split_key_ranges.into_iter().enumerate() {
2804 0 : if current_start.is_none() {
2805 0 : current_start = Some(start);
2806 0 : }
2807 0 : let start = current_start.unwrap();
2808 0 : if start >= end {
2809 : // We have already processed this partition.
2810 0 : continue;
2811 0 : }
2812 0 : let overlapping_layers = {
2813 0 : let mut desc = Vec::new();
2814 0 : for layer in all_layers.iter() {
2815 0 : if overlaps_with(&layer.get_key_range(), &(start..end))
2816 0 : && layer.get_lsn_range().start <= compact_below_lsn
2817 0 : {
2818 0 : desc.push(layer.clone());
2819 0 : }
2820 : }
2821 0 : desc
2822 0 : };
2823 0 : let total_size = overlapping_layers.iter().map(|x| x.file_size).sum::<u64>();
2824 0 : if total_size > sub_compaction_max_job_size_mb * 1024 * 1024 || ranges_num == idx + 1 {
2825 : // Try to extend the compaction range so that we include at least one full layer file.
2826 0 : let extended_end = overlapping_layers
2827 0 : .iter()
2828 0 : .map(|layer| layer.key_range.end)
2829 0 : .min();
2830 : // It is possible that the search range does not contain any layer files when we reach the end of the loop.
2831 : // In this case, we simply use the specified key range end.
2832 0 : let end = if let Some(extended_end) = extended_end {
2833 0 : extended_end.max(end)
2834 : } else {
2835 0 : end
2836 : };
2837 0 : let end = if ranges_num == idx + 1 {
2838 : // extend the compaction range to the end of the key range if it's the last partition
2839 0 : end.max(job.compact_key_range.end)
2840 : } else {
2841 0 : end
2842 : };
2843 0 : if total_size == 0 && !compact_jobs.is_empty() {
2844 0 : info!(
2845 0 : "splitting compaction job: {}..{}, estimated_size={}, extending the previous job",
2846 : start, end, total_size
2847 : );
2848 0 : compact_jobs.last_mut().unwrap().compact_key_range.end = end;
2849 0 : current_start = Some(end);
2850 : } else {
2851 0 : info!(
2852 0 : "splitting compaction job: {}..{}, estimated_size={}",
2853 : start, end, total_size
2854 : );
2855 0 : compact_jobs.push(GcCompactJob {
2856 0 : dry_run: job.dry_run,
2857 0 : compact_key_range: start..end,
2858 0 : compact_lsn_range: job.compact_lsn_range.start..compact_below_lsn,
2859 0 : });
2860 0 : current_start = Some(end);
2861 : }
2862 0 : }
2863 : }
2864 0 : Ok(compact_jobs)
2865 0 : }
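 :
 : // Conceptual sketch of the size-based splitting above, under simplified assumptions (a sorted
 : // list of key ranges plus a size estimate per range; the helper names are hypothetical and only
 : // illustrate the accumulate-and-cut pattern used in the loop, not the layer-extension details):
 : //
 : //     let mut jobs: Vec<Range<Key>> = Vec::new();
 : //     let mut start = full_range.start;
 : //     let mut acc = 0u64;
 : //     for (range, estimated_size) in ranges_with_sizes {
 : //         acc += estimated_size;
 : //         if acc > max_job_size_bytes {
 : //             jobs.push(start..range.end); // cut a sub-job at roughly 4GB of input layers
 : //             start = range.end;
 : //             acc = 0;
 : //         }
 : //     }
 : //     jobs.push(start..full_range.end); // the last sub-job extends to the end of the key range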
2866 :
2867 : /// An experimental compaction building block that combines compaction with garbage collection.
2868 : ///
2869 : /// The current implementation picks all delta + image layers that are below or intersecting with
2870 : /// the GC horizon without considering retain_lsns. Then, it does a full compaction over all these delta
2871 : /// layers and image layers, which generates image layers on the gc horizon, drop deltas below gc horizon,
2872 : /// and create delta layers with all deltas >= gc horizon.
2873 : ///
2874 : /// If `options.compact_range` is provided, it will only compact the keys within the range, aka partial compaction.
2875 : /// Partial compaction will read and process all layers overlapping with the key range, even if it might
2876 : /// contain extra keys. After the gc-compaction phase completes, delta layers that are not fully contained
2877 : /// within the key range will be rewritten to ensure they do not overlap with the delta layers. Providing
2878 : /// Key::MIN..Key..MAX to the function indicates a full compaction, though technically, `Key::MAX` is not
2879 : /// part of the range.
2880 : ///
2881 : /// If `options.compact_lsn_range.end` is provided, the compaction will only compact layers below or intersect with
2882 : /// the LSN. Otherwise, it will use the gc cutoff by default.
2883 112 : pub(crate) async fn compact_with_gc(
2884 112 : self: &Arc<Self>,
2885 112 : cancel: &CancellationToken,
2886 112 : options: CompactOptions,
2887 112 : ctx: &RequestContext,
2888 112 : ) -> Result<CompactionOutcome, CompactionError> {
2889 112 : let sub_compaction = options.sub_compaction;
2890 112 : let job = GcCompactJob::from_compact_options(options.clone());
2891 112 : let yield_for_l0 = options.flags.contains(CompactFlags::YieldForL0);
2892 112 : if sub_compaction {
2893 0 : info!(
2894 0 : "running enhanced gc bottom-most compaction with sub-compaction, splitting compaction jobs"
2895 : );
2896 0 : let jobs = self
2897 0 : .gc_compaction_split_jobs(job, options.sub_compaction_max_job_size_mb)
2898 0 : .await?;
2899 0 : let jobs_len = jobs.len();
2900 0 : for (idx, job) in jobs.into_iter().enumerate() {
2901 0 : info!(
2902 0 : "running enhanced gc bottom-most compaction, sub-compaction {}/{}",
2903 0 : idx + 1,
2904 : jobs_len
2905 : );
2906 0 : self.compact_with_gc_inner(cancel, job, ctx, yield_for_l0)
2907 0 : .await?;
2908 : }
2909 0 : if jobs_len == 0 {
2910 0 : info!("no jobs to run, skipping gc bottom-most compaction");
2911 0 : }
2912 0 : return Ok(CompactionOutcome::Done);
2913 112 : }
2914 112 : self.compact_with_gc_inner(cancel, job, ctx, yield_for_l0)
2915 112 : .await
2916 112 : }
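 :
 : // Illustrative shape of a single sub-job handled by `compact_with_gc_inner` below (field names
 : // match the `GcCompactJob` construction in `gc_compaction_split_jobs`; the concrete values are
 : // hypothetical):
 : //
 : //     GcCompactJob {
 : //         dry_run: false,
 : //         compact_key_range: start_key..end_key,
 : //         compact_lsn_range: Lsn(0)..compact_below_lsn,
 : //     }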
2917 :
2918 112 : async fn compact_with_gc_inner(
2919 112 : self: &Arc<Self>,
2920 112 : cancel: &CancellationToken,
2921 112 : job: GcCompactJob,
2922 112 : ctx: &RequestContext,
2923 112 : yield_for_l0: bool,
2924 112 : ) -> Result<CompactionOutcome, CompactionError> {
2925 112 : // Block other compaction/GC tasks from running for now. GC-compaction could run along
2926 112 : // with legacy compaction tasks in the future. Always ensure the lock order is compaction -> gc.
2927 112 : // Note that we already acquired the compaction lock when the outer `compact` function gets called.
2928 112 :
2929 112 : let timer = Instant::now();
2930 112 : let begin_timer = timer;
2931 112 :
2932 112 : let gc_lock = async {
2933 112 : tokio::select! {
2934 112 : guard = self.gc_lock.lock() => Ok(guard),
2935 112 : _ = cancel.cancelled() => Err(CompactionError::ShuttingDown),
2936 : }
2937 112 : };
2938 :
2939 112 : let time_acquire_lock = timer.elapsed();
2940 112 : let timer = Instant::now();
2941 :
2942 112 : let gc_lock = crate::timed(
2943 112 : gc_lock,
2944 112 : "acquires gc lock",
2945 112 : std::time::Duration::from_secs(5),
2946 112 : )
2947 112 : .await?;
2948 :
2949 112 : let dry_run = job.dry_run;
2950 112 : let compact_key_range = job.compact_key_range;
2951 112 : let compact_lsn_range = job.compact_lsn_range;
2952 :
2953 112 : let debug_mode = cfg!(debug_assertions) || cfg!(feature = "testing");
2954 :
2955 112 : info!(
2956 0 : "running enhanced gc bottom-most compaction, dry_run={dry_run}, compact_key_range={}..{}, compact_lsn_range={}..{}",
2957 : compact_key_range.start,
2958 : compact_key_range.end,
2959 : compact_lsn_range.start,
2960 : compact_lsn_range.end
2961 : );
2962 :
2963 112 : scopeguard::defer! {
2964 112 : info!("done enhanced gc bottom-most compaction");
2965 112 : };
2966 112 :
2967 112 : let mut stat = CompactionStatistics::default();
2968 :
2969 : // Step 0: pick all delta layers + image layers below/intersect with the GC horizon.
2970 : // The layer selection has the following properties:
2971 : // 1. If a layer is in the selection, all layers below it are in the selection.
2972 : // 2. Inferred from (1), for each key in the layer selection, the value can be reconstructed only with the layers in the layer selection.
2973 108 : let job_desc = {
2974 112 : let guard = self.layers.read().await;
2975 112 : let layers = guard.layer_map()?;
2976 112 : let gc_info = self.gc_info.read().unwrap();
2977 112 : let mut retain_lsns_below_horizon = Vec::new();
2978 112 : let gc_cutoff = {
2979 : // Currently, gc-compaction only kicks in after the legacy gc has updated the gc_cutoff.
2980 : // Therefore, it can only clean up data that cannot be cleaned up with legacy gc, instead of
2981 : // cleaning everything that theoritically it could. In the future, it should use `self.gc_info`
2982 : // to get the truth data.
2983 112 : let real_gc_cutoff = self.get_gc_compaction_watermark();
2984 : // The compaction algorithm will keep all keys above the gc_cutoff while keeping only necessary keys below the gc_cutoff for
2985 : // each of the retain_lsn. Therefore, if the user-provided `compact_lsn_range.end` is larger than the real gc cutoff, we will use
2986 : // the real cutoff.
2987 112 : let mut gc_cutoff = if compact_lsn_range.end == Lsn::MAX {
2988 100 : if real_gc_cutoff == Lsn::INVALID {
2989 : // If the gc_cutoff is not generated yet, we should not compact anything.
2990 0 : tracing::warn!(
2991 0 : "no layers to compact with gc: gc_cutoff not generated yet, skipping gc bottom-most compaction"
2992 : );
2993 0 : return Ok(CompactionOutcome::Skipped);
2994 100 : }
2995 100 : real_gc_cutoff
2996 : } else {
2997 12 : compact_lsn_range.end
2998 : };
2999 112 : if gc_cutoff > real_gc_cutoff {
3000 8 : warn!(
3001 0 : "provided compact_lsn_range.end={} is larger than the real_gc_cutoff={}, using the real gc cutoff",
3002 : gc_cutoff, real_gc_cutoff
3003 : );
3004 8 : gc_cutoff = real_gc_cutoff;
3005 104 : }
3006 112 : gc_cutoff
3007 : };
3008 140 : for (lsn, _timeline_id, _is_offloaded) in &gc_info.retain_lsns {
3009 140 : if lsn < &gc_cutoff {
3010 140 : retain_lsns_below_horizon.push(*lsn);
3011 140 : }
3012 : }
3013 112 : for lsn in gc_info.leases.keys() {
3014 0 : if lsn < &gc_cutoff {
3015 0 : retain_lsns_below_horizon.push(*lsn);
3016 0 : }
3017 : }
3018 112 : let mut selected_layers: Vec<Layer> = Vec::new();
3019 112 : drop(gc_info);
3020 : // First, pick all the layers that intersect or are below the gc_cutoff, and get the largest LSN among the selected layers.
3021 112 : let Some(max_layer_lsn) = layers
3022 112 : .iter_historic_layers()
3023 500 : .filter(|desc| desc.get_lsn_range().start <= gc_cutoff)
3024 428 : .map(|desc| desc.get_lsn_range().end)
3025 112 : .max()
3026 : else {
3027 0 : info!(
3028 0 : "no layers to compact with gc: no historic layers below gc_cutoff, gc_cutoff={}",
3029 : gc_cutoff
3030 : );
3031 0 : return Ok(CompactionOutcome::Done);
3032 : };
3033 : // Next, if the user specifies compact_lsn_range.start, we need to filter some layers out. All the layers (strictly) below
3034 : // the min_layer_lsn computed below will be filtered out, and the data will be accessed using the normal read path, as if
3035 : // it is a branch.
3036 112 : let Some(min_layer_lsn) = layers
3037 112 : .iter_historic_layers()
3038 500 : .filter(|desc| {
3039 500 : if compact_lsn_range.start == Lsn::INVALID {
3040 408 : true // select all layers below if start == Lsn(0)
3041 : } else {
3042 92 : desc.get_lsn_range().end > compact_lsn_range.start // strictly larger than compact_above_lsn
3043 : }
3044 500 : })
3045 464 : .map(|desc| desc.get_lsn_range().start)
3046 112 : .min()
3047 : else {
3048 0 : info!(
3049 0 : "no layers to compact with gc: no historic layers above compact_above_lsn, compact_above_lsn={}",
3050 : compact_lsn_range.end
3051 : );
3052 0 : return Ok(CompactionOutcome::Done);
3053 : };
3054 : // Then, pick all the layers that are below the max_layer_lsn. This is to ensure we can pick all single-key
3055 : // layers to compact.
3056 112 : let mut rewrite_layers = Vec::new();
3057 500 : for desc in layers.iter_historic_layers() {
3058 500 : if desc.get_lsn_range().end <= max_layer_lsn
3059 428 : && desc.get_lsn_range().start >= min_layer_lsn
3060 392 : && overlaps_with(&desc.get_key_range(), &compact_key_range)
3061 : {
3062 : // If the layer overlaps with the compaction key range, we need to read it to obtain all keys within the range,
3063 : // even if it might contain extra keys
3064 316 : selected_layers.push(guard.get_from_desc(&desc));
3065 316 : // If the layer is not fully contained within the key range, we need to rewrite it if it's a delta layer (it's fine
3066 316 : // to overlap image layers)
3067 316 : if desc.is_delta() && !fully_contains(&compact_key_range, &desc.get_key_range())
3068 4 : {
3069 4 : rewrite_layers.push(desc);
3070 312 : }
3071 184 : }
3072 : }
3073 112 : if selected_layers.is_empty() {
3074 4 : info!(
3075 0 : "no layers to compact with gc: no layers within the key range, gc_cutoff={}, key_range={}..{}",
3076 : gc_cutoff, compact_key_range.start, compact_key_range.end
3077 : );
3078 4 : return Ok(CompactionOutcome::Done);
3079 108 : }
3080 108 : retain_lsns_below_horizon.sort();
3081 108 : GcCompactionJobDescription {
3082 108 : selected_layers,
3083 108 : gc_cutoff,
3084 108 : retain_lsns_below_horizon,
3085 108 : min_layer_lsn,
3086 108 : max_layer_lsn,
3087 108 : compaction_key_range: compact_key_range,
3088 108 : rewrite_layers,
3089 108 : }
3090 : };
3091 108 : let (has_data_below, lowest_retain_lsn) = if compact_lsn_range.start != Lsn::INVALID {
3092 : // If we only compact above some LSN, we should get the history from the current branch below the specified LSN.
3093 : // We use job_desc.min_layer_lsn as if it's the lowest branch point.
3094 16 : (true, job_desc.min_layer_lsn)
3095 92 : } else if self.ancestor_timeline.is_some() {
3096 : // In theory, we can also use min_layer_lsn here, but using ancestor LSN makes sure the delta layers cover the
3097 : // LSN ranges all the way to the ancestor timeline.
3098 4 : (true, self.ancestor_lsn)
3099 : } else {
3100 88 : let res = job_desc
3101 88 : .retain_lsns_below_horizon
3102 88 : .first()
3103 88 : .copied()
3104 88 : .unwrap_or(job_desc.gc_cutoff);
3105 88 : if debug_mode {
3106 88 : assert_eq!(
3107 88 : res,
3108 88 : job_desc
3109 88 : .retain_lsns_below_horizon
3110 88 : .iter()
3111 88 : .min()
3112 88 : .copied()
3113 88 : .unwrap_or(job_desc.gc_cutoff)
3114 88 : );
3115 0 : }
3116 88 : (false, res)
3117 : };
3118 :
3119 108 : let verification = self.get_gc_compaction_settings().gc_compaction_verification;
3120 108 :
3121 108 : info!(
3122 0 : "picked {} layers for compaction ({} layers need rewriting) with max_layer_lsn={} min_layer_lsn={} gc_cutoff={} lowest_retain_lsn={}, key_range={}..{}, has_data_below={}",
3123 0 : job_desc.selected_layers.len(),
3124 0 : job_desc.rewrite_layers.len(),
3125 : job_desc.max_layer_lsn,
3126 : job_desc.min_layer_lsn,
3127 : job_desc.gc_cutoff,
3128 : lowest_retain_lsn,
3129 : job_desc.compaction_key_range.start,
3130 : job_desc.compaction_key_range.end,
3131 : has_data_below,
3132 : );
3133 :
3134 108 : let time_analyze = timer.elapsed();
3135 108 : let timer = Instant::now();
3136 :
3137 424 : for layer in &job_desc.selected_layers {
3138 316 : debug!("read layer: {}", layer.layer_desc().key());
3139 : }
3140 112 : for layer in &job_desc.rewrite_layers {
3141 4 : debug!("rewrite layer: {}", layer.key());
3142 : }
3143 :
3144 108 : self.check_compaction_space(&job_desc.selected_layers)
3145 108 : .await?;
3146 :
3147 : // Generate statistics for the compaction
3148 424 : for layer in &job_desc.selected_layers {
3149 316 : let desc = layer.layer_desc();
3150 316 : if desc.is_delta() {
3151 176 : stat.visit_delta_layer(desc.file_size());
3152 176 : } else {
3153 140 : stat.visit_image_layer(desc.file_size());
3154 140 : }
3155 : }
3156 :
3157 : // Step 1: construct a k-merge iterator over all layers.
3158 : // Also, verify if the layer map can be split by drawing a horizontal line at every LSN start/end split point.
3159 108 : let layer_names = job_desc
3160 108 : .selected_layers
3161 108 : .iter()
3162 316 : .map(|layer| layer.layer_desc().layer_name())
3163 108 : .collect_vec();
3164 108 : if let Some(err) = check_valid_layermap(&layer_names) {
3165 0 : return Err(CompactionError::Other(anyhow!(
3166 0 : "gc-compaction layer map check failed because {}, cannot proceed with compaction due to potential data loss",
3167 0 : err
3168 0 : )));
3169 108 : }
3170 108 : // The maximum LSN we are processing in this compaction loop
3171 108 : let end_lsn = job_desc
3172 108 : .selected_layers
3173 108 : .iter()
3174 316 : .map(|l| l.layer_desc().lsn_range.end)
3175 108 : .max()
3176 108 : .unwrap();
3177 108 : let mut delta_layers = Vec::new();
3178 108 : let mut image_layers = Vec::new();
3179 108 : let mut downloaded_layers = Vec::new();
3180 108 : let mut total_downloaded_size = 0;
3181 108 : let mut total_layer_size = 0;
3182 424 : for layer in &job_desc.selected_layers {
3183 316 : if layer
3184 316 : .needs_download()
3185 316 : .await
3186 316 : .context("failed to check if layer needs download")
3187 316 : .map_err(CompactionError::Other)?
3188 316 : .is_some()
3189 0 : {
3190 0 : total_downloaded_size += layer.layer_desc().file_size;
3191 316 : }
3192 316 : total_layer_size += layer.layer_desc().file_size;
3193 316 : if cancel.is_cancelled() {
3194 0 : return Err(CompactionError::ShuttingDown);
3195 316 : }
3196 316 : let should_yield = yield_for_l0
3197 0 : && self
3198 0 : .l0_compaction_trigger
3199 0 : .notified()
3200 0 : .now_or_never()
3201 0 : .is_some();
3202 316 : if should_yield {
3203 0 : tracing::info!("preempt gc-compaction when downloading layers: too many L0 layers");
3204 0 : return Ok(CompactionOutcome::YieldForL0);
3205 316 : }
3206 316 : let resident_layer = layer
3207 316 : .download_and_keep_resident(ctx)
3208 316 : .await
3209 316 : .context("failed to download and keep resident layer")
3210 316 : .map_err(CompactionError::Other)?;
3211 316 : downloaded_layers.push(resident_layer);
3212 : }
3213 108 : info!(
3214 0 : "finish downloading layers, downloaded={}, total={}, ratio={:.2}",
3215 0 : total_downloaded_size,
3216 0 : total_layer_size,
3217 0 : total_downloaded_size as f64 / total_layer_size as f64
3218 : );
3219 424 : for resident_layer in &downloaded_layers {
3220 316 : if resident_layer.layer_desc().is_delta() {
3221 176 : let layer = resident_layer
3222 176 : .get_as_delta(ctx)
3223 176 : .await
3224 176 : .context("failed to get delta layer")
3225 176 : .map_err(CompactionError::Other)?;
3226 176 : delta_layers.push(layer);
3227 : } else {
3228 140 : let layer = resident_layer
3229 140 : .get_as_image(ctx)
3230 140 : .await
3231 140 : .context("failed to get image layer")
3232 140 : .map_err(CompactionError::Other)?;
3233 140 : image_layers.push(layer);
3234 : }
3235 : }
3236 108 : let (dense_ks, sparse_ks) = self
3237 108 : .collect_gc_compaction_keyspace()
3238 108 : .await
3239 108 : .context("failed to collect gc compaction keyspace")
3240 108 : .map_err(CompactionError::Other)?;
3241 108 : let mut merge_iter = FilterIterator::create(
3242 108 : MergeIterator::create(&delta_layers, &image_layers, ctx),
3243 108 : dense_ks,
3244 108 : sparse_ks,
3245 108 : )
3246 108 : .context("failed to create filter iterator")
3247 108 : .map_err(CompactionError::Other)?;
3248 :
3249 108 : let time_download_layer = timer.elapsed();
3250 108 : let mut timer = Instant::now();
3251 108 :
3252 108 : // Step 2: Produce images+deltas.
3253 108 : let mut accumulated_values = Vec::new();
3254 108 : let mut last_key: Option<Key> = None;
3255 :
3256 : // Only create image layers when there are no ancestor branches. TODO: create a covering image layer
3257 : // when certain conditions are met.
3258 108 : let mut image_layer_writer = if !has_data_below {
3259 : Some(
3260 88 : SplitImageLayerWriter::new(
3261 88 : self.conf,
3262 88 : self.timeline_id,
3263 88 : self.tenant_shard_id,
3264 88 : job_desc.compaction_key_range.start,
3265 88 : lowest_retain_lsn,
3266 88 : self.get_compaction_target_size(),
3267 88 : &self.gate,
3268 88 : self.cancel.clone(),
3269 88 : ctx,
3270 88 : )
3271 88 : .await
3272 88 : .context("failed to create image layer writer")
3273 88 : .map_err(CompactionError::Other)?,
3274 : )
3275 : } else {
3276 20 : None
3277 : };
3278 :
3279 108 : let mut delta_layer_writer = SplitDeltaLayerWriter::new(
3280 108 : self.conf,
3281 108 : self.timeline_id,
3282 108 : self.tenant_shard_id,
3283 108 : lowest_retain_lsn..end_lsn,
3284 108 : self.get_compaction_target_size(),
3285 108 : &self.gate,
3286 108 : self.cancel.clone(),
3287 108 : )
3288 108 : .await
3289 108 : .context("failed to create delta layer writer")
3290 108 : .map_err(CompactionError::Other)?;
3291 :
3292 : #[derive(Default)]
3293 : struct RewritingLayers {
3294 : before: Option<DeltaLayerWriter>,
3295 : after: Option<DeltaLayerWriter>,
3296 : }
3297 108 : let mut delta_layer_rewriters = HashMap::<Arc<PersistentLayerKey>, RewritingLayers>::new();
3298 :
3299 : /// When compacting not at a bottom range (=`[0,X)`) of the root branch, we "have data below" (`has_data_below=true`).
3300 : /// The two cases are compaction in ancestor branches and when `compact_lsn_range.start` is set.
3301 : /// In those cases, we need to pull up data from below the LSN range we're compacting.
3302 : ///
3303 : /// This function unifies the cases so that later code doesn't have to think about it.
3304 : ///
3305 : /// Currently, we always get the ancestor image for each key in the child branch no matter whether the image
3306 : /// is needed for reconstruction. This should be fixed in the future.
3307 : ///
3308 : /// Furthermore, we should do vectored get instead of a single get, or better, use k-merge for ancestor
3309 : /// images.
3310 1280 : async fn get_ancestor_image(
3311 1280 : this_tline: &Arc<Timeline>,
3312 1280 : key: Key,
3313 1280 : ctx: &RequestContext,
3314 1280 : has_data_below: bool,
3315 1280 : history_lsn_point: Lsn,
3316 1280 : ) -> anyhow::Result<Option<(Key, Lsn, Bytes)>> {
3317 1280 : if !has_data_below {
3318 1204 : return Ok(None);
3319 76 : };
3320 : // This function is implemented as a get of the current timeline at ancestor LSN, therefore reusing
3321 : // as much existing code as possible.
3322 76 : let img = this_tline.get(key, history_lsn_point, ctx).await?;
3323 76 : Ok(Some((key, history_lsn_point, img)))
3324 1280 : }
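 :
 : // For example, when compacting a child branch whose branch point is `ancestor_lsn`, the call
 : // `get_ancestor_image(self, key, ctx, true, ancestor_lsn)` reduces to `self.get(key, ancestor_lsn, ctx)`,
 : // i.e. a normal read at the branch point returning the reconstructed image for `key`.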
3325 :
3326 : // Actually, we can decide not to write to the image layer at all at this point because
3327 : // the key and LSN range are determined. However, to keep things simple here, we still
3328 : // create this writer, and discard it in the end.
3329 108 : let mut time_to_first_kv_pair = None;
3330 :
3331 1984 : while let Some(((key, lsn, val), desc)) = merge_iter
3332 1984 : .next_with_trace()
3333 1984 : .await
3334 1984 : .context("failed to get next key-value pair")
3335 1984 : .map_err(CompactionError::Other)?
3336 : {
3337 1880 : if time_to_first_kv_pair.is_none() {
3338 108 : time_to_first_kv_pair = Some(timer.elapsed());
3339 108 : timer = Instant::now();
3340 1772 : }
3341 :
3342 1880 : if cancel.is_cancelled() {
3343 0 : return Err(CompactionError::ShuttingDown);
3344 1880 : }
3345 :
3346 1880 : let should_yield = yield_for_l0
3347 0 : && self
3348 0 : .l0_compaction_trigger
3349 0 : .notified()
3350 0 : .now_or_never()
3351 0 : .is_some();
3352 1880 : if should_yield {
3353 0 : tracing::info!("preempt gc-compaction in the main loop: too many L0 layers");
3354 0 : return Ok(CompactionOutcome::YieldForL0);
3355 1880 : }
3356 1880 : if self.shard_identity.is_key_disposable(&key) {
3357 : // If this shard does not need to store this key, simply skip it.
3358 : //
3359 : // This is not handled in the filter iterator because shard ownership is determined by hash.
3360 : // Therefore, it does not give us any performance benefit to do things like skip
3361 : // a whole layer file, as we do when handling key spaces (ranges).
3362 0 : if cfg!(debug_assertions) {
3363 0 : let shard = self.shard_identity.shard_index();
3364 0 : let owner = self.shard_identity.get_shard_number(&key);
3365 0 : panic!("key {key} does not belong on shard {shard}, owned by {owner}");
3366 0 : }
3367 0 : continue;
3368 1880 : }
3369 1880 : if !job_desc.compaction_key_range.contains(&key) {
3370 128 : if !desc.is_delta {
3371 120 : continue;
3372 8 : }
3373 8 : let rewriter = delta_layer_rewriters.entry(desc.clone()).or_default();
3374 8 : let rewriter = if key < job_desc.compaction_key_range.start {
3375 0 : if rewriter.before.is_none() {
3376 0 : rewriter.before = Some(
3377 0 : DeltaLayerWriter::new(
3378 0 : self.conf,
3379 0 : self.timeline_id,
3380 0 : self.tenant_shard_id,
3381 0 : desc.key_range.start,
3382 0 : desc.lsn_range.clone(),
3383 0 : &self.gate,
3384 0 : self.cancel.clone(),
3385 0 : ctx,
3386 0 : )
3387 0 : .await
3388 0 : .context("failed to create delta layer writer")
3389 0 : .map_err(CompactionError::Other)?,
3390 : );
3391 0 : }
3392 0 : rewriter.before.as_mut().unwrap()
3393 8 : } else if key >= job_desc.compaction_key_range.end {
3394 8 : if rewriter.after.is_none() {
3395 4 : rewriter.after = Some(
3396 4 : DeltaLayerWriter::new(
3397 4 : self.conf,
3398 4 : self.timeline_id,
3399 4 : self.tenant_shard_id,
3400 4 : job_desc.compaction_key_range.end,
3401 4 : desc.lsn_range.clone(),
3402 4 : &self.gate,
3403 4 : self.cancel.clone(),
3404 4 : ctx,
3405 4 : )
3406 4 : .await
3407 4 : .context("failed to create delta layer writer")
3408 4 : .map_err(CompactionError::Other)?,
3409 : );
3410 4 : }
3411 8 : rewriter.after.as_mut().unwrap()
3412 : } else {
3413 0 : unreachable!()
3414 : };
3415 8 : rewriter
3416 8 : .put_value(key, lsn, val, ctx)
3417 8 : .await
3418 8 : .context("failed to put value")
3419 8 : .map_err(CompactionError::Other)?;
3420 8 : continue;
3421 1752 : }
3422 1752 : match val {
3423 1260 : Value::Image(_) => stat.visit_image_key(&val),
3424 492 : Value::WalRecord(_) => stat.visit_wal_key(&val),
3425 : }
3426 1752 : if last_key.is_none() || last_key.as_ref() == Some(&key) {
3427 576 : if last_key.is_none() {
3428 108 : last_key = Some(key);
3429 468 : }
3430 576 : accumulated_values.push((key, lsn, val));
3431 : } else {
3432 1176 : let last_key: &mut Key = last_key.as_mut().unwrap();
3433 1176 : stat.on_unique_key_visited(); // TODO: adjust statistics for partial compaction
3434 1176 : let retention = self
3435 1176 : .generate_key_retention(
3436 1176 : *last_key,
3437 1176 : &accumulated_values,
3438 1176 : job_desc.gc_cutoff,
3439 1176 : &job_desc.retain_lsns_below_horizon,
3440 1176 : COMPACTION_DELTA_THRESHOLD,
3441 1176 : get_ancestor_image(self, *last_key, ctx, has_data_below, lowest_retain_lsn)
3442 1176 : .await
3443 1176 : .context("failed to get ancestor image")
3444 1176 : .map_err(CompactionError::Other)?,
3445 1176 : verification,
3446 1176 : )
3447 1176 : .await
3448 1176 : .context("failed to generate key retention")
3449 1176 : .map_err(CompactionError::Other)?;
3450 1172 : retention
3451 1172 : .pipe_to(
3452 1172 : *last_key,
3453 1172 : &mut delta_layer_writer,
3454 1172 : image_layer_writer.as_mut(),
3455 1172 : &mut stat,
3456 1172 : ctx,
3457 1172 : )
3458 1172 : .await
3459 1172 : .context("failed to pipe to delta layer writer")
3460 1172 : .map_err(CompactionError::Other)?;
3461 1172 : accumulated_values.clear();
3462 1172 : *last_key = key;
3463 1172 : accumulated_values.push((key, lsn, val));
3464 : }
3465 : }
3466 :
3467 : // TODO: move the below part to the loop body
3468 104 : let Some(last_key) = last_key else {
3469 0 : return Err(CompactionError::Other(anyhow!(
3470 0 : "no keys produced during compaction"
3471 0 : )));
3472 : };
3473 104 : stat.on_unique_key_visited();
3474 :
3475 104 : let retention = self
3476 104 : .generate_key_retention(
3477 104 : last_key,
3478 104 : &accumulated_values,
3479 104 : job_desc.gc_cutoff,
3480 104 : &job_desc.retain_lsns_below_horizon,
3481 104 : COMPACTION_DELTA_THRESHOLD,
3482 104 : get_ancestor_image(self, last_key, ctx, has_data_below, lowest_retain_lsn)
3483 104 : .await
3484 104 : .context("failed to get ancestor image")
3485 104 : .map_err(CompactionError::Other)?,
3486 104 : verification,
3487 104 : )
3488 104 : .await
3489 104 : .context("failed to generate key retention")
3490 104 : .map_err(CompactionError::Other)?;
3491 104 : retention
3492 104 : .pipe_to(
3493 104 : last_key,
3494 104 : &mut delta_layer_writer,
3495 104 : image_layer_writer.as_mut(),
3496 104 : &mut stat,
3497 104 : ctx,
3498 104 : )
3499 104 : .await
3500 104 : .context("failed to pipe to delta layer writer")
3501 104 : .map_err(CompactionError::Other)?;
3502 : // end: move the above part to the loop body
3503 :
3504 104 : let time_main_loop = timer.elapsed();
3505 104 : let timer = Instant::now();
3506 104 :
3507 104 : let mut rewrote_delta_layers = Vec::new();
3508 108 : for (key, writers) in delta_layer_rewriters {
3509 4 : if let Some(delta_writer_before) = writers.before {
3510 0 : let (desc, path) = delta_writer_before
3511 0 : .finish(job_desc.compaction_key_range.start, ctx)
3512 0 : .await
3513 0 : .context("failed to finish delta layer writer")
3514 0 : .map_err(CompactionError::Other)?;
3515 0 : let layer = Layer::finish_creating(self.conf, self, desc, &path)
3516 0 : .context("failed to finish creating delta layer")
3517 0 : .map_err(CompactionError::Other)?;
3518 0 : rewrote_delta_layers.push(layer);
3519 4 : }
3520 4 : if let Some(delta_writer_after) = writers.after {
3521 4 : let (desc, path) = delta_writer_after
3522 4 : .finish(key.key_range.end, ctx)
3523 4 : .await
3524 4 : .context("failed to finish delta layer writer")
3525 4 : .map_err(CompactionError::Other)?;
3526 4 : let layer = Layer::finish_creating(self.conf, self, desc, &path)
3527 4 : .context("failed to finish creating delta layer")
3528 4 : .map_err(CompactionError::Other)?;
3529 4 : rewrote_delta_layers.push(layer);
3530 0 : }
3531 : }
3532 :
3533 148 : let discard = |key: &PersistentLayerKey| {
3534 148 : let key = key.clone();
3535 148 : async move { KeyHistoryRetention::discard_key(&key, self, dry_run).await }
3536 148 : };
3537 :
3538 104 : let produced_image_layers = if let Some(writer) = image_layer_writer {
3539 84 : if !dry_run {
3540 76 : let end_key = job_desc.compaction_key_range.end;
3541 76 : writer
3542 76 : .finish_with_discard_fn(self, ctx, end_key, discard)
3543 76 : .await
3544 76 : .context("failed to finish image layer writer")
3545 76 : .map_err(CompactionError::Other)?
3546 : } else {
3547 8 : drop(writer);
3548 8 : Vec::new()
3549 : }
3550 : } else {
3551 20 : Vec::new()
3552 : };
3553 :
3554 104 : let produced_delta_layers = if !dry_run {
3555 96 : delta_layer_writer
3556 96 : .finish_with_discard_fn(self, ctx, discard)
3557 96 : .await
3558 96 : .context("failed to finish delta layer writer")
3559 96 : .map_err(CompactionError::Other)?
3560 : } else {
3561 8 : drop(delta_layer_writer);
3562 8 : Vec::new()
3563 : };
3564 :
3565 : // TODO: make image/delta/rewrote_delta layers generation atomic. At this point, we already generated resident layers, and if
3566 : // compaction is cancelled at this point, we might have some layers that are not cleaned up.
3567 104 : let mut compact_to = Vec::new();
3568 104 : let mut keep_layers = HashSet::new();
3569 104 : let produced_delta_layers_len = produced_delta_layers.len();
3570 104 : let produced_image_layers_len = produced_image_layers.len();
3571 104 :
3572 104 : let layer_selection_by_key = job_desc
3573 104 : .selected_layers
3574 104 : .iter()
3575 304 : .map(|l| (l.layer_desc().key(), l.layer_desc().clone()))
3576 104 : .collect::<HashMap<_, _>>();
3577 :
3578 176 : for action in produced_delta_layers {
3579 72 : match action {
3580 44 : BatchWriterResult::Produced(layer) => {
3581 44 : if cfg!(debug_assertions) {
3582 44 : info!("produced delta layer: {}", layer.layer_desc().key());
3583 0 : }
3584 44 : stat.produce_delta_layer(layer.layer_desc().file_size());
3585 44 : compact_to.push(layer);
3586 : }
3587 28 : BatchWriterResult::Discarded(l) => {
3588 28 : if cfg!(debug_assertions) {
3589 28 : info!("discarded delta layer: {}", l);
3590 0 : }
3591 28 : if let Some(layer_desc) = layer_selection_by_key.get(&l) {
3592 28 : stat.discard_delta_layer(layer_desc.file_size());
3593 28 : } else {
3594 0 : tracing::warn!(
3595 0 : "discarded delta layer not in layer_selection: {}, produced a layer outside of the compaction key range?",
3596 : l
3597 : );
3598 0 : stat.discard_delta_layer(0);
3599 : }
3600 28 : keep_layers.insert(l);
3601 : }
3602 : }
3603 : }
3604 108 : for layer in &rewrote_delta_layers {
3605 4 : debug!(
3606 0 : "produced rewritten delta layer: {}",
3607 0 : layer.layer_desc().key()
3608 : );
3609 : // For now, we include rewritten delta layer size in the "produce_delta_layer". We could
3610 : // make it a separate statistics in the future.
3611 4 : stat.produce_delta_layer(layer.layer_desc().file_size());
3612 : }
3613 104 : compact_to.extend(rewrote_delta_layers);
3614 180 : for action in produced_image_layers {
3615 76 : match action {
3616 60 : BatchWriterResult::Produced(layer) => {
3617 60 : debug!("produced image layer: {}", layer.layer_desc().key());
3618 60 : stat.produce_image_layer(layer.layer_desc().file_size());
3619 60 : compact_to.push(layer);
3620 : }
3621 16 : BatchWriterResult::Discarded(l) => {
3622 16 : debug!("discarded image layer: {}", l);
3623 16 : if let Some(layer_desc) = layer_selection_by_key.get(&l) {
3624 16 : stat.discard_image_layer(layer_desc.file_size());
3625 16 : } else {
3626 0 : tracing::warn!(
3627 0 : "discarded image layer not in layer_selection: {}, produced a layer outside of the compaction key range?",
3628 : l
3629 : );
3630 0 : stat.discard_image_layer(0);
3631 : }
3632 16 : keep_layers.insert(l);
3633 : }
3634 : }
3635 : }
3636 :
3637 104 : let mut layer_selection = job_desc.selected_layers;
3638 :
3639 : // Partial compaction might select more data than it processes, e.g., if
3640 : // the compaction_key_range only partially overlaps:
3641 : //
3642 : // [---compaction_key_range---]
3643 : // [---A----][----B----][----C----][----D----]
3644 : //
3645 : // For delta layers, we will rewrite the layers so that they are cut exactly at
3646 : // the compaction key range, so we can always discard them. However, for image
3647 : // layers, as we do not rewrite them for now, we need to handle them differently.
3648 : // Assume image layers A, B, C, D are all in the `layer_selection`.
3649 : //
3650 : // The created image layers contain whatever is needed from B, C, and from
3651 : // `----]` of A, and from `[---` of D.
3652 : //
3653 : // In contrast, `[---A` and `D----]` have not been processed, so, we must
3654 : // keep that data.
3655 : //
3656 : // The solution for now is to keep A and D completely if they are image layers.
3657 : // (layer_selection is what we'll remove from the layer map, so, retain what
3658 : // is _not_ fully covered by compaction_key_range).
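 : //
 : // In predicate form (a sketch of the checks in the loop below): a selected image layer `L` is
 : // kept, i.e. retained in the layer map, iff
 : //
 : //     !fully_contains(&compaction_key_range, &L.key_range)
 : //
 : // and a selected image layer that does not overlap the compaction key range at all is treated
 : // as a violated constraint.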
3659 408 : for layer in &layer_selection {
3660 304 : if !layer.layer_desc().is_delta() {
3661 132 : if !overlaps_with(
3662 132 : &layer.layer_desc().key_range,
3663 132 : &job_desc.compaction_key_range,
3664 132 : ) {
3665 0 : return Err(CompactionError::Other(anyhow!(
3666 0 : "violated constraint: image layer outside of compaction key range"
3667 0 : )));
3668 132 : }
3669 132 : if !fully_contains(
3670 132 : &job_desc.compaction_key_range,
3671 132 : &layer.layer_desc().key_range,
3672 132 : ) {
3673 16 : keep_layers.insert(layer.layer_desc().key());
3674 116 : }
3675 172 : }
3676 : }
3677 :
3678 304 : layer_selection.retain(|x| !keep_layers.contains(&x.layer_desc().key()));
3679 104 :
3680 104 : let time_final_phase = timer.elapsed();
3681 104 :
3682 104 : stat.time_final_phase_secs = time_final_phase.as_secs_f64();
3683 104 : stat.time_to_first_kv_pair_secs = time_to_first_kv_pair
3684 104 : .unwrap_or(Duration::ZERO)
3685 104 : .as_secs_f64();
3686 104 : stat.time_main_loop_secs = time_main_loop.as_secs_f64();
3687 104 : stat.time_acquire_lock_secs = time_acquire_lock.as_secs_f64();
3688 104 : stat.time_download_layer_secs = time_download_layer.as_secs_f64();
3689 104 : stat.time_analyze_secs = time_analyze.as_secs_f64();
3690 104 : stat.time_total_secs = begin_timer.elapsed().as_secs_f64();
3691 104 : stat.finalize();
3692 104 :
3693 104 : info!(
3694 0 : "gc-compaction statistics: {}",
3695 0 : serde_json::to_string(&stat)
3696 0 : .context("failed to serialize gc-compaction statistics")
3697 0 : .map_err(CompactionError::Other)?
3698 : );
3699 :
3700 104 : if dry_run {
3701 8 : return Ok(CompactionOutcome::Done);
3702 96 : }
3703 96 :
3704 96 : info!(
3705 0 : "produced {} delta layers and {} image layers, {} layers are kept",
3706 0 : produced_delta_layers_len,
3707 0 : produced_image_layers_len,
3708 0 : keep_layers.len()
3709 : );
3710 :
3711 : // Step 3: Place back to the layer map.
3712 :
3713 : // First, do a sanity check to ensure the newly-created layer map does not contain overlaps.
3714 96 : let all_layers = {
3715 96 : let guard = self.layers.read().await;
3716 96 : let layer_map = guard.layer_map()?;
3717 96 : layer_map.iter_historic_layers().collect_vec()
3718 96 : };
3719 96 :
3720 96 : let mut final_layers = all_layers
3721 96 : .iter()
3722 428 : .map(|layer| layer.layer_name())
3723 96 : .collect::<HashSet<_>>();
3724 304 : for layer in &layer_selection {
3725 208 : final_layers.remove(&layer.layer_desc().layer_name());
3726 208 : }
3727 204 : for layer in &compact_to {
3728 108 : final_layers.insert(layer.layer_desc().layer_name());
3729 108 : }
3730 96 : let final_layers = final_layers.into_iter().collect_vec();
3731 :
3732 : // TODO: move this check before we call `finish` on image layer writers. However, this will require us to get the layer name before we finish
3733 : // the writer, so potentially, we will need a function like `ImageLayerBatchWriter::get_all_pending_layer_keys` to get all the keys that are
3734 : // in the writer before finalizing the persistent layers. Currently, we would leave some dangling layers on disk if the check fails.
3735 96 : if let Some(err) = check_valid_layermap(&final_layers) {
3736 0 : return Err(CompactionError::Other(anyhow!(
3737 0 : "gc-compaction layer map check failed after compaction because {}, compaction result not applied to the layer map due to potential data loss",
3738 0 : err
3739 0 : )));
3740 96 : }
3741 :
3742 : // Between the sanity check and this compaction update, there could be new layers being flushed, but it should be fine because we only
3743 : // operate on L1 layers.
3744 : {
3745 : // Gc-compaction will rewrite the history of a key. This could happen in two ways:
3746 : //
3747 : // 1. We create an image layer to replace all the deltas below the compact LSN. In this case, assume
3748 : // we have 2 delta layers A and B, both below the compact LSN. We create an image layer I to replace
3749 : // A and B at the compact LSN. If the read path finishes reading A, yields, and now we update the layer
3750 : // map, the read path then cannot find any keys below A, reporting a missing key error, while the key
3751 : // now gets stored in I at the compact LSN.
3752 : //
3753 : // --------------- ---------------
3754 : // delta1@LSN20 image1@LSN20
3755 : // --------------- (read path collects delta@LSN20, => --------------- (read path cannot find anything
3756 : // delta1@LSN10 yields) below LSN 20)
3757 : // ---------------
3758 : //
3759 : // 2. We create a delta layer to replace all the deltas below the compact LSN, and in the delta layers,
3760 : // we combines the history of a key into a single image. For example, we have deltas at LSN 1, 2, 3, 4,
3761 : // Assume one delta layer contains LSN 1, 2, 3 and the other contains LSN 4.
3762 : //
3763 : // We let gc-compaction combine delta 2, 3, 4 into an image at LSN 4, which produces a delta layer that
3764 : // contains the delta at LSN 1, the image at LSN 4. If the read path finishes reading the original delta
3765 : // layer containing 4, yields, and we update the layer map to put the delta layer.
3766 : //
3767 : // --------------- ---------------
3768 : // delta1@LSN4 image1@LSN4
3769 : // --------------- (read path collects delta@LSN4, => --------------- (read path collects LSN4 and LSN1,
3770 : // delta1@LSN1-3 yields) delta1@LSN1 which is an invalid history)
3771 : // --------------- ---------------
3772 : //
3773 : // Therefore, the gc-compaction layer update operation should wait for all ongoing reads, block all pending reads,
3774 : // and only allow reads to continue after the update is finished.
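 : //
 : // Sketch of the resulting ordering (these are the same locks acquired just below):
 : //
 : //     let update_guard = gc_compaction_layer_update_lock.write().await; // drain and block readers
 : //     let mut guard = layers.write().await;                             // swap layer_selection -> compact_to
 : //     guard.open_mut()?.finish_gc_compaction(..);
 : //     drop(update_guard);                                               // readers resume on the new layer map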
3775 :
3776 96 : let update_guard = self.gc_compaction_layer_update_lock.write().await;
3777 : // Acquiring the update guard ensures current read operations end and new read operations are blocked.
3778 : // TODO: can we use `latest_gc_cutoff` Rcu to achieve the same effect?
3779 96 : let mut guard = self.layers.write().await;
3780 96 : guard
3781 96 : .open_mut()?
3782 96 : .finish_gc_compaction(&layer_selection, &compact_to, &self.metrics);
3783 96 : drop(update_guard); // Allow new reads to start ONLY after we finished updating the layer map.
3784 96 : };
3785 96 :
3786 96 : // Schedule an index-only upload to update the `latest_gc_cutoff` in the index_part.json.
3787 96 : // Otherwise, after restart, the index_part only contains the old `latest_gc_cutoff` and
3788 96 : // find_gc_cutoffs will try accessing things below the cutoff. TODO: ideally, this should
3789 96 : // be batched into `schedule_compaction_update`.
3790 96 : let disk_consistent_lsn = self.disk_consistent_lsn.load();
3791 96 : self.schedule_uploads(disk_consistent_lsn, None)
3792 96 : .context("failed to schedule uploads")
3793 96 : .map_err(CompactionError::Other)?;
3794 : // If a layer gets rewritten during gc-compaction, we need to keep that layer only in `compact_to` instead
3795 : // of `compact_from`.
3796 96 : let compact_from = {
3797 96 : let mut compact_from = Vec::new();
3798 96 : let mut compact_to_set = HashMap::new();
3799 204 : for layer in &compact_to {
3800 108 : compact_to_set.insert(layer.layer_desc().key(), layer);
3801 108 : }
3802 304 : for layer in &layer_selection {
3803 208 : if let Some(to) = compact_to_set.get(&layer.layer_desc().key()) {
3804 0 : tracing::info!(
3805 0 : "skipping delete {} because found same layer key at different generation {}",
3806 : layer,
3807 : to
3808 : );
3809 208 : } else {
3810 208 : compact_from.push(layer.clone());
3811 208 : }
3812 : }
3813 96 : compact_from
3814 96 : };
3815 96 : self.remote_client
3816 96 : .schedule_compaction_update(&compact_from, &compact_to)?;
3817 :
3818 96 : drop(gc_lock);
3819 96 :
3820 96 : Ok(CompactionOutcome::Done)
3821 112 : }
3822 : }
3823 :
3824 : struct TimelineAdaptor {
3825 : timeline: Arc<Timeline>,
3826 :
3827 : keyspace: (Lsn, KeySpace),
3828 :
3829 : new_deltas: Vec<ResidentLayer>,
3830 : new_images: Vec<ResidentLayer>,
3831 : layers_to_delete: Vec<Arc<PersistentLayerDesc>>,
3832 : }
3833 :
3834 : impl TimelineAdaptor {
3835 0 : pub fn new(timeline: &Arc<Timeline>, keyspace: (Lsn, KeySpace)) -> Self {
3836 0 : Self {
3837 0 : timeline: timeline.clone(),
3838 0 : keyspace,
3839 0 : new_images: Vec::new(),
3840 0 : new_deltas: Vec::new(),
3841 0 : layers_to_delete: Vec::new(),
3842 0 : }
3843 0 : }
3844 :
3845 0 : pub async fn flush_updates(&mut self) -> Result<(), CompactionError> {
3846 0 : let layers_to_delete = {
3847 0 : let guard = self.timeline.layers.read().await;
3848 0 : self.layers_to_delete
3849 0 : .iter()
3850 0 : .map(|x| guard.get_from_desc(x))
3851 0 : .collect::<Vec<Layer>>()
3852 0 : };
3853 0 : self.timeline
3854 0 : .finish_compact_batch(&self.new_deltas, &self.new_images, &layers_to_delete)
3855 0 : .await?;
3856 :
3857 0 : self.timeline
3858 0 : .upload_new_image_layers(std::mem::take(&mut self.new_images))?;
3859 :
3860 0 : self.new_deltas.clear();
3861 0 : self.layers_to_delete.clear();
3862 0 : Ok(())
3863 0 : }
3864 : }
3865 :
3866 : #[derive(Clone)]
3867 : struct ResidentDeltaLayer(ResidentLayer);
3868 : #[derive(Clone)]
3869 : struct ResidentImageLayer(ResidentLayer);
3870 :
3871 : impl CompactionJobExecutor for TimelineAdaptor {
3872 : type Key = pageserver_api::key::Key;
3873 :
3874 : type Layer = OwnArc<PersistentLayerDesc>;
3875 : type DeltaLayer = ResidentDeltaLayer;
3876 : type ImageLayer = ResidentImageLayer;
3877 :
3878 : type RequestContext = crate::context::RequestContext;
3879 :
3880 0 : fn get_shard_identity(&self) -> &ShardIdentity {
3881 0 : self.timeline.get_shard_identity()
3882 0 : }
3883 :
3884 0 : async fn get_layers(
3885 0 : &mut self,
3886 0 : key_range: &Range<Key>,
3887 0 : lsn_range: &Range<Lsn>,
3888 0 : _ctx: &RequestContext,
3889 0 : ) -> anyhow::Result<Vec<OwnArc<PersistentLayerDesc>>> {
3890 0 : self.flush_updates().await?;
3891 :
3892 0 : let guard = self.timeline.layers.read().await;
3893 0 : let layer_map = guard.layer_map()?;
3894 :
3895 0 : let result = layer_map
3896 0 : .iter_historic_layers()
3897 0 : .filter(|l| {
3898 0 : overlaps_with(&l.lsn_range, lsn_range) && overlaps_with(&l.key_range, key_range)
3899 0 : })
3900 0 : .map(OwnArc)
3901 0 : .collect();
3902 0 : Ok(result)
3903 0 : }
3904 :
3905 0 : async fn get_keyspace(
3906 0 : &mut self,
3907 0 : key_range: &Range<Key>,
3908 0 : lsn: Lsn,
3909 0 : _ctx: &RequestContext,
3910 0 : ) -> anyhow::Result<Vec<Range<Key>>> {
3911 0 : if lsn == self.keyspace.0 {
3912 0 : Ok(pageserver_compaction::helpers::intersect_keyspace(
3913 0 : &self.keyspace.1.ranges,
3914 0 : key_range,
3915 0 : ))
3916 : } else {
3917 : // The current compaction implementation only ever requests the key space
3918 : // at the compaction end LSN.
3919 0 : anyhow::bail!("keyspace not available for requested lsn");
3920 : }
3921 0 : }
3922 :
3923 0 : async fn downcast_delta_layer(
3924 0 : &self,
3925 0 : layer: &OwnArc<PersistentLayerDesc>,
3926 0 : ctx: &RequestContext,
3927 0 : ) -> anyhow::Result<Option<ResidentDeltaLayer>> {
3928 0 : // this is a lot more complex than a simple downcast...
3929 0 : if layer.is_delta() {
3930 0 : let l = {
3931 0 : let guard = self.timeline.layers.read().await;
3932 0 : guard.get_from_desc(layer)
3933 : };
3934 0 : let result = l.download_and_keep_resident(ctx).await?;
3935 :
3936 0 : Ok(Some(ResidentDeltaLayer(result)))
3937 : } else {
3938 0 : Ok(None)
3939 : }
3940 0 : }
3941 :
3942 0 : async fn create_image(
3943 0 : &mut self,
3944 0 : lsn: Lsn,
3945 0 : key_range: &Range<Key>,
3946 0 : ctx: &RequestContext,
3947 0 : ) -> anyhow::Result<()> {
3948 0 : Ok(self.create_image_impl(lsn, key_range, ctx).await?)
3949 0 : }
3950 :
3951 0 : async fn create_delta(
3952 0 : &mut self,
3953 0 : lsn_range: &Range<Lsn>,
3954 0 : key_range: &Range<Key>,
3955 0 : input_layers: &[ResidentDeltaLayer],
3956 0 : ctx: &RequestContext,
3957 0 : ) -> anyhow::Result<()> {
3958 0 : debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end);
3959 :
3960 0 : let mut all_entries = Vec::new();
3961 0 : for dl in input_layers.iter() {
3962 0 : all_entries.extend(dl.load_keys(ctx).await?);
3963 : }
3964 :
3965 : // The current stdlib sorting implementation is designed in a way where it is
3966 : // particularly fast where the slice is made up of sorted sub-ranges.
3967 0 : all_entries.sort_by_key(|DeltaEntry { key, lsn, .. }| (*key, *lsn));
3968 :
3969 0 : let mut writer = DeltaLayerWriter::new(
3970 0 : self.timeline.conf,
3971 0 : self.timeline.timeline_id,
3972 0 : self.timeline.tenant_shard_id,
3973 0 : key_range.start,
3974 0 : lsn_range.clone(),
3975 0 : &self.timeline.gate,
3976 0 : self.timeline.cancel.clone(),
3977 0 : ctx,
3978 0 : )
3979 0 : .await?;
3980 :
3981 0 : let mut dup_values = 0;
3982 0 :
3983 0 : // This iterator walks through all key-value pairs from all the layers
3984 0 : // we're compacting, in key, LSN order.
3985 0 : let mut prev: Option<(Key, Lsn)> = None;
3986 : for &DeltaEntry {
3987 0 : key, lsn, ref val, ..
3988 0 : } in all_entries.iter()
3989 : {
3990 0 : if prev == Some((key, lsn)) {
3991 : // This is a duplicate. Skip it.
3992 : //
3993 : // It can happen if compaction is interrupted after writing some
3994 : // layers but not all, and we are compacting the range again.
3995 : // The calculations in the algorithm assume that there are no
3996 : // duplicates, so the math on targeted file size is likely off,
3997 : // and we will create smaller files than expected.
3998 0 : dup_values += 1;
3999 0 : continue;
4000 0 : }
4001 :
4002 0 : let value = val.load(ctx).await?;
4003 :
4004 0 : writer.put_value(key, lsn, value, ctx).await?;
4005 :
4006 0 : prev = Some((key, lsn));
4007 : }
4008 :
4009 0 : if dup_values > 0 {
4010 0 : warn!("delta layer created with {} duplicate values", dup_values);
4011 0 : }
4012 :
4013 0 : fail_point!("delta-layer-writer-fail-before-finish", |_| {
4014 0 : Err(anyhow::anyhow!(
4015 0 : "failpoint delta-layer-writer-fail-before-finish"
4016 0 : ))
4017 0 : });
4018 :
4019 0 : let (desc, path) = writer.finish(prev.unwrap().0.next(), ctx).await?;
4020 0 : let new_delta_layer =
4021 0 : Layer::finish_creating(self.timeline.conf, &self.timeline, desc, &path)?;
4022 :
4023 0 : self.new_deltas.push(new_delta_layer);
4024 0 : Ok(())
4025 0 : }
4026 :
4027 0 : async fn delete_layer(
4028 0 : &mut self,
4029 0 : layer: &OwnArc<PersistentLayerDesc>,
4030 0 : _ctx: &RequestContext,
4031 0 : ) -> anyhow::Result<()> {
4032 0 : self.layers_to_delete.push(layer.clone().0);
4033 0 : Ok(())
4034 0 : }
4035 : }
4036 :
4037 : impl TimelineAdaptor {
4038 0 : async fn create_image_impl(
4039 0 : &mut self,
4040 0 : lsn: Lsn,
4041 0 : key_range: &Range<Key>,
4042 0 : ctx: &RequestContext,
4043 0 : ) -> Result<(), CreateImageLayersError> {
4044 0 : let timer = self.timeline.metrics.create_images_time_histo.start_timer();
4045 :
4046 0 : let image_layer_writer = ImageLayerWriter::new(
4047 0 : self.timeline.conf,
4048 0 : self.timeline.timeline_id,
4049 0 : self.timeline.tenant_shard_id,
4050 0 : key_range,
4051 0 : lsn,
4052 0 : &self.timeline.gate,
4053 0 : self.timeline.cancel.clone(),
4054 0 : ctx,
4055 0 : )
4056 0 : .await?;
4057 :
4058 0 : fail_point!("image-layer-writer-fail-before-finish", |_| {
4059 0 : Err(CreateImageLayersError::Other(anyhow::anyhow!(
4060 0 : "failpoint image-layer-writer-fail-before-finish"
4061 0 : )))
4062 0 : });
4063 :
4064 0 : let keyspace = KeySpace {
4065 0 : ranges: self.get_keyspace(key_range, lsn, ctx).await?,
4066 : };
4067 : // TODO set proper (stateful) start. The create_image_layer_for_rel_blocks function mostly
4068 0 : let outcome = self
4069 0 : .timeline
4070 0 : .create_image_layer_for_rel_blocks(
4071 0 : &keyspace,
4072 0 : image_layer_writer,
4073 0 : lsn,
4074 0 : ctx,
4075 0 : key_range.clone(),
4076 0 : IoConcurrency::sequential(),
4077 0 : )
4078 0 : .await?;
4079 :
4080 : if let ImageLayerCreationOutcome::Generated {
4081 0 : unfinished_image_layer,
4082 0 : } = outcome
4083 : {
4084 0 : let (desc, path) = unfinished_image_layer.finish(ctx).await?;
4085 0 : let image_layer =
4086 0 : Layer::finish_creating(self.timeline.conf, &self.timeline, desc, &path)?;
4087 0 : self.new_images.push(image_layer);
4088 0 : }
4089 :
4090 0 : timer.stop_and_record();
4091 0 :
4092 0 : Ok(())
4093 0 : }
4094 : }
4095 :
4096 : impl CompactionRequestContext for crate::context::RequestContext {}
4097 :
4098 : #[derive(Debug, Clone)]
4099 : pub struct OwnArc<T>(pub Arc<T>);
4100 :
4101 : impl<T> Deref for OwnArc<T> {
4102 : type Target = <Arc<T> as Deref>::Target;
4103 0 : fn deref(&self) -> &Self::Target {
4104 0 : &self.0
4105 0 : }
4106 : }
4107 :
4108 : impl<T> AsRef<T> for OwnArc<T> {
4109 0 : fn as_ref(&self) -> &T {
4110 0 : self.0.as_ref()
4111 0 : }
4112 : }
4113 :
4114 : impl CompactionLayer<Key> for OwnArc<PersistentLayerDesc> {
4115 0 : fn key_range(&self) -> &Range<Key> {
4116 0 : &self.key_range
4117 0 : }
4118 0 : fn lsn_range(&self) -> &Range<Lsn> {
4119 0 : &self.lsn_range
4120 0 : }
4121 0 : fn file_size(&self) -> u64 {
4122 0 : self.file_size
4123 0 : }
4124 0 : fn short_id(&self) -> std::string::String {
4125 0 : self.as_ref().short_id().to_string()
4126 0 : }
4127 0 : fn is_delta(&self) -> bool {
4128 0 : self.as_ref().is_delta()
4129 0 : }
4130 : }
4131 :
4132 : impl CompactionLayer<Key> for OwnArc<DeltaLayer> {
4133 0 : fn key_range(&self) -> &Range<Key> {
4134 0 : &self.layer_desc().key_range
4135 0 : }
4136 0 : fn lsn_range(&self) -> &Range<Lsn> {
4137 0 : &self.layer_desc().lsn_range
4138 0 : }
4139 0 : fn file_size(&self) -> u64 {
4140 0 : self.layer_desc().file_size
4141 0 : }
4142 0 : fn short_id(&self) -> std::string::String {
4143 0 : self.layer_desc().short_id().to_string()
4144 0 : }
4145 0 : fn is_delta(&self) -> bool {
4146 0 : true
4147 0 : }
4148 : }
4149 :
4150 : impl CompactionLayer<Key> for ResidentDeltaLayer {
4151 0 : fn key_range(&self) -> &Range<Key> {
4152 0 : &self.0.layer_desc().key_range
4153 0 : }
4154 0 : fn lsn_range(&self) -> &Range<Lsn> {
4155 0 : &self.0.layer_desc().lsn_range
4156 0 : }
4157 0 : fn file_size(&self) -> u64 {
4158 0 : self.0.layer_desc().file_size
4159 0 : }
4160 0 : fn short_id(&self) -> std::string::String {
4161 0 : self.0.layer_desc().short_id().to_string()
4162 0 : }
4163 0 : fn is_delta(&self) -> bool {
4164 0 : true
4165 0 : }
4166 : }
4167 :
4168 : impl CompactionDeltaLayer<TimelineAdaptor> for ResidentDeltaLayer {
4169 : type DeltaEntry<'a> = DeltaEntry<'a>;
4170 :
4171 0 : async fn load_keys(&self, ctx: &RequestContext) -> anyhow::Result<Vec<DeltaEntry<'_>>> {
4172 0 : self.0.get_as_delta(ctx).await?.index_entries(ctx).await
4173 0 : }
4174 : }
4175 :
4176 : impl CompactionLayer<Key> for ResidentImageLayer {
4177 0 : fn key_range(&self) -> &Range<Key> {
4178 0 : &self.0.layer_desc().key_range
4179 0 : }
4180 0 : fn lsn_range(&self) -> &Range<Lsn> {
4181 0 : &self.0.layer_desc().lsn_range
4182 0 : }
4183 0 : fn file_size(&self) -> u64 {
4184 0 : self.0.layer_desc().file_size
4185 0 : }
4186 0 : fn short_id(&self) -> std::string::String {
4187 0 : self.0.layer_desc().short_id().to_string()
4188 0 : }
4189 0 : fn is_delta(&self) -> bool {
4190 0 : false
4191 0 : }
4192 : }
4193 : impl CompactionImageLayer<TimelineAdaptor> for ResidentImageLayer {}