Line data Source code
1 : //! New compaction implementation. The algorithm itself is implemented in the
2 : //! compaction crate. This file implements the callbacks and structs that allow
3 : //! the algorithm to drive the process.
4 : //!
5 : //! The legacy algorithm is implemented directly in `timeline.rs`.
6 :
7 : use std::collections::{BinaryHeap, HashMap, HashSet, VecDeque};
8 : use std::ops::{Deref, Range};
9 : use std::sync::Arc;
10 :
11 : use super::layer_manager::LayerManager;
12 : use super::{
13 : CompactFlags, CompactOptions, CreateImageLayersError, DurationRecorder, GetVectoredError,
14 : ImageLayerCreationMode, LastImageLayerCreationStatus, RecordedDuration, Timeline,
15 : };
16 :
17 : use anyhow::{anyhow, bail, Context};
18 : use bytes::Bytes;
19 : use enumset::EnumSet;
20 : use fail::fail_point;
21 : use itertools::Itertools;
22 : use pageserver_api::key::KEY_SIZE;
23 : use pageserver_api::keyspace::ShardedRange;
24 : use pageserver_api::models::CompactInfoResponse;
25 : use pageserver_api::shard::{ShardCount, ShardIdentity, TenantShardId};
26 : use serde::Serialize;
27 : use tokio_util::sync::CancellationToken;
28 : use tracing::{debug, info, info_span, trace, warn, Instrument};
29 : use utils::critical;
30 : use utils::id::TimelineId;
31 :
32 : use crate::context::{AccessStatsBehavior, RequestContext, RequestContextBuilder};
33 : use crate::page_cache;
34 : use crate::statvfs::Statvfs;
35 : use crate::tenant::checks::check_valid_layermap;
36 : use crate::tenant::gc_block::GcBlock;
37 : use crate::tenant::layer_map::LayerMap;
38 : use crate::tenant::remote_timeline_client::WaitCompletionError;
39 : use crate::tenant::storage_layer::batch_split_writer::{
40 : BatchWriterResult, SplitDeltaLayerWriter, SplitImageLayerWriter,
41 : };
42 : use crate::tenant::storage_layer::filter_iterator::FilterIterator;
43 : use crate::tenant::storage_layer::merge_iterator::MergeIterator;
44 : use crate::tenant::storage_layer::{
45 : AsLayerDesc, PersistentLayerDesc, PersistentLayerKey, ValueReconstructState,
46 : };
47 : use crate::tenant::timeline::{drop_rlock, DeltaLayerWriter, ImageLayerWriter};
48 : use crate::tenant::timeline::{ImageLayerCreationOutcome, IoConcurrency};
49 : use crate::tenant::timeline::{Layer, ResidentLayer};
50 : use crate::tenant::{gc_block, DeltaLayer, MaybeOffloaded};
51 : use crate::virtual_file::{MaybeFatalIo, VirtualFile};
52 : use pageserver_api::config::tenant_conf_defaults::DEFAULT_CHECKPOINT_DISTANCE;
53 :
54 : use pageserver_api::key::Key;
55 : use pageserver_api::keyspace::KeySpace;
56 : use pageserver_api::record::NeonWalRecord;
57 : use pageserver_api::value::Value;
58 :
59 : use utils::lsn::Lsn;
60 :
61 : use pageserver_compaction::helpers::{fully_contains, overlaps_with};
62 : use pageserver_compaction::interface::*;
63 :
64 : use super::CompactionError;
65 :
66 : /// Maximum number of deltas before generating an image layer in bottom-most compaction.
67 : const COMPACTION_DELTA_THRESHOLD: usize = 5;
68 :
69 : #[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
70 : pub struct GcCompactionJobId(pub usize);
71 :
72 : impl std::fmt::Display for GcCompactionJobId {
73 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
74 0 : write!(f, "{}", self.0)
75 0 : }
76 : }
77 :
78 : #[derive(Debug, Clone)]
79 : pub enum GcCompactionQueueItem {
80 : Manual(CompactOptions),
81 : SubCompactionJob(CompactOptions),
82 : #[allow(dead_code)]
83 : UpdateL2Lsn(Lsn),
84 : Notify(GcCompactionJobId),
85 : }
86 :
87 : impl GcCompactionQueueItem {
88 0 : pub fn into_compact_info_resp(
89 0 : self,
90 0 : id: GcCompactionJobId,
91 0 : running: bool,
92 0 : ) -> Option<CompactInfoResponse> {
93 0 : match self {
94 0 : GcCompactionQueueItem::Manual(options) => Some(CompactInfoResponse {
95 0 : compact_key_range: options.compact_key_range,
96 0 : compact_lsn_range: options.compact_lsn_range,
97 0 : sub_compaction: options.sub_compaction,
98 0 : running,
99 0 : job_id: id.0,
100 0 : }),
101 0 : GcCompactionQueueItem::SubCompactionJob(options) => Some(CompactInfoResponse {
102 0 : compact_key_range: options.compact_key_range,
103 0 : compact_lsn_range: options.compact_lsn_range,
104 0 : sub_compaction: options.sub_compaction,
105 0 : running,
106 0 : job_id: id.0,
107 0 : }),
108 0 : GcCompactionQueueItem::UpdateL2Lsn(_) => None,
109 0 : GcCompactionQueueItem::Notify(_) => None,
110 : }
111 0 : }
112 : }
113 :
114 : struct GcCompactionQueueInner {
115 : running: Option<(GcCompactionJobId, GcCompactionQueueItem)>,
116 : queued: VecDeque<(GcCompactionJobId, GcCompactionQueueItem)>,
117 : notify: HashMap<GcCompactionJobId, tokio::sync::oneshot::Sender<()>>,
118 : gc_guards: HashMap<GcCompactionJobId, gc_block::Guard>,
119 : last_id: GcCompactionJobId,
120 : }
121 :
122 : impl GcCompactionQueueInner {
123 0 : fn next_id(&mut self) -> GcCompactionJobId {
124 0 : let id = self.last_id;
125 0 : self.last_id = GcCompactionJobId(id.0 + 1);
126 0 : id
127 0 : }
128 : }
129 :
130 : /// A structure to store gc_compaction jobs.
131 : pub struct GcCompactionQueue {
132 : /// All items in the queue, and the currently-running job.
133 : inner: std::sync::Mutex<GcCompactionQueueInner>,
134 : /// Ensure only one thread is consuming the queue.
135 : consumer_lock: tokio::sync::Mutex<()>,
136 : }
137 :
138 : impl GcCompactionQueue {
139 0 : pub fn new() -> Self {
140 0 : GcCompactionQueue {
141 0 : inner: std::sync::Mutex::new(GcCompactionQueueInner {
142 0 : running: None,
143 0 : queued: VecDeque::new(),
144 0 : notify: HashMap::new(),
145 0 : gc_guards: HashMap::new(),
146 0 : last_id: GcCompactionJobId(0),
147 0 : }),
148 0 : consumer_lock: tokio::sync::Mutex::new(()),
149 0 : }
150 0 : }
151 :
152 0 : pub fn cancel_scheduled(&self) {
153 0 : let mut guard = self.inner.lock().unwrap();
154 0 : guard.queued.clear();
155 0 : guard.notify.clear();
156 0 : guard.gc_guards.clear();
157 0 : }
158 :
159 : /// Schedule a manual compaction job.
160 0 : pub fn schedule_manual_compaction(
161 0 : &self,
162 0 : options: CompactOptions,
163 0 : notify: Option<tokio::sync::oneshot::Sender<()>>,
164 0 : ) -> GcCompactionJobId {
165 0 : let mut guard = self.inner.lock().unwrap();
166 0 : let id = guard.next_id();
167 0 : guard
168 0 : .queued
169 0 : .push_back((id, GcCompactionQueueItem::Manual(options)));
170 0 : if let Some(notify) = notify {
171 0 : guard.notify.insert(id, notify);
172 0 : }
173 0 : info!("scheduled compaction job id={}", id);
174 0 : id
175 0 : }
176 :
177 : /// Trigger an auto compaction.
178 : #[allow(dead_code)]
179 0 : pub fn trigger_auto_compaction(&self, _: &Arc<Timeline>) {}
180 :
181 : /// Notify the caller that the job has finished, and unblock GC.
182 0 : fn notify_and_unblock(&self, id: GcCompactionJobId) {
183 0 : info!("compaction job id={} finished", id);
184 0 : let mut guard = self.inner.lock().unwrap();
185 0 : if let Some(blocking) = guard.gc_guards.remove(&id) {
186 0 : drop(blocking)
187 0 : }
188 0 : if let Some(tx) = guard.notify.remove(&id) {
189 0 : let _ = tx.send(());
190 0 : }
191 0 : }
192 :
193 0 : async fn handle_sub_compaction(
194 0 : &self,
195 0 : id: GcCompactionJobId,
196 0 : options: CompactOptions,
197 0 : timeline: &Arc<Timeline>,
198 0 : gc_block: &GcBlock,
199 0 : ) -> Result<(), CompactionError> {
200 0 : info!("running scheduled enhanced gc bottom-most compaction with sub-compaction, splitting compaction jobs");
201 0 : let jobs: Vec<GcCompactJob> = timeline
202 0 : .gc_compaction_split_jobs(
203 0 : GcCompactJob::from_compact_options(options.clone()),
204 0 : options.sub_compaction_max_job_size_mb,
205 0 : )
206 0 : .await
207 0 : .map_err(CompactionError::Other)?;
208 0 : if jobs.is_empty() {
209 0 : info!("no jobs to run, skipping scheduled compaction task");
210 0 : self.notify_and_unblock(id);
211 : } else {
212 0 : let gc_guard = match gc_block.start().await {
213 0 : Ok(guard) => guard,
214 0 : Err(e) => {
215 0 : return Err(CompactionError::Other(anyhow!(
216 0 : "cannot run gc-compaction because gc is blocked: {}",
217 0 : e
218 0 : )));
219 : }
220 : };
221 :
222 0 : let jobs_len = jobs.len();
223 0 : let mut pending_tasks = Vec::new();
224 0 : for job in jobs {
225 : // Unfortunately we need to convert the `GcCompactJob` back to `CompactOptions`
226 : // until we refactor enough to call `compact_with_gc` directly.
227 0 : let mut flags: EnumSet<CompactFlags> = EnumSet::default();
228 0 : flags |= CompactFlags::EnhancedGcBottomMostCompaction;
229 0 : if job.dry_run {
230 0 : flags |= CompactFlags::DryRun;
231 0 : }
232 0 : let options = CompactOptions {
233 0 : flags,
234 0 : sub_compaction: false,
235 0 : compact_key_range: Some(job.compact_key_range.into()),
236 0 : compact_lsn_range: Some(job.compact_lsn_range.into()),
237 0 : sub_compaction_max_job_size_mb: None,
238 0 : };
239 0 : pending_tasks.push(GcCompactionQueueItem::SubCompactionJob(options));
240 : }
241 0 : pending_tasks.push(GcCompactionQueueItem::Notify(id));
242 0 : {
243 0 : let mut guard = self.inner.lock().unwrap();
244 0 : guard.gc_guards.insert(id, gc_guard);
245 0 : let mut tasks = Vec::new();
246 0 : for task in pending_tasks {
247 0 : let id = guard.next_id();
248 0 : tasks.push((id, task));
249 0 : }
250 0 : tasks.reverse();
251 0 : for item in tasks {
252 0 : guard.queued.push_front(item);
253 0 : }
254 : }
255 0 : info!("scheduled enhanced gc bottom-most compaction with sub-compaction, split into {} jobs", jobs_len);
256 : }
257 0 : Ok(())
258 0 : }
259 :
260 : /// Take a job from the queue and process it. Returns whether there are still pending tasks.
261 0 : pub async fn iteration(
262 0 : &self,
263 0 : cancel: &CancellationToken,
264 0 : ctx: &RequestContext,
265 0 : gc_block: &GcBlock,
266 0 : timeline: &Arc<Timeline>,
267 0 : ) -> Result<CompactionOutcome, CompactionError> {
268 0 : let _one_op_at_a_time_guard = self.consumer_lock.lock().await;
269 : let has_pending_tasks;
270 0 : let (id, item) = {
271 0 : let mut guard = self.inner.lock().unwrap();
272 0 : let Some((id, item)) = guard.queued.pop_front() else {
273 0 : return Ok(CompactionOutcome::Done);
274 : };
275 0 : guard.running = Some((id, item.clone()));
276 0 : has_pending_tasks = !guard.queued.is_empty();
277 0 : (id, item)
278 0 : };
279 0 :
280 0 : match item {
281 0 : GcCompactionQueueItem::Manual(options) => {
282 0 : if !options
283 0 : .flags
284 0 : .contains(CompactFlags::EnhancedGcBottomMostCompaction)
285 : {
286 0 : warn!("ignoring scheduled compaction task: scheduled task must be gc compaction: {:?}", options);
287 0 : } else if options.sub_compaction {
288 0 : self.handle_sub_compaction(id, options, timeline, gc_block)
289 0 : .await?;
290 : } else {
291 0 : let gc_guard = match gc_block.start().await {
292 0 : Ok(guard) => guard,
293 0 : Err(e) => {
294 0 : return Err(CompactionError::Other(anyhow!(
295 0 : "cannot run gc-compaction because gc is blocked: {}",
296 0 : e
297 0 : )));
298 : }
299 : };
300 0 : {
301 0 : let mut guard = self.inner.lock().unwrap();
302 0 : guard.gc_guards.insert(id, gc_guard);
303 0 : }
304 0 : let _ = timeline
305 0 : .compact_with_options(cancel, options, ctx)
306 0 : .instrument(info_span!("scheduled_compact_timeline", %timeline.timeline_id))
307 0 : .await?;
308 0 : self.notify_and_unblock(id);
309 : }
310 : }
311 0 : GcCompactionQueueItem::SubCompactionJob(options) => {
312 0 : let _ = timeline
313 0 : .compact_with_options(cancel, options, ctx)
314 0 : .instrument(info_span!("scheduled_compact_timeline", %timeline.timeline_id))
315 0 : .await?;
316 : }
317 0 : GcCompactionQueueItem::Notify(id) => {
318 0 : self.notify_and_unblock(id);
319 0 : }
320 : GcCompactionQueueItem::UpdateL2Lsn(_) => {
321 0 : unreachable!()
322 : }
323 : }
324 0 : {
325 0 : let mut guard = self.inner.lock().unwrap();
326 0 : guard.running = None;
327 0 : }
328 0 : Ok(if has_pending_tasks {
329 0 : CompactionOutcome::Pending
330 : } else {
331 0 : CompactionOutcome::Done
332 : })
333 0 : }
334 :
335 : #[allow(clippy::type_complexity)]
336 0 : pub fn remaining_jobs(
337 0 : &self,
338 0 : ) -> (
339 0 : Option<(GcCompactionJobId, GcCompactionQueueItem)>,
340 0 : VecDeque<(GcCompactionJobId, GcCompactionQueueItem)>,
341 0 : ) {
342 0 : let guard = self.inner.lock().unwrap();
343 0 : (guard.running.clone(), guard.queued.clone())
344 0 : }
345 :
346 : #[allow(dead_code)]
347 0 : pub fn remaining_jobs_num(&self) -> usize {
348 0 : let guard = self.inner.lock().unwrap();
349 0 : guard.queued.len() + if guard.running.is_some() { 1 } else { 0 }
350 0 : }
351 : }
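// Illustrative usage sketch (not part of the compiled module; assumes `options`, `cancel`,
// `ctx`, `gc_block` and `timeline` are supplied by the surrounding tenant code, and that
// `options.flags` carries `CompactFlags::EnhancedGcBottomMostCompaction`, since other items
// are ignored with a warning):
//
//     let queue = GcCompactionQueue::new();
//     let (tx, rx) = tokio::sync::oneshot::channel();
//     let _job_id = queue.schedule_manual_compaction(options, Some(tx));
//     // Each `iteration` call processes at most one queued item; keep driving it
//     // while it reports pending work.
//     while queue.iteration(&cancel, &ctx, &gc_block, &timeline).await? == CompactionOutcome::Pending {}
//     // `rx` resolves once the job (including any sub-compaction jobs) has finished.
//     let _ = rx.await;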
352 :
353 : /// A job description for the gc-compaction job. This structure describes the rectangular (key, LSN) range that the job will
354 : /// process. The exact layers that need to be compacted/rewritten will be generated when `compact_with_gc` gets
355 : /// called.
356 : #[derive(Debug, Clone)]
357 : pub(crate) struct GcCompactJob {
358 : pub dry_run: bool,
359 : /// The key range to be compacted. The compaction algorithm will only regenerate key-value pairs within this range
360 : /// [left inclusive, right exclusive), and other pairs will be rewritten into new files if necessary.
361 : pub compact_key_range: Range<Key>,
362 : /// The LSN range to be compacted. The compaction algorithm will use this range to determine the layers to be
363 : /// selected for the compaction, and it does not guarantee the generated layers will have exactly the same LSN range
364 : /// as specified here. The true range being compacted is `min_lsn/max_lsn` in [`GcCompactionJobDescription`].
365 : /// min_lsn will always be <= the lower bound specified here, and max_lsn will always be >= the upper bound specified here.
366 : pub compact_lsn_range: Range<Lsn>,
367 : }
368 :
369 : impl GcCompactJob {
370 108 : pub fn from_compact_options(options: CompactOptions) -> Self {
371 108 : GcCompactJob {
372 108 : dry_run: options.flags.contains(CompactFlags::DryRun),
373 108 : compact_key_range: options
374 108 : .compact_key_range
375 108 : .map(|x| x.into())
376 108 : .unwrap_or(Key::MIN..Key::MAX),
377 108 : compact_lsn_range: options
378 108 : .compact_lsn_range
379 108 : .map(|x| x.into())
380 108 : .unwrap_or(Lsn::INVALID..Lsn::MAX),
381 108 : }
382 108 : }
383 : }
384 :
385 : /// A job description for the gc-compaction job. This structure is generated when `compact_with_gc` is called
386 : /// and contains the exact layers we want to compact.
387 : pub struct GcCompactionJobDescription {
388 : /// All layers to read in the compaction job
389 : selected_layers: Vec<Layer>,
390 : /// GC cutoff of the job. This is the lowest LSN that will be accessed by the read/GC path and we need to
391 : /// keep all deltas <= this LSN or generate an image == this LSN.
392 : gc_cutoff: Lsn,
393 : /// LSNs to retain for the job. The read path will use these LSNs, so we need to keep deltas <= each of
394 : /// these LSNs or generate an image == that LSN.
395 : retain_lsns_below_horizon: Vec<Lsn>,
396 : /// Maximum layer LSN processed in this compaction, that is max(end_lsn of layers). Exclusive. All data
397 : /// \>= this LSN will be kept and will not be rewritten.
398 : max_layer_lsn: Lsn,
399 : /// Minimum layer LSN processed in this compaction, that is min(start_lsn of layers). Inclusive.
400 : /// All accesses strictly below (`<`) this LSN will be routed through the normal read path instead of
401 : /// k-merge within gc-compaction.
402 : min_layer_lsn: Lsn,
403 : /// Only compact layers overlapping with this range.
404 : compaction_key_range: Range<Key>,
405 : /// When partial compaction is enabled, these layers need to be rewritten to ensure no overlap.
406 : /// This field is here solely for debugging. The field will not be read once the compaction
407 : /// description is generated.
408 : rewrite_layers: Vec<Arc<PersistentLayerDesc>>,
409 : }
410 :
411 : /// The result of bottom-most compaction for a single key at each LSN.
412 : #[derive(Debug)]
413 : #[cfg_attr(test, derive(PartialEq))]
414 : pub struct KeyLogAtLsn(pub Vec<(Lsn, Value)>);
415 :
416 : /// The result of bottom-most compaction.
417 : #[derive(Debug)]
418 : #[cfg_attr(test, derive(PartialEq))]
419 : pub(crate) struct KeyHistoryRetention {
420 : /// Stores logs to reconstruct the value at the given LSN, that is to say, logs <= LSN or image == LSN.
421 : pub(crate) below_horizon: Vec<(Lsn, KeyLogAtLsn)>,
422 : /// Stores logs to reconstruct the value at any LSN above the horizon, that is to say, logs > LSN.
423 : pub(crate) above_horizon: KeyLogAtLsn,
424 : }
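// Illustrative sketch (hypothetical values, not part of the compiled module): for a key with
// retain_lsns at 0x20 and 0x30 and a gc horizon at 0x40, assuming the history below each retained
// LSN was collapsed into an image, the retention result could look like:
//
//     KeyHistoryRetention {
//         below_horizon: vec![
//             (Lsn(0x20), KeyLogAtLsn(vec![(Lsn(0x20), Value::Image(img_at_0x20))])),
//             (Lsn(0x30), KeyLogAtLsn(vec![(Lsn(0x30), Value::Image(img_at_0x30))])),
//             (Lsn(0x40), KeyLogAtLsn(vec![(Lsn(0x40), Value::Image(img_at_0x40))])),
//         ],
//         // Deltas newer than the horizon are passed through unchanged.
//         above_horizon: KeyLogAtLsn(vec![(Lsn(0x48), Value::WalRecord(rec_at_0x48))]),
//     }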
425 :
426 : impl KeyHistoryRetention {
427 : /// Hack: skip the delta layer if we would produce a layer with the same key-lsn range as an existing one.
428 : ///
429 : /// This can happen if we have removed some deltas in "the middle" of some existing layer's key-lsn-range.
430 : /// For example, consider the case where a single delta with range [0x10,0x50) exists.
431 : /// And we have branches at LSN 0x10, 0x20, 0x30.
432 : /// Then we delete branch @ 0x20.
433 : /// Bottom-most compaction may now delete the delta [0x20,0x30).
434 : /// And that wouldn't change the shape of the layer.
435 : ///
436 : /// Note that bottom-most-gc-compaction never _adds_ new data in that case, only removes.
437 : ///
438 : /// `discard_key` will only be called when the writer reaches its target (instead of for every key), so it's fine to grab a lock inside.
439 148 : async fn discard_key(key: &PersistentLayerKey, tline: &Arc<Timeline>, dry_run: bool) -> bool {
440 148 : if dry_run {
441 0 : return true;
442 148 : }
443 148 : if LayerMap::is_l0(&key.key_range, key.is_delta) {
444 : // gc-compaction should not produce L0 deltas, otherwise it will break the layer order.
445 : // We should ignore such layers.
446 0 : return true;
447 148 : }
448 : let layer_generation;
449 : {
450 148 : let guard = tline.layers.read().await;
451 148 : if !guard.contains_key(key) {
452 104 : return false;
453 44 : }
454 44 : layer_generation = guard.get_from_key(key).metadata().generation;
455 44 : }
456 44 : if layer_generation == tline.generation {
457 44 : info!(
458 : key=%key,
459 : ?layer_generation,
460 0 : "discard layer due to duplicated layer key in the same generation",
461 : );
462 44 : true
463 : } else {
464 0 : false
465 : }
466 148 : }
467 :
468 : /// Pipe a history of a single key to the writers.
469 : ///
470 : /// If `image_writer` is `None`, the images will be placed into the delta layers.
471 : /// The delta writer will contain all images and deltas (below and above the horizon) except the bottom-most images.
472 : #[allow(clippy::too_many_arguments)]
473 1244 : async fn pipe_to(
474 1244 : self,
475 1244 : key: Key,
476 1244 : delta_writer: &mut SplitDeltaLayerWriter,
477 1244 : mut image_writer: Option<&mut SplitImageLayerWriter>,
478 1244 : stat: &mut CompactionStatistics,
479 1244 : ctx: &RequestContext,
480 1244 : ) -> anyhow::Result<()> {
481 1244 : let mut first_batch = true;
482 4024 : for (cutoff_lsn, KeyLogAtLsn(logs)) in self.below_horizon {
483 2780 : if first_batch {
484 1244 : if logs.len() == 1 && logs[0].1.is_image() {
485 1168 : let Value::Image(img) = &logs[0].1 else {
486 0 : unreachable!()
487 : };
488 1168 : stat.produce_image_key(img);
489 1168 : if let Some(image_writer) = image_writer.as_mut() {
490 1168 : image_writer.put_image(key, img.clone(), ctx).await?;
491 : } else {
492 0 : delta_writer
493 0 : .put_value(key, cutoff_lsn, Value::Image(img.clone()), ctx)
494 0 : .await?;
495 : }
496 : } else {
497 132 : for (lsn, val) in logs {
498 56 : stat.produce_key(&val);
499 56 : delta_writer.put_value(key, lsn, val, ctx).await?;
500 : }
501 : }
502 1244 : first_batch = false;
503 : } else {
504 1768 : for (lsn, val) in logs {
505 232 : stat.produce_key(&val);
506 232 : delta_writer.put_value(key, lsn, val, ctx).await?;
507 : }
508 : }
509 : }
510 1244 : let KeyLogAtLsn(above_horizon_logs) = self.above_horizon;
511 1360 : for (lsn, val) in above_horizon_logs {
512 116 : stat.produce_key(&val);
513 116 : delta_writer.put_value(key, lsn, val, ctx).await?;
514 : }
515 1244 : Ok(())
516 1244 : }
517 : }
518 :
519 : #[derive(Debug, Serialize, Default)]
520 : struct CompactionStatisticsNumSize {
521 : num: u64,
522 : size: u64,
523 : }
524 :
525 : #[derive(Debug, Serialize, Default)]
526 : pub struct CompactionStatistics {
527 : delta_layer_visited: CompactionStatisticsNumSize,
528 : image_layer_visited: CompactionStatisticsNumSize,
529 : delta_layer_produced: CompactionStatisticsNumSize,
530 : image_layer_produced: CompactionStatisticsNumSize,
531 : num_delta_layer_discarded: usize,
532 : num_image_layer_discarded: usize,
533 : num_unique_keys_visited: usize,
534 : wal_keys_visited: CompactionStatisticsNumSize,
535 : image_keys_visited: CompactionStatisticsNumSize,
536 : wal_produced: CompactionStatisticsNumSize,
537 : image_produced: CompactionStatisticsNumSize,
538 : }
539 :
540 : impl CompactionStatistics {
541 2084 : fn estimated_size_of_value(val: &Value) -> usize {
542 864 : match val {
543 1220 : Value::Image(img) => img.len(),
544 0 : Value::WalRecord(NeonWalRecord::Postgres { rec, .. }) => rec.len(),
545 864 : _ => std::mem::size_of::<NeonWalRecord>(),
546 : }
547 2084 : }
548 3272 : fn estimated_size_of_key() -> usize {
549 3272 : KEY_SIZE // TODO: distinguish image layer and delta layer (count LSN in delta layer)
550 3272 : }
551 172 : fn visit_delta_layer(&mut self, size: u64) {
552 172 : self.delta_layer_visited.num += 1;
553 172 : self.delta_layer_visited.size += size;
554 172 : }
555 132 : fn visit_image_layer(&mut self, size: u64) {
556 132 : self.image_layer_visited.num += 1;
557 132 : self.image_layer_visited.size += size;
558 132 : }
559 1244 : fn on_unique_key_visited(&mut self) {
560 1244 : self.num_unique_keys_visited += 1;
561 1244 : }
562 480 : fn visit_wal_key(&mut self, val: &Value) {
563 480 : self.wal_keys_visited.num += 1;
564 480 : self.wal_keys_visited.size +=
565 480 : Self::estimated_size_of_value(val) as u64 + Self::estimated_size_of_key() as u64;
566 480 : }
567 1220 : fn visit_image_key(&mut self, val: &Value) {
568 1220 : self.image_keys_visited.num += 1;
569 1220 : self.image_keys_visited.size +=
570 1220 : Self::estimated_size_of_value(val) as u64 + Self::estimated_size_of_key() as u64;
571 1220 : }
572 404 : fn produce_key(&mut self, val: &Value) {
573 404 : match val {
574 20 : Value::Image(img) => self.produce_image_key(img),
575 384 : Value::WalRecord(_) => self.produce_wal_key(val),
576 : }
577 404 : }
578 384 : fn produce_wal_key(&mut self, val: &Value) {
579 384 : self.wal_produced.num += 1;
580 384 : self.wal_produced.size +=
581 384 : Self::estimated_size_of_value(val) as u64 + Self::estimated_size_of_key() as u64;
582 384 : }
583 1188 : fn produce_image_key(&mut self, val: &Bytes) {
584 1188 : self.image_produced.num += 1;
585 1188 : self.image_produced.size += val.len() as u64 + Self::estimated_size_of_key() as u64;
586 1188 : }
587 28 : fn discard_delta_layer(&mut self) {
588 28 : self.num_delta_layer_discarded += 1;
589 28 : }
590 16 : fn discard_image_layer(&mut self) {
591 16 : self.num_image_layer_discarded += 1;
592 16 : }
593 44 : fn produce_delta_layer(&mut self, size: u64) {
594 44 : self.delta_layer_produced.num += 1;
595 44 : self.delta_layer_produced.size += size;
596 44 : }
597 60 : fn produce_image_layer(&mut self, size: u64) {
598 60 : self.image_layer_produced.num += 1;
599 60 : self.image_layer_produced.size += size;
600 60 : }
601 : }
602 :
603 : #[derive(Default, Debug, Clone, Copy, PartialEq, Eq)]
604 : pub enum CompactionOutcome {
605 : #[default]
606 : /// No layers need to be compacted after this round. Compaction doesn't need
607 : /// to be immediately scheduled.
608 : Done,
609 : /// Still has pending layers to be compacted after this round. Ideally, the scheduler
610 : /// should immediately schedule another compaction.
611 : Pending,
612 : /// A timeline needs L0 compaction. Yield and schedule an immediate L0 compaction pass (only
613 : /// guaranteed when `compaction_l0_first` is enabled).
614 : YieldForL0,
615 : /// Compaction was skipped, because the timeline is ineligible for compaction.
616 : Skipped,
617 : }
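// Illustrative sketch (the real scheduler wiring lives outside this file; `cancel`, `options`
// and `ctx` are assumed to be in scope): how a caller might react to each outcome of a pass.
//
//     match timeline.compact_legacy(&cancel, options, &ctx).await? {
//         CompactionOutcome::Done => { /* nothing left to do until the next periodic tick */ }
//         CompactionOutcome::Pending => { /* reschedule this timeline for another pass soon */ }
//         CompactionOutcome::YieldForL0 => { /* run an immediate L0-only compaction pass first */ }
//         CompactionOutcome::Skipped => { /* timeline was ineligible; try again later */ }
//     }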
618 :
619 : impl Timeline {
620 : /// TODO: cancellation
621 : ///
622 : /// Returns whether the compaction has pending tasks.
623 768 : pub(crate) async fn compact_legacy(
624 768 : self: &Arc<Self>,
625 768 : cancel: &CancellationToken,
626 768 : options: CompactOptions,
627 768 : ctx: &RequestContext,
628 768 : ) -> Result<CompactionOutcome, CompactionError> {
629 768 : if options
630 768 : .flags
631 768 : .contains(CompactFlags::EnhancedGcBottomMostCompaction)
632 : {
633 0 : self.compact_with_gc(cancel, options, ctx)
634 0 : .await
635 0 : .map_err(CompactionError::Other)?;
636 0 : return Ok(CompactionOutcome::Done);
637 768 : }
638 768 :
639 768 : if options.flags.contains(CompactFlags::DryRun) {
640 0 : return Err(CompactionError::Other(anyhow!(
641 0 : "dry-run mode is not supported for legacy compaction for now"
642 0 : )));
643 768 : }
644 768 :
645 768 : if options.compact_key_range.is_some() || options.compact_lsn_range.is_some() {
646 : // maybe useful in the future? could implement this at some point
647 0 : return Err(CompactionError::Other(anyhow!(
648 0 : "compaction range is not supported for legacy compaction for now"
649 0 : )));
650 768 : }
651 768 :
652 768 : // High level strategy for compaction / image creation:
653 768 : //
654 768 : // 1. First, do an L0 compaction to ensure we move the L0
655 768 : // layers into the historic layer map and get flat levels of
656 768 : // layers. If we did not compact all L0 layers, we will
657 768 : // prioritize compacting the timeline again and not do
658 768 : // any of the compactions below.
659 768 : //
660 768 : // 2. Then, calculate the desired "partitioning" of the
661 768 : // currently in-use key space. The goal is to partition the
662 768 : // key space into roughly fixed-size chunks, but also take into
663 768 : // account any existing image layers, and try to align the
664 768 : // chunk boundaries with the existing image layers to avoid
665 768 : // too much churn. Also try to align chunk boundaries with
666 768 : // relation boundaries. In principle, we don't know about
667 768 : // relation boundaries here, we just deal with key-value
668 768 : // pairs, and the code in pgdatadir_mapping.rs knows how to
669 768 : // map relations into key-value pairs. But in practice we know
670 768 : // that 'field6' is the block number, and the fields 1-5
671 768 : // identify a relation. This is just an optimization,
672 768 : // though.
673 768 : //
674 768 : // 3. Once we know the partitioning, for each partition,
675 768 : // decide if it's time to create a new image layer. The
676 768 : // criterion is: has there been too much "churn" since the last
677 768 : // image layer? "Churn" is a fuzzy concept; it's a
678 768 : // combination of too many delta files, or too much WAL in
679 768 : // total in the delta files. Or perhaps: if creating an image
680 768 : // file would allow us to delete some older files.
681 768 : //
682 768 : // 4. In the end, if the tenant gets auto-sharded, we will run
683 768 : // a shard-ancestor compaction.
684 768 :
685 768 : // Is the timeline being deleted?
686 768 : if self.is_stopping() {
687 0 : trace!("Dropping out of compaction on timeline shutdown");
688 0 : return Err(CompactionError::ShuttingDown);
689 768 : }
690 768 :
691 768 : let target_file_size = self.get_checkpoint_distance();
692 :
693 : // Define partitioning schema if needed
694 :
695 768 : let l0_l1_boundary_lsn = {
696 : // We do the repartition on the L0-L1 boundary. All data below the boundary
697 : // has already been compacted by L0 compaction with low read amplification, thus making the `repartition`
698 : // function run fast.
699 768 : let guard = self.layers.read().await;
700 768 : let l0_min_lsn = guard
701 768 : .layer_map()?
702 768 : .level0_deltas()
703 768 : .iter()
704 3212 : .map(|l| l.get_lsn_range().start)
705 768 : .min()
706 768 : .unwrap_or(self.get_disk_consistent_lsn());
707 768 : l0_min_lsn.max(self.get_ancestor_lsn())
708 : };
709 :
710 : // 1. L0 Compact
711 768 : let l0_outcome = {
712 768 : let timer = self.metrics.compact_time_histo.start_timer();
713 768 : let l0_outcome = self
714 768 : .compact_level0(
715 768 : target_file_size,
716 768 : options.flags.contains(CompactFlags::ForceL0Compaction),
717 768 : ctx,
718 768 : )
719 768 : .await?;
720 768 : timer.stop_and_record();
721 768 : l0_outcome
722 768 : };
723 768 :
724 768 : if options.flags.contains(CompactFlags::OnlyL0Compaction) {
725 0 : return Ok(l0_outcome);
726 768 : }
727 768 :
728 768 : // Yield if we have pending L0 compaction. The scheduler will do another pass.
729 768 : if (l0_outcome == CompactionOutcome::Pending || l0_outcome == CompactionOutcome::YieldForL0)
730 0 : && !options.flags.contains(CompactFlags::NoYield)
731 : {
732 0 : info!("image/ancestor compaction yielding for L0 compaction");
733 0 : return Ok(CompactionOutcome::YieldForL0);
734 768 : }
735 768 :
736 768 : if l0_l1_boundary_lsn < self.partitioning.read().1 {
737 : // We never go backwards when repartition and create image layers.
738 0 : info!("skipping image layer generation because repartition LSN is greater than L0-L1 boundary LSN.");
739 : } else {
740 : // 2. Repartition and create image layers if necessary
741 768 : match self
742 768 : .repartition(
743 768 : l0_l1_boundary_lsn,
744 768 : self.get_compaction_target_size(),
745 768 : options.flags,
746 768 : ctx,
747 768 : )
748 768 : .await
749 : {
750 768 : Ok(((dense_partitioning, sparse_partitioning), lsn)) => {
751 768 : // Disables access_stats updates, so that the files we read remain candidates for eviction after we're done with them
752 768 : let image_ctx = RequestContextBuilder::extend(ctx)
753 768 : .access_stats_behavior(AccessStatsBehavior::Skip)
754 768 : .build();
755 768 :
756 768 : let mut partitioning = dense_partitioning;
757 768 : partitioning
758 768 : .parts
759 768 : .extend(sparse_partitioning.into_dense().parts);
760 :
761 : // 3. Create new image layers for partitions that have been modified "enough".
762 768 : let (image_layers, outcome) = self
763 768 : .create_image_layers(
764 768 : &partitioning,
765 768 : lsn,
766 768 : if options
767 768 : .flags
768 768 : .contains(CompactFlags::ForceImageLayerCreation)
769 : {
770 28 : ImageLayerCreationMode::Force
771 : } else {
772 740 : ImageLayerCreationMode::Try
773 : },
774 768 : &image_ctx,
775 768 : self.last_image_layer_creation_status
776 768 : .load()
777 768 : .as_ref()
778 768 : .clone(),
779 768 : !options.flags.contains(CompactFlags::NoYield),
780 768 : )
781 768 : .await
782 768 : .inspect_err(|err| {
783 : if let CreateImageLayersError::GetVectoredError(
784 : GetVectoredError::MissingKey(_),
785 0 : ) = err
786 : {
787 0 : critical!("missing key during compaction: {err:?}");
788 0 : }
789 768 : })?;
790 :
791 768 : self.last_image_layer_creation_status
792 768 : .store(Arc::new(outcome.clone()));
793 768 :
794 768 : self.upload_new_image_layers(image_layers)?;
795 768 : if let LastImageLayerCreationStatus::Incomplete { .. } = outcome {
796 : // Yield and do not do any other kind of compaction.
797 0 : info!("skipping shard ancestor compaction due to pending image layer generation tasks (preempted by L0 compaction).");
798 0 : return Ok(CompactionOutcome::YieldForL0);
799 768 : }
800 : }
801 0 : Err(err) => {
802 0 : // no partitioning? This is normal, if the timeline was just created
803 0 : // as an empty timeline. Also in unit tests, when we use the timeline
804 0 : // as a simple key-value store, ignoring the datadir layout. Log the
805 0 : // error but continue.
806 0 : //
807 0 : // Suppress error when it's due to cancellation
808 0 : if !self.cancel.is_cancelled() && !err.is_cancelled() {
809 0 : tracing::error!(
810 0 : "could not compact, repartitioning keyspace failed: {err:?}"
811 : );
812 0 : }
813 : }
814 : };
815 : }
816 :
817 768 : let partition_count = self.partitioning.read().0 .0.parts.len();
818 768 :
819 768 : // 4. Shard ancestor compaction
820 768 :
821 768 : if self.shard_identity.count >= ShardCount::new(2) {
822 : // Limit the number of layer rewrites to the number of partitions: this means its
823 : // runtime should be comparable to a full round of image layer creations, rather than
824 : // being potentially much longer.
825 0 : let rewrite_max = partition_count;
826 0 :
827 0 : self.compact_shard_ancestors(rewrite_max, ctx).await?;
828 768 : }
829 :
830 768 : Ok(CompactionOutcome::Done)
831 768 : }
832 :
833 : /// Check for layers that are eligible to be rewritten:
834 : /// - Shard splitting: after a shard split, rewrite ancestor layers beyond pitr_interval, so that
835 : /// we don't indefinitely retain keys in this shard that aren't needed.
836 : /// - For future use: layers beyond pitr_interval that are in formats we would
837 : /// rather not maintain compatibility with indefinitely.
838 : ///
839 : /// Note: this phase may read and write many gigabytes of data: use rewrite_max to bound
840 : /// how much work it will try to do in each compaction pass.
841 0 : async fn compact_shard_ancestors(
842 0 : self: &Arc<Self>,
843 0 : rewrite_max: usize,
844 0 : ctx: &RequestContext,
845 0 : ) -> Result<(), CompactionError> {
846 0 : let mut drop_layers = Vec::new();
847 0 : let mut layers_to_rewrite: Vec<Layer> = Vec::new();
848 0 :
849 0 : // We will use the Lsn cutoff of the last GC as a threshold for rewriting layers: if a
850 0 : // layer is behind this Lsn, it indicates that the layer is being retained beyond the
851 0 : // pitr_interval, for example because a branchpoint references it.
852 0 : //
853 0 : // Holding this read guard also blocks [`Self::gc_timeline`] from entering while we
854 0 : // are rewriting layers.
855 0 : let latest_gc_cutoff = self.get_applied_gc_cutoff_lsn();
856 0 :
857 0 : tracing::info!(
858 0 : "latest_gc_cutoff: {}, pitr cutoff {}",
859 0 : *latest_gc_cutoff,
860 0 : self.gc_info.read().unwrap().cutoffs.time
861 : );
862 :
863 0 : let layers = self.layers.read().await;
864 0 : for layer_desc in layers.layer_map()?.iter_historic_layers() {
865 0 : let layer = layers.get_from_desc(&layer_desc);
866 0 : if layer.metadata().shard.shard_count == self.shard_identity.count {
867 : // This layer does not belong to a historic ancestor, no need to re-image it.
868 0 : continue;
869 0 : }
870 0 :
871 0 : // This layer was created on an ancestor shard: check if it contains any data for this shard.
872 0 : let sharded_range = ShardedRange::new(layer_desc.get_key_range(), &self.shard_identity);
873 0 : let layer_local_page_count = sharded_range.page_count();
874 0 : let layer_raw_page_count = ShardedRange::raw_size(&layer_desc.get_key_range());
875 0 : if layer_local_page_count == 0 {
876 : // This ancestral layer only covers keys that belong to other shards.
877 : // We include the full metadata in the log: if we had some critical bug that caused
878 : // us to incorrectly drop layers, this would simplify manually debugging + reinstating those layers.
879 0 : info!(%layer, old_metadata=?layer.metadata(),
880 0 : "dropping layer after shard split, contains no keys for this shard.",
881 : );
882 :
883 0 : if cfg!(debug_assertions) {
884 : // Expensive, exhaustive check of keys in this layer: this guards against ShardedRange's calculations being
885 : // wrong. If ShardedRange claims the local page count is zero, then no keys in this layer
886 : // should be !is_key_disposable()
887 0 : let range = layer_desc.get_key_range();
888 0 : let mut key = range.start;
889 0 : while key < range.end {
890 0 : debug_assert!(self.shard_identity.is_key_disposable(&key));
891 0 : key = key.next();
892 : }
893 0 : }
894 :
895 0 : drop_layers.push(layer);
896 0 : continue;
897 0 : } else if layer_local_page_count != u32::MAX
898 0 : && layer_local_page_count == layer_raw_page_count
899 : {
900 0 : debug!(%layer,
901 0 : "layer is entirely shard local ({} keys), no need to filter it",
902 : layer_local_page_count
903 : );
904 0 : continue;
905 0 : }
906 0 :
907 0 : // Don't bother re-writing a layer unless it will at least halve its size
908 0 : if layer_local_page_count != u32::MAX
909 0 : && layer_local_page_count > layer_raw_page_count / 2
910 : {
911 0 : debug!(%layer,
912 0 : "layer is already mostly local ({}/{}), not rewriting",
913 : layer_local_page_count,
914 : layer_raw_page_count
915 : );
916 0 : }
917 :
918 : // Don't bother re-writing a layer if it is within the PITR window: it will age-out eventually
919 : // without incurring the I/O cost of a rewrite.
920 0 : if layer_desc.get_lsn_range().end >= *latest_gc_cutoff {
921 0 : debug!(%layer, "Skipping rewrite of layer still in GC window ({} >= {})",
922 0 : layer_desc.get_lsn_range().end, *latest_gc_cutoff);
923 0 : continue;
924 0 : }
925 0 :
926 0 : if layer_desc.is_delta() {
927 : // We do not yet implement rewrite of delta layers
928 0 : debug!(%layer, "Skipping rewrite of delta layer");
929 0 : continue;
930 0 : }
931 0 :
932 0 : // Only rewrite layers if their generations differ. This guarantees:
933 0 : // - that local rewrite is safe, as local layer paths will differ between existing layer and rewritten one
934 0 : // - that the layer is persistent in remote storage, as we only see old-generation'd layer via loading from remote storage
935 0 : if layer.metadata().generation == self.generation {
936 0 : debug!(%layer, "Skipping rewrite, is not from old generation");
937 0 : continue;
938 0 : }
939 0 :
940 0 : if layers_to_rewrite.len() >= rewrite_max {
941 0 : tracing::info!(%layer, "Will rewrite layer on a future compaction, already rewrote {}",
942 0 : layers_to_rewrite.len()
943 : );
944 0 : continue;
945 0 : }
946 0 :
947 0 : // Fall through: all our conditions for doing a rewrite passed.
948 0 : layers_to_rewrite.push(layer);
949 : }
950 :
951 : // Drop read lock on layer map before we start doing time-consuming I/O
952 0 : drop(layers);
953 0 :
954 0 : let mut replace_image_layers = Vec::new();
955 :
956 0 : for layer in layers_to_rewrite {
957 0 : tracing::info!(layer=%layer, "Rewriting layer after shard split...");
958 0 : let mut image_layer_writer = ImageLayerWriter::new(
959 0 : self.conf,
960 0 : self.timeline_id,
961 0 : self.tenant_shard_id,
962 0 : &layer.layer_desc().key_range,
963 0 : layer.layer_desc().image_layer_lsn(),
964 0 : ctx,
965 0 : )
966 0 : .await
967 0 : .map_err(CompactionError::Other)?;
968 :
969 : // Safety of layer rewrites:
970 : // - We are writing to a different local file path than we are reading from, so the old Layer
971 : // cannot interfere with the new one.
972 : // - In the page cache, contents for a particular VirtualFile are stored with a file_id that
973 : // is different for two layers with the same name (in `ImageLayerInner::new` we always
974 : // acquire a fresh id from [`crate::page_cache::next_file_id`]). So readers do not risk
975 : // reading the index from one layer file, and then data blocks from the rewritten layer file.
976 : // - Any readers that have a reference to the old layer will keep it alive until they are done
977 : // with it. If they are trying to promote from remote storage, that will fail, but this is the same
978 : // as for compaction generally: compaction is allowed to delete layers that readers might be trying to use.
979 : // - We do not run concurrently with other kinds of compaction, so the only layer map writes we race with are:
980 : // - GC, which at worst witnesses us "undelete" a layer that they just deleted.
981 : // - ingestion, which only inserts layers, therefore cannot collide with us.
982 0 : let resident = layer.download_and_keep_resident().await?;
983 :
984 0 : let keys_written = resident
985 0 : .filter(&self.shard_identity, &mut image_layer_writer, ctx)
986 0 : .await?;
987 :
988 0 : if keys_written > 0 {
989 0 : let (desc, path) = image_layer_writer
990 0 : .finish(ctx)
991 0 : .await
992 0 : .map_err(CompactionError::Other)?;
993 0 : let new_layer = Layer::finish_creating(self.conf, self, desc, &path)
994 0 : .map_err(CompactionError::Other)?;
995 0 : tracing::info!(layer=%new_layer, "Rewrote layer, {} -> {} bytes",
996 0 : layer.metadata().file_size,
997 0 : new_layer.metadata().file_size);
998 :
999 0 : replace_image_layers.push((layer, new_layer));
1000 0 : } else {
1001 0 : // Drop the old layer. Usually for this case we would already have noticed that
1002 0 : // the layer has no data for us with the ShardedRange check above, but we handle this case here as well.
1003 0 : drop_layers.push(layer);
1004 0 : }
1005 : }
1006 :
1007 : // At this point, we have replaced local layer files with their rewritten form, but not yet uploaded
1008 : // metadata to reflect that. If we restart here, the replaced layer files will look invalid (size mismatch
1009 : // to remote index) and be removed. This is inefficient but safe.
1010 0 : fail::fail_point!("compact-shard-ancestors-localonly");
1011 0 :
1012 0 : // Update the LayerMap so that readers will use the new layers, and enqueue it for writing to remote storage
1013 0 : self.rewrite_layers(replace_image_layers, drop_layers)
1014 0 : .await?;
1015 :
1016 0 : fail::fail_point!("compact-shard-ancestors-enqueued");
1017 0 :
1018 0 : // We wait for all uploads to complete before finishing this compaction stage. This is not
1019 0 : // necessary for correctness, but it simplifies testing, and avoids proceeding with another
1020 0 : // Timeline's compaction while this timeline's uploads may be generating lots of disk I/O
1021 0 : // load.
1022 0 : match self.remote_client.wait_completion().await {
1023 0 : Ok(()) => (),
1024 0 : Err(WaitCompletionError::NotInitialized(ni)) => return Err(CompactionError::from(ni)),
1025 : Err(WaitCompletionError::UploadQueueShutDownOrStopped) => {
1026 0 : return Err(CompactionError::ShuttingDown)
1027 : }
1028 : }
1029 :
1030 0 : fail::fail_point!("compact-shard-ancestors-persistent");
1031 0 :
1032 0 : Ok(())
1033 0 : }
1034 :
1035 : /// Update the LayerVisibilityHint of layers covered by image layers, based on whether there is
1036 : /// an image layer between them and the most recent readable LSN (branch point or tip of timeline). The
1037 : /// purpose of the visibility hint is to record which layers need to be available to service reads.
1038 : ///
1039 : /// The result may be used as an input to eviction and secondary downloads to de-prioritize layers
1040 : /// that we know won't be needed for reads.
1041 456 : pub(super) async fn update_layer_visibility(
1042 456 : &self,
1043 456 : ) -> Result<(), super::layer_manager::Shutdown> {
1044 456 : let head_lsn = self.get_last_record_lsn();
1045 :
1046 : // We will sweep through layers in reverse-LSN order. We only do historic layers. L0 deltas
1047 : // are implicitly left visible, because LayerVisibilityHint's default is Visible, and we never modify it here.
1048 : // Note that L0 deltas _can_ be covered by image layers, but we consider them 'visible' because we anticipate that
1049 : // they will be subject to L0->L1 compaction in the near future.
1050 456 : let layer_manager = self.layers.read().await;
1051 456 : let layer_map = layer_manager.layer_map()?;
1052 :
1053 456 : let readable_points = {
1054 456 : let children = self.gc_info.read().unwrap().retain_lsns.clone();
1055 456 :
1056 456 : let mut readable_points = Vec::with_capacity(children.len() + 1);
1057 456 : for (child_lsn, _child_timeline_id, is_offloaded) in &children {
1058 0 : if *is_offloaded == MaybeOffloaded::Yes {
1059 0 : continue;
1060 0 : }
1061 0 : readable_points.push(*child_lsn);
1062 : }
1063 456 : readable_points.push(head_lsn);
1064 456 : readable_points
1065 456 : };
1066 456 :
1067 456 : let (layer_visibility, covered) = layer_map.get_visibility(readable_points);
1068 1140 : for (layer_desc, visibility) in layer_visibility {
1069 684 : // FIXME: a more efficiency bulk zip() through the layers rather than NlogN getting each one
1070 684 : let layer = layer_manager.get_from_desc(&layer_desc);
1071 684 : layer.set_visibility(visibility);
1072 684 : }
1073 :
1074 : // TODO: publish our covered KeySpace to our parent, so that when they update their visibility, they can
1075 : // avoid assuming that everything at a branch point is visible.
1076 456 : drop(covered);
1077 456 : Ok(())
1078 456 : }
1079 :
1080 : /// Collect a bunch of Level 0 layer files, and compact and reshuffle them
1081 : /// as Level 1 files. Returns whether the L0 layers are fully compacted.
1082 768 : async fn compact_level0(
1083 768 : self: &Arc<Self>,
1084 768 : target_file_size: u64,
1085 768 : force_compaction_ignore_threshold: bool,
1086 768 : ctx: &RequestContext,
1087 768 : ) -> Result<CompactionOutcome, CompactionError> {
1088 : let CompactLevel0Phase1Result {
1089 768 : new_layers,
1090 768 : deltas_to_compact,
1091 768 : outcome,
1092 : } = {
1093 768 : let phase1_span = info_span!("compact_level0_phase1");
1094 768 : let ctx = ctx.attached_child();
1095 768 : let mut stats = CompactLevel0Phase1StatsBuilder {
1096 768 : version: Some(2),
1097 768 : tenant_id: Some(self.tenant_shard_id),
1098 768 : timeline_id: Some(self.timeline_id),
1099 768 : ..Default::default()
1100 768 : };
1101 768 :
1102 768 : let begin = tokio::time::Instant::now();
1103 768 : let phase1_layers_locked = self.layers.read().await;
1104 768 : let now = tokio::time::Instant::now();
1105 768 : stats.read_lock_acquisition_micros =
1106 768 : DurationRecorder::Recorded(RecordedDuration(now - begin), now);
1107 768 : self.compact_level0_phase1(
1108 768 : phase1_layers_locked,
1109 768 : stats,
1110 768 : target_file_size,
1111 768 : force_compaction_ignore_threshold,
1112 768 : &ctx,
1113 768 : )
1114 768 : .instrument(phase1_span)
1115 768 : .await?
1116 : };
1117 :
1118 768 : if new_layers.is_empty() && deltas_to_compact.is_empty() {
1119 : // nothing to do
1120 676 : return Ok(CompactionOutcome::Done);
1121 92 : }
1122 92 :
1123 92 : self.finish_compact_batch(&new_layers, &Vec::new(), &deltas_to_compact)
1124 92 : .await?;
1125 92 : Ok(outcome)
1126 768 : }
1127 :
1128 : /// Level0 files first phase of compaction, explained in the [`Self::compact_legacy`] comment.
1129 768 : async fn compact_level0_phase1<'a>(
1130 768 : self: &'a Arc<Self>,
1131 768 : guard: tokio::sync::RwLockReadGuard<'a, LayerManager>,
1132 768 : mut stats: CompactLevel0Phase1StatsBuilder,
1133 768 : target_file_size: u64,
1134 768 : force_compaction_ignore_threshold: bool,
1135 768 : ctx: &RequestContext,
1136 768 : ) -> Result<CompactLevel0Phase1Result, CompactionError> {
1137 768 : stats.read_lock_held_spawn_blocking_startup_micros =
1138 768 : stats.read_lock_acquisition_micros.till_now(); // set by caller
1139 768 : let layers = guard.layer_map()?;
1140 768 : let level0_deltas = layers.level0_deltas();
1141 768 : stats.level0_deltas_count = Some(level0_deltas.len());
1142 768 :
1143 768 : // Only compact if enough layers have accumulated.
1144 768 : let threshold = self.get_compaction_threshold();
1145 768 : if level0_deltas.is_empty() || level0_deltas.len() < threshold {
1146 716 : if force_compaction_ignore_threshold {
1147 48 : if !level0_deltas.is_empty() {
1148 40 : info!(
1149 0 : level0_deltas = level0_deltas.len(),
1150 0 : threshold, "too few deltas to compact, but forcing compaction"
1151 : );
1152 : } else {
1153 8 : info!(
1154 0 : level0_deltas = level0_deltas.len(),
1155 0 : threshold, "too few deltas to compact, cannot force compaction"
1156 : );
1157 8 : return Ok(CompactLevel0Phase1Result::default());
1158 : }
1159 : } else {
1160 668 : debug!(
1161 0 : level0_deltas = level0_deltas.len(),
1162 0 : threshold, "too few deltas to compact"
1163 : );
1164 668 : return Ok(CompactLevel0Phase1Result::default());
1165 : }
1166 52 : }
1167 :
1168 92 : let mut level0_deltas = level0_deltas
1169 92 : .iter()
1170 804 : .map(|x| guard.get_from_desc(x))
1171 92 : .collect::<Vec<_>>();
1172 92 :
1173 92 : // Gather the files to compact in this iteration.
1174 92 : //
1175 92 : // Start with the oldest Level 0 delta file, and collect any other
1176 92 : // level 0 files that form a contiguous sequence, such that the end
1177 92 : // LSN of previous file matches the start LSN of the next file.
1178 92 : //
1179 92 : // Note that if the files don't form such a sequence, we might
1180 92 : // "compact" just a single file. That's a bit pointless, but it allows
1181 92 : // us to get rid of the level 0 file, and compact the other files on
1182 92 : // the next iteration. This could probably made smarter, but such
1183 92 : // "gaps" in the sequence of level 0 files should only happen in case
1184 92 : // of a crash, partial download from cloud storage, or something like
1185 92 : // that, so it's not a big deal in practice.
1186 1424 : level0_deltas.sort_by_key(|l| l.layer_desc().lsn_range.start);
1187 92 : let mut level0_deltas_iter = level0_deltas.iter();
1188 92 :
1189 92 : let first_level0_delta = level0_deltas_iter.next().unwrap();
1190 92 : let mut prev_lsn_end = first_level0_delta.layer_desc().lsn_range.end;
1191 92 : let mut deltas_to_compact = Vec::with_capacity(level0_deltas.len());
1192 92 :
1193 92 : // Accumulate the size of layers in `deltas_to_compact`
1194 92 : let mut deltas_to_compact_bytes = 0;
1195 92 :
1196 92 : // Under normal circumstances, we will accumulate up to compaction_upper_limit L0s of size
1197 92 : // checkpoint_distance each. To avoid edge cases using extra system resources, bound our
1198 92 : // work in this function to only operate on this much delta data at once.
1199 92 : //
1200 92 : // In general, compaction_threshold should be <= compaction_upper_limit, but in case that
1201 92 : // the constraint is not respected, we use the larger of the two.
1202 92 : let delta_size_limit = std::cmp::max(
1203 92 : self.get_compaction_upper_limit(),
1204 92 : self.get_compaction_threshold(),
1205 92 : ) as u64
1206 92 : * std::cmp::max(self.get_checkpoint_distance(), DEFAULT_CHECKPOINT_DISTANCE);
1207 92 :
1208 92 : let mut fully_compacted = true;
1209 92 :
1210 92 : deltas_to_compact.push(first_level0_delta.download_and_keep_resident().await?);
1211 804 : for l in level0_deltas_iter {
1212 712 : let lsn_range = &l.layer_desc().lsn_range;
1213 712 :
1214 712 : if lsn_range.start != prev_lsn_end {
1215 0 : break;
1216 712 : }
1217 712 : deltas_to_compact.push(l.download_and_keep_resident().await?);
1218 712 : deltas_to_compact_bytes += l.metadata().file_size;
1219 712 : prev_lsn_end = lsn_range.end;
1220 712 :
1221 712 : if deltas_to_compact_bytes >= delta_size_limit {
1222 0 : info!(
1223 0 : l0_deltas_selected = deltas_to_compact.len(),
1224 0 : l0_deltas_total = level0_deltas.len(),
1225 0 : "L0 compaction picker hit max delta layer size limit: {}",
1226 : delta_size_limit
1227 : );
1228 0 : fully_compacted = false;
1229 0 :
1230 0 : // Proceed with compaction, but only a subset of L0s
1231 0 : break;
1232 712 : }
1233 : }
1234 92 : let lsn_range = Range {
1235 92 : start: deltas_to_compact
1236 92 : .first()
1237 92 : .unwrap()
1238 92 : .layer_desc()
1239 92 : .lsn_range
1240 92 : .start,
1241 92 : end: deltas_to_compact.last().unwrap().layer_desc().lsn_range.end,
1242 92 : };
1243 92 :
1244 92 : info!(
1245 0 : "Starting Level0 compaction in LSN range {}-{} for {} layers ({} deltas in total)",
1246 0 : lsn_range.start,
1247 0 : lsn_range.end,
1248 0 : deltas_to_compact.len(),
1249 0 : level0_deltas.len()
1250 : );
1251 :
1252 804 : for l in deltas_to_compact.iter() {
1253 804 : info!("compact includes {l}");
1254 : }
1255 :
1256 : // We don't need the original list of layers anymore. Drop it so that
1257 : // we don't accidentally use it later in the function.
1258 92 : drop(level0_deltas);
1259 92 :
1260 92 : stats.read_lock_held_prerequisites_micros = stats
1261 92 : .read_lock_held_spawn_blocking_startup_micros
1262 92 : .till_now();
1263 :
1264 : // TODO: replace with streaming k-merge
1265 92 : let all_keys = {
1266 92 : let mut all_keys = Vec::new();
1267 804 : for l in deltas_to_compact.iter() {
1268 804 : if self.cancel.is_cancelled() {
1269 0 : return Err(CompactionError::ShuttingDown);
1270 804 : }
1271 804 : let delta = l.get_as_delta(ctx).await.map_err(CompactionError::Other)?;
1272 804 : let keys = delta
1273 804 : .index_entries(ctx)
1274 804 : .await
1275 804 : .map_err(CompactionError::Other)?;
1276 804 : all_keys.extend(keys);
1277 : }
1278 : // The current stdlib sorting implementation is designed in a way where it is
1279 : // particularly fast when the slice is made up of sorted sub-ranges.
1280 8551764 : all_keys.sort_by_key(|DeltaEntry { key, lsn, .. }| (*key, *lsn));
1281 92 : all_keys
1282 92 : };
1283 92 :
1284 92 : stats.read_lock_held_key_sort_micros = stats.read_lock_held_prerequisites_micros.till_now();
1285 :
1286 : // Determine the N largest holes, where N is the number of compacted layers. The resulting vec is sorted by key range start.
1287 : //
1288 : // A hole is a key range for which this compaction doesn't have any WAL records.
1289 : // Our goal in this compaction iteration is to avoid creating L1s that, in terms of their key range,
1290 : // cover the hole, but actually don't contain any WAL records for that key range.
1291 : // The reason is that the mere stack of L1s (`count_deltas`) triggers image layer creation (`create_image_layers`).
1292 : // That image layer creation would be useless for a hole range covered by L1s that don't contain any WAL records.
1293 : //
1294 : // The algorithm chooses holes as follows.
1295 : // - Slide a 2-key window over the keys in key order to get the hole range (= distance between two keys).
1296 : // - Filter: min threshold on range length
1297 : // - Rank: by coverage size (=number of image layers required to reconstruct each key in the range for which we have any data)
1298 : //
1299 : // For more details, intuition, and some ASCII art see https://github.com/neondatabase/neon/pull/3597#discussion_r1112704451
1300 : #[derive(PartialEq, Eq)]
1301 : struct Hole {
1302 : key_range: Range<Key>,
1303 : coverage_size: usize,
1304 : }
1305 92 : let holes: Vec<Hole> = {
1306 : use std::cmp::Ordering;
1307 : impl Ord for Hole {
1308 0 : fn cmp(&self, other: &Self) -> Ordering {
1309 0 : self.coverage_size.cmp(&other.coverage_size).reverse()
1310 0 : }
1311 : }
1312 : impl PartialOrd for Hole {
1313 0 : fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
1314 0 : Some(self.cmp(other))
1315 0 : }
1316 : }
1317 92 : let max_holes = deltas_to_compact.len();
1318 92 : let last_record_lsn = self.get_last_record_lsn();
1319 92 : let min_hole_range = (target_file_size / page_cache::PAGE_SZ as u64) as i128;
1320 92 : let min_hole_coverage_size = 3; // TODO: something more flexible?
1321 92 : // min-heap (reserve space for one more element added before eviction)
1322 92 : let mut heap: BinaryHeap<Hole> = BinaryHeap::with_capacity(max_holes + 1);
1323 92 : let mut prev: Option<Key> = None;
1324 :
1325 4128076 : for &DeltaEntry { key: next_key, .. } in all_keys.iter() {
1326 4128076 : if let Some(prev_key) = prev {
1327 : // Just a first fast filter: do not create hole entries for metadata keys. The last hole in the
1328 : // compaction is the gap between data keys and metadata keys.
1329 4127984 : if next_key.to_i128() - prev_key.to_i128() >= min_hole_range
1330 1093 : && !Key::is_metadata_key(&prev_key)
1331 : {
1332 0 : let key_range = prev_key..next_key;
1333 0 : // Measuring a hole by just subtracting the i128 representations of the key range boundaries
1334 0 : // doesn't make much sense, because the largest holes would correspond to field1/field2 changes.
1335 0 : // But we are mostly interested in eliminating holes which cause generation of excessive image layers.
1336 0 : // That is why it is better to measure the size of a hole as the number of covering image layers.
1337 0 : let coverage_size =
1338 0 : layers.image_coverage(&key_range, last_record_lsn).len();
1339 0 : if coverage_size >= min_hole_coverage_size {
1340 0 : heap.push(Hole {
1341 0 : key_range,
1342 0 : coverage_size,
1343 0 : });
1344 0 : if heap.len() > max_holes {
1345 0 : heap.pop(); // remove smallest hole
1346 0 : }
1347 0 : }
1348 4127984 : }
1349 92 : }
1350 4128076 : prev = Some(next_key.next());
1351 : }
1352 92 : let mut holes = heap.into_vec();
1353 92 : holes.sort_unstable_by_key(|hole| hole.key_range.start);
1354 92 : holes
1355 92 : };
1356 92 : stats.read_lock_held_compute_holes_micros = stats.read_lock_held_key_sort_micros.till_now();
1357 92 : drop_rlock(guard);
1358 92 :
1359 92 : if self.cancel.is_cancelled() {
1360 0 : return Err(CompactionError::ShuttingDown);
1361 92 : }
1362 92 :
1363 92 : stats.read_lock_drop_micros = stats.read_lock_held_compute_holes_micros.till_now();
1364 :
1365 : // This iterator walks through all key-value pairs from all the layers
1366 : // we're compacting, in key, LSN order.
1367 : // If there's both a Value::Image and Value::WalRecord for the same (key,lsn),
1368 : // then the Value::Image is ordered before Value::WalRecord.
1369 92 : let mut all_values_iter = {
1370 92 : let mut deltas = Vec::with_capacity(deltas_to_compact.len());
1371 804 : for l in deltas_to_compact.iter() {
1372 804 : let l = l.get_as_delta(ctx).await.map_err(CompactionError::Other)?;
1373 804 : deltas.push(l);
1374 : }
1375 92 : MergeIterator::create(&deltas, &[], ctx)
1376 92 : };
1377 92 :
1378 92 : // This iterator walks through all keys and is needed to calculate size used by each key
1379 92 : let mut all_keys_iter = all_keys
1380 92 : .iter()
1381 4128076 : .map(|DeltaEntry { key, lsn, size, .. }| (*key, *lsn, *size))
1382 4127984 : .coalesce(|mut prev, cur| {
1383 4127984 : // Coalesce keys that belong to the same key pair.
1384 4127984 : // This ensures that compaction doesn't put them
1385 4127984 : // into different layer files.
1386 4127984 : // Still limit this by the target file size,
1387 4127984 : // so that we keep the size of the files in
1388 4127984 : // check.
1389 4127984 : if prev.0 == cur.0 && prev.2 < target_file_size {
1390 57222 : prev.2 += cur.2;
1391 57222 : Ok(prev)
1392 : } else {
1393 4070762 : Err((prev, cur))
1394 : }
1395 4127984 : });
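// Illustrative example of the coalescing above (sizes are made up): with target_file_size = 100,
// the per-(key, lsn, size) entries
//     (A, 0x10, 40), (A, 0x20, 30), (A, 0x30, 50), (A, 0x40, 10), (B, 0x10, 20)
// coalesce into
//     (A, 0x10, 120), (A, 0x40, 10), (B, 0x10, 20)
// because merging for a key continues while the accumulated size is still below the target
// (so a merged entry may overshoot it by one item) and always stops at a key boundary.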
1396 92 :
1397 92 : // Merge the contents of all the input delta layers into a new set
1398 92 : // of delta layers, based on the current partitioning.
1399 92 : //
1400 92 : // We split the new delta layers on the key dimension. We iterate through the key space, and for each key, check whether adding the next key to the output layer we're building would make the layer too large. If so, flush the current output layer and start a new one.
1401 92 : // It's possible that there is a single key with so many page versions that storing all of them in a single layer file
1402 92 : // would be too large. In that case, we also split on the LSN dimension.
1403 92 : //
1404 92 : // LSN
1405 92 : // ^
1406 92 : // |
1407 92 : // | +-----------+ +--+--+--+--+
1408 92 : // | | | | | | | |
1409 92 : // | +-----------+ | | | | |
1410 92 : // | | | | | | | |
1411 92 : // | +-----------+ ==> | | | | |
1412 92 : // | | | | | | | |
1413 92 : // | +-----------+ | | | | |
1414 92 : // | | | | | | | |
1415 92 : // | +-----------+ +--+--+--+--+
1416 92 : // |
1417 92 : // +--------------> key
1418 92 : //
1419 92 : //
1420 92 : // If one key (X) has a lot of page versions:
1421 92 : //
1422 92 : // LSN
1423 92 : // ^
1424 92 : // | (X)
1425 92 : // | +-----------+ +--+--+--+--+
1426 92 : // | | | | | | | |
1427 92 : // | +-----------+ | | +--+ |
1428 92 : // | | | | | | | |
1429 92 : // | +-----------+ ==> | | | | |
1430 92 : // | | | | | +--+ |
1431 92 : // | +-----------+ | | | | |
1432 92 : // | | | | | | | |
1433 92 : // | +-----------+ +--+--+--+--+
1434 92 : // |
1435 92 : // +--------------> key
1436 92 : // TODO: this actually divides the layers into fixed-size chunks, not
1437 92 : // based on the partitioning.
1438 92 : //
1439 92 : // TODO: we should also opportunistically materialize and
1440 92 : // garbage collect what we can.
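// A rough sketch of the flush decision applied in the loop below (pseudo-Rust; the real code
// also tracks dup_start_lsn/dup_end_lsn to split a single hot key across LSN sub-ranges):
//
//     let must_flush = is_dup_layer                                  // finishing a single-key slice
//         || dup_end_lsn.is_valid()                                  // about to start a single-key slice
//         || written_size + key_values_total_size > target_file_size // size budget exceeded
//         || contains_hole;                                          // never span a detected hole
//     if must_flush {
//         // finish the current DeltaLayerWriter and start a new one at the current key
//     }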
1441 92 : let mut new_layers = Vec::new();
1442 92 : let mut prev_key: Option<Key> = None;
1443 92 : let mut writer: Option<DeltaLayerWriter> = None;
1444 92 : let mut key_values_total_size = 0u64;
1445 92 : let mut dup_start_lsn: Lsn = Lsn::INVALID; // start LSN of layer containing values of the single key
1446 92 : let mut dup_end_lsn: Lsn = Lsn::INVALID; // end LSN of layer containing values of the single key
1447 92 : let mut next_hole = 0; // index of next hole in holes vector
1448 92 :
1449 92 : let mut keys = 0;
1450 :
1451 4128168 : while let Some((key, lsn, value)) = all_values_iter
1452 4128168 : .next()
1453 4128168 : .await
1454 4128168 : .map_err(CompactionError::Other)?
1455 : {
1456 4128076 : keys += 1;
1457 4128076 :
1458 4128076 : if keys % 32_768 == 0 && self.cancel.is_cancelled() {
1458 : // Avoid hitting the cancellation token on every key. In benches, we end up
1459 : // shuffling on the order of a million keys per layer; this means we'll check
1460 : // the token a few dozen times per layer.
1462 0 : return Err(CompactionError::ShuttingDown);
1463 4128076 : }
1464 4128076 :
1465 4128076 : let same_key = prev_key == Some(key);
1466 4128076 : // We need to check key boundaries once we reach the next key, or the end of a layer containing the same key
1467 4128076 : if !same_key || lsn == dup_end_lsn {
1468 4070854 : let mut next_key_size = 0u64;
1469 4070854 : let is_dup_layer = dup_end_lsn.is_valid();
1470 4070854 : dup_start_lsn = Lsn::INVALID;
1471 4070854 : if !same_key {
1472 4070854 : dup_end_lsn = Lsn::INVALID;
1473 4070854 : }
1474 : // Determine the size occupied by this key. We stop at the next key, or when the size becomes larger than target_file_size
1475 4070854 : for (next_key, next_lsn, next_size) in all_keys_iter.by_ref() {
1476 4070854 : next_key_size = next_size;
1477 4070854 : if key != next_key {
1478 4070762 : if dup_end_lsn.is_valid() {
1479 0 : // We are writing a segment with duplicates:
1480 0 : // place all remaining values of this key in a separate segment
1481 0 : dup_start_lsn = dup_end_lsn; // the new segment starts where the old one stops
1482 0 : dup_end_lsn = lsn_range.end; // there are no more values of this key till end of LSN range
1483 4070762 : }
1484 4070762 : break;
1485 92 : }
1486 92 : key_values_total_size += next_size;
1487 92 : // Check if it is time to split the segment: that is, if the total size of the key's values is larger than the target file size.
1488 92 : // We need to avoid generating empty segments if next_size > target_file_size.
1489 92 : if key_values_total_size > target_file_size && lsn != next_lsn {
1490 : // Split the key between multiple layers: such a layer can contain only a single key
1491 0 : dup_start_lsn = if dup_end_lsn.is_valid() {
1492 0 : dup_end_lsn // new segment with duplicates starts where old one stops
1493 : } else {
1494 0 : lsn // start with the first LSN for this key
1495 : };
1496 0 : dup_end_lsn = next_lsn; // upper LSN boundary is exclusive
1497 0 : break;
1498 92 : }
1499 : }
1500 : // Handle the case where the loop reaches the last key: in this case dup_end_lsn is valid but dup_start_lsn is not set.
1501 4070854 : if dup_end_lsn.is_valid() && !dup_start_lsn.is_valid() {
1502 0 : dup_start_lsn = dup_end_lsn;
1503 0 : dup_end_lsn = lsn_range.end;
1504 4070854 : }
1505 4070854 : if writer.is_some() {
1506 4070762 : let written_size = writer.as_mut().unwrap().size();
1507 4070762 : let contains_hole =
1508 4070762 : next_hole < holes.len() && key >= holes[next_hole].key_range.end;
1509 : // check if the key causes the layer to overflow or to span a hole...
1510 4070762 : if is_dup_layer
1511 4070762 : || dup_end_lsn.is_valid()
1512 4070762 : || written_size + key_values_total_size > target_file_size
1513 4070202 : || contains_hole
1514 : {
1515 : // ... if so, flush previous layer and prepare to write new one
1516 560 : let (desc, path) = writer
1517 560 : .take()
1518 560 : .unwrap()
1519 560 : .finish(prev_key.unwrap().next(), ctx)
1520 560 : .await
1521 560 : .map_err(CompactionError::Other)?;
1522 560 : let new_delta = Layer::finish_creating(self.conf, self, desc, &path)
1523 560 : .map_err(CompactionError::Other)?;
1524 :
1525 560 : new_layers.push(new_delta);
1526 560 : writer = None;
1527 560 :
1528 560 : if contains_hole {
1529 0 : // skip hole
1530 0 : next_hole += 1;
1531 560 : }
1532 4070202 : }
1533 92 : }
1534 : // Remember size of key value because at next iteration we will access next item
1535 4070854 : key_values_total_size = next_key_size;
1536 57222 : }
1537 4128076 : fail_point!("delta-layer-writer-fail-before-finish", |_| {
1538 0 : Err(CompactionError::Other(anyhow::anyhow!(
1539 0 : "failpoint delta-layer-writer-fail-before-finish"
1540 0 : )))
1541 4128076 : });
1542 :
1543 4128076 : if !self.shard_identity.is_key_disposable(&key) {
1544 4128076 : if writer.is_none() {
1545 652 : if self.cancel.is_cancelled() {
1546 : // to be somewhat responsive to cancellation, check for each new layer
1547 0 : return Err(CompactionError::ShuttingDown);
1548 652 : }
1549 : // Create the writer if it's not initialized yet
1550 652 : writer = Some(
1551 : DeltaLayerWriter::new(
1552 652 : self.conf,
1553 652 : self.timeline_id,
1554 652 : self.tenant_shard_id,
1555 652 : key,
1556 652 : if dup_end_lsn.is_valid() {
1557 : // this is a layer containing slice of values of the same key
1558 0 : debug!("Create new dup layer {}..{}", dup_start_lsn, dup_end_lsn);
1559 0 : dup_start_lsn..dup_end_lsn
1560 : } else {
1561 652 : debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end);
1562 652 : lsn_range.clone()
1563 : },
1564 652 : ctx,
1565 652 : )
1566 652 : .await
1567 652 : .map_err(CompactionError::Other)?,
1568 : );
1569 :
1570 652 : keys = 0;
1571 4127424 : }
1572 :
1573 4128076 : writer
1574 4128076 : .as_mut()
1575 4128076 : .unwrap()
1576 4128076 : .put_value(key, lsn, value, ctx)
1577 4128076 : .await
1578 4128076 : .map_err(CompactionError::Other)?;
1579 : } else {
1580 0 : let owner = self.shard_identity.get_shard_number(&key);
1581 0 :
1582 0 : // This happens after a shard split, when we're compacting an L0 created by our parent shard
1583 0 : debug!("dropping key {key} during compaction (it belongs on shard {owner})");
1584 : }
1585 :
1586 4128076 : if !new_layers.is_empty() {
1587 39572 : fail_point!("after-timeline-compacted-first-L1");
1588 4088504 : }
1589 :
1590 4128076 : prev_key = Some(key);
1591 : }
1592 92 : if let Some(writer) = writer {
1593 92 : let (desc, path) = writer
1594 92 : .finish(prev_key.unwrap().next(), ctx)
1595 92 : .await
1596 92 : .map_err(CompactionError::Other)?;
1597 92 : let new_delta = Layer::finish_creating(self.conf, self, desc, &path)
1598 92 : .map_err(CompactionError::Other)?;
1599 92 : new_layers.push(new_delta);
1600 0 : }
1601 :
1602 : // Sync layers
1603 92 : if !new_layers.is_empty() {
1604 : // Print a warning if the created layer is larger than double the target size
1605 : // Add two pages for potential overhead. This should in theory be already
1606 : // accounted for in the target calculation, but for very small targets,
1607 : // we still might easily hit the limit otherwise.
1608 92 : let warn_limit = target_file_size * 2 + page_cache::PAGE_SZ as u64 * 2;
1609 652 : for layer in new_layers.iter() {
1610 652 : if layer.layer_desc().file_size > warn_limit {
1611 0 : warn!(
1612 : %layer,
1613 0 : "created delta file of size {} larger than double of target of {target_file_size}", layer.layer_desc().file_size
1614 : );
1615 652 : }
1616 : }
1617 :
1618 : // The writer.finish() above already did the fsync of the inodes.
1619 : // We just need to fsync the directory in which these inodes are linked,
1620 : // which we know to be the timeline directory.
1621 : //
1622 : // We use fatal_err() below because, after writer.finish() returns with success,
1623 : // the in-memory state of the filesystem already has the layer file in its final place,
1624 : // and subsequent pageserver code could think it's durable while it really isn't.
1625 92 : let timeline_dir = VirtualFile::open(
1626 92 : &self
1627 92 : .conf
1628 92 : .timeline_path(&self.tenant_shard_id, &self.timeline_id),
1629 92 : ctx,
1630 92 : )
1631 92 : .await
1632 92 : .fatal_err("VirtualFile::open for timeline dir fsync");
1633 92 : timeline_dir
1634 92 : .sync_all()
1635 92 : .await
1636 92 : .fatal_err("VirtualFile::sync_all timeline dir");
1637 0 : }
1638 :
1639 92 : stats.write_layer_files_micros = stats.read_lock_drop_micros.till_now();
1640 92 : stats.new_deltas_count = Some(new_layers.len());
1641 652 : stats.new_deltas_size = Some(new_layers.iter().map(|l| l.layer_desc().file_size).sum());
1642 92 :
1643 92 : match TryInto::<CompactLevel0Phase1Stats>::try_into(stats)
1644 92 : .and_then(|stats| serde_json::to_string(&stats).context("serde_json::to_string"))
1645 : {
1646 92 : Ok(stats_json) => {
1647 92 : info!(
1648 0 : stats_json = stats_json.as_str(),
1649 0 : "compact_level0_phase1 stats available"
1650 : )
1651 : }
1652 0 : Err(e) => {
1653 0 : warn!("compact_level0_phase1 stats failed to serialize: {:#}", e);
1654 : }
1655 : }
1656 :
1657 : // Without this, rustc complains about deltas_to_compact still
1658 : // being borrowed when we `.into_iter()` below.
1659 92 : drop(all_values_iter);
1660 92 :
1661 92 : Ok(CompactLevel0Phase1Result {
1662 92 : new_layers,
1663 92 : deltas_to_compact: deltas_to_compact
1664 92 : .into_iter()
1665 804 : .map(|x| x.drop_eviction_guard())
1666 92 : .collect::<Vec<_>>(),
1667 92 : outcome: if fully_compacted {
1668 92 : CompactionOutcome::Done
1669 : } else {
1670 0 : CompactionOutcome::Pending
1671 : },
1672 : })
1673 768 : }
1674 : }
1675 :
1676 : #[derive(Default)]
1677 : struct CompactLevel0Phase1Result {
1678 : new_layers: Vec<ResidentLayer>,
1679 : deltas_to_compact: Vec<Layer>,
1680 : // Whether we have included all L0 layers, or selected only part of them due to the
1681 : // L0 compaction size limit.
1682 : outcome: CompactionOutcome,
1683 : }
1684 :
1685 : #[derive(Default)]
1686 : struct CompactLevel0Phase1StatsBuilder {
1687 : version: Option<u64>,
1688 : tenant_id: Option<TenantShardId>,
1689 : timeline_id: Option<TimelineId>,
1690 : read_lock_acquisition_micros: DurationRecorder,
1691 : read_lock_held_spawn_blocking_startup_micros: DurationRecorder,
1692 : read_lock_held_key_sort_micros: DurationRecorder,
1693 : read_lock_held_prerequisites_micros: DurationRecorder,
1694 : read_lock_held_compute_holes_micros: DurationRecorder,
1695 : read_lock_drop_micros: DurationRecorder,
1696 : write_layer_files_micros: DurationRecorder,
1697 : level0_deltas_count: Option<usize>,
1698 : new_deltas_count: Option<usize>,
1699 : new_deltas_size: Option<u64>,
1700 : }
1701 :
1702 : #[derive(serde::Serialize)]
1703 : struct CompactLevel0Phase1Stats {
1704 : version: u64,
1705 : tenant_id: TenantShardId,
1706 : timeline_id: TimelineId,
1707 : read_lock_acquisition_micros: RecordedDuration,
1708 : read_lock_held_spawn_blocking_startup_micros: RecordedDuration,
1709 : read_lock_held_key_sort_micros: RecordedDuration,
1710 : read_lock_held_prerequisites_micros: RecordedDuration,
1711 : read_lock_held_compute_holes_micros: RecordedDuration,
1712 : read_lock_drop_micros: RecordedDuration,
1713 : write_layer_files_micros: RecordedDuration,
1714 : level0_deltas_count: usize,
1715 : new_deltas_count: usize,
1716 : new_deltas_size: u64,
1717 : }
1718 :
1719 : impl TryFrom<CompactLevel0Phase1StatsBuilder> for CompactLevel0Phase1Stats {
1720 : type Error = anyhow::Error;
1721 :
1722 92 : fn try_from(value: CompactLevel0Phase1StatsBuilder) -> Result<Self, Self::Error> {
1723 92 : Ok(Self {
1724 92 : version: value.version.ok_or_else(|| anyhow!("version not set"))?,
1725 92 : tenant_id: value
1726 92 : .tenant_id
1727 92 : .ok_or_else(|| anyhow!("tenant_id not set"))?,
1728 92 : timeline_id: value
1729 92 : .timeline_id
1730 92 : .ok_or_else(|| anyhow!("timeline_id not set"))?,
1731 92 : read_lock_acquisition_micros: value
1732 92 : .read_lock_acquisition_micros
1733 92 : .into_recorded()
1734 92 : .ok_or_else(|| anyhow!("read_lock_acquisition_micros not set"))?,
1735 92 : read_lock_held_spawn_blocking_startup_micros: value
1736 92 : .read_lock_held_spawn_blocking_startup_micros
1737 92 : .into_recorded()
1738 92 : .ok_or_else(|| anyhow!("read_lock_held_spawn_blocking_startup_micros not set"))?,
1739 92 : read_lock_held_key_sort_micros: value
1740 92 : .read_lock_held_key_sort_micros
1741 92 : .into_recorded()
1742 92 : .ok_or_else(|| anyhow!("read_lock_held_key_sort_micros not set"))?,
1743 92 : read_lock_held_prerequisites_micros: value
1744 92 : .read_lock_held_prerequisites_micros
1745 92 : .into_recorded()
1746 92 : .ok_or_else(|| anyhow!("read_lock_held_prerequisites_micros not set"))?,
1747 92 : read_lock_held_compute_holes_micros: value
1748 92 : .read_lock_held_compute_holes_micros
1749 92 : .into_recorded()
1750 92 : .ok_or_else(|| anyhow!("read_lock_held_compute_holes_micros not set"))?,
1751 92 : read_lock_drop_micros: value
1752 92 : .read_lock_drop_micros
1753 92 : .into_recorded()
1754 92 : .ok_or_else(|| anyhow!("read_lock_drop_micros not set"))?,
1755 92 : write_layer_files_micros: value
1756 92 : .write_layer_files_micros
1757 92 : .into_recorded()
1758 92 : .ok_or_else(|| anyhow!("write_layer_files_micros not set"))?,
1759 92 : level0_deltas_count: value
1760 92 : .level0_deltas_count
1761 92 : .ok_or_else(|| anyhow!("level0_deltas_count not set"))?,
1762 92 : new_deltas_count: value
1763 92 : .new_deltas_count
1764 92 : .ok_or_else(|| anyhow!("new_deltas_count not set"))?,
1765 92 : new_deltas_size: value
1766 92 : .new_deltas_size
1767 92 : .ok_or_else(|| anyhow!("new_deltas_size not set"))?,
1768 : })
1769 92 : }
1770 : }
1771 :
1772 : impl Timeline {
1773 : /// Entry point for new tiered compaction algorithm.
1774 : ///
1775 : /// All the real work is in the implementation in the pageserver_compaction
1776 : /// crate. The code here would apply to any algorithm implemented by the
1777 : /// same interface, but tiered is the only one at the moment.
1778 : ///
1779 : /// TODO: cancellation
1780 0 : pub(crate) async fn compact_tiered(
1781 0 : self: &Arc<Self>,
1782 0 : _cancel: &CancellationToken,
1783 0 : ctx: &RequestContext,
1784 0 : ) -> Result<(), CompactionError> {
1785 0 : let fanout = self.get_compaction_threshold() as u64;
1786 0 : let target_file_size = self.get_checkpoint_distance();
1787 :
1788 : // Find the top of the historical layers
1789 0 : let end_lsn = {
1790 0 : let guard = self.layers.read().await;
1791 0 : let layers = guard.layer_map()?;
1792 :
1793 0 : let l0_deltas = layers.level0_deltas();
1794 0 :
1795 0 : // As an optimization, if we find that there are too few L0 layers,
1796 0 : // bail out early. We know that the compaction algorithm would do
1797 0 : // nothing in that case.
1798 0 : if l0_deltas.len() < fanout as usize {
1799 : // doesn't need compacting
1800 0 : return Ok(());
1801 0 : }
1802 0 : l0_deltas.iter().map(|l| l.lsn_range.end).max().unwrap()
1803 0 : };
1804 0 :
1805 0 : // Is the timeline being deleted?
1806 0 : if self.is_stopping() {
1807 0 : trace!("Dropping out of compaction on timeline shutdown");
1808 0 : return Err(CompactionError::ShuttingDown);
1809 0 : }
1810 :
1811 0 : let (dense_ks, _sparse_ks) = self.collect_keyspace(end_lsn, ctx).await?;
1812 : // TODO(chi): ignore sparse_keyspace for now, compact it in the future.
1813 0 : let mut adaptor = TimelineAdaptor::new(self, (end_lsn, dense_ks));
1814 0 :
1815 0 : pageserver_compaction::compact_tiered::compact_tiered(
1816 0 : &mut adaptor,
1817 0 : end_lsn,
1818 0 : target_file_size,
1819 0 : fanout,
1820 0 : ctx,
1821 0 : )
1822 0 : .await
1823 : // TODO: compact_tiered needs to return CompactionError
1824 0 : .map_err(CompactionError::Other)?;
1825 :
1826 0 : adaptor.flush_updates().await?;
1827 0 : Ok(())
1828 0 : }
1829 :
1830 : /// Take a list of images and deltas, produce images and deltas according to GC horizon and retain_lsns.
1831 : ///
1832 : /// It takes a key, the values of the key within the compaction process, a GC horizon, and all retain_lsns below the horizon.
1833 : /// For now, it requires that `accumulated_values` contain the full history of the key (i.e., the record with the lowest LSN is
1834 : /// an image or a WAL record not requiring a base image). This restriction will be removed once we implement gc-compaction on branches.
1835 : ///
1836 : /// The function returns the deltas and the base image that need to be placed at each of the retain LSNs. For example, given:
1837 : ///
1838 : /// A@0x10, +B@0x20, +C@0x30, +D@0x40, +E@0x50, +F@0x60
1839 : /// horizon = 0x50, retain_lsn = 0x20, 0x40, delta_threshold=3
1840 : ///
1841 : /// The function will produce:
1842 : ///
1843 : /// ```plain
1844 : /// 0x20(retain_lsn) -> img=AB@0x20 always produce a single image below the lowest retain LSN
1845 : /// 0x40(retain_lsn) -> deltas=[+C@0x30, +D@0x40] two deltas since the last base image, keeping the deltas
1846 : /// 0x50(horizon) -> deltas=[ABCDE@0x50] three deltas since the last base image, generate an image but put it in the delta
1847 : /// above_horizon -> deltas=[+F@0x60] full history above the horizon
1848 : /// ```
1849 : ///
1850 : /// Note that `accumulated_values` must be sorted by LSN and should belong to a single key.
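/// A minimal, illustrative invocation for the example above (not a doctest; value construction elided):
///
/// ```ignore
/// let retention = timeline
///     .generate_key_retention(
///         key,
///         &full_history,           // [(key, 0x10, A), (key, 0x20, +B), ..., (key, 0x60, +F)]
///         Lsn(0x50),               // horizon
///         &[Lsn(0x20), Lsn(0x40)], // retain LSNs below the horizon, sorted
///         3,                       // delta_threshold_cnt
///         None,                    // no base image from an ancestor branch
///     )
///     .await?;
/// ```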
1851 1260 : pub(crate) async fn generate_key_retention(
1852 1260 : self: &Arc<Timeline>,
1853 1260 : key: Key,
1854 1260 : full_history: &[(Key, Lsn, Value)],
1855 1260 : horizon: Lsn,
1856 1260 : retain_lsn_below_horizon: &[Lsn],
1857 1260 : delta_threshold_cnt: usize,
1858 1260 : base_img_from_ancestor: Option<(Key, Lsn, Bytes)>,
1859 1260 : ) -> anyhow::Result<KeyHistoryRetention> {
1860 : // Pre-checks for the invariants
1861 :
1862 1260 : let debug_mode = cfg!(debug_assertions) || cfg!(feature = "testing");
1863 :
1864 1260 : if debug_mode {
1865 3060 : for (log_key, _, _) in full_history {
1866 1800 : assert_eq!(log_key, &key, "mismatched key");
1867 : }
1868 1260 : for i in 1..full_history.len() {
1869 540 : assert!(full_history[i - 1].1 <= full_history[i].1, "unordered LSN");
1870 540 : if full_history[i - 1].1 == full_history[i].1 {
1871 0 : assert!(
1872 0 : matches!(full_history[i - 1].2, Value::Image(_)),
1873 0 : "unordered delta/image, or duplicated delta"
1874 : );
1875 540 : }
1876 : }
1877 : // There used to be an assertion for the no-base-image case that checked whether the first
1878 : // record in the history is `will_init`, but it was removed.
1879 : // This is explained in the test cases for generate_key_retention.
1880 : // Search "incomplete history" for more information.
1881 2820 : for lsn in retain_lsn_below_horizon {
1882 1560 : assert!(lsn < &horizon, "retain lsn must be below horizon")
1883 : }
1884 1260 : for i in 1..retain_lsn_below_horizon.len() {
1885 712 : assert!(
1886 712 : retain_lsn_below_horizon[i - 1] <= retain_lsn_below_horizon[i],
1887 0 : "unordered LSN"
1888 : );
1889 : }
1890 0 : }
1891 1260 : let has_ancestor = base_img_from_ancestor.is_some();
1892 : // Step 1: split history into len(retain_lsn_below_horizon) + 2 buckets, where the last bucket is for all deltas above the horizon,
1893 : // and the second-to-last bucket is for the horizon. Each bucket contains lsn_last_bucket < deltas <= lsn_this_bucket.
1894 1260 : let (mut split_history, lsn_split_points) = {
1895 1260 : let mut split_history = Vec::new();
1896 1260 : split_history.resize_with(retain_lsn_below_horizon.len() + 2, Vec::new);
1897 1260 : let mut lsn_split_points = Vec::with_capacity(retain_lsn_below_horizon.len() + 1);
1898 2820 : for lsn in retain_lsn_below_horizon {
1899 1560 : lsn_split_points.push(*lsn);
1900 1560 : }
1901 1260 : lsn_split_points.push(horizon);
1902 1260 : let mut current_idx = 0;
1903 3060 : for item @ (_, lsn, _) in full_history {
1904 2288 : while current_idx < lsn_split_points.len() && *lsn > lsn_split_points[current_idx] {
1905 488 : current_idx += 1;
1906 488 : }
1907 1800 : split_history[current_idx].push(item);
1908 : }
1909 1260 : (split_history, lsn_split_points)
1910 : };
1911 : // Step 2: filter out duplicated records due to the k-merge of image/delta layers
1912 5340 : for split_for_lsn in &mut split_history {
1913 4080 : let mut prev_lsn = None;
1914 4080 : let mut new_split_for_lsn = Vec::with_capacity(split_for_lsn.len());
1915 4080 : for record @ (_, lsn, _) in std::mem::take(split_for_lsn) {
1916 1800 : if let Some(prev_lsn) = &prev_lsn {
1917 236 : if *prev_lsn == lsn {
1918 : // The case that we have an LSN with both data from the delta layer and the image layer. As
1919 : // `ValueWrapper` ensures that an image is ordered before a delta at the same LSN, we simply
1920 : // drop this delta and keep the image.
1921 : //
1922 : // For example, we have delta layer key1@0x10, key1@0x20, and image layer key1@0x10, we will
1923 : // keep the image for key1@0x10 and the delta for key1@0x20. key1@0x10 delta will be simply
1924 : // dropped.
1925 : //
1926 : // TODO: in case we have both delta + images for a given LSN and it does not exceed the delta
1927 : // threshold, we could have kept delta instead to save space. This is an optimization for the future.
1928 0 : continue;
1929 236 : }
1930 1564 : }
1931 1800 : prev_lsn = Some(lsn);
1932 1800 : new_split_for_lsn.push(record);
1933 : }
1934 4080 : *split_for_lsn = new_split_for_lsn;
1935 : }
1936 : // Step 3: generate images when necessary
1937 1260 : let mut retention = Vec::with_capacity(split_history.len());
1938 1260 : let mut records_since_last_image = 0;
1939 1260 : let batch_cnt = split_history.len();
1940 1260 : assert!(
1941 1260 : batch_cnt >= 2,
1942 0 : "should have at least below + above horizon batches"
1943 : );
1944 1260 : let mut replay_history: Vec<(Key, Lsn, Value)> = Vec::new();
1945 1260 : if let Some((key, lsn, img)) = base_img_from_ancestor {
1946 84 : replay_history.push((key, lsn, Value::Image(img)));
1947 1176 : }
1948 :
1949 : /// Generate debug information for the replay history
1950 0 : fn generate_history_trace(replay_history: &[(Key, Lsn, Value)]) -> String {
1951 : use std::fmt::Write;
1952 0 : let mut output = String::new();
1953 0 : if let Some((key, _, _)) = replay_history.first() {
1954 0 : write!(output, "key={} ", key).unwrap();
1955 0 : let mut cnt = 0;
1956 0 : for (_, lsn, val) in replay_history {
1957 0 : if val.is_image() {
1958 0 : write!(output, "i@{} ", lsn).unwrap();
1959 0 : } else if val.will_init() {
1960 0 : write!(output, "di@{} ", lsn).unwrap();
1961 0 : } else {
1962 0 : write!(output, "d@{} ", lsn).unwrap();
1963 0 : }
1964 0 : cnt += 1;
1965 0 : if cnt >= 128 {
1966 0 : write!(output, "... and more").unwrap();
1967 0 : break;
1968 0 : }
1969 : }
1970 0 : } else {
1971 0 : write!(output, "<no history>").unwrap();
1972 0 : }
1973 0 : output
1974 0 : }
1975 :
1976 0 : fn generate_debug_trace(
1977 0 : replay_history: Option<&[(Key, Lsn, Value)]>,
1978 0 : full_history: &[(Key, Lsn, Value)],
1979 0 : lsns: &[Lsn],
1980 0 : horizon: Lsn,
1981 0 : ) -> String {
1982 : use std::fmt::Write;
1983 0 : let mut output = String::new();
1984 0 : if let Some(replay_history) = replay_history {
1985 0 : writeln!(
1986 0 : output,
1987 0 : "replay_history: {}",
1988 0 : generate_history_trace(replay_history)
1989 0 : )
1990 0 : .unwrap();
1991 0 : } else {
1992 0 : writeln!(output, "replay_history: <disabled>",).unwrap();
1993 0 : }
1994 0 : writeln!(
1995 0 : output,
1996 0 : "full_history: {}",
1997 0 : generate_history_trace(full_history)
1998 0 : )
1999 0 : .unwrap();
2000 0 : writeln!(
2001 0 : output,
2002 0 : "when processing: [{}] horizon={}",
2003 0 : lsns.iter().map(|l| format!("{l}")).join(","),
2004 0 : horizon
2005 0 : )
2006 0 : .unwrap();
2007 0 : output
2008 0 : }
2009 :
2010 1260 : let mut key_exists = false;
2011 4080 : for (i, split_for_lsn) in split_history.into_iter().enumerate() {
2012 : // TODO: there could be image keys inside the splits, and we can compute records_since_last_image accordingly.
2013 4080 : records_since_last_image += split_for_lsn.len();
2014 : // Whether to produce an image into the final layer files
2015 4080 : let produce_image = if i == 0 && !has_ancestor {
2016 : // We always generate images for the first batch (below horizon / lowest retain_lsn)
2017 1176 : true
2018 2904 : } else if i == batch_cnt - 1 {
2019 : // Do not generate images for the last batch (above horizon)
2020 1260 : false
2021 1644 : } else if records_since_last_image == 0 {
2022 1288 : false
2023 356 : } else if records_since_last_image >= delta_threshold_cnt {
2024 : // Generate images when there are too many records
2025 12 : true
2026 : } else {
2027 344 : false
2028 : };
2029 4080 : replay_history.extend(split_for_lsn.iter().map(|x| (*x).clone()));
2030 : // Only retain the items after the last image record
2031 5028 : for idx in (0..replay_history.len()).rev() {
2032 5028 : if replay_history[idx].2.will_init() {
2033 4080 : replay_history = replay_history[idx..].to_vec();
2034 4080 : break;
2035 948 : }
2036 : }
2037 4080 : if replay_history.is_empty() && !key_exists {
2038 : // The key does not exist at earlier LSN, we can skip this iteration.
2039 0 : retention.push(Vec::new());
2040 0 : continue;
2041 4080 : } else {
2042 4080 : key_exists = true;
2043 4080 : }
2044 4080 : let Some((_, _, val)) = replay_history.first() else {
2045 0 : unreachable!("replay history should not be empty once it exists")
2046 : };
2047 4080 : if !val.will_init() {
2048 0 : return Err(anyhow::anyhow!("invalid history, no base image")).with_context(|| {
2049 0 : generate_debug_trace(
2050 0 : Some(&replay_history),
2051 0 : full_history,
2052 0 : retain_lsn_below_horizon,
2053 0 : horizon,
2054 0 : )
2055 0 : });
2056 4080 : }
2057 : // Whether to reconstruct the image. In debug mode, we will generate an image
2058 : // at every retain_lsn to ensure data is not corrupted, but we won't put the
2059 : // image into the final layer.
2060 4080 : let generate_image = produce_image || debug_mode;
2061 4080 : if produce_image {
2062 1188 : records_since_last_image = 0;
2063 2892 : }
2064 4080 : let img_and_lsn = if generate_image {
2065 4080 : let replay_history_for_debug = if debug_mode {
2066 4080 : Some(replay_history.clone())
2067 : } else {
2068 0 : None
2069 : };
2070 4080 : let replay_history_for_debug_ref = replay_history_for_debug.as_deref();
2071 4080 : let history = if produce_image {
2072 1188 : std::mem::take(&mut replay_history)
2073 : } else {
2074 2892 : replay_history.clone()
2075 : };
2076 4080 : let mut img = None;
2077 4080 : let mut records = Vec::with_capacity(history.len());
2078 4080 : if let (_, lsn, Value::Image(val)) = history.first().as_ref().unwrap() {
2079 4036 : img = Some((*lsn, val.clone()));
2080 4036 : for (_, lsn, val) in history.into_iter().skip(1) {
2081 920 : let Value::WalRecord(rec) = val else {
2082 0 : return Err(anyhow::anyhow!(
2083 0 : "invalid record, first record is image, expect walrecords"
2084 0 : ))
2085 0 : .with_context(|| {
2086 0 : generate_debug_trace(
2087 0 : replay_history_for_debug_ref,
2088 0 : full_history,
2089 0 : retain_lsn_below_horizon,
2090 0 : horizon,
2091 0 : )
2092 0 : });
2093 : };
2094 920 : records.push((lsn, rec));
2095 : }
2096 : } else {
2097 72 : for (_, lsn, val) in history.into_iter() {
2098 72 : let Value::WalRecord(rec) = val else {
2099 0 : return Err(anyhow::anyhow!("invalid record, first record is walrecord, expect rest are walrecord"))
2100 0 : .with_context(|| generate_debug_trace(
2101 0 : replay_history_for_debug_ref,
2102 0 : full_history,
2103 0 : retain_lsn_below_horizon,
2104 0 : horizon,
2105 0 : ));
2106 : };
2107 72 : records.push((lsn, rec));
2108 : }
2109 : }
2110 4080 : records.reverse();
2111 4080 : let state = ValueReconstructState { img, records };
2112 : // The last batch does not generate an image, so i is always in range, unless we force-generate
2113 : // an image during testing
2114 4080 : let request_lsn = if i >= lsn_split_points.len() {
2115 1260 : Lsn::MAX
2116 : } else {
2117 2820 : lsn_split_points[i]
2118 : };
2119 4080 : let img = self.reconstruct_value(key, request_lsn, state).await?;
2120 4080 : Some((request_lsn, img))
2121 : } else {
2122 0 : None
2123 : };
2124 4080 : if produce_image {
2125 1188 : let (request_lsn, img) = img_and_lsn.unwrap();
2126 1188 : replay_history.push((key, request_lsn, Value::Image(img.clone())));
2127 1188 : retention.push(vec![(request_lsn, Value::Image(img))]);
2128 2892 : } else {
2129 2892 : let deltas = split_for_lsn
2130 2892 : .iter()
2131 2892 : .map(|(_, lsn, value)| (*lsn, value.clone()))
2132 2892 : .collect_vec();
2133 2892 : retention.push(deltas);
2134 2892 : }
2135 : }
2136 1260 : let mut result = Vec::with_capacity(retention.len());
2137 1260 : assert_eq!(retention.len(), lsn_split_points.len() + 1);
2138 4080 : for (idx, logs) in retention.into_iter().enumerate() {
2139 4080 : if idx == lsn_split_points.len() {
2140 1260 : return Ok(KeyHistoryRetention {
2141 1260 : below_horizon: result,
2142 1260 : above_horizon: KeyLogAtLsn(logs),
2143 1260 : });
2144 2820 : } else {
2145 2820 : result.push((lsn_split_points[idx], KeyLogAtLsn(logs)));
2146 2820 : }
2147 : }
2148 0 : unreachable!("key retention is empty")
2149 1260 : }
2150 :
2151 : /// Check how much space is left on the disk
2152 104 : async fn check_available_space(self: &Arc<Self>) -> anyhow::Result<u64> {
2153 104 : let tenants_dir = self.conf.tenants_path();
2154 :
2155 104 : let stat = Statvfs::get(&tenants_dir, None)
2156 104 : .context("statvfs failed, presumably directory got unlinked")?;
2157 :
2158 104 : let (avail_bytes, _) = stat.get_avail_total_bytes();
2159 104 :
2160 104 : Ok(avail_bytes)
2161 104 : }
2162 :
2163 : /// Check if the compaction can proceed safely without running out of space. We assume that the
2164 : /// upper bound on the size of the files produced by a compaction job equals the total size of all
2165 : /// layers involved in the compaction. Therefore, we need at least `2 * layers_to_be_compacted_size` to do a
2166 : /// compaction.
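/// Illustrative arithmetic (numbers are made up): with 100 GiB available, the budget is
/// `0.8 * 100 GiB = 80 GiB`. If the selected layers total 30 GiB and 5 GiB of them still
/// need downloading, the check passes because `30 GiB + 5 GiB <= 80 GiB`.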
2167 104 : async fn check_compaction_space(
2168 104 : self: &Arc<Self>,
2169 104 : layer_selection: &[Layer],
2170 104 : ) -> anyhow::Result<()> {
2171 104 : let available_space = self.check_available_space().await?;
2172 104 : let mut remote_layer_size = 0;
2173 104 : let mut all_layer_size = 0;
2174 408 : for layer in layer_selection {
2175 304 : let needs_download = layer.needs_download().await?;
2176 304 : if needs_download.is_some() {
2177 0 : remote_layer_size += layer.layer_desc().file_size;
2178 304 : }
2179 304 : all_layer_size += layer.layer_desc().file_size;
2180 : }
2181 104 : let allocated_space = (available_space as f64 * 0.8) as u64; /* reserve 20% space for other tasks */
2182 104 : if all_layer_size /* space needed for newly-generated file */ + remote_layer_size /* space for downloading layers */ > allocated_space
2183 : {
2184 0 : return Err(anyhow!("not enough space for compaction: available_space={}, allocated_space={}, all_layer_size={}, remote_layer_size={}, required_space={}",
2185 0 : available_space, allocated_space, all_layer_size, remote_layer_size, all_layer_size + remote_layer_size));
2186 104 : }
2187 104 : Ok(())
2188 104 : }
2189 :
2190 : /// Get a watermark for gc-compaction, that is the lowest LSN that we can use as the `gc_horizon` for
2191 : /// the compaction algorithm. It is min(space_cutoff, time_cutoff, latest_gc_cutoff, standby_horizon).
2192 : /// Leases and retain_lsns are considered in the gc-compaction job itself so we don't need to account for them
2193 : /// here.
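/// Illustrative example (made-up LSNs): if `gc_info.min_cutoff()` is `0/50` and the applied
/// gc cutoff is `0/40`, the returned watermark is `min(0/50, 0/40) = 0/40`.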
2194 108 : pub(crate) fn get_gc_compaction_watermark(self: &Arc<Self>) -> Lsn {
2195 108 : let gc_cutoff_lsn = {
2196 108 : let gc_info = self.gc_info.read().unwrap();
2197 108 : gc_info.min_cutoff()
2198 108 : };
2199 108 :
2200 108 : // TODO: standby horizon should use leases so we don't really need to consider it here.
2201 108 : // let watermark = watermark.min(self.standby_horizon.load());
2202 108 :
2203 108 : // TODO: ensure the child branches will not use anything below the watermark, or consider
2204 108 : // them when computing the watermark.
2205 108 : gc_cutoff_lsn.min(*self.get_applied_gc_cutoff_lsn())
2206 108 : }
2207 :
2208 : /// Split a gc-compaction job into multiple compaction jobs. The split is based on the key range and the estimated size of the compaction job.
2209 : /// The function returns a list of compaction jobs that can be executed separately. If the upper bound of the compact LSN
2210 : /// range is not specified, we will use the latest gc_cutoff as the upper bound, so that all jobs in the jobset act
2211 : /// like a full compaction of the specified keyspace.
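/// Illustrative example (sizes are made up): with `sub_compaction_max_job_size_mb = 4096` and key
/// partitions R1, R2, R3, the function accumulates partitions until the layers overlapping the
/// accumulated range exceed ~4 GiB. If R1 alone overlaps 1 GiB of layers but R1..R2 overlaps 5 GiB,
/// it emits one job covering R1..R2; R3, being the last partition, becomes its own job and is
/// extended to the end of the requested key range.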
2212 0 : pub(crate) async fn gc_compaction_split_jobs(
2213 0 : self: &Arc<Self>,
2214 0 : job: GcCompactJob,
2215 0 : sub_compaction_max_job_size_mb: Option<u64>,
2216 0 : ) -> anyhow::Result<Vec<GcCompactJob>> {
2217 0 : let compact_below_lsn = if job.compact_lsn_range.end != Lsn::MAX {
2218 0 : job.compact_lsn_range.end
2219 : } else {
2220 0 : self.get_gc_compaction_watermark()
2221 : };
2222 :
2223 0 : if compact_below_lsn == Lsn::INVALID {
2224 0 : tracing::warn!("no layers to compact with gc: gc_cutoff not generated yet, skipping gc bottom-most compaction");
2225 0 : return Ok(vec![]);
2226 0 : }
2227 :
2228 : // Split compaction job to about 4GB each
2229 : const GC_COMPACT_MAX_SIZE_MB: u64 = 4 * 1024;
2230 0 : let sub_compaction_max_job_size_mb =
2231 0 : sub_compaction_max_job_size_mb.unwrap_or(GC_COMPACT_MAX_SIZE_MB);
2232 0 :
2233 0 : let mut compact_jobs = Vec::new();
2234 0 : // For now, we simply use the key partitioning information; we should do a more fine-grained partitioning
2235 0 : // by estimating the amount of files read for a compaction job. We should also partition on LSN.
2236 0 : let ((dense_ks, sparse_ks), _) = self.partitioning.read().as_ref().clone();
2237 : // Truncate the key range to be within user specified compaction range.
2238 0 : fn truncate_to(
2239 0 : source_start: &Key,
2240 0 : source_end: &Key,
2241 0 : target_start: &Key,
2242 0 : target_end: &Key,
2243 0 : ) -> Option<(Key, Key)> {
2244 0 : let start = source_start.max(target_start);
2245 0 : let end = source_end.min(target_end);
2246 0 : if start < end {
2247 0 : Some((*start, *end))
2248 : } else {
2249 0 : None
2250 : }
2251 0 : }
2252 0 : let mut split_key_ranges = Vec::new();
2253 0 : let ranges = dense_ks
2254 0 : .parts
2255 0 : .iter()
2256 0 : .map(|partition| partition.ranges.iter())
2257 0 : .chain(sparse_ks.parts.iter().map(|x| x.0.ranges.iter()))
2258 0 : .flatten()
2259 0 : .cloned()
2260 0 : .collect_vec();
2261 0 : for range in ranges.iter() {
2262 0 : let Some((start, end)) = truncate_to(
2263 0 : &range.start,
2264 0 : &range.end,
2265 0 : &job.compact_key_range.start,
2266 0 : &job.compact_key_range.end,
2267 0 : ) else {
2268 0 : continue;
2269 : };
2270 0 : split_key_ranges.push((start, end));
2271 : }
2272 0 : split_key_ranges.sort();
2273 0 : let all_layers = {
2274 0 : let guard = self.layers.read().await;
2275 0 : let layer_map = guard.layer_map()?;
2276 0 : layer_map.iter_historic_layers().collect_vec()
2277 0 : };
2278 0 : let mut current_start = None;
2279 0 : let ranges_num = split_key_ranges.len();
2280 0 : for (idx, (start, end)) in split_key_ranges.into_iter().enumerate() {
2281 0 : if current_start.is_none() {
2282 0 : current_start = Some(start);
2283 0 : }
2284 0 : let start = current_start.unwrap();
2285 0 : if start >= end {
2286 : // We have already processed this partition.
2287 0 : continue;
2288 0 : }
2289 0 : let overlapping_layers = {
2290 0 : let mut desc = Vec::new();
2291 0 : for layer in all_layers.iter() {
2292 0 : if overlaps_with(&layer.get_key_range(), &(start..end))
2293 0 : && layer.get_lsn_range().start <= compact_below_lsn
2294 0 : {
2295 0 : desc.push(layer.clone());
2296 0 : }
2297 : }
2298 0 : desc
2299 0 : };
2300 0 : let total_size = overlapping_layers.iter().map(|x| x.file_size).sum::<u64>();
2301 0 : if total_size > sub_compaction_max_job_size_mb * 1024 * 1024 || ranges_num == idx + 1 {
2302 : // Try to extend the compaction range so that we include at least one full layer file.
2303 0 : let extended_end = overlapping_layers
2304 0 : .iter()
2305 0 : .map(|layer| layer.key_range.end)
2306 0 : .min();
2307 : // It is possible that the search range does not contain any layer files when we reach the end of the loop.
2308 : // In this case, we simply use the specified key range end.
2309 0 : let end = if let Some(extended_end) = extended_end {
2310 0 : extended_end.max(end)
2311 : } else {
2312 0 : end
2313 : };
2314 0 : let end = if ranges_num == idx + 1 {
2315 : // extend the compaction range to the end of the key range if it's the last partition
2316 0 : end.max(job.compact_key_range.end)
2317 : } else {
2318 0 : end
2319 : };
2320 0 : info!(
2321 0 : "splitting compaction job: {}..{}, estimated_size={}",
2322 : start, end, total_size
2323 : );
2324 0 : compact_jobs.push(GcCompactJob {
2325 0 : dry_run: job.dry_run,
2326 0 : compact_key_range: start..end,
2327 0 : compact_lsn_range: job.compact_lsn_range.start..compact_below_lsn,
2328 0 : });
2329 0 : current_start = Some(end);
2330 0 : }
2331 : }
2332 0 : Ok(compact_jobs)
2333 0 : }
2334 :
2335 : /// An experimental compaction building block that combines compaction with garbage collection.
2336 : ///
2337 : /// The current implementation picks all delta + image layers that are below or intersecting with
2338 : /// the GC horizon without considering retain_lsns. Then, it does a full compaction over all these delta
2339 : /// layers and image layers, which generates image layers on the gc horizon, drop deltas below gc horizon,
2340 : /// and create delta layers with all deltas >= gc horizon.
2341 : ///
2342 : /// If `options.compact_range` is provided, it will only compact the keys within the range, aka partial compaction.
2343 : /// Partial compaction will read and process all layers overlapping with the key range, even if they might
2344 : /// contain extra keys. After the gc-compaction phase completes, delta layers that are not fully contained
2345 : /// within the key range will be rewritten so that they do not overlap with the newly-generated delta layers. Providing
2346 : /// Key::MIN..Key::MAX to the function indicates a full compaction, though technically, `Key::MAX` is not
2347 : /// part of the range.
2348 : ///
2349 : /// If `options.compact_lsn_range.end` is provided, the compaction will only compact layers below or intersect with
2350 : /// the LSN. Otherwise, it will use the gc cutoff by default.
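/// Illustrative before/after sketch (made-up layers), for a full-range job with gc_cutoff = 0x40
/// and no retain_lsns below it:
///
/// ```plain
/// before: delta 0x10..0x30, delta 0x30..0x50, image @ 0x20
/// after:  image @ 0x40            (all keys materialized at the horizon)
///         delta 0x40..0x50        (history above the horizon is preserved)
/// ```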
2351 108 : pub(crate) async fn compact_with_gc(
2352 108 : self: &Arc<Self>,
2353 108 : cancel: &CancellationToken,
2354 108 : options: CompactOptions,
2355 108 : ctx: &RequestContext,
2356 108 : ) -> anyhow::Result<()> {
2357 108 : let sub_compaction = options.sub_compaction;
2358 108 : let job = GcCompactJob::from_compact_options(options.clone());
2359 108 : if sub_compaction {
2360 0 : info!("running enhanced gc bottom-most compaction with sub-compaction, splitting compaction jobs");
2361 0 : let jobs = self
2362 0 : .gc_compaction_split_jobs(job, options.sub_compaction_max_job_size_mb)
2363 0 : .await?;
2364 0 : let jobs_len = jobs.len();
2365 0 : for (idx, job) in jobs.into_iter().enumerate() {
2366 0 : info!(
2367 0 : "running enhanced gc bottom-most compaction, sub-compaction {}/{}",
2368 0 : idx + 1,
2369 : jobs_len
2370 : );
2371 0 : self.compact_with_gc_inner(cancel, job, ctx).await?;
2372 : }
2373 0 : if jobs_len == 0 {
2374 0 : info!("no jobs to run, skipping gc bottom-most compaction");
2375 0 : }
2376 0 : return Ok(());
2377 108 : }
2378 108 : self.compact_with_gc_inner(cancel, job, ctx).await
2379 108 : }
2380 :
2381 108 : async fn compact_with_gc_inner(
2382 108 : self: &Arc<Self>,
2383 108 : cancel: &CancellationToken,
2384 108 : job: GcCompactJob,
2385 108 : ctx: &RequestContext,
2386 108 : ) -> anyhow::Result<()> {
2387 108 : // Block other compaction/GC tasks from running for now. GC-compaction could run along
2388 108 : // with legacy compaction tasks in the future. Always ensure the lock order is compaction -> gc.
2389 108 : // Note that we already acquired the compaction lock when the outer `compact` function gets called.
2390 108 :
2391 108 : let gc_lock = async {
2392 108 : tokio::select! {
2393 108 : guard = self.gc_lock.lock() => Ok(guard),
2394 : // TODO: refactor to CompactionError to correctly pass cancelled error
2395 108 : _ = cancel.cancelled() => Err(anyhow!("cancelled")),
2396 : }
2397 108 : };
2398 :
2399 108 : let gc_lock = crate::timed(
2400 108 : gc_lock,
2401 108 : "acquires gc lock",
2402 108 : std::time::Duration::from_secs(5),
2403 108 : )
2404 108 : .await?;
2405 :
2406 108 : let dry_run = job.dry_run;
2407 108 : let compact_key_range = job.compact_key_range;
2408 108 : let compact_lsn_range = job.compact_lsn_range;
2409 :
2410 108 : let debug_mode = cfg!(debug_assertions) || cfg!(feature = "testing");
2411 :
2412 108 : info!("running enhanced gc bottom-most compaction, dry_run={dry_run}, compact_key_range={}..{}, compact_lsn_range={}..{}", compact_key_range.start, compact_key_range.end, compact_lsn_range.start, compact_lsn_range.end);
2413 :
2414 108 : scopeguard::defer! {
2415 108 : info!("done enhanced gc bottom-most compaction");
2416 108 : };
2417 108 :
2418 108 : let mut stat = CompactionStatistics::default();
2419 :
2420 : // Step 0: pick all delta layers + image layers below or intersecting with the GC horizon.
2421 : // The layer selection has the following properties:
2422 : // 1. If a layer is in the selection, all layers below it are in the selection.
2423 : // 2. It follows from (1) that, for each key in the layer selection, the value can be reconstructed using only the layers in the selection.
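// For example (made-up layers): with gc_cutoff = 0x40, selecting { delta 0x10..0x30, image @ 0x20,
// delta 0x30..0x50 } satisfies both properties, whereas selecting only delta 0x30..0x50 would not,
// because reconstructing its keys may require the older layers.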
2424 104 : let job_desc = {
2425 108 : let guard = self.layers.read().await;
2426 108 : let layers = guard.layer_map()?;
2427 108 : let gc_info = self.gc_info.read().unwrap();
2428 108 : let mut retain_lsns_below_horizon = Vec::new();
2429 108 : let gc_cutoff = {
2430 : // Currently, gc-compaction only kicks in after the legacy gc has updated the gc_cutoff.
2431 : // Therefore, it can only clean up data that cannot be cleaned up with legacy gc, instead of
2432 : // cleaning everything that it theoretically could. In the future, it should use `self.gc_info`
2433 : // as the source of truth.
2434 108 : let real_gc_cutoff = self.get_gc_compaction_watermark();
2435 : // The compaction algorithm will keep all keys above the gc_cutoff while keeping only necessary keys below the gc_cutoff for
2436 : // each of the retain_lsn. Therefore, if the user-provided `compact_lsn_range.end` is larger than the real gc cutoff, we will use
2437 : // the real cutoff.
2438 108 : let mut gc_cutoff = if compact_lsn_range.end == Lsn::MAX {
2439 96 : if real_gc_cutoff == Lsn::INVALID {
2440 : // If the gc_cutoff is not generated yet, we should not compact anything.
2441 0 : tracing::warn!("no layers to compact with gc: gc_cutoff not generated yet, skipping gc bottom-most compaction");
2442 0 : return Ok(());
2443 96 : }
2444 96 : real_gc_cutoff
2445 : } else {
2446 12 : compact_lsn_range.end
2447 : };
2448 108 : if gc_cutoff > real_gc_cutoff {
2449 8 : warn!("provided compact_lsn_range.end={} is larger than the real_gc_cutoff={}, using the real gc cutoff", gc_cutoff, real_gc_cutoff);
2450 8 : gc_cutoff = real_gc_cutoff;
2451 100 : }
2452 108 : gc_cutoff
2453 : };
2454 140 : for (lsn, _timeline_id, _is_offloaded) in &gc_info.retain_lsns {
2455 140 : if lsn < &gc_cutoff {
2456 140 : retain_lsns_below_horizon.push(*lsn);
2457 140 : }
2458 : }
2459 108 : for lsn in gc_info.leases.keys() {
2460 0 : if lsn < &gc_cutoff {
2461 0 : retain_lsns_below_horizon.push(*lsn);
2462 0 : }
2463 : }
2464 108 : let mut selected_layers: Vec<Layer> = Vec::new();
2465 108 : drop(gc_info);
2466 : // First, pick all the layers that intersect or are below the gc_cutoff, and get the largest LSN among the selected layers.
2467 108 : let Some(max_layer_lsn) = layers
2468 108 : .iter_historic_layers()
2469 488 : .filter(|desc| desc.get_lsn_range().start <= gc_cutoff)
2470 416 : .map(|desc| desc.get_lsn_range().end)
2471 108 : .max()
2472 : else {
2473 0 : info!("no layers to compact with gc: no historic layers below gc_cutoff, gc_cutoff={}", gc_cutoff);
2474 0 : return Ok(());
2475 : };
2476 : // Next, if the user specifies compact_lsn_range.start, we need to filter some layers out. All the layers (strictly) below
2477 : // the min_layer_lsn computed as below will be filtered out and the data will be accessed using the normal read path, as if
2478 : // it is a branch.
2479 108 : let Some(min_layer_lsn) = layers
2480 108 : .iter_historic_layers()
2481 488 : .filter(|desc| {
2482 488 : if compact_lsn_range.start == Lsn::INVALID {
2483 396 : true // select all layers below if start == Lsn(0)
2484 : } else {
2485 92 : desc.get_lsn_range().end > compact_lsn_range.start // strictly larger than compact_above_lsn
2486 : }
2487 488 : })
2488 452 : .map(|desc| desc.get_lsn_range().start)
2489 108 : .min()
2490 : else {
2491 0 : info!("no layers to compact with gc: no historic layers above compact_above_lsn, compact_above_lsn={}", compact_lsn_range.end);
2492 0 : return Ok(());
2493 : };
2494 : // Then, pick all the layers that are below the max_layer_lsn. This is to ensure we can pick all single-key
2495 : // layers to compact.
2496 108 : let mut rewrite_layers = Vec::new();
2497 488 : for desc in layers.iter_historic_layers() {
2498 488 : if desc.get_lsn_range().end <= max_layer_lsn
2499 416 : && desc.get_lsn_range().start >= min_layer_lsn
2500 380 : && overlaps_with(&desc.get_key_range(), &compact_key_range)
2501 : {
2502 : // If the layer overlaps with the compaction key range, we need to read it to obtain all keys within the range,
2503 : // even if it might contain extra keys
2504 304 : selected_layers.push(guard.get_from_desc(&desc));
2505 304 : // If the layer is not fully contained within the key range, we need to rewrite it if it's a delta layer (it's fine
2506 304 : // to overlap image layers)
2507 304 : if desc.is_delta() && !fully_contains(&compact_key_range, &desc.get_key_range())
2508 4 : {
2509 4 : rewrite_layers.push(desc);
2510 300 : }
2511 184 : }
2512 : }
2513 108 : if selected_layers.is_empty() {
2514 4 : info!("no layers to compact with gc: no layers within the key range, gc_cutoff={}, key_range={}..{}", gc_cutoff, compact_key_range.start, compact_key_range.end);
2515 4 : return Ok(());
2516 104 : }
2517 104 : retain_lsns_below_horizon.sort();
2518 104 : GcCompactionJobDescription {
2519 104 : selected_layers,
2520 104 : gc_cutoff,
2521 104 : retain_lsns_below_horizon,
2522 104 : min_layer_lsn,
2523 104 : max_layer_lsn,
2524 104 : compaction_key_range: compact_key_range,
2525 104 : rewrite_layers,
2526 104 : }
2527 : };
2528 104 : let (has_data_below, lowest_retain_lsn) = if compact_lsn_range.start != Lsn::INVALID {
2529 : // If we only compact above some LSN, we should get the history from the current branch below the specified LSN.
2530 : // We use job_desc.min_layer_lsn as if it's the lowest branch point.
2531 16 : (true, job_desc.min_layer_lsn)
2532 88 : } else if self.ancestor_timeline.is_some() {
2533 : // In theory, we can also use min_layer_lsn here, but using ancestor LSN makes sure the delta layers cover the
2534 : // LSN ranges all the way to the ancestor timeline.
2535 4 : (true, self.ancestor_lsn)
2536 : } else {
2537 84 : let res = job_desc
2538 84 : .retain_lsns_below_horizon
2539 84 : .first()
2540 84 : .copied()
2541 84 : .unwrap_or(job_desc.gc_cutoff);
2542 84 : if debug_mode {
2543 84 : assert_eq!(
2544 84 : res,
2545 84 : job_desc
2546 84 : .retain_lsns_below_horizon
2547 84 : .iter()
2548 84 : .min()
2549 84 : .copied()
2550 84 : .unwrap_or(job_desc.gc_cutoff)
2551 84 : );
2552 0 : }
2553 84 : (false, res)
2554 : };
2555 104 : info!(
2556 0 : "picked {} layers for compaction ({} layers need rewriting) with max_layer_lsn={} min_layer_lsn={} gc_cutoff={} lowest_retain_lsn={}, key_range={}..{}, has_data_below={}",
2557 0 : job_desc.selected_layers.len(),
2558 0 : job_desc.rewrite_layers.len(),
2559 : job_desc.max_layer_lsn,
2560 : job_desc.min_layer_lsn,
2561 : job_desc.gc_cutoff,
2562 : lowest_retain_lsn,
2563 : job_desc.compaction_key_range.start,
2564 : job_desc.compaction_key_range.end,
2565 : has_data_below,
2566 : );
2567 :
2568 408 : for layer in &job_desc.selected_layers {
2569 304 : debug!("read layer: {}", layer.layer_desc().key());
2570 : }
2571 108 : for layer in &job_desc.rewrite_layers {
2572 4 : debug!("rewrite layer: {}", layer.key());
2573 : }
2574 :
2575 104 : self.check_compaction_space(&job_desc.selected_layers)
2576 104 : .await?;
2577 :
2578 : // Generate statistics for the compaction
2579 408 : for layer in &job_desc.selected_layers {
2580 304 : let desc = layer.layer_desc();
2581 304 : if desc.is_delta() {
2582 172 : stat.visit_delta_layer(desc.file_size());
2583 172 : } else {
2584 132 : stat.visit_image_layer(desc.file_size());
2585 132 : }
2586 : }
2587 :
2588 : // Step 1: construct a k-merge iterator over all layers.
2589 : // Also, verify if the layer map can be split by drawing a horizontal line at every LSN start/end split point.
2590 104 : let layer_names = job_desc
2591 104 : .selected_layers
2592 104 : .iter()
2593 304 : .map(|layer| layer.layer_desc().layer_name())
2594 104 : .collect_vec();
2595 104 : if let Some(err) = check_valid_layermap(&layer_names) {
2596 0 : bail!("gc-compaction layer map check failed because {}, cannot proceed with compaction due to potential data loss", err);
2597 104 : }
2598 104 : // The maximum LSN we are processing in this compaction loop
2599 104 : let end_lsn = job_desc
2600 104 : .selected_layers
2601 104 : .iter()
2602 304 : .map(|l| l.layer_desc().lsn_range.end)
2603 104 : .max()
2604 104 : .unwrap();
2605 104 : let mut delta_layers = Vec::new();
2606 104 : let mut image_layers = Vec::new();
2607 104 : let mut downloaded_layers = Vec::new();
2608 104 : let mut total_downloaded_size = 0;
2609 104 : let mut total_layer_size = 0;
2610 408 : for layer in &job_desc.selected_layers {
2611 304 : if layer.needs_download().await?.is_some() {
2612 0 : total_downloaded_size += layer.layer_desc().file_size;
2613 304 : }
2614 304 : total_layer_size += layer.layer_desc().file_size;
2615 304 : let resident_layer = layer.download_and_keep_resident().await?;
2616 304 : downloaded_layers.push(resident_layer);
2617 : }
2618 104 : info!(
2619 0 : "finish downloading layers, downloaded={}, total={}, ratio={:.2}",
2620 0 : total_downloaded_size,
2621 0 : total_layer_size,
2622 0 : total_downloaded_size as f64 / total_layer_size as f64
2623 : );
2624 408 : for resident_layer in &downloaded_layers {
2625 304 : if resident_layer.layer_desc().is_delta() {
2626 172 : let layer = resident_layer.get_as_delta(ctx).await?;
2627 172 : delta_layers.push(layer);
2628 : } else {
2629 132 : let layer = resident_layer.get_as_image(ctx).await?;
2630 132 : image_layers.push(layer);
2631 : }
2632 : }
2633 104 : let (dense_ks, sparse_ks) = self.collect_gc_compaction_keyspace().await?;
2634 104 : let mut merge_iter = FilterIterator::create(
2635 104 : MergeIterator::create(&delta_layers, &image_layers, ctx),
2636 104 : dense_ks,
2637 104 : sparse_ks,
2638 104 : )?;
2639 :
2640 : // Step 2: Produce images+deltas.
2641 104 : let mut accumulated_values = Vec::new();
2642 104 : let mut last_key: Option<Key> = None;
2643 :
2644 : // Only create image layers when there are no ancestor branches. TODO: create a covering image layer
2645 : // when certain conditions are met.
2646 104 : let mut image_layer_writer = if !has_data_below {
2647 : Some(
2648 84 : SplitImageLayerWriter::new(
2649 84 : self.conf,
2650 84 : self.timeline_id,
2651 84 : self.tenant_shard_id,
2652 84 : job_desc.compaction_key_range.start,
2653 84 : lowest_retain_lsn,
2654 84 : self.get_compaction_target_size(),
2655 84 : ctx,
2656 84 : )
2657 84 : .await?,
2658 : )
2659 : } else {
2660 20 : None
2661 : };
2662 :
2663 104 : let mut delta_layer_writer = SplitDeltaLayerWriter::new(
2664 104 : self.conf,
2665 104 : self.timeline_id,
2666 104 : self.tenant_shard_id,
2667 104 : lowest_retain_lsn..end_lsn,
2668 104 : self.get_compaction_target_size(),
2669 104 : )
2670 104 : .await?;
2671 :
2672 : #[derive(Default)]
2673 : struct RewritingLayers {
2674 : before: Option<DeltaLayerWriter>,
2675 : after: Option<DeltaLayerWriter>,
2676 : }
2677 104 : let mut delta_layer_rewriters = HashMap::<Arc<PersistentLayerKey>, RewritingLayers>::new();
2678 :
2679 : /// When we are not compacting the bottom LSN range (= `[0,X)`) of the root branch, we "have data below" (`has_data_below=true`).
2680 : /// The two cases are compaction on ancestor branches and when `compact_lsn_range.start` is set.
2681 : /// In those cases, we need to pull up data from below the LSN range we're compacting.
2682 : ///
2683 : /// This function unifies the cases so that later code doesn't have to think about it.
2684 : ///
2685 : /// Currently, we always get the ancestor image for each key in the child branch no matter whether the image
2686 : /// is needed for reconstruction. This should be fixed in the future.
2687 : ///
2688 : /// Furthermore, we should do vectored get instead of a single get, or better, use k-merge for ancestor
2689 : /// images.
2690 1244 : async fn get_ancestor_image(
2691 1244 : this_tline: &Arc<Timeline>,
2692 1244 : key: Key,
2693 1244 : ctx: &RequestContext,
2694 1244 : has_data_below: bool,
2695 1244 : history_lsn_point: Lsn,
2696 1244 : ) -> anyhow::Result<Option<(Key, Lsn, Bytes)>> {
2697 1244 : if !has_data_below {
2698 1168 : return Ok(None);
2699 76 : };
2700 : // This function is implemented as a get of the current timeline at ancestor LSN, therefore reusing
2701 : // as much existing code as possible.
2702 76 : let img = this_tline.get(key, history_lsn_point, ctx).await?;
2703 76 : Ok(Some((key, history_lsn_point, img)))
2704 1244 : }
2705 :
2706 : // At this point, we could already decide not to write to the image layer at all, because
2707 : // the key and LSN ranges are determined. However, to keep things simple here, we still
2708 : // create the writer and discard it at the end.
2709 :
2710 1932 : while let Some(((key, lsn, val), desc)) = merge_iter.next_with_trace().await? {
2711 1828 : if cancel.is_cancelled() {
2712 0 : return Err(anyhow!("cancelled")); // TODO: refactor to CompactionError and pass cancel error
2713 1828 : }
2714 1828 : if self.shard_identity.is_key_disposable(&key) {
2715 : // If this shard does not need to store this key, simply skip it.
2716 : //
2717 : // This is not handled in the filter iterator because shard ownership is determined
2718 : // by key hash. Therefore, unlike key-space (range) handling, there is no performance
2719 : // benefit in, e.g., skipping a whole layer file.
2720 0 : if cfg!(debug_assertions) {
2721 0 : let shard = self.shard_identity.shard_index();
2722 0 : let owner = self.shard_identity.get_shard_number(&key);
2723 0 : panic!("key {key} does not belong on shard {shard}, owned by {owner}");
2724 0 : }
2725 0 : continue;
2726 1828 : }
2727 1828 : if !job_desc.compaction_key_range.contains(&key) {
2728 128 : if !desc.is_delta {
2729 120 : continue;
2730 8 : }
2731 8 : let rewriter = delta_layer_rewriters.entry(desc.clone()).or_default();
2732 8 : let rewriter = if key < job_desc.compaction_key_range.start {
2733 0 : if rewriter.before.is_none() {
2734 0 : rewriter.before = Some(
2735 0 : DeltaLayerWriter::new(
2736 0 : self.conf,
2737 0 : self.timeline_id,
2738 0 : self.tenant_shard_id,
2739 0 : desc.key_range.start,
2740 0 : desc.lsn_range.clone(),
2741 0 : ctx,
2742 0 : )
2743 0 : .await?,
2744 : );
2745 0 : }
2746 0 : rewriter.before.as_mut().unwrap()
2747 8 : } else if key >= job_desc.compaction_key_range.end {
2748 8 : if rewriter.after.is_none() {
2749 4 : rewriter.after = Some(
2750 4 : DeltaLayerWriter::new(
2751 4 : self.conf,
2752 4 : self.timeline_id,
2753 4 : self.tenant_shard_id,
2754 4 : job_desc.compaction_key_range.end,
2755 4 : desc.lsn_range.clone(),
2756 4 : ctx,
2757 4 : )
2758 4 : .await?,
2759 : );
2760 4 : }
2761 8 : rewriter.after.as_mut().unwrap()
2762 : } else {
2763 0 : unreachable!()
2764 : };
2765 8 : rewriter.put_value(key, lsn, val, ctx).await?;
2766 8 : continue;
2767 1700 : }
2768 1700 : match val {
2769 1220 : Value::Image(_) => stat.visit_image_key(&val),
2770 480 : Value::WalRecord(_) => stat.visit_wal_key(&val),
2771 : }
2772 1700 : if last_key.is_none() || last_key.as_ref() == Some(&key) {
2773 560 : if last_key.is_none() {
2774 104 : last_key = Some(key);
2775 456 : }
2776 560 : accumulated_values.push((key, lsn, val));
2777 : } else {
2778 1140 : let last_key: &mut Key = last_key.as_mut().unwrap();
2779 1140 : stat.on_unique_key_visited(); // TODO: adjust statistics for partial compaction
2780 1140 : let retention = self
2781 1140 : .generate_key_retention(
2782 1140 : *last_key,
2783 1140 : &accumulated_values,
2784 1140 : job_desc.gc_cutoff,
2785 1140 : &job_desc.retain_lsns_below_horizon,
2786 1140 : COMPACTION_DELTA_THRESHOLD,
2787 1140 : get_ancestor_image(self, *last_key, ctx, has_data_below, lowest_retain_lsn)
2788 1140 : .await?,
2789 : )
2790 1140 : .await?;
2791 1140 : retention
2792 1140 : .pipe_to(
2793 1140 : *last_key,
2794 1140 : &mut delta_layer_writer,
2795 1140 : image_layer_writer.as_mut(),
2796 1140 : &mut stat,
2797 1140 : ctx,
2798 1140 : )
2799 1140 : .await?;
2800 1140 : accumulated_values.clear();
2801 1140 : *last_key = key;
2802 1140 : accumulated_values.push((key, lsn, val));
2803 : }
2804 : }
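// A minimal, standalone sketch of the grouping pattern used in the loop above, on plain tuples
// rather than pageserver types (the helper below is illustrative only, not part of this file).
// The merge iterator yields entries in (key, LSN) order; values for one key are accumulated and
// each completed group is flushed when the key changes, with one final flush after the loop
// (mirroring the post-loop handling below).
fn group_by_key(entries: &[(u32, u64, &str)]) -> Vec<(u32, Vec<(u64, String)>)> {
    let mut groups = Vec::new();
    let mut last_key: Option<u32> = None;
    let mut accumulated: Vec<(u64, String)> = Vec::new();
    for &(key, lsn, val) in entries {
        if last_key.is_none() || last_key == Some(key) {
            // Same key (or first key): keep accumulating its history.
            last_key = Some(key);
            accumulated.push((lsn, val.to_string()));
        } else {
            // Key changed: flush the completed group, then start a new one.
            groups.push((last_key.unwrap(), std::mem::take(&mut accumulated)));
            last_key = Some(key);
            accumulated.push((lsn, val.to_string()));
        }
    }
    // Final flush for the last key.
    if let Some(key) = last_key {
        groups.push((key, accumulated));
    }
    groups
}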
2805 :
2806 : // TODO: move the below part to the loop body
2807 104 : let last_key = last_key.expect("no keys produced during compaction");
2808 104 : stat.on_unique_key_visited();
2809 :
2810 104 : let retention = self
2811 104 : .generate_key_retention(
2812 104 : last_key,
2813 104 : &accumulated_values,
2814 104 : job_desc.gc_cutoff,
2815 104 : &job_desc.retain_lsns_below_horizon,
2816 104 : COMPACTION_DELTA_THRESHOLD,
2817 104 : get_ancestor_image(self, last_key, ctx, has_data_below, lowest_retain_lsn).await?,
2818 : )
2819 104 : .await?;
2820 104 : retention
2821 104 : .pipe_to(
2822 104 : last_key,
2823 104 : &mut delta_layer_writer,
2824 104 : image_layer_writer.as_mut(),
2825 104 : &mut stat,
2826 104 : ctx,
2827 104 : )
2828 104 : .await?;
2829 : // end: move the above part to the loop body
2830 :
2831 104 : let mut rewrote_delta_layers = Vec::new();
2832 108 : for (key, writers) in delta_layer_rewriters {
2833 4 : if let Some(delta_writer_before) = writers.before {
2834 0 : let (desc, path) = delta_writer_before
2835 0 : .finish(job_desc.compaction_key_range.start, ctx)
2836 0 : .await?;
2837 0 : let layer = Layer::finish_creating(self.conf, self, desc, &path)?;
2838 0 : rewrote_delta_layers.push(layer);
2839 4 : }
2840 4 : if let Some(delta_writer_after) = writers.after {
2841 4 : let (desc, path) = delta_writer_after.finish(key.key_range.end, ctx).await?;
2842 4 : let layer = Layer::finish_creating(self.conf, self, desc, &path)?;
2843 4 : rewrote_delta_layers.push(layer);
2844 0 : }
2845 : }
2846 :
2847 148 : let discard = |key: &PersistentLayerKey| {
2848 148 : let key = key.clone();
2849 148 : async move { KeyHistoryRetention::discard_key(&key, self, dry_run).await }
2850 148 : };
2851 :
2852 104 : let produced_image_layers = if let Some(writer) = image_layer_writer {
2853 84 : if !dry_run {
2854 76 : let end_key = job_desc.compaction_key_range.end;
2855 76 : writer
2856 76 : .finish_with_discard_fn(self, ctx, end_key, discard)
2857 76 : .await?
2858 : } else {
2859 8 : drop(writer);
2860 8 : Vec::new()
2861 : }
2862 : } else {
2863 20 : Vec::new()
2864 : };
2865 :
2866 104 : let produced_delta_layers = if !dry_run {
2867 96 : delta_layer_writer
2868 96 : .finish_with_discard_fn(self, ctx, discard)
2869 96 : .await?
2870 : } else {
2871 8 : drop(delta_layer_writer);
2872 8 : Vec::new()
2873 : };
2874 :
2875 : // TODO: make image/delta/rewrote_delta layers generation atomic. At this point, we already generated resident layers, and if
2876 : // compaction is cancelled at this point, we might have some layers that are not cleaned up.
2877 104 : let mut compact_to = Vec::new();
2878 104 : let mut keep_layers = HashSet::new();
2879 104 : let produced_delta_layers_len = produced_delta_layers.len();
2880 104 : let produced_image_layers_len = produced_image_layers.len();
2881 176 : for action in produced_delta_layers {
2882 72 : match action {
2883 44 : BatchWriterResult::Produced(layer) => {
2884 44 : if cfg!(debug_assertions) {
2885 44 : info!("produced delta layer: {}", layer.layer_desc().key());
2886 0 : }
2887 44 : stat.produce_delta_layer(layer.layer_desc().file_size());
2888 44 : compact_to.push(layer);
2889 : }
2890 28 : BatchWriterResult::Discarded(l) => {
2891 28 : if cfg!(debug_assertions) {
2892 28 : info!("discarded delta layer: {}", l);
2893 0 : }
2894 28 : keep_layers.insert(l);
2895 28 : stat.discard_delta_layer();
2896 : }
2897 : }
2898 : }
2899 108 : for layer in &rewrote_delta_layers {
2900 4 : debug!(
2901 0 : "produced rewritten delta layer: {}",
2902 0 : layer.layer_desc().key()
2903 : );
2904 : }
2905 104 : compact_to.extend(rewrote_delta_layers);
2906 180 : for action in produced_image_layers {
2907 76 : match action {
2908 60 : BatchWriterResult::Produced(layer) => {
2909 60 : debug!("produced image layer: {}", layer.layer_desc().key());
2910 60 : stat.produce_image_layer(layer.layer_desc().file_size());
2911 60 : compact_to.push(layer);
2912 : }
2913 16 : BatchWriterResult::Discarded(l) => {
2914 16 : debug!("discarded image layer: {}", l);
2915 16 : keep_layers.insert(l);
2916 16 : stat.discard_image_layer();
2917 : }
2918 : }
2919 : }
2920 :
2921 104 : let mut layer_selection = job_desc.selected_layers;
2922 :
2923 : // Partial compaction might select more data than it processes, e.g., if
2924 : // the compaction_key_range only partially overlaps:
2925 : //
2926 : // [---compaction_key_range---]
2927 : // [---A----][----B----][----C----][----D----]
2928 : //
2929 : // For delta layers, we rewrite them so that they are cut exactly at the
2930 : // compaction key range, so we can always discard them. However, image
2931 : // layers are not rewritten for now, so we need to handle them differently.
2932 : // Assume image layers A, B, C, D are all in the `layer_selection`.
2933 : //
2934 : // The created image layers contain whatever is needed from B, C, and from
2935 : // `----]` of A, and from `[---` of D.
2936 : //
2937 : // In contrast, `[---A` and `D----]` have not been processed, so we must
2938 : // keep that data.
2939 : //
2940 : // The solution for now is to keep A and D completely if they are image layers.
2941 : // (layer_selection is what we'll remove from the layer map, so, retain what
2942 : // is _not_ fully covered by compaction_key_range).
2943 408 : for layer in &layer_selection {
2944 304 : if !layer.layer_desc().is_delta() {
2945 132 : if !overlaps_with(
2946 132 : &layer.layer_desc().key_range,
2947 132 : &job_desc.compaction_key_range,
2948 132 : ) {
2949 0 : bail!("violated constraint: image layer outside of compaction key range");
2950 132 : }
2951 132 : if !fully_contains(
2952 132 : &job_desc.compaction_key_range,
2953 132 : &layer.layer_desc().key_range,
2954 132 : ) {
2955 16 : keep_layers.insert(layer.layer_desc().key());
2956 116 : }
2957 172 : }
2958 : }
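// A minimal sketch of the two interval predicates used above. The local functions below are
// illustrative re-implementations over half-open ranges, not the actual
// `pageserver_compaction::helpers` code, and the ranges are hypothetical.
fn overlaps_with_sketch<T: Ord>(a: &std::ops::Range<T>, b: &std::ops::Range<T>) -> bool {
    // Two half-open ranges overlap iff each one starts before the other ends.
    a.start < b.end && b.start < a.end
}

fn fully_contains_sketch<T: Ord>(outer: &std::ops::Range<T>, inner: &std::ops::Range<T>) -> bool {
    outer.start <= inner.start && inner.end <= outer.end
}

fn keep_layer_example() {
    let compaction_key_range = 10u32..40;
    // An image layer like `A` in the diagram: it overlaps the compaction key range but is not
    // fully contained in it, so it must be retained in the layer map.
    let image_a = 5u32..15;
    assert!(overlaps_with_sketch(&compaction_key_range, &image_a));
    assert!(!fully_contains_sketch(&compaction_key_range, &image_a));
}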
2959 :
2960 304 : layer_selection.retain(|x| !keep_layers.contains(&x.layer_desc().key()));
2961 104 :
2962 104 : info!(
2963 0 : "gc-compaction statistics: {}",
2964 0 : serde_json::to_string(&stat)?
2965 : );
2966 :
2967 104 : if dry_run {
2968 8 : return Ok(());
2969 96 : }
2970 96 :
2971 96 : info!(
2972 0 : "produced {} delta layers and {} image layers, {} layers are kept",
2973 0 : produced_delta_layers_len,
2974 0 : produced_image_layers_len,
2975 0 : keep_layers.len()
2976 : );
2977 :
2978 : // Step 3: Place back to the layer map.
2979 :
2980 : // First, do a sanity check to ensure the newly-created layer map does not contain overlaps.
2981 96 : let all_layers = {
2982 96 : let guard = self.layers.read().await;
2983 96 : let layer_map = guard.layer_map()?;
2984 96 : layer_map.iter_historic_layers().collect_vec()
2985 96 : };
2986 96 :
2987 96 : let mut final_layers = all_layers
2988 96 : .iter()
2989 428 : .map(|layer| layer.layer_name())
2990 96 : .collect::<HashSet<_>>();
2991 304 : for layer in &layer_selection {
2992 208 : final_layers.remove(&layer.layer_desc().layer_name());
2993 208 : }
2994 204 : for layer in &compact_to {
2995 108 : final_layers.insert(layer.layer_desc().layer_name());
2996 108 : }
2997 96 : let final_layers = final_layers.into_iter().collect_vec();
2998 :
2999 : // TODO: move this check before we call `finish` on image layer writers. However, this will require us to get the layer name before we finish
3000 : // the writer, so potentially, we will need a function like `ImageLayerBatchWriter::get_all_pending_layer_keys` to get all the keys that are
3001 : // in the writer before finalizing the persistent layers. As it stands, we would leave dangling layers on disk if the check fails.
3002 96 : if let Some(err) = check_valid_layermap(&final_layers) {
3003 0 : bail!("gc-compaction layer map check failed after compaction because {}, compaction result not applied to the layer map due to potential data loss", err);
3004 96 : }
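// A hypothetical, much-simplified stand-in for the kind of invariant check implied above: it
// only rejects duplicate layer names, whereas the real `check_valid_layermap` is more thorough.
// Shown purely to illustrate the Some(error)-style reporting used here.
fn check_no_duplicate_layer_names(layer_names: &[String]) -> Option<String> {
    let mut seen = std::collections::HashSet::new();
    for name in layer_names {
        if !seen.insert(name.as_str()) {
            return Some(format!("duplicate layer name: {name}"));
        }
    }
    None
}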
3005 :
3006 : // Between the sanity check and this compaction update, there could be new layers being flushed, but it should be fine because we only
3007 : // operate on L1 layers.
3008 : {
3009 : // Gc-compaction will rewrite the history of a key. This could happen in two ways:
3010 : //
3011 : // 1. We create an image layer to replace all the deltas below the compact LSN. In this case, assume
3012 : // we have 2 delta layers A and B, both below the compact LSN. We create an image layer I to replace
3013 : // A and B at the compact LSN. If the read path finishes reading A, yields, and now we update the layer
3014 : // map, the read path then cannot find any keys below A, reporting a missing key error, while the key
3015 : // now gets stored in I at the compact LSN.
3016 : //
3017 : // --------------- ---------------
3018 : // delta1@LSN20 image1@LSN20
3019 : // --------------- (read path collects delta@LSN20, => --------------- (read path cannot find anything
3020 : // delta1@LSN10 yields) below LSN 20)
3021 : // ---------------
3022 : //
3023 : // 2. We create a delta layer to replace all the deltas below the compact LSN, and within it, we combine
3024 : // the history of a key into a single image. For example, suppose we have deltas at LSN 1, 2, 3, 4,
3025 : // where one delta layer contains LSN 1, 2, 3 and the other contains LSN 4.
3026 : //
3027 : // We let gc-compaction combine deltas 2, 3, 4 into an image at LSN 4, producing a new delta layer that
3028 : // contains the delta at LSN 1 and the image at LSN 4. If the read path finishes reading the original delta
3029 : // layer containing LSN 4, yields, and we then install the new delta layer, the read path collects an invalid history:
3030 : //
3031 : // --------------- ---------------
3032 : // delta1@LSN4 image1@LSN4
3033 : // --------------- (read path collects delta@LSN4, => --------------- (read path collects LSN4 and LSN1,
3034 : // delta1@LSN1-3 yields) delta1@LSN1 which is an invalid history)
3035 : // --------------- ---------------
3036 : //
3037 : // Therefore, the gc-compaction layer update operation should wait for all ongoing reads, block all pending reads,
3038 : // and only allow reads to continue after the update is finished.
3039 :
3040 96 : let update_guard = self.gc_compaction_layer_update_lock.write().await;
3041 : // Acquiring the update guard ensures current read operations end and new read operations are blocked.
3042 : // TODO: can we use `latest_gc_cutoff` Rcu to achieve the same effect?
3043 96 : let mut guard = self.layers.write().await;
3044 96 : guard
3045 96 : .open_mut()?
3046 96 : .finish_gc_compaction(&layer_selection, &compact_to, &self.metrics);
3047 96 : drop(update_guard); // Allow new reads to start ONLY after we finished updating the layer map.
3048 96 : };
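// A minimal standalone sketch of the drain-and-block pattern used above, assuming the update
// lock is a `tokio::sync::RwLock` (as the `.write().await` call suggests). The function names
// below are illustrative, not the actual pageserver read path.
async fn read_path_sketch(update_lock: std::sync::Arc<tokio::sync::RwLock<()>>) {
    // Readers hold the read half for the duration of a read; they block while an update holds
    // the write half.
    let _read_guard = update_lock.read().await;
    // ... collect values from the layer map ...
}

async fn apply_layer_map_update_sketch(update_lock: std::sync::Arc<tokio::sync::RwLock<()>>) {
    // Taking the write half waits for in-flight readers to finish and blocks new ones.
    let update_guard = update_lock.write().await;
    // ... swap the compacted-away layers for the newly produced ones ...
    drop(update_guard); // new reads may start only after the swap is complete
}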
3049 96 :
3050 96 : // Schedule an index-only upload to update the `latest_gc_cutoff` in the index_part.json.
3051 96 : // Otherwise, after restart, the index_part only contains the old `latest_gc_cutoff` and
3052 96 : // find_gc_cutoffs will try accessing things below the cutoff. TODO: ideally, this should
3053 96 : // be batched into `schedule_compaction_update`.
3054 96 : let disk_consistent_lsn = self.disk_consistent_lsn.load();
3055 96 : self.schedule_uploads(disk_consistent_lsn, None)?;
3056 : // If a layer gets rewritten throughout gc-compaction, we need to keep that layer only in `compact_to` instead
3057 : // of `compact_from`.
3058 96 : let compact_from = {
3059 96 : let mut compact_from = Vec::new();
3060 96 : let mut compact_to_set = HashMap::new();
3061 204 : for layer in &compact_to {
3062 108 : compact_to_set.insert(layer.layer_desc().key(), layer);
3063 108 : }
3064 304 : for layer in &layer_selection {
3065 208 : if let Some(to) = compact_to_set.get(&layer.layer_desc().key()) {
3066 0 : tracing::info!(
3067 0 : "skipping delete {} because found same layer key at different generation {}",
3068 : layer, to
3069 : );
3070 208 : } else {
3071 208 : compact_from.push(layer.clone());
3072 208 : }
3073 : }
3074 96 : compact_from
3075 96 : };
3076 96 : self.remote_client
3077 96 : .schedule_compaction_update(&compact_from, &compact_to)?;
3078 :
3079 96 : drop(gc_lock);
3080 96 :
3081 96 : Ok(())
3082 108 : }
3083 : }
3084 :
3085 : struct TimelineAdaptor {
3086 : timeline: Arc<Timeline>,
3087 :
3088 : keyspace: (Lsn, KeySpace),
3089 :
3090 : new_deltas: Vec<ResidentLayer>,
3091 : new_images: Vec<ResidentLayer>,
3092 : layers_to_delete: Vec<Arc<PersistentLayerDesc>>,
3093 : }
3094 :
3095 : impl TimelineAdaptor {
3096 0 : pub fn new(timeline: &Arc<Timeline>, keyspace: (Lsn, KeySpace)) -> Self {
3097 0 : Self {
3098 0 : timeline: timeline.clone(),
3099 0 : keyspace,
3100 0 : new_images: Vec::new(),
3101 0 : new_deltas: Vec::new(),
3102 0 : layers_to_delete: Vec::new(),
3103 0 : }
3104 0 : }
3105 :
3106 0 : pub async fn flush_updates(&mut self) -> Result<(), CompactionError> {
3107 0 : let layers_to_delete = {
3108 0 : let guard = self.timeline.layers.read().await;
3109 0 : self.layers_to_delete
3110 0 : .iter()
3111 0 : .map(|x| guard.get_from_desc(x))
3112 0 : .collect::<Vec<Layer>>()
3113 0 : };
3114 0 : self.timeline
3115 0 : .finish_compact_batch(&self.new_deltas, &self.new_images, &layers_to_delete)
3116 0 : .await?;
3117 :
3118 0 : self.timeline
3119 0 : .upload_new_image_layers(std::mem::take(&mut self.new_images))?;
3120 :
3121 0 : self.new_deltas.clear();
3122 0 : self.layers_to_delete.clear();
3123 0 : Ok(())
3124 0 : }
3125 : }
3126 :
3127 : #[derive(Clone)]
3128 : struct ResidentDeltaLayer(ResidentLayer);
3129 : #[derive(Clone)]
3130 : struct ResidentImageLayer(ResidentLayer);
3131 :
3132 : impl CompactionJobExecutor for TimelineAdaptor {
3133 : type Key = pageserver_api::key::Key;
3134 :
3135 : type Layer = OwnArc<PersistentLayerDesc>;
3136 : type DeltaLayer = ResidentDeltaLayer;
3137 : type ImageLayer = ResidentImageLayer;
3138 :
3139 : type RequestContext = crate::context::RequestContext;
3140 :
3141 0 : fn get_shard_identity(&self) -> &ShardIdentity {
3142 0 : self.timeline.get_shard_identity()
3143 0 : }
3144 :
3145 0 : async fn get_layers(
3146 0 : &mut self,
3147 0 : key_range: &Range<Key>,
3148 0 : lsn_range: &Range<Lsn>,
3149 0 : _ctx: &RequestContext,
3150 0 : ) -> anyhow::Result<Vec<OwnArc<PersistentLayerDesc>>> {
3151 0 : self.flush_updates().await?;
3152 :
3153 0 : let guard = self.timeline.layers.read().await;
3154 0 : let layer_map = guard.layer_map()?;
3155 :
3156 0 : let result = layer_map
3157 0 : .iter_historic_layers()
3158 0 : .filter(|l| {
3159 0 : overlaps_with(&l.lsn_range, lsn_range) && overlaps_with(&l.key_range, key_range)
3160 0 : })
3161 0 : .map(OwnArc)
3162 0 : .collect();
3163 0 : Ok(result)
3164 0 : }
3165 :
3166 0 : async fn get_keyspace(
3167 0 : &mut self,
3168 0 : key_range: &Range<Key>,
3169 0 : lsn: Lsn,
3170 0 : _ctx: &RequestContext,
3171 0 : ) -> anyhow::Result<Vec<Range<Key>>> {
3172 0 : if lsn == self.keyspace.0 {
3173 0 : Ok(pageserver_compaction::helpers::intersect_keyspace(
3174 0 : &self.keyspace.1.ranges,
3175 0 : key_range,
3176 0 : ))
3177 : } else {
3178 : // The current compaction implementation only ever requests the key space
3179 : // at the compaction end LSN.
3180 0 : anyhow::bail!("keyspace not available for requested lsn");
3181 : }
3182 0 : }
3183 :
3184 0 : async fn downcast_delta_layer(
3185 0 : &self,
3186 0 : layer: &OwnArc<PersistentLayerDesc>,
3187 0 : ) -> anyhow::Result<Option<ResidentDeltaLayer>> {
3188 0 : // this is a lot more complex than a simple downcast...
3189 0 : if layer.is_delta() {
3190 0 : let l = {
3191 0 : let guard = self.timeline.layers.read().await;
3192 0 : guard.get_from_desc(layer)
3193 : };
3194 0 : let result = l.download_and_keep_resident().await?;
3195 :
3196 0 : Ok(Some(ResidentDeltaLayer(result)))
3197 : } else {
3198 0 : Ok(None)
3199 : }
3200 0 : }
3201 :
3202 0 : async fn create_image(
3203 0 : &mut self,
3204 0 : lsn: Lsn,
3205 0 : key_range: &Range<Key>,
3206 0 : ctx: &RequestContext,
3207 0 : ) -> anyhow::Result<()> {
3208 0 : Ok(self.create_image_impl(lsn, key_range, ctx).await?)
3209 0 : }
3210 :
3211 0 : async fn create_delta(
3212 0 : &mut self,
3213 0 : lsn_range: &Range<Lsn>,
3214 0 : key_range: &Range<Key>,
3215 0 : input_layers: &[ResidentDeltaLayer],
3216 0 : ctx: &RequestContext,
3217 0 : ) -> anyhow::Result<()> {
3218 0 : debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end);
3219 :
3220 0 : let mut all_entries = Vec::new();
3221 0 : for dl in input_layers.iter() {
3222 0 : all_entries.extend(dl.load_keys(ctx).await?);
3223 : }
3224 :
3225 : // The current stdlib sorting implementation is designed in a way that makes it
3226 : // particularly fast when the slice is made up of sorted sub-ranges.
3227 0 : all_entries.sort_by_key(|DeltaEntry { key, lsn, .. }| (*key, *lsn));
3228 :
3229 0 : let mut writer = DeltaLayerWriter::new(
3230 0 : self.timeline.conf,
3231 0 : self.timeline.timeline_id,
3232 0 : self.timeline.tenant_shard_id,
3233 0 : key_range.start,
3234 0 : lsn_range.clone(),
3235 0 : ctx,
3236 0 : )
3237 0 : .await?;
3238 :
3239 0 : let mut dup_values = 0;
3240 0 :
3241 0 : // This iterator walks through all key-value pairs from all the layers
3242 0 : // we're compacting, in key, LSN order.
3243 0 : let mut prev: Option<(Key, Lsn)> = None;
3244 : for &DeltaEntry {
3245 0 : key, lsn, ref val, ..
3246 0 : } in all_entries.iter()
3247 : {
3248 0 : if prev == Some((key, lsn)) {
3249 : // This is a duplicate. Skip it.
3250 : //
3251 : // It can happen if compaction is interrupted after writing some
3252 : // layers but not all, and we are compacting the range again.
3253 : // The calculations in the algorithm assume that there are no
3254 : // duplicates, so the math on targeted file size is likely off,
3255 : // and we will create smaller files than expected.
3256 0 : dup_values += 1;
3257 0 : continue;
3258 0 : }
3259 :
3260 0 : let value = val.load(ctx).await?;
3261 :
3262 0 : writer.put_value(key, lsn, value, ctx).await?;
3263 :
3264 0 : prev = Some((key, lsn));
3265 : }
3266 :
3267 0 : if dup_values > 0 {
3268 0 : warn!("delta layer created with {} duplicate values", dup_values);
3269 0 : }
3270 :
3271 0 : fail_point!("delta-layer-writer-fail-before-finish", |_| {
3272 0 : Err(anyhow::anyhow!(
3273 0 : "failpoint delta-layer-writer-fail-before-finish"
3274 0 : ))
3275 0 : });
3276 :
3277 0 : let (desc, path) = writer.finish(prev.unwrap().0.next(), ctx).await?;
3278 0 : let new_delta_layer =
3279 0 : Layer::finish_creating(self.timeline.conf, &self.timeline, desc, &path)?;
3280 :
3281 0 : self.new_deltas.push(new_delta_layer);
3282 0 : Ok(())
3283 0 : }
3284 :
3285 0 : async fn delete_layer(
3286 0 : &mut self,
3287 0 : layer: &OwnArc<PersistentLayerDesc>,
3288 0 : _ctx: &RequestContext,
3289 0 : ) -> anyhow::Result<()> {
3290 0 : self.layers_to_delete.push(layer.clone().0);
3291 0 : Ok(())
3292 0 : }
3293 : }
3294 :
3295 : impl TimelineAdaptor {
3296 0 : async fn create_image_impl(
3297 0 : &mut self,
3298 0 : lsn: Lsn,
3299 0 : key_range: &Range<Key>,
3300 0 : ctx: &RequestContext,
3301 0 : ) -> Result<(), CreateImageLayersError> {
3302 0 : let timer = self.timeline.metrics.create_images_time_histo.start_timer();
3303 :
3304 0 : let image_layer_writer = ImageLayerWriter::new(
3305 0 : self.timeline.conf,
3306 0 : self.timeline.timeline_id,
3307 0 : self.timeline.tenant_shard_id,
3308 0 : key_range,
3309 0 : lsn,
3310 0 : ctx,
3311 0 : )
3312 0 : .await?;
3313 :
3314 0 : fail_point!("image-layer-writer-fail-before-finish", |_| {
3315 0 : Err(CreateImageLayersError::Other(anyhow::anyhow!(
3316 0 : "failpoint image-layer-writer-fail-before-finish"
3317 0 : )))
3318 0 : });
3319 :
3320 0 : let keyspace = KeySpace {
3321 0 : ranges: self.get_keyspace(key_range, lsn, ctx).await?,
3322 : };
3323 : // TODO set proper (stateful) start. The create_image_layer_for_rel_blocks function mostly
3324 0 : let outcome = self
3325 0 : .timeline
3326 0 : .create_image_layer_for_rel_blocks(
3327 0 : &keyspace,
3328 0 : image_layer_writer,
3329 0 : lsn,
3330 0 : ctx,
3331 0 : key_range.clone(),
3332 0 : IoConcurrency::sequential(),
3333 0 : )
3334 0 : .await?;
3335 :
3336 : if let ImageLayerCreationOutcome::Generated {
3337 0 : unfinished_image_layer,
3338 0 : } = outcome
3339 : {
3340 0 : let (desc, path) = unfinished_image_layer.finish(ctx).await?;
3341 0 : let image_layer =
3342 0 : Layer::finish_creating(self.timeline.conf, &self.timeline, desc, &path)?;
3343 0 : self.new_images.push(image_layer);
3344 0 : }
3345 :
3346 0 : timer.stop_and_record();
3347 0 :
3348 0 : Ok(())
3349 0 : }
3350 : }
3351 :
3352 : impl CompactionRequestContext for crate::context::RequestContext {}
3353 :
3354 : #[derive(Debug, Clone)]
3355 : pub struct OwnArc<T>(pub Arc<T>);
3356 :
3357 : impl<T> Deref for OwnArc<T> {
3358 : type Target = <Arc<T> as Deref>::Target;
3359 0 : fn deref(&self) -> &Self::Target {
3360 0 : &self.0
3361 0 : }
3362 : }
3363 :
3364 : impl<T> AsRef<T> for OwnArc<T> {
3365 0 : fn as_ref(&self) -> &T {
3366 0 : self.0.as_ref()
3367 0 : }
3368 : }
3369 :
3370 : impl CompactionLayer<Key> for OwnArc<PersistentLayerDesc> {
3371 0 : fn key_range(&self) -> &Range<Key> {
3372 0 : &self.key_range
3373 0 : }
3374 0 : fn lsn_range(&self) -> &Range<Lsn> {
3375 0 : &self.lsn_range
3376 0 : }
3377 0 : fn file_size(&self) -> u64 {
3378 0 : self.file_size
3379 0 : }
3380 0 : fn short_id(&self) -> std::string::String {
3381 0 : self.as_ref().short_id().to_string()
3382 0 : }
3383 0 : fn is_delta(&self) -> bool {
3384 0 : self.as_ref().is_delta()
3385 0 : }
3386 : }
3387 :
3388 : impl CompactionLayer<Key> for OwnArc<DeltaLayer> {
3389 0 : fn key_range(&self) -> &Range<Key> {
3390 0 : &self.layer_desc().key_range
3391 0 : }
3392 0 : fn lsn_range(&self) -> &Range<Lsn> {
3393 0 : &self.layer_desc().lsn_range
3394 0 : }
3395 0 : fn file_size(&self) -> u64 {
3396 0 : self.layer_desc().file_size
3397 0 : }
3398 0 : fn short_id(&self) -> std::string::String {
3399 0 : self.layer_desc().short_id().to_string()
3400 0 : }
3401 0 : fn is_delta(&self) -> bool {
3402 0 : true
3403 0 : }
3404 : }
3405 :
3406 : use crate::tenant::timeline::DeltaEntry;
3407 :
3408 : impl CompactionLayer<Key> for ResidentDeltaLayer {
3409 0 : fn key_range(&self) -> &Range<Key> {
3410 0 : &self.0.layer_desc().key_range
3411 0 : }
3412 0 : fn lsn_range(&self) -> &Range<Lsn> {
3413 0 : &self.0.layer_desc().lsn_range
3414 0 : }
3415 0 : fn file_size(&self) -> u64 {
3416 0 : self.0.layer_desc().file_size
3417 0 : }
3418 0 : fn short_id(&self) -> std::string::String {
3419 0 : self.0.layer_desc().short_id().to_string()
3420 0 : }
3421 0 : fn is_delta(&self) -> bool {
3422 0 : true
3423 0 : }
3424 : }
3425 :
3426 : impl CompactionDeltaLayer<TimelineAdaptor> for ResidentDeltaLayer {
3427 : type DeltaEntry<'a> = DeltaEntry<'a>;
3428 :
3429 0 : async fn load_keys(&self, ctx: &RequestContext) -> anyhow::Result<Vec<DeltaEntry<'_>>> {
3430 0 : self.0.get_as_delta(ctx).await?.index_entries(ctx).await
3431 0 : }
3432 : }
3433 :
3434 : impl CompactionLayer<Key> for ResidentImageLayer {
3435 0 : fn key_range(&self) -> &Range<Key> {
3436 0 : &self.0.layer_desc().key_range
3437 0 : }
3438 0 : fn lsn_range(&self) -> &Range<Lsn> {
3439 0 : &self.0.layer_desc().lsn_range
3440 0 : }
3441 0 : fn file_size(&self) -> u64 {
3442 0 : self.0.layer_desc().file_size
3443 0 : }
3444 0 : fn short_id(&self) -> std::string::String {
3445 0 : self.0.layer_desc().short_id().to_string()
3446 0 : }
3447 0 : fn is_delta(&self) -> bool {
3448 0 : false
3449 0 : }
3450 : }
3451 : impl CompactionImageLayer<TimelineAdaptor> for ResidentImageLayer {}