Line data Source code
1 : //! New compaction implementation. The algorithm itself is implemented in the
2 : //! compaction crate. This file implements the callbacks and structs that allow
3 : //! the algorithm to drive the process.
4 : //!
5 : //! The old legacy algorithm is implemented directly in `timeline.rs`.
6 :
7 : use std::collections::{BinaryHeap, HashMap, HashSet, VecDeque};
8 : use std::ops::{Deref, Range};
9 : use std::sync::Arc;
10 :
11 : use super::layer_manager::LayerManager;
12 : use super::{
13 : CompactFlags, CompactOptions, CreateImageLayersError, DurationRecorder, ImageLayerCreationMode,
14 : RecordedDuration, Timeline,
15 : };
16 :
17 : use anyhow::{anyhow, bail, Context};
18 : use bytes::Bytes;
19 : use enumset::EnumSet;
20 : use fail::fail_point;
21 : use itertools::Itertools;
22 : use pageserver_api::key::KEY_SIZE;
23 : use pageserver_api::keyspace::ShardedRange;
24 : use pageserver_api::models::CompactInfoResponse;
25 : use pageserver_api::shard::{ShardCount, ShardIdentity, TenantShardId};
26 : use serde::Serialize;
27 : use tokio_util::sync::CancellationToken;
28 : use tracing::{debug, info, info_span, trace, warn, Instrument};
29 : use utils::id::TimelineId;
30 :
31 : use crate::context::{AccessStatsBehavior, RequestContext, RequestContextBuilder};
32 : use crate::page_cache;
33 : use crate::statvfs::Statvfs;
34 : use crate::tenant::checks::check_valid_layermap;
35 : use crate::tenant::gc_block::GcBlock;
36 : use crate::tenant::remote_timeline_client::WaitCompletionError;
37 : use crate::tenant::storage_layer::batch_split_writer::{
38 : BatchWriterResult, SplitDeltaLayerWriter, SplitImageLayerWriter,
39 : };
40 : use crate::tenant::storage_layer::filter_iterator::FilterIterator;
41 : use crate::tenant::storage_layer::merge_iterator::MergeIterator;
42 : use crate::tenant::storage_layer::{
43 : AsLayerDesc, PersistentLayerDesc, PersistentLayerKey, ValueReconstructState,
44 : };
45 : use crate::tenant::timeline::ImageLayerCreationOutcome;
46 : use crate::tenant::timeline::{drop_rlock, DeltaLayerWriter, ImageLayerWriter};
47 : use crate::tenant::timeline::{Layer, ResidentLayer};
48 : use crate::tenant::{gc_block, DeltaLayer, MaybeOffloaded};
49 : use crate::virtual_file::{MaybeFatalIo, VirtualFile};
50 : use pageserver_api::config::tenant_conf_defaults::{
51 : DEFAULT_CHECKPOINT_DISTANCE, DEFAULT_COMPACTION_THRESHOLD,
52 : };
53 :
54 : use pageserver_api::key::Key;
55 : use pageserver_api::keyspace::KeySpace;
56 : use pageserver_api::record::NeonWalRecord;
57 : use pageserver_api::value::Value;
58 :
59 : use utils::lsn::Lsn;
60 :
61 : use pageserver_compaction::helpers::{fully_contains, overlaps_with};
62 : use pageserver_compaction::interface::*;
63 :
64 : use super::CompactionError;
65 :
66 : /// Maximum number of deltas before generating an image layer in bottom-most compaction.
67 : const COMPACTION_DELTA_THRESHOLD: usize = 5;
68 :
69 : #[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
70 : pub struct GcCompactionJobId(pub usize);
71 :
72 : impl std::fmt::Display for GcCompactionJobId {
73 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
74 0 : write!(f, "{}", self.0)
75 0 : }
76 : }
77 :
78 : #[derive(Debug, Clone)]
79 : pub enum GcCompactionQueueItem {
80 : Manual(CompactOptions),
81 : SubCompactionJob(CompactOptions),
82 : #[allow(dead_code)]
83 : UpdateL2Lsn(Lsn),
84 : Notify(GcCompactionJobId),
85 : }
86 :
87 : impl GcCompactionQueueItem {
88 0 : pub fn into_compact_info_resp(
89 0 : self,
90 0 : id: GcCompactionJobId,
91 0 : running: bool,
92 0 : ) -> Option<CompactInfoResponse> {
93 0 : match self {
94 0 : GcCompactionQueueItem::Manual(options) => Some(CompactInfoResponse {
95 0 : compact_key_range: options.compact_key_range,
96 0 : compact_lsn_range: options.compact_lsn_range,
97 0 : sub_compaction: options.sub_compaction,
98 0 : running,
99 0 : job_id: id.0,
100 0 : }),
101 0 : GcCompactionQueueItem::SubCompactionJob(options) => Some(CompactInfoResponse {
102 0 : compact_key_range: options.compact_key_range,
103 0 : compact_lsn_range: options.compact_lsn_range,
104 0 : sub_compaction: options.sub_compaction,
105 0 : running,
106 0 : job_id: id.0,
107 0 : }),
108 0 : GcCompactionQueueItem::UpdateL2Lsn(_) => None,
109 0 : GcCompactionQueueItem::Notify(_) => None,
110 : }
111 0 : }
112 : }
113 :
114 : struct GcCompactionQueueInner {
115 : running: Option<(GcCompactionJobId, GcCompactionQueueItem)>,
116 : queued: VecDeque<(GcCompactionJobId, GcCompactionQueueItem)>,
117 : notify: HashMap<GcCompactionJobId, tokio::sync::oneshot::Sender<()>>,
118 : gc_guards: HashMap<GcCompactionJobId, gc_block::Guard>,
119 : last_id: GcCompactionJobId,
120 : }
121 :
122 : impl GcCompactionQueueInner {
123 0 : fn next_id(&mut self) -> GcCompactionJobId {
124 0 : let id = self.last_id;
125 0 : self.last_id = GcCompactionJobId(id.0 + 1);
126 0 : id
127 0 : }
128 : }
129 :
130 : /// A structure to store gc_compaction jobs.
131 : pub struct GcCompactionQueue {
132 : /// All items in the queue, and the currently-running job.
133 : inner: std::sync::Mutex<GcCompactionQueueInner>,
134 : /// Ensure only one thread is consuming the queue.
135 : consumer_lock: tokio::sync::Mutex<()>,
136 : }
137 :
138 : impl GcCompactionQueue {
139 0 : pub fn new() -> Self {
140 0 : GcCompactionQueue {
141 0 : inner: std::sync::Mutex::new(GcCompactionQueueInner {
142 0 : running: None,
143 0 : queued: VecDeque::new(),
144 0 : notify: HashMap::new(),
145 0 : gc_guards: HashMap::new(),
146 0 : last_id: GcCompactionJobId(0),
147 0 : }),
148 0 : consumer_lock: tokio::sync::Mutex::new(()),
149 0 : }
150 0 : }
151 :
152 0 : pub fn cancel_scheduled(&self) {
153 0 : let mut guard = self.inner.lock().unwrap();
154 0 : guard.queued.clear();
155 0 : guard.notify.clear();
156 0 : guard.gc_guards.clear();
157 0 : }
158 :
159 : /// Schedule a manual compaction job.
160 0 : pub fn schedule_manual_compaction(
161 0 : &self,
162 0 : options: CompactOptions,
163 0 : notify: Option<tokio::sync::oneshot::Sender<()>>,
164 0 : ) -> GcCompactionJobId {
165 0 : let mut guard = self.inner.lock().unwrap();
166 0 : let id = guard.next_id();
167 0 : guard
168 0 : .queued
169 0 : .push_back((id, GcCompactionQueueItem::Manual(options)));
170 0 : if let Some(notify) = notify {
171 0 : guard.notify.insert(id, notify);
172 0 : }
173 0 : info!("scheduled compaction job id={}", id);
174 0 : id
175 0 : }
176 :
177 : /// Trigger an auto compaction.
178 : #[allow(dead_code)]
179 0 : pub fn trigger_auto_compaction(&self, _: &Arc<Timeline>) {}
180 :
181 : /// Notify the caller the job has finished and unblock GC.
182 0 : fn notify_and_unblock(&self, id: GcCompactionJobId) {
183 0 : info!("compaction job id={} finished", id);
184 0 : let mut guard = self.inner.lock().unwrap();
185 0 : if let Some(blocking) = guard.gc_guards.remove(&id) {
186 0 : drop(blocking)
187 0 : }
188 0 : if let Some(tx) = guard.notify.remove(&id) {
189 0 : let _ = tx.send(());
190 0 : }
191 0 : }
192 :
193 0 : async fn handle_sub_compaction(
194 0 : &self,
195 0 : id: GcCompactionJobId,
196 0 : options: CompactOptions,
197 0 : timeline: &Arc<Timeline>,
198 0 : gc_block: &GcBlock,
199 0 : ) -> Result<(), CompactionError> {
200 0 : info!("running scheduled enhanced gc bottom-most compaction with sub-compaction, splitting compaction jobs");
201 0 : let jobs: Vec<GcCompactJob> = timeline
202 0 : .gc_compaction_split_jobs(
203 0 : GcCompactJob::from_compact_options(options.clone()),
204 0 : options.sub_compaction_max_job_size_mb,
205 0 : )
206 0 : .await
207 0 : .map_err(CompactionError::Other)?;
208 0 : if jobs.is_empty() {
209 0 : info!("no jobs to run, skipping scheduled compaction task");
210 0 : self.notify_and_unblock(id);
211 : } else {
212 0 : let gc_guard = match gc_block.start().await {
213 0 : Ok(guard) => guard,
214 0 : Err(e) => {
215 0 : return Err(CompactionError::Other(anyhow!(
216 0 : "cannot run gc-compaction because gc is blocked: {}",
217 0 : e
218 0 : )));
219 : }
220 : };
221 :
222 0 : let jobs_len = jobs.len();
223 0 : let mut pending_tasks = Vec::new();
224 0 : for job in jobs {
225 : // Unfortunately we need to convert the `GcCompactJob` back to `CompactionOptions`
226 : // until we do further refactors to allow directly call `compact_with_gc`.
227 0 : let mut flags: EnumSet<CompactFlags> = EnumSet::default();
228 0 : flags |= CompactFlags::EnhancedGcBottomMostCompaction;
229 0 : if job.dry_run {
230 0 : flags |= CompactFlags::DryRun;
231 0 : }
232 0 : let options = CompactOptions {
233 0 : flags,
234 0 : sub_compaction: false,
235 0 : compact_key_range: Some(job.compact_key_range.into()),
236 0 : compact_lsn_range: Some(job.compact_lsn_range.into()),
237 0 : sub_compaction_max_job_size_mb: None,
238 0 : };
239 0 : pending_tasks.push(GcCompactionQueueItem::SubCompactionJob(options));
240 : }
241 0 : pending_tasks.push(GcCompactionQueueItem::Notify(id));
242 0 : {
243 0 : let mut guard = self.inner.lock().unwrap();
244 0 : guard.gc_guards.insert(id, gc_guard);
245 0 : let mut tasks = Vec::new();
246 0 : for task in pending_tasks {
247 0 : let id = guard.next_id();
248 0 : tasks.push((id, task));
249 0 : }
250 0 : tasks.reverse();
251 0 : for item in tasks {
252 0 : guard.queued.push_front(item);
253 0 : }
254 : }
255 0 : info!("scheduled enhanced gc bottom-most compaction with sub-compaction, split into {} jobs", jobs_len);
256 : }
257 0 : Ok(())
258 0 : }
259 :
260 : /// Take a job from the queue and process it. Returns if there are still pending tasks.
261 0 : pub async fn iteration(
262 0 : &self,
263 0 : cancel: &CancellationToken,
264 0 : ctx: &RequestContext,
265 0 : gc_block: &GcBlock,
266 0 : timeline: &Arc<Timeline>,
267 0 : ) -> Result<bool, CompactionError> {
268 0 : let _one_op_at_a_time_guard = self.consumer_lock.lock().await;
269 : let has_pending_tasks;
270 0 : let (id, item) = {
271 0 : let mut guard = self.inner.lock().unwrap();
272 0 : let Some((id, item)) = guard.queued.pop_front() else {
273 0 : return Ok(false);
274 : };
275 0 : guard.running = Some((id, item.clone()));
276 0 : has_pending_tasks = !guard.queued.is_empty();
277 0 : (id, item)
278 0 : };
279 0 :
280 0 : match item {
281 0 : GcCompactionQueueItem::Manual(options) => {
282 0 : if !options
283 0 : .flags
284 0 : .contains(CompactFlags::EnhancedGcBottomMostCompaction)
285 : {
286 0 : warn!("ignoring scheduled compaction task: scheduled task must be gc compaction: {:?}", options);
287 0 : } else if options.sub_compaction {
288 0 : self.handle_sub_compaction(id, options, timeline, gc_block)
289 0 : .await?;
290 : } else {
291 0 : let gc_guard = match gc_block.start().await {
292 0 : Ok(guard) => guard,
293 0 : Err(e) => {
294 0 : return Err(CompactionError::Other(anyhow!(
295 0 : "cannot run gc-compaction because gc is blocked: {}",
296 0 : e
297 0 : )));
298 : }
299 : };
300 0 : {
301 0 : let mut guard = self.inner.lock().unwrap();
302 0 : guard.gc_guards.insert(id, gc_guard);
303 0 : }
304 0 : let _ = timeline
305 0 : .compact_with_options(cancel, options, ctx)
306 0 : .instrument(info_span!("scheduled_compact_timeline", %timeline.timeline_id))
307 0 : .await?;
308 0 : self.notify_and_unblock(id);
309 : }
310 : }
311 0 : GcCompactionQueueItem::SubCompactionJob(options) => {
312 0 : let _ = timeline
313 0 : .compact_with_options(cancel, options, ctx)
314 0 : .instrument(info_span!("scheduled_compact_timeline", %timeline.timeline_id))
315 0 : .await?;
316 : }
317 0 : GcCompactionQueueItem::Notify(id) => {
318 0 : self.notify_and_unblock(id);
319 0 : }
320 : GcCompactionQueueItem::UpdateL2Lsn(_) => {
321 0 : unreachable!()
322 : }
323 : }
324 0 : {
325 0 : let mut guard = self.inner.lock().unwrap();
326 0 : guard.running = None;
327 0 : }
328 0 : Ok(has_pending_tasks)
329 0 : }
330 :
331 : #[allow(clippy::type_complexity)]
332 0 : pub fn remaining_jobs(
333 0 : &self,
334 0 : ) -> (
335 0 : Option<(GcCompactionJobId, GcCompactionQueueItem)>,
336 0 : VecDeque<(GcCompactionJobId, GcCompactionQueueItem)>,
337 0 : ) {
338 0 : let guard = self.inner.lock().unwrap();
339 0 : (guard.running.clone(), guard.queued.clone())
340 0 : }
341 :
342 : #[allow(dead_code)]
343 0 : pub fn remaining_jobs_num(&self) -> usize {
344 0 : let guard = self.inner.lock().unwrap();
345 0 : guard.queued.len() + if guard.running.is_some() { 1 } else { 0 }
346 0 : }
347 : }
348 :
349 : /// A job description for the gc-compaction job. This structure describes the rectangle range that the job will
350 : /// process. The exact layers that need to be compacted/rewritten will be generated when `compact_with_gc` gets
351 : /// called.
352 : #[derive(Debug, Clone)]
353 : pub(crate) struct GcCompactJob {
354 : pub dry_run: bool,
355 : /// The key range to be compacted. The compaction algorithm will only regenerate key-value pairs within this range
356 : /// [left inclusive, right exclusive), and other pairs will be rewritten into new files if necessary.
357 : pub compact_key_range: Range<Key>,
358 : /// The LSN range to be compacted. The compaction algorithm will use this range to determine the layers to be
359 : /// selected for the compaction, and it does not guarantee the generated layers will have exactly the same LSN range
360 : /// as specified here. The true range being compacted is `min_lsn/max_lsn` in [`GcCompactionJobDescription`].
361 : /// min_lsn will always <= the lower bound specified here, and max_lsn will always >= the upper bound specified here.
362 : pub compact_lsn_range: Range<Lsn>,
363 : }
364 :
365 : impl GcCompactJob {
366 54 : pub fn from_compact_options(options: CompactOptions) -> Self {
367 54 : GcCompactJob {
368 54 : dry_run: options.flags.contains(CompactFlags::DryRun),
369 54 : compact_key_range: options
370 54 : .compact_key_range
371 54 : .map(|x| x.into())
372 54 : .unwrap_or(Key::MIN..Key::MAX),
373 54 : compact_lsn_range: options
374 54 : .compact_lsn_range
375 54 : .map(|x| x.into())
376 54 : .unwrap_or(Lsn::INVALID..Lsn::MAX),
377 54 : }
378 54 : }
379 : }
380 :
381 : /// A job description for the gc-compaction job. This structure is generated when `compact_with_gc` is called
382 : /// and contains the exact layers we want to compact.
383 : pub struct GcCompactionJobDescription {
384 : /// All layers to read in the compaction job
385 : selected_layers: Vec<Layer>,
386 : /// GC cutoff of the job. This is the lowest LSN that will be accessed by the read/GC path and we need to
387 : /// keep all deltas <= this LSN or generate an image == this LSN.
388 : gc_cutoff: Lsn,
389 : /// LSNs to retain for the job. Read path will use this LSN so we need to keep deltas <= this LSN or
390 : /// generate an image == this LSN.
391 : retain_lsns_below_horizon: Vec<Lsn>,
392 : /// Maximum layer LSN processed in this compaction, that is max(end_lsn of layers). Exclusive. All data
393 : /// \>= this LSN will be kept and will not be rewritten.
394 : max_layer_lsn: Lsn,
395 : /// Minimum layer LSN processed in this compaction, that is min(start_lsn of layers). Inclusive.
396 : /// All access below (strict lower than `<`) this LSN will be routed through the normal read path instead of
397 : /// k-merge within gc-compaction.
398 : min_layer_lsn: Lsn,
399 : /// Only compact layers overlapping with this range.
400 : compaction_key_range: Range<Key>,
401 : /// When partial compaction is enabled, these layers need to be rewritten to ensure no overlap.
402 : /// This field is here solely for debugging. The field will not be read once the compaction
403 : /// description is generated.
404 : rewrite_layers: Vec<Arc<PersistentLayerDesc>>,
405 : }
406 :
407 : /// The result of bottom-most compaction for a single key at each LSN.
408 : #[derive(Debug)]
409 : #[cfg_attr(test, derive(PartialEq))]
410 : pub struct KeyLogAtLsn(pub Vec<(Lsn, Value)>);
411 :
412 : /// The result of bottom-most compaction.
413 : #[derive(Debug)]
414 : #[cfg_attr(test, derive(PartialEq))]
415 : pub(crate) struct KeyHistoryRetention {
416 : /// Stores logs to reconstruct the value at the given LSN, that is to say, logs <= LSN or image == LSN.
417 : pub(crate) below_horizon: Vec<(Lsn, KeyLogAtLsn)>,
418 : /// Stores logs to reconstruct the value at any LSN above the horizon, that is to say, log > LSN.
419 : pub(crate) above_horizon: KeyLogAtLsn,
420 : }
421 :
422 : impl KeyHistoryRetention {
423 : /// Hack: skip delta layer if we need to produce a layer of a same key-lsn.
424 : ///
425 : /// This can happen if we have removed some deltas in "the middle" of some existing layer's key-lsn-range.
426 : /// For example, consider the case where a single delta with range [0x10,0x50) exists.
427 : /// And we have branches at LSN 0x10, 0x20, 0x30.
428 : /// Then we delete branch @ 0x20.
429 : /// Bottom-most compaction may now delete the delta [0x20,0x30).
430 : /// And that wouldnt' change the shape of the layer.
431 : ///
432 : /// Note that bottom-most-gc-compaction never _adds_ new data in that case, only removes.
433 : ///
434 : /// `discard_key` will only be called when the writer reaches its target (instead of for every key), so it's fine to grab a lock inside.
435 74 : async fn discard_key(key: &PersistentLayerKey, tline: &Arc<Timeline>, dry_run: bool) -> bool {
436 74 : if dry_run {
437 0 : return true;
438 74 : }
439 74 : let guard = tline.layers.read().await;
440 74 : if !guard.contains_key(key) {
441 52 : return false;
442 22 : }
443 22 : let layer_generation = guard.get_from_key(key).metadata().generation;
444 22 : drop(guard);
445 22 : if layer_generation == tline.generation {
446 22 : info!(
447 : key=%key,
448 : ?layer_generation,
449 0 : "discard layer due to duplicated layer key in the same generation",
450 : );
451 22 : true
452 : } else {
453 0 : false
454 : }
455 74 : }
456 :
457 : /// Pipe a history of a single key to the writers.
458 : ///
459 : /// If `image_writer` is none, the images will be placed into the delta layers.
460 : /// The delta writer will contain all images and deltas (below and above the horizon) except the bottom-most images.
461 : #[allow(clippy::too_many_arguments)]
462 622 : async fn pipe_to(
463 622 : self,
464 622 : key: Key,
465 622 : delta_writer: &mut SplitDeltaLayerWriter,
466 622 : mut image_writer: Option<&mut SplitImageLayerWriter>,
467 622 : stat: &mut CompactionStatistics,
468 622 : ctx: &RequestContext,
469 622 : ) -> anyhow::Result<()> {
470 622 : let mut first_batch = true;
471 2012 : for (cutoff_lsn, KeyLogAtLsn(logs)) in self.below_horizon {
472 1390 : if first_batch {
473 622 : if logs.len() == 1 && logs[0].1.is_image() {
474 584 : let Value::Image(img) = &logs[0].1 else {
475 0 : unreachable!()
476 : };
477 584 : stat.produce_image_key(img);
478 584 : if let Some(image_writer) = image_writer.as_mut() {
479 584 : image_writer.put_image(key, img.clone(), ctx).await?;
480 : } else {
481 0 : delta_writer
482 0 : .put_value(key, cutoff_lsn, Value::Image(img.clone()), ctx)
483 0 : .await?;
484 : }
485 : } else {
486 66 : for (lsn, val) in logs {
487 28 : stat.produce_key(&val);
488 28 : delta_writer.put_value(key, lsn, val, ctx).await?;
489 : }
490 : }
491 622 : first_batch = false;
492 : } else {
493 884 : for (lsn, val) in logs {
494 116 : stat.produce_key(&val);
495 116 : delta_writer.put_value(key, lsn, val, ctx).await?;
496 : }
497 : }
498 : }
499 622 : let KeyLogAtLsn(above_horizon_logs) = self.above_horizon;
500 680 : for (lsn, val) in above_horizon_logs {
501 58 : stat.produce_key(&val);
502 58 : delta_writer.put_value(key, lsn, val, ctx).await?;
503 : }
504 622 : Ok(())
505 622 : }
506 : }
507 :
508 : #[derive(Debug, Serialize, Default)]
509 : struct CompactionStatisticsNumSize {
510 : num: u64,
511 : size: u64,
512 : }
513 :
514 : #[derive(Debug, Serialize, Default)]
515 : pub struct CompactionStatistics {
516 : delta_layer_visited: CompactionStatisticsNumSize,
517 : image_layer_visited: CompactionStatisticsNumSize,
518 : delta_layer_produced: CompactionStatisticsNumSize,
519 : image_layer_produced: CompactionStatisticsNumSize,
520 : num_delta_layer_discarded: usize,
521 : num_image_layer_discarded: usize,
522 : num_unique_keys_visited: usize,
523 : wal_keys_visited: CompactionStatisticsNumSize,
524 : image_keys_visited: CompactionStatisticsNumSize,
525 : wal_produced: CompactionStatisticsNumSize,
526 : image_produced: CompactionStatisticsNumSize,
527 : }
528 :
529 : impl CompactionStatistics {
530 1042 : fn estimated_size_of_value(val: &Value) -> usize {
531 432 : match val {
532 610 : Value::Image(img) => img.len(),
533 0 : Value::WalRecord(NeonWalRecord::Postgres { rec, .. }) => rec.len(),
534 432 : _ => std::mem::size_of::<NeonWalRecord>(),
535 : }
536 1042 : }
537 1636 : fn estimated_size_of_key() -> usize {
538 1636 : KEY_SIZE // TODO: distinguish image layer and delta layer (count LSN in delta layer)
539 1636 : }
540 86 : fn visit_delta_layer(&mut self, size: u64) {
541 86 : self.delta_layer_visited.num += 1;
542 86 : self.delta_layer_visited.size += size;
543 86 : }
544 66 : fn visit_image_layer(&mut self, size: u64) {
545 66 : self.image_layer_visited.num += 1;
546 66 : self.image_layer_visited.size += size;
547 66 : }
548 622 : fn on_unique_key_visited(&mut self) {
549 622 : self.num_unique_keys_visited += 1;
550 622 : }
551 240 : fn visit_wal_key(&mut self, val: &Value) {
552 240 : self.wal_keys_visited.num += 1;
553 240 : self.wal_keys_visited.size +=
554 240 : Self::estimated_size_of_value(val) as u64 + Self::estimated_size_of_key() as u64;
555 240 : }
556 610 : fn visit_image_key(&mut self, val: &Value) {
557 610 : self.image_keys_visited.num += 1;
558 610 : self.image_keys_visited.size +=
559 610 : Self::estimated_size_of_value(val) as u64 + Self::estimated_size_of_key() as u64;
560 610 : }
561 202 : fn produce_key(&mut self, val: &Value) {
562 202 : match val {
563 10 : Value::Image(img) => self.produce_image_key(img),
564 192 : Value::WalRecord(_) => self.produce_wal_key(val),
565 : }
566 202 : }
567 192 : fn produce_wal_key(&mut self, val: &Value) {
568 192 : self.wal_produced.num += 1;
569 192 : self.wal_produced.size +=
570 192 : Self::estimated_size_of_value(val) as u64 + Self::estimated_size_of_key() as u64;
571 192 : }
572 594 : fn produce_image_key(&mut self, val: &Bytes) {
573 594 : self.image_produced.num += 1;
574 594 : self.image_produced.size += val.len() as u64 + Self::estimated_size_of_key() as u64;
575 594 : }
576 14 : fn discard_delta_layer(&mut self) {
577 14 : self.num_delta_layer_discarded += 1;
578 14 : }
579 8 : fn discard_image_layer(&mut self) {
580 8 : self.num_image_layer_discarded += 1;
581 8 : }
582 22 : fn produce_delta_layer(&mut self, size: u64) {
583 22 : self.delta_layer_produced.num += 1;
584 22 : self.delta_layer_produced.size += size;
585 22 : }
586 30 : fn produce_image_layer(&mut self, size: u64) {
587 30 : self.image_layer_produced.num += 1;
588 30 : self.image_layer_produced.size += size;
589 30 : }
590 : }
591 :
592 : impl Timeline {
593 : /// TODO: cancellation
594 : ///
595 : /// Returns whether the compaction has pending tasks.
596 364 : pub(crate) async fn compact_legacy(
597 364 : self: &Arc<Self>,
598 364 : cancel: &CancellationToken,
599 364 : options: CompactOptions,
600 364 : ctx: &RequestContext,
601 364 : ) -> Result<bool, CompactionError> {
602 364 : if options
603 364 : .flags
604 364 : .contains(CompactFlags::EnhancedGcBottomMostCompaction)
605 : {
606 0 : self.compact_with_gc(cancel, options, ctx)
607 0 : .await
608 0 : .map_err(CompactionError::Other)?;
609 0 : return Ok(false);
610 364 : }
611 364 :
612 364 : if options.flags.contains(CompactFlags::DryRun) {
613 0 : return Err(CompactionError::Other(anyhow!(
614 0 : "dry-run mode is not supported for legacy compaction for now"
615 0 : )));
616 364 : }
617 364 :
618 364 : if options.compact_key_range.is_some() || options.compact_lsn_range.is_some() {
619 : // maybe useful in the future? could implement this at some point
620 0 : return Err(CompactionError::Other(anyhow!(
621 0 : "compaction range is not supported for legacy compaction for now"
622 0 : )));
623 364 : }
624 364 :
625 364 : // High level strategy for compaction / image creation:
626 364 : //
627 364 : // 1. First, calculate the desired "partitioning" of the
628 364 : // currently in-use key space. The goal is to partition the
629 364 : // key space into roughly fixed-size chunks, but also take into
630 364 : // account any existing image layers, and try to align the
631 364 : // chunk boundaries with the existing image layers to avoid
632 364 : // too much churn. Also try to align chunk boundaries with
633 364 : // relation boundaries. In principle, we don't know about
634 364 : // relation boundaries here, we just deal with key-value
635 364 : // pairs, and the code in pgdatadir_mapping.rs knows how to
636 364 : // map relations into key-value pairs. But in practice we know
637 364 : // that 'field6' is the block number, and the fields 1-5
638 364 : // identify a relation. This is just an optimization,
639 364 : // though.
640 364 : //
641 364 : // 2. Once we know the partitioning, for each partition,
642 364 : // decide if it's time to create a new image layer. The
643 364 : // criteria is: there has been too much "churn" since the last
644 364 : // image layer? The "churn" is fuzzy concept, it's a
645 364 : // combination of too many delta files, or too much WAL in
646 364 : // total in the delta file. Or perhaps: if creating an image
647 364 : // file would allow to delete some older files.
648 364 : //
649 364 : // 3. After that, we compact all level0 delta files if there
650 364 : // are too many of them. While compacting, we also garbage
651 364 : // collect any page versions that are no longer needed because
652 364 : // of the new image layers we created in step 2.
653 364 : //
654 364 : // TODO: This high level strategy hasn't been implemented yet.
655 364 : // Below are functions compact_level0() and create_image_layers()
656 364 : // but they are a bit ad hoc and don't quite work like it's explained
657 364 : // above. Rewrite it.
658 364 :
659 364 : // Is the timeline being deleted?
660 364 : if self.is_stopping() {
661 0 : trace!("Dropping out of compaction on timeline shutdown");
662 0 : return Err(CompactionError::ShuttingDown);
663 364 : }
664 364 :
665 364 : let target_file_size = self.get_checkpoint_distance();
666 :
667 : // Define partitioning schema if needed
668 :
669 : // FIXME: the match should only cover repartitioning, not the next steps
670 364 : let (partition_count, has_pending_tasks) = match self
671 364 : .repartition(
672 364 : self.get_last_record_lsn(),
673 364 : self.get_compaction_target_size(),
674 364 : options.flags,
675 364 : ctx,
676 364 : )
677 364 : .await
678 : {
679 364 : Ok(((dense_partitioning, sparse_partitioning), lsn)) => {
680 364 : // Disables access_stats updates, so that the files we read remain candidates for eviction after we're done with them
681 364 : let image_ctx = RequestContextBuilder::extend(ctx)
682 364 : .access_stats_behavior(AccessStatsBehavior::Skip)
683 364 : .build();
684 364 :
685 364 : // 2. Compact
686 364 : let timer = self.metrics.compact_time_histo.start_timer();
687 364 : let fully_compacted = self
688 364 : .compact_level0(
689 364 : target_file_size,
690 364 : options.flags.contains(CompactFlags::ForceL0Compaction),
691 364 : ctx,
692 364 : )
693 364 : .await?;
694 364 : timer.stop_and_record();
695 364 :
696 364 : let mut partitioning = dense_partitioning;
697 364 : partitioning
698 364 : .parts
699 364 : .extend(sparse_partitioning.into_dense().parts);
700 364 :
701 364 : // 3. Create new image layers for partitions that have been modified
702 364 : // "enough". Skip image layer creation if L0 compaction cannot keep up.
703 364 : if fully_compacted {
704 364 : let image_layers = self
705 364 : .create_image_layers(
706 364 : &partitioning,
707 364 : lsn,
708 364 : if options
709 364 : .flags
710 364 : .contains(CompactFlags::ForceImageLayerCreation)
711 : {
712 14 : ImageLayerCreationMode::Force
713 : } else {
714 350 : ImageLayerCreationMode::Try
715 : },
716 364 : &image_ctx,
717 364 : )
718 364 : .await?;
719 :
720 364 : self.upload_new_image_layers(image_layers)?;
721 : } else {
722 0 : info!("skipping image layer generation due to L0 compaction did not include all layers.");
723 : }
724 364 : (partitioning.parts.len(), !fully_compacted)
725 : }
726 0 : Err(err) => {
727 0 : // no partitioning? This is normal, if the timeline was just created
728 0 : // as an empty timeline. Also in unit tests, when we use the timeline
729 0 : // as a simple key-value store, ignoring the datadir layout. Log the
730 0 : // error but continue.
731 0 : //
732 0 : // Suppress error when it's due to cancellation
733 0 : if !self.cancel.is_cancelled() && !err.is_cancelled() {
734 0 : tracing::error!("could not compact, repartitioning keyspace failed: {err:?}");
735 0 : }
736 0 : (1, false)
737 : }
738 : };
739 :
740 364 : if self.shard_identity.count >= ShardCount::new(2) {
741 : // Limit the number of layer rewrites to the number of partitions: this means its
742 : // runtime should be comparable to a full round of image layer creations, rather than
743 : // being potentially much longer.
744 0 : let rewrite_max = partition_count;
745 0 :
746 0 : self.compact_shard_ancestors(rewrite_max, ctx).await?;
747 364 : }
748 :
749 364 : Ok(has_pending_tasks)
750 364 : }
751 :
752 : /// Check for layers that are elegible to be rewritten:
753 : /// - Shard splitting: After a shard split, ancestor layers beyond pitr_interval, so that
754 : /// we don't indefinitely retain keys in this shard that aren't needed.
755 : /// - For future use: layers beyond pitr_interval that are in formats we would
756 : /// rather not maintain compatibility with indefinitely.
757 : ///
758 : /// Note: this phase may read and write many gigabytes of data: use rewrite_max to bound
759 : /// how much work it will try to do in each compaction pass.
760 0 : async fn compact_shard_ancestors(
761 0 : self: &Arc<Self>,
762 0 : rewrite_max: usize,
763 0 : ctx: &RequestContext,
764 0 : ) -> Result<(), CompactionError> {
765 0 : let mut drop_layers = Vec::new();
766 0 : let mut layers_to_rewrite: Vec<Layer> = Vec::new();
767 0 :
768 0 : // We will use the Lsn cutoff of the last GC as a threshold for rewriting layers: if a
769 0 : // layer is behind this Lsn, it indicates that the layer is being retained beyond the
770 0 : // pitr_interval, for example because a branchpoint references it.
771 0 : //
772 0 : // Holding this read guard also blocks [`Self::gc_timeline`] from entering while we
773 0 : // are rewriting layers.
774 0 : let latest_gc_cutoff = self.get_latest_gc_cutoff_lsn();
775 0 :
776 0 : tracing::info!(
777 0 : "latest_gc_cutoff: {}, pitr cutoff {}",
778 0 : *latest_gc_cutoff,
779 0 : self.gc_info.read().unwrap().cutoffs.time
780 : );
781 :
782 0 : let layers = self.layers.read().await;
783 0 : for layer_desc in layers.layer_map()?.iter_historic_layers() {
784 0 : let layer = layers.get_from_desc(&layer_desc);
785 0 : if layer.metadata().shard.shard_count == self.shard_identity.count {
786 : // This layer does not belong to a historic ancestor, no need to re-image it.
787 0 : continue;
788 0 : }
789 0 :
790 0 : // This layer was created on an ancestor shard: check if it contains any data for this shard.
791 0 : let sharded_range = ShardedRange::new(layer_desc.get_key_range(), &self.shard_identity);
792 0 : let layer_local_page_count = sharded_range.page_count();
793 0 : let layer_raw_page_count = ShardedRange::raw_size(&layer_desc.get_key_range());
794 0 : if layer_local_page_count == 0 {
795 : // This ancestral layer only covers keys that belong to other shards.
796 : // We include the full metadata in the log: if we had some critical bug that caused
797 : // us to incorrectly drop layers, this would simplify manually debugging + reinstating those layers.
798 0 : info!(%layer, old_metadata=?layer.metadata(),
799 0 : "dropping layer after shard split, contains no keys for this shard.",
800 : );
801 :
802 0 : if cfg!(debug_assertions) {
803 : // Expensive, exhaustive check of keys in this layer: this guards against ShardedRange's calculations being
804 : // wrong. If ShardedRange claims the local page count is zero, then no keys in this layer
805 : // should be !is_key_disposable()
806 0 : let range = layer_desc.get_key_range();
807 0 : let mut key = range.start;
808 0 : while key < range.end {
809 0 : debug_assert!(self.shard_identity.is_key_disposable(&key));
810 0 : key = key.next();
811 : }
812 0 : }
813 :
814 0 : drop_layers.push(layer);
815 0 : continue;
816 0 : } else if layer_local_page_count != u32::MAX
817 0 : && layer_local_page_count == layer_raw_page_count
818 : {
819 0 : debug!(%layer,
820 0 : "layer is entirely shard local ({} keys), no need to filter it",
821 : layer_local_page_count
822 : );
823 0 : continue;
824 0 : }
825 0 :
826 0 : // Don't bother re-writing a layer unless it will at least halve its size
827 0 : if layer_local_page_count != u32::MAX
828 0 : && layer_local_page_count > layer_raw_page_count / 2
829 : {
830 0 : debug!(%layer,
831 0 : "layer is already mostly local ({}/{}), not rewriting",
832 : layer_local_page_count,
833 : layer_raw_page_count
834 : );
835 0 : }
836 :
837 : // Don't bother re-writing a layer if it is within the PITR window: it will age-out eventually
838 : // without incurring the I/O cost of a rewrite.
839 0 : if layer_desc.get_lsn_range().end >= *latest_gc_cutoff {
840 0 : debug!(%layer, "Skipping rewrite of layer still in GC window ({} >= {})",
841 0 : layer_desc.get_lsn_range().end, *latest_gc_cutoff);
842 0 : continue;
843 0 : }
844 0 :
845 0 : if layer_desc.is_delta() {
846 : // We do not yet implement rewrite of delta layers
847 0 : debug!(%layer, "Skipping rewrite of delta layer");
848 0 : continue;
849 0 : }
850 0 :
851 0 : // Only rewrite layers if their generations differ. This guarantees:
852 0 : // - that local rewrite is safe, as local layer paths will differ between existing layer and rewritten one
853 0 : // - that the layer is persistent in remote storage, as we only see old-generation'd layer via loading from remote storage
854 0 : if layer.metadata().generation == self.generation {
855 0 : debug!(%layer, "Skipping rewrite, is not from old generation");
856 0 : continue;
857 0 : }
858 0 :
859 0 : if layers_to_rewrite.len() >= rewrite_max {
860 0 : tracing::info!(%layer, "Will rewrite layer on a future compaction, already rewrote {}",
861 0 : layers_to_rewrite.len()
862 : );
863 0 : continue;
864 0 : }
865 0 :
866 0 : // Fall through: all our conditions for doing a rewrite passed.
867 0 : layers_to_rewrite.push(layer);
868 : }
869 :
870 : // Drop read lock on layer map before we start doing time-consuming I/O
871 0 : drop(layers);
872 0 :
873 0 : let mut replace_image_layers = Vec::new();
874 :
875 0 : for layer in layers_to_rewrite {
876 0 : tracing::info!(layer=%layer, "Rewriting layer after shard split...");
877 0 : let mut image_layer_writer = ImageLayerWriter::new(
878 0 : self.conf,
879 0 : self.timeline_id,
880 0 : self.tenant_shard_id,
881 0 : &layer.layer_desc().key_range,
882 0 : layer.layer_desc().image_layer_lsn(),
883 0 : ctx,
884 0 : )
885 0 : .await
886 0 : .map_err(CompactionError::Other)?;
887 :
888 : // Safety of layer rewrites:
889 : // - We are writing to a different local file path than we are reading from, so the old Layer
890 : // cannot interfere with the new one.
891 : // - In the page cache, contents for a particular VirtualFile are stored with a file_id that
892 : // is different for two layers with the same name (in `ImageLayerInner::new` we always
893 : // acquire a fresh id from [`crate::page_cache::next_file_id`]. So readers do not risk
894 : // reading the index from one layer file, and then data blocks from the rewritten layer file.
895 : // - Any readers that have a reference to the old layer will keep it alive until they are done
896 : // with it. If they are trying to promote from remote storage, that will fail, but this is the same
897 : // as for compaction generally: compaction is allowed to delete layers that readers might be trying to use.
898 : // - We do not run concurrently with other kinds of compaction, so the only layer map writes we race with are:
899 : // - GC, which at worst witnesses us "undelete" a layer that they just deleted.
900 : // - ingestion, which only inserts layers, therefore cannot collide with us.
901 0 : let resident = layer.download_and_keep_resident().await?;
902 :
903 0 : let keys_written = resident
904 0 : .filter(&self.shard_identity, &mut image_layer_writer, ctx)
905 0 : .await?;
906 :
907 0 : if keys_written > 0 {
908 0 : let (desc, path) = image_layer_writer
909 0 : .finish(ctx)
910 0 : .await
911 0 : .map_err(CompactionError::Other)?;
912 0 : let new_layer = Layer::finish_creating(self.conf, self, desc, &path)
913 0 : .map_err(CompactionError::Other)?;
914 0 : tracing::info!(layer=%new_layer, "Rewrote layer, {} -> {} bytes",
915 0 : layer.metadata().file_size,
916 0 : new_layer.metadata().file_size);
917 :
918 0 : replace_image_layers.push((layer, new_layer));
919 0 : } else {
920 0 : // Drop the old layer. Usually for this case we would already have noticed that
921 0 : // the layer has no data for us with the ShardedRange check above, but
922 0 : drop_layers.push(layer);
923 0 : }
924 : }
925 :
926 : // At this point, we have replaced local layer files with their rewritten form, but not yet uploaded
927 : // metadata to reflect that. If we restart here, the replaced layer files will look invalid (size mismatch
928 : // to remote index) and be removed. This is inefficient but safe.
929 0 : fail::fail_point!("compact-shard-ancestors-localonly");
930 0 :
931 0 : // Update the LayerMap so that readers will use the new layers, and enqueue it for writing to remote storage
932 0 : self.rewrite_layers(replace_image_layers, drop_layers)
933 0 : .await?;
934 :
935 0 : fail::fail_point!("compact-shard-ancestors-enqueued");
936 0 :
937 0 : // We wait for all uploads to complete before finishing this compaction stage. This is not
938 0 : // necessary for correctness, but it simplifies testing, and avoids proceeding with another
939 0 : // Timeline's compaction while this timeline's uploads may be generating lots of disk I/O
940 0 : // load.
941 0 : match self.remote_client.wait_completion().await {
942 0 : Ok(()) => (),
943 0 : Err(WaitCompletionError::NotInitialized(ni)) => return Err(CompactionError::from(ni)),
944 : Err(WaitCompletionError::UploadQueueShutDownOrStopped) => {
945 0 : return Err(CompactionError::ShuttingDown)
946 : }
947 : }
948 :
949 0 : fail::fail_point!("compact-shard-ancestors-persistent");
950 0 :
951 0 : Ok(())
952 0 : }
953 :
954 : /// Update the LayerVisibilityHint of layers covered by image layers, based on whether there is
955 : /// an image layer between them and the most recent readable LSN (branch point or tip of timeline). The
956 : /// purpose of the visibility hint is to record which layers need to be available to service reads.
957 : ///
958 : /// The result may be used as an input to eviction and secondary downloads to de-prioritize layers
959 : /// that we know won't be needed for reads.
960 226 : pub(super) async fn update_layer_visibility(
961 226 : &self,
962 226 : ) -> Result<(), super::layer_manager::Shutdown> {
963 226 : let head_lsn = self.get_last_record_lsn();
964 :
965 : // We will sweep through layers in reverse-LSN order. We only do historic layers. L0 deltas
966 : // are implicitly left visible, because LayerVisibilityHint's default is Visible, and we never modify it here.
967 : // Note that L0 deltas _can_ be covered by image layers, but we consider them 'visible' because we anticipate that
968 : // they will be subject to L0->L1 compaction in the near future.
969 226 : let layer_manager = self.layers.read().await;
970 226 : let layer_map = layer_manager.layer_map()?;
971 :
972 226 : let readable_points = {
973 226 : let children = self.gc_info.read().unwrap().retain_lsns.clone();
974 226 :
975 226 : let mut readable_points = Vec::with_capacity(children.len() + 1);
976 226 : for (child_lsn, _child_timeline_id, is_offloaded) in &children {
977 0 : if *is_offloaded == MaybeOffloaded::Yes {
978 0 : continue;
979 0 : }
980 0 : readable_points.push(*child_lsn);
981 : }
982 226 : readable_points.push(head_lsn);
983 226 : readable_points
984 226 : };
985 226 :
986 226 : let (layer_visibility, covered) = layer_map.get_visibility(readable_points);
987 572 : for (layer_desc, visibility) in layer_visibility {
988 346 : // FIXME: a more efficiency bulk zip() through the layers rather than NlogN getting each one
989 346 : let layer = layer_manager.get_from_desc(&layer_desc);
990 346 : layer.set_visibility(visibility);
991 346 : }
992 :
993 : // TODO: publish our covered KeySpace to our parent, so that when they update their visibility, they can
994 : // avoid assuming that everything at a branch point is visible.
995 226 : drop(covered);
996 226 : Ok(())
997 226 : }
998 :
999 : /// Collect a bunch of Level 0 layer files, and compact and reshuffle them as
1000 : /// as Level 1 files. Returns whether the L0 layers are fully compacted.
1001 364 : async fn compact_level0(
1002 364 : self: &Arc<Self>,
1003 364 : target_file_size: u64,
1004 364 : force_compaction_ignore_threshold: bool,
1005 364 : ctx: &RequestContext,
1006 364 : ) -> Result<bool, CompactionError> {
1007 : let CompactLevel0Phase1Result {
1008 364 : new_layers,
1009 364 : deltas_to_compact,
1010 364 : fully_compacted,
1011 : } = {
1012 364 : let phase1_span = info_span!("compact_level0_phase1");
1013 364 : let ctx = ctx.attached_child();
1014 364 : let mut stats = CompactLevel0Phase1StatsBuilder {
1015 364 : version: Some(2),
1016 364 : tenant_id: Some(self.tenant_shard_id),
1017 364 : timeline_id: Some(self.timeline_id),
1018 364 : ..Default::default()
1019 364 : };
1020 364 :
1021 364 : let begin = tokio::time::Instant::now();
1022 364 : let phase1_layers_locked = self.layers.read().await;
1023 364 : let now = tokio::time::Instant::now();
1024 364 : stats.read_lock_acquisition_micros =
1025 364 : DurationRecorder::Recorded(RecordedDuration(now - begin), now);
1026 364 : self.compact_level0_phase1(
1027 364 : phase1_layers_locked,
1028 364 : stats,
1029 364 : target_file_size,
1030 364 : force_compaction_ignore_threshold,
1031 364 : &ctx,
1032 364 : )
1033 364 : .instrument(phase1_span)
1034 364 : .await?
1035 : };
1036 :
1037 364 : if new_layers.is_empty() && deltas_to_compact.is_empty() {
1038 : // nothing to do
1039 336 : return Ok(true);
1040 28 : }
1041 28 :
1042 28 : self.finish_compact_batch(&new_layers, &Vec::new(), &deltas_to_compact)
1043 28 : .await?;
1044 28 : Ok(fully_compacted)
1045 364 : }
1046 :
1047 : /// Level0 files first phase of compaction, explained in the [`Self::compact_legacy`] comment.
1048 364 : async fn compact_level0_phase1<'a>(
1049 364 : self: &'a Arc<Self>,
1050 364 : guard: tokio::sync::RwLockReadGuard<'a, LayerManager>,
1051 364 : mut stats: CompactLevel0Phase1StatsBuilder,
1052 364 : target_file_size: u64,
1053 364 : force_compaction_ignore_threshold: bool,
1054 364 : ctx: &RequestContext,
1055 364 : ) -> Result<CompactLevel0Phase1Result, CompactionError> {
1056 364 : stats.read_lock_held_spawn_blocking_startup_micros =
1057 364 : stats.read_lock_acquisition_micros.till_now(); // set by caller
1058 364 : let layers = guard.layer_map()?;
1059 364 : let level0_deltas = layers.level0_deltas();
1060 364 : stats.level0_deltas_count = Some(level0_deltas.len());
1061 364 :
1062 364 : // Only compact if enough layers have accumulated.
1063 364 : let threshold = self.get_compaction_threshold();
1064 364 : if level0_deltas.is_empty() || level0_deltas.len() < threshold {
1065 336 : if force_compaction_ignore_threshold {
1066 0 : if !level0_deltas.is_empty() {
1067 0 : info!(
1068 0 : level0_deltas = level0_deltas.len(),
1069 0 : threshold, "too few deltas to compact, but forcing compaction"
1070 : );
1071 : } else {
1072 0 : info!(
1073 0 : level0_deltas = level0_deltas.len(),
1074 0 : threshold, "too few deltas to compact, cannot force compaction"
1075 : );
1076 0 : return Ok(CompactLevel0Phase1Result::default());
1077 : }
1078 : } else {
1079 336 : debug!(
1080 0 : level0_deltas = level0_deltas.len(),
1081 0 : threshold, "too few deltas to compact"
1082 : );
1083 336 : return Ok(CompactLevel0Phase1Result::default());
1084 : }
1085 28 : }
1086 :
1087 28 : let mut level0_deltas = level0_deltas
1088 28 : .iter()
1089 402 : .map(|x| guard.get_from_desc(x))
1090 28 : .collect::<Vec<_>>();
1091 28 :
1092 28 : // Gather the files to compact in this iteration.
1093 28 : //
1094 28 : // Start with the oldest Level 0 delta file, and collect any other
1095 28 : // level 0 files that form a contiguous sequence, such that the end
1096 28 : // LSN of previous file matches the start LSN of the next file.
1097 28 : //
1098 28 : // Note that if the files don't form such a sequence, we might
1099 28 : // "compact" just a single file. That's a bit pointless, but it allows
1100 28 : // us to get rid of the level 0 file, and compact the other files on
1101 28 : // the next iteration. This could probably made smarter, but such
1102 28 : // "gaps" in the sequence of level 0 files should only happen in case
1103 28 : // of a crash, partial download from cloud storage, or something like
1104 28 : // that, so it's not a big deal in practice.
1105 748 : level0_deltas.sort_by_key(|l| l.layer_desc().lsn_range.start);
1106 28 : let mut level0_deltas_iter = level0_deltas.iter();
1107 28 :
1108 28 : let first_level0_delta = level0_deltas_iter.next().unwrap();
1109 28 : let mut prev_lsn_end = first_level0_delta.layer_desc().lsn_range.end;
1110 28 : let mut deltas_to_compact = Vec::with_capacity(level0_deltas.len());
1111 28 :
1112 28 : // Accumulate the size of layers in `deltas_to_compact`
1113 28 : let mut deltas_to_compact_bytes = 0;
1114 28 :
1115 28 : // Under normal circumstances, we will accumulate up to compaction_interval L0s of size
1116 28 : // checkpoint_distance each. To avoid edge cases using extra system resources, bound our
1117 28 : // work in this function to only operate on this much delta data at once.
1118 28 : //
1119 28 : // Take the max of the configured value & the default, so that tests that configure tiny values
1120 28 : // can still use a sensible amount of memory, but if a deployed system configures bigger values we
1121 28 : // still let them compact a full stack of L0s in one go.
1122 28 : let delta_size_limit = std::cmp::max(
1123 28 : self.get_compaction_threshold(),
1124 28 : DEFAULT_COMPACTION_THRESHOLD,
1125 28 : ) as u64
1126 28 : * std::cmp::max(self.get_checkpoint_distance(), DEFAULT_CHECKPOINT_DISTANCE);
1127 28 :
1128 28 : let mut fully_compacted = true;
1129 28 :
1130 28 : deltas_to_compact.push(first_level0_delta.download_and_keep_resident().await?);
1131 402 : for l in level0_deltas_iter {
1132 374 : let lsn_range = &l.layer_desc().lsn_range;
1133 374 :
1134 374 : if lsn_range.start != prev_lsn_end {
1135 0 : break;
1136 374 : }
1137 374 : deltas_to_compact.push(l.download_and_keep_resident().await?);
1138 374 : deltas_to_compact_bytes += l.metadata().file_size;
1139 374 : prev_lsn_end = lsn_range.end;
1140 374 :
1141 374 : if deltas_to_compact_bytes >= delta_size_limit {
1142 0 : info!(
1143 0 : l0_deltas_selected = deltas_to_compact.len(),
1144 0 : l0_deltas_total = level0_deltas.len(),
1145 0 : "L0 compaction picker hit max delta layer size limit: {}",
1146 : delta_size_limit
1147 : );
1148 0 : fully_compacted = false;
1149 0 :
1150 0 : // Proceed with compaction, but only a subset of L0s
1151 0 : break;
1152 374 : }
1153 : }
1154 28 : let lsn_range = Range {
1155 28 : start: deltas_to_compact
1156 28 : .first()
1157 28 : .unwrap()
1158 28 : .layer_desc()
1159 28 : .lsn_range
1160 28 : .start,
1161 28 : end: deltas_to_compact.last().unwrap().layer_desc().lsn_range.end,
1162 28 : };
1163 28 :
1164 28 : info!(
1165 0 : "Starting Level0 compaction in LSN range {}-{} for {} layers ({} deltas in total)",
1166 0 : lsn_range.start,
1167 0 : lsn_range.end,
1168 0 : deltas_to_compact.len(),
1169 0 : level0_deltas.len()
1170 : );
1171 :
1172 402 : for l in deltas_to_compact.iter() {
1173 402 : info!("compact includes {l}");
1174 : }
1175 :
1176 : // We don't need the original list of layers anymore. Drop it so that
1177 : // we don't accidentally use it later in the function.
1178 28 : drop(level0_deltas);
1179 28 :
1180 28 : stats.read_lock_held_prerequisites_micros = stats
1181 28 : .read_lock_held_spawn_blocking_startup_micros
1182 28 : .till_now();
1183 :
1184 : // TODO: replace with streaming k-merge
1185 28 : let all_keys = {
1186 28 : let mut all_keys = Vec::new();
1187 402 : for l in deltas_to_compact.iter() {
1188 402 : if self.cancel.is_cancelled() {
1189 0 : return Err(CompactionError::ShuttingDown);
1190 402 : }
1191 402 : let delta = l.get_as_delta(ctx).await.map_err(CompactionError::Other)?;
1192 402 : let keys = delta
1193 402 : .index_entries(ctx)
1194 402 : .await
1195 402 : .map_err(CompactionError::Other)?;
1196 402 : all_keys.extend(keys);
1197 : }
1198 : // The current stdlib sorting implementation is designed in a way where it is
1199 : // particularly fast where the slice is made up of sorted sub-ranges.
1200 4423790 : all_keys.sort_by_key(|DeltaEntry { key, lsn, .. }| (*key, *lsn));
1201 28 : all_keys
1202 28 : };
1203 28 :
1204 28 : stats.read_lock_held_key_sort_micros = stats.read_lock_held_prerequisites_micros.till_now();
1205 :
1206 : // Determine N largest holes where N is number of compacted layers. The vec is sorted by key range start.
1207 : //
1208 : // A hole is a key range for which this compaction doesn't have any WAL records.
1209 : // Our goal in this compaction iteration is to avoid creating L1s that, in terms of their key range,
1210 : // cover the hole, but actually don't contain any WAL records for that key range.
1211 : // The reason is that the mere stack of L1s (`count_deltas`) triggers image layer creation (`create_image_layers`).
1212 : // That image layer creation would be useless for a hole range covered by L1s that don't contain any WAL records.
1213 : //
1214 : // The algorithm chooses holes as follows.
1215 : // - Slide a 2-window over the keys in key orde to get the hole range (=distance between two keys).
1216 : // - Filter: min threshold on range length
1217 : // - Rank: by coverage size (=number of image layers required to reconstruct each key in the range for which we have any data)
1218 : //
1219 : // For more details, intuition, and some ASCII art see https://github.com/neondatabase/neon/pull/3597#discussion_r1112704451
1220 : #[derive(PartialEq, Eq)]
1221 : struct Hole {
1222 : key_range: Range<Key>,
1223 : coverage_size: usize,
1224 : }
1225 28 : let holes: Vec<Hole> = {
1226 : use std::cmp::Ordering;
1227 : impl Ord for Hole {
1228 0 : fn cmp(&self, other: &Self) -> Ordering {
1229 0 : self.coverage_size.cmp(&other.coverage_size).reverse()
1230 0 : }
1231 : }
1232 : impl PartialOrd for Hole {
1233 0 : fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
1234 0 : Some(self.cmp(other))
1235 0 : }
1236 : }
1237 28 : let max_holes = deltas_to_compact.len();
1238 28 : let last_record_lsn = self.get_last_record_lsn();
1239 28 : let min_hole_range = (target_file_size / page_cache::PAGE_SZ as u64) as i128;
1240 28 : let min_hole_coverage_size = 3; // TODO: something more flexible?
1241 28 : // min-heap (reserve space for one more element added before eviction)
1242 28 : let mut heap: BinaryHeap<Hole> = BinaryHeap::with_capacity(max_holes + 1);
1243 28 : let mut prev: Option<Key> = None;
1244 :
1245 2064038 : for &DeltaEntry { key: next_key, .. } in all_keys.iter() {
1246 2064038 : if let Some(prev_key) = prev {
1247 : // just first fast filter, do not create hole entries for metadata keys. The last hole in the
1248 : // compaction is the gap between data key and metadata keys.
1249 2064010 : if next_key.to_i128() - prev_key.to_i128() >= min_hole_range
1250 0 : && !Key::is_metadata_key(&prev_key)
1251 : {
1252 0 : let key_range = prev_key..next_key;
1253 0 : // Measuring hole by just subtraction of i128 representation of key range boundaries
1254 0 : // has not so much sense, because largest holes will corresponds field1/field2 changes.
1255 0 : // But we are mostly interested to eliminate holes which cause generation of excessive image layers.
1256 0 : // That is why it is better to measure size of hole as number of covering image layers.
1257 0 : let coverage_size =
1258 0 : layers.image_coverage(&key_range, last_record_lsn).len();
1259 0 : if coverage_size >= min_hole_coverage_size {
1260 0 : heap.push(Hole {
1261 0 : key_range,
1262 0 : coverage_size,
1263 0 : });
1264 0 : if heap.len() > max_holes {
1265 0 : heap.pop(); // remove smallest hole
1266 0 : }
1267 0 : }
1268 2064010 : }
1269 28 : }
1270 2064038 : prev = Some(next_key.next());
1271 : }
1272 28 : let mut holes = heap.into_vec();
1273 28 : holes.sort_unstable_by_key(|hole| hole.key_range.start);
1274 28 : holes
1275 28 : };
1276 28 : stats.read_lock_held_compute_holes_micros = stats.read_lock_held_key_sort_micros.till_now();
1277 28 : drop_rlock(guard);
1278 28 :
1279 28 : if self.cancel.is_cancelled() {
1280 0 : return Err(CompactionError::ShuttingDown);
1281 28 : }
1282 28 :
1283 28 : stats.read_lock_drop_micros = stats.read_lock_held_compute_holes_micros.till_now();
1284 :
1285 : // This iterator walks through all key-value pairs from all the layers
1286 : // we're compacting, in key, LSN order.
1287 : // If there's both a Value::Image and Value::WalRecord for the same (key,lsn),
1288 : // then the Value::Image is ordered before Value::WalRecord.
1289 28 : let mut all_values_iter = {
1290 28 : let mut deltas = Vec::with_capacity(deltas_to_compact.len());
1291 402 : for l in deltas_to_compact.iter() {
1292 402 : let l = l.get_as_delta(ctx).await.map_err(CompactionError::Other)?;
1293 402 : deltas.push(l);
1294 : }
1295 28 : MergeIterator::create(&deltas, &[], ctx)
1296 28 : };
1297 28 :
1298 28 : // This iterator walks through all keys and is needed to calculate size used by each key
1299 28 : let mut all_keys_iter = all_keys
1300 28 : .iter()
1301 2064038 : .map(|DeltaEntry { key, lsn, size, .. }| (*key, *lsn, *size))
1302 2064010 : .coalesce(|mut prev, cur| {
1303 2064010 : // Coalesce keys that belong to the same key pair.
1304 2064010 : // This ensures that compaction doesn't put them
1305 2064010 : // into different layer files.
1306 2064010 : // Still limit this by the target file size,
1307 2064010 : // so that we keep the size of the files in
1308 2064010 : // check.
1309 2064010 : if prev.0 == cur.0 && prev.2 < target_file_size {
1310 40038 : prev.2 += cur.2;
1311 40038 : Ok(prev)
1312 : } else {
1313 2023972 : Err((prev, cur))
1314 : }
1315 2064010 : });
1316 28 :
1317 28 : // Merge the contents of all the input delta layers into a new set
1318 28 : // of delta layers, based on the current partitioning.
1319 28 : //
1320 28 : // We split the new delta layers on the key dimension. We iterate through the key space, and for each key, check if including the next key to the current output layer we're building would cause the layer to become too large. If so, dump the current output layer and start new one.
1321 28 : // It's possible that there is a single key with so many page versions that storing all of them in a single layer file
1322 28 : // would be too large. In that case, we also split on the LSN dimension.
1323 28 : //
1324 28 : // LSN
1325 28 : // ^
1326 28 : // |
1327 28 : // | +-----------+ +--+--+--+--+
1328 28 : // | | | | | | | |
1329 28 : // | +-----------+ | | | | |
1330 28 : // | | | | | | | |
1331 28 : // | +-----------+ ==> | | | | |
1332 28 : // | | | | | | | |
1333 28 : // | +-----------+ | | | | |
1334 28 : // | | | | | | | |
1335 28 : // | +-----------+ +--+--+--+--+
1336 28 : // |
1337 28 : // +--------------> key
1338 28 : //
1339 28 : //
1340 28 : // If one key (X) has a lot of page versions:
1341 28 : //
1342 28 : // LSN
1343 28 : // ^
1344 28 : // | (X)
1345 28 : // | +-----------+ +--+--+--+--+
1346 28 : // | | | | | | | |
1347 28 : // | +-----------+ | | +--+ |
1348 28 : // | | | | | | | |
1349 28 : // | +-----------+ ==> | | | | |
1350 28 : // | | | | | +--+ |
1351 28 : // | +-----------+ | | | | |
1352 28 : // | | | | | | | |
1353 28 : // | +-----------+ +--+--+--+--+
1354 28 : // |
1355 28 : // +--------------> key
1356 28 : // TODO: this actually divides the layers into fixed-size chunks, not
1357 28 : // based on the partitioning.
1358 28 : //
1359 28 : // TODO: we should also opportunistically materialize and
1360 28 : // garbage collect what we can.
1361 28 : let mut new_layers = Vec::new();
1362 28 : let mut prev_key: Option<Key> = None;
1363 28 : let mut writer: Option<DeltaLayerWriter> = None;
1364 28 : let mut key_values_total_size = 0u64;
1365 28 : let mut dup_start_lsn: Lsn = Lsn::INVALID; // start LSN of layer containing values of the single key
1366 28 : let mut dup_end_lsn: Lsn = Lsn::INVALID; // end LSN of layer containing values of the single key
1367 28 : let mut next_hole = 0; // index of next hole in holes vector
1368 28 :
1369 28 : let mut keys = 0;
1370 :
1371 2064066 : while let Some((key, lsn, value)) = all_values_iter
1372 2064066 : .next()
1373 2064066 : .await
1374 2064066 : .map_err(CompactionError::Other)?
1375 : {
1376 2064038 : keys += 1;
1377 2064038 :
1378 2064038 : if keys % 32_768 == 0 && self.cancel.is_cancelled() {
1379 : // avoid hitting the cancellation token on every key. in benches, we end up
1380 : // shuffling an order of million keys per layer, this means we'll check it
1381 : // around tens of times per layer.
1382 0 : return Err(CompactionError::ShuttingDown);
1383 2064038 : }
1384 2064038 :
1385 2064038 : let same_key = prev_key == Some(key);
1386 2064038 : // We need to check key boundaries once we reach next key or end of layer with the same key
1387 2064038 : if !same_key || lsn == dup_end_lsn {
1388 2024000 : let mut next_key_size = 0u64;
1389 2024000 : let is_dup_layer = dup_end_lsn.is_valid();
1390 2024000 : dup_start_lsn = Lsn::INVALID;
1391 2024000 : if !same_key {
1392 2024000 : dup_end_lsn = Lsn::INVALID;
1393 2024000 : }
1394 : // Determine size occupied by this key. We stop at next key or when size becomes larger than target_file_size
1395 2024000 : for (next_key, next_lsn, next_size) in all_keys_iter.by_ref() {
1396 2024000 : next_key_size = next_size;
1397 2024000 : if key != next_key {
1398 2023972 : if dup_end_lsn.is_valid() {
1399 0 : // We are writting segment with duplicates:
1400 0 : // place all remaining values of this key in separate segment
1401 0 : dup_start_lsn = dup_end_lsn; // new segments starts where old stops
1402 0 : dup_end_lsn = lsn_range.end; // there are no more values of this key till end of LSN range
1403 2023972 : }
1404 2023972 : break;
1405 28 : }
1406 28 : key_values_total_size += next_size;
1407 28 : // Check if it is time to split segment: if total keys size is larger than target file size.
1408 28 : // We need to avoid generation of empty segments if next_size > target_file_size.
1409 28 : if key_values_total_size > target_file_size && lsn != next_lsn {
1410 : // Split key between multiple layers: such layer can contain only single key
1411 0 : dup_start_lsn = if dup_end_lsn.is_valid() {
1412 0 : dup_end_lsn // new segment with duplicates starts where old one stops
1413 : } else {
1414 0 : lsn // start with the first LSN for this key
1415 : };
1416 0 : dup_end_lsn = next_lsn; // upper LSN boundary is exclusive
1417 0 : break;
1418 28 : }
1419 : }
1420 : // handle case when loop reaches last key: in this case dup_end is non-zero but dup_start is not set.
1421 2024000 : if dup_end_lsn.is_valid() && !dup_start_lsn.is_valid() {
1422 0 : dup_start_lsn = dup_end_lsn;
1423 0 : dup_end_lsn = lsn_range.end;
1424 2024000 : }
1425 2024000 : if writer.is_some() {
1426 2023972 : let written_size = writer.as_mut().unwrap().size();
1427 2023972 : let contains_hole =
1428 2023972 : next_hole < holes.len() && key >= holes[next_hole].key_range.end;
1429 : // check if key cause layer overflow or contains hole...
1430 2023972 : if is_dup_layer
1431 2023972 : || dup_end_lsn.is_valid()
1432 2023972 : || written_size + key_values_total_size > target_file_size
1433 2023692 : || contains_hole
1434 : {
1435 : // ... if so, flush previous layer and prepare to write new one
1436 280 : let (desc, path) = writer
1437 280 : .take()
1438 280 : .unwrap()
1439 280 : .finish(prev_key.unwrap().next(), ctx)
1440 280 : .await
1441 280 : .map_err(CompactionError::Other)?;
1442 280 : let new_delta = Layer::finish_creating(self.conf, self, desc, &path)
1443 280 : .map_err(CompactionError::Other)?;
1444 :
1445 280 : new_layers.push(new_delta);
1446 280 : writer = None;
1447 280 :
1448 280 : if contains_hole {
1449 0 : // skip hole
1450 0 : next_hole += 1;
1451 280 : }
1452 2023692 : }
1453 28 : }
1454 : // Remember size of key value because at next iteration we will access next item
1455 2024000 : key_values_total_size = next_key_size;
1456 40038 : }
1457 2064038 : fail_point!("delta-layer-writer-fail-before-finish", |_| {
1458 0 : Err(CompactionError::Other(anyhow::anyhow!(
1459 0 : "failpoint delta-layer-writer-fail-before-finish"
1460 0 : )))
1461 2064038 : });
1462 :
1463 2064038 : if !self.shard_identity.is_key_disposable(&key) {
1464 2064038 : if writer.is_none() {
1465 308 : if self.cancel.is_cancelled() {
1466 : // to be somewhat responsive to cancellation, check for each new layer
1467 0 : return Err(CompactionError::ShuttingDown);
1468 308 : }
1469 : // Create writer if not initiaized yet
1470 308 : writer = Some(
1471 : DeltaLayerWriter::new(
1472 308 : self.conf,
1473 308 : self.timeline_id,
1474 308 : self.tenant_shard_id,
1475 308 : key,
1476 308 : if dup_end_lsn.is_valid() {
1477 : // this is a layer containing slice of values of the same key
1478 0 : debug!("Create new dup layer {}..{}", dup_start_lsn, dup_end_lsn);
1479 0 : dup_start_lsn..dup_end_lsn
1480 : } else {
1481 308 : debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end);
1482 308 : lsn_range.clone()
1483 : },
1484 308 : ctx,
1485 308 : )
1486 308 : .await
1487 308 : .map_err(CompactionError::Other)?,
1488 : );
1489 :
1490 308 : keys = 0;
1491 2063730 : }
1492 :
1493 2064038 : writer
1494 2064038 : .as_mut()
1495 2064038 : .unwrap()
1496 2064038 : .put_value(key, lsn, value, ctx)
1497 2064038 : .await
1498 2064038 : .map_err(CompactionError::Other)?;
1499 : } else {
1500 0 : let shard = self.shard_identity.shard_index();
1501 0 : let owner = self.shard_identity.get_shard_number(&key);
1502 0 : if cfg!(debug_assertions) {
1503 0 : panic!("key {key} does not belong on shard {shard}, owned by {owner}");
1504 0 : }
1505 0 : debug!("dropping key {key} during compaction (it belongs on shard {owner})");
1506 : }
1507 :
1508 2064038 : if !new_layers.is_empty() {
1509 19786 : fail_point!("after-timeline-compacted-first-L1");
1510 2044252 : }
1511 :
1512 2064038 : prev_key = Some(key);
1513 : }
1514 28 : if let Some(writer) = writer {
1515 28 : let (desc, path) = writer
1516 28 : .finish(prev_key.unwrap().next(), ctx)
1517 28 : .await
1518 28 : .map_err(CompactionError::Other)?;
1519 28 : let new_delta = Layer::finish_creating(self.conf, self, desc, &path)
1520 28 : .map_err(CompactionError::Other)?;
1521 28 : new_layers.push(new_delta);
1522 0 : }
1523 :
1524 : // Sync layers
1525 28 : if !new_layers.is_empty() {
1526 : // Print a warning if the created layer is larger than double the target size
1527 : // Add two pages for potential overhead. This should in theory be already
1528 : // accounted for in the target calculation, but for very small targets,
1529 : // we still might easily hit the limit otherwise.
1530 28 : let warn_limit = target_file_size * 2 + page_cache::PAGE_SZ as u64 * 2;
1531 308 : for layer in new_layers.iter() {
1532 308 : if layer.layer_desc().file_size > warn_limit {
1533 0 : warn!(
1534 : %layer,
1535 0 : "created delta file of size {} larger than double of target of {target_file_size}", layer.layer_desc().file_size
1536 : );
1537 308 : }
1538 : }
1539 :
1540 : // The writer.finish() above already did the fsync of the inodes.
1541 : // We just need to fsync the directory in which these inodes are linked,
1542 : // which we know to be the timeline directory.
1543 : //
1544 : // We use fatal_err() below because the after writer.finish() returns with success,
1545 : // the in-memory state of the filesystem already has the layer file in its final place,
1546 : // and subsequent pageserver code could think it's durable while it really isn't.
1547 28 : let timeline_dir = VirtualFile::open(
1548 28 : &self
1549 28 : .conf
1550 28 : .timeline_path(&self.tenant_shard_id, &self.timeline_id),
1551 28 : ctx,
1552 28 : )
1553 28 : .await
1554 28 : .fatal_err("VirtualFile::open for timeline dir fsync");
1555 28 : timeline_dir
1556 28 : .sync_all()
1557 28 : .await
1558 28 : .fatal_err("VirtualFile::sync_all timeline dir");
1559 0 : }
1560 :
1561 28 : stats.write_layer_files_micros = stats.read_lock_drop_micros.till_now();
1562 28 : stats.new_deltas_count = Some(new_layers.len());
1563 308 : stats.new_deltas_size = Some(new_layers.iter().map(|l| l.layer_desc().file_size).sum());
1564 28 :
1565 28 : match TryInto::<CompactLevel0Phase1Stats>::try_into(stats)
1566 28 : .and_then(|stats| serde_json::to_string(&stats).context("serde_json::to_string"))
1567 : {
1568 28 : Ok(stats_json) => {
1569 28 : info!(
1570 0 : stats_json = stats_json.as_str(),
1571 0 : "compact_level0_phase1 stats available"
1572 : )
1573 : }
1574 0 : Err(e) => {
1575 0 : warn!("compact_level0_phase1 stats failed to serialize: {:#}", e);
1576 : }
1577 : }
1578 :
1579 : // Without this, rustc complains about deltas_to_compact still
1580 : // being borrowed when we `.into_iter()` below.
1581 28 : drop(all_values_iter);
1582 28 :
1583 28 : Ok(CompactLevel0Phase1Result {
1584 28 : new_layers,
1585 28 : deltas_to_compact: deltas_to_compact
1586 28 : .into_iter()
1587 402 : .map(|x| x.drop_eviction_guard())
1588 28 : .collect::<Vec<_>>(),
1589 28 : fully_compacted,
1590 28 : })
1591 364 : }
1592 : }
1593 :
1594 : #[derive(Default)]
1595 : struct CompactLevel0Phase1Result {
1596 : new_layers: Vec<ResidentLayer>,
1597 : deltas_to_compact: Vec<Layer>,
1598 : // Whether we have included all L0 layers, or selected only part of them due to the
1599 : // L0 compaction size limit.
1600 : fully_compacted: bool,
1601 : }
1602 :
1603 : #[derive(Default)]
1604 : struct CompactLevel0Phase1StatsBuilder {
1605 : version: Option<u64>,
1606 : tenant_id: Option<TenantShardId>,
1607 : timeline_id: Option<TimelineId>,
1608 : read_lock_acquisition_micros: DurationRecorder,
1609 : read_lock_held_spawn_blocking_startup_micros: DurationRecorder,
1610 : read_lock_held_key_sort_micros: DurationRecorder,
1611 : read_lock_held_prerequisites_micros: DurationRecorder,
1612 : read_lock_held_compute_holes_micros: DurationRecorder,
1613 : read_lock_drop_micros: DurationRecorder,
1614 : write_layer_files_micros: DurationRecorder,
1615 : level0_deltas_count: Option<usize>,
1616 : new_deltas_count: Option<usize>,
1617 : new_deltas_size: Option<u64>,
1618 : }
1619 :
1620 : #[derive(serde::Serialize)]
1621 : struct CompactLevel0Phase1Stats {
1622 : version: u64,
1623 : tenant_id: TenantShardId,
1624 : timeline_id: TimelineId,
1625 : read_lock_acquisition_micros: RecordedDuration,
1626 : read_lock_held_spawn_blocking_startup_micros: RecordedDuration,
1627 : read_lock_held_key_sort_micros: RecordedDuration,
1628 : read_lock_held_prerequisites_micros: RecordedDuration,
1629 : read_lock_held_compute_holes_micros: RecordedDuration,
1630 : read_lock_drop_micros: RecordedDuration,
1631 : write_layer_files_micros: RecordedDuration,
1632 : level0_deltas_count: usize,
1633 : new_deltas_count: usize,
1634 : new_deltas_size: u64,
1635 : }
1636 :
1637 : impl TryFrom<CompactLevel0Phase1StatsBuilder> for CompactLevel0Phase1Stats {
1638 : type Error = anyhow::Error;
1639 :
1640 28 : fn try_from(value: CompactLevel0Phase1StatsBuilder) -> Result<Self, Self::Error> {
1641 28 : Ok(Self {
1642 28 : version: value.version.ok_or_else(|| anyhow!("version not set"))?,
1643 28 : tenant_id: value
1644 28 : .tenant_id
1645 28 : .ok_or_else(|| anyhow!("tenant_id not set"))?,
1646 28 : timeline_id: value
1647 28 : .timeline_id
1648 28 : .ok_or_else(|| anyhow!("timeline_id not set"))?,
1649 28 : read_lock_acquisition_micros: value
1650 28 : .read_lock_acquisition_micros
1651 28 : .into_recorded()
1652 28 : .ok_or_else(|| anyhow!("read_lock_acquisition_micros not set"))?,
1653 28 : read_lock_held_spawn_blocking_startup_micros: value
1654 28 : .read_lock_held_spawn_blocking_startup_micros
1655 28 : .into_recorded()
1656 28 : .ok_or_else(|| anyhow!("read_lock_held_spawn_blocking_startup_micros not set"))?,
1657 28 : read_lock_held_key_sort_micros: value
1658 28 : .read_lock_held_key_sort_micros
1659 28 : .into_recorded()
1660 28 : .ok_or_else(|| anyhow!("read_lock_held_key_sort_micros not set"))?,
1661 28 : read_lock_held_prerequisites_micros: value
1662 28 : .read_lock_held_prerequisites_micros
1663 28 : .into_recorded()
1664 28 : .ok_or_else(|| anyhow!("read_lock_held_prerequisites_micros not set"))?,
1665 28 : read_lock_held_compute_holes_micros: value
1666 28 : .read_lock_held_compute_holes_micros
1667 28 : .into_recorded()
1668 28 : .ok_or_else(|| anyhow!("read_lock_held_compute_holes_micros not set"))?,
1669 28 : read_lock_drop_micros: value
1670 28 : .read_lock_drop_micros
1671 28 : .into_recorded()
1672 28 : .ok_or_else(|| anyhow!("read_lock_drop_micros not set"))?,
1673 28 : write_layer_files_micros: value
1674 28 : .write_layer_files_micros
1675 28 : .into_recorded()
1676 28 : .ok_or_else(|| anyhow!("write_layer_files_micros not set"))?,
1677 28 : level0_deltas_count: value
1678 28 : .level0_deltas_count
1679 28 : .ok_or_else(|| anyhow!("level0_deltas_count not set"))?,
1680 28 : new_deltas_count: value
1681 28 : .new_deltas_count
1682 28 : .ok_or_else(|| anyhow!("new_deltas_count not set"))?,
1683 28 : new_deltas_size: value
1684 28 : .new_deltas_size
1685 28 : .ok_or_else(|| anyhow!("new_deltas_size not set"))?,
1686 : })
1687 28 : }
1688 : }
1689 :
1690 : impl Timeline {
1691 : /// Entry point for new tiered compaction algorithm.
1692 : ///
1693 : /// All the real work is in the implementation in the pageserver_compaction
1694 : /// crate. The code here would apply to any algorithm implemented by the
1695 : /// same interface, but tiered is the only one at the moment.
1696 : ///
1697 : /// TODO: cancellation
1698 0 : pub(crate) async fn compact_tiered(
1699 0 : self: &Arc<Self>,
1700 0 : _cancel: &CancellationToken,
1701 0 : ctx: &RequestContext,
1702 0 : ) -> Result<(), CompactionError> {
1703 0 : let fanout = self.get_compaction_threshold() as u64;
1704 0 : let target_file_size = self.get_checkpoint_distance();
1705 :
1706 : // Find the top of the historical layers
1707 0 : let end_lsn = {
1708 0 : let guard = self.layers.read().await;
1709 0 : let layers = guard.layer_map()?;
1710 :
1711 0 : let l0_deltas = layers.level0_deltas();
1712 0 :
1713 0 : // As an optimization, if we find that there are too few L0 layers,
1714 0 : // bail out early. We know that the compaction algorithm would do
1715 0 : // nothing in that case.
1716 0 : if l0_deltas.len() < fanout as usize {
1717 : // doesn't need compacting
1718 0 : return Ok(());
1719 0 : }
1720 0 : l0_deltas.iter().map(|l| l.lsn_range.end).max().unwrap()
1721 0 : };
1722 0 :
1723 0 : // Is the timeline being deleted?
1724 0 : if self.is_stopping() {
1725 0 : trace!("Dropping out of compaction on timeline shutdown");
1726 0 : return Err(CompactionError::ShuttingDown);
1727 0 : }
1728 :
1729 0 : let (dense_ks, _sparse_ks) = self.collect_keyspace(end_lsn, ctx).await?;
1730 : // TODO(chi): ignore sparse_keyspace for now, compact it in the future.
1731 0 : let mut adaptor = TimelineAdaptor::new(self, (end_lsn, dense_ks));
1732 0 :
1733 0 : pageserver_compaction::compact_tiered::compact_tiered(
1734 0 : &mut adaptor,
1735 0 : end_lsn,
1736 0 : target_file_size,
1737 0 : fanout,
1738 0 : ctx,
1739 0 : )
1740 0 : .await
1741 : // TODO: compact_tiered needs to return CompactionError
1742 0 : .map_err(CompactionError::Other)?;
1743 :
1744 0 : adaptor.flush_updates().await?;
1745 0 : Ok(())
1746 0 : }
1747 :
1748 : /// Take a list of images and deltas, produce images and deltas according to GC horizon and retain_lsns.
1749 : ///
1750 : /// It takes a key, the values of the key within the compaction process, a GC horizon, and all retain_lsns below the horizon.
1751 : /// For now, it requires the `accumulated_values` contains the full history of the key (i.e., the key with the lowest LSN is
1752 : /// an image or a WAL not requiring a base image). This restriction will be removed once we implement gc-compaction on branch.
1753 : ///
1754 : /// The function returns the deltas and the base image that need to be placed at each of the retain LSN. For example, we have:
1755 : ///
1756 : /// A@0x10, +B@0x20, +C@0x30, +D@0x40, +E@0x50, +F@0x60
1757 : /// horizon = 0x50, retain_lsn = 0x20, 0x40, delta_threshold=3
1758 : ///
1759 : /// The function will produce:
1760 : ///
1761 : /// ```plain
1762 : /// 0x20(retain_lsn) -> img=AB@0x20 always produce a single image below the lowest retain LSN
1763 : /// 0x40(retain_lsn) -> deltas=[+C@0x30, +D@0x40] two deltas since the last base image, keeping the deltas
1764 : /// 0x50(horizon) -> deltas=[ABCDE@0x50] three deltas since the last base image, generate an image but put it in the delta
1765 : /// above_horizon -> deltas=[+F@0x60] full history above the horizon
1766 : /// ```
1767 : ///
1768 : /// Note that `accumulated_values` must be sorted by LSN and should belong to a single key.
1769 630 : pub(crate) async fn generate_key_retention(
1770 630 : self: &Arc<Timeline>,
1771 630 : key: Key,
1772 630 : full_history: &[(Key, Lsn, Value)],
1773 630 : horizon: Lsn,
1774 630 : retain_lsn_below_horizon: &[Lsn],
1775 630 : delta_threshold_cnt: usize,
1776 630 : base_img_from_ancestor: Option<(Key, Lsn, Bytes)>,
1777 630 : ) -> anyhow::Result<KeyHistoryRetention> {
1778 : // Pre-checks for the invariants
1779 :
1780 630 : let debug_mode = cfg!(debug_assertions) || cfg!(feature = "testing");
1781 :
1782 630 : if debug_mode {
1783 1530 : for (log_key, _, _) in full_history {
1784 900 : assert_eq!(log_key, &key, "mismatched key");
1785 : }
1786 630 : for i in 1..full_history.len() {
1787 270 : assert!(full_history[i - 1].1 <= full_history[i].1, "unordered LSN");
1788 270 : if full_history[i - 1].1 == full_history[i].1 {
1789 0 : assert!(
1790 0 : matches!(full_history[i - 1].2, Value::Image(_)),
1791 0 : "unordered delta/image, or duplicated delta"
1792 : );
1793 270 : }
1794 : }
1795 : // There was an assertion for no base image that checks if the first
1796 : // record in the history is `will_init` before, but it was removed.
1797 : // This is explained in the test cases for generate_key_retention.
1798 : // Search "incomplete history" for more information.
1799 1410 : for lsn in retain_lsn_below_horizon {
1800 780 : assert!(lsn < &horizon, "retain lsn must be below horizon")
1801 : }
1802 630 : for i in 1..retain_lsn_below_horizon.len() {
1803 356 : assert!(
1804 356 : retain_lsn_below_horizon[i - 1] <= retain_lsn_below_horizon[i],
1805 0 : "unordered LSN"
1806 : );
1807 : }
1808 0 : }
1809 630 : let has_ancestor = base_img_from_ancestor.is_some();
1810 : // Step 1: split history into len(retain_lsn_below_horizon) + 2 buckets, where the last bucket is for all deltas above the horizon,
1811 : // and the second-to-last bucket is for the horizon. Each bucket contains lsn_last_bucket < deltas <= lsn_this_bucket.
1812 630 : let (mut split_history, lsn_split_points) = {
1813 630 : let mut split_history = Vec::new();
1814 630 : split_history.resize_with(retain_lsn_below_horizon.len() + 2, Vec::new);
1815 630 : let mut lsn_split_points = Vec::with_capacity(retain_lsn_below_horizon.len() + 1);
1816 1410 : for lsn in retain_lsn_below_horizon {
1817 780 : lsn_split_points.push(*lsn);
1818 780 : }
1819 630 : lsn_split_points.push(horizon);
1820 630 : let mut current_idx = 0;
1821 1530 : for item @ (_, lsn, _) in full_history {
1822 1144 : while current_idx < lsn_split_points.len() && *lsn > lsn_split_points[current_idx] {
1823 244 : current_idx += 1;
1824 244 : }
1825 900 : split_history[current_idx].push(item);
1826 : }
1827 630 : (split_history, lsn_split_points)
1828 : };
1829 : // Step 2: filter out duplicated records due to the k-merge of image/delta layers
1830 2670 : for split_for_lsn in &mut split_history {
1831 2040 : let mut prev_lsn = None;
1832 2040 : let mut new_split_for_lsn = Vec::with_capacity(split_for_lsn.len());
1833 2040 : for record @ (_, lsn, _) in std::mem::take(split_for_lsn) {
1834 900 : if let Some(prev_lsn) = &prev_lsn {
1835 118 : if *prev_lsn == lsn {
1836 : // The case that we have an LSN with both data from the delta layer and the image layer. As
1837 : // `ValueWrapper` ensures that an image is ordered before a delta at the same LSN, we simply
1838 : // drop this delta and keep the image.
1839 : //
1840 : // For example, we have delta layer key1@0x10, key1@0x20, and image layer key1@0x10, we will
1841 : // keep the image for key1@0x10 and the delta for key1@0x20. key1@0x10 delta will be simply
1842 : // dropped.
1843 : //
1844 : // TODO: in case we have both delta + images for a given LSN and it does not exceed the delta
1845 : // threshold, we could have kept delta instead to save space. This is an optimization for the future.
1846 0 : continue;
1847 118 : }
1848 782 : }
1849 900 : prev_lsn = Some(lsn);
1850 900 : new_split_for_lsn.push(record);
1851 : }
1852 2040 : *split_for_lsn = new_split_for_lsn;
1853 : }
1854 : // Step 3: generate images when necessary
1855 630 : let mut retention = Vec::with_capacity(split_history.len());
1856 630 : let mut records_since_last_image = 0;
1857 630 : let batch_cnt = split_history.len();
1858 630 : assert!(
1859 630 : batch_cnt >= 2,
1860 0 : "should have at least below + above horizon batches"
1861 : );
1862 630 : let mut replay_history: Vec<(Key, Lsn, Value)> = Vec::new();
1863 630 : if let Some((key, lsn, img)) = base_img_from_ancestor {
1864 42 : replay_history.push((key, lsn, Value::Image(img)));
1865 588 : }
1866 :
1867 : /// Generate debug information for the replay history
1868 0 : fn generate_history_trace(replay_history: &[(Key, Lsn, Value)]) -> String {
1869 : use std::fmt::Write;
1870 0 : let mut output = String::new();
1871 0 : if let Some((key, _, _)) = replay_history.first() {
1872 0 : write!(output, "key={} ", key).unwrap();
1873 0 : let mut cnt = 0;
1874 0 : for (_, lsn, val) in replay_history {
1875 0 : if val.is_image() {
1876 0 : write!(output, "i@{} ", lsn).unwrap();
1877 0 : } else if val.will_init() {
1878 0 : write!(output, "di@{} ", lsn).unwrap();
1879 0 : } else {
1880 0 : write!(output, "d@{} ", lsn).unwrap();
1881 0 : }
1882 0 : cnt += 1;
1883 0 : if cnt >= 128 {
1884 0 : write!(output, "... and more").unwrap();
1885 0 : break;
1886 0 : }
1887 : }
1888 0 : } else {
1889 0 : write!(output, "<no history>").unwrap();
1890 0 : }
1891 0 : output
1892 0 : }
1893 :
1894 0 : fn generate_debug_trace(
1895 0 : replay_history: Option<&[(Key, Lsn, Value)]>,
1896 0 : full_history: &[(Key, Lsn, Value)],
1897 0 : lsns: &[Lsn],
1898 0 : horizon: Lsn,
1899 0 : ) -> String {
1900 : use std::fmt::Write;
1901 0 : let mut output = String::new();
1902 0 : if let Some(replay_history) = replay_history {
1903 0 : writeln!(
1904 0 : output,
1905 0 : "replay_history: {}",
1906 0 : generate_history_trace(replay_history)
1907 0 : )
1908 0 : .unwrap();
1909 0 : } else {
1910 0 : writeln!(output, "replay_history: <disabled>",).unwrap();
1911 0 : }
1912 0 : writeln!(
1913 0 : output,
1914 0 : "full_history: {}",
1915 0 : generate_history_trace(full_history)
1916 0 : )
1917 0 : .unwrap();
1918 0 : writeln!(
1919 0 : output,
1920 0 : "when processing: [{}] horizon={}",
1921 0 : lsns.iter().map(|l| format!("{l}")).join(","),
1922 0 : horizon
1923 0 : )
1924 0 : .unwrap();
1925 0 : output
1926 0 : }
1927 :
1928 630 : let mut key_exists = false;
1929 2040 : for (i, split_for_lsn) in split_history.into_iter().enumerate() {
1930 : // TODO: there could be image keys inside the splits, and we can compute records_since_last_image accordingly.
1931 2040 : records_since_last_image += split_for_lsn.len();
1932 : // Whether to produce an image into the final layer files
1933 2040 : let produce_image = if i == 0 && !has_ancestor {
1934 : // We always generate images for the first batch (below horizon / lowest retain_lsn)
1935 588 : true
1936 1452 : } else if i == batch_cnt - 1 {
1937 : // Do not generate images for the last batch (above horizon)
1938 630 : false
1939 822 : } else if records_since_last_image == 0 {
1940 644 : false
1941 178 : } else if records_since_last_image >= delta_threshold_cnt {
1942 : // Generate images when there are too many records
1943 6 : true
1944 : } else {
1945 172 : false
1946 : };
1947 2040 : replay_history.extend(split_for_lsn.iter().map(|x| (*x).clone()));
1948 : // Only retain the items after the last image record
1949 2514 : for idx in (0..replay_history.len()).rev() {
1950 2514 : if replay_history[idx].2.will_init() {
1951 2040 : replay_history = replay_history[idx..].to_vec();
1952 2040 : break;
1953 474 : }
1954 : }
1955 2040 : if replay_history.is_empty() && !key_exists {
1956 : // The key does not exist at earlier LSN, we can skip this iteration.
1957 0 : retention.push(Vec::new());
1958 0 : continue;
1959 2040 : } else {
1960 2040 : key_exists = true;
1961 2040 : }
1962 2040 : let Some((_, _, val)) = replay_history.first() else {
1963 0 : unreachable!("replay history should not be empty once it exists")
1964 : };
1965 2040 : if !val.will_init() {
1966 0 : return Err(anyhow::anyhow!("invalid history, no base image")).with_context(|| {
1967 0 : generate_debug_trace(
1968 0 : Some(&replay_history),
1969 0 : full_history,
1970 0 : retain_lsn_below_horizon,
1971 0 : horizon,
1972 0 : )
1973 0 : });
1974 2040 : }
1975 : // Whether to reconstruct the image. In debug mode, we will generate an image
1976 : // at every retain_lsn to ensure data is not corrupted, but we won't put the
1977 : // image into the final layer.
1978 2040 : let generate_image = produce_image || debug_mode;
1979 2040 : if produce_image {
1980 594 : records_since_last_image = 0;
1981 1446 : }
1982 2040 : let img_and_lsn = if generate_image {
1983 2040 : let replay_history_for_debug = if debug_mode {
1984 2040 : Some(replay_history.clone())
1985 : } else {
1986 0 : None
1987 : };
1988 2040 : let replay_history_for_debug_ref = replay_history_for_debug.as_deref();
1989 2040 : let history = if produce_image {
1990 594 : std::mem::take(&mut replay_history)
1991 : } else {
1992 1446 : replay_history.clone()
1993 : };
1994 2040 : let mut img = None;
1995 2040 : let mut records = Vec::with_capacity(history.len());
1996 2040 : if let (_, lsn, Value::Image(val)) = history.first().as_ref().unwrap() {
1997 2018 : img = Some((*lsn, val.clone()));
1998 2018 : for (_, lsn, val) in history.into_iter().skip(1) {
1999 460 : let Value::WalRecord(rec) = val else {
2000 0 : return Err(anyhow::anyhow!(
2001 0 : "invalid record, first record is image, expect walrecords"
2002 0 : ))
2003 0 : .with_context(|| {
2004 0 : generate_debug_trace(
2005 0 : replay_history_for_debug_ref,
2006 0 : full_history,
2007 0 : retain_lsn_below_horizon,
2008 0 : horizon,
2009 0 : )
2010 0 : });
2011 : };
2012 460 : records.push((lsn, rec));
2013 : }
2014 : } else {
2015 36 : for (_, lsn, val) in history.into_iter() {
2016 36 : let Value::WalRecord(rec) = val else {
2017 0 : return Err(anyhow::anyhow!("invalid record, first record is walrecord, expect rest are walrecord"))
2018 0 : .with_context(|| generate_debug_trace(
2019 0 : replay_history_for_debug_ref,
2020 0 : full_history,
2021 0 : retain_lsn_below_horizon,
2022 0 : horizon,
2023 0 : ));
2024 : };
2025 36 : records.push((lsn, rec));
2026 : }
2027 : }
2028 2040 : records.reverse();
2029 2040 : let state = ValueReconstructState { img, records };
2030 : // last batch does not generate image so i is always in range, unless we force generate
2031 : // an image during testing
2032 2040 : let request_lsn = if i >= lsn_split_points.len() {
2033 630 : Lsn::MAX
2034 : } else {
2035 1410 : lsn_split_points[i]
2036 : };
2037 2040 : let img = self.reconstruct_value(key, request_lsn, state).await?;
2038 2040 : Some((request_lsn, img))
2039 : } else {
2040 0 : None
2041 : };
2042 2040 : if produce_image {
2043 594 : let (request_lsn, img) = img_and_lsn.unwrap();
2044 594 : replay_history.push((key, request_lsn, Value::Image(img.clone())));
2045 594 : retention.push(vec![(request_lsn, Value::Image(img))]);
2046 1446 : } else {
2047 1446 : let deltas = split_for_lsn
2048 1446 : .iter()
2049 1446 : .map(|(_, lsn, value)| (*lsn, value.clone()))
2050 1446 : .collect_vec();
2051 1446 : retention.push(deltas);
2052 1446 : }
2053 : }
2054 630 : let mut result = Vec::with_capacity(retention.len());
2055 630 : assert_eq!(retention.len(), lsn_split_points.len() + 1);
2056 2040 : for (idx, logs) in retention.into_iter().enumerate() {
2057 2040 : if idx == lsn_split_points.len() {
2058 630 : return Ok(KeyHistoryRetention {
2059 630 : below_horizon: result,
2060 630 : above_horizon: KeyLogAtLsn(logs),
2061 630 : });
2062 1410 : } else {
2063 1410 : result.push((lsn_split_points[idx], KeyLogAtLsn(logs)));
2064 1410 : }
2065 : }
2066 0 : unreachable!("key retention is empty")
2067 630 : }
2068 :
2069 : /// Check how much space is left on the disk
2070 52 : async fn check_available_space(self: &Arc<Self>) -> anyhow::Result<u64> {
2071 52 : let tenants_dir = self.conf.tenants_path();
2072 :
2073 52 : let stat = Statvfs::get(&tenants_dir, None)
2074 52 : .context("statvfs failed, presumably directory got unlinked")?;
2075 :
2076 52 : let (avail_bytes, _) = stat.get_avail_total_bytes();
2077 52 :
2078 52 : Ok(avail_bytes)
2079 52 : }
2080 :
2081 : /// Check if the compaction can proceed safely without running out of space. We assume the size
2082 : /// upper bound of the produced files of a compaction job is the same as all layers involved in
2083 : /// the compaction. Therefore, we need `2 * layers_to_be_compacted_size` at least to do a
2084 : /// compaction.
2085 52 : async fn check_compaction_space(
2086 52 : self: &Arc<Self>,
2087 52 : layer_selection: &[Layer],
2088 52 : ) -> anyhow::Result<()> {
2089 52 : let available_space = self.check_available_space().await?;
2090 52 : let mut remote_layer_size = 0;
2091 52 : let mut all_layer_size = 0;
2092 204 : for layer in layer_selection {
2093 152 : let needs_download = layer.needs_download().await?;
2094 152 : if needs_download.is_some() {
2095 0 : remote_layer_size += layer.layer_desc().file_size;
2096 152 : }
2097 152 : all_layer_size += layer.layer_desc().file_size;
2098 : }
2099 52 : let allocated_space = (available_space as f64 * 0.8) as u64; /* reserve 20% space for other tasks */
2100 52 : if all_layer_size /* space needed for newly-generated file */ + remote_layer_size /* space for downloading layers */ > allocated_space
2101 : {
2102 0 : return Err(anyhow!("not enough space for compaction: available_space={}, allocated_space={}, all_layer_size={}, remote_layer_size={}, required_space={}",
2103 0 : available_space, allocated_space, all_layer_size, remote_layer_size, all_layer_size + remote_layer_size));
2104 52 : }
2105 52 : Ok(())
2106 52 : }
2107 :
2108 : /// Get a watermark for gc-compaction, that is the lowest LSN that we can use as the `gc_horizon` for
2109 : /// the compaction algorithm. It is min(space_cutoff, time_cutoff, latest_gc_cutoff, standby_horizon).
2110 : /// Leases and retain_lsns are considered in the gc-compaction job itself so we don't need to account for them
2111 : /// here.
2112 54 : pub(crate) fn get_gc_compaction_watermark(self: &Arc<Self>) -> Lsn {
2113 54 : let gc_cutoff_lsn = {
2114 54 : let gc_info = self.gc_info.read().unwrap();
2115 54 : gc_info.min_cutoff()
2116 54 : };
2117 54 :
2118 54 : // TODO: standby horizon should use leases so we don't really need to consider it here.
2119 54 : // let watermark = watermark.min(self.standby_horizon.load());
2120 54 :
2121 54 : // TODO: ensure the child branches will not use anything below the watermark, or consider
2122 54 : // them when computing the watermark.
2123 54 : gc_cutoff_lsn.min(*self.get_latest_gc_cutoff_lsn())
2124 54 : }
2125 :
2126 : /// Split a gc-compaction job into multiple compaction jobs. The split is based on the key range and the estimated size of the compaction job.
2127 : /// The function returns a list of compaction jobs that can be executed separately. If the upper bound of the compact LSN
2128 : /// range is not specified, we will use the latest gc_cutoff as the upper bound, so that all jobs in the jobset acts
2129 : /// like a full compaction of the specified keyspace.
2130 0 : pub(crate) async fn gc_compaction_split_jobs(
2131 0 : self: &Arc<Self>,
2132 0 : job: GcCompactJob,
2133 0 : sub_compaction_max_job_size_mb: Option<u64>,
2134 0 : ) -> anyhow::Result<Vec<GcCompactJob>> {
2135 0 : let compact_below_lsn = if job.compact_lsn_range.end != Lsn::MAX {
2136 0 : job.compact_lsn_range.end
2137 : } else {
2138 0 : self.get_gc_compaction_watermark()
2139 : };
2140 :
2141 : // Split compaction job to about 4GB each
2142 : const GC_COMPACT_MAX_SIZE_MB: u64 = 4 * 1024;
2143 0 : let sub_compaction_max_job_size_mb =
2144 0 : sub_compaction_max_job_size_mb.unwrap_or(GC_COMPACT_MAX_SIZE_MB);
2145 0 :
2146 0 : let mut compact_jobs = Vec::new();
2147 0 : // For now, we simply use the key partitioning information; we should do a more fine-grained partitioning
2148 0 : // by estimating the amount of files read for a compaction job. We should also partition on LSN.
2149 0 : let ((dense_ks, sparse_ks), _) = self.partitioning.read().as_ref().clone();
2150 : // Truncate the key range to be within user specified compaction range.
2151 0 : fn truncate_to(
2152 0 : source_start: &Key,
2153 0 : source_end: &Key,
2154 0 : target_start: &Key,
2155 0 : target_end: &Key,
2156 0 : ) -> Option<(Key, Key)> {
2157 0 : let start = source_start.max(target_start);
2158 0 : let end = source_end.min(target_end);
2159 0 : if start < end {
2160 0 : Some((*start, *end))
2161 : } else {
2162 0 : None
2163 : }
2164 0 : }
2165 0 : let mut split_key_ranges = Vec::new();
2166 0 : let ranges = dense_ks
2167 0 : .parts
2168 0 : .iter()
2169 0 : .map(|partition| partition.ranges.iter())
2170 0 : .chain(sparse_ks.parts.iter().map(|x| x.0.ranges.iter()))
2171 0 : .flatten()
2172 0 : .cloned()
2173 0 : .collect_vec();
2174 0 : for range in ranges.iter() {
2175 0 : let Some((start, end)) = truncate_to(
2176 0 : &range.start,
2177 0 : &range.end,
2178 0 : &job.compact_key_range.start,
2179 0 : &job.compact_key_range.end,
2180 0 : ) else {
2181 0 : continue;
2182 : };
2183 0 : split_key_ranges.push((start, end));
2184 : }
2185 0 : split_key_ranges.sort();
2186 0 : let guard = self.layers.read().await;
2187 0 : let layer_map = guard.layer_map()?;
2188 0 : let mut current_start = None;
2189 0 : let ranges_num = split_key_ranges.len();
2190 0 : for (idx, (start, end)) in split_key_ranges.into_iter().enumerate() {
2191 0 : if current_start.is_none() {
2192 0 : current_start = Some(start);
2193 0 : }
2194 0 : let start = current_start.unwrap();
2195 0 : if start >= end {
2196 : // We have already processed this partition.
2197 0 : continue;
2198 0 : }
2199 0 : let res = layer_map.range_search(start..end, compact_below_lsn);
2200 0 : let total_size = res.found.keys().map(|x| x.layer.file_size()).sum::<u64>();
2201 0 : if total_size > sub_compaction_max_job_size_mb * 1024 * 1024 || ranges_num == idx + 1 {
2202 : // Try to extend the compaction range so that we include at least one full layer file.
2203 0 : let extended_end = res
2204 0 : .found
2205 0 : .keys()
2206 0 : .map(|layer| layer.layer.key_range.end)
2207 0 : .min();
2208 : // It is possible that the search range does not contain any layer files when we reach the end of the loop.
2209 : // In this case, we simply use the specified key range end.
2210 0 : let end = if let Some(extended_end) = extended_end {
2211 0 : extended_end.max(end)
2212 : } else {
2213 0 : end
2214 : };
2215 0 : info!(
2216 0 : "splitting compaction job: {}..{}, estimated_size={}",
2217 : start, end, total_size
2218 : );
2219 0 : compact_jobs.push(GcCompactJob {
2220 0 : dry_run: job.dry_run,
2221 0 : compact_key_range: start..end,
2222 0 : compact_lsn_range: job.compact_lsn_range.start..compact_below_lsn,
2223 0 : });
2224 0 : current_start = Some(end);
2225 0 : }
2226 : }
2227 0 : drop(guard);
2228 0 : Ok(compact_jobs)
2229 0 : }
2230 :
2231 : /// An experimental compaction building block that combines compaction with garbage collection.
2232 : ///
2233 : /// The current implementation picks all delta + image layers that are below or intersecting with
2234 : /// the GC horizon without considering retain_lsns. Then, it does a full compaction over all these delta
2235 : /// layers and image layers, which generates image layers on the gc horizon, drop deltas below gc horizon,
2236 : /// and create delta layers with all deltas >= gc horizon.
2237 : ///
2238 : /// If `options.compact_range` is provided, it will only compact the keys within the range, aka partial compaction.
2239 : /// Partial compaction will read and process all layers overlapping with the key range, even if it might
2240 : /// contain extra keys. After the gc-compaction phase completes, delta layers that are not fully contained
2241 : /// within the key range will be rewritten to ensure they do not overlap with the delta layers. Providing
2242 : /// Key::MIN..Key..MAX to the function indicates a full compaction, though technically, `Key::MAX` is not
2243 : /// part of the range.
2244 : ///
2245 : /// If `options.compact_lsn_range.end` is provided, the compaction will only compact layers below or intersect with
2246 : /// the LSN. Otherwise, it will use the gc cutoff by default.
2247 54 : pub(crate) async fn compact_with_gc(
2248 54 : self: &Arc<Self>,
2249 54 : cancel: &CancellationToken,
2250 54 : options: CompactOptions,
2251 54 : ctx: &RequestContext,
2252 54 : ) -> anyhow::Result<()> {
2253 54 : let sub_compaction = options.sub_compaction;
2254 54 : let job = GcCompactJob::from_compact_options(options.clone());
2255 54 : if sub_compaction {
2256 0 : info!("running enhanced gc bottom-most compaction with sub-compaction, splitting compaction jobs");
2257 0 : let jobs = self
2258 0 : .gc_compaction_split_jobs(job, options.sub_compaction_max_job_size_mb)
2259 0 : .await?;
2260 0 : let jobs_len = jobs.len();
2261 0 : for (idx, job) in jobs.into_iter().enumerate() {
2262 0 : info!(
2263 0 : "running enhanced gc bottom-most compaction, sub-compaction {}/{}",
2264 0 : idx + 1,
2265 : jobs_len
2266 : );
2267 0 : self.compact_with_gc_inner(cancel, job, ctx).await?;
2268 : }
2269 0 : if jobs_len == 0 {
2270 0 : info!("no jobs to run, skipping gc bottom-most compaction");
2271 0 : }
2272 0 : return Ok(());
2273 54 : }
2274 54 : self.compact_with_gc_inner(cancel, job, ctx).await
2275 54 : }
2276 :
2277 54 : async fn compact_with_gc_inner(
2278 54 : self: &Arc<Self>,
2279 54 : cancel: &CancellationToken,
2280 54 : job: GcCompactJob,
2281 54 : ctx: &RequestContext,
2282 54 : ) -> anyhow::Result<()> {
2283 54 : // Block other compaction/GC tasks from running for now. GC-compaction could run along
2284 54 : // with legacy compaction tasks in the future. Always ensure the lock order is compaction -> gc.
2285 54 : // Note that we already acquired the compaction lock when the outer `compact` function gets called.
2286 54 :
2287 54 : let gc_lock = async {
2288 54 : tokio::select! {
2289 54 : guard = self.gc_lock.lock() => Ok(guard),
2290 : // TODO: refactor to CompactionError to correctly pass cancelled error
2291 54 : _ = cancel.cancelled() => Err(anyhow!("cancelled")),
2292 : }
2293 54 : };
2294 :
2295 54 : let gc_lock = crate::timed(
2296 54 : gc_lock,
2297 54 : "acquires gc lock",
2298 54 : std::time::Duration::from_secs(5),
2299 54 : )
2300 54 : .await?;
2301 :
2302 54 : let dry_run = job.dry_run;
2303 54 : let compact_key_range = job.compact_key_range;
2304 54 : let compact_lsn_range = job.compact_lsn_range;
2305 :
2306 54 : let debug_mode = cfg!(debug_assertions) || cfg!(feature = "testing");
2307 :
2308 54 : info!("running enhanced gc bottom-most compaction, dry_run={dry_run}, compact_key_range={}..{}, compact_lsn_range={}..{}", compact_key_range.start, compact_key_range.end, compact_lsn_range.start, compact_lsn_range.end);
2309 :
2310 54 : scopeguard::defer! {
2311 54 : info!("done enhanced gc bottom-most compaction");
2312 54 : };
2313 54 :
2314 54 : let mut stat = CompactionStatistics::default();
2315 :
2316 : // Step 0: pick all delta layers + image layers below/intersect with the GC horizon.
2317 : // The layer selection has the following properties:
2318 : // 1. If a layer is in the selection, all layers below it are in the selection.
2319 : // 2. Inferred from (1), for each key in the layer selection, the value can be reconstructed only with the layers in the layer selection.
2320 52 : let job_desc = {
2321 54 : let guard = self.layers.read().await;
2322 54 : let layers = guard.layer_map()?;
2323 54 : let gc_info = self.gc_info.read().unwrap();
2324 54 : let mut retain_lsns_below_horizon = Vec::new();
2325 54 : let gc_cutoff = {
2326 : // Currently, gc-compaction only kicks in after the legacy gc has updated the gc_cutoff.
2327 : // Therefore, it can only clean up data that cannot be cleaned up with legacy gc, instead of
2328 : // cleaning everything that theoritically it could. In the future, it should use `self.gc_info`
2329 : // to get the truth data.
2330 54 : let real_gc_cutoff = self.get_gc_compaction_watermark();
2331 : // The compaction algorithm will keep all keys above the gc_cutoff while keeping only necessary keys below the gc_cutoff for
2332 : // each of the retain_lsn. Therefore, if the user-provided `compact_lsn_range.end` is larger than the real gc cutoff, we will use
2333 : // the real cutoff.
2334 54 : let mut gc_cutoff = if compact_lsn_range.end == Lsn::MAX {
2335 48 : real_gc_cutoff
2336 : } else {
2337 6 : compact_lsn_range.end
2338 : };
2339 54 : if gc_cutoff > real_gc_cutoff {
2340 4 : warn!("provided compact_lsn_range.end={} is larger than the real_gc_cutoff={}, using the real gc cutoff", gc_cutoff, real_gc_cutoff);
2341 4 : gc_cutoff = real_gc_cutoff;
2342 50 : }
2343 54 : gc_cutoff
2344 : };
2345 70 : for (lsn, _timeline_id, _is_offloaded) in &gc_info.retain_lsns {
2346 70 : if lsn < &gc_cutoff {
2347 70 : retain_lsns_below_horizon.push(*lsn);
2348 70 : }
2349 : }
2350 54 : for lsn in gc_info.leases.keys() {
2351 0 : if lsn < &gc_cutoff {
2352 0 : retain_lsns_below_horizon.push(*lsn);
2353 0 : }
2354 : }
2355 54 : let mut selected_layers: Vec<Layer> = Vec::new();
2356 54 : drop(gc_info);
2357 : // Firstly, pick all the layers intersect or below the gc_cutoff, get the largest LSN in the selected layers.
2358 54 : let Some(max_layer_lsn) = layers
2359 54 : .iter_historic_layers()
2360 244 : .filter(|desc| desc.get_lsn_range().start <= gc_cutoff)
2361 208 : .map(|desc| desc.get_lsn_range().end)
2362 54 : .max()
2363 : else {
2364 0 : info!("no layers to compact with gc: no historic layers below gc_cutoff, gc_cutoff={}", gc_cutoff);
2365 0 : return Ok(());
2366 : };
2367 : // Next, if the user specifies compact_lsn_range.start, we need to filter some layers out. All the layers (strictly) below
2368 : // the min_layer_lsn computed as below will be filtered out and the data will be accessed using the normal read path, as if
2369 : // it is a branch.
2370 54 : let Some(min_layer_lsn) = layers
2371 54 : .iter_historic_layers()
2372 244 : .filter(|desc| {
2373 244 : if compact_lsn_range.start == Lsn::INVALID {
2374 198 : true // select all layers below if start == Lsn(0)
2375 : } else {
2376 46 : desc.get_lsn_range().end > compact_lsn_range.start // strictly larger than compact_above_lsn
2377 : }
2378 244 : })
2379 226 : .map(|desc| desc.get_lsn_range().start)
2380 54 : .min()
2381 : else {
2382 0 : info!("no layers to compact with gc: no historic layers above compact_above_lsn, compact_above_lsn={}", compact_lsn_range.end);
2383 0 : return Ok(());
2384 : };
2385 : // Then, pick all the layers that are below the max_layer_lsn. This is to ensure we can pick all single-key
2386 : // layers to compact.
2387 54 : let mut rewrite_layers = Vec::new();
2388 244 : for desc in layers.iter_historic_layers() {
2389 244 : if desc.get_lsn_range().end <= max_layer_lsn
2390 208 : && desc.get_lsn_range().start >= min_layer_lsn
2391 190 : && overlaps_with(&desc.get_key_range(), &compact_key_range)
2392 : {
2393 : // If the layer overlaps with the compaction key range, we need to read it to obtain all keys within the range,
2394 : // even if it might contain extra keys
2395 152 : selected_layers.push(guard.get_from_desc(&desc));
2396 152 : // If the layer is not fully contained within the key range, we need to rewrite it if it's a delta layer (it's fine
2397 152 : // to overlap image layers)
2398 152 : if desc.is_delta() && !fully_contains(&compact_key_range, &desc.get_key_range())
2399 2 : {
2400 2 : rewrite_layers.push(desc);
2401 150 : }
2402 92 : }
2403 : }
2404 54 : if selected_layers.is_empty() {
2405 2 : info!("no layers to compact with gc: no layers within the key range, gc_cutoff={}, key_range={}..{}", gc_cutoff, compact_key_range.start, compact_key_range.end);
2406 2 : return Ok(());
2407 52 : }
2408 52 : retain_lsns_below_horizon.sort();
2409 52 : GcCompactionJobDescription {
2410 52 : selected_layers,
2411 52 : gc_cutoff,
2412 52 : retain_lsns_below_horizon,
2413 52 : min_layer_lsn,
2414 52 : max_layer_lsn,
2415 52 : compaction_key_range: compact_key_range,
2416 52 : rewrite_layers,
2417 52 : }
2418 : };
2419 52 : let (has_data_below, lowest_retain_lsn) = if compact_lsn_range.start != Lsn::INVALID {
2420 : // If we only compact above some LSN, we should get the history from the current branch below the specified LSN.
2421 : // We use job_desc.min_layer_lsn as if it's the lowest branch point.
2422 8 : (true, job_desc.min_layer_lsn)
2423 44 : } else if self.ancestor_timeline.is_some() {
2424 : // In theory, we can also use min_layer_lsn here, but using ancestor LSN makes sure the delta layers cover the
2425 : // LSN ranges all the way to the ancestor timeline.
2426 2 : (true, self.ancestor_lsn)
2427 : } else {
2428 42 : let res = job_desc
2429 42 : .retain_lsns_below_horizon
2430 42 : .first()
2431 42 : .copied()
2432 42 : .unwrap_or(job_desc.gc_cutoff);
2433 42 : if debug_mode {
2434 42 : assert_eq!(
2435 42 : res,
2436 42 : job_desc
2437 42 : .retain_lsns_below_horizon
2438 42 : .iter()
2439 42 : .min()
2440 42 : .copied()
2441 42 : .unwrap_or(job_desc.gc_cutoff)
2442 42 : );
2443 0 : }
2444 42 : (false, res)
2445 : };
2446 52 : info!(
2447 0 : "picked {} layers for compaction ({} layers need rewriting) with max_layer_lsn={} min_layer_lsn={} gc_cutoff={} lowest_retain_lsn={}, key_range={}..{}, has_data_below={}",
2448 0 : job_desc.selected_layers.len(),
2449 0 : job_desc.rewrite_layers.len(),
2450 : job_desc.max_layer_lsn,
2451 : job_desc.min_layer_lsn,
2452 : job_desc.gc_cutoff,
2453 : lowest_retain_lsn,
2454 : job_desc.compaction_key_range.start,
2455 : job_desc.compaction_key_range.end,
2456 : has_data_below,
2457 : );
2458 :
2459 204 : for layer in &job_desc.selected_layers {
2460 152 : debug!("read layer: {}", layer.layer_desc().key());
2461 : }
2462 54 : for layer in &job_desc.rewrite_layers {
2463 2 : debug!("rewrite layer: {}", layer.key());
2464 : }
2465 :
2466 52 : self.check_compaction_space(&job_desc.selected_layers)
2467 52 : .await?;
2468 :
2469 : // Generate statistics for the compaction
2470 204 : for layer in &job_desc.selected_layers {
2471 152 : let desc = layer.layer_desc();
2472 152 : if desc.is_delta() {
2473 86 : stat.visit_delta_layer(desc.file_size());
2474 86 : } else {
2475 66 : stat.visit_image_layer(desc.file_size());
2476 66 : }
2477 : }
2478 :
2479 : // Step 1: construct a k-merge iterator over all layers.
2480 : // Also, verify if the layer map can be split by drawing a horizontal line at every LSN start/end split point.
2481 52 : let layer_names = job_desc
2482 52 : .selected_layers
2483 52 : .iter()
2484 152 : .map(|layer| layer.layer_desc().layer_name())
2485 52 : .collect_vec();
2486 52 : if let Some(err) = check_valid_layermap(&layer_names) {
2487 0 : bail!("gc-compaction layer map check failed because {}, cannot proceed with compaction due to potential data loss", err);
2488 52 : }
2489 52 : // The maximum LSN we are processing in this compaction loop
2490 52 : let end_lsn = job_desc
2491 52 : .selected_layers
2492 52 : .iter()
2493 152 : .map(|l| l.layer_desc().lsn_range.end)
2494 52 : .max()
2495 52 : .unwrap();
2496 52 : let mut delta_layers = Vec::new();
2497 52 : let mut image_layers = Vec::new();
2498 52 : let mut downloaded_layers = Vec::new();
2499 52 : let mut total_downloaded_size = 0;
2500 52 : let mut total_layer_size = 0;
2501 204 : for layer in &job_desc.selected_layers {
2502 152 : if layer.needs_download().await?.is_some() {
2503 0 : total_downloaded_size += layer.layer_desc().file_size;
2504 152 : }
2505 152 : total_layer_size += layer.layer_desc().file_size;
2506 152 : let resident_layer = layer.download_and_keep_resident().await?;
2507 152 : downloaded_layers.push(resident_layer);
2508 : }
2509 52 : info!(
2510 0 : "finish downloading layers, downloaded={}, total={}, ratio={:.2}",
2511 0 : total_downloaded_size,
2512 0 : total_layer_size,
2513 0 : total_downloaded_size as f64 / total_layer_size as f64
2514 : );
2515 204 : for resident_layer in &downloaded_layers {
2516 152 : if resident_layer.layer_desc().is_delta() {
2517 86 : let layer = resident_layer.get_as_delta(ctx).await?;
2518 86 : delta_layers.push(layer);
2519 : } else {
2520 66 : let layer = resident_layer.get_as_image(ctx).await?;
2521 66 : image_layers.push(layer);
2522 : }
2523 : }
2524 52 : let (dense_ks, sparse_ks) = self.collect_gc_compaction_keyspace().await?;
2525 52 : let mut merge_iter = FilterIterator::create(
2526 52 : MergeIterator::create(&delta_layers, &image_layers, ctx),
2527 52 : dense_ks,
2528 52 : sparse_ks,
2529 52 : )?;
2530 :
2531 : // Step 2: Produce images+deltas.
2532 52 : let mut accumulated_values = Vec::new();
2533 52 : let mut last_key: Option<Key> = None;
2534 :
2535 : // Only create image layers when there is no ancestor branches. TODO: create covering image layer
2536 : // when some condition meet.
2537 52 : let mut image_layer_writer = if !has_data_below {
2538 : Some(
2539 42 : SplitImageLayerWriter::new(
2540 42 : self.conf,
2541 42 : self.timeline_id,
2542 42 : self.tenant_shard_id,
2543 42 : job_desc.compaction_key_range.start,
2544 42 : lowest_retain_lsn,
2545 42 : self.get_compaction_target_size(),
2546 42 : ctx,
2547 42 : )
2548 42 : .await?,
2549 : )
2550 : } else {
2551 10 : None
2552 : };
2553 :
2554 52 : let mut delta_layer_writer = SplitDeltaLayerWriter::new(
2555 52 : self.conf,
2556 52 : self.timeline_id,
2557 52 : self.tenant_shard_id,
2558 52 : lowest_retain_lsn..end_lsn,
2559 52 : self.get_compaction_target_size(),
2560 52 : )
2561 52 : .await?;
2562 :
2563 : #[derive(Default)]
2564 : struct RewritingLayers {
2565 : before: Option<DeltaLayerWriter>,
2566 : after: Option<DeltaLayerWriter>,
2567 : }
2568 52 : let mut delta_layer_rewriters = HashMap::<Arc<PersistentLayerKey>, RewritingLayers>::new();
2569 :
2570 : /// When compacting not at a bottom range (=`[0,X)`) of the root branch, we "have data below" (`has_data_below=true`).
2571 : /// The two cases are compaction in ancestor branches and when `compact_lsn_range.start` is set.
2572 : /// In those cases, we need to pull up data from below the LSN range we're compaction.
2573 : ///
2574 : /// This function unifies the cases so that later code doesn't have to think about it.
2575 : ///
2576 : /// Currently, we always get the ancestor image for each key in the child branch no matter whether the image
2577 : /// is needed for reconstruction. This should be fixed in the future.
2578 : ///
2579 : /// Furthermore, we should do vectored get instead of a single get, or better, use k-merge for ancestor
2580 : /// images.
2581 622 : async fn get_ancestor_image(
2582 622 : this_tline: &Arc<Timeline>,
2583 622 : key: Key,
2584 622 : ctx: &RequestContext,
2585 622 : has_data_below: bool,
2586 622 : history_lsn_point: Lsn,
2587 622 : ) -> anyhow::Result<Option<(Key, Lsn, Bytes)>> {
2588 622 : if !has_data_below {
2589 584 : return Ok(None);
2590 38 : };
2591 : // This function is implemented as a get of the current timeline at ancestor LSN, therefore reusing
2592 : // as much existing code as possible.
2593 38 : let img = this_tline.get(key, history_lsn_point, ctx).await?;
2594 38 : Ok(Some((key, history_lsn_point, img)))
2595 622 : }
2596 :
2597 : // Actually, we can decide not to write to the image layer at all at this point because
2598 : // the key and LSN range are determined. However, to keep things simple here, we still
2599 : // create this writer, and discard the writer in the end.
2600 :
2601 966 : while let Some(((key, lsn, val), desc)) = merge_iter.next_with_trace().await? {
2602 914 : if cancel.is_cancelled() {
2603 0 : return Err(anyhow!("cancelled")); // TODO: refactor to CompactionError and pass cancel error
2604 914 : }
2605 914 : if self.shard_identity.is_key_disposable(&key) {
2606 : // If this shard does not need to store this key, simply skip it.
2607 : //
2608 : // This is not handled in the filter iterator because shard is determined by hash.
2609 : // Therefore, it does not give us any performance benefit to do things like skip
2610 : // a whole layer file as handling key spaces (ranges).
2611 0 : if cfg!(debug_assertions) {
2612 0 : let shard = self.shard_identity.shard_index();
2613 0 : let owner = self.shard_identity.get_shard_number(&key);
2614 0 : panic!("key {key} does not belong on shard {shard}, owned by {owner}");
2615 0 : }
2616 0 : continue;
2617 914 : }
2618 914 : if !job_desc.compaction_key_range.contains(&key) {
2619 64 : if !desc.is_delta {
2620 60 : continue;
2621 4 : }
2622 4 : let rewriter = delta_layer_rewriters.entry(desc.clone()).or_default();
2623 4 : let rewriter = if key < job_desc.compaction_key_range.start {
2624 0 : if rewriter.before.is_none() {
2625 0 : rewriter.before = Some(
2626 0 : DeltaLayerWriter::new(
2627 0 : self.conf,
2628 0 : self.timeline_id,
2629 0 : self.tenant_shard_id,
2630 0 : desc.key_range.start,
2631 0 : desc.lsn_range.clone(),
2632 0 : ctx,
2633 0 : )
2634 0 : .await?,
2635 : );
2636 0 : }
2637 0 : rewriter.before.as_mut().unwrap()
2638 4 : } else if key >= job_desc.compaction_key_range.end {
2639 4 : if rewriter.after.is_none() {
2640 2 : rewriter.after = Some(
2641 2 : DeltaLayerWriter::new(
2642 2 : self.conf,
2643 2 : self.timeline_id,
2644 2 : self.tenant_shard_id,
2645 2 : job_desc.compaction_key_range.end,
2646 2 : desc.lsn_range.clone(),
2647 2 : ctx,
2648 2 : )
2649 2 : .await?,
2650 : );
2651 2 : }
2652 4 : rewriter.after.as_mut().unwrap()
2653 : } else {
2654 0 : unreachable!()
2655 : };
2656 4 : rewriter.put_value(key, lsn, val, ctx).await?;
2657 4 : continue;
2658 850 : }
2659 850 : match val {
2660 610 : Value::Image(_) => stat.visit_image_key(&val),
2661 240 : Value::WalRecord(_) => stat.visit_wal_key(&val),
2662 : }
2663 850 : if last_key.is_none() || last_key.as_ref() == Some(&key) {
2664 280 : if last_key.is_none() {
2665 52 : last_key = Some(key);
2666 228 : }
2667 280 : accumulated_values.push((key, lsn, val));
2668 : } else {
2669 570 : let last_key: &mut Key = last_key.as_mut().unwrap();
2670 570 : stat.on_unique_key_visited(); // TODO: adjust statistics for partial compaction
2671 570 : let retention = self
2672 570 : .generate_key_retention(
2673 570 : *last_key,
2674 570 : &accumulated_values,
2675 570 : job_desc.gc_cutoff,
2676 570 : &job_desc.retain_lsns_below_horizon,
2677 570 : COMPACTION_DELTA_THRESHOLD,
2678 570 : get_ancestor_image(self, *last_key, ctx, has_data_below, lowest_retain_lsn)
2679 570 : .await?,
2680 : )
2681 570 : .await?;
2682 570 : retention
2683 570 : .pipe_to(
2684 570 : *last_key,
2685 570 : &mut delta_layer_writer,
2686 570 : image_layer_writer.as_mut(),
2687 570 : &mut stat,
2688 570 : ctx,
2689 570 : )
2690 570 : .await?;
2691 570 : accumulated_values.clear();
2692 570 : *last_key = key;
2693 570 : accumulated_values.push((key, lsn, val));
2694 : }
2695 : }
2696 :
2697 : // TODO: move the below part to the loop body
2698 52 : let last_key = last_key.expect("no keys produced during compaction");
2699 52 : stat.on_unique_key_visited();
2700 :
2701 52 : let retention = self
2702 52 : .generate_key_retention(
2703 52 : last_key,
2704 52 : &accumulated_values,
2705 52 : job_desc.gc_cutoff,
2706 52 : &job_desc.retain_lsns_below_horizon,
2707 52 : COMPACTION_DELTA_THRESHOLD,
2708 52 : get_ancestor_image(self, last_key, ctx, has_data_below, lowest_retain_lsn).await?,
2709 : )
2710 52 : .await?;
2711 52 : retention
2712 52 : .pipe_to(
2713 52 : last_key,
2714 52 : &mut delta_layer_writer,
2715 52 : image_layer_writer.as_mut(),
2716 52 : &mut stat,
2717 52 : ctx,
2718 52 : )
2719 52 : .await?;
2720 : // end: move the above part to the loop body
2721 :
2722 52 : let mut rewrote_delta_layers = Vec::new();
2723 54 : for (key, writers) in delta_layer_rewriters {
2724 2 : if let Some(delta_writer_before) = writers.before {
2725 0 : let (desc, path) = delta_writer_before
2726 0 : .finish(job_desc.compaction_key_range.start, ctx)
2727 0 : .await?;
2728 0 : let layer = Layer::finish_creating(self.conf, self, desc, &path)?;
2729 0 : rewrote_delta_layers.push(layer);
2730 2 : }
2731 2 : if let Some(delta_writer_after) = writers.after {
2732 2 : let (desc, path) = delta_writer_after.finish(key.key_range.end, ctx).await?;
2733 2 : let layer = Layer::finish_creating(self.conf, self, desc, &path)?;
2734 2 : rewrote_delta_layers.push(layer);
2735 0 : }
2736 : }
2737 :
2738 74 : let discard = |key: &PersistentLayerKey| {
2739 74 : let key = key.clone();
2740 74 : async move { KeyHistoryRetention::discard_key(&key, self, dry_run).await }
2741 74 : };
2742 :
2743 52 : let produced_image_layers = if let Some(writer) = image_layer_writer {
2744 42 : if !dry_run {
2745 38 : let end_key = job_desc.compaction_key_range.end;
2746 38 : writer
2747 38 : .finish_with_discard_fn(self, ctx, end_key, discard)
2748 38 : .await?
2749 : } else {
2750 4 : drop(writer);
2751 4 : Vec::new()
2752 : }
2753 : } else {
2754 10 : Vec::new()
2755 : };
2756 :
2757 52 : let produced_delta_layers = if !dry_run {
2758 48 : delta_layer_writer
2759 48 : .finish_with_discard_fn(self, ctx, discard)
2760 48 : .await?
2761 : } else {
2762 4 : drop(delta_layer_writer);
2763 4 : Vec::new()
2764 : };
2765 :
2766 : // TODO: make image/delta/rewrote_delta layers generation atomic. At this point, we already generated resident layers, and if
2767 : // compaction is cancelled at this point, we might have some layers that are not cleaned up.
2768 52 : let mut compact_to = Vec::new();
2769 52 : let mut keep_layers = HashSet::new();
2770 52 : let produced_delta_layers_len = produced_delta_layers.len();
2771 52 : let produced_image_layers_len = produced_image_layers.len();
2772 88 : for action in produced_delta_layers {
2773 36 : match action {
2774 22 : BatchWriterResult::Produced(layer) => {
2775 22 : if cfg!(debug_assertions) {
2776 22 : info!("produced delta layer: {}", layer.layer_desc().key());
2777 0 : }
2778 22 : stat.produce_delta_layer(layer.layer_desc().file_size());
2779 22 : compact_to.push(layer);
2780 : }
2781 14 : BatchWriterResult::Discarded(l) => {
2782 14 : if cfg!(debug_assertions) {
2783 14 : info!("discarded delta layer: {}", l);
2784 0 : }
2785 14 : keep_layers.insert(l);
2786 14 : stat.discard_delta_layer();
2787 : }
2788 : }
2789 : }
2790 54 : for layer in &rewrote_delta_layers {
2791 2 : debug!(
2792 0 : "produced rewritten delta layer: {}",
2793 0 : layer.layer_desc().key()
2794 : );
2795 : }
2796 52 : compact_to.extend(rewrote_delta_layers);
2797 90 : for action in produced_image_layers {
2798 38 : match action {
2799 30 : BatchWriterResult::Produced(layer) => {
2800 30 : debug!("produced image layer: {}", layer.layer_desc().key());
2801 30 : stat.produce_image_layer(layer.layer_desc().file_size());
2802 30 : compact_to.push(layer);
2803 : }
2804 8 : BatchWriterResult::Discarded(l) => {
2805 8 : debug!("discarded image layer: {}", l);
2806 8 : keep_layers.insert(l);
2807 8 : stat.discard_image_layer();
2808 : }
2809 : }
2810 : }
2811 :
2812 52 : let mut layer_selection = job_desc.selected_layers;
2813 :
2814 : // Partial compaction might select more data than it processes, e.g., if
2815 : // the compaction_key_range only partially overlaps:
2816 : //
2817 : // [---compaction_key_range---]
2818 : // [---A----][----B----][----C----][----D----]
2819 : //
2820 : // For delta layers, we will rewrite the layers so that it is cut exactly at
2821 : // the compaction key range, so we can always discard them. However, for image
2822 : // layers, as we do not rewrite them for now, we need to handle them differently.
2823 : // Assume image layers A, B, C, D are all in the `layer_selection`.
2824 : //
2825 : // The created image layers contain whatever is needed from B, C, and from
2826 : // `----]` of A, and from `[---` of D.
2827 : //
2828 : // In contrast, `[---A` and `D----]` have not been processed, so, we must
2829 : // keep that data.
2830 : //
2831 : // The solution for now is to keep A and D completely if they are image layers.
2832 : // (layer_selection is what we'll remove from the layer map, so, retain what
2833 : // is _not_ fully covered by compaction_key_range).
2834 204 : for layer in &layer_selection {
2835 152 : if !layer.layer_desc().is_delta() {
2836 66 : if !overlaps_with(
2837 66 : &layer.layer_desc().key_range,
2838 66 : &job_desc.compaction_key_range,
2839 66 : ) {
2840 0 : bail!("violated constraint: image layer outside of compaction key range");
2841 66 : }
2842 66 : if !fully_contains(
2843 66 : &job_desc.compaction_key_range,
2844 66 : &layer.layer_desc().key_range,
2845 66 : ) {
2846 8 : keep_layers.insert(layer.layer_desc().key());
2847 58 : }
2848 86 : }
2849 : }
2850 :
2851 152 : layer_selection.retain(|x| !keep_layers.contains(&x.layer_desc().key()));
2852 52 :
2853 52 : info!(
2854 0 : "gc-compaction statistics: {}",
2855 0 : serde_json::to_string(&stat)?
2856 : );
2857 :
2858 52 : if dry_run {
2859 4 : return Ok(());
2860 48 : }
2861 48 :
2862 48 : info!(
2863 0 : "produced {} delta layers and {} image layers, {} layers are kept",
2864 0 : produced_delta_layers_len,
2865 0 : produced_image_layers_len,
2866 0 : layer_selection.len()
2867 : );
2868 :
2869 : // Step 3: Place back to the layer map.
2870 :
2871 : // First, do a sanity check to ensure the newly-created layer map does not contain overlaps.
2872 48 : let all_layers = {
2873 48 : let guard = self.layers.read().await;
2874 48 : let layer_map = guard.layer_map()?;
2875 48 : layer_map.iter_historic_layers().collect_vec()
2876 48 : };
2877 48 :
2878 48 : let mut final_layers = all_layers
2879 48 : .iter()
2880 214 : .map(|layer| layer.layer_name())
2881 48 : .collect::<HashSet<_>>();
2882 152 : for layer in &layer_selection {
2883 104 : final_layers.remove(&layer.layer_desc().layer_name());
2884 104 : }
2885 102 : for layer in &compact_to {
2886 54 : final_layers.insert(layer.layer_desc().layer_name());
2887 54 : }
2888 48 : let final_layers = final_layers.into_iter().collect_vec();
2889 :
2890 : // TODO: move this check before we call `finish` on image layer writers. However, this will require us to get the layer name before we finish
2891 : // the writer, so potentially, we will need a function like `ImageLayerBatchWriter::get_all_pending_layer_keys` to get all the keys that are
2892 : // in the writer before finalizing the persistent layers. Now we would leave some dangling layers on the disk if the check fails.
2893 48 : if let Some(err) = check_valid_layermap(&final_layers) {
2894 0 : bail!("gc-compaction layer map check failed after compaction because {}, compaction result not applied to the layer map due to potential data loss", err);
2895 48 : }
2896 :
2897 : // Between the sanity check and this compaction update, there could be new layers being flushed, but it should be fine because we only
2898 : // operate on L1 layers.
2899 : {
2900 48 : let mut guard = self.layers.write().await;
2901 48 : guard
2902 48 : .open_mut()?
2903 48 : .finish_gc_compaction(&layer_selection, &compact_to, &self.metrics)
2904 48 : };
2905 48 :
2906 48 : // Schedule an index-only upload to update the `latest_gc_cutoff` in the index_part.json.
2907 48 : // Otherwise, after restart, the index_part only contains the old `latest_gc_cutoff` and
2908 48 : // find_gc_cutoffs will try accessing things below the cutoff. TODO: ideally, this should
2909 48 : // be batched into `schedule_compaction_update`.
2910 48 : let disk_consistent_lsn = self.disk_consistent_lsn.load();
2911 48 : self.schedule_uploads(disk_consistent_lsn, None)?;
2912 48 : self.remote_client
2913 48 : .schedule_compaction_update(&layer_selection, &compact_to)?;
2914 :
2915 48 : drop(gc_lock);
2916 48 :
2917 48 : Ok(())
2918 54 : }
2919 : }
2920 :
2921 : struct TimelineAdaptor {
2922 : timeline: Arc<Timeline>,
2923 :
2924 : keyspace: (Lsn, KeySpace),
2925 :
2926 : new_deltas: Vec<ResidentLayer>,
2927 : new_images: Vec<ResidentLayer>,
2928 : layers_to_delete: Vec<Arc<PersistentLayerDesc>>,
2929 : }
2930 :
2931 : impl TimelineAdaptor {
2932 0 : pub fn new(timeline: &Arc<Timeline>, keyspace: (Lsn, KeySpace)) -> Self {
2933 0 : Self {
2934 0 : timeline: timeline.clone(),
2935 0 : keyspace,
2936 0 : new_images: Vec::new(),
2937 0 : new_deltas: Vec::new(),
2938 0 : layers_to_delete: Vec::new(),
2939 0 : }
2940 0 : }
2941 :
2942 0 : pub async fn flush_updates(&mut self) -> Result<(), CompactionError> {
2943 0 : let layers_to_delete = {
2944 0 : let guard = self.timeline.layers.read().await;
2945 0 : self.layers_to_delete
2946 0 : .iter()
2947 0 : .map(|x| guard.get_from_desc(x))
2948 0 : .collect::<Vec<Layer>>()
2949 0 : };
2950 0 : self.timeline
2951 0 : .finish_compact_batch(&self.new_deltas, &self.new_images, &layers_to_delete)
2952 0 : .await?;
2953 :
2954 0 : self.timeline
2955 0 : .upload_new_image_layers(std::mem::take(&mut self.new_images))?;
2956 :
2957 0 : self.new_deltas.clear();
2958 0 : self.layers_to_delete.clear();
2959 0 : Ok(())
2960 0 : }
2961 : }
2962 :
2963 : #[derive(Clone)]
2964 : struct ResidentDeltaLayer(ResidentLayer);
2965 : #[derive(Clone)]
2966 : struct ResidentImageLayer(ResidentLayer);
2967 :
2968 : impl CompactionJobExecutor for TimelineAdaptor {
2969 : type Key = pageserver_api::key::Key;
2970 :
2971 : type Layer = OwnArc<PersistentLayerDesc>;
2972 : type DeltaLayer = ResidentDeltaLayer;
2973 : type ImageLayer = ResidentImageLayer;
2974 :
2975 : type RequestContext = crate::context::RequestContext;
2976 :
2977 0 : fn get_shard_identity(&self) -> &ShardIdentity {
2978 0 : self.timeline.get_shard_identity()
2979 0 : }
2980 :
2981 0 : async fn get_layers(
2982 0 : &mut self,
2983 0 : key_range: &Range<Key>,
2984 0 : lsn_range: &Range<Lsn>,
2985 0 : _ctx: &RequestContext,
2986 0 : ) -> anyhow::Result<Vec<OwnArc<PersistentLayerDesc>>> {
2987 0 : self.flush_updates().await?;
2988 :
2989 0 : let guard = self.timeline.layers.read().await;
2990 0 : let layer_map = guard.layer_map()?;
2991 :
2992 0 : let result = layer_map
2993 0 : .iter_historic_layers()
2994 0 : .filter(|l| {
2995 0 : overlaps_with(&l.lsn_range, lsn_range) && overlaps_with(&l.key_range, key_range)
2996 0 : })
2997 0 : .map(OwnArc)
2998 0 : .collect();
2999 0 : Ok(result)
3000 0 : }
3001 :
3002 0 : async fn get_keyspace(
3003 0 : &mut self,
3004 0 : key_range: &Range<Key>,
3005 0 : lsn: Lsn,
3006 0 : _ctx: &RequestContext,
3007 0 : ) -> anyhow::Result<Vec<Range<Key>>> {
3008 0 : if lsn == self.keyspace.0 {
3009 0 : Ok(pageserver_compaction::helpers::intersect_keyspace(
3010 0 : &self.keyspace.1.ranges,
3011 0 : key_range,
3012 0 : ))
3013 : } else {
3014 : // The current compaction implementation only ever requests the key space
3015 : // at the compaction end LSN.
3016 0 : anyhow::bail!("keyspace not available for requested lsn");
3017 : }
3018 0 : }
3019 :
3020 0 : async fn downcast_delta_layer(
3021 0 : &self,
3022 0 : layer: &OwnArc<PersistentLayerDesc>,
3023 0 : ) -> anyhow::Result<Option<ResidentDeltaLayer>> {
3024 0 : // this is a lot more complex than a simple downcast...
3025 0 : if layer.is_delta() {
3026 0 : let l = {
3027 0 : let guard = self.timeline.layers.read().await;
3028 0 : guard.get_from_desc(layer)
3029 : };
3030 0 : let result = l.download_and_keep_resident().await?;
3031 :
3032 0 : Ok(Some(ResidentDeltaLayer(result)))
3033 : } else {
3034 0 : Ok(None)
3035 : }
3036 0 : }
3037 :
3038 0 : async fn create_image(
3039 0 : &mut self,
3040 0 : lsn: Lsn,
3041 0 : key_range: &Range<Key>,
3042 0 : ctx: &RequestContext,
3043 0 : ) -> anyhow::Result<()> {
3044 0 : Ok(self.create_image_impl(lsn, key_range, ctx).await?)
3045 0 : }
3046 :
3047 0 : async fn create_delta(
3048 0 : &mut self,
3049 0 : lsn_range: &Range<Lsn>,
3050 0 : key_range: &Range<Key>,
3051 0 : input_layers: &[ResidentDeltaLayer],
3052 0 : ctx: &RequestContext,
3053 0 : ) -> anyhow::Result<()> {
3054 0 : debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end);
3055 :
3056 0 : let mut all_entries = Vec::new();
3057 0 : for dl in input_layers.iter() {
3058 0 : all_entries.extend(dl.load_keys(ctx).await?);
3059 : }
3060 :
3061 : // The current stdlib sorting implementation is designed in a way where it is
3062 : // particularly fast where the slice is made up of sorted sub-ranges.
3063 0 : all_entries.sort_by_key(|DeltaEntry { key, lsn, .. }| (*key, *lsn));
3064 :
3065 0 : let mut writer = DeltaLayerWriter::new(
3066 0 : self.timeline.conf,
3067 0 : self.timeline.timeline_id,
3068 0 : self.timeline.tenant_shard_id,
3069 0 : key_range.start,
3070 0 : lsn_range.clone(),
3071 0 : ctx,
3072 0 : )
3073 0 : .await?;
3074 :
3075 0 : let mut dup_values = 0;
3076 0 :
3077 0 : // This iterator walks through all key-value pairs from all the layers
3078 0 : // we're compacting, in key, LSN order.
3079 0 : let mut prev: Option<(Key, Lsn)> = None;
3080 : for &DeltaEntry {
3081 0 : key, lsn, ref val, ..
3082 0 : } in all_entries.iter()
3083 : {
3084 0 : if prev == Some((key, lsn)) {
3085 : // This is a duplicate. Skip it.
3086 : //
3087 : // It can happen if compaction is interrupted after writing some
3088 : // layers but not all, and we are compacting the range again.
3089 : // The calculations in the algorithm assume that there are no
3090 : // duplicates, so the math on targeted file size is likely off,
3091 : // and we will create smaller files than expected.
3092 0 : dup_values += 1;
3093 0 : continue;
3094 0 : }
3095 :
3096 0 : let value = val.load(ctx).await?;
3097 :
3098 0 : writer.put_value(key, lsn, value, ctx).await?;
3099 :
3100 0 : prev = Some((key, lsn));
3101 : }
3102 :
3103 0 : if dup_values > 0 {
3104 0 : warn!("delta layer created with {} duplicate values", dup_values);
3105 0 : }
3106 :
3107 0 : fail_point!("delta-layer-writer-fail-before-finish", |_| {
3108 0 : Err(anyhow::anyhow!(
3109 0 : "failpoint delta-layer-writer-fail-before-finish"
3110 0 : ))
3111 0 : });
3112 :
3113 0 : let (desc, path) = writer.finish(prev.unwrap().0.next(), ctx).await?;
3114 0 : let new_delta_layer =
3115 0 : Layer::finish_creating(self.timeline.conf, &self.timeline, desc, &path)?;
3116 :
3117 0 : self.new_deltas.push(new_delta_layer);
3118 0 : Ok(())
3119 0 : }
3120 :
3121 0 : async fn delete_layer(
3122 0 : &mut self,
3123 0 : layer: &OwnArc<PersistentLayerDesc>,
3124 0 : _ctx: &RequestContext,
3125 0 : ) -> anyhow::Result<()> {
3126 0 : self.layers_to_delete.push(layer.clone().0);
3127 0 : Ok(())
3128 0 : }
3129 : }
3130 :
3131 : impl TimelineAdaptor {
3132 0 : async fn create_image_impl(
3133 0 : &mut self,
3134 0 : lsn: Lsn,
3135 0 : key_range: &Range<Key>,
3136 0 : ctx: &RequestContext,
3137 0 : ) -> Result<(), CreateImageLayersError> {
3138 0 : let timer = self.timeline.metrics.create_images_time_histo.start_timer();
3139 :
3140 0 : let image_layer_writer = ImageLayerWriter::new(
3141 0 : self.timeline.conf,
3142 0 : self.timeline.timeline_id,
3143 0 : self.timeline.tenant_shard_id,
3144 0 : key_range,
3145 0 : lsn,
3146 0 : ctx,
3147 0 : )
3148 0 : .await?;
3149 :
3150 0 : fail_point!("image-layer-writer-fail-before-finish", |_| {
3151 0 : Err(CreateImageLayersError::Other(anyhow::anyhow!(
3152 0 : "failpoint image-layer-writer-fail-before-finish"
3153 0 : )))
3154 0 : });
3155 :
3156 0 : let keyspace = KeySpace {
3157 0 : ranges: self.get_keyspace(key_range, lsn, ctx).await?,
3158 : };
3159 : // TODO set proper (stateful) start. The create_image_layer_for_rel_blocks function mostly
3160 0 : let start = Key::MIN;
3161 : let ImageLayerCreationOutcome {
3162 0 : image,
3163 : next_start_key: _,
3164 0 : } = self
3165 0 : .timeline
3166 0 : .create_image_layer_for_rel_blocks(
3167 0 : &keyspace,
3168 0 : image_layer_writer,
3169 0 : lsn,
3170 0 : ctx,
3171 0 : key_range.clone(),
3172 0 : start,
3173 0 : )
3174 0 : .await?;
3175 :
3176 0 : if let Some(image_layer) = image {
3177 0 : self.new_images.push(image_layer);
3178 0 : }
3179 :
3180 0 : timer.stop_and_record();
3181 0 :
3182 0 : Ok(())
3183 0 : }
3184 : }
3185 :
3186 : impl CompactionRequestContext for crate::context::RequestContext {}
3187 :
3188 : #[derive(Debug, Clone)]
3189 : pub struct OwnArc<T>(pub Arc<T>);
3190 :
3191 : impl<T> Deref for OwnArc<T> {
3192 : type Target = <Arc<T> as Deref>::Target;
3193 0 : fn deref(&self) -> &Self::Target {
3194 0 : &self.0
3195 0 : }
3196 : }
3197 :
3198 : impl<T> AsRef<T> for OwnArc<T> {
3199 0 : fn as_ref(&self) -> &T {
3200 0 : self.0.as_ref()
3201 0 : }
3202 : }
3203 :
3204 : impl CompactionLayer<Key> for OwnArc<PersistentLayerDesc> {
3205 0 : fn key_range(&self) -> &Range<Key> {
3206 0 : &self.key_range
3207 0 : }
3208 0 : fn lsn_range(&self) -> &Range<Lsn> {
3209 0 : &self.lsn_range
3210 0 : }
3211 0 : fn file_size(&self) -> u64 {
3212 0 : self.file_size
3213 0 : }
3214 0 : fn short_id(&self) -> std::string::String {
3215 0 : self.as_ref().short_id().to_string()
3216 0 : }
3217 0 : fn is_delta(&self) -> bool {
3218 0 : self.as_ref().is_delta()
3219 0 : }
3220 : }
3221 :
3222 : impl CompactionLayer<Key> for OwnArc<DeltaLayer> {
3223 0 : fn key_range(&self) -> &Range<Key> {
3224 0 : &self.layer_desc().key_range
3225 0 : }
3226 0 : fn lsn_range(&self) -> &Range<Lsn> {
3227 0 : &self.layer_desc().lsn_range
3228 0 : }
3229 0 : fn file_size(&self) -> u64 {
3230 0 : self.layer_desc().file_size
3231 0 : }
3232 0 : fn short_id(&self) -> std::string::String {
3233 0 : self.layer_desc().short_id().to_string()
3234 0 : }
3235 0 : fn is_delta(&self) -> bool {
3236 0 : true
3237 0 : }
3238 : }
3239 :
3240 : use crate::tenant::timeline::DeltaEntry;
3241 :
3242 : impl CompactionLayer<Key> for ResidentDeltaLayer {
3243 0 : fn key_range(&self) -> &Range<Key> {
3244 0 : &self.0.layer_desc().key_range
3245 0 : }
3246 0 : fn lsn_range(&self) -> &Range<Lsn> {
3247 0 : &self.0.layer_desc().lsn_range
3248 0 : }
3249 0 : fn file_size(&self) -> u64 {
3250 0 : self.0.layer_desc().file_size
3251 0 : }
3252 0 : fn short_id(&self) -> std::string::String {
3253 0 : self.0.layer_desc().short_id().to_string()
3254 0 : }
3255 0 : fn is_delta(&self) -> bool {
3256 0 : true
3257 0 : }
3258 : }
3259 :
3260 : impl CompactionDeltaLayer<TimelineAdaptor> for ResidentDeltaLayer {
3261 : type DeltaEntry<'a> = DeltaEntry<'a>;
3262 :
3263 0 : async fn load_keys(&self, ctx: &RequestContext) -> anyhow::Result<Vec<DeltaEntry<'_>>> {
3264 0 : self.0.get_as_delta(ctx).await?.index_entries(ctx).await
3265 0 : }
3266 : }
3267 :
3268 : impl CompactionLayer<Key> for ResidentImageLayer {
3269 0 : fn key_range(&self) -> &Range<Key> {
3270 0 : &self.0.layer_desc().key_range
3271 0 : }
3272 0 : fn lsn_range(&self) -> &Range<Lsn> {
3273 0 : &self.0.layer_desc().lsn_range
3274 0 : }
3275 0 : fn file_size(&self) -> u64 {
3276 0 : self.0.layer_desc().file_size
3277 0 : }
3278 0 : fn short_id(&self) -> std::string::String {
3279 0 : self.0.layer_desc().short_id().to_string()
3280 0 : }
3281 0 : fn is_delta(&self) -> bool {
3282 0 : false
3283 0 : }
3284 : }
3285 : impl CompactionImageLayer<TimelineAdaptor> for ResidentImageLayer {}
|