Line data Source code
1 : //! New compaction implementation. The algorithm itself is implemented in the
2 : //! compaction crate. This file implements the callbacks and structs that allow
3 : //! the algorithm to drive the process.
4 : //!
5 : //! The old legacy algorithm is implemented directly in `timeline.rs`.
6 :
7 : use std::collections::BinaryHeap;
8 : use std::ops::{Deref, Range};
9 : use std::sync::Arc;
10 :
11 : use super::layer_manager::LayerManager;
12 : use super::{CompactFlags, DurationRecorder, RecordedDuration, Timeline};
13 :
14 : use anyhow::{anyhow, Context};
15 : use enumset::EnumSet;
16 : use fail::fail_point;
17 : use itertools::Itertools;
18 : use pageserver_api::shard::TenantShardId;
19 : use tokio_util::sync::CancellationToken;
20 : use tracing::{debug, info, info_span, trace, warn, Instrument};
21 : use utils::id::TimelineId;
22 :
23 : use crate::context::{AccessStatsBehavior, RequestContext, RequestContextBuilder};
24 : use crate::tenant::storage_layer::{AsLayerDesc, PersistentLayerDesc};
25 : use crate::tenant::timeline::{drop_rlock, is_rel_fsm_block_key, is_rel_vm_block_key, Hole};
26 : use crate::tenant::timeline::{DeltaLayerWriter, ImageLayerWriter};
27 : use crate::tenant::timeline::{Layer, ResidentLayer};
28 : use crate::tenant::DeltaLayer;
29 : use crate::tenant::PageReconstructError;
30 : use crate::virtual_file::{MaybeFatalIo, VirtualFile};
31 : use crate::{page_cache, ZERO_PAGE};
32 :
33 : use crate::keyspace::KeySpace;
34 : use crate::repository::Key;
35 :
36 : use utils::lsn::Lsn;
37 :
38 : use pageserver_compaction::helpers::overlaps_with;
39 : use pageserver_compaction::interface::*;
40 :
41 : use super::CompactionError;
42 :
43 : impl Timeline {
44 : /// TODO: cancellation
45 510 : pub(crate) async fn compact_legacy(
46 510 : self: &Arc<Self>,
47 510 : _cancel: &CancellationToken,
48 510 : flags: EnumSet<CompactFlags>,
49 510 : ctx: &RequestContext,
50 510 : ) -> Result<(), CompactionError> {
51 510 : // High level strategy for compaction / image creation:
52 510 : //
53 510 : // 1. First, calculate the desired "partitioning" of the
54 510 : // currently in-use key space. The goal is to partition the
55 510 : // key space into roughly fixed-size chunks, but also take into
56 510 : // account any existing image layers, and try to align the
57 510 : // chunk boundaries with the existing image layers to avoid
58 510 : // too much churn. Also try to align chunk boundaries with
59 510 : // relation boundaries. In principle, we don't know about
60 510 : // relation boundaries here, we just deal with key-value
61 510 : // pairs, and the code in pgdatadir_mapping.rs knows how to
62 510 : // map relations into key-value pairs. But in practice we know
63 510 : // that 'field6' is the block number, and the fields 1-5
64 510 : // identify a relation. This is just an optimization,
65 510 : // though.
66 510 : //
67 510 : // 2. Once we know the partitioning, for each partition,
68 510 : // decide if it's time to create a new image layer. The
69 510 : // criteria is: there has been too much "churn" since the last
70 510 : // image layer? The "churn" is fuzzy concept, it's a
71 510 : // combination of too many delta files, or too much WAL in
72 510 : // total in the delta file. Or perhaps: if creating an image
73 510 : // file would allow to delete some older files.
74 510 : //
75 510 : // 3. After that, we compact all level0 delta files if there
76 510 : // are too many of them. While compacting, we also garbage
77 510 : // collect any page versions that are no longer needed because
78 510 : // of the new image layers we created in step 2.
79 510 : //
80 510 : // TODO: This high level strategy hasn't been implemented yet.
81 510 : // Below are functions compact_level0() and create_image_layers()
82 510 : // but they are a bit ad hoc and don't quite work like it's explained
83 510 : // above. Rewrite it.
84 510 :
85 510 : // Is the timeline being deleted?
86 510 : if self.is_stopping() {
87 0 : trace!("Dropping out of compaction on timeline shutdown");
88 0 : return Err(CompactionError::ShuttingDown);
89 510 : }
90 510 :
91 510 : let target_file_size = self.get_checkpoint_distance();
92 510 :
93 510 : // Define partitioning schema if needed
94 510 :
95 510 : // FIXME: the match should only cover repartitioning, not the next steps
96 510 : match self
97 510 : .repartition(
98 510 : self.get_last_record_lsn(),
99 510 : self.get_compaction_target_size(),
100 510 : flags,
101 510 : ctx,
102 510 : )
103 13300 : .await
104 : {
105 510 : Ok((partitioning, lsn)) => {
106 510 : // Disables access_stats updates, so that the files we read remain candidates for eviction after we're done with them
107 510 : let image_ctx = RequestContextBuilder::extend(ctx)
108 510 : .access_stats_behavior(AccessStatsBehavior::Skip)
109 510 : .build();
110 510 :
111 510 : // 2. Compact
112 510 : let timer = self.metrics.compact_time_histo.start_timer();
113 76358 : self.compact_level0(target_file_size, ctx).await?;
114 510 : timer.stop_and_record();
115 :
116 : // 3. Create new image layers for partitions that have been modified
117 : // "enough".
118 510 : let layers = self
119 510 : .create_image_layers(
120 510 : &partitioning,
121 510 : lsn,
122 510 : flags.contains(CompactFlags::ForceImageLayerCreation),
123 510 : &image_ctx,
124 510 : )
125 1 : .await
126 510 : .map_err(anyhow::Error::from)?;
127 :
128 510 : self.upload_new_image_layers(layers)?;
129 : }
130 0 : Err(err) => {
131 0 : // no partitioning? This is normal, if the timeline was just created
132 0 : // as an empty timeline. Also in unit tests, when we use the timeline
133 0 : // as a simple key-value store, ignoring the datadir layout. Log the
134 0 : // error but continue.
135 0 : //
136 0 : // Suppress error when it's due to cancellation
137 0 : if !self.cancel.is_cancelled() {
138 0 : tracing::error!("could not compact, repartitioning keyspace failed: {err:?}");
139 0 : }
140 : }
141 : };
142 :
143 510 : Ok(())
144 510 : }
145 :
146 : /// Collect a bunch of Level 0 layer files, and compact and reshuffle them as
147 : /// as Level 1 files.
148 510 : async fn compact_level0(
149 510 : self: &Arc<Self>,
150 510 : target_file_size: u64,
151 510 : ctx: &RequestContext,
152 510 : ) -> Result<(), CompactionError> {
153 : let CompactLevel0Phase1Result {
154 510 : new_layers,
155 510 : deltas_to_compact,
156 : } = {
157 510 : let phase1_span = info_span!("compact_level0_phase1");
158 510 : let ctx = ctx.attached_child();
159 510 : let mut stats = CompactLevel0Phase1StatsBuilder {
160 510 : version: Some(2),
161 510 : tenant_id: Some(self.tenant_shard_id),
162 510 : timeline_id: Some(self.timeline_id),
163 510 : ..Default::default()
164 510 : };
165 510 :
166 510 : let begin = tokio::time::Instant::now();
167 510 : let phase1_layers_locked = Arc::clone(&self.layers).read_owned().await;
168 510 : let now = tokio::time::Instant::now();
169 510 : stats.read_lock_acquisition_micros =
170 510 : DurationRecorder::Recorded(RecordedDuration(now - begin), now);
171 510 : self.compact_level0_phase1(phase1_layers_locked, stats, target_file_size, &ctx)
172 510 : .instrument(phase1_span)
173 76358 : .await?
174 : };
175 :
176 510 : if new_layers.is_empty() && deltas_to_compact.is_empty() {
177 : // nothing to do
178 468 : return Ok(());
179 42 : }
180 42 :
181 42 : self.finish_compact_batch(&new_layers, &Vec::new(), &deltas_to_compact)
182 0 : .await?;
183 42 : Ok(())
184 510 : }
185 :
186 : /// Level0 files first phase of compaction, explained in the [`Self::compact_legacy`] comment.
187 510 : async fn compact_level0_phase1(
188 510 : self: &Arc<Self>,
189 510 : guard: tokio::sync::OwnedRwLockReadGuard<LayerManager>,
190 510 : mut stats: CompactLevel0Phase1StatsBuilder,
191 510 : target_file_size: u64,
192 510 : ctx: &RequestContext,
193 510 : ) -> Result<CompactLevel0Phase1Result, CompactionError> {
194 510 : stats.read_lock_held_spawn_blocking_startup_micros =
195 510 : stats.read_lock_acquisition_micros.till_now(); // set by caller
196 510 : let layers = guard.layer_map();
197 510 : let level0_deltas = layers.get_level0_deltas()?;
198 510 : let mut level0_deltas = level0_deltas
199 510 : .into_iter()
200 2362 : .map(|x| guard.get_from_desc(&x))
201 510 : .collect_vec();
202 510 : stats.level0_deltas_count = Some(level0_deltas.len());
203 510 : // Only compact if enough layers have accumulated.
204 510 : let threshold = self.get_compaction_threshold();
205 510 : if level0_deltas.is_empty() || level0_deltas.len() < threshold {
206 468 : debug!(
207 0 : level0_deltas = level0_deltas.len(),
208 0 : threshold, "too few deltas to compact"
209 0 : );
210 468 : return Ok(CompactLevel0Phase1Result::default());
211 42 : }
212 42 :
213 42 : // This failpoint is used together with `test_duplicate_layers` integration test.
214 42 : // It returns the compaction result exactly the same layers as input to compaction.
215 42 : // We want to ensure that this will not cause any problem when updating the layer map
216 42 : // after the compaction is finished.
217 42 : //
218 42 : // Currently, there are two rare edge cases that will cause duplicated layers being
219 42 : // inserted.
220 42 : // 1. The compaction job is inturrupted / did not finish successfully. Assume we have file 1, 2, 3, 4, which
221 42 : // is compacted to 5, but the page server is shut down, next time we start page server we will get a layer
222 42 : // map containing 1, 2, 3, 4, and 5, whereas 5 has the same content as 4. If we trigger L0 compation at this
223 42 : // point again, it is likely that we will get a file 6 which has the same content and the key range as 5,
224 42 : // and this causes an overwrite. This is acceptable because the content is the same, and we should do a
225 42 : // layer replace instead of the normal remove / upload process.
226 42 : // 2. The input workload pattern creates exactly n files that are sorted, non-overlapping and is of target file
227 42 : // size length. Compaction will likely create the same set of n files afterwards.
228 42 : //
229 42 : // This failpoint is a superset of both of the cases.
230 42 : if cfg!(feature = "testing") {
231 42 : let active = (|| {
232 42 : ::fail::fail_point!("compact-level0-phase1-return-same", |_| true);
233 42 : false
234 42 : })();
235 42 :
236 42 : if active {
237 0 : let mut new_layers = Vec::with_capacity(level0_deltas.len());
238 0 : for delta in &level0_deltas {
239 : // we are just faking these layers as being produced again for this failpoint
240 0 : new_layers.push(
241 0 : delta
242 0 : .download_and_keep_resident()
243 0 : .await
244 0 : .context("download layer for failpoint")?,
245 : );
246 : }
247 0 : tracing::info!("compact-level0-phase1-return-same"); // so that we can check if we hit the failpoint
248 0 : return Ok(CompactLevel0Phase1Result {
249 0 : new_layers,
250 0 : deltas_to_compact: level0_deltas,
251 0 : });
252 42 : }
253 0 : }
254 :
255 : // Gather the files to compact in this iteration.
256 : //
257 : // Start with the oldest Level 0 delta file, and collect any other
258 : // level 0 files that form a contiguous sequence, such that the end
259 : // LSN of previous file matches the start LSN of the next file.
260 : //
261 : // Note that if the files don't form such a sequence, we might
262 : // "compact" just a single file. That's a bit pointless, but it allows
263 : // us to get rid of the level 0 file, and compact the other files on
264 : // the next iteration. This could probably made smarter, but such
265 : // "gaps" in the sequence of level 0 files should only happen in case
266 : // of a crash, partial download from cloud storage, or something like
267 : // that, so it's not a big deal in practice.
268 800 : level0_deltas.sort_by_key(|l| l.layer_desc().lsn_range.start);
269 42 : let mut level0_deltas_iter = level0_deltas.iter();
270 42 :
271 42 : let first_level0_delta = level0_deltas_iter.next().unwrap();
272 42 : let mut prev_lsn_end = first_level0_delta.layer_desc().lsn_range.end;
273 42 : let mut deltas_to_compact = Vec::with_capacity(level0_deltas.len());
274 42 :
275 42 : deltas_to_compact.push(first_level0_delta.download_and_keep_resident().await?);
276 442 : for l in level0_deltas_iter {
277 400 : let lsn_range = &l.layer_desc().lsn_range;
278 400 :
279 400 : if lsn_range.start != prev_lsn_end {
280 0 : break;
281 400 : }
282 400 : deltas_to_compact.push(l.download_and_keep_resident().await?);
283 400 : prev_lsn_end = lsn_range.end;
284 : }
285 42 : let lsn_range = Range {
286 42 : start: deltas_to_compact
287 42 : .first()
288 42 : .unwrap()
289 42 : .layer_desc()
290 42 : .lsn_range
291 42 : .start,
292 42 : end: deltas_to_compact.last().unwrap().layer_desc().lsn_range.end,
293 42 : };
294 42 :
295 42 : info!(
296 42 : "Starting Level0 compaction in LSN range {}-{} for {} layers ({} deltas in total)",
297 42 : lsn_range.start,
298 42 : lsn_range.end,
299 42 : deltas_to_compact.len(),
300 42 : level0_deltas.len()
301 42 : );
302 :
303 442 : for l in deltas_to_compact.iter() {
304 442 : info!("compact includes {l}");
305 : }
306 :
307 : // We don't need the original list of layers anymore. Drop it so that
308 : // we don't accidentally use it later in the function.
309 42 : drop(level0_deltas);
310 42 :
311 42 : stats.read_lock_held_prerequisites_micros = stats
312 42 : .read_lock_held_spawn_blocking_startup_micros
313 42 : .till_now();
314 42 :
315 42 : // Determine N largest holes where N is number of compacted layers.
316 42 : let max_holes = deltas_to_compact.len();
317 42 : let last_record_lsn = self.get_last_record_lsn();
318 42 : let min_hole_range = (target_file_size / page_cache::PAGE_SZ as u64) as i128;
319 42 : let min_hole_coverage_size = 3; // TODO: something more flexible?
320 42 :
321 42 : // min-heap (reserve space for one more element added before eviction)
322 42 : let mut heap: BinaryHeap<Hole> = BinaryHeap::with_capacity(max_holes + 1);
323 42 : let mut prev: Option<Key> = None;
324 42 :
325 42 : let mut all_keys = Vec::new();
326 :
327 442 : for l in deltas_to_compact.iter() {
328 3549 : all_keys.extend(l.load_keys(ctx).await?);
329 : }
330 :
331 : // FIXME: should spawn_blocking the rest of this function
332 :
333 : // The current stdlib sorting implementation is designed in a way where it is
334 : // particularly fast where the slice is made up of sorted sub-ranges.
335 6971354 : all_keys.sort_by_key(|DeltaEntry { key, lsn, .. }| (*key, *lsn));
336 42 :
337 42 : stats.read_lock_held_key_sort_micros = stats.read_lock_held_prerequisites_micros.till_now();
338 :
339 3121998 : for &DeltaEntry { key: next_key, .. } in all_keys.iter() {
340 3121998 : if let Some(prev_key) = prev {
341 : // just first fast filter
342 3121956 : if next_key.to_i128() - prev_key.to_i128() >= min_hole_range {
343 0 : let key_range = prev_key..next_key;
344 0 : // Measuring hole by just subtraction of i128 representation of key range boundaries
345 0 : // has not so much sense, because largest holes will corresponds field1/field2 changes.
346 0 : // But we are mostly interested to eliminate holes which cause generation of excessive image layers.
347 0 : // That is why it is better to measure size of hole as number of covering image layers.
348 0 : let coverage_size = layers.image_coverage(&key_range, last_record_lsn).len();
349 0 : if coverage_size >= min_hole_coverage_size {
350 0 : heap.push(Hole {
351 0 : key_range,
352 0 : coverage_size,
353 0 : });
354 0 : if heap.len() > max_holes {
355 0 : heap.pop(); // remove smallest hole
356 0 : }
357 0 : }
358 3121956 : }
359 42 : }
360 3121998 : prev = Some(next_key.next());
361 : }
362 42 : stats.read_lock_held_compute_holes_micros = stats.read_lock_held_key_sort_micros.till_now();
363 42 : drop_rlock(guard);
364 42 : stats.read_lock_drop_micros = stats.read_lock_held_compute_holes_micros.till_now();
365 42 : let mut holes = heap.into_vec();
366 42 : holes.sort_unstable_by_key(|hole| hole.key_range.start);
367 42 : let mut next_hole = 0; // index of next hole in holes vector
368 42 :
369 42 : // This iterator walks through all key-value pairs from all the layers
370 42 : // we're compacting, in key, LSN order.
371 42 : let all_values_iter = all_keys.iter();
372 42 :
373 42 : // This iterator walks through all keys and is needed to calculate size used by each key
374 42 : let mut all_keys_iter = all_keys
375 42 : .iter()
376 3121998 : .map(|DeltaEntry { key, lsn, size, .. }| (*key, *lsn, *size))
377 3121956 : .coalesce(|mut prev, cur| {
378 3121956 : // Coalesce keys that belong to the same key pair.
379 3121956 : // This ensures that compaction doesn't put them
380 3121956 : // into different layer files.
381 3121956 : // Still limit this by the target file size,
382 3121956 : // so that we keep the size of the files in
383 3121956 : // check.
384 3121956 : if prev.0 == cur.0 && prev.2 < target_file_size {
385 92000 : prev.2 += cur.2;
386 92000 : Ok(prev)
387 : } else {
388 3029956 : Err((prev, cur))
389 : }
390 3121956 : });
391 42 :
392 42 : // Merge the contents of all the input delta layers into a new set
393 42 : // of delta layers, based on the current partitioning.
394 42 : //
395 42 : // We split the new delta layers on the key dimension. We iterate through the key space, and for each key, check if including the next key to the current output layer we're building would cause the layer to become too large. If so, dump the current output layer and start new one.
396 42 : // It's possible that there is a single key with so many page versions that storing all of them in a single layer file
397 42 : // would be too large. In that case, we also split on the LSN dimension.
398 42 : //
399 42 : // LSN
400 42 : // ^
401 42 : // |
402 42 : // | +-----------+ +--+--+--+--+
403 42 : // | | | | | | | |
404 42 : // | +-----------+ | | | | |
405 42 : // | | | | | | | |
406 42 : // | +-----------+ ==> | | | | |
407 42 : // | | | | | | | |
408 42 : // | +-----------+ | | | | |
409 42 : // | | | | | | | |
410 42 : // | +-----------+ +--+--+--+--+
411 42 : // |
412 42 : // +--------------> key
413 42 : //
414 42 : //
415 42 : // If one key (X) has a lot of page versions:
416 42 : //
417 42 : // LSN
418 42 : // ^
419 42 : // | (X)
420 42 : // | +-----------+ +--+--+--+--+
421 42 : // | | | | | | | |
422 42 : // | +-----------+ | | +--+ |
423 42 : // | | | | | | | |
424 42 : // | +-----------+ ==> | | | | |
425 42 : // | | | | | +--+ |
426 42 : // | +-----------+ | | | | |
427 42 : // | | | | | | | |
428 42 : // | +-----------+ +--+--+--+--+
429 42 : // |
430 42 : // +--------------> key
431 42 : // TODO: this actually divides the layers into fixed-size chunks, not
432 42 : // based on the partitioning.
433 42 : //
434 42 : // TODO: we should also opportunistically materialize and
435 42 : // garbage collect what we can.
436 42 : let mut new_layers = Vec::new();
437 42 : let mut prev_key: Option<Key> = None;
438 42 : let mut writer: Option<DeltaLayerWriter> = None;
439 42 : let mut key_values_total_size = 0u64;
440 42 : let mut dup_start_lsn: Lsn = Lsn::INVALID; // start LSN of layer containing values of the single key
441 42 : let mut dup_end_lsn: Lsn = Lsn::INVALID; // end LSN of layer containing values of the single key
442 :
443 : for &DeltaEntry {
444 3121998 : key, lsn, ref val, ..
445 3122040 : } in all_values_iter
446 : {
447 3121998 : let value = val.load(ctx).await?;
448 3121998 : let same_key = prev_key.map_or(false, |prev_key| prev_key == key);
449 3121998 : // We need to check key boundaries once we reach next key or end of layer with the same key
450 3121998 : if !same_key || lsn == dup_end_lsn {
451 3029998 : let mut next_key_size = 0u64;
452 3029998 : let is_dup_layer = dup_end_lsn.is_valid();
453 3029998 : dup_start_lsn = Lsn::INVALID;
454 3029998 : if !same_key {
455 3029998 : dup_end_lsn = Lsn::INVALID;
456 3029998 : }
457 : // Determine size occupied by this key. We stop at next key or when size becomes larger than target_file_size
458 3029998 : for (next_key, next_lsn, next_size) in all_keys_iter.by_ref() {
459 3029998 : next_key_size = next_size;
460 3029998 : if key != next_key {
461 3029956 : if dup_end_lsn.is_valid() {
462 0 : // We are writting segment with duplicates:
463 0 : // place all remaining values of this key in separate segment
464 0 : dup_start_lsn = dup_end_lsn; // new segments starts where old stops
465 0 : dup_end_lsn = lsn_range.end; // there are no more values of this key till end of LSN range
466 3029956 : }
467 3029956 : break;
468 42 : }
469 42 : key_values_total_size += next_size;
470 42 : // Check if it is time to split segment: if total keys size is larger than target file size.
471 42 : // We need to avoid generation of empty segments if next_size > target_file_size.
472 42 : if key_values_total_size > target_file_size && lsn != next_lsn {
473 : // Split key between multiple layers: such layer can contain only single key
474 0 : dup_start_lsn = if dup_end_lsn.is_valid() {
475 0 : dup_end_lsn // new segment with duplicates starts where old one stops
476 : } else {
477 0 : lsn // start with the first LSN for this key
478 : };
479 0 : dup_end_lsn = next_lsn; // upper LSN boundary is exclusive
480 0 : break;
481 42 : }
482 : }
483 : // handle case when loop reaches last key: in this case dup_end is non-zero but dup_start is not set.
484 3029998 : if dup_end_lsn.is_valid() && !dup_start_lsn.is_valid() {
485 0 : dup_start_lsn = dup_end_lsn;
486 0 : dup_end_lsn = lsn_range.end;
487 3029998 : }
488 3029998 : if writer.is_some() {
489 3029956 : let written_size = writer.as_mut().unwrap().size();
490 3029956 : let contains_hole =
491 3029956 : next_hole < holes.len() && key >= holes[next_hole].key_range.end;
492 : // check if key cause layer overflow or contains hole...
493 3029956 : if is_dup_layer
494 3029956 : || dup_end_lsn.is_valid()
495 3029956 : || written_size + key_values_total_size > target_file_size
496 3029756 : || contains_hole
497 : {
498 : // ... if so, flush previous layer and prepare to write new one
499 200 : new_layers.push(
500 200 : writer
501 200 : .take()
502 200 : .unwrap()
503 200 : .finish(prev_key.unwrap().next(), self)
504 508 : .await?,
505 : );
506 200 : writer = None;
507 200 :
508 200 : if contains_hole {
509 0 : // skip hole
510 0 : next_hole += 1;
511 200 : }
512 3029756 : }
513 42 : }
514 : // Remember size of key value because at next iteration we will access next item
515 3029998 : key_values_total_size = next_key_size;
516 92000 : }
517 3121998 : fail_point!("delta-layer-writer-fail-before-finish", |_| {
518 0 : Err(CompactionError::Other(anyhow::anyhow!(
519 0 : "failpoint delta-layer-writer-fail-before-finish"
520 0 : )))
521 3121998 : });
522 :
523 3121998 : if !self.shard_identity.is_key_disposable(&key) {
524 3121998 : if writer.is_none() {
525 242 : // Create writer if not initiaized yet
526 242 : writer = Some(
527 : DeltaLayerWriter::new(
528 242 : self.conf,
529 242 : self.timeline_id,
530 242 : self.tenant_shard_id,
531 242 : key,
532 242 : if dup_end_lsn.is_valid() {
533 : // this is a layer containing slice of values of the same key
534 0 : debug!("Create new dup layer {}..{}", dup_start_lsn, dup_end_lsn);
535 0 : dup_start_lsn..dup_end_lsn
536 : } else {
537 242 : debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end);
538 242 : lsn_range.clone()
539 : },
540 : )
541 121 : .await?,
542 : );
543 3121756 : }
544 :
545 3121998 : writer.as_mut().unwrap().put_value(key, lsn, value).await?;
546 : } else {
547 0 : debug!(
548 0 : "Dropping key {} during compaction (it belongs on shard {:?})",
549 0 : key,
550 0 : self.shard_identity.get_shard_number(&key)
551 0 : );
552 : }
553 :
554 3121998 : if !new_layers.is_empty() {
555 19786 : fail_point!("after-timeline-compacted-first-L1");
556 3102212 : }
557 :
558 3121998 : prev_key = Some(key);
559 : }
560 42 : if let Some(writer) = writer {
561 3009 : new_layers.push(writer.finish(prev_key.unwrap().next(), self).await?);
562 0 : }
563 :
564 : // Sync layers
565 42 : if !new_layers.is_empty() {
566 : // Print a warning if the created layer is larger than double the target size
567 : // Add two pages for potential overhead. This should in theory be already
568 : // accounted for in the target calculation, but for very small targets,
569 : // we still might easily hit the limit otherwise.
570 42 : let warn_limit = target_file_size * 2 + page_cache::PAGE_SZ as u64 * 2;
571 242 : for layer in new_layers.iter() {
572 242 : if layer.layer_desc().file_size > warn_limit {
573 0 : warn!(
574 0 : %layer,
575 0 : "created delta file of size {} larger than double of target of {target_file_size}", layer.layer_desc().file_size
576 0 : );
577 242 : }
578 : }
579 :
580 : // The writer.finish() above already did the fsync of the inodes.
581 : // We just need to fsync the directory in which these inodes are linked,
582 : // which we know to be the timeline directory.
583 : //
584 : // We use fatal_err() below because the after writer.finish() returns with success,
585 : // the in-memory state of the filesystem already has the layer file in its final place,
586 : // and subsequent pageserver code could think it's durable while it really isn't.
587 42 : let timeline_dir = VirtualFile::open(
588 42 : &self
589 42 : .conf
590 42 : .timeline_path(&self.tenant_shard_id, &self.timeline_id),
591 42 : )
592 21 : .await
593 42 : .fatal_err("VirtualFile::open for timeline dir fsync");
594 42 : timeline_dir
595 42 : .sync_all()
596 21 : .await
597 42 : .fatal_err("VirtualFile::sync_all timeline dir");
598 0 : }
599 :
600 42 : stats.write_layer_files_micros = stats.read_lock_drop_micros.till_now();
601 42 : stats.new_deltas_count = Some(new_layers.len());
602 242 : stats.new_deltas_size = Some(new_layers.iter().map(|l| l.layer_desc().file_size).sum());
603 42 :
604 42 : match TryInto::<CompactLevel0Phase1Stats>::try_into(stats)
605 42 : .and_then(|stats| serde_json::to_string(&stats).context("serde_json::to_string"))
606 : {
607 42 : Ok(stats_json) => {
608 42 : info!(
609 42 : stats_json = stats_json.as_str(),
610 42 : "compact_level0_phase1 stats available"
611 42 : )
612 : }
613 0 : Err(e) => {
614 0 : warn!("compact_level0_phase1 stats failed to serialize: {:#}", e);
615 : }
616 : }
617 :
618 42 : Ok(CompactLevel0Phase1Result {
619 42 : new_layers,
620 42 : deltas_to_compact: deltas_to_compact
621 42 : .into_iter()
622 442 : .map(|x| x.drop_eviction_guard())
623 42 : .collect::<Vec<_>>(),
624 42 : })
625 510 : }
626 : }
627 :
628 : #[derive(Default)]
629 : struct CompactLevel0Phase1Result {
630 : new_layers: Vec<ResidentLayer>,
631 : deltas_to_compact: Vec<Layer>,
632 : }
633 :
634 : #[derive(Default)]
635 : struct CompactLevel0Phase1StatsBuilder {
636 : version: Option<u64>,
637 : tenant_id: Option<TenantShardId>,
638 : timeline_id: Option<TimelineId>,
639 : read_lock_acquisition_micros: DurationRecorder,
640 : read_lock_held_spawn_blocking_startup_micros: DurationRecorder,
641 : read_lock_held_key_sort_micros: DurationRecorder,
642 : read_lock_held_prerequisites_micros: DurationRecorder,
643 : read_lock_held_compute_holes_micros: DurationRecorder,
644 : read_lock_drop_micros: DurationRecorder,
645 : write_layer_files_micros: DurationRecorder,
646 : level0_deltas_count: Option<usize>,
647 : new_deltas_count: Option<usize>,
648 : new_deltas_size: Option<u64>,
649 : }
650 :
651 : #[derive(serde::Serialize)]
652 : struct CompactLevel0Phase1Stats {
653 : version: u64,
654 : tenant_id: TenantShardId,
655 : timeline_id: TimelineId,
656 : read_lock_acquisition_micros: RecordedDuration,
657 : read_lock_held_spawn_blocking_startup_micros: RecordedDuration,
658 : read_lock_held_key_sort_micros: RecordedDuration,
659 : read_lock_held_prerequisites_micros: RecordedDuration,
660 : read_lock_held_compute_holes_micros: RecordedDuration,
661 : read_lock_drop_micros: RecordedDuration,
662 : write_layer_files_micros: RecordedDuration,
663 : level0_deltas_count: usize,
664 : new_deltas_count: usize,
665 : new_deltas_size: u64,
666 : }
667 :
668 : impl TryFrom<CompactLevel0Phase1StatsBuilder> for CompactLevel0Phase1Stats {
669 : type Error = anyhow::Error;
670 :
671 42 : fn try_from(value: CompactLevel0Phase1StatsBuilder) -> Result<Self, Self::Error> {
672 42 : Ok(Self {
673 42 : version: value.version.ok_or_else(|| anyhow!("version not set"))?,
674 42 : tenant_id: value
675 42 : .tenant_id
676 42 : .ok_or_else(|| anyhow!("tenant_id not set"))?,
677 42 : timeline_id: value
678 42 : .timeline_id
679 42 : .ok_or_else(|| anyhow!("timeline_id not set"))?,
680 42 : read_lock_acquisition_micros: value
681 42 : .read_lock_acquisition_micros
682 42 : .into_recorded()
683 42 : .ok_or_else(|| anyhow!("read_lock_acquisition_micros not set"))?,
684 42 : read_lock_held_spawn_blocking_startup_micros: value
685 42 : .read_lock_held_spawn_blocking_startup_micros
686 42 : .into_recorded()
687 42 : .ok_or_else(|| anyhow!("read_lock_held_spawn_blocking_startup_micros not set"))?,
688 42 : read_lock_held_key_sort_micros: value
689 42 : .read_lock_held_key_sort_micros
690 42 : .into_recorded()
691 42 : .ok_or_else(|| anyhow!("read_lock_held_key_sort_micros not set"))?,
692 42 : read_lock_held_prerequisites_micros: value
693 42 : .read_lock_held_prerequisites_micros
694 42 : .into_recorded()
695 42 : .ok_or_else(|| anyhow!("read_lock_held_prerequisites_micros not set"))?,
696 42 : read_lock_held_compute_holes_micros: value
697 42 : .read_lock_held_compute_holes_micros
698 42 : .into_recorded()
699 42 : .ok_or_else(|| anyhow!("read_lock_held_compute_holes_micros not set"))?,
700 42 : read_lock_drop_micros: value
701 42 : .read_lock_drop_micros
702 42 : .into_recorded()
703 42 : .ok_or_else(|| anyhow!("read_lock_drop_micros not set"))?,
704 42 : write_layer_files_micros: value
705 42 : .write_layer_files_micros
706 42 : .into_recorded()
707 42 : .ok_or_else(|| anyhow!("write_layer_files_micros not set"))?,
708 42 : level0_deltas_count: value
709 42 : .level0_deltas_count
710 42 : .ok_or_else(|| anyhow!("level0_deltas_count not set"))?,
711 42 : new_deltas_count: value
712 42 : .new_deltas_count
713 42 : .ok_or_else(|| anyhow!("new_deltas_count not set"))?,
714 42 : new_deltas_size: value
715 42 : .new_deltas_size
716 42 : .ok_or_else(|| anyhow!("new_deltas_size not set"))?,
717 : })
718 42 : }
719 : }
720 :
721 : impl Timeline {
722 : /// Entry point for new tiered compaction algorithm.
723 : ///
724 : /// All the real work is in the implementation in the pageserver_compaction
725 : /// crate. The code here would apply to any algorithm implemented by the
726 : /// same interface, but tiered is the only one at the moment.
727 : ///
728 : /// TODO: cancellation
729 0 : pub(crate) async fn compact_tiered(
730 0 : self: &Arc<Self>,
731 0 : _cancel: &CancellationToken,
732 0 : ctx: &RequestContext,
733 0 : ) -> Result<(), CompactionError> {
734 0 : let fanout = self.get_compaction_threshold() as u64;
735 0 : let target_file_size = self.get_checkpoint_distance();
736 :
737 : // Find the top of the historical layers
738 0 : let end_lsn = {
739 0 : let guard = self.layers.read().await;
740 0 : let layers = guard.layer_map();
741 :
742 0 : let l0_deltas = layers.get_level0_deltas()?;
743 0 : drop(guard);
744 0 :
745 0 : // As an optimization, if we find that there are too few L0 layers,
746 0 : // bail out early. We know that the compaction algorithm would do
747 0 : // nothing in that case.
748 0 : if l0_deltas.len() < fanout as usize {
749 : // doesn't need compacting
750 0 : return Ok(());
751 0 : }
752 0 : l0_deltas.iter().map(|l| l.lsn_range.end).max().unwrap()
753 0 : };
754 0 :
755 0 : // Is the timeline being deleted?
756 0 : if self.is_stopping() {
757 0 : trace!("Dropping out of compaction on timeline shutdown");
758 0 : return Err(CompactionError::ShuttingDown);
759 0 : }
760 :
761 0 : let keyspace = self.collect_keyspace(end_lsn, ctx).await?;
762 0 : let mut adaptor = TimelineAdaptor::new(self, (end_lsn, keyspace));
763 0 :
764 0 : pageserver_compaction::compact_tiered::compact_tiered(
765 0 : &mut adaptor,
766 0 : end_lsn,
767 0 : target_file_size,
768 0 : fanout,
769 0 : ctx,
770 0 : )
771 0 : .await?;
772 :
773 0 : adaptor.flush_updates().await?;
774 0 : Ok(())
775 0 : }
776 : }
777 :
778 : struct TimelineAdaptor {
779 : timeline: Arc<Timeline>,
780 :
781 : keyspace: (Lsn, KeySpace),
782 :
783 : new_deltas: Vec<ResidentLayer>,
784 : new_images: Vec<ResidentLayer>,
785 : layers_to_delete: Vec<Arc<PersistentLayerDesc>>,
786 : }
787 :
788 : impl TimelineAdaptor {
789 0 : pub fn new(timeline: &Arc<Timeline>, keyspace: (Lsn, KeySpace)) -> Self {
790 0 : Self {
791 0 : timeline: timeline.clone(),
792 0 : keyspace,
793 0 : new_images: Vec::new(),
794 0 : new_deltas: Vec::new(),
795 0 : layers_to_delete: Vec::new(),
796 0 : }
797 0 : }
798 :
799 0 : pub async fn flush_updates(&mut self) -> anyhow::Result<()> {
800 0 : let layers_to_delete = {
801 0 : let guard = self.timeline.layers.read().await;
802 0 : self.layers_to_delete
803 0 : .iter()
804 0 : .map(|x| guard.get_from_desc(x))
805 0 : .collect::<Vec<Layer>>()
806 0 : };
807 0 : self.timeline
808 0 : .finish_compact_batch(&self.new_deltas, &self.new_images, &layers_to_delete)
809 0 : .await?;
810 :
811 0 : self.timeline
812 0 : .upload_new_image_layers(std::mem::take(&mut self.new_images))?;
813 :
814 0 : self.new_deltas.clear();
815 0 : self.layers_to_delete.clear();
816 0 : Ok(())
817 0 : }
818 : }
819 :
820 : #[derive(Clone)]
821 : struct ResidentDeltaLayer(ResidentLayer);
822 : #[derive(Clone)]
823 : struct ResidentImageLayer(ResidentLayer);
824 :
825 : impl CompactionJobExecutor for TimelineAdaptor {
826 : type Key = crate::repository::Key;
827 :
828 : type Layer = OwnArc<PersistentLayerDesc>;
829 : type DeltaLayer = ResidentDeltaLayer;
830 : type ImageLayer = ResidentImageLayer;
831 :
832 : type RequestContext = crate::context::RequestContext;
833 :
834 0 : async fn get_layers(
835 0 : &mut self,
836 0 : key_range: &Range<Key>,
837 0 : lsn_range: &Range<Lsn>,
838 0 : _ctx: &RequestContext,
839 0 : ) -> anyhow::Result<Vec<OwnArc<PersistentLayerDesc>>> {
840 0 : self.flush_updates().await?;
841 :
842 0 : let guard = self.timeline.layers.read().await;
843 0 : let layer_map = guard.layer_map();
844 0 :
845 0 : let result = layer_map
846 0 : .iter_historic_layers()
847 0 : .filter(|l| {
848 0 : overlaps_with(&l.lsn_range, lsn_range) && overlaps_with(&l.key_range, key_range)
849 0 : })
850 0 : .map(OwnArc)
851 0 : .collect();
852 0 : Ok(result)
853 0 : }
854 :
855 0 : async fn get_keyspace(
856 0 : &mut self,
857 0 : key_range: &Range<Key>,
858 0 : lsn: Lsn,
859 0 : _ctx: &RequestContext,
860 0 : ) -> anyhow::Result<Vec<Range<Key>>> {
861 0 : if lsn == self.keyspace.0 {
862 0 : Ok(pageserver_compaction::helpers::intersect_keyspace(
863 0 : &self.keyspace.1.ranges,
864 0 : key_range,
865 0 : ))
866 : } else {
867 : // The current compaction implementatin only ever requests the key space
868 : // at the compaction end LSN.
869 0 : anyhow::bail!("keyspace not available for requested lsn");
870 : }
871 0 : }
872 :
873 0 : async fn downcast_delta_layer(
874 0 : &self,
875 0 : layer: &OwnArc<PersistentLayerDesc>,
876 0 : ) -> anyhow::Result<Option<ResidentDeltaLayer>> {
877 0 : // this is a lot more complex than a simple downcast...
878 0 : if layer.is_delta() {
879 0 : let l = {
880 0 : let guard = self.timeline.layers.read().await;
881 0 : guard.get_from_desc(layer)
882 : };
883 0 : let result = l.download_and_keep_resident().await?;
884 :
885 0 : Ok(Some(ResidentDeltaLayer(result)))
886 : } else {
887 0 : Ok(None)
888 : }
889 0 : }
890 :
891 0 : async fn create_image(
892 0 : &mut self,
893 0 : lsn: Lsn,
894 0 : key_range: &Range<Key>,
895 0 : ctx: &RequestContext,
896 0 : ) -> anyhow::Result<()> {
897 0 : Ok(self.create_image_impl(lsn, key_range, ctx).await?)
898 0 : }
899 :
900 0 : async fn create_delta(
901 0 : &mut self,
902 0 : lsn_range: &Range<Lsn>,
903 0 : key_range: &Range<Key>,
904 0 : input_layers: &[ResidentDeltaLayer],
905 0 : ctx: &RequestContext,
906 0 : ) -> anyhow::Result<()> {
907 0 : debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end);
908 :
909 0 : let mut all_entries = Vec::new();
910 0 : for dl in input_layers.iter() {
911 0 : all_entries.extend(dl.load_keys(ctx).await?);
912 : }
913 :
914 : // The current stdlib sorting implementation is designed in a way where it is
915 : // particularly fast where the slice is made up of sorted sub-ranges.
916 0 : all_entries.sort_by_key(|DeltaEntry { key, lsn, .. }| (*key, *lsn));
917 :
918 0 : let mut writer = DeltaLayerWriter::new(
919 0 : self.timeline.conf,
920 0 : self.timeline.timeline_id,
921 0 : self.timeline.tenant_shard_id,
922 0 : key_range.start,
923 0 : lsn_range.clone(),
924 0 : )
925 0 : .await?;
926 :
927 0 : let mut dup_values = 0;
928 0 :
929 0 : // This iterator walks through all key-value pairs from all the layers
930 0 : // we're compacting, in key, LSN order.
931 0 : let mut prev: Option<(Key, Lsn)> = None;
932 : for &DeltaEntry {
933 0 : key, lsn, ref val, ..
934 0 : } in all_entries.iter()
935 : {
936 0 : if prev == Some((key, lsn)) {
937 : // This is a duplicate. Skip it.
938 : //
939 : // It can happen if compaction is interrupted after writing some
940 : // layers but not all, and we are compacting the range again.
941 : // The calculations in the algorithm assume that there are no
942 : // duplicates, so the math on targeted file size is likely off,
943 : // and we will create smaller files than expected.
944 0 : dup_values += 1;
945 0 : continue;
946 0 : }
947 :
948 0 : let value = val.load(ctx).await?;
949 :
950 0 : writer.put_value(key, lsn, value).await?;
951 :
952 0 : prev = Some((key, lsn));
953 : }
954 :
955 0 : if dup_values > 0 {
956 0 : warn!("delta layer created with {} duplicate values", dup_values);
957 0 : }
958 :
959 0 : fail_point!("delta-layer-writer-fail-before-finish", |_| {
960 0 : Err(anyhow::anyhow!(
961 0 : "failpoint delta-layer-writer-fail-before-finish"
962 0 : ))
963 0 : });
964 :
965 0 : let new_delta_layer = writer
966 0 : .finish(prev.unwrap().0.next(), &self.timeline)
967 0 : .await?;
968 :
969 0 : self.new_deltas.push(new_delta_layer);
970 0 : Ok(())
971 0 : }
972 :
973 0 : async fn delete_layer(
974 0 : &mut self,
975 0 : layer: &OwnArc<PersistentLayerDesc>,
976 0 : _ctx: &RequestContext,
977 0 : ) -> anyhow::Result<()> {
978 0 : self.layers_to_delete.push(layer.clone().0);
979 0 : Ok(())
980 0 : }
981 : }
982 :
983 : impl TimelineAdaptor {
984 0 : async fn create_image_impl(
985 0 : &mut self,
986 0 : lsn: Lsn,
987 0 : key_range: &Range<Key>,
988 0 : ctx: &RequestContext,
989 0 : ) -> Result<(), PageReconstructError> {
990 0 : let timer = self.timeline.metrics.create_images_time_histo.start_timer();
991 :
992 0 : let mut image_layer_writer = ImageLayerWriter::new(
993 0 : self.timeline.conf,
994 0 : self.timeline.timeline_id,
995 0 : self.timeline.tenant_shard_id,
996 0 : key_range,
997 0 : lsn,
998 0 : )
999 0 : .await?;
1000 :
1001 0 : fail_point!("image-layer-writer-fail-before-finish", |_| {
1002 0 : Err(PageReconstructError::Other(anyhow::anyhow!(
1003 0 : "failpoint image-layer-writer-fail-before-finish"
1004 0 : )))
1005 0 : });
1006 0 : let keyspace_ranges = self.get_keyspace(key_range, lsn, ctx).await?;
1007 0 : for range in &keyspace_ranges {
1008 0 : let mut key = range.start;
1009 0 : while key < range.end {
1010 0 : let img = match self.timeline.get(key, lsn, ctx).await {
1011 0 : Ok(img) => img,
1012 0 : Err(err) => {
1013 0 : // If we fail to reconstruct a VM or FSM page, we can zero the
1014 0 : // page without losing any actual user data. That seems better
1015 0 : // than failing repeatedly and getting stuck.
1016 0 : //
1017 0 : // We had a bug at one point, where we truncated the FSM and VM
1018 0 : // in the pageserver, but the Postgres didn't know about that
1019 0 : // and continued to generate incremental WAL records for pages
1020 0 : // that didn't exist in the pageserver. Trying to replay those
1021 0 : // WAL records failed to find the previous image of the page.
1022 0 : // This special case allows us to recover from that situation.
1023 0 : // See https://github.com/neondatabase/neon/issues/2601.
1024 0 : //
1025 0 : // Unfortunately we cannot do this for the main fork, or for
1026 0 : // any metadata keys, keys, as that would lead to actual data
1027 0 : // loss.
1028 0 : if is_rel_fsm_block_key(key) || is_rel_vm_block_key(key) {
1029 0 : warn!("could not reconstruct FSM or VM key {key}, filling with zeros: {err:?}");
1030 0 : ZERO_PAGE.clone()
1031 : } else {
1032 0 : return Err(err);
1033 : }
1034 : }
1035 : };
1036 0 : image_layer_writer.put_image(key, img).await?;
1037 0 : key = key.next();
1038 : }
1039 : }
1040 0 : let image_layer = image_layer_writer.finish(&self.timeline).await?;
1041 :
1042 0 : self.new_images.push(image_layer);
1043 0 :
1044 0 : timer.stop_and_record();
1045 0 :
1046 0 : Ok(())
1047 0 : }
1048 : }
1049 :
1050 : impl CompactionRequestContext for crate::context::RequestContext {}
1051 :
1052 : #[derive(Debug, Clone)]
1053 : pub struct OwnArc<T>(pub Arc<T>);
1054 :
1055 : impl<T> Deref for OwnArc<T> {
1056 : type Target = <Arc<T> as Deref>::Target;
1057 0 : fn deref(&self) -> &Self::Target {
1058 0 : &self.0
1059 0 : }
1060 : }
1061 :
1062 : impl<T> AsRef<T> for OwnArc<T> {
1063 0 : fn as_ref(&self) -> &T {
1064 0 : self.0.as_ref()
1065 0 : }
1066 : }
1067 :
1068 : impl CompactionLayer<Key> for OwnArc<PersistentLayerDesc> {
1069 0 : fn key_range(&self) -> &Range<Key> {
1070 0 : &self.key_range
1071 0 : }
1072 0 : fn lsn_range(&self) -> &Range<Lsn> {
1073 0 : &self.lsn_range
1074 0 : }
1075 0 : fn file_size(&self) -> u64 {
1076 0 : self.file_size
1077 0 : }
1078 0 : fn short_id(&self) -> std::string::String {
1079 0 : self.as_ref().short_id().to_string()
1080 0 : }
1081 0 : fn is_delta(&self) -> bool {
1082 0 : self.as_ref().is_delta()
1083 0 : }
1084 : }
1085 :
1086 : impl CompactionLayer<Key> for OwnArc<DeltaLayer> {
1087 0 : fn key_range(&self) -> &Range<Key> {
1088 0 : &self.layer_desc().key_range
1089 0 : }
1090 0 : fn lsn_range(&self) -> &Range<Lsn> {
1091 0 : &self.layer_desc().lsn_range
1092 0 : }
1093 0 : fn file_size(&self) -> u64 {
1094 0 : self.layer_desc().file_size
1095 0 : }
1096 0 : fn short_id(&self) -> std::string::String {
1097 0 : self.layer_desc().short_id().to_string()
1098 0 : }
1099 0 : fn is_delta(&self) -> bool {
1100 0 : true
1101 0 : }
1102 : }
1103 :
1104 : use crate::tenant::timeline::DeltaEntry;
1105 :
1106 : impl CompactionLayer<Key> for ResidentDeltaLayer {
1107 0 : fn key_range(&self) -> &Range<Key> {
1108 0 : &self.0.layer_desc().key_range
1109 0 : }
1110 0 : fn lsn_range(&self) -> &Range<Lsn> {
1111 0 : &self.0.layer_desc().lsn_range
1112 0 : }
1113 0 : fn file_size(&self) -> u64 {
1114 0 : self.0.layer_desc().file_size
1115 0 : }
1116 0 : fn short_id(&self) -> std::string::String {
1117 0 : self.0.layer_desc().short_id().to_string()
1118 0 : }
1119 0 : fn is_delta(&self) -> bool {
1120 0 : true
1121 0 : }
1122 : }
1123 :
1124 : impl CompactionDeltaLayer<TimelineAdaptor> for ResidentDeltaLayer {
1125 : type DeltaEntry<'a> = DeltaEntry<'a>;
1126 :
1127 0 : async fn load_keys<'a>(&self, ctx: &RequestContext) -> anyhow::Result<Vec<DeltaEntry<'_>>> {
1128 0 : self.0.load_keys(ctx).await
1129 0 : }
1130 : }
1131 :
1132 : impl CompactionLayer<Key> for ResidentImageLayer {
1133 0 : fn key_range(&self) -> &Range<Key> {
1134 0 : &self.0.layer_desc().key_range
1135 0 : }
1136 0 : fn lsn_range(&self) -> &Range<Lsn> {
1137 0 : &self.0.layer_desc().lsn_range
1138 0 : }
1139 0 : fn file_size(&self) -> u64 {
1140 0 : self.0.layer_desc().file_size
1141 0 : }
1142 0 : fn short_id(&self) -> std::string::String {
1143 0 : self.0.layer_desc().short_id().to_string()
1144 0 : }
1145 0 : fn is_delta(&self) -> bool {
1146 0 : false
1147 0 : }
1148 : }
1149 : impl CompactionImageLayer<TimelineAdaptor> for ResidentImageLayer {}
|