Line data Source code
1 : //! New compaction implementation. The algorithm itself is implemented in the
2 : //! compaction crate. This file implements the callbacks and structs that allow
3 : //! the algorithm to drive the process.
4 : //!
5 : //! The old legacy algorithm is implemented directly in `timeline.rs`.
6 :
7 : use std::collections::{BinaryHeap, HashMap, HashSet, VecDeque};
8 : use std::ops::{Deref, Range};
9 : use std::sync::Arc;
10 :
11 : use super::layer_manager::LayerManager;
12 : use super::{
13 : CompactFlags, CompactOptions, CompactionError, CreateImageLayersError, DurationRecorder,
14 : GetVectoredError, ImageLayerCreationMode, LastImageLayerCreationStatus, RecordedDuration,
15 : Timeline,
16 : };
17 :
18 : use anyhow::{Context, anyhow, bail};
19 : use bytes::Bytes;
20 : use enumset::EnumSet;
21 : use fail::fail_point;
22 : use itertools::Itertools;
23 : use once_cell::sync::Lazy;
24 : use pageserver_api::config::tenant_conf_defaults::DEFAULT_CHECKPOINT_DISTANCE;
25 : use pageserver_api::key::{KEY_SIZE, Key};
26 : use pageserver_api::keyspace::{KeySpace, ShardedRange};
27 : use pageserver_api::models::CompactInfoResponse;
28 : use pageserver_api::record::NeonWalRecord;
29 : use pageserver_api::shard::{ShardCount, ShardIdentity, TenantShardId};
30 : use pageserver_api::value::Value;
31 : use pageserver_compaction::helpers::{fully_contains, overlaps_with};
32 : use pageserver_compaction::interface::*;
33 : use serde::Serialize;
34 : use tokio::sync::{OwnedSemaphorePermit, Semaphore};
35 : use tokio_util::sync::CancellationToken;
36 : use tracing::{Instrument, debug, error, info, info_span, trace, warn};
37 : use utils::critical;
38 : use utils::id::TimelineId;
39 : use utils::lsn::Lsn;
40 :
41 : use crate::context::{AccessStatsBehavior, RequestContext, RequestContextBuilder};
42 : use crate::page_cache;
43 : use crate::statvfs::Statvfs;
44 : use crate::tenant::checks::check_valid_layermap;
45 : use crate::tenant::gc_block::GcBlock;
46 : use crate::tenant::layer_map::LayerMap;
47 : use crate::tenant::remote_timeline_client::WaitCompletionError;
48 : use crate::tenant::remote_timeline_client::index::GcCompactionState;
49 : use crate::tenant::storage_layer::batch_split_writer::{
50 : BatchWriterResult, SplitDeltaLayerWriter, SplitImageLayerWriter,
51 : };
52 : use crate::tenant::storage_layer::filter_iterator::FilterIterator;
53 : use crate::tenant::storage_layer::merge_iterator::MergeIterator;
54 : use crate::tenant::storage_layer::{
55 : AsLayerDesc, PersistentLayerDesc, PersistentLayerKey, ValueReconstructState,
56 : };
57 : use crate::tenant::timeline::{
58 : DeltaLayerWriter, ImageLayerCreationOutcome, ImageLayerWriter, IoConcurrency, Layer,
59 : ResidentLayer, drop_rlock,
60 : };
61 : use crate::tenant::{DeltaLayer, MaybeOffloaded, gc_block};
62 : use crate::virtual_file::{MaybeFatalIo, VirtualFile};
63 :
64 : /// Maximum number of deltas before generating an image layer in bottom-most compaction.
65 : const COMPACTION_DELTA_THRESHOLD: usize = 5;
66 :
67 : #[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
68 : pub struct GcCompactionJobId(pub usize);
69 :
70 : impl std::fmt::Display for GcCompactionJobId {
71 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
72 0 : write!(f, "{}", self.0)
73 0 : }
74 : }
75 :
76 : pub struct GcCompactionCombinedSettings {
77 : pub gc_compaction_enabled: bool,
78 : pub gc_compaction_initial_threshold_kb: u64,
79 : pub gc_compaction_ratio_percent: u64,
80 : }
81 :
82 : #[derive(Debug, Clone)]
83 : pub enum GcCompactionQueueItem {
84 : MetaJob {
85 : /// Compaction options
86 : options: CompactOptions,
87 : /// Whether the compaction is triggered automatically (determines whether we need to update L2 LSN)
88 : auto: bool,
89 : },
90 : SubCompactionJob(CompactOptions),
91 : Notify(GcCompactionJobId, Option<Lsn>),
92 : }
93 :
94 : impl GcCompactionQueueItem {
95 0 : pub fn into_compact_info_resp(
96 0 : self,
97 0 : id: GcCompactionJobId,
98 0 : running: bool,
99 0 : ) -> Option<CompactInfoResponse> {
100 0 : match self {
101 0 : GcCompactionQueueItem::MetaJob { options, .. } => Some(CompactInfoResponse {
102 0 : compact_key_range: options.compact_key_range,
103 0 : compact_lsn_range: options.compact_lsn_range,
104 0 : sub_compaction: options.sub_compaction,
105 0 : running,
106 0 : job_id: id.0,
107 0 : }),
108 0 : GcCompactionQueueItem::SubCompactionJob(options) => Some(CompactInfoResponse {
109 0 : compact_key_range: options.compact_key_range,
110 0 : compact_lsn_range: options.compact_lsn_range,
111 0 : sub_compaction: options.sub_compaction,
112 0 : running,
113 0 : job_id: id.0,
114 0 : }),
115 0 : GcCompactionQueueItem::Notify(_, _) => None,
116 : }
117 0 : }
118 : }
119 :
120 : #[derive(Default)]
121 : struct GcCompactionGuardItems {
122 : notify: Option<tokio::sync::oneshot::Sender<()>>,
123 : gc_guard: Option<gc_block::Guard>,
124 : permit: Option<OwnedSemaphorePermit>,
125 : }
126 :
127 : struct GcCompactionQueueInner {
128 : running: Option<(GcCompactionJobId, GcCompactionQueueItem)>,
129 : queued: VecDeque<(GcCompactionJobId, GcCompactionQueueItem)>,
130 : guards: HashMap<GcCompactionJobId, GcCompactionGuardItems>,
131 : last_id: GcCompactionJobId,
132 : }
133 :
134 : impl GcCompactionQueueInner {
135 0 : fn next_id(&mut self) -> GcCompactionJobId {
136 0 : let id = self.last_id;
137 0 : self.last_id = GcCompactionJobId(id.0 + 1);
138 0 : id
139 0 : }
140 : }
141 :
142 : /// A structure to store gc_compaction jobs.
143 : pub struct GcCompactionQueue {
144 : /// All items in the queue, and the currently-running job.
145 : inner: std::sync::Mutex<GcCompactionQueueInner>,
146 : /// Ensure only one thread is consuming the queue.
147 : consumer_lock: tokio::sync::Mutex<()>,
148 : }
149 :
150 0 : static CONCURRENT_GC_COMPACTION_TASKS: Lazy<Arc<Semaphore>> = Lazy::new(|| {
151 0 : // Only allow two timelines on one pageserver to run gc compaction at a time.
152 0 : Arc::new(Semaphore::new(2))
153 0 : });
154 :
155 : impl GcCompactionQueue {
156 0 : pub fn new() -> Self {
157 0 : GcCompactionQueue {
158 0 : inner: std::sync::Mutex::new(GcCompactionQueueInner {
159 0 : running: None,
160 0 : queued: VecDeque::new(),
161 0 : guards: HashMap::new(),
162 0 : last_id: GcCompactionJobId(0),
163 0 : }),
164 0 : consumer_lock: tokio::sync::Mutex::new(()),
165 0 : }
166 0 : }
167 :
168 0 : pub fn cancel_scheduled(&self) {
169 0 : let mut guard = self.inner.lock().unwrap();
170 0 : guard.queued.clear();
171 0 : // TODO: if there is a running job, we should keep the gc guard. However, currently, the cancel
172 0 : // API is only used for testing purposes, so we can drop everything here.
173 0 : guard.guards.clear();
174 0 : }
175 :
176 : /// Schedule a manual compaction job.
177 0 : pub fn schedule_manual_compaction(
178 0 : &self,
179 0 : options: CompactOptions,
180 0 : notify: Option<tokio::sync::oneshot::Sender<()>>,
181 0 : ) -> GcCompactionJobId {
182 0 : let mut guard = self.inner.lock().unwrap();
183 0 : let id = guard.next_id();
184 0 : guard.queued.push_back((
185 0 : id,
186 0 : GcCompactionQueueItem::MetaJob {
187 0 : options,
188 0 : auto: false,
189 0 : },
190 0 : ));
191 0 : guard.guards.entry(id).or_default().notify = notify;
192 0 : info!("scheduled compaction job id={}", id);
193 0 : id
194 0 : }
195 :
196 : /// Schedule an auto compaction job.
197 0 : fn schedule_auto_compaction(
198 0 : &self,
199 0 : options: CompactOptions,
200 0 : permit: OwnedSemaphorePermit,
201 0 : ) -> GcCompactionJobId {
202 0 : let mut guard = self.inner.lock().unwrap();
203 0 : let id = guard.next_id();
204 0 : guard.queued.push_back((
205 0 : id,
206 0 : GcCompactionQueueItem::MetaJob {
207 0 : options,
208 0 : auto: true,
209 0 : },
210 0 : ));
211 0 : guard.guards.entry(id).or_default().permit = Some(permit);
212 0 : id
213 0 : }
214 :
215 : /// Trigger an auto compaction.
216 0 : pub async fn trigger_auto_compaction(
217 0 : &self,
218 0 : timeline: &Arc<Timeline>,
219 0 : ) -> Result<(), CompactionError> {
220 0 : let GcCompactionCombinedSettings {
221 0 : gc_compaction_enabled,
222 0 : gc_compaction_initial_threshold_kb,
223 0 : gc_compaction_ratio_percent,
224 0 : } = timeline.get_gc_compaction_settings();
225 0 : if !gc_compaction_enabled {
226 0 : return Ok(());
227 0 : }
228 0 : if self.remaining_jobs_num() > 0 {
229 : // Only schedule auto compaction when the queue is empty
230 0 : return Ok(());
231 0 : }
232 0 : if timeline.ancestor_timeline().is_some() {
233 : // Do not trigger auto compaction for child timelines. We haven't tested
234 : // it enough in staging yet.
235 0 : return Ok(());
236 0 : }
237 :
238 0 : let Ok(permit) = CONCURRENT_GC_COMPACTION_TASKS.clone().try_acquire_owned() else {
239 : // Only allow one compaction run at a time. TODO: As we do `try_acquire_owned`, we cannot ensure
240 : // the fairness of the lock across timelines. We should listen for both `acquire` and `l0_compaction_trigger`
241 : // to ensure the fairness while avoid starving other tasks.
242 0 : return Ok(());
243 : };
244 :
245 0 : let gc_compaction_state = timeline.get_gc_compaction_state();
246 0 : let l2_lsn = gc_compaction_state
247 0 : .map(|x| x.last_completed_lsn)
248 0 : .unwrap_or(Lsn::INVALID);
249 :
250 0 : let layers = {
251 0 : let guard = timeline.layers.read().await;
252 0 : let layer_map = guard.layer_map()?;
253 0 : layer_map.iter_historic_layers().collect_vec()
254 0 : };
255 0 : let mut l2_size: u64 = 0;
256 0 : let mut l1_size = 0;
257 0 : let gc_cutoff = *timeline.get_applied_gc_cutoff_lsn();
258 0 : for layer in layers {
259 0 : if layer.lsn_range.start <= l2_lsn {
260 0 : l2_size += layer.file_size();
261 0 : } else if layer.lsn_range.start <= gc_cutoff {
262 0 : l1_size += layer.file_size();
263 0 : }
264 : }
265 :
266 0 : fn trigger_compaction(
267 0 : l1_size: u64,
268 0 : l2_size: u64,
269 0 : gc_compaction_initial_threshold_kb: u64,
270 0 : gc_compaction_ratio_percent: u64,
271 0 : ) -> bool {
272 : const AUTO_TRIGGER_LIMIT: u64 = 150 * 1024 * 1024 * 1024; // 150GB
273 0 : if l1_size >= AUTO_TRIGGER_LIMIT || l2_size >= AUTO_TRIGGER_LIMIT {
274 : // Do not auto-trigger when physical size >= 150GB
275 0 : return false;
276 0 : }
277 0 : // initial trigger
278 0 : if l2_size == 0 && l1_size >= gc_compaction_initial_threshold_kb * 1024 {
279 0 : info!(
280 0 : "trigger auto-compaction because l1_size={} >= gc_compaction_initial_threshold_kb={}",
281 : l1_size, gc_compaction_initial_threshold_kb
282 : );
283 0 : return true;
284 0 : }
285 0 : // size ratio trigger
286 0 : if l2_size == 0 {
287 0 : return false;
288 0 : }
289 0 : if l1_size as f64 / l2_size as f64 >= (gc_compaction_ratio_percent as f64 / 100.0) {
290 0 : info!(
291 0 : "trigger auto-compaction because l1_size={} / l2_size={} > gc_compaction_ratio_percent={}",
292 : l1_size, l2_size, gc_compaction_ratio_percent
293 : );
294 0 : return true;
295 0 : }
296 0 : false
297 0 : }
298 :
299 0 : if trigger_compaction(
300 0 : l1_size,
301 0 : l2_size,
302 0 : gc_compaction_initial_threshold_kb,
303 0 : gc_compaction_ratio_percent,
304 0 : ) {
305 0 : self.schedule_auto_compaction(
306 0 : CompactOptions {
307 0 : flags: {
308 0 : let mut flags = EnumSet::new();
309 0 : flags |= CompactFlags::EnhancedGcBottomMostCompaction;
310 0 : flags
311 0 : },
312 0 : sub_compaction: true,
313 0 : compact_key_range: None,
314 0 : compact_lsn_range: None,
315 0 : sub_compaction_max_job_size_mb: None,
316 0 : },
317 0 : permit,
318 0 : );
319 0 : info!(
320 0 : "scheduled auto gc-compaction: l1_size={}, l2_size={}, l2_lsn={}, gc_cutoff={}",
321 : l1_size, l2_size, l2_lsn, gc_cutoff
322 : );
323 : } else {
324 0 : debug!(
325 0 : "did not trigger auto gc-compaction: l1_size={}, l2_size={}, l2_lsn={}, gc_cutoff={}",
326 : l1_size, l2_size, l2_lsn, gc_cutoff
327 : );
328 : }
329 0 : Ok(())
330 0 : }
331 :
332 : /// Notify the caller the job has finished and unblock GC.
333 0 : fn notify_and_unblock(&self, id: GcCompactionJobId) {
334 0 : info!("compaction job id={} finished", id);
335 0 : let mut guard = self.inner.lock().unwrap();
336 0 : if let Some(items) = guard.guards.remove(&id) {
337 0 : drop(items.gc_guard);
338 0 : if let Some(tx) = items.notify {
339 0 : let _ = tx.send(());
340 0 : }
341 0 : }
342 0 : }
343 :
344 0 : async fn handle_sub_compaction(
345 0 : &self,
346 0 : id: GcCompactionJobId,
347 0 : options: CompactOptions,
348 0 : timeline: &Arc<Timeline>,
349 0 : gc_block: &GcBlock,
350 0 : auto: bool,
351 0 : ) -> Result<(), CompactionError> {
352 0 : info!(
353 0 : "running scheduled enhanced gc bottom-most compaction with sub-compaction, splitting compaction jobs"
354 : );
355 0 : let jobs = timeline
356 0 : .gc_compaction_split_jobs(
357 0 : GcCompactJob::from_compact_options(options.clone()),
358 0 : options.sub_compaction_max_job_size_mb,
359 0 : )
360 0 : .await
361 0 : .map_err(CompactionError::Other)?;
362 0 : if jobs.is_empty() {
363 0 : info!("no jobs to run, skipping scheduled compaction task");
364 0 : self.notify_and_unblock(id);
365 : } else {
366 0 : let gc_guard = match gc_block.start().await {
367 0 : Ok(guard) => guard,
368 0 : Err(e) => {
369 0 : return Err(CompactionError::Other(anyhow!(
370 0 : "cannot run gc-compaction because gc is blocked: {}",
371 0 : e
372 0 : )));
373 : }
374 : };
375 :
376 0 : let jobs_len = jobs.len();
377 0 : let mut pending_tasks = Vec::new();
378 0 : // gc-compaction might pick more layers or fewer layers to compact. The L2 LSN does not need to be accurate.
379 0 : // And therefore, we simply assume the maximum LSN of all jobs is the expected L2 LSN.
380 0 : let expected_l2_lsn = jobs.iter().map(|job| job.compact_lsn_range.end).max();
381 0 : for job in jobs {
382 : // Unfortunately we need to convert the `GcCompactJob` back to `CompactionOptions`
383 : // until we do further refactors to allow directly call `compact_with_gc`.
384 0 : let mut flags: EnumSet<CompactFlags> = EnumSet::default();
385 0 : flags |= CompactFlags::EnhancedGcBottomMostCompaction;
386 0 : if job.dry_run {
387 0 : flags |= CompactFlags::DryRun;
388 0 : }
389 0 : let options = CompactOptions {
390 0 : flags,
391 0 : sub_compaction: false,
392 0 : compact_key_range: Some(job.compact_key_range.into()),
393 0 : compact_lsn_range: Some(job.compact_lsn_range.into()),
394 0 : sub_compaction_max_job_size_mb: None,
395 0 : };
396 0 : pending_tasks.push(GcCompactionQueueItem::SubCompactionJob(options));
397 : }
398 :
399 0 : if !auto {
400 0 : pending_tasks.push(GcCompactionQueueItem::Notify(id, None));
401 0 : } else {
402 0 : pending_tasks.push(GcCompactionQueueItem::Notify(id, expected_l2_lsn));
403 0 : }
404 :
405 : {
406 0 : let mut guard = self.inner.lock().unwrap();
407 0 : guard.guards.entry(id).or_default().gc_guard = Some(gc_guard);
408 0 : let mut tasks = Vec::new();
409 0 : for task in pending_tasks {
410 0 : let id = guard.next_id();
411 0 : tasks.push((id, task));
412 0 : }
413 0 : tasks.reverse();
414 0 : for item in tasks {
415 0 : guard.queued.push_front(item);
416 0 : }
417 : }
418 0 : info!(
419 0 : "scheduled enhanced gc bottom-most compaction with sub-compaction, split into {} jobs",
420 : jobs_len
421 : );
422 : }
423 0 : Ok(())
424 0 : }
425 :
426 : /// Take a job from the queue and process it. Returns if there are still pending tasks.
427 0 : pub async fn iteration(
428 0 : &self,
429 0 : cancel: &CancellationToken,
430 0 : ctx: &RequestContext,
431 0 : gc_block: &GcBlock,
432 0 : timeline: &Arc<Timeline>,
433 0 : ) -> Result<CompactionOutcome, CompactionError> {
434 0 : let Ok(_one_op_at_a_time_guard) = self.consumer_lock.try_lock() else {
435 0 : return Err(CompactionError::AlreadyRunning(
436 0 : "cannot run gc-compaction because another gc-compaction is running. This should not happen because we only call this function from the gc-compaction queue.",
437 0 : ));
438 : };
439 : let has_pending_tasks;
440 0 : let Some((id, item)) = ({
441 0 : let mut guard = self.inner.lock().unwrap();
442 0 : if let Some((id, item)) = guard.queued.pop_front() {
443 0 : guard.running = Some((id, item.clone()));
444 0 : has_pending_tasks = !guard.queued.is_empty();
445 0 : Some((id, item))
446 : } else {
447 0 : has_pending_tasks = false;
448 0 : None
449 : }
450 : }) else {
451 0 : self.trigger_auto_compaction(timeline).await?;
452 : // Always yield after triggering auto-compaction. Gc-compaction is a low-priority task and we
453 : // have not implemented preemption mechanism yet. We always want to yield it to more important
454 : // tasks if there is one.
455 0 : return Ok(CompactionOutcome::Done);
456 : };
457 0 : match item {
458 0 : GcCompactionQueueItem::MetaJob { options, auto } => {
459 0 : if !options
460 0 : .flags
461 0 : .contains(CompactFlags::EnhancedGcBottomMostCompaction)
462 : {
463 0 : warn!(
464 0 : "ignoring scheduled compaction task: scheduled task must be gc compaction: {:?}",
465 : options
466 : );
467 0 : } else if options.sub_compaction {
468 0 : info!(
469 0 : "running scheduled enhanced gc bottom-most compaction with sub-compaction, splitting compaction jobs"
470 : );
471 0 : self.handle_sub_compaction(id, options, timeline, gc_block, auto)
472 0 : .await?;
473 : } else {
474 : // Auto compaction always enables sub-compaction so we don't need to handle update_l2_lsn
475 : // in this branch.
476 0 : let gc_guard = match gc_block.start().await {
477 0 : Ok(guard) => guard,
478 0 : Err(e) => {
479 0 : return Err(CompactionError::Other(anyhow!(
480 0 : "cannot run gc-compaction because gc is blocked: {}",
481 0 : e
482 0 : )));
483 : }
484 : };
485 0 : {
486 0 : let mut guard = self.inner.lock().unwrap();
487 0 : guard.guards.entry(id).or_default().gc_guard = Some(gc_guard);
488 0 : }
489 0 : let _ = timeline.compact_with_options(cancel, options, ctx).await?;
490 0 : self.notify_and_unblock(id);
491 : }
492 : }
493 0 : GcCompactionQueueItem::SubCompactionJob(options) => {
494 0 : // TODO: error handling, clear the queue if any task fails?
495 0 : let _ = timeline.compact_with_options(cancel, options, ctx).await?;
496 : }
497 0 : GcCompactionQueueItem::Notify(id, l2_lsn) => {
498 0 : self.notify_and_unblock(id);
499 0 : if let Some(l2_lsn) = l2_lsn {
500 0 : let current_l2_lsn = timeline
501 0 : .get_gc_compaction_state()
502 0 : .map(|x| x.last_completed_lsn)
503 0 : .unwrap_or(Lsn::INVALID);
504 0 : if l2_lsn >= current_l2_lsn {
505 0 : info!("l2_lsn updated to {}", l2_lsn);
506 0 : timeline
507 0 : .update_gc_compaction_state(GcCompactionState {
508 0 : last_completed_lsn: l2_lsn,
509 0 : })
510 0 : .map_err(CompactionError::Other)?;
511 : } else {
512 0 : warn!(
513 0 : "l2_lsn updated to {} but it is less than the current l2_lsn {}",
514 : l2_lsn, current_l2_lsn
515 : );
516 : }
517 0 : }
518 : }
519 : }
520 0 : {
521 0 : let mut guard = self.inner.lock().unwrap();
522 0 : guard.running = None;
523 0 : }
524 0 : Ok(if has_pending_tasks {
525 0 : CompactionOutcome::Pending
526 : } else {
527 0 : CompactionOutcome::Done
528 : })
529 0 : }
530 :
531 : #[allow(clippy::type_complexity)]
532 0 : pub fn remaining_jobs(
533 0 : &self,
534 0 : ) -> (
535 0 : Option<(GcCompactionJobId, GcCompactionQueueItem)>,
536 0 : VecDeque<(GcCompactionJobId, GcCompactionQueueItem)>,
537 0 : ) {
538 0 : let guard = self.inner.lock().unwrap();
539 0 : (guard.running.clone(), guard.queued.clone())
540 0 : }
541 :
542 0 : pub fn remaining_jobs_num(&self) -> usize {
543 0 : let guard = self.inner.lock().unwrap();
544 0 : guard.queued.len() + if guard.running.is_some() { 1 } else { 0 }
545 0 : }
546 : }
547 :
548 : /// A job description for the gc-compaction job. This structure describes the rectangle range that the job will
549 : /// process. The exact layers that need to be compacted/rewritten will be generated when `compact_with_gc` gets
550 : /// called.
551 : #[derive(Debug, Clone)]
552 : pub(crate) struct GcCompactJob {
553 : pub dry_run: bool,
554 : /// The key range to be compacted. The compaction algorithm will only regenerate key-value pairs within this range
555 : /// [left inclusive, right exclusive), and other pairs will be rewritten into new files if necessary.
556 : pub compact_key_range: Range<Key>,
557 : /// The LSN range to be compacted. The compaction algorithm will use this range to determine the layers to be
558 : /// selected for the compaction, and it does not guarantee the generated layers will have exactly the same LSN range
559 : /// as specified here. The true range being compacted is `min_lsn/max_lsn` in [`GcCompactionJobDescription`].
560 : /// min_lsn will always <= the lower bound specified here, and max_lsn will always >= the upper bound specified here.
561 : pub compact_lsn_range: Range<Lsn>,
562 : }
563 :
564 : impl GcCompactJob {
565 108 : pub fn from_compact_options(options: CompactOptions) -> Self {
566 108 : GcCompactJob {
567 108 : dry_run: options.flags.contains(CompactFlags::DryRun),
568 108 : compact_key_range: options
569 108 : .compact_key_range
570 108 : .map(|x| x.into())
571 108 : .unwrap_or(Key::MIN..Key::MAX),
572 108 : compact_lsn_range: options
573 108 : .compact_lsn_range
574 108 : .map(|x| x.into())
575 108 : .unwrap_or(Lsn::INVALID..Lsn::MAX),
576 108 : }
577 108 : }
578 : }
579 :
580 : /// A job description for the gc-compaction job. This structure is generated when `compact_with_gc` is called
581 : /// and contains the exact layers we want to compact.
582 : pub struct GcCompactionJobDescription {
583 : /// All layers to read in the compaction job
584 : selected_layers: Vec<Layer>,
585 : /// GC cutoff of the job. This is the lowest LSN that will be accessed by the read/GC path and we need to
586 : /// keep all deltas <= this LSN or generate an image == this LSN.
587 : gc_cutoff: Lsn,
588 : /// LSNs to retain for the job. Read path will use this LSN so we need to keep deltas <= this LSN or
589 : /// generate an image == this LSN.
590 : retain_lsns_below_horizon: Vec<Lsn>,
591 : /// Maximum layer LSN processed in this compaction, that is max(end_lsn of layers). Exclusive. All data
592 : /// \>= this LSN will be kept and will not be rewritten.
593 : max_layer_lsn: Lsn,
594 : /// Minimum layer LSN processed in this compaction, that is min(start_lsn of layers). Inclusive.
595 : /// All access below (strict lower than `<`) this LSN will be routed through the normal read path instead of
596 : /// k-merge within gc-compaction.
597 : min_layer_lsn: Lsn,
598 : /// Only compact layers overlapping with this range.
599 : compaction_key_range: Range<Key>,
600 : /// When partial compaction is enabled, these layers need to be rewritten to ensure no overlap.
601 : /// This field is here solely for debugging. The field will not be read once the compaction
602 : /// description is generated.
603 : rewrite_layers: Vec<Arc<PersistentLayerDesc>>,
604 : }
605 :
606 : /// The result of bottom-most compaction for a single key at each LSN.
607 : #[derive(Debug)]
608 : #[cfg_attr(test, derive(PartialEq))]
609 : pub struct KeyLogAtLsn(pub Vec<(Lsn, Value)>);
610 :
611 : /// The result of bottom-most compaction.
612 : #[derive(Debug)]
613 : #[cfg_attr(test, derive(PartialEq))]
614 : pub(crate) struct KeyHistoryRetention {
615 : /// Stores logs to reconstruct the value at the given LSN, that is to say, logs <= LSN or image == LSN.
616 : pub(crate) below_horizon: Vec<(Lsn, KeyLogAtLsn)>,
617 : /// Stores logs to reconstruct the value at any LSN above the horizon, that is to say, log > LSN.
618 : pub(crate) above_horizon: KeyLogAtLsn,
619 : }
620 :
621 : impl KeyHistoryRetention {
622 : /// Hack: skip delta layer if we need to produce a layer of a same key-lsn.
623 : ///
624 : /// This can happen if we have removed some deltas in "the middle" of some existing layer's key-lsn-range.
625 : /// For example, consider the case where a single delta with range [0x10,0x50) exists.
626 : /// And we have branches at LSN 0x10, 0x20, 0x30.
627 : /// Then we delete branch @ 0x20.
628 : /// Bottom-most compaction may now delete the delta [0x20,0x30).
629 : /// And that wouldnt' change the shape of the layer.
630 : ///
631 : /// Note that bottom-most-gc-compaction never _adds_ new data in that case, only removes.
632 : ///
633 : /// `discard_key` will only be called when the writer reaches its target (instead of for every key), so it's fine to grab a lock inside.
634 148 : async fn discard_key(key: &PersistentLayerKey, tline: &Arc<Timeline>, dry_run: bool) -> bool {
635 148 : if dry_run {
636 0 : return true;
637 148 : }
638 148 : if LayerMap::is_l0(&key.key_range, key.is_delta) {
639 : // gc-compaction should not produce L0 deltas, otherwise it will break the layer order.
640 : // We should ignore such layers.
641 0 : return true;
642 148 : }
643 : let layer_generation;
644 : {
645 148 : let guard = tline.layers.read().await;
646 148 : if !guard.contains_key(key) {
647 104 : return false;
648 44 : }
649 44 : layer_generation = guard.get_from_key(key).metadata().generation;
650 44 : }
651 44 : if layer_generation == tline.generation {
652 44 : info!(
653 : key=%key,
654 : ?layer_generation,
655 0 : "discard layer due to duplicated layer key in the same generation",
656 : );
657 44 : true
658 : } else {
659 0 : false
660 : }
661 148 : }
662 :
663 : /// Pipe a history of a single key to the writers.
664 : ///
665 : /// If `image_writer` is none, the images will be placed into the delta layers.
666 : /// The delta writer will contain all images and deltas (below and above the horizon) except the bottom-most images.
667 : #[allow(clippy::too_many_arguments)]
668 1244 : async fn pipe_to(
669 1244 : self,
670 1244 : key: Key,
671 1244 : delta_writer: &mut SplitDeltaLayerWriter,
672 1244 : mut image_writer: Option<&mut SplitImageLayerWriter>,
673 1244 : stat: &mut CompactionStatistics,
674 1244 : ctx: &RequestContext,
675 1244 : ) -> anyhow::Result<()> {
676 1244 : let mut first_batch = true;
677 4024 : for (cutoff_lsn, KeyLogAtLsn(logs)) in self.below_horizon {
678 2780 : if first_batch {
679 1244 : if logs.len() == 1 && logs[0].1.is_image() {
680 1168 : let Value::Image(img) = &logs[0].1 else {
681 0 : unreachable!()
682 : };
683 1168 : stat.produce_image_key(img);
684 1168 : if let Some(image_writer) = image_writer.as_mut() {
685 1168 : image_writer.put_image(key, img.clone(), ctx).await?;
686 : } else {
687 0 : delta_writer
688 0 : .put_value(key, cutoff_lsn, Value::Image(img.clone()), ctx)
689 0 : .await?;
690 : }
691 : } else {
692 132 : for (lsn, val) in logs {
693 56 : stat.produce_key(&val);
694 56 : delta_writer.put_value(key, lsn, val, ctx).await?;
695 : }
696 : }
697 1244 : first_batch = false;
698 : } else {
699 1768 : for (lsn, val) in logs {
700 232 : stat.produce_key(&val);
701 232 : delta_writer.put_value(key, lsn, val, ctx).await?;
702 : }
703 : }
704 : }
705 1244 : let KeyLogAtLsn(above_horizon_logs) = self.above_horizon;
706 1360 : for (lsn, val) in above_horizon_logs {
707 116 : stat.produce_key(&val);
708 116 : delta_writer.put_value(key, lsn, val, ctx).await?;
709 : }
710 1244 : Ok(())
711 1244 : }
712 : }
713 :
714 : #[derive(Debug, Serialize, Default)]
715 : struct CompactionStatisticsNumSize {
716 : num: u64,
717 : size: u64,
718 : }
719 :
720 : #[derive(Debug, Serialize, Default)]
721 : pub struct CompactionStatistics {
722 : delta_layer_visited: CompactionStatisticsNumSize,
723 : image_layer_visited: CompactionStatisticsNumSize,
724 : delta_layer_produced: CompactionStatisticsNumSize,
725 : image_layer_produced: CompactionStatisticsNumSize,
726 : num_delta_layer_discarded: usize,
727 : num_image_layer_discarded: usize,
728 : num_unique_keys_visited: usize,
729 : wal_keys_visited: CompactionStatisticsNumSize,
730 : image_keys_visited: CompactionStatisticsNumSize,
731 : wal_produced: CompactionStatisticsNumSize,
732 : image_produced: CompactionStatisticsNumSize,
733 : }
734 :
735 : impl CompactionStatistics {
736 2084 : fn estimated_size_of_value(val: &Value) -> usize {
737 864 : match val {
738 1220 : Value::Image(img) => img.len(),
739 0 : Value::WalRecord(NeonWalRecord::Postgres { rec, .. }) => rec.len(),
740 864 : _ => std::mem::size_of::<NeonWalRecord>(),
741 : }
742 2084 : }
743 3272 : fn estimated_size_of_key() -> usize {
744 3272 : KEY_SIZE // TODO: distinguish image layer and delta layer (count LSN in delta layer)
745 3272 : }
746 172 : fn visit_delta_layer(&mut self, size: u64) {
747 172 : self.delta_layer_visited.num += 1;
748 172 : self.delta_layer_visited.size += size;
749 172 : }
750 132 : fn visit_image_layer(&mut self, size: u64) {
751 132 : self.image_layer_visited.num += 1;
752 132 : self.image_layer_visited.size += size;
753 132 : }
754 1244 : fn on_unique_key_visited(&mut self) {
755 1244 : self.num_unique_keys_visited += 1;
756 1244 : }
757 480 : fn visit_wal_key(&mut self, val: &Value) {
758 480 : self.wal_keys_visited.num += 1;
759 480 : self.wal_keys_visited.size +=
760 480 : Self::estimated_size_of_value(val) as u64 + Self::estimated_size_of_key() as u64;
761 480 : }
762 1220 : fn visit_image_key(&mut self, val: &Value) {
763 1220 : self.image_keys_visited.num += 1;
764 1220 : self.image_keys_visited.size +=
765 1220 : Self::estimated_size_of_value(val) as u64 + Self::estimated_size_of_key() as u64;
766 1220 : }
767 404 : fn produce_key(&mut self, val: &Value) {
768 404 : match val {
769 20 : Value::Image(img) => self.produce_image_key(img),
770 384 : Value::WalRecord(_) => self.produce_wal_key(val),
771 : }
772 404 : }
773 384 : fn produce_wal_key(&mut self, val: &Value) {
774 384 : self.wal_produced.num += 1;
775 384 : self.wal_produced.size +=
776 384 : Self::estimated_size_of_value(val) as u64 + Self::estimated_size_of_key() as u64;
777 384 : }
778 1188 : fn produce_image_key(&mut self, val: &Bytes) {
779 1188 : self.image_produced.num += 1;
780 1188 : self.image_produced.size += val.len() as u64 + Self::estimated_size_of_key() as u64;
781 1188 : }
782 28 : fn discard_delta_layer(&mut self) {
783 28 : self.num_delta_layer_discarded += 1;
784 28 : }
785 16 : fn discard_image_layer(&mut self) {
786 16 : self.num_image_layer_discarded += 1;
787 16 : }
788 44 : fn produce_delta_layer(&mut self, size: u64) {
789 44 : self.delta_layer_produced.num += 1;
790 44 : self.delta_layer_produced.size += size;
791 44 : }
792 60 : fn produce_image_layer(&mut self, size: u64) {
793 60 : self.image_layer_produced.num += 1;
794 60 : self.image_layer_produced.size += size;
795 60 : }
796 : }
797 :
798 : #[derive(Default, Debug, Clone, Copy, PartialEq, Eq)]
799 : pub enum CompactionOutcome {
800 : #[default]
801 : /// No layers need to be compacted after this round. Compaction doesn't need
802 : /// to be immediately scheduled.
803 : Done,
804 : /// Still has pending layers to be compacted after this round. Ideally, the scheduler
805 : /// should immediately schedule another compaction.
806 : Pending,
807 : /// A timeline needs L0 compaction. Yield and schedule an immediate L0 compaction pass (only
808 : /// guaranteed when `compaction_l0_first` is enabled).
809 : YieldForL0,
810 : /// Compaction was skipped, because the timeline is ineligible for compaction.
811 : Skipped,
812 : }
813 :
814 : impl Timeline {
815 : /// TODO: cancellation
816 : ///
817 : /// Returns whether the compaction has pending tasks.
818 728 : pub(crate) async fn compact_legacy(
819 728 : self: &Arc<Self>,
820 728 : cancel: &CancellationToken,
821 728 : options: CompactOptions,
822 728 : ctx: &RequestContext,
823 728 : ) -> Result<CompactionOutcome, CompactionError> {
824 728 : if options
825 728 : .flags
826 728 : .contains(CompactFlags::EnhancedGcBottomMostCompaction)
827 : {
828 0 : self.compact_with_gc(cancel, options, ctx)
829 0 : .await
830 0 : .map_err(CompactionError::Other)?;
831 0 : return Ok(CompactionOutcome::Done);
832 728 : }
833 728 :
834 728 : if options.flags.contains(CompactFlags::DryRun) {
835 0 : return Err(CompactionError::Other(anyhow!(
836 0 : "dry-run mode is not supported for legacy compaction for now"
837 0 : )));
838 728 : }
839 728 :
840 728 : if options.compact_key_range.is_some() || options.compact_lsn_range.is_some() {
841 : // maybe useful in the future? could implement this at some point
842 0 : return Err(CompactionError::Other(anyhow!(
843 0 : "compaction range is not supported for legacy compaction for now"
844 0 : )));
845 728 : }
846 728 :
847 728 : // High level strategy for compaction / image creation:
848 728 : //
849 728 : // 1. First, do a L0 compaction to ensure we move the L0
850 728 : // layers into the historic layer map get flat levels of
851 728 : // layers. If we did not compact all L0 layers, we will
852 728 : // prioritize compacting the timeline again and not do
853 728 : // any of the compactions below.
854 728 : //
855 728 : // 2. Then, calculate the desired "partitioning" of the
856 728 : // currently in-use key space. The goal is to partition the
857 728 : // key space into roughly fixed-size chunks, but also take into
858 728 : // account any existing image layers, and try to align the
859 728 : // chunk boundaries with the existing image layers to avoid
860 728 : // too much churn. Also try to align chunk boundaries with
861 728 : // relation boundaries. In principle, we don't know about
862 728 : // relation boundaries here, we just deal with key-value
863 728 : // pairs, and the code in pgdatadir_mapping.rs knows how to
864 728 : // map relations into key-value pairs. But in practice we know
865 728 : // that 'field6' is the block number, and the fields 1-5
866 728 : // identify a relation. This is just an optimization,
867 728 : // though.
868 728 : //
869 728 : // 3. Once we know the partitioning, for each partition,
870 728 : // decide if it's time to create a new image layer. The
871 728 : // criteria is: there has been too much "churn" since the last
872 728 : // image layer? The "churn" is fuzzy concept, it's a
873 728 : // combination of too many delta files, or too much WAL in
874 728 : // total in the delta file. Or perhaps: if creating an image
875 728 : // file would allow to delete some older files.
876 728 : //
877 728 : // 4. In the end, if the tenant gets auto-sharded, we will run
878 728 : // a shard-ancestor compaction.
879 728 :
880 728 : // Is the timeline being deleted?
881 728 : if self.is_stopping() {
882 0 : trace!("Dropping out of compaction on timeline shutdown");
883 0 : return Err(CompactionError::ShuttingDown);
884 728 : }
885 728 :
886 728 : let target_file_size = self.get_checkpoint_distance();
887 :
888 : // Define partitioning schema if needed
889 :
890 : // 1. L0 Compact
891 728 : let l0_outcome = {
892 728 : let timer = self.metrics.compact_time_histo.start_timer();
893 728 : let l0_outcome = self
894 728 : .compact_level0(
895 728 : target_file_size,
896 728 : options.flags.contains(CompactFlags::ForceL0Compaction),
897 728 : ctx,
898 728 : )
899 728 : .await?;
900 728 : timer.stop_and_record();
901 728 : l0_outcome
902 728 : };
903 728 :
904 728 : if options.flags.contains(CompactFlags::OnlyL0Compaction) {
905 0 : return Ok(l0_outcome);
906 728 : }
907 728 :
908 728 : // Yield if we have pending L0 compaction. The scheduler will do another pass.
909 728 : if (l0_outcome == CompactionOutcome::Pending || l0_outcome == CompactionOutcome::YieldForL0)
910 0 : && !options.flags.contains(CompactFlags::NoYield)
911 : {
912 0 : info!("image/ancestor compaction yielding for L0 compaction");
913 0 : return Ok(CompactionOutcome::YieldForL0);
914 728 : }
915 728 :
916 728 : // 2. Repartition and create image layers if necessary
917 728 : match self
918 728 : .repartition(
919 728 : self.get_last_record_lsn(),
920 728 : self.get_compaction_target_size(),
921 728 : options.flags,
922 728 : ctx,
923 728 : )
924 728 : .await
925 : {
926 728 : Ok(((dense_partitioning, sparse_partitioning), lsn)) => {
927 728 : // Disables access_stats updates, so that the files we read remain candidates for eviction after we're done with them
928 728 : let image_ctx = RequestContextBuilder::extend(ctx)
929 728 : .access_stats_behavior(AccessStatsBehavior::Skip)
930 728 : .build();
931 728 :
932 728 : let mut partitioning = dense_partitioning;
933 728 : partitioning
934 728 : .parts
935 728 : .extend(sparse_partitioning.into_dense().parts);
936 :
937 : // 3. Create new image layers for partitions that have been modified "enough".
938 728 : let (image_layers, outcome) = self
939 728 : .create_image_layers(
940 728 : &partitioning,
941 728 : lsn,
942 728 : if options
943 728 : .flags
944 728 : .contains(CompactFlags::ForceImageLayerCreation)
945 : {
946 28 : ImageLayerCreationMode::Force
947 : } else {
948 700 : ImageLayerCreationMode::Try
949 : },
950 728 : &image_ctx,
951 728 : self.last_image_layer_creation_status
952 728 : .load()
953 728 : .as_ref()
954 728 : .clone(),
955 728 : !options.flags.contains(CompactFlags::NoYield),
956 728 : )
957 728 : .await
958 728 : .inspect_err(|err| {
959 : if let CreateImageLayersError::GetVectoredError(
960 : GetVectoredError::MissingKey(_),
961 0 : ) = err
962 : {
963 0 : critical!("missing key during compaction: {err:?}");
964 0 : }
965 728 : })?;
966 :
967 728 : self.last_image_layer_creation_status
968 728 : .store(Arc::new(outcome.clone()));
969 728 :
970 728 : self.upload_new_image_layers(image_layers)?;
971 728 : if let LastImageLayerCreationStatus::Incomplete { .. } = outcome {
972 : // Yield and do not do any other kind of compaction.
973 0 : info!(
974 0 : "skipping shard ancestor compaction due to pending image layer generation tasks (preempted by L0 compaction)."
975 : );
976 0 : return Ok(CompactionOutcome::YieldForL0);
977 728 : }
978 : }
979 :
980 : // Suppress errors when cancelled.
981 0 : Err(_) if self.cancel.is_cancelled() => {}
982 0 : Err(err) if err.is_cancel() => {}
983 :
984 : // Alert on critical errors that indicate data corruption.
985 0 : Err(err) if err.is_critical() => {
986 0 : critical!("could not compact, repartitioning keyspace failed: {err:?}");
987 : }
988 :
989 : // Log other errors. No partitioning? This is normal, if the timeline was just created
990 : // as an empty timeline. Also in unit tests, when we use the timeline as a simple
991 : // key-value store, ignoring the datadir layout. Log the error but continue.
992 0 : Err(err) => error!("could not compact, repartitioning keyspace failed: {err:?}"),
993 : };
994 :
995 728 : let partition_count = self.partitioning.read().0.0.parts.len();
996 728 :
997 728 : // 4. Shard ancestor compaction
998 728 :
999 728 : if self.shard_identity.count >= ShardCount::new(2) {
1000 : // Limit the number of layer rewrites to the number of partitions: this means its
1001 : // runtime should be comparable to a full round of image layer creations, rather than
1002 : // being potentially much longer.
1003 0 : let rewrite_max = partition_count;
1004 0 :
1005 0 : self.compact_shard_ancestors(rewrite_max, ctx).await?;
1006 728 : }
1007 :
1008 728 : Ok(CompactionOutcome::Done)
1009 728 : }
1010 :
1011 : /// Check for layers that are elegible to be rewritten:
1012 : /// - Shard splitting: After a shard split, ancestor layers beyond pitr_interval, so that
1013 : /// we don't indefinitely retain keys in this shard that aren't needed.
1014 : /// - For future use: layers beyond pitr_interval that are in formats we would
1015 : /// rather not maintain compatibility with indefinitely.
1016 : ///
1017 : /// Note: this phase may read and write many gigabytes of data: use rewrite_max to bound
1018 : /// how much work it will try to do in each compaction pass.
1019 0 : async fn compact_shard_ancestors(
1020 0 : self: &Arc<Self>,
1021 0 : rewrite_max: usize,
1022 0 : ctx: &RequestContext,
1023 0 : ) -> Result<(), CompactionError> {
1024 0 : let mut drop_layers = Vec::new();
1025 0 : let mut layers_to_rewrite: Vec<Layer> = Vec::new();
1026 0 :
1027 0 : // We will use the Lsn cutoff of the last GC as a threshold for rewriting layers: if a
1028 0 : // layer is behind this Lsn, it indicates that the layer is being retained beyond the
1029 0 : // pitr_interval, for example because a branchpoint references it.
1030 0 : //
1031 0 : // Holding this read guard also blocks [`Self::gc_timeline`] from entering while we
1032 0 : // are rewriting layers.
1033 0 : let latest_gc_cutoff = self.get_applied_gc_cutoff_lsn();
1034 0 :
1035 0 : tracing::info!(
1036 0 : "latest_gc_cutoff: {}, pitr cutoff {}",
1037 0 : *latest_gc_cutoff,
1038 0 : self.gc_info.read().unwrap().cutoffs.time
1039 : );
1040 :
1041 0 : let layers = self.layers.read().await;
1042 0 : for layer_desc in layers.layer_map()?.iter_historic_layers() {
1043 0 : let layer = layers.get_from_desc(&layer_desc);
1044 0 : if layer.metadata().shard.shard_count == self.shard_identity.count {
1045 : // This layer does not belong to a historic ancestor, no need to re-image it.
1046 0 : continue;
1047 0 : }
1048 0 :
1049 0 : // This layer was created on an ancestor shard: check if it contains any data for this shard.
1050 0 : let sharded_range = ShardedRange::new(layer_desc.get_key_range(), &self.shard_identity);
1051 0 : let layer_local_page_count = sharded_range.page_count();
1052 0 : let layer_raw_page_count = ShardedRange::raw_size(&layer_desc.get_key_range());
1053 0 : if layer_local_page_count == 0 {
1054 : // This ancestral layer only covers keys that belong to other shards.
1055 : // We include the full metadata in the log: if we had some critical bug that caused
1056 : // us to incorrectly drop layers, this would simplify manually debugging + reinstating those layers.
1057 0 : info!(%layer, old_metadata=?layer.metadata(),
1058 0 : "dropping layer after shard split, contains no keys for this shard.",
1059 : );
1060 :
1061 0 : if cfg!(debug_assertions) {
1062 : // Expensive, exhaustive check of keys in this layer: this guards against ShardedRange's calculations being
1063 : // wrong. If ShardedRange claims the local page count is zero, then no keys in this layer
1064 : // should be !is_key_disposable()
1065 0 : let range = layer_desc.get_key_range();
1066 0 : let mut key = range.start;
1067 0 : while key < range.end {
1068 0 : debug_assert!(self.shard_identity.is_key_disposable(&key));
1069 0 : key = key.next();
1070 : }
1071 0 : }
1072 :
1073 0 : drop_layers.push(layer);
1074 0 : continue;
1075 0 : } else if layer_local_page_count != u32::MAX
1076 0 : && layer_local_page_count == layer_raw_page_count
1077 : {
1078 0 : debug!(%layer,
1079 0 : "layer is entirely shard local ({} keys), no need to filter it",
1080 : layer_local_page_count
1081 : );
1082 0 : continue;
1083 0 : }
1084 0 :
1085 0 : // Don't bother re-writing a layer unless it will at least halve its size
1086 0 : if layer_local_page_count != u32::MAX
1087 0 : && layer_local_page_count > layer_raw_page_count / 2
1088 : {
1089 0 : debug!(%layer,
1090 0 : "layer is already mostly local ({}/{}), not rewriting",
1091 : layer_local_page_count,
1092 : layer_raw_page_count
1093 : );
1094 0 : }
1095 :
1096 : // Don't bother re-writing a layer if it is within the PITR window: it will age-out eventually
1097 : // without incurring the I/O cost of a rewrite.
1098 0 : if layer_desc.get_lsn_range().end >= *latest_gc_cutoff {
1099 0 : debug!(%layer, "Skipping rewrite of layer still in GC window ({} >= {})",
1100 0 : layer_desc.get_lsn_range().end, *latest_gc_cutoff);
1101 0 : continue;
1102 0 : }
1103 0 :
1104 0 : if layer_desc.is_delta() {
1105 : // We do not yet implement rewrite of delta layers
1106 0 : debug!(%layer, "Skipping rewrite of delta layer");
1107 0 : continue;
1108 0 : }
1109 0 :
1110 0 : // Only rewrite layers if their generations differ. This guarantees:
1111 0 : // - that local rewrite is safe, as local layer paths will differ between existing layer and rewritten one
1112 0 : // - that the layer is persistent in remote storage, as we only see old-generation'd layer via loading from remote storage
1113 0 : if layer.metadata().generation == self.generation {
1114 0 : debug!(%layer, "Skipping rewrite, is not from old generation");
1115 0 : continue;
1116 0 : }
1117 0 :
1118 0 : if layers_to_rewrite.len() >= rewrite_max {
1119 0 : tracing::info!(%layer, "Will rewrite layer on a future compaction, already rewrote {}",
1120 0 : layers_to_rewrite.len()
1121 : );
1122 0 : continue;
1123 0 : }
1124 0 :
1125 0 : // Fall through: all our conditions for doing a rewrite passed.
1126 0 : layers_to_rewrite.push(layer);
1127 : }
1128 :
1129 : // Drop read lock on layer map before we start doing time-consuming I/O
1130 0 : drop(layers);
1131 0 :
1132 0 : let mut replace_image_layers = Vec::new();
1133 :
1134 0 : for layer in layers_to_rewrite {
1135 0 : tracing::info!(layer=%layer, "Rewriting layer after shard split...");
1136 0 : let mut image_layer_writer = ImageLayerWriter::new(
1137 0 : self.conf,
1138 0 : self.timeline_id,
1139 0 : self.tenant_shard_id,
1140 0 : &layer.layer_desc().key_range,
1141 0 : layer.layer_desc().image_layer_lsn(),
1142 0 : ctx,
1143 0 : )
1144 0 : .await
1145 0 : .map_err(CompactionError::Other)?;
1146 :
1147 : // Safety of layer rewrites:
1148 : // - We are writing to a different local file path than we are reading from, so the old Layer
1149 : // cannot interfere with the new one.
1150 : // - In the page cache, contents for a particular VirtualFile are stored with a file_id that
1151 : // is different for two layers with the same name (in `ImageLayerInner::new` we always
1152 : // acquire a fresh id from [`crate::page_cache::next_file_id`]. So readers do not risk
1153 : // reading the index from one layer file, and then data blocks from the rewritten layer file.
1154 : // - Any readers that have a reference to the old layer will keep it alive until they are done
1155 : // with it. If they are trying to promote from remote storage, that will fail, but this is the same
1156 : // as for compaction generally: compaction is allowed to delete layers that readers might be trying to use.
1157 : // - We do not run concurrently with other kinds of compaction, so the only layer map writes we race with are:
1158 : // - GC, which at worst witnesses us "undelete" a layer that they just deleted.
1159 : // - ingestion, which only inserts layers, therefore cannot collide with us.
1160 0 : let resident = layer.download_and_keep_resident(ctx).await?;
1161 :
1162 0 : let keys_written = resident
1163 0 : .filter(&self.shard_identity, &mut image_layer_writer, ctx)
1164 0 : .await?;
1165 :
1166 0 : if keys_written > 0 {
1167 0 : let (desc, path) = image_layer_writer
1168 0 : .finish(ctx)
1169 0 : .await
1170 0 : .map_err(CompactionError::Other)?;
1171 0 : let new_layer = Layer::finish_creating(self.conf, self, desc, &path)
1172 0 : .map_err(CompactionError::Other)?;
1173 0 : tracing::info!(layer=%new_layer, "Rewrote layer, {} -> {} bytes",
1174 0 : layer.metadata().file_size,
1175 0 : new_layer.metadata().file_size);
1176 :
1177 0 : replace_image_layers.push((layer, new_layer));
1178 0 : } else {
1179 0 : // Drop the old layer. Usually for this case we would already have noticed that
1180 0 : // the layer has no data for us with the ShardedRange check above, but
1181 0 : drop_layers.push(layer);
1182 0 : }
1183 : }
1184 :
1185 : // At this point, we have replaced local layer files with their rewritten form, but not yet uploaded
1186 : // metadata to reflect that. If we restart here, the replaced layer files will look invalid (size mismatch
1187 : // to remote index) and be removed. This is inefficient but safe.
1188 0 : fail::fail_point!("compact-shard-ancestors-localonly");
1189 0 :
1190 0 : // Update the LayerMap so that readers will use the new layers, and enqueue it for writing to remote storage
1191 0 : self.rewrite_layers(replace_image_layers, drop_layers)
1192 0 : .await?;
1193 :
1194 0 : fail::fail_point!("compact-shard-ancestors-enqueued");
1195 0 :
1196 0 : // We wait for all uploads to complete before finishing this compaction stage. This is not
1197 0 : // necessary for correctness, but it simplifies testing, and avoids proceeding with another
1198 0 : // Timeline's compaction while this timeline's uploads may be generating lots of disk I/O
1199 0 : // load.
1200 0 : match self.remote_client.wait_completion().await {
1201 0 : Ok(()) => (),
1202 0 : Err(WaitCompletionError::NotInitialized(ni)) => return Err(CompactionError::from(ni)),
1203 : Err(WaitCompletionError::UploadQueueShutDownOrStopped) => {
1204 0 : return Err(CompactionError::ShuttingDown);
1205 : }
1206 : }
1207 :
1208 0 : fail::fail_point!("compact-shard-ancestors-persistent");
1209 0 :
1210 0 : Ok(())
1211 0 : }
1212 :
1213 : /// Update the LayerVisibilityHint of layers covered by image layers, based on whether there is
1214 : /// an image layer between them and the most recent readable LSN (branch point or tip of timeline). The
1215 : /// purpose of the visibility hint is to record which layers need to be available to service reads.
1216 : ///
1217 : /// The result may be used as an input to eviction and secondary downloads to de-prioritize layers
1218 : /// that we know won't be needed for reads.
1219 468 : pub(crate) async fn update_layer_visibility(
1220 468 : &self,
1221 468 : ) -> Result<(), super::layer_manager::Shutdown> {
1222 468 : let head_lsn = self.get_last_record_lsn();
1223 :
1224 : // We will sweep through layers in reverse-LSN order. We only do historic layers. L0 deltas
1225 : // are implicitly left visible, because LayerVisibilityHint's default is Visible, and we never modify it here.
1226 : // Note that L0 deltas _can_ be covered by image layers, but we consider them 'visible' because we anticipate that
1227 : // they will be subject to L0->L1 compaction in the near future.
1228 468 : let layer_manager = self.layers.read().await;
1229 468 : let layer_map = layer_manager.layer_map()?;
1230 :
1231 468 : let readable_points = {
1232 468 : let children = self.gc_info.read().unwrap().retain_lsns.clone();
1233 468 :
1234 468 : let mut readable_points = Vec::with_capacity(children.len() + 1);
1235 468 : for (child_lsn, _child_timeline_id, is_offloaded) in &children {
1236 0 : if *is_offloaded == MaybeOffloaded::Yes {
1237 0 : continue;
1238 0 : }
1239 0 : readable_points.push(*child_lsn);
1240 : }
1241 468 : readable_points.push(head_lsn);
1242 468 : readable_points
1243 468 : };
1244 468 :
1245 468 : let (layer_visibility, covered) = layer_map.get_visibility(readable_points);
1246 1184 : for (layer_desc, visibility) in layer_visibility {
1247 716 : // FIXME: a more efficiency bulk zip() through the layers rather than NlogN getting each one
1248 716 : let layer = layer_manager.get_from_desc(&layer_desc);
1249 716 : layer.set_visibility(visibility);
1250 716 : }
1251 :
1252 : // TODO: publish our covered KeySpace to our parent, so that when they update their visibility, they can
1253 : // avoid assuming that everything at a branch point is visible.
1254 468 : drop(covered);
1255 468 : Ok(())
1256 468 : }
1257 :
1258 : /// Collect a bunch of Level 0 layer files, and compact and reshuffle them as
1259 : /// as Level 1 files. Returns whether the L0 layers are fully compacted.
1260 728 : async fn compact_level0(
1261 728 : self: &Arc<Self>,
1262 728 : target_file_size: u64,
1263 728 : force_compaction_ignore_threshold: bool,
1264 728 : ctx: &RequestContext,
1265 728 : ) -> Result<CompactionOutcome, CompactionError> {
1266 : let CompactLevel0Phase1Result {
1267 728 : new_layers,
1268 728 : deltas_to_compact,
1269 728 : outcome,
1270 : } = {
1271 728 : let phase1_span = info_span!("compact_level0_phase1");
1272 728 : let ctx = ctx.attached_child();
1273 728 : let mut stats = CompactLevel0Phase1StatsBuilder {
1274 728 : version: Some(2),
1275 728 : tenant_id: Some(self.tenant_shard_id),
1276 728 : timeline_id: Some(self.timeline_id),
1277 728 : ..Default::default()
1278 728 : };
1279 728 :
1280 728 : let begin = tokio::time::Instant::now();
1281 728 : let phase1_layers_locked = self.layers.read().await;
1282 728 : let now = tokio::time::Instant::now();
1283 728 : stats.read_lock_acquisition_micros =
1284 728 : DurationRecorder::Recorded(RecordedDuration(now - begin), now);
1285 728 : self.compact_level0_phase1(
1286 728 : phase1_layers_locked,
1287 728 : stats,
1288 728 : target_file_size,
1289 728 : force_compaction_ignore_threshold,
1290 728 : &ctx,
1291 728 : )
1292 728 : .instrument(phase1_span)
1293 728 : .await?
1294 : };
1295 :
1296 728 : if new_layers.is_empty() && deltas_to_compact.is_empty() {
1297 : // nothing to do
1298 672 : return Ok(CompactionOutcome::Done);
1299 56 : }
1300 56 :
1301 56 : self.finish_compact_batch(&new_layers, &Vec::new(), &deltas_to_compact)
1302 56 : .await?;
1303 56 : Ok(outcome)
1304 728 : }
1305 :
1306 : /// Level0 files first phase of compaction, explained in the [`Self::compact_legacy`] comment.
1307 728 : async fn compact_level0_phase1<'a>(
1308 728 : self: &'a Arc<Self>,
1309 728 : guard: tokio::sync::RwLockReadGuard<'a, LayerManager>,
1310 728 : mut stats: CompactLevel0Phase1StatsBuilder,
1311 728 : target_file_size: u64,
1312 728 : force_compaction_ignore_threshold: bool,
1313 728 : ctx: &RequestContext,
1314 728 : ) -> Result<CompactLevel0Phase1Result, CompactionError> {
1315 728 : stats.read_lock_held_spawn_blocking_startup_micros =
1316 728 : stats.read_lock_acquisition_micros.till_now(); // set by caller
1317 728 : let layers = guard.layer_map()?;
1318 728 : let level0_deltas = layers.level0_deltas();
1319 728 : stats.level0_deltas_count = Some(level0_deltas.len());
1320 728 :
1321 728 : // Only compact if enough layers have accumulated.
1322 728 : let threshold = self.get_compaction_threshold();
1323 728 : if level0_deltas.is_empty() || level0_deltas.len() < threshold {
1324 672 : if force_compaction_ignore_threshold {
1325 0 : if !level0_deltas.is_empty() {
1326 0 : info!(
1327 0 : level0_deltas = level0_deltas.len(),
1328 0 : threshold, "too few deltas to compact, but forcing compaction"
1329 : );
1330 : } else {
1331 0 : info!(
1332 0 : level0_deltas = level0_deltas.len(),
1333 0 : threshold, "too few deltas to compact, cannot force compaction"
1334 : );
1335 0 : return Ok(CompactLevel0Phase1Result::default());
1336 : }
1337 : } else {
1338 672 : debug!(
1339 0 : level0_deltas = level0_deltas.len(),
1340 0 : threshold, "too few deltas to compact"
1341 : );
1342 672 : return Ok(CompactLevel0Phase1Result::default());
1343 : }
1344 56 : }
1345 :
1346 56 : let mut level0_deltas = level0_deltas
1347 56 : .iter()
1348 804 : .map(|x| guard.get_from_desc(x))
1349 56 : .collect::<Vec<_>>();
1350 56 :
1351 56 : // Gather the files to compact in this iteration.
1352 56 : //
1353 56 : // Start with the oldest Level 0 delta file, and collect any other
1354 56 : // level 0 files that form a contiguous sequence, such that the end
1355 56 : // LSN of previous file matches the start LSN of the next file.
1356 56 : //
1357 56 : // Note that if the files don't form such a sequence, we might
1358 56 : // "compact" just a single file. That's a bit pointless, but it allows
1359 56 : // us to get rid of the level 0 file, and compact the other files on
1360 56 : // the next iteration. This could probably made smarter, but such
1361 56 : // "gaps" in the sequence of level 0 files should only happen in case
1362 56 : // of a crash, partial download from cloud storage, or something like
1363 56 : // that, so it's not a big deal in practice.
1364 1496 : level0_deltas.sort_by_key(|l| l.layer_desc().lsn_range.start);
1365 56 : let mut level0_deltas_iter = level0_deltas.iter();
1366 56 :
1367 56 : let first_level0_delta = level0_deltas_iter.next().unwrap();
1368 56 : let mut prev_lsn_end = first_level0_delta.layer_desc().lsn_range.end;
1369 56 : let mut deltas_to_compact = Vec::with_capacity(level0_deltas.len());
1370 56 :
1371 56 : // Accumulate the size of layers in `deltas_to_compact`
1372 56 : let mut deltas_to_compact_bytes = 0;
1373 56 :
1374 56 : // Under normal circumstances, we will accumulate up to compaction_upper_limit L0s of size
1375 56 : // checkpoint_distance each. To avoid edge cases using extra system resources, bound our
1376 56 : // work in this function to only operate on this much delta data at once.
1377 56 : //
1378 56 : // In general, compaction_threshold should be <= compaction_upper_limit, but in case that
1379 56 : // the constraint is not respected, we use the larger of the two.
1380 56 : let delta_size_limit = std::cmp::max(
1381 56 : self.get_compaction_upper_limit(),
1382 56 : self.get_compaction_threshold(),
1383 56 : ) as u64
1384 56 : * std::cmp::max(self.get_checkpoint_distance(), DEFAULT_CHECKPOINT_DISTANCE);
1385 56 :
1386 56 : let mut fully_compacted = true;
1387 56 :
1388 56 : deltas_to_compact.push(first_level0_delta.download_and_keep_resident(ctx).await?);
1389 804 : for l in level0_deltas_iter {
1390 748 : let lsn_range = &l.layer_desc().lsn_range;
1391 748 :
1392 748 : if lsn_range.start != prev_lsn_end {
1393 0 : break;
1394 748 : }
1395 748 : deltas_to_compact.push(l.download_and_keep_resident(ctx).await?);
1396 748 : deltas_to_compact_bytes += l.metadata().file_size;
1397 748 : prev_lsn_end = lsn_range.end;
1398 748 :
1399 748 : if deltas_to_compact_bytes >= delta_size_limit {
1400 0 : info!(
1401 0 : l0_deltas_selected = deltas_to_compact.len(),
1402 0 : l0_deltas_total = level0_deltas.len(),
1403 0 : "L0 compaction picker hit max delta layer size limit: {}",
1404 : delta_size_limit
1405 : );
1406 0 : fully_compacted = false;
1407 0 :
1408 0 : // Proceed with compaction, but only a subset of L0s
1409 0 : break;
1410 748 : }
1411 : }
1412 56 : let lsn_range = Range {
1413 56 : start: deltas_to_compact
1414 56 : .first()
1415 56 : .unwrap()
1416 56 : .layer_desc()
1417 56 : .lsn_range
1418 56 : .start,
1419 56 : end: deltas_to_compact.last().unwrap().layer_desc().lsn_range.end,
1420 56 : };
1421 56 :
1422 56 : info!(
1423 0 : "Starting Level0 compaction in LSN range {}-{} for {} layers ({} deltas in total)",
1424 0 : lsn_range.start,
1425 0 : lsn_range.end,
1426 0 : deltas_to_compact.len(),
1427 0 : level0_deltas.len()
1428 : );
1429 :
1430 804 : for l in deltas_to_compact.iter() {
1431 804 : info!("compact includes {l}");
1432 : }
1433 :
1434 : // We don't need the original list of layers anymore. Drop it so that
1435 : // we don't accidentally use it later in the function.
1436 56 : drop(level0_deltas);
1437 56 :
1438 56 : stats.read_lock_held_prerequisites_micros = stats
1439 56 : .read_lock_held_spawn_blocking_startup_micros
1440 56 : .till_now();
1441 :
1442 : // TODO: replace with streaming k-merge
1443 56 : let all_keys = {
1444 56 : let mut all_keys = Vec::new();
1445 804 : for l in deltas_to_compact.iter() {
1446 804 : if self.cancel.is_cancelled() {
1447 0 : return Err(CompactionError::ShuttingDown);
1448 804 : }
1449 804 : let delta = l.get_as_delta(ctx).await.map_err(CompactionError::Other)?;
1450 804 : let keys = delta
1451 804 : .index_entries(ctx)
1452 804 : .await
1453 804 : .map_err(CompactionError::Other)?;
1454 804 : all_keys.extend(keys);
1455 : }
1456 : // The current stdlib sorting implementation is designed in a way where it is
1457 : // particularly fast where the slice is made up of sorted sub-ranges.
1458 8847544 : all_keys.sort_by_key(|DeltaEntry { key, lsn, .. }| (*key, *lsn));
1459 56 : all_keys
1460 56 : };
1461 56 :
1462 56 : stats.read_lock_held_key_sort_micros = stats.read_lock_held_prerequisites_micros.till_now();
1463 :
1464 : // Determine N largest holes where N is number of compacted layers. The vec is sorted by key range start.
1465 : //
1466 : // A hole is a key range for which this compaction doesn't have any WAL records.
1467 : // Our goal in this compaction iteration is to avoid creating L1s that, in terms of their key range,
1468 : // cover the hole, but actually don't contain any WAL records for that key range.
1469 : // The reason is that the mere stack of L1s (`count_deltas`) triggers image layer creation (`create_image_layers`).
1470 : // That image layer creation would be useless for a hole range covered by L1s that don't contain any WAL records.
1471 : //
1472 : // The algorithm chooses holes as follows.
1473 : // - Slide a 2-window over the keys in key orde to get the hole range (=distance between two keys).
1474 : // - Filter: min threshold on range length
1475 : // - Rank: by coverage size (=number of image layers required to reconstruct each key in the range for which we have any data)
1476 : //
1477 : // For more details, intuition, and some ASCII art see https://github.com/neondatabase/neon/pull/3597#discussion_r1112704451
1478 : #[derive(PartialEq, Eq)]
1479 : struct Hole {
1480 : key_range: Range<Key>,
1481 : coverage_size: usize,
1482 : }
1483 56 : let holes: Vec<Hole> = {
1484 : use std::cmp::Ordering;
1485 : impl Ord for Hole {
1486 0 : fn cmp(&self, other: &Self) -> Ordering {
1487 0 : self.coverage_size.cmp(&other.coverage_size).reverse()
1488 0 : }
1489 : }
1490 : impl PartialOrd for Hole {
1491 0 : fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
1492 0 : Some(self.cmp(other))
1493 0 : }
1494 : }
1495 56 : let max_holes = deltas_to_compact.len();
1496 56 : let last_record_lsn = self.get_last_record_lsn();
1497 56 : let min_hole_range = (target_file_size / page_cache::PAGE_SZ as u64) as i128;
1498 56 : let min_hole_coverage_size = 3; // TODO: something more flexible?
1499 56 : // min-heap (reserve space for one more element added before eviction)
1500 56 : let mut heap: BinaryHeap<Hole> = BinaryHeap::with_capacity(max_holes + 1);
1501 56 : let mut prev: Option<Key> = None;
1502 :
1503 4128076 : for &DeltaEntry { key: next_key, .. } in all_keys.iter() {
1504 4128076 : if let Some(prev_key) = prev {
1505 : // just first fast filter, do not create hole entries for metadata keys. The last hole in the
1506 : // compaction is the gap between data key and metadata keys.
1507 4128020 : if next_key.to_i128() - prev_key.to_i128() >= min_hole_range
1508 0 : && !Key::is_metadata_key(&prev_key)
1509 : {
1510 0 : let key_range = prev_key..next_key;
1511 0 : // Measuring hole by just subtraction of i128 representation of key range boundaries
1512 0 : // has not so much sense, because largest holes will corresponds field1/field2 changes.
1513 0 : // But we are mostly interested to eliminate holes which cause generation of excessive image layers.
1514 0 : // That is why it is better to measure size of hole as number of covering image layers.
1515 0 : let coverage_size =
1516 0 : layers.image_coverage(&key_range, last_record_lsn).len();
1517 0 : if coverage_size >= min_hole_coverage_size {
1518 0 : heap.push(Hole {
1519 0 : key_range,
1520 0 : coverage_size,
1521 0 : });
1522 0 : if heap.len() > max_holes {
1523 0 : heap.pop(); // remove smallest hole
1524 0 : }
1525 0 : }
1526 4128020 : }
1527 56 : }
1528 4128076 : prev = Some(next_key.next());
1529 : }
1530 56 : let mut holes = heap.into_vec();
1531 56 : holes.sort_unstable_by_key(|hole| hole.key_range.start);
1532 56 : holes
1533 56 : };
1534 56 : stats.read_lock_held_compute_holes_micros = stats.read_lock_held_key_sort_micros.till_now();
1535 56 : drop_rlock(guard);
1536 56 :
1537 56 : if self.cancel.is_cancelled() {
1538 0 : return Err(CompactionError::ShuttingDown);
1539 56 : }
1540 56 :
1541 56 : stats.read_lock_drop_micros = stats.read_lock_held_compute_holes_micros.till_now();
1542 :
1543 : // This iterator walks through all key-value pairs from all the layers
1544 : // we're compacting, in key, LSN order.
1545 : // If there's both a Value::Image and Value::WalRecord for the same (key,lsn),
1546 : // then the Value::Image is ordered before Value::WalRecord.
1547 56 : let mut all_values_iter = {
1548 56 : let mut deltas = Vec::with_capacity(deltas_to_compact.len());
1549 804 : for l in deltas_to_compact.iter() {
1550 804 : let l = l.get_as_delta(ctx).await.map_err(CompactionError::Other)?;
1551 804 : deltas.push(l);
1552 : }
1553 56 : MergeIterator::create(&deltas, &[], ctx)
1554 56 : };
1555 56 :
1556 56 : // This iterator walks through all keys and is needed to calculate size used by each key
1557 56 : let mut all_keys_iter = all_keys
1558 56 : .iter()
1559 4128076 : .map(|DeltaEntry { key, lsn, size, .. }| (*key, *lsn, *size))
1560 4128020 : .coalesce(|mut prev, cur| {
1561 4128020 : // Coalesce keys that belong to the same key pair.
1562 4128020 : // This ensures that compaction doesn't put them
1563 4128020 : // into different layer files.
1564 4128020 : // Still limit this by the target file size,
1565 4128020 : // so that we keep the size of the files in
1566 4128020 : // check.
1567 4128020 : if prev.0 == cur.0 && prev.2 < target_file_size {
1568 80076 : prev.2 += cur.2;
1569 80076 : Ok(prev)
1570 : } else {
1571 4047944 : Err((prev, cur))
1572 : }
1573 4128020 : });
1574 56 :
1575 56 : // Merge the contents of all the input delta layers into a new set
1576 56 : // of delta layers, based on the current partitioning.
1577 56 : //
1578 56 : // We split the new delta layers on the key dimension. We iterate through the key space, and for each key, check if including the next key to the current output layer we're building would cause the layer to become too large. If so, dump the current output layer and start new one.
1579 56 : // It's possible that there is a single key with so many page versions that storing all of them in a single layer file
1580 56 : // would be too large. In that case, we also split on the LSN dimension.
1581 56 : //
1582 56 : // LSN
1583 56 : // ^
1584 56 : // |
1585 56 : // | +-----------+ +--+--+--+--+
1586 56 : // | | | | | | | |
1587 56 : // | +-----------+ | | | | |
1588 56 : // | | | | | | | |
1589 56 : // | +-----------+ ==> | | | | |
1590 56 : // | | | | | | | |
1591 56 : // | +-----------+ | | | | |
1592 56 : // | | | | | | | |
1593 56 : // | +-----------+ +--+--+--+--+
1594 56 : // |
1595 56 : // +--------------> key
1596 56 : //
1597 56 : //
1598 56 : // If one key (X) has a lot of page versions:
1599 56 : //
1600 56 : // LSN
1601 56 : // ^
1602 56 : // | (X)
1603 56 : // | +-----------+ +--+--+--+--+
1604 56 : // | | | | | | | |
1605 56 : // | +-----------+ | | +--+ |
1606 56 : // | | | | | | | |
1607 56 : // | +-----------+ ==> | | | | |
1608 56 : // | | | | | +--+ |
1609 56 : // | +-----------+ | | | | |
1610 56 : // | | | | | | | |
1611 56 : // | +-----------+ +--+--+--+--+
1612 56 : // |
1613 56 : // +--------------> key
1614 56 : // TODO: this actually divides the layers into fixed-size chunks, not
1615 56 : // based on the partitioning.
1616 56 : //
1617 56 : // TODO: we should also opportunistically materialize and
1618 56 : // garbage collect what we can.
1619 56 : let mut new_layers = Vec::new();
1620 56 : let mut prev_key: Option<Key> = None;
1621 56 : let mut writer: Option<DeltaLayerWriter> = None;
1622 56 : let mut key_values_total_size = 0u64;
1623 56 : let mut dup_start_lsn: Lsn = Lsn::INVALID; // start LSN of layer containing values of the single key
1624 56 : let mut dup_end_lsn: Lsn = Lsn::INVALID; // end LSN of layer containing values of the single key
1625 56 : let mut next_hole = 0; // index of next hole in holes vector
1626 56 :
1627 56 : let mut keys = 0;
1628 :
1629 4128132 : while let Some((key, lsn, value)) = all_values_iter
1630 4128132 : .next()
1631 4128132 : .await
1632 4128132 : .map_err(CompactionError::Other)?
1633 : {
1634 4128076 : keys += 1;
1635 4128076 :
1636 4128076 : if keys % 32_768 == 0 && self.cancel.is_cancelled() {
1637 : // avoid hitting the cancellation token on every key. in benches, we end up
1638 : // shuffling an order of million keys per layer, this means we'll check it
1639 : // around tens of times per layer.
1640 0 : return Err(CompactionError::ShuttingDown);
1641 4128076 : }
1642 4128076 :
1643 4128076 : let same_key = prev_key == Some(key);
1644 4128076 : // We need to check key boundaries once we reach next key or end of layer with the same key
1645 4128076 : if !same_key || lsn == dup_end_lsn {
1646 4048000 : let mut next_key_size = 0u64;
1647 4048000 : let is_dup_layer = dup_end_lsn.is_valid();
1648 4048000 : dup_start_lsn = Lsn::INVALID;
1649 4048000 : if !same_key {
1650 4048000 : dup_end_lsn = Lsn::INVALID;
1651 4048000 : }
1652 : // Determine size occupied by this key. We stop at next key or when size becomes larger than target_file_size
1653 4048000 : for (next_key, next_lsn, next_size) in all_keys_iter.by_ref() {
1654 4048000 : next_key_size = next_size;
1655 4048000 : if key != next_key {
1656 4047944 : if dup_end_lsn.is_valid() {
1657 0 : // We are writting segment with duplicates:
1658 0 : // place all remaining values of this key in separate segment
1659 0 : dup_start_lsn = dup_end_lsn; // new segments starts where old stops
1660 0 : dup_end_lsn = lsn_range.end; // there are no more values of this key till end of LSN range
1661 4047944 : }
1662 4047944 : break;
1663 56 : }
1664 56 : key_values_total_size += next_size;
1665 56 : // Check if it is time to split segment: if total keys size is larger than target file size.
1666 56 : // We need to avoid generation of empty segments if next_size > target_file_size.
1667 56 : if key_values_total_size > target_file_size && lsn != next_lsn {
1668 : // Split key between multiple layers: such layer can contain only single key
1669 0 : dup_start_lsn = if dup_end_lsn.is_valid() {
1670 0 : dup_end_lsn // new segment with duplicates starts where old one stops
1671 : } else {
1672 0 : lsn // start with the first LSN for this key
1673 : };
1674 0 : dup_end_lsn = next_lsn; // upper LSN boundary is exclusive
1675 0 : break;
1676 56 : }
1677 : }
1678 : // handle case when loop reaches last key: in this case dup_end is non-zero but dup_start is not set.
1679 4048000 : if dup_end_lsn.is_valid() && !dup_start_lsn.is_valid() {
1680 0 : dup_start_lsn = dup_end_lsn;
1681 0 : dup_end_lsn = lsn_range.end;
1682 4048000 : }
1683 4048000 : if writer.is_some() {
1684 4047944 : let written_size = writer.as_mut().unwrap().size();
1685 4047944 : let contains_hole =
1686 4047944 : next_hole < holes.len() && key >= holes[next_hole].key_range.end;
1687 : // check if key cause layer overflow or contains hole...
1688 4047944 : if is_dup_layer
1689 4047944 : || dup_end_lsn.is_valid()
1690 4047944 : || written_size + key_values_total_size > target_file_size
1691 4047384 : || contains_hole
1692 : {
1693 : // ... if so, flush previous layer and prepare to write new one
1694 560 : let (desc, path) = writer
1695 560 : .take()
1696 560 : .unwrap()
1697 560 : .finish(prev_key.unwrap().next(), ctx)
1698 560 : .await
1699 560 : .map_err(CompactionError::Other)?;
1700 560 : let new_delta = Layer::finish_creating(self.conf, self, desc, &path)
1701 560 : .map_err(CompactionError::Other)?;
1702 :
1703 560 : new_layers.push(new_delta);
1704 560 : writer = None;
1705 560 :
1706 560 : if contains_hole {
1707 0 : // skip hole
1708 0 : next_hole += 1;
1709 560 : }
1710 4047384 : }
1711 56 : }
1712 : // Remember size of key value because at next iteration we will access next item
1713 4048000 : key_values_total_size = next_key_size;
1714 80076 : }
1715 4128076 : fail_point!("delta-layer-writer-fail-before-finish", |_| {
1716 0 : Err(CompactionError::Other(anyhow::anyhow!(
1717 0 : "failpoint delta-layer-writer-fail-before-finish"
1718 0 : )))
1719 4128076 : });
1720 :
1721 4128076 : if !self.shard_identity.is_key_disposable(&key) {
1722 4128076 : if writer.is_none() {
1723 616 : if self.cancel.is_cancelled() {
1724 : // to be somewhat responsive to cancellation, check for each new layer
1725 0 : return Err(CompactionError::ShuttingDown);
1726 616 : }
1727 : // Create writer if not initiaized yet
1728 616 : writer = Some(
1729 : DeltaLayerWriter::new(
1730 616 : self.conf,
1731 616 : self.timeline_id,
1732 616 : self.tenant_shard_id,
1733 616 : key,
1734 616 : if dup_end_lsn.is_valid() {
1735 : // this is a layer containing slice of values of the same key
1736 0 : debug!("Create new dup layer {}..{}", dup_start_lsn, dup_end_lsn);
1737 0 : dup_start_lsn..dup_end_lsn
1738 : } else {
1739 616 : debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end);
1740 616 : lsn_range.clone()
1741 : },
1742 616 : ctx,
1743 616 : )
1744 616 : .await
1745 616 : .map_err(CompactionError::Other)?,
1746 : );
1747 :
1748 616 : keys = 0;
1749 4127460 : }
1750 :
1751 4128076 : writer
1752 4128076 : .as_mut()
1753 4128076 : .unwrap()
1754 4128076 : .put_value(key, lsn, value, ctx)
1755 4128076 : .await
1756 4128076 : .map_err(CompactionError::Other)?;
1757 : } else {
1758 0 : let owner = self.shard_identity.get_shard_number(&key);
1759 0 :
1760 0 : // This happens after a shard split, when we're compacting an L0 created by our parent shard
1761 0 : debug!("dropping key {key} during compaction (it belongs on shard {owner})");
1762 : }
1763 :
1764 4128076 : if !new_layers.is_empty() {
1765 39572 : fail_point!("after-timeline-compacted-first-L1");
1766 4088504 : }
1767 :
1768 4128076 : prev_key = Some(key);
1769 : }
1770 56 : if let Some(writer) = writer {
1771 56 : let (desc, path) = writer
1772 56 : .finish(prev_key.unwrap().next(), ctx)
1773 56 : .await
1774 56 : .map_err(CompactionError::Other)?;
1775 56 : let new_delta = Layer::finish_creating(self.conf, self, desc, &path)
1776 56 : .map_err(CompactionError::Other)?;
1777 56 : new_layers.push(new_delta);
1778 0 : }
1779 :
1780 : // Sync layers
1781 56 : if !new_layers.is_empty() {
1782 : // Print a warning if the created layer is larger than double the target size
1783 : // Add two pages for potential overhead. This should in theory be already
1784 : // accounted for in the target calculation, but for very small targets,
1785 : // we still might easily hit the limit otherwise.
1786 56 : let warn_limit = target_file_size * 2 + page_cache::PAGE_SZ as u64 * 2;
1787 616 : for layer in new_layers.iter() {
1788 616 : if layer.layer_desc().file_size > warn_limit {
1789 0 : warn!(
1790 : %layer,
1791 0 : "created delta file of size {} larger than double of target of {target_file_size}", layer.layer_desc().file_size
1792 : );
1793 616 : }
1794 : }
1795 :
1796 : // The writer.finish() above already did the fsync of the inodes.
1797 : // We just need to fsync the directory in which these inodes are linked,
1798 : // which we know to be the timeline directory.
1799 : //
1800 : // We use fatal_err() below because the after writer.finish() returns with success,
1801 : // the in-memory state of the filesystem already has the layer file in its final place,
1802 : // and subsequent pageserver code could think it's durable while it really isn't.
1803 56 : let timeline_dir = VirtualFile::open(
1804 56 : &self
1805 56 : .conf
1806 56 : .timeline_path(&self.tenant_shard_id, &self.timeline_id),
1807 56 : ctx,
1808 56 : )
1809 56 : .await
1810 56 : .fatal_err("VirtualFile::open for timeline dir fsync");
1811 56 : timeline_dir
1812 56 : .sync_all()
1813 56 : .await
1814 56 : .fatal_err("VirtualFile::sync_all timeline dir");
1815 0 : }
1816 :
1817 56 : stats.write_layer_files_micros = stats.read_lock_drop_micros.till_now();
1818 56 : stats.new_deltas_count = Some(new_layers.len());
1819 616 : stats.new_deltas_size = Some(new_layers.iter().map(|l| l.layer_desc().file_size).sum());
1820 56 :
1821 56 : match TryInto::<CompactLevel0Phase1Stats>::try_into(stats)
1822 56 : .and_then(|stats| serde_json::to_string(&stats).context("serde_json::to_string"))
1823 : {
1824 56 : Ok(stats_json) => {
1825 56 : info!(
1826 0 : stats_json = stats_json.as_str(),
1827 0 : "compact_level0_phase1 stats available"
1828 : )
1829 : }
1830 0 : Err(e) => {
1831 0 : warn!("compact_level0_phase1 stats failed to serialize: {:#}", e);
1832 : }
1833 : }
1834 :
1835 : // Without this, rustc complains about deltas_to_compact still
1836 : // being borrowed when we `.into_iter()` below.
1837 56 : drop(all_values_iter);
1838 56 :
1839 56 : Ok(CompactLevel0Phase1Result {
1840 56 : new_layers,
1841 56 : deltas_to_compact: deltas_to_compact
1842 56 : .into_iter()
1843 804 : .map(|x| x.drop_eviction_guard())
1844 56 : .collect::<Vec<_>>(),
1845 56 : outcome: if fully_compacted {
1846 56 : CompactionOutcome::Done
1847 : } else {
1848 0 : CompactionOutcome::Pending
1849 : },
1850 : })
1851 728 : }
1852 : }
1853 :
1854 : #[derive(Default)]
1855 : struct CompactLevel0Phase1Result {
1856 : new_layers: Vec<ResidentLayer>,
1857 : deltas_to_compact: Vec<Layer>,
1858 : // Whether we have included all L0 layers, or selected only part of them due to the
1859 : // L0 compaction size limit.
1860 : outcome: CompactionOutcome,
1861 : }
1862 :
1863 : #[derive(Default)]
1864 : struct CompactLevel0Phase1StatsBuilder {
1865 : version: Option<u64>,
1866 : tenant_id: Option<TenantShardId>,
1867 : timeline_id: Option<TimelineId>,
1868 : read_lock_acquisition_micros: DurationRecorder,
1869 : read_lock_held_spawn_blocking_startup_micros: DurationRecorder,
1870 : read_lock_held_key_sort_micros: DurationRecorder,
1871 : read_lock_held_prerequisites_micros: DurationRecorder,
1872 : read_lock_held_compute_holes_micros: DurationRecorder,
1873 : read_lock_drop_micros: DurationRecorder,
1874 : write_layer_files_micros: DurationRecorder,
1875 : level0_deltas_count: Option<usize>,
1876 : new_deltas_count: Option<usize>,
1877 : new_deltas_size: Option<u64>,
1878 : }
1879 :
1880 : #[derive(serde::Serialize)]
1881 : struct CompactLevel0Phase1Stats {
1882 : version: u64,
1883 : tenant_id: TenantShardId,
1884 : timeline_id: TimelineId,
1885 : read_lock_acquisition_micros: RecordedDuration,
1886 : read_lock_held_spawn_blocking_startup_micros: RecordedDuration,
1887 : read_lock_held_key_sort_micros: RecordedDuration,
1888 : read_lock_held_prerequisites_micros: RecordedDuration,
1889 : read_lock_held_compute_holes_micros: RecordedDuration,
1890 : read_lock_drop_micros: RecordedDuration,
1891 : write_layer_files_micros: RecordedDuration,
1892 : level0_deltas_count: usize,
1893 : new_deltas_count: usize,
1894 : new_deltas_size: u64,
1895 : }
1896 :
1897 : impl TryFrom<CompactLevel0Phase1StatsBuilder> for CompactLevel0Phase1Stats {
1898 : type Error = anyhow::Error;
1899 :
1900 56 : fn try_from(value: CompactLevel0Phase1StatsBuilder) -> Result<Self, Self::Error> {
1901 56 : Ok(Self {
1902 56 : version: value.version.ok_or_else(|| anyhow!("version not set"))?,
1903 56 : tenant_id: value
1904 56 : .tenant_id
1905 56 : .ok_or_else(|| anyhow!("tenant_id not set"))?,
1906 56 : timeline_id: value
1907 56 : .timeline_id
1908 56 : .ok_or_else(|| anyhow!("timeline_id not set"))?,
1909 56 : read_lock_acquisition_micros: value
1910 56 : .read_lock_acquisition_micros
1911 56 : .into_recorded()
1912 56 : .ok_or_else(|| anyhow!("read_lock_acquisition_micros not set"))?,
1913 56 : read_lock_held_spawn_blocking_startup_micros: value
1914 56 : .read_lock_held_spawn_blocking_startup_micros
1915 56 : .into_recorded()
1916 56 : .ok_or_else(|| anyhow!("read_lock_held_spawn_blocking_startup_micros not set"))?,
1917 56 : read_lock_held_key_sort_micros: value
1918 56 : .read_lock_held_key_sort_micros
1919 56 : .into_recorded()
1920 56 : .ok_or_else(|| anyhow!("read_lock_held_key_sort_micros not set"))?,
1921 56 : read_lock_held_prerequisites_micros: value
1922 56 : .read_lock_held_prerequisites_micros
1923 56 : .into_recorded()
1924 56 : .ok_or_else(|| anyhow!("read_lock_held_prerequisites_micros not set"))?,
1925 56 : read_lock_held_compute_holes_micros: value
1926 56 : .read_lock_held_compute_holes_micros
1927 56 : .into_recorded()
1928 56 : .ok_or_else(|| anyhow!("read_lock_held_compute_holes_micros not set"))?,
1929 56 : read_lock_drop_micros: value
1930 56 : .read_lock_drop_micros
1931 56 : .into_recorded()
1932 56 : .ok_or_else(|| anyhow!("read_lock_drop_micros not set"))?,
1933 56 : write_layer_files_micros: value
1934 56 : .write_layer_files_micros
1935 56 : .into_recorded()
1936 56 : .ok_or_else(|| anyhow!("write_layer_files_micros not set"))?,
1937 56 : level0_deltas_count: value
1938 56 : .level0_deltas_count
1939 56 : .ok_or_else(|| anyhow!("level0_deltas_count not set"))?,
1940 56 : new_deltas_count: value
1941 56 : .new_deltas_count
1942 56 : .ok_or_else(|| anyhow!("new_deltas_count not set"))?,
1943 56 : new_deltas_size: value
1944 56 : .new_deltas_size
1945 56 : .ok_or_else(|| anyhow!("new_deltas_size not set"))?,
1946 : })
1947 56 : }
1948 : }
1949 :
1950 : impl Timeline {
1951 : /// Entry point for new tiered compaction algorithm.
1952 : ///
1953 : /// All the real work is in the implementation in the pageserver_compaction
1954 : /// crate. The code here would apply to any algorithm implemented by the
1955 : /// same interface, but tiered is the only one at the moment.
1956 : ///
1957 : /// TODO: cancellation
1958 0 : pub(crate) async fn compact_tiered(
1959 0 : self: &Arc<Self>,
1960 0 : _cancel: &CancellationToken,
1961 0 : ctx: &RequestContext,
1962 0 : ) -> Result<(), CompactionError> {
1963 0 : let fanout = self.get_compaction_threshold() as u64;
1964 0 : let target_file_size = self.get_checkpoint_distance();
1965 :
1966 : // Find the top of the historical layers
1967 0 : let end_lsn = {
1968 0 : let guard = self.layers.read().await;
1969 0 : let layers = guard.layer_map()?;
1970 :
1971 0 : let l0_deltas = layers.level0_deltas();
1972 0 :
1973 0 : // As an optimization, if we find that there are too few L0 layers,
1974 0 : // bail out early. We know that the compaction algorithm would do
1975 0 : // nothing in that case.
1976 0 : if l0_deltas.len() < fanout as usize {
1977 : // doesn't need compacting
1978 0 : return Ok(());
1979 0 : }
1980 0 : l0_deltas.iter().map(|l| l.lsn_range.end).max().unwrap()
1981 0 : };
1982 0 :
1983 0 : // Is the timeline being deleted?
1984 0 : if self.is_stopping() {
1985 0 : trace!("Dropping out of compaction on timeline shutdown");
1986 0 : return Err(CompactionError::ShuttingDown);
1987 0 : }
1988 :
1989 0 : let (dense_ks, _sparse_ks) = self.collect_keyspace(end_lsn, ctx).await?;
1990 : // TODO(chi): ignore sparse_keyspace for now, compact it in the future.
1991 0 : let mut adaptor = TimelineAdaptor::new(self, (end_lsn, dense_ks));
1992 0 :
1993 0 : pageserver_compaction::compact_tiered::compact_tiered(
1994 0 : &mut adaptor,
1995 0 : end_lsn,
1996 0 : target_file_size,
1997 0 : fanout,
1998 0 : ctx,
1999 0 : )
2000 0 : .await
2001 : // TODO: compact_tiered needs to return CompactionError
2002 0 : .map_err(CompactionError::Other)?;
2003 :
2004 0 : adaptor.flush_updates().await?;
2005 0 : Ok(())
2006 0 : }
2007 :
2008 : /// Take a list of images and deltas, produce images and deltas according to GC horizon and retain_lsns.
2009 : ///
2010 : /// It takes a key, the values of the key within the compaction process, a GC horizon, and all retain_lsns below the horizon.
2011 : /// For now, it requires the `accumulated_values` contains the full history of the key (i.e., the key with the lowest LSN is
2012 : /// an image or a WAL not requiring a base image). This restriction will be removed once we implement gc-compaction on branch.
2013 : ///
2014 : /// The function returns the deltas and the base image that need to be placed at each of the retain LSN. For example, we have:
2015 : ///
2016 : /// A@0x10, +B@0x20, +C@0x30, +D@0x40, +E@0x50, +F@0x60
2017 : /// horizon = 0x50, retain_lsn = 0x20, 0x40, delta_threshold=3
2018 : ///
2019 : /// The function will produce:
2020 : ///
2021 : /// ```plain
2022 : /// 0x20(retain_lsn) -> img=AB@0x20 always produce a single image below the lowest retain LSN
2023 : /// 0x40(retain_lsn) -> deltas=[+C@0x30, +D@0x40] two deltas since the last base image, keeping the deltas
2024 : /// 0x50(horizon) -> deltas=[ABCDE@0x50] three deltas since the last base image, generate an image but put it in the delta
2025 : /// above_horizon -> deltas=[+F@0x60] full history above the horizon
2026 : /// ```
2027 : ///
2028 : /// Note that `accumulated_values` must be sorted by LSN and should belong to a single key.
2029 1260 : pub(crate) async fn generate_key_retention(
2030 1260 : self: &Arc<Timeline>,
2031 1260 : key: Key,
2032 1260 : full_history: &[(Key, Lsn, Value)],
2033 1260 : horizon: Lsn,
2034 1260 : retain_lsn_below_horizon: &[Lsn],
2035 1260 : delta_threshold_cnt: usize,
2036 1260 : base_img_from_ancestor: Option<(Key, Lsn, Bytes)>,
2037 1260 : ) -> anyhow::Result<KeyHistoryRetention> {
2038 : // Pre-checks for the invariants
2039 :
2040 1260 : let debug_mode = cfg!(debug_assertions) || cfg!(feature = "testing");
2041 :
2042 1260 : if debug_mode {
2043 3060 : for (log_key, _, _) in full_history {
2044 1800 : assert_eq!(log_key, &key, "mismatched key");
2045 : }
2046 1260 : for i in 1..full_history.len() {
2047 540 : assert!(full_history[i - 1].1 <= full_history[i].1, "unordered LSN");
2048 540 : if full_history[i - 1].1 == full_history[i].1 {
2049 0 : assert!(
2050 0 : matches!(full_history[i - 1].2, Value::Image(_)),
2051 0 : "unordered delta/image, or duplicated delta"
2052 : );
2053 540 : }
2054 : }
2055 : // There was an assertion for no base image that checks if the first
2056 : // record in the history is `will_init` before, but it was removed.
2057 : // This is explained in the test cases for generate_key_retention.
2058 : // Search "incomplete history" for more information.
2059 2820 : for lsn in retain_lsn_below_horizon {
2060 1560 : assert!(lsn < &horizon, "retain lsn must be below horizon")
2061 : }
2062 1260 : for i in 1..retain_lsn_below_horizon.len() {
2063 712 : assert!(
2064 712 : retain_lsn_below_horizon[i - 1] <= retain_lsn_below_horizon[i],
2065 0 : "unordered LSN"
2066 : );
2067 : }
2068 0 : }
2069 1260 : let has_ancestor = base_img_from_ancestor.is_some();
2070 : // Step 1: split history into len(retain_lsn_below_horizon) + 2 buckets, where the last bucket is for all deltas above the horizon,
2071 : // and the second-to-last bucket is for the horizon. Each bucket contains lsn_last_bucket < deltas <= lsn_this_bucket.
2072 1260 : let (mut split_history, lsn_split_points) = {
2073 1260 : let mut split_history = Vec::new();
2074 1260 : split_history.resize_with(retain_lsn_below_horizon.len() + 2, Vec::new);
2075 1260 : let mut lsn_split_points = Vec::with_capacity(retain_lsn_below_horizon.len() + 1);
2076 2820 : for lsn in retain_lsn_below_horizon {
2077 1560 : lsn_split_points.push(*lsn);
2078 1560 : }
2079 1260 : lsn_split_points.push(horizon);
2080 1260 : let mut current_idx = 0;
2081 3060 : for item @ (_, lsn, _) in full_history {
2082 2288 : while current_idx < lsn_split_points.len() && *lsn > lsn_split_points[current_idx] {
2083 488 : current_idx += 1;
2084 488 : }
2085 1800 : split_history[current_idx].push(item);
2086 : }
2087 1260 : (split_history, lsn_split_points)
2088 : };
2089 : // Step 2: filter out duplicated records due to the k-merge of image/delta layers
2090 5340 : for split_for_lsn in &mut split_history {
2091 4080 : let mut prev_lsn = None;
2092 4080 : let mut new_split_for_lsn = Vec::with_capacity(split_for_lsn.len());
2093 4080 : for record @ (_, lsn, _) in std::mem::take(split_for_lsn) {
2094 1800 : if let Some(prev_lsn) = &prev_lsn {
2095 236 : if *prev_lsn == lsn {
2096 : // The case that we have an LSN with both data from the delta layer and the image layer. As
2097 : // `ValueWrapper` ensures that an image is ordered before a delta at the same LSN, we simply
2098 : // drop this delta and keep the image.
2099 : //
2100 : // For example, we have delta layer key1@0x10, key1@0x20, and image layer key1@0x10, we will
2101 : // keep the image for key1@0x10 and the delta for key1@0x20. key1@0x10 delta will be simply
2102 : // dropped.
2103 : //
2104 : // TODO: in case we have both delta + images for a given LSN and it does not exceed the delta
2105 : // threshold, we could have kept delta instead to save space. This is an optimization for the future.
2106 0 : continue;
2107 236 : }
2108 1564 : }
2109 1800 : prev_lsn = Some(lsn);
2110 1800 : new_split_for_lsn.push(record);
2111 : }
2112 4080 : *split_for_lsn = new_split_for_lsn;
2113 : }
2114 : // Step 3: generate images when necessary
2115 1260 : let mut retention = Vec::with_capacity(split_history.len());
2116 1260 : let mut records_since_last_image = 0;
2117 1260 : let batch_cnt = split_history.len();
2118 1260 : assert!(
2119 1260 : batch_cnt >= 2,
2120 0 : "should have at least below + above horizon batches"
2121 : );
2122 1260 : let mut replay_history: Vec<(Key, Lsn, Value)> = Vec::new();
2123 1260 : if let Some((key, lsn, img)) = base_img_from_ancestor {
2124 84 : replay_history.push((key, lsn, Value::Image(img)));
2125 1176 : }
2126 :
2127 : /// Generate debug information for the replay history
2128 0 : fn generate_history_trace(replay_history: &[(Key, Lsn, Value)]) -> String {
2129 : use std::fmt::Write;
2130 0 : let mut output = String::new();
2131 0 : if let Some((key, _, _)) = replay_history.first() {
2132 0 : write!(output, "key={} ", key).unwrap();
2133 0 : let mut cnt = 0;
2134 0 : for (_, lsn, val) in replay_history {
2135 0 : if val.is_image() {
2136 0 : write!(output, "i@{} ", lsn).unwrap();
2137 0 : } else if val.will_init() {
2138 0 : write!(output, "di@{} ", lsn).unwrap();
2139 0 : } else {
2140 0 : write!(output, "d@{} ", lsn).unwrap();
2141 0 : }
2142 0 : cnt += 1;
2143 0 : if cnt >= 128 {
2144 0 : write!(output, "... and more").unwrap();
2145 0 : break;
2146 0 : }
2147 : }
2148 0 : } else {
2149 0 : write!(output, "<no history>").unwrap();
2150 0 : }
2151 0 : output
2152 0 : }
2153 :
2154 0 : fn generate_debug_trace(
2155 0 : replay_history: Option<&[(Key, Lsn, Value)]>,
2156 0 : full_history: &[(Key, Lsn, Value)],
2157 0 : lsns: &[Lsn],
2158 0 : horizon: Lsn,
2159 0 : ) -> String {
2160 : use std::fmt::Write;
2161 0 : let mut output = String::new();
2162 0 : if let Some(replay_history) = replay_history {
2163 0 : writeln!(
2164 0 : output,
2165 0 : "replay_history: {}",
2166 0 : generate_history_trace(replay_history)
2167 0 : )
2168 0 : .unwrap();
2169 0 : } else {
2170 0 : writeln!(output, "replay_history: <disabled>",).unwrap();
2171 0 : }
2172 0 : writeln!(
2173 0 : output,
2174 0 : "full_history: {}",
2175 0 : generate_history_trace(full_history)
2176 0 : )
2177 0 : .unwrap();
2178 0 : writeln!(
2179 0 : output,
2180 0 : "when processing: [{}] horizon={}",
2181 0 : lsns.iter().map(|l| format!("{l}")).join(","),
2182 0 : horizon
2183 0 : )
2184 0 : .unwrap();
2185 0 : output
2186 0 : }
2187 :
2188 1260 : let mut key_exists = false;
2189 4080 : for (i, split_for_lsn) in split_history.into_iter().enumerate() {
2190 : // TODO: there could be image keys inside the splits, and we can compute records_since_last_image accordingly.
2191 4080 : records_since_last_image += split_for_lsn.len();
2192 : // Whether to produce an image into the final layer files
2193 4080 : let produce_image = if i == 0 && !has_ancestor {
2194 : // We always generate images for the first batch (below horizon / lowest retain_lsn)
2195 1176 : true
2196 2904 : } else if i == batch_cnt - 1 {
2197 : // Do not generate images for the last batch (above horizon)
2198 1260 : false
2199 1644 : } else if records_since_last_image == 0 {
2200 1288 : false
2201 356 : } else if records_since_last_image >= delta_threshold_cnt {
2202 : // Generate images when there are too many records
2203 12 : true
2204 : } else {
2205 344 : false
2206 : };
2207 4080 : replay_history.extend(split_for_lsn.iter().map(|x| (*x).clone()));
2208 : // Only retain the items after the last image record
2209 5028 : for idx in (0..replay_history.len()).rev() {
2210 5028 : if replay_history[idx].2.will_init() {
2211 4080 : replay_history = replay_history[idx..].to_vec();
2212 4080 : break;
2213 948 : }
2214 : }
2215 4080 : if replay_history.is_empty() && !key_exists {
2216 : // The key does not exist at earlier LSN, we can skip this iteration.
2217 0 : retention.push(Vec::new());
2218 0 : continue;
2219 4080 : } else {
2220 4080 : key_exists = true;
2221 4080 : }
2222 4080 : let Some((_, _, val)) = replay_history.first() else {
2223 0 : unreachable!("replay history should not be empty once it exists")
2224 : };
2225 4080 : if !val.will_init() {
2226 0 : return Err(anyhow::anyhow!("invalid history, no base image")).with_context(|| {
2227 0 : generate_debug_trace(
2228 0 : Some(&replay_history),
2229 0 : full_history,
2230 0 : retain_lsn_below_horizon,
2231 0 : horizon,
2232 0 : )
2233 0 : });
2234 4080 : }
2235 : // Whether to reconstruct the image. In debug mode, we will generate an image
2236 : // at every retain_lsn to ensure data is not corrupted, but we won't put the
2237 : // image into the final layer.
2238 4080 : let generate_image = produce_image || debug_mode;
2239 4080 : if produce_image {
2240 1188 : records_since_last_image = 0;
2241 2892 : }
2242 4080 : let img_and_lsn = if generate_image {
2243 4080 : let replay_history_for_debug = if debug_mode {
2244 4080 : Some(replay_history.clone())
2245 : } else {
2246 0 : None
2247 : };
2248 4080 : let replay_history_for_debug_ref = replay_history_for_debug.as_deref();
2249 4080 : let history = if produce_image {
2250 1188 : std::mem::take(&mut replay_history)
2251 : } else {
2252 2892 : replay_history.clone()
2253 : };
2254 4080 : let mut img = None;
2255 4080 : let mut records = Vec::with_capacity(history.len());
2256 4080 : if let (_, lsn, Value::Image(val)) = history.first().as_ref().unwrap() {
2257 4036 : img = Some((*lsn, val.clone()));
2258 4036 : for (_, lsn, val) in history.into_iter().skip(1) {
2259 920 : let Value::WalRecord(rec) = val else {
2260 0 : return Err(anyhow::anyhow!(
2261 0 : "invalid record, first record is image, expect walrecords"
2262 0 : ))
2263 0 : .with_context(|| {
2264 0 : generate_debug_trace(
2265 0 : replay_history_for_debug_ref,
2266 0 : full_history,
2267 0 : retain_lsn_below_horizon,
2268 0 : horizon,
2269 0 : )
2270 0 : });
2271 : };
2272 920 : records.push((lsn, rec));
2273 : }
2274 : } else {
2275 72 : for (_, lsn, val) in history.into_iter() {
2276 72 : let Value::WalRecord(rec) = val else {
2277 0 : return Err(anyhow::anyhow!("invalid record, first record is walrecord, expect rest are walrecord"))
2278 0 : .with_context(|| generate_debug_trace(
2279 0 : replay_history_for_debug_ref,
2280 0 : full_history,
2281 0 : retain_lsn_below_horizon,
2282 0 : horizon,
2283 0 : ));
2284 : };
2285 72 : records.push((lsn, rec));
2286 : }
2287 : }
2288 4080 : records.reverse();
2289 4080 : let state = ValueReconstructState { img, records };
2290 : // last batch does not generate image so i is always in range, unless we force generate
2291 : // an image during testing
2292 4080 : let request_lsn = if i >= lsn_split_points.len() {
2293 1260 : Lsn::MAX
2294 : } else {
2295 2820 : lsn_split_points[i]
2296 : };
2297 4080 : let img = self.reconstruct_value(key, request_lsn, state).await?;
2298 4080 : Some((request_lsn, img))
2299 : } else {
2300 0 : None
2301 : };
2302 4080 : if produce_image {
2303 1188 : let (request_lsn, img) = img_and_lsn.unwrap();
2304 1188 : replay_history.push((key, request_lsn, Value::Image(img.clone())));
2305 1188 : retention.push(vec![(request_lsn, Value::Image(img))]);
2306 2892 : } else {
2307 2892 : let deltas = split_for_lsn
2308 2892 : .iter()
2309 2892 : .map(|(_, lsn, value)| (*lsn, value.clone()))
2310 2892 : .collect_vec();
2311 2892 : retention.push(deltas);
2312 2892 : }
2313 : }
2314 1260 : let mut result = Vec::with_capacity(retention.len());
2315 1260 : assert_eq!(retention.len(), lsn_split_points.len() + 1);
2316 4080 : for (idx, logs) in retention.into_iter().enumerate() {
2317 4080 : if idx == lsn_split_points.len() {
2318 1260 : return Ok(KeyHistoryRetention {
2319 1260 : below_horizon: result,
2320 1260 : above_horizon: KeyLogAtLsn(logs),
2321 1260 : });
2322 2820 : } else {
2323 2820 : result.push((lsn_split_points[idx], KeyLogAtLsn(logs)));
2324 2820 : }
2325 : }
2326 0 : unreachable!("key retention is empty")
2327 1260 : }
2328 :
2329 : /// Check how much space is left on the disk
2330 104 : async fn check_available_space(self: &Arc<Self>) -> anyhow::Result<u64> {
2331 104 : let tenants_dir = self.conf.tenants_path();
2332 :
2333 104 : let stat = Statvfs::get(&tenants_dir, None)
2334 104 : .context("statvfs failed, presumably directory got unlinked")?;
2335 :
2336 104 : let (avail_bytes, _) = stat.get_avail_total_bytes();
2337 104 :
2338 104 : Ok(avail_bytes)
2339 104 : }
2340 :
2341 : /// Check if the compaction can proceed safely without running out of space. We assume the size
2342 : /// upper bound of the produced files of a compaction job is the same as all layers involved in
2343 : /// the compaction. Therefore, we need `2 * layers_to_be_compacted_size` at least to do a
2344 : /// compaction.
2345 104 : async fn check_compaction_space(
2346 104 : self: &Arc<Self>,
2347 104 : layer_selection: &[Layer],
2348 104 : ) -> anyhow::Result<()> {
2349 104 : let available_space = self.check_available_space().await?;
2350 104 : let mut remote_layer_size = 0;
2351 104 : let mut all_layer_size = 0;
2352 408 : for layer in layer_selection {
2353 304 : let needs_download = layer.needs_download().await?;
2354 304 : if needs_download.is_some() {
2355 0 : remote_layer_size += layer.layer_desc().file_size;
2356 304 : }
2357 304 : all_layer_size += layer.layer_desc().file_size;
2358 : }
2359 104 : let allocated_space = (available_space as f64 * 0.8) as u64; /* reserve 20% space for other tasks */
2360 104 : if all_layer_size /* space needed for newly-generated file */ + remote_layer_size /* space for downloading layers */ > allocated_space
2361 : {
2362 0 : return Err(anyhow!(
2363 0 : "not enough space for compaction: available_space={}, allocated_space={}, all_layer_size={}, remote_layer_size={}, required_space={}",
2364 0 : available_space,
2365 0 : allocated_space,
2366 0 : all_layer_size,
2367 0 : remote_layer_size,
2368 0 : all_layer_size + remote_layer_size
2369 0 : ));
2370 104 : }
2371 104 : Ok(())
2372 104 : }
2373 :
2374 : /// Get a watermark for gc-compaction, that is the lowest LSN that we can use as the `gc_horizon` for
2375 : /// the compaction algorithm. It is min(space_cutoff, time_cutoff, latest_gc_cutoff, standby_horizon).
2376 : /// Leases and retain_lsns are considered in the gc-compaction job itself so we don't need to account for them
2377 : /// here.
2378 108 : pub(crate) fn get_gc_compaction_watermark(self: &Arc<Self>) -> Lsn {
2379 108 : let gc_cutoff_lsn = {
2380 108 : let gc_info = self.gc_info.read().unwrap();
2381 108 : gc_info.min_cutoff()
2382 108 : };
2383 108 :
2384 108 : // TODO: standby horizon should use leases so we don't really need to consider it here.
2385 108 : // let watermark = watermark.min(self.standby_horizon.load());
2386 108 :
2387 108 : // TODO: ensure the child branches will not use anything below the watermark, or consider
2388 108 : // them when computing the watermark.
2389 108 : gc_cutoff_lsn.min(*self.get_applied_gc_cutoff_lsn())
2390 108 : }
2391 :
2392 : /// Split a gc-compaction job into multiple compaction jobs. The split is based on the key range and the estimated size of the compaction job.
2393 : /// The function returns a list of compaction jobs that can be executed separately. If the upper bound of the compact LSN
2394 : /// range is not specified, we will use the latest gc_cutoff as the upper bound, so that all jobs in the jobset acts
2395 : /// like a full compaction of the specified keyspace.
2396 0 : pub(crate) async fn gc_compaction_split_jobs(
2397 0 : self: &Arc<Self>,
2398 0 : job: GcCompactJob,
2399 0 : sub_compaction_max_job_size_mb: Option<u64>,
2400 0 : ) -> anyhow::Result<Vec<GcCompactJob>> {
2401 0 : let compact_below_lsn = if job.compact_lsn_range.end != Lsn::MAX {
2402 0 : job.compact_lsn_range.end
2403 : } else {
2404 0 : self.get_gc_compaction_watermark()
2405 : };
2406 :
2407 0 : if compact_below_lsn == Lsn::INVALID {
2408 0 : tracing::warn!(
2409 0 : "no layers to compact with gc: gc_cutoff not generated yet, skipping gc bottom-most compaction"
2410 : );
2411 0 : return Ok(vec![]);
2412 0 : }
2413 :
2414 : // Split compaction job to about 4GB each
2415 : const GC_COMPACT_MAX_SIZE_MB: u64 = 4 * 1024;
2416 0 : let sub_compaction_max_job_size_mb =
2417 0 : sub_compaction_max_job_size_mb.unwrap_or(GC_COMPACT_MAX_SIZE_MB);
2418 0 :
2419 0 : let mut compact_jobs = Vec::<GcCompactJob>::new();
2420 0 : // For now, we simply use the key partitioning information; we should do a more fine-grained partitioning
2421 0 : // by estimating the amount of files read for a compaction job. We should also partition on LSN.
2422 0 : let ((dense_ks, sparse_ks), _) = self.partitioning.read().as_ref().clone();
2423 : // Truncate the key range to be within user specified compaction range.
2424 0 : fn truncate_to(
2425 0 : source_start: &Key,
2426 0 : source_end: &Key,
2427 0 : target_start: &Key,
2428 0 : target_end: &Key,
2429 0 : ) -> Option<(Key, Key)> {
2430 0 : let start = source_start.max(target_start);
2431 0 : let end = source_end.min(target_end);
2432 0 : if start < end {
2433 0 : Some((*start, *end))
2434 : } else {
2435 0 : None
2436 : }
2437 0 : }
2438 0 : let mut split_key_ranges = Vec::new();
2439 0 : let ranges = dense_ks
2440 0 : .parts
2441 0 : .iter()
2442 0 : .map(|partition| partition.ranges.iter())
2443 0 : .chain(sparse_ks.parts.iter().map(|x| x.0.ranges.iter()))
2444 0 : .flatten()
2445 0 : .cloned()
2446 0 : .collect_vec();
2447 0 : for range in ranges.iter() {
2448 0 : let Some((start, end)) = truncate_to(
2449 0 : &range.start,
2450 0 : &range.end,
2451 0 : &job.compact_key_range.start,
2452 0 : &job.compact_key_range.end,
2453 0 : ) else {
2454 0 : continue;
2455 : };
2456 0 : split_key_ranges.push((start, end));
2457 : }
2458 0 : split_key_ranges.sort();
2459 0 : let all_layers = {
2460 0 : let guard = self.layers.read().await;
2461 0 : let layer_map = guard.layer_map()?;
2462 0 : layer_map.iter_historic_layers().collect_vec()
2463 0 : };
2464 0 : let mut current_start = None;
2465 0 : let ranges_num = split_key_ranges.len();
2466 0 : for (idx, (start, end)) in split_key_ranges.into_iter().enumerate() {
2467 0 : if current_start.is_none() {
2468 0 : current_start = Some(start);
2469 0 : }
2470 0 : let start = current_start.unwrap();
2471 0 : if start >= end {
2472 : // We have already processed this partition.
2473 0 : continue;
2474 0 : }
2475 0 : let overlapping_layers = {
2476 0 : let mut desc = Vec::new();
2477 0 : for layer in all_layers.iter() {
2478 0 : if overlaps_with(&layer.get_key_range(), &(start..end))
2479 0 : && layer.get_lsn_range().start <= compact_below_lsn
2480 0 : {
2481 0 : desc.push(layer.clone());
2482 0 : }
2483 : }
2484 0 : desc
2485 0 : };
2486 0 : let total_size = overlapping_layers.iter().map(|x| x.file_size).sum::<u64>();
2487 0 : if total_size > sub_compaction_max_job_size_mb * 1024 * 1024 || ranges_num == idx + 1 {
2488 : // Try to extend the compaction range so that we include at least one full layer file.
2489 0 : let extended_end = overlapping_layers
2490 0 : .iter()
2491 0 : .map(|layer| layer.key_range.end)
2492 0 : .min();
2493 : // It is possible that the search range does not contain any layer files when we reach the end of the loop.
2494 : // In this case, we simply use the specified key range end.
2495 0 : let end = if let Some(extended_end) = extended_end {
2496 0 : extended_end.max(end)
2497 : } else {
2498 0 : end
2499 : };
2500 0 : let end = if ranges_num == idx + 1 {
2501 : // extend the compaction range to the end of the key range if it's the last partition
2502 0 : end.max(job.compact_key_range.end)
2503 : } else {
2504 0 : end
2505 : };
2506 0 : if total_size == 0 && !compact_jobs.is_empty() {
2507 0 : info!(
2508 0 : "splitting compaction job: {}..{}, estimated_size={}, extending the previous job",
2509 : start, end, total_size
2510 : );
2511 0 : compact_jobs.last_mut().unwrap().compact_key_range.end = end;
2512 0 : current_start = Some(end);
2513 : } else {
2514 0 : info!(
2515 0 : "splitting compaction job: {}..{}, estimated_size={}",
2516 : start, end, total_size
2517 : );
2518 0 : compact_jobs.push(GcCompactJob {
2519 0 : dry_run: job.dry_run,
2520 0 : compact_key_range: start..end,
2521 0 : compact_lsn_range: job.compact_lsn_range.start..compact_below_lsn,
2522 0 : });
2523 0 : current_start = Some(end);
2524 : }
2525 0 : }
2526 : }
2527 0 : Ok(compact_jobs)
2528 0 : }
2529 :
2530 : /// An experimental compaction building block that combines compaction with garbage collection.
2531 : ///
2532 : /// The current implementation picks all delta + image layers that are below or intersecting with
2533 : /// the GC horizon without considering retain_lsns. Then, it does a full compaction over all these delta
2534 : /// layers and image layers, which generates image layers on the gc horizon, drop deltas below gc horizon,
2535 : /// and create delta layers with all deltas >= gc horizon.
2536 : ///
2537 : /// If `options.compact_range` is provided, it will only compact the keys within the range, aka partial compaction.
2538 : /// Partial compaction will read and process all layers overlapping with the key range, even if it might
2539 : /// contain extra keys. After the gc-compaction phase completes, delta layers that are not fully contained
2540 : /// within the key range will be rewritten to ensure they do not overlap with the delta layers. Providing
2541 : /// Key::MIN..Key..MAX to the function indicates a full compaction, though technically, `Key::MAX` is not
2542 : /// part of the range.
2543 : ///
2544 : /// If `options.compact_lsn_range.end` is provided, the compaction will only compact layers below or intersect with
2545 : /// the LSN. Otherwise, it will use the gc cutoff by default.
2546 108 : pub(crate) async fn compact_with_gc(
2547 108 : self: &Arc<Self>,
2548 108 : cancel: &CancellationToken,
2549 108 : options: CompactOptions,
2550 108 : ctx: &RequestContext,
2551 108 : ) -> anyhow::Result<()> {
2552 108 : let sub_compaction = options.sub_compaction;
2553 108 : let job = GcCompactJob::from_compact_options(options.clone());
2554 108 : if sub_compaction {
2555 0 : info!(
2556 0 : "running enhanced gc bottom-most compaction with sub-compaction, splitting compaction jobs"
2557 : );
2558 0 : let jobs = self
2559 0 : .gc_compaction_split_jobs(job, options.sub_compaction_max_job_size_mb)
2560 0 : .await?;
2561 0 : let jobs_len = jobs.len();
2562 0 : for (idx, job) in jobs.into_iter().enumerate() {
2563 0 : info!(
2564 0 : "running enhanced gc bottom-most compaction, sub-compaction {}/{}",
2565 0 : idx + 1,
2566 : jobs_len
2567 : );
2568 0 : self.compact_with_gc_inner(cancel, job, ctx).await?;
2569 : }
2570 0 : if jobs_len == 0 {
2571 0 : info!("no jobs to run, skipping gc bottom-most compaction");
2572 0 : }
2573 0 : return Ok(());
2574 108 : }
2575 108 : self.compact_with_gc_inner(cancel, job, ctx).await
2576 108 : }
2577 :
2578 108 : async fn compact_with_gc_inner(
2579 108 : self: &Arc<Self>,
2580 108 : cancel: &CancellationToken,
2581 108 : job: GcCompactJob,
2582 108 : ctx: &RequestContext,
2583 108 : ) -> anyhow::Result<()> {
2584 108 : // Block other compaction/GC tasks from running for now. GC-compaction could run along
2585 108 : // with legacy compaction tasks in the future. Always ensure the lock order is compaction -> gc.
2586 108 : // Note that we already acquired the compaction lock when the outer `compact` function gets called.
2587 108 :
2588 108 : let gc_lock = async {
2589 108 : tokio::select! {
2590 108 : guard = self.gc_lock.lock() => Ok(guard),
2591 : // TODO: refactor to CompactionError to correctly pass cancelled error
2592 108 : _ = cancel.cancelled() => Err(anyhow!("cancelled")),
2593 : }
2594 108 : };
2595 :
2596 108 : let gc_lock = crate::timed(
2597 108 : gc_lock,
2598 108 : "acquires gc lock",
2599 108 : std::time::Duration::from_secs(5),
2600 108 : )
2601 108 : .await?;
2602 :
2603 108 : let dry_run = job.dry_run;
2604 108 : let compact_key_range = job.compact_key_range;
2605 108 : let compact_lsn_range = job.compact_lsn_range;
2606 :
2607 108 : let debug_mode = cfg!(debug_assertions) || cfg!(feature = "testing");
2608 :
2609 108 : info!(
2610 0 : "running enhanced gc bottom-most compaction, dry_run={dry_run}, compact_key_range={}..{}, compact_lsn_range={}..{}",
2611 : compact_key_range.start,
2612 : compact_key_range.end,
2613 : compact_lsn_range.start,
2614 : compact_lsn_range.end
2615 : );
2616 :
2617 108 : scopeguard::defer! {
2618 108 : info!("done enhanced gc bottom-most compaction");
2619 108 : };
2620 108 :
2621 108 : let mut stat = CompactionStatistics::default();
2622 :
2623 : // Step 0: pick all delta layers + image layers below/intersect with the GC horizon.
2624 : // The layer selection has the following properties:
2625 : // 1. If a layer is in the selection, all layers below it are in the selection.
2626 : // 2. Inferred from (1), for each key in the layer selection, the value can be reconstructed only with the layers in the layer selection.
2627 104 : let job_desc = {
2628 108 : let guard = self.layers.read().await;
2629 108 : let layers = guard.layer_map()?;
2630 108 : let gc_info = self.gc_info.read().unwrap();
2631 108 : let mut retain_lsns_below_horizon = Vec::new();
2632 108 : let gc_cutoff = {
2633 : // Currently, gc-compaction only kicks in after the legacy gc has updated the gc_cutoff.
2634 : // Therefore, it can only clean up data that cannot be cleaned up with legacy gc, instead of
2635 : // cleaning everything that theoritically it could. In the future, it should use `self.gc_info`
2636 : // to get the truth data.
2637 108 : let real_gc_cutoff = self.get_gc_compaction_watermark();
2638 : // The compaction algorithm will keep all keys above the gc_cutoff while keeping only necessary keys below the gc_cutoff for
2639 : // each of the retain_lsn. Therefore, if the user-provided `compact_lsn_range.end` is larger than the real gc cutoff, we will use
2640 : // the real cutoff.
2641 108 : let mut gc_cutoff = if compact_lsn_range.end == Lsn::MAX {
2642 96 : if real_gc_cutoff == Lsn::INVALID {
2643 : // If the gc_cutoff is not generated yet, we should not compact anything.
2644 0 : tracing::warn!(
2645 0 : "no layers to compact with gc: gc_cutoff not generated yet, skipping gc bottom-most compaction"
2646 : );
2647 0 : return Ok(());
2648 96 : }
2649 96 : real_gc_cutoff
2650 : } else {
2651 12 : compact_lsn_range.end
2652 : };
2653 108 : if gc_cutoff > real_gc_cutoff {
2654 8 : warn!(
2655 0 : "provided compact_lsn_range.end={} is larger than the real_gc_cutoff={}, using the real gc cutoff",
2656 : gc_cutoff, real_gc_cutoff
2657 : );
2658 8 : gc_cutoff = real_gc_cutoff;
2659 100 : }
2660 108 : gc_cutoff
2661 : };
2662 140 : for (lsn, _timeline_id, _is_offloaded) in &gc_info.retain_lsns {
2663 140 : if lsn < &gc_cutoff {
2664 140 : retain_lsns_below_horizon.push(*lsn);
2665 140 : }
2666 : }
2667 108 : for lsn in gc_info.leases.keys() {
2668 0 : if lsn < &gc_cutoff {
2669 0 : retain_lsns_below_horizon.push(*lsn);
2670 0 : }
2671 : }
2672 108 : let mut selected_layers: Vec<Layer> = Vec::new();
2673 108 : drop(gc_info);
2674 : // Firstly, pick all the layers intersect or below the gc_cutoff, get the largest LSN in the selected layers.
2675 108 : let Some(max_layer_lsn) = layers
2676 108 : .iter_historic_layers()
2677 488 : .filter(|desc| desc.get_lsn_range().start <= gc_cutoff)
2678 416 : .map(|desc| desc.get_lsn_range().end)
2679 108 : .max()
2680 : else {
2681 0 : info!(
2682 0 : "no layers to compact with gc: no historic layers below gc_cutoff, gc_cutoff={}",
2683 : gc_cutoff
2684 : );
2685 0 : return Ok(());
2686 : };
2687 : // Next, if the user specifies compact_lsn_range.start, we need to filter some layers out. All the layers (strictly) below
2688 : // the min_layer_lsn computed as below will be filtered out and the data will be accessed using the normal read path, as if
2689 : // it is a branch.
2690 108 : let Some(min_layer_lsn) = layers
2691 108 : .iter_historic_layers()
2692 488 : .filter(|desc| {
2693 488 : if compact_lsn_range.start == Lsn::INVALID {
2694 396 : true // select all layers below if start == Lsn(0)
2695 : } else {
2696 92 : desc.get_lsn_range().end > compact_lsn_range.start // strictly larger than compact_above_lsn
2697 : }
2698 488 : })
2699 452 : .map(|desc| desc.get_lsn_range().start)
2700 108 : .min()
2701 : else {
2702 0 : info!(
2703 0 : "no layers to compact with gc: no historic layers above compact_above_lsn, compact_above_lsn={}",
2704 : compact_lsn_range.end
2705 : );
2706 0 : return Ok(());
2707 : };
2708 : // Then, pick all the layers that are below the max_layer_lsn. This is to ensure we can pick all single-key
2709 : // layers to compact.
2710 108 : let mut rewrite_layers = Vec::new();
2711 488 : for desc in layers.iter_historic_layers() {
2712 488 : if desc.get_lsn_range().end <= max_layer_lsn
2713 416 : && desc.get_lsn_range().start >= min_layer_lsn
2714 380 : && overlaps_with(&desc.get_key_range(), &compact_key_range)
2715 : {
2716 : // If the layer overlaps with the compaction key range, we need to read it to obtain all keys within the range,
2717 : // even if it might contain extra keys
2718 304 : selected_layers.push(guard.get_from_desc(&desc));
2719 304 : // If the layer is not fully contained within the key range, we need to rewrite it if it's a delta layer (it's fine
2720 304 : // to overlap image layers)
2721 304 : if desc.is_delta() && !fully_contains(&compact_key_range, &desc.get_key_range())
2722 4 : {
2723 4 : rewrite_layers.push(desc);
2724 300 : }
2725 184 : }
2726 : }
2727 108 : if selected_layers.is_empty() {
2728 4 : info!(
2729 0 : "no layers to compact with gc: no layers within the key range, gc_cutoff={}, key_range={}..{}",
2730 : gc_cutoff, compact_key_range.start, compact_key_range.end
2731 : );
2732 4 : return Ok(());
2733 104 : }
2734 104 : retain_lsns_below_horizon.sort();
2735 104 : GcCompactionJobDescription {
2736 104 : selected_layers,
2737 104 : gc_cutoff,
2738 104 : retain_lsns_below_horizon,
2739 104 : min_layer_lsn,
2740 104 : max_layer_lsn,
2741 104 : compaction_key_range: compact_key_range,
2742 104 : rewrite_layers,
2743 104 : }
2744 : };
2745 104 : let (has_data_below, lowest_retain_lsn) = if compact_lsn_range.start != Lsn::INVALID {
2746 : // If we only compact above some LSN, we should get the history from the current branch below the specified LSN.
2747 : // We use job_desc.min_layer_lsn as if it's the lowest branch point.
2748 16 : (true, job_desc.min_layer_lsn)
2749 88 : } else if self.ancestor_timeline.is_some() {
2750 : // In theory, we can also use min_layer_lsn here, but using ancestor LSN makes sure the delta layers cover the
2751 : // LSN ranges all the way to the ancestor timeline.
2752 4 : (true, self.ancestor_lsn)
2753 : } else {
2754 84 : let res = job_desc
2755 84 : .retain_lsns_below_horizon
2756 84 : .first()
2757 84 : .copied()
2758 84 : .unwrap_or(job_desc.gc_cutoff);
2759 84 : if debug_mode {
2760 84 : assert_eq!(
2761 84 : res,
2762 84 : job_desc
2763 84 : .retain_lsns_below_horizon
2764 84 : .iter()
2765 84 : .min()
2766 84 : .copied()
2767 84 : .unwrap_or(job_desc.gc_cutoff)
2768 84 : );
2769 0 : }
2770 84 : (false, res)
2771 : };
2772 104 : info!(
2773 0 : "picked {} layers for compaction ({} layers need rewriting) with max_layer_lsn={} min_layer_lsn={} gc_cutoff={} lowest_retain_lsn={}, key_range={}..{}, has_data_below={}",
2774 0 : job_desc.selected_layers.len(),
2775 0 : job_desc.rewrite_layers.len(),
2776 : job_desc.max_layer_lsn,
2777 : job_desc.min_layer_lsn,
2778 : job_desc.gc_cutoff,
2779 : lowest_retain_lsn,
2780 : job_desc.compaction_key_range.start,
2781 : job_desc.compaction_key_range.end,
2782 : has_data_below,
2783 : );
2784 :
2785 408 : for layer in &job_desc.selected_layers {
2786 304 : debug!("read layer: {}", layer.layer_desc().key());
2787 : }
2788 108 : for layer in &job_desc.rewrite_layers {
2789 4 : debug!("rewrite layer: {}", layer.key());
2790 : }
2791 :
2792 104 : self.check_compaction_space(&job_desc.selected_layers)
2793 104 : .await?;
2794 :
2795 : // Generate statistics for the compaction
2796 408 : for layer in &job_desc.selected_layers {
2797 304 : let desc = layer.layer_desc();
2798 304 : if desc.is_delta() {
2799 172 : stat.visit_delta_layer(desc.file_size());
2800 172 : } else {
2801 132 : stat.visit_image_layer(desc.file_size());
2802 132 : }
2803 : }
2804 :
2805 : // Step 1: construct a k-merge iterator over all layers.
2806 : // Also, verify if the layer map can be split by drawing a horizontal line at every LSN start/end split point.
2807 104 : let layer_names = job_desc
2808 104 : .selected_layers
2809 104 : .iter()
2810 304 : .map(|layer| layer.layer_desc().layer_name())
2811 104 : .collect_vec();
2812 104 : if let Some(err) = check_valid_layermap(&layer_names) {
2813 0 : bail!(
2814 0 : "gc-compaction layer map check failed because {}, cannot proceed with compaction due to potential data loss",
2815 0 : err
2816 0 : );
2817 104 : }
2818 104 : // The maximum LSN we are processing in this compaction loop
2819 104 : let end_lsn = job_desc
2820 104 : .selected_layers
2821 104 : .iter()
2822 304 : .map(|l| l.layer_desc().lsn_range.end)
2823 104 : .max()
2824 104 : .unwrap();
2825 104 : let mut delta_layers = Vec::new();
2826 104 : let mut image_layers = Vec::new();
2827 104 : let mut downloaded_layers = Vec::new();
2828 104 : let mut total_downloaded_size = 0;
2829 104 : let mut total_layer_size = 0;
2830 408 : for layer in &job_desc.selected_layers {
2831 304 : if layer.needs_download().await?.is_some() {
2832 0 : total_downloaded_size += layer.layer_desc().file_size;
2833 304 : }
2834 304 : total_layer_size += layer.layer_desc().file_size;
2835 304 : let resident_layer = layer.download_and_keep_resident(ctx).await?;
2836 304 : downloaded_layers.push(resident_layer);
2837 : }
2838 104 : info!(
2839 0 : "finish downloading layers, downloaded={}, total={}, ratio={:.2}",
2840 0 : total_downloaded_size,
2841 0 : total_layer_size,
2842 0 : total_downloaded_size as f64 / total_layer_size as f64
2843 : );
2844 408 : for resident_layer in &downloaded_layers {
2845 304 : if resident_layer.layer_desc().is_delta() {
2846 172 : let layer = resident_layer.get_as_delta(ctx).await?;
2847 172 : delta_layers.push(layer);
2848 : } else {
2849 132 : let layer = resident_layer.get_as_image(ctx).await?;
2850 132 : image_layers.push(layer);
2851 : }
2852 : }
2853 104 : let (dense_ks, sparse_ks) = self.collect_gc_compaction_keyspace().await?;
2854 104 : let mut merge_iter = FilterIterator::create(
2855 104 : MergeIterator::create(&delta_layers, &image_layers, ctx),
2856 104 : dense_ks,
2857 104 : sparse_ks,
2858 104 : )?;
2859 :
2860 : // Step 2: Produce images+deltas.
2861 104 : let mut accumulated_values = Vec::new();
2862 104 : let mut last_key: Option<Key> = None;
2863 :
2864 : // Only create image layers when there is no ancestor branches. TODO: create covering image layer
2865 : // when some condition meet.
2866 104 : let mut image_layer_writer = if !has_data_below {
2867 : Some(
2868 84 : SplitImageLayerWriter::new(
2869 84 : self.conf,
2870 84 : self.timeline_id,
2871 84 : self.tenant_shard_id,
2872 84 : job_desc.compaction_key_range.start,
2873 84 : lowest_retain_lsn,
2874 84 : self.get_compaction_target_size(),
2875 84 : ctx,
2876 84 : )
2877 84 : .await?,
2878 : )
2879 : } else {
2880 20 : None
2881 : };
2882 :
2883 104 : let mut delta_layer_writer = SplitDeltaLayerWriter::new(
2884 104 : self.conf,
2885 104 : self.timeline_id,
2886 104 : self.tenant_shard_id,
2887 104 : lowest_retain_lsn..end_lsn,
2888 104 : self.get_compaction_target_size(),
2889 104 : )
2890 104 : .await?;
2891 :
2892 : #[derive(Default)]
2893 : struct RewritingLayers {
2894 : before: Option<DeltaLayerWriter>,
2895 : after: Option<DeltaLayerWriter>,
2896 : }
2897 104 : let mut delta_layer_rewriters = HashMap::<Arc<PersistentLayerKey>, RewritingLayers>::new();
2898 :
2899 : /// When compacting not at a bottom range (=`[0,X)`) of the root branch, we "have data below" (`has_data_below=true`).
2900 : /// The two cases are compaction in ancestor branches and when `compact_lsn_range.start` is set.
2901 : /// In those cases, we need to pull up data from below the LSN range we're compaction.
2902 : ///
2903 : /// This function unifies the cases so that later code doesn't have to think about it.
2904 : ///
2905 : /// Currently, we always get the ancestor image for each key in the child branch no matter whether the image
2906 : /// is needed for reconstruction. This should be fixed in the future.
2907 : ///
2908 : /// Furthermore, we should do vectored get instead of a single get, or better, use k-merge for ancestor
2909 : /// images.
2910 1244 : async fn get_ancestor_image(
2911 1244 : this_tline: &Arc<Timeline>,
2912 1244 : key: Key,
2913 1244 : ctx: &RequestContext,
2914 1244 : has_data_below: bool,
2915 1244 : history_lsn_point: Lsn,
2916 1244 : ) -> anyhow::Result<Option<(Key, Lsn, Bytes)>> {
2917 1244 : if !has_data_below {
2918 1168 : return Ok(None);
2919 76 : };
2920 : // This function is implemented as a get of the current timeline at ancestor LSN, therefore reusing
2921 : // as much existing code as possible.
2922 76 : let img = this_tline.get(key, history_lsn_point, ctx).await?;
2923 76 : Ok(Some((key, history_lsn_point, img)))
2924 1244 : }
2925 :
2926 : // Actually, we can decide not to write to the image layer at all at this point because
2927 : // the key and LSN range are determined. However, to keep things simple here, we still
2928 : // create this writer, and discard the writer in the end.
2929 :
2930 1932 : while let Some(((key, lsn, val), desc)) = merge_iter.next_with_trace().await? {
2931 1828 : if cancel.is_cancelled() {
2932 0 : return Err(anyhow!("cancelled")); // TODO: refactor to CompactionError and pass cancel error
2933 1828 : }
2934 1828 : if self.shard_identity.is_key_disposable(&key) {
2935 : // If this shard does not need to store this key, simply skip it.
2936 : //
2937 : // This is not handled in the filter iterator because shard is determined by hash.
2938 : // Therefore, it does not give us any performance benefit to do things like skip
2939 : // a whole layer file as handling key spaces (ranges).
2940 0 : if cfg!(debug_assertions) {
2941 0 : let shard = self.shard_identity.shard_index();
2942 0 : let owner = self.shard_identity.get_shard_number(&key);
2943 0 : panic!("key {key} does not belong on shard {shard}, owned by {owner}");
2944 0 : }
2945 0 : continue;
2946 1828 : }
2947 1828 : if !job_desc.compaction_key_range.contains(&key) {
2948 128 : if !desc.is_delta {
2949 120 : continue;
2950 8 : }
2951 8 : let rewriter = delta_layer_rewriters.entry(desc.clone()).or_default();
2952 8 : let rewriter = if key < job_desc.compaction_key_range.start {
2953 0 : if rewriter.before.is_none() {
2954 0 : rewriter.before = Some(
2955 0 : DeltaLayerWriter::new(
2956 0 : self.conf,
2957 0 : self.timeline_id,
2958 0 : self.tenant_shard_id,
2959 0 : desc.key_range.start,
2960 0 : desc.lsn_range.clone(),
2961 0 : ctx,
2962 0 : )
2963 0 : .await?,
2964 : );
2965 0 : }
2966 0 : rewriter.before.as_mut().unwrap()
2967 8 : } else if key >= job_desc.compaction_key_range.end {
2968 8 : if rewriter.after.is_none() {
2969 4 : rewriter.after = Some(
2970 4 : DeltaLayerWriter::new(
2971 4 : self.conf,
2972 4 : self.timeline_id,
2973 4 : self.tenant_shard_id,
2974 4 : job_desc.compaction_key_range.end,
2975 4 : desc.lsn_range.clone(),
2976 4 : ctx,
2977 4 : )
2978 4 : .await?,
2979 : );
2980 4 : }
2981 8 : rewriter.after.as_mut().unwrap()
2982 : } else {
2983 0 : unreachable!()
2984 : };
2985 8 : rewriter.put_value(key, lsn, val, ctx).await?;
2986 8 : continue;
2987 1700 : }
2988 1700 : match val {
2989 1220 : Value::Image(_) => stat.visit_image_key(&val),
2990 480 : Value::WalRecord(_) => stat.visit_wal_key(&val),
2991 : }
2992 1700 : if last_key.is_none() || last_key.as_ref() == Some(&key) {
2993 560 : if last_key.is_none() {
2994 104 : last_key = Some(key);
2995 456 : }
2996 560 : accumulated_values.push((key, lsn, val));
2997 : } else {
2998 1140 : let last_key: &mut Key = last_key.as_mut().unwrap();
2999 1140 : stat.on_unique_key_visited(); // TODO: adjust statistics for partial compaction
3000 1140 : let retention = self
3001 1140 : .generate_key_retention(
3002 1140 : *last_key,
3003 1140 : &accumulated_values,
3004 1140 : job_desc.gc_cutoff,
3005 1140 : &job_desc.retain_lsns_below_horizon,
3006 1140 : COMPACTION_DELTA_THRESHOLD,
3007 1140 : get_ancestor_image(self, *last_key, ctx, has_data_below, lowest_retain_lsn)
3008 1140 : .await?,
3009 : )
3010 1140 : .await?;
3011 1140 : retention
3012 1140 : .pipe_to(
3013 1140 : *last_key,
3014 1140 : &mut delta_layer_writer,
3015 1140 : image_layer_writer.as_mut(),
3016 1140 : &mut stat,
3017 1140 : ctx,
3018 1140 : )
3019 1140 : .await?;
3020 1140 : accumulated_values.clear();
3021 1140 : *last_key = key;
3022 1140 : accumulated_values.push((key, lsn, val));
3023 : }
3024 : }
3025 :
3026 : // TODO: move the below part to the loop body
3027 104 : let last_key = last_key.expect("no keys produced during compaction");
3028 104 : stat.on_unique_key_visited();
3029 :
3030 104 : let retention = self
3031 104 : .generate_key_retention(
3032 104 : last_key,
3033 104 : &accumulated_values,
3034 104 : job_desc.gc_cutoff,
3035 104 : &job_desc.retain_lsns_below_horizon,
3036 104 : COMPACTION_DELTA_THRESHOLD,
3037 104 : get_ancestor_image(self, last_key, ctx, has_data_below, lowest_retain_lsn).await?,
3038 : )
3039 104 : .await?;
3040 104 : retention
3041 104 : .pipe_to(
3042 104 : last_key,
3043 104 : &mut delta_layer_writer,
3044 104 : image_layer_writer.as_mut(),
3045 104 : &mut stat,
3046 104 : ctx,
3047 104 : )
3048 104 : .await?;
3049 : // end: move the above part to the loop body
3050 :
3051 104 : let mut rewrote_delta_layers = Vec::new();
3052 108 : for (key, writers) in delta_layer_rewriters {
3053 4 : if let Some(delta_writer_before) = writers.before {
3054 0 : let (desc, path) = delta_writer_before
3055 0 : .finish(job_desc.compaction_key_range.start, ctx)
3056 0 : .await?;
3057 0 : let layer = Layer::finish_creating(self.conf, self, desc, &path)?;
3058 0 : rewrote_delta_layers.push(layer);
3059 4 : }
3060 4 : if let Some(delta_writer_after) = writers.after {
3061 4 : let (desc, path) = delta_writer_after.finish(key.key_range.end, ctx).await?;
3062 4 : let layer = Layer::finish_creating(self.conf, self, desc, &path)?;
3063 4 : rewrote_delta_layers.push(layer);
3064 0 : }
3065 : }
3066 :
3067 148 : let discard = |key: &PersistentLayerKey| {
3068 148 : let key = key.clone();
3069 148 : async move { KeyHistoryRetention::discard_key(&key, self, dry_run).await }
3070 148 : };
3071 :
3072 104 : let produced_image_layers = if let Some(writer) = image_layer_writer {
3073 84 : if !dry_run {
3074 76 : let end_key = job_desc.compaction_key_range.end;
3075 76 : writer
3076 76 : .finish_with_discard_fn(self, ctx, end_key, discard)
3077 76 : .await?
3078 : } else {
3079 8 : drop(writer);
3080 8 : Vec::new()
3081 : }
3082 : } else {
3083 20 : Vec::new()
3084 : };
3085 :
3086 104 : let produced_delta_layers = if !dry_run {
3087 96 : delta_layer_writer
3088 96 : .finish_with_discard_fn(self, ctx, discard)
3089 96 : .await?
3090 : } else {
3091 8 : drop(delta_layer_writer);
3092 8 : Vec::new()
3093 : };
3094 :
3095 : // TODO: make image/delta/rewrote_delta layers generation atomic. At this point, we already generated resident layers, and if
3096 : // compaction is cancelled at this point, we might have some layers that are not cleaned up.
3097 104 : let mut compact_to = Vec::new();
3098 104 : let mut keep_layers = HashSet::new();
3099 104 : let produced_delta_layers_len = produced_delta_layers.len();
3100 104 : let produced_image_layers_len = produced_image_layers.len();
3101 176 : for action in produced_delta_layers {
3102 72 : match action {
3103 44 : BatchWriterResult::Produced(layer) => {
3104 44 : if cfg!(debug_assertions) {
3105 44 : info!("produced delta layer: {}", layer.layer_desc().key());
3106 0 : }
3107 44 : stat.produce_delta_layer(layer.layer_desc().file_size());
3108 44 : compact_to.push(layer);
3109 : }
3110 28 : BatchWriterResult::Discarded(l) => {
3111 28 : if cfg!(debug_assertions) {
3112 28 : info!("discarded delta layer: {}", l);
3113 0 : }
3114 28 : keep_layers.insert(l);
3115 28 : stat.discard_delta_layer();
3116 : }
3117 : }
3118 : }
3119 108 : for layer in &rewrote_delta_layers {
3120 4 : debug!(
3121 0 : "produced rewritten delta layer: {}",
3122 0 : layer.layer_desc().key()
3123 : );
3124 : }
3125 104 : compact_to.extend(rewrote_delta_layers);
3126 180 : for action in produced_image_layers {
3127 76 : match action {
3128 60 : BatchWriterResult::Produced(layer) => {
3129 60 : debug!("produced image layer: {}", layer.layer_desc().key());
3130 60 : stat.produce_image_layer(layer.layer_desc().file_size());
3131 60 : compact_to.push(layer);
3132 : }
3133 16 : BatchWriterResult::Discarded(l) => {
3134 16 : debug!("discarded image layer: {}", l);
3135 16 : keep_layers.insert(l);
3136 16 : stat.discard_image_layer();
3137 : }
3138 : }
3139 : }
3140 :
3141 104 : let mut layer_selection = job_desc.selected_layers;
3142 :
3143 : // Partial compaction might select more data than it processes, e.g., if
3144 : // the compaction_key_range only partially overlaps:
3145 : //
3146 : // [---compaction_key_range---]
3147 : // [---A----][----B----][----C----][----D----]
3148 : //
3149 : // For delta layers, we will rewrite the layers so that it is cut exactly at
3150 : // the compaction key range, so we can always discard them. However, for image
3151 : // layers, as we do not rewrite them for now, we need to handle them differently.
3152 : // Assume image layers A, B, C, D are all in the `layer_selection`.
3153 : //
3154 : // The created image layers contain whatever is needed from B, C, and from
3155 : // `----]` of A, and from `[---` of D.
3156 : //
3157 : // In contrast, `[---A` and `D----]` have not been processed, so, we must
3158 : // keep that data.
3159 : //
3160 : // The solution for now is to keep A and D completely if they are image layers.
3161 : // (layer_selection is what we'll remove from the layer map, so, retain what
3162 : // is _not_ fully covered by compaction_key_range).
3163 408 : for layer in &layer_selection {
3164 304 : if !layer.layer_desc().is_delta() {
3165 132 : if !overlaps_with(
3166 132 : &layer.layer_desc().key_range,
3167 132 : &job_desc.compaction_key_range,
3168 132 : ) {
3169 0 : bail!("violated constraint: image layer outside of compaction key range");
3170 132 : }
3171 132 : if !fully_contains(
3172 132 : &job_desc.compaction_key_range,
3173 132 : &layer.layer_desc().key_range,
3174 132 : ) {
3175 16 : keep_layers.insert(layer.layer_desc().key());
3176 116 : }
3177 172 : }
3178 : }
3179 :
3180 304 : layer_selection.retain(|x| !keep_layers.contains(&x.layer_desc().key()));
3181 104 :
3182 104 : info!(
3183 0 : "gc-compaction statistics: {}",
3184 0 : serde_json::to_string(&stat)?
3185 : );
3186 :
3187 104 : if dry_run {
3188 8 : return Ok(());
3189 96 : }
3190 96 :
3191 96 : info!(
3192 0 : "produced {} delta layers and {} image layers, {} layers are kept",
3193 0 : produced_delta_layers_len,
3194 0 : produced_image_layers_len,
3195 0 : keep_layers.len()
3196 : );
3197 :
3198 : // Step 3: Place back to the layer map.
3199 :
3200 : // First, do a sanity check to ensure the newly-created layer map does not contain overlaps.
3201 96 : let all_layers = {
3202 96 : let guard = self.layers.read().await;
3203 96 : let layer_map = guard.layer_map()?;
3204 96 : layer_map.iter_historic_layers().collect_vec()
3205 96 : };
3206 96 :
3207 96 : let mut final_layers = all_layers
3208 96 : .iter()
3209 428 : .map(|layer| layer.layer_name())
3210 96 : .collect::<HashSet<_>>();
3211 304 : for layer in &layer_selection {
3212 208 : final_layers.remove(&layer.layer_desc().layer_name());
3213 208 : }
3214 204 : for layer in &compact_to {
3215 108 : final_layers.insert(layer.layer_desc().layer_name());
3216 108 : }
3217 96 : let final_layers = final_layers.into_iter().collect_vec();
3218 :
3219 : // TODO: move this check before we call `finish` on image layer writers. However, this will require us to get the layer name before we finish
3220 : // the writer, so potentially, we will need a function like `ImageLayerBatchWriter::get_all_pending_layer_keys` to get all the keys that are
3221 : // in the writer before finalizing the persistent layers. Now we would leave some dangling layers on the disk if the check fails.
3222 96 : if let Some(err) = check_valid_layermap(&final_layers) {
3223 0 : bail!(
3224 0 : "gc-compaction layer map check failed after compaction because {}, compaction result not applied to the layer map due to potential data loss",
3225 0 : err
3226 0 : );
3227 96 : }
3228 :
3229 : // Between the sanity check and this compaction update, there could be new layers being flushed, but it should be fine because we only
3230 : // operate on L1 layers.
3231 : {
3232 : // Gc-compaction will rewrite the history of a key. This could happen in two ways:
3233 : //
3234 : // 1. We create an image layer to replace all the deltas below the compact LSN. In this case, assume
3235 : // we have 2 delta layers A and B, both below the compact LSN. We create an image layer I to replace
3236 : // A and B at the compact LSN. If the read path finishes reading A, yields, and now we update the layer
3237 : // map, the read path then cannot find any keys below A, reporting a missing key error, while the key
3238 : // now gets stored in I at the compact LSN.
3239 : //
3240 : // --------------- ---------------
3241 : // delta1@LSN20 image1@LSN20
3242 : // --------------- (read path collects delta@LSN20, => --------------- (read path cannot find anything
3243 : // delta1@LSN10 yields) below LSN 20)
3244 : // ---------------
3245 : //
3246 : // 2. We create a delta layer to replace all the deltas below the compact LSN, and in the delta layers,
3247 : // we combines the history of a key into a single image. For example, we have deltas at LSN 1, 2, 3, 4,
3248 : // Assume one delta layer contains LSN 1, 2, 3 and the other contains LSN 4.
3249 : //
3250 : // We let gc-compaction combine delta 2, 3, 4 into an image at LSN 4, which produces a delta layer that
3251 : // contains the delta at LSN 1, the image at LSN 4. If the read path finishes reading the original delta
3252 : // layer containing 4, yields, and we update the layer map to put the delta layer.
3253 : //
3254 : // --------------- ---------------
3255 : // delta1@LSN4 image1@LSN4
3256 : // --------------- (read path collects delta@LSN4, => --------------- (read path collects LSN4 and LSN1,
3257 : // delta1@LSN1-3 yields) delta1@LSN1 which is an invalid history)
3258 : // --------------- ---------------
3259 : //
3260 : // Therefore, the gc-compaction layer update operation should wait for all ongoing reads, block all pending reads,
3261 : // and only allow reads to continue after the update is finished.
3262 :
3263 96 : let update_guard = self.gc_compaction_layer_update_lock.write().await;
3264 : // Acquiring the update guard ensures current read operations end and new read operations are blocked.
3265 : // TODO: can we use `latest_gc_cutoff` Rcu to achieve the same effect?
3266 96 : let mut guard = self.layers.write().await;
3267 96 : guard
3268 96 : .open_mut()?
3269 96 : .finish_gc_compaction(&layer_selection, &compact_to, &self.metrics);
3270 96 : drop(update_guard); // Allow new reads to start ONLY after we finished updating the layer map.
3271 96 : };
3272 96 :
3273 96 : // Schedule an index-only upload to update the `latest_gc_cutoff` in the index_part.json.
3274 96 : // Otherwise, after restart, the index_part only contains the old `latest_gc_cutoff` and
3275 96 : // find_gc_cutoffs will try accessing things below the cutoff. TODO: ideally, this should
3276 96 : // be batched into `schedule_compaction_update`.
3277 96 : let disk_consistent_lsn = self.disk_consistent_lsn.load();
3278 96 : self.schedule_uploads(disk_consistent_lsn, None)?;
3279 : // If a layer gets rewritten throughout gc-compaction, we need to keep that layer only in `compact_to` instead
3280 : // of `compact_from`.
3281 96 : let compact_from = {
3282 96 : let mut compact_from = Vec::new();
3283 96 : let mut compact_to_set = HashMap::new();
3284 204 : for layer in &compact_to {
3285 108 : compact_to_set.insert(layer.layer_desc().key(), layer);
3286 108 : }
3287 304 : for layer in &layer_selection {
3288 208 : if let Some(to) = compact_to_set.get(&layer.layer_desc().key()) {
3289 0 : tracing::info!(
3290 0 : "skipping delete {} because found same layer key at different generation {}",
3291 : layer,
3292 : to
3293 : );
3294 208 : } else {
3295 208 : compact_from.push(layer.clone());
3296 208 : }
3297 : }
3298 96 : compact_from
3299 96 : };
3300 96 : self.remote_client
3301 96 : .schedule_compaction_update(&compact_from, &compact_to)?;
3302 :
3303 96 : drop(gc_lock);
3304 96 :
3305 96 : Ok(())
3306 108 : }
3307 : }
3308 :
3309 : struct TimelineAdaptor {
3310 : timeline: Arc<Timeline>,
3311 :
3312 : keyspace: (Lsn, KeySpace),
3313 :
3314 : new_deltas: Vec<ResidentLayer>,
3315 : new_images: Vec<ResidentLayer>,
3316 : layers_to_delete: Vec<Arc<PersistentLayerDesc>>,
3317 : }
3318 :
3319 : impl TimelineAdaptor {
3320 0 : pub fn new(timeline: &Arc<Timeline>, keyspace: (Lsn, KeySpace)) -> Self {
3321 0 : Self {
3322 0 : timeline: timeline.clone(),
3323 0 : keyspace,
3324 0 : new_images: Vec::new(),
3325 0 : new_deltas: Vec::new(),
3326 0 : layers_to_delete: Vec::new(),
3327 0 : }
3328 0 : }
3329 :
3330 0 : pub async fn flush_updates(&mut self) -> Result<(), CompactionError> {
3331 0 : let layers_to_delete = {
3332 0 : let guard = self.timeline.layers.read().await;
3333 0 : self.layers_to_delete
3334 0 : .iter()
3335 0 : .map(|x| guard.get_from_desc(x))
3336 0 : .collect::<Vec<Layer>>()
3337 0 : };
3338 0 : self.timeline
3339 0 : .finish_compact_batch(&self.new_deltas, &self.new_images, &layers_to_delete)
3340 0 : .await?;
3341 :
3342 0 : self.timeline
3343 0 : .upload_new_image_layers(std::mem::take(&mut self.new_images))?;
3344 :
3345 0 : self.new_deltas.clear();
3346 0 : self.layers_to_delete.clear();
3347 0 : Ok(())
3348 0 : }
3349 : }
3350 :
3351 : #[derive(Clone)]
3352 : struct ResidentDeltaLayer(ResidentLayer);
3353 : #[derive(Clone)]
3354 : struct ResidentImageLayer(ResidentLayer);
3355 :
3356 : impl CompactionJobExecutor for TimelineAdaptor {
3357 : type Key = pageserver_api::key::Key;
3358 :
3359 : type Layer = OwnArc<PersistentLayerDesc>;
3360 : type DeltaLayer = ResidentDeltaLayer;
3361 : type ImageLayer = ResidentImageLayer;
3362 :
3363 : type RequestContext = crate::context::RequestContext;
3364 :
3365 0 : fn get_shard_identity(&self) -> &ShardIdentity {
3366 0 : self.timeline.get_shard_identity()
3367 0 : }
3368 :
3369 0 : async fn get_layers(
3370 0 : &mut self,
3371 0 : key_range: &Range<Key>,
3372 0 : lsn_range: &Range<Lsn>,
3373 0 : _ctx: &RequestContext,
3374 0 : ) -> anyhow::Result<Vec<OwnArc<PersistentLayerDesc>>> {
3375 0 : self.flush_updates().await?;
3376 :
3377 0 : let guard = self.timeline.layers.read().await;
3378 0 : let layer_map = guard.layer_map()?;
3379 :
3380 0 : let result = layer_map
3381 0 : .iter_historic_layers()
3382 0 : .filter(|l| {
3383 0 : overlaps_with(&l.lsn_range, lsn_range) && overlaps_with(&l.key_range, key_range)
3384 0 : })
3385 0 : .map(OwnArc)
3386 0 : .collect();
3387 0 : Ok(result)
3388 0 : }
3389 :
3390 0 : async fn get_keyspace(
3391 0 : &mut self,
3392 0 : key_range: &Range<Key>,
3393 0 : lsn: Lsn,
3394 0 : _ctx: &RequestContext,
3395 0 : ) -> anyhow::Result<Vec<Range<Key>>> {
3396 0 : if lsn == self.keyspace.0 {
3397 0 : Ok(pageserver_compaction::helpers::intersect_keyspace(
3398 0 : &self.keyspace.1.ranges,
3399 0 : key_range,
3400 0 : ))
3401 : } else {
3402 : // The current compaction implementation only ever requests the key space
3403 : // at the compaction end LSN.
3404 0 : anyhow::bail!("keyspace not available for requested lsn");
3405 : }
3406 0 : }
3407 :
3408 0 : async fn downcast_delta_layer(
3409 0 : &self,
3410 0 : layer: &OwnArc<PersistentLayerDesc>,
3411 0 : ctx: &RequestContext,
3412 0 : ) -> anyhow::Result<Option<ResidentDeltaLayer>> {
3413 0 : // this is a lot more complex than a simple downcast...
3414 0 : if layer.is_delta() {
3415 0 : let l = {
3416 0 : let guard = self.timeline.layers.read().await;
3417 0 : guard.get_from_desc(layer)
3418 : };
3419 0 : let result = l.download_and_keep_resident(ctx).await?;
3420 :
3421 0 : Ok(Some(ResidentDeltaLayer(result)))
3422 : } else {
3423 0 : Ok(None)
3424 : }
3425 0 : }
3426 :
3427 0 : async fn create_image(
3428 0 : &mut self,
3429 0 : lsn: Lsn,
3430 0 : key_range: &Range<Key>,
3431 0 : ctx: &RequestContext,
3432 0 : ) -> anyhow::Result<()> {
3433 0 : Ok(self.create_image_impl(lsn, key_range, ctx).await?)
3434 0 : }
3435 :
3436 0 : async fn create_delta(
3437 0 : &mut self,
3438 0 : lsn_range: &Range<Lsn>,
3439 0 : key_range: &Range<Key>,
3440 0 : input_layers: &[ResidentDeltaLayer],
3441 0 : ctx: &RequestContext,
3442 0 : ) -> anyhow::Result<()> {
3443 0 : debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end);
3444 :
3445 0 : let mut all_entries = Vec::new();
3446 0 : for dl in input_layers.iter() {
3447 0 : all_entries.extend(dl.load_keys(ctx).await?);
3448 : }
3449 :
3450 : // The current stdlib sorting implementation is designed in a way where it is
3451 : // particularly fast where the slice is made up of sorted sub-ranges.
3452 0 : all_entries.sort_by_key(|DeltaEntry { key, lsn, .. }| (*key, *lsn));
3453 :
3454 0 : let mut writer = DeltaLayerWriter::new(
3455 0 : self.timeline.conf,
3456 0 : self.timeline.timeline_id,
3457 0 : self.timeline.tenant_shard_id,
3458 0 : key_range.start,
3459 0 : lsn_range.clone(),
3460 0 : ctx,
3461 0 : )
3462 0 : .await?;
3463 :
3464 0 : let mut dup_values = 0;
3465 0 :
3466 0 : // This iterator walks through all key-value pairs from all the layers
3467 0 : // we're compacting, in key, LSN order.
3468 0 : let mut prev: Option<(Key, Lsn)> = None;
3469 : for &DeltaEntry {
3470 0 : key, lsn, ref val, ..
3471 0 : } in all_entries.iter()
3472 : {
3473 0 : if prev == Some((key, lsn)) {
3474 : // This is a duplicate. Skip it.
3475 : //
3476 : // It can happen if compaction is interrupted after writing some
3477 : // layers but not all, and we are compacting the range again.
3478 : // The calculations in the algorithm assume that there are no
3479 : // duplicates, so the math on targeted file size is likely off,
3480 : // and we will create smaller files than expected.
3481 0 : dup_values += 1;
3482 0 : continue;
3483 0 : }
3484 :
3485 0 : let value = val.load(ctx).await?;
3486 :
3487 0 : writer.put_value(key, lsn, value, ctx).await?;
3488 :
3489 0 : prev = Some((key, lsn));
3490 : }
3491 :
3492 0 : if dup_values > 0 {
3493 0 : warn!("delta layer created with {} duplicate values", dup_values);
3494 0 : }
3495 :
3496 0 : fail_point!("delta-layer-writer-fail-before-finish", |_| {
3497 0 : Err(anyhow::anyhow!(
3498 0 : "failpoint delta-layer-writer-fail-before-finish"
3499 0 : ))
3500 0 : });
3501 :
3502 0 : let (desc, path) = writer.finish(prev.unwrap().0.next(), ctx).await?;
3503 0 : let new_delta_layer =
3504 0 : Layer::finish_creating(self.timeline.conf, &self.timeline, desc, &path)?;
3505 :
3506 0 : self.new_deltas.push(new_delta_layer);
3507 0 : Ok(())
3508 0 : }
3509 :
3510 0 : async fn delete_layer(
3511 0 : &mut self,
3512 0 : layer: &OwnArc<PersistentLayerDesc>,
3513 0 : _ctx: &RequestContext,
3514 0 : ) -> anyhow::Result<()> {
3515 0 : self.layers_to_delete.push(layer.clone().0);
3516 0 : Ok(())
3517 0 : }
3518 : }
3519 :
3520 : impl TimelineAdaptor {
3521 0 : async fn create_image_impl(
3522 0 : &mut self,
3523 0 : lsn: Lsn,
3524 0 : key_range: &Range<Key>,
3525 0 : ctx: &RequestContext,
3526 0 : ) -> Result<(), CreateImageLayersError> {
3527 0 : let timer = self.timeline.metrics.create_images_time_histo.start_timer();
3528 :
3529 0 : let image_layer_writer = ImageLayerWriter::new(
3530 0 : self.timeline.conf,
3531 0 : self.timeline.timeline_id,
3532 0 : self.timeline.tenant_shard_id,
3533 0 : key_range,
3534 0 : lsn,
3535 0 : ctx,
3536 0 : )
3537 0 : .await?;
3538 :
3539 0 : fail_point!("image-layer-writer-fail-before-finish", |_| {
3540 0 : Err(CreateImageLayersError::Other(anyhow::anyhow!(
3541 0 : "failpoint image-layer-writer-fail-before-finish"
3542 0 : )))
3543 0 : });
3544 :
3545 0 : let keyspace = KeySpace {
3546 0 : ranges: self.get_keyspace(key_range, lsn, ctx).await?,
3547 : };
3548 : // TODO set proper (stateful) start. The create_image_layer_for_rel_blocks function mostly
3549 0 : let outcome = self
3550 0 : .timeline
3551 0 : .create_image_layer_for_rel_blocks(
3552 0 : &keyspace,
3553 0 : image_layer_writer,
3554 0 : lsn,
3555 0 : ctx,
3556 0 : key_range.clone(),
3557 0 : IoConcurrency::sequential(),
3558 0 : )
3559 0 : .await?;
3560 :
3561 : if let ImageLayerCreationOutcome::Generated {
3562 0 : unfinished_image_layer,
3563 0 : } = outcome
3564 : {
3565 0 : let (desc, path) = unfinished_image_layer.finish(ctx).await?;
3566 0 : let image_layer =
3567 0 : Layer::finish_creating(self.timeline.conf, &self.timeline, desc, &path)?;
3568 0 : self.new_images.push(image_layer);
3569 0 : }
3570 :
3571 0 : timer.stop_and_record();
3572 0 :
3573 0 : Ok(())
3574 0 : }
3575 : }
3576 :
3577 : impl CompactionRequestContext for crate::context::RequestContext {}
3578 :
3579 : #[derive(Debug, Clone)]
3580 : pub struct OwnArc<T>(pub Arc<T>);
3581 :
3582 : impl<T> Deref for OwnArc<T> {
3583 : type Target = <Arc<T> as Deref>::Target;
3584 0 : fn deref(&self) -> &Self::Target {
3585 0 : &self.0
3586 0 : }
3587 : }
3588 :
3589 : impl<T> AsRef<T> for OwnArc<T> {
3590 0 : fn as_ref(&self) -> &T {
3591 0 : self.0.as_ref()
3592 0 : }
3593 : }
3594 :
3595 : impl CompactionLayer<Key> for OwnArc<PersistentLayerDesc> {
3596 0 : fn key_range(&self) -> &Range<Key> {
3597 0 : &self.key_range
3598 0 : }
3599 0 : fn lsn_range(&self) -> &Range<Lsn> {
3600 0 : &self.lsn_range
3601 0 : }
3602 0 : fn file_size(&self) -> u64 {
3603 0 : self.file_size
3604 0 : }
3605 0 : fn short_id(&self) -> std::string::String {
3606 0 : self.as_ref().short_id().to_string()
3607 0 : }
3608 0 : fn is_delta(&self) -> bool {
3609 0 : self.as_ref().is_delta()
3610 0 : }
3611 : }
3612 :
3613 : impl CompactionLayer<Key> for OwnArc<DeltaLayer> {
3614 0 : fn key_range(&self) -> &Range<Key> {
3615 0 : &self.layer_desc().key_range
3616 0 : }
3617 0 : fn lsn_range(&self) -> &Range<Lsn> {
3618 0 : &self.layer_desc().lsn_range
3619 0 : }
3620 0 : fn file_size(&self) -> u64 {
3621 0 : self.layer_desc().file_size
3622 0 : }
3623 0 : fn short_id(&self) -> std::string::String {
3624 0 : self.layer_desc().short_id().to_string()
3625 0 : }
3626 0 : fn is_delta(&self) -> bool {
3627 0 : true
3628 0 : }
3629 : }
3630 :
3631 : use crate::tenant::timeline::DeltaEntry;
3632 :
3633 : impl CompactionLayer<Key> for ResidentDeltaLayer {
3634 0 : fn key_range(&self) -> &Range<Key> {
3635 0 : &self.0.layer_desc().key_range
3636 0 : }
3637 0 : fn lsn_range(&self) -> &Range<Lsn> {
3638 0 : &self.0.layer_desc().lsn_range
3639 0 : }
3640 0 : fn file_size(&self) -> u64 {
3641 0 : self.0.layer_desc().file_size
3642 0 : }
3643 0 : fn short_id(&self) -> std::string::String {
3644 0 : self.0.layer_desc().short_id().to_string()
3645 0 : }
3646 0 : fn is_delta(&self) -> bool {
3647 0 : true
3648 0 : }
3649 : }
3650 :
3651 : impl CompactionDeltaLayer<TimelineAdaptor> for ResidentDeltaLayer {
3652 : type DeltaEntry<'a> = DeltaEntry<'a>;
3653 :
3654 0 : async fn load_keys(&self, ctx: &RequestContext) -> anyhow::Result<Vec<DeltaEntry<'_>>> {
3655 0 : self.0.get_as_delta(ctx).await?.index_entries(ctx).await
3656 0 : }
3657 : }
3658 :
3659 : impl CompactionLayer<Key> for ResidentImageLayer {
3660 0 : fn key_range(&self) -> &Range<Key> {
3661 0 : &self.0.layer_desc().key_range
3662 0 : }
3663 0 : fn lsn_range(&self) -> &Range<Lsn> {
3664 0 : &self.0.layer_desc().lsn_range
3665 0 : }
3666 0 : fn file_size(&self) -> u64 {
3667 0 : self.0.layer_desc().file_size
3668 0 : }
3669 0 : fn short_id(&self) -> std::string::String {
3670 0 : self.0.layer_desc().short_id().to_string()
3671 0 : }
3672 0 : fn is_delta(&self) -> bool {
3673 0 : false
3674 0 : }
3675 : }
3676 : impl CompactionImageLayer<TimelineAdaptor> for ResidentImageLayer {}
|