Line data Source code
1 : //! New compaction implementation. The algorithm itself is implemented in the
2 : //! compaction crate. This file implements the callbacks and structs that allow
3 : //! the algorithm to drive the process.
4 : //!
5 : //! The old legacy algorithm is implemented directly in `timeline.rs`.
6 :
7 : use std::collections::BinaryHeap;
8 : use std::ops::{Deref, Range};
9 : use std::sync::Arc;
10 :
11 : use super::layer_manager::LayerManager;
12 : use super::{CompactFlags, DurationRecorder, RecordedDuration, Timeline};
13 :
14 : use anyhow::{anyhow, Context};
15 : use async_trait::async_trait;
16 : use enumset::EnumSet;
17 : use fail::fail_point;
18 : use itertools::Itertools;
19 : use pageserver_api::shard::TenantShardId;
20 : use tokio_util::sync::CancellationToken;
21 : use tracing::{debug, info, info_span, trace, warn, Instrument};
22 : use utils::id::TimelineId;
23 :
24 : use crate::context::{AccessStatsBehavior, RequestContext, RequestContextBuilder};
25 : use crate::tenant::storage_layer::{AsLayerDesc, PersistentLayerDesc};
26 : use crate::tenant::timeline::{drop_rlock, is_rel_fsm_block_key, is_rel_vm_block_key, Hole};
27 : use crate::tenant::timeline::{DeltaLayerWriter, ImageLayerWriter};
28 : use crate::tenant::timeline::{Layer, ResidentLayer};
29 : use crate::tenant::DeltaLayer;
30 : use crate::tenant::PageReconstructError;
31 : use crate::virtual_file::{MaybeFatalIo, VirtualFile};
32 : use crate::{page_cache, ZERO_PAGE};
33 :
34 : use crate::keyspace::KeySpace;
35 : use crate::repository::Key;
36 :
37 : use utils::lsn::Lsn;
38 :
39 : use pageserver_compaction::helpers::overlaps_with;
40 : use pageserver_compaction::interface::*;
41 :
42 : use super::CompactionError;
43 :
44 : impl Timeline {
45 : /// TODO: cancellation
46 510 : pub(crate) async fn compact_legacy(
47 510 : self: &Arc<Self>,
48 510 : _cancel: &CancellationToken,
49 510 : flags: EnumSet<CompactFlags>,
50 510 : ctx: &RequestContext,
51 510 : ) -> Result<(), CompactionError> {
52 510 : // High level strategy for compaction / image creation:
53 510 : //
54 510 : // 1. First, calculate the desired "partitioning" of the
55 510 : // currently in-use key space. The goal is to partition the
56 510 : // key space into roughly fixed-size chunks, but also take into
57 510 : // account any existing image layers, and try to align the
58 510 : // chunk boundaries with the existing image layers to avoid
59 510 : // too much churn. Also try to align chunk boundaries with
60 510 : // relation boundaries. In principle, we don't know about
61 510 : // relation boundaries here, we just deal with key-value
62 510 : // pairs, and the code in pgdatadir_mapping.rs knows how to
63 510 : // map relations into key-value pairs. But in practice we know
64 510 : // that 'field6' is the block number, and the fields 1-5
65 510 : // identify a relation. This is just an optimization,
66 510 : // though.
67 510 : //
68 510 : // 2. Once we know the partitioning, for each partition,
69 510 : // decide if it's time to create a new image layer. The
70 510 : // criteria is: there has been too much "churn" since the last
71 510 : // image layer? The "churn" is fuzzy concept, it's a
72 510 : // combination of too many delta files, or too much WAL in
73 510 : // total in the delta file. Or perhaps: if creating an image
74 510 : // file would allow to delete some older files.
75 510 : //
76 510 : // 3. After that, we compact all level0 delta files if there
77 510 : // are too many of them. While compacting, we also garbage
78 510 : // collect any page versions that are no longer needed because
79 510 : // of the new image layers we created in step 2.
80 510 : //
81 510 : // TODO: This high level strategy hasn't been implemented yet.
82 510 : // Below are functions compact_level0() and create_image_layers()
83 510 : // but they are a bit ad hoc and don't quite work like it's explained
84 510 : // above. Rewrite it.
85 510 :
86 510 : // Is the timeline being deleted?
87 510 : if self.is_stopping() {
88 0 : trace!("Dropping out of compaction on timeline shutdown");
89 0 : return Err(CompactionError::ShuttingDown);
90 510 : }
91 510 :
92 510 : let target_file_size = self.get_checkpoint_distance();
93 510 :
94 510 : // Define partitioning schema if needed
95 510 :
96 510 : // FIXME: the match should only cover repartitioning, not the next steps
97 510 : match self
98 510 : .repartition(
99 510 : self.get_last_record_lsn(),
100 510 : self.get_compaction_target_size(),
101 510 : flags,
102 510 : ctx,
103 510 : )
104 13330 : .await
105 : {
106 510 : Ok((partitioning, lsn)) => {
107 510 : // Disables access_stats updates, so that the files we read remain candidates for eviction after we're done with them
108 510 : let image_ctx = RequestContextBuilder::extend(ctx)
109 510 : .access_stats_behavior(AccessStatsBehavior::Skip)
110 510 : .build();
111 510 :
112 510 : // 2. Compact
113 510 : let timer = self.metrics.compact_time_histo.start_timer();
114 76376 : self.compact_level0(target_file_size, ctx).await?;
115 510 : timer.stop_and_record();
116 :
117 : // 3. Create new image layers for partitions that have been modified
118 : // "enough".
119 510 : let layers = self
120 510 : .create_image_layers(
121 510 : &partitioning,
122 510 : lsn,
123 510 : flags.contains(CompactFlags::ForceImageLayerCreation),
124 510 : &image_ctx,
125 510 : )
126 0 : .await
127 510 : .map_err(anyhow::Error::from)?;
128 :
129 510 : self.upload_new_image_layers(layers)?;
130 : }
131 0 : Err(err) => {
132 0 : // no partitioning? This is normal, if the timeline was just created
133 0 : // as an empty timeline. Also in unit tests, when we use the timeline
134 0 : // as a simple key-value store, ignoring the datadir layout. Log the
135 0 : // error but continue.
136 0 : //
137 0 : // Suppress error when it's due to cancellation
138 0 : if !self.cancel.is_cancelled() {
139 0 : tracing::error!("could not compact, repartitioning keyspace failed: {err:?}");
140 0 : }
141 : }
142 : };
143 :
144 510 : Ok(())
145 510 : }
146 :
147 : /// Collect a bunch of Level 0 layer files, and compact and reshuffle them as
148 : /// as Level 1 files.
149 510 : async fn compact_level0(
150 510 : self: &Arc<Self>,
151 510 : target_file_size: u64,
152 510 : ctx: &RequestContext,
153 510 : ) -> Result<(), CompactionError> {
154 : let CompactLevel0Phase1Result {
155 510 : new_layers,
156 510 : deltas_to_compact,
157 : } = {
158 510 : let phase1_span = info_span!("compact_level0_phase1");
159 510 : let ctx = ctx.attached_child();
160 510 : let mut stats = CompactLevel0Phase1StatsBuilder {
161 510 : version: Some(2),
162 510 : tenant_id: Some(self.tenant_shard_id),
163 510 : timeline_id: Some(self.timeline_id),
164 510 : ..Default::default()
165 510 : };
166 510 :
167 510 : let begin = tokio::time::Instant::now();
168 510 : let phase1_layers_locked = Arc::clone(&self.layers).read_owned().await;
169 510 : let now = tokio::time::Instant::now();
170 510 : stats.read_lock_acquisition_micros =
171 510 : DurationRecorder::Recorded(RecordedDuration(now - begin), now);
172 510 : self.compact_level0_phase1(phase1_layers_locked, stats, target_file_size, &ctx)
173 510 : .instrument(phase1_span)
174 76376 : .await?
175 : };
176 :
177 510 : if new_layers.is_empty() && deltas_to_compact.is_empty() {
178 : // nothing to do
179 468 : return Ok(());
180 42 : }
181 42 :
182 42 : self.finish_compact_batch(&new_layers, &Vec::new(), &deltas_to_compact)
183 0 : .await?;
184 42 : Ok(())
185 510 : }
186 :
187 : /// Level0 files first phase of compaction, explained in the [`Self::compact_legacy`] comment.
///
/// Selects the level 0 delta layers to compact, reads all their keys, and
/// rewrites them into a new set of delta layers split along the key dimension
/// (and along the LSN dimension for a single key with very many versions).
///
/// * `guard` - owned read guard on the layer manager; it is released with
///   `drop_rlock` once the holes have been computed, i.e. before any new
///   layer is written.
/// * `stats` - stats builder; `read_lock_acquisition_micros` is expected to
///   have been set by the caller.
/// * `target_file_size` - size threshold used to decide when to start a new
///   output layer.
///
/// Returns the default (empty) [`CompactLevel0Phase1Result`] when there are no
/// level 0 deltas or fewer than the compaction threshold. Otherwise returns
/// the newly written layers together with the input layers that were compacted.
188 510 : async fn compact_level0_phase1(
189 510 : self: &Arc<Self>,
190 510 : guard: tokio::sync::OwnedRwLockReadGuard<LayerManager>,
191 510 : mut stats: CompactLevel0Phase1StatsBuilder,
192 510 : target_file_size: u64,
193 510 : ctx: &RequestContext,
194 510 : ) -> Result<CompactLevel0Phase1Result, CompactionError> {
195 510 : stats.read_lock_held_spawn_blocking_startup_micros =
196 510 : stats.read_lock_acquisition_micros.till_now(); // set by caller
197 510 : let layers = guard.layer_map();
198 510 : let level0_deltas = layers.get_level0_deltas()?;
199 510 : let mut level0_deltas = level0_deltas
200 510 : .into_iter()
201 2362 : .map(|x| guard.get_from_desc(&x))
202 510 : .collect_vec();
203 510 : stats.level0_deltas_count = Some(level0_deltas.len());
204 510 : // Only compact if enough layers have accumulated.
// The explicit is_empty() check matters when the threshold is 0: the code
// below unwraps the first element of `level0_deltas`.
205 510 : let threshold = self.get_compaction_threshold();
206 510 : if level0_deltas.is_empty() || level0_deltas.len() < threshold {
207 468 : debug!(
208 0 : level0_deltas = level0_deltas.len(),
209 0 : threshold, "too few deltas to compact"
210 0 : );
211 468 : return Ok(CompactLevel0Phase1Result::default());
212 42 : }
213 42 :
214 42 : // This failpoint is used together with `test_duplicate_layers` integration test.
215 42 : // It returns the compaction result exactly the same layers as input to compaction.
216 42 : // We want to ensure that this will not cause any problem when updating the layer map
217 42 : // after the compaction is finished.
218 42 : //
219 42 : // Currently, there are two rare edge cases that will cause duplicated layers being
220 42 : // inserted.
221 42 : // 1. The compaction job is interrupted / did not finish successfully. Assume we have file 1, 2, 3, 4, which
222 42 : // is compacted to 5, but the page server is shut down, next time we start page server we will get a layer
223 42 : // map containing 1, 2, 3, 4, and 5, whereas 5 has the same content as 4. If we trigger L0 compaction at this
224 42 : // point again, it is likely that we will get a file 6 which has the same content and the key range as 5,
225 42 : // and this causes an overwrite. This is acceptable because the content is the same, and we should do a
226 42 : // layer replace instead of the normal remove / upload process.
227 42 : // 2. The input workload pattern creates exactly n files that are sorted, non-overlapping and is of target file
228 42 : // size length. Compaction will likely create the same set of n files afterwards.
229 42 : //
230 42 : // This failpoint is a superset of both of the cases.
231 42 : if cfg!(feature = "testing") {
// The closure lets the failpoint's early `return true` yield a plain bool
// instead of returning from the enclosing async fn.
232 42 : let active = (|| {
233 42 : ::fail::fail_point!("compact-level0-phase1-return-same", |_| true);
234 42 : false
235 42 : })();
236 42 :
237 42 : if active {
238 0 : let mut new_layers = Vec::with_capacity(level0_deltas.len());
239 0 : for delta in &level0_deltas {
240 : // we are just faking these layers as being produced again for this failpoint
241 0 : new_layers.push(
242 0 : delta
243 0 : .download_and_keep_resident()
244 0 : .await
245 0 : .context("download layer for failpoint")?,
246 : );
247 : }
248 0 : tracing::info!("compact-level0-phase1-return-same"); // so that we can check if we hit the failpoint
249 0 : return Ok(CompactLevel0Phase1Result {
250 0 : new_layers,
251 0 : deltas_to_compact: level0_deltas,
252 0 : });
253 42 : }
254 0 : }
255 :
256 : // Gather the files to compact in this iteration.
257 : //
258 : // Start with the oldest Level 0 delta file, and collect any other
259 : // level 0 files that form a contiguous sequence, such that the end
260 : // LSN of previous file matches the start LSN of the next file.
261 : //
262 : // Note that if the files don't form such a sequence, we might
263 : // "compact" just a single file. That's a bit pointless, but it allows
264 : // us to get rid of the level 0 file, and compact the other files on
265 : // the next iteration. This could probably made smarter, but such
266 : // "gaps" in the sequence of level 0 files should only happen in case
267 : // of a crash, partial download from cloud storage, or something like
268 : // that, so it's not a big deal in practice.
269 800 : level0_deltas.sort_by_key(|l| l.layer_desc().lsn_range.start);
270 42 : let mut level0_deltas_iter = level0_deltas.iter();
271 42 :
// unwrap() cannot fail: the emptiness check above already returned early.
272 42 : let first_level0_delta = level0_deltas_iter.next().unwrap();
273 42 : let mut prev_lsn_end = first_level0_delta.layer_desc().lsn_range.end;
274 42 : let mut deltas_to_compact = Vec::with_capacity(level0_deltas.len());
275 42 :
276 42 : deltas_to_compact.push(first_level0_delta.download_and_keep_resident().await?);
277 442 : for l in level0_deltas_iter {
278 400 : let lsn_range = &l.layer_desc().lsn_range;
279 400 :
280 400 : if lsn_range.start != prev_lsn_end {
281 0 : break;
282 400 : }
283 400 : deltas_to_compact.push(l.download_and_keep_resident().await?);
284 400 : prev_lsn_end = lsn_range.end;
285 : }
// LSN range covered by the selected inputs: start of the first to end of the last.
286 42 : let lsn_range = Range {
287 42 : start: deltas_to_compact
288 42 : .first()
289 42 : .unwrap()
290 42 : .layer_desc()
291 42 : .lsn_range
292 42 : .start,
293 42 : end: deltas_to_compact.last().unwrap().layer_desc().lsn_range.end,
294 42 : };
295 42 :
296 42 : info!(
297 42 : "Starting Level0 compaction in LSN range {}-{} for {} layers ({} deltas in total)",
298 42 : lsn_range.start,
299 42 : lsn_range.end,
300 42 : deltas_to_compact.len(),
301 42 : level0_deltas.len()
302 42 : );
303 :
304 442 : for l in deltas_to_compact.iter() {
305 442 : info!("compact includes {l}");
306 : }
307 :
308 : // We don't need the original list of layers anymore. Drop it so that
309 : // we don't accidentally use it later in the function.
310 42 : drop(level0_deltas);
311 42 :
312 42 : stats.read_lock_held_prerequisites_micros = stats
313 42 : .read_lock_held_spawn_blocking_startup_micros
314 42 : .till_now();
315 42 :
316 42 : // Determine N largest holes where N is number of compacted layers.
317 42 : let max_holes = deltas_to_compact.len();
318 42 : let last_record_lsn = self.get_last_record_lsn();
// A gap between consecutive keys only becomes a hole candidate if it spans
// at least target_file_size / PAGE_SZ key positions.
319 42 : let min_hole_range = (target_file_size / page_cache::PAGE_SZ as u64) as i128;
320 42 : let min_hole_coverage_size = 3; // TODO: something more flexible?
321 42 :
322 42 : // min-heap (reserve space for one more element added before eviction)
323 42 : let mut heap: BinaryHeap<Hole> = BinaryHeap::with_capacity(max_holes + 1);
324 42 : let mut prev: Option<Key> = None;
325 42 :
326 42 : let mut all_keys = Vec::new();
327 :
328 442 : for l in deltas_to_compact.iter() {
329 3554 : all_keys.extend(l.load_keys(ctx).await?);
330 : }
331 :
332 : // FIXME: should spawn_blocking the rest of this function
333 :
334 : // The current stdlib sorting implementation is designed in a way where it is
335 : // particularly fast where the slice is made up of sorted sub-ranges.
336 6971288 : all_keys.sort_by_key(|DeltaEntry { key, lsn, .. }| (*key, *lsn));
337 42 :
338 42 : stats.read_lock_held_key_sort_micros = stats.read_lock_held_prerequisites_micros.till_now();
339 :
340 3121998 : for &DeltaEntry { key: next_key, .. } in all_keys.iter() {
341 3121998 : if let Some(prev_key) = prev {
342 : // just first fast filter
343 3121956 : if next_key.to_i128() - prev_key.to_i128() >= min_hole_range {
344 0 : let key_range = prev_key..next_key;
345 0 : // Measuring hole by just subtraction of i128 representation of key range boundaries
346 0 : // has not so much sense, because largest holes will corresponds field1/field2 changes.
347 0 : // But we are mostly interested to eliminate holes which cause generation of excessive image layers.
348 0 : // That is why it is better to measure size of hole as number of covering image layers.
349 0 : let coverage_size = layers.image_coverage(&key_range, last_record_lsn).len();
350 0 : if coverage_size >= min_hole_coverage_size {
351 0 : heap.push(Hole {
352 0 : key_range,
353 0 : coverage_size,
354 0 : });
355 0 : if heap.len() > max_holes {
356 0 : heap.pop(); // remove smallest hole
357 0 : }
358 0 : }
359 3121956 : }
360 42 : }
// `prev` is the key right after the one just seen, so a hole range starts
// after the last occupied key.
361 3121998 : prev = Some(next_key.next());
362 : }
363 42 : stats.read_lock_held_compute_holes_micros = stats.read_lock_held_key_sort_micros.till_now();
// The layer map is not consulted past this point, so the read guard is released here.
364 42 : drop_rlock(guard);
365 42 : stats.read_lock_drop_micros = stats.read_lock_held_compute_holes_micros.till_now();
366 42 : let mut holes = heap.into_vec();
367 42 : holes.sort_unstable_by_key(|hole| hole.key_range.start);
368 42 : let mut next_hole = 0; // index of next hole in holes vector
369 42 :
370 42 : // This iterator walks through all key-value pairs from all the layers
371 42 : // we're compacting, in key, LSN order.
372 42 : let all_values_iter = all_keys.iter();
373 42 :
374 42 : // This iterator walks through all keys and is needed to calculate size used by each key
375 42 : let mut all_keys_iter = all_keys
376 42 : .iter()
377 3121998 : .map(|DeltaEntry { key, lsn, size, .. }| (*key, *lsn, *size))
378 3121956 : .coalesce(|mut prev, cur| {
379 3121956 : // Coalesce keys that belong to the same key pair.
380 3121956 : // This ensures that compaction doesn't put them
381 3121956 : // into different layer files.
382 3121956 : // Still limit this by the target file size,
383 3121956 : // so that we keep the size of the files in
384 3121956 : // check.
385 3121956 : if prev.0 == cur.0 && prev.2 < target_file_size {
386 92000 : prev.2 += cur.2;
387 92000 : Ok(prev)
388 : } else {
389 3029956 : Err((prev, cur))
390 : }
391 3121956 : });
392 42 :
393 42 : // Merge the contents of all the input delta layers into a new set
394 42 : // of delta layers, based on the current partitioning.
395 42 : //
396 42 : // We split the new delta layers on the key dimension. We iterate through the key space, and for each key, check if including the next key to the current output layer we're building would cause the layer to become too large. If so, dump the current output layer and start new one.
397 42 : // It's possible that there is a single key with so many page versions that storing all of them in a single layer file
398 42 : // would be too large. In that case, we also split on the LSN dimension.
399 42 : //
400 42 : // LSN
401 42 : // ^
402 42 : // |
403 42 : // | +-----------+ +--+--+--+--+
404 42 : // | | | | | | | |
405 42 : // | +-----------+ | | | | |
406 42 : // | | | | | | | |
407 42 : // | +-----------+ ==> | | | | |
408 42 : // | | | | | | | |
409 42 : // | +-----------+ | | | | |
410 42 : // | | | | | | | |
411 42 : // | +-----------+ +--+--+--+--+
412 42 : // |
413 42 : // +--------------> key
414 42 : //
415 42 : //
416 42 : // If one key (X) has a lot of page versions:
417 42 : //
418 42 : // LSN
419 42 : // ^
420 42 : // | (X)
421 42 : // | +-----------+ +--+--+--+--+
422 42 : // | | | | | | | |
423 42 : // | +-----------+ | | +--+ |
424 42 : // | | | | | | | |
425 42 : // | +-----------+ ==> | | | | |
426 42 : // | | | | | +--+ |
427 42 : // | +-----------+ | | | | |
428 42 : // | | | | | | | |
429 42 : // | +-----------+ +--+--+--+--+
430 42 : // |
431 42 : // +--------------> key
432 42 : // TODO: this actually divides the layers into fixed-size chunks, not
433 42 : // based on the partitioning.
434 42 : //
435 42 : // TODO: we should also opportunistically materialize and
436 42 : // garbage collect what we can.
437 42 : let mut new_layers = Vec::new();
438 42 : let mut prev_key: Option<Key> = None;
439 42 : let mut writer: Option<DeltaLayerWriter> = None;
440 42 : let mut key_values_total_size = 0u64;
441 42 : let mut dup_start_lsn: Lsn = Lsn::INVALID; // start LSN of layer containing values of the single key
442 42 : let mut dup_end_lsn: Lsn = Lsn::INVALID; // end LSN of layer containing values of the single key
443 :
444 : for &DeltaEntry {
445 3121998 : key, lsn, ref val, ..
446 3122040 : } in all_values_iter
447 : {
448 3121998 : let value = val.load(ctx).await?;
449 3121998 : let same_key = prev_key.map_or(false, |prev_key| prev_key == key);
450 3121998 : // We need to check key boundaries once we reach next key or end of layer with the same key
451 3121998 : if !same_key || lsn == dup_end_lsn {
452 3029998 : let mut next_key_size = 0u64;
// Remember whether the layer written so far was a single-key ("dup") layer
// before the dup LSN bounds are reset below.
453 3029998 : let is_dup_layer = dup_end_lsn.is_valid();
454 3029998 : dup_start_lsn = Lsn::INVALID;
455 3029998 : if !same_key {
456 3029998 : dup_end_lsn = Lsn::INVALID;
457 3029998 : }
458 : // Determine size occupied by this key. We stop at next key or when size becomes larger than target_file_size
459 3029998 : for (next_key, next_lsn, next_size) in all_keys_iter.by_ref() {
460 3029998 : next_key_size = next_size;
461 3029998 : if key != next_key {
462 3029956 : if dup_end_lsn.is_valid() {
463 0 : // We are writing segment with duplicates:
464 0 : // place all remaining values of this key in separate segment
465 0 : dup_start_lsn = dup_end_lsn; // new segments starts where old stops
466 0 : dup_end_lsn = lsn_range.end; // there are no more values of this key till end of LSN range
467 3029956 : }
468 3029956 : break;
469 42 : }
470 42 : key_values_total_size += next_size;
471 42 : // Check if it is time to split segment: if total keys size is larger than target file size.
472 42 : // We need to avoid generation of empty segments if next_size > target_file_size.
473 42 : if key_values_total_size > target_file_size && lsn != next_lsn {
474 : // Split key between multiple layers: such layer can contain only single key
475 0 : dup_start_lsn = if dup_end_lsn.is_valid() {
476 0 : dup_end_lsn // new segment with duplicates starts where old one stops
477 : } else {
478 0 : lsn // start with the first LSN for this key
479 : };
480 0 : dup_end_lsn = next_lsn; // upper LSN boundary is exclusive
481 0 : break;
482 42 : }
483 : }
484 : // handle case when loop reaches last key: in this case dup_end is non-zero but dup_start is not set.
485 3029998 : if dup_end_lsn.is_valid() && !dup_start_lsn.is_valid() {
486 0 : dup_start_lsn = dup_end_lsn;
487 0 : dup_end_lsn = lsn_range.end;
488 3029998 : }
489 3029998 : if writer.is_some() {
490 3029956 : let written_size = writer.as_mut().unwrap().size();
491 3029956 : let contains_hole =
492 3029956 : next_hole < holes.len() && key >= holes[next_hole].key_range.end;
493 : // check if key cause layer overflow or contains hole...
494 3029956 : if is_dup_layer
495 3029956 : || dup_end_lsn.is_valid()
496 3029956 : || written_size + key_values_total_size > target_file_size
497 3029756 : || contains_hole
498 : {
499 : // ... if so, flush previous layer and prepare to write new one
500 200 : new_layers.push(
501 200 : writer
502 200 : .take()
503 200 : .unwrap()
504 200 : .finish(prev_key.unwrap().next(), self)
505 508 : .await?,
506 : );
// NOTE(review): `writer` is already None after take() above; this assignment is redundant.
507 200 : writer = None;
508 200 :
509 200 : if contains_hole {
510 0 : // skip hole
511 0 : next_hole += 1;
512 200 : }
513 3029756 : }
514 42 : }
515 : // Remember size of key value because at next iteration we will access next item
516 3029998 : key_values_total_size = next_key_size;
517 92000 : }
518 3121998 : fail_point!("delta-layer-writer-fail-before-finish", |_| {
519 0 : Err(CompactionError::Other(anyhow::anyhow!(
520 0 : "failpoint delta-layer-writer-fail-before-finish"
521 0 : )))
522 3121998 : });
523 :
// Keys that the shard identity reports as disposable are skipped, not written.
524 3121998 : if !self.shard_identity.is_key_disposable(&key) {
525 3121998 : if writer.is_none() {
526 242 : // Create writer if not initialized yet
527 242 : writer = Some(
528 : DeltaLayerWriter::new(
529 242 : self.conf,
530 242 : self.timeline_id,
531 242 : self.tenant_shard_id,
532 242 : key,
533 242 : if dup_end_lsn.is_valid() {
534 : // this is a layer containing slice of values of the same key
535 0 : debug!("Create new dup layer {}..{}", dup_start_lsn, dup_end_lsn);
536 0 : dup_start_lsn..dup_end_lsn
537 : } else {
538 242 : debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end);
539 242 : lsn_range.clone()
540 : },
541 : )
542 121 : .await?,
543 : );
544 3121756 : }
545 :
546 3121998 : writer.as_mut().unwrap().put_value(key, lsn, value).await?;
547 : } else {
548 0 : debug!(
549 0 : "Dropping key {} during compaction (it belongs on shard {:?})",
550 0 : key,
551 0 : self.shard_identity.get_shard_number(&key)
552 0 : );
553 : }
554 :
555 3121998 : if !new_layers.is_empty() {
556 19786 : fail_point!("after-timeline-compacted-first-L1");
557 3102212 : }
558 :
559 3121998 : prev_key = Some(key);
560 : }
// Flush the last, still open output layer (if any).
561 42 : if let Some(writer) = writer {
562 3009 : new_layers.push(writer.finish(prev_key.unwrap().next(), self).await?);
563 0 : }
564 :
565 : // Sync layers
566 42 : if !new_layers.is_empty() {
567 : // Print a warning if the created layer is larger than double the target size
568 : // Add two pages for potential overhead. This should in theory be already
569 : // accounted for in the target calculation, but for very small targets,
570 : // we still might easily hit the limit otherwise.
571 42 : let warn_limit = target_file_size * 2 + page_cache::PAGE_SZ as u64 * 2;
572 242 : for layer in new_layers.iter() {
573 242 : if layer.layer_desc().file_size > warn_limit {
574 0 : warn!(
575 0 : %layer,
576 0 : "created delta file of size {} larger than double of target of {target_file_size}", layer.layer_desc().file_size
577 0 : );
578 242 : }
579 : }
580 :
581 : // The writer.finish() above already did the fsync of the inodes.
582 : // We just need to fsync the directory in which these inodes are linked,
583 : // which we know to be the timeline directory.
584 : //
585 : // We use fatal_err() below because after writer.finish() returns with success,
586 : // the in-memory state of the filesystem already has the layer file in its final place,
587 : // and subsequent pageserver code could think it's durable while it really isn't.
588 42 : let timeline_dir = VirtualFile::open(
589 42 : &self
590 42 : .conf
591 42 : .timeline_path(&self.tenant_shard_id, &self.timeline_id),
592 42 : )
593 21 : .await
594 42 : .fatal_err("VirtualFile::open for timeline dir fsync");
595 42 : timeline_dir
596 42 : .sync_all()
597 21 : .await
598 42 : .fatal_err("VirtualFile::sync_all timeline dir");
599 0 : }
600 :
601 42 : stats.write_layer_files_micros = stats.read_lock_drop_micros.till_now();
602 42 : stats.new_deltas_count = Some(new_layers.len());
603 242 : stats.new_deltas_size = Some(new_layers.iter().map(|l| l.layer_desc().file_size).sum());
604 42 :
// Stats are best-effort: a builder with missing fields or a serialization
// failure only produces a warning.
605 42 : match TryInto::<CompactLevel0Phase1Stats>::try_into(stats)
606 42 : .and_then(|stats| serde_json::to_string(&stats).context("serde_json::to_string"))
607 : {
608 42 : Ok(stats_json) => {
609 42 : info!(
610 42 : stats_json = stats_json.as_str(),
611 42 : "compact_level0_phase1 stats available"
612 42 : )
613 : }
614 0 : Err(e) => {
615 0 : warn!("compact_level0_phase1 stats failed to serialize: {:#}", e);
616 : }
617 : }
618 :
619 42 : Ok(CompactLevel0Phase1Result {
620 42 : new_layers,
621 42 : deltas_to_compact: deltas_to_compact
622 42 : .into_iter()
623 442 : .map(|x| x.drop_eviction_guard())
624 42 : .collect::<Vec<_>>(),
625 42 : })
626 510 : }
627 : }
628 :
/// Outcome of `Timeline::compact_level0_phase1`.
///
/// The `Default` value (both lists empty) is what phase 1 returns when there
/// are too few level 0 deltas; `compact_level0` treats it as "nothing to do".
629 : #[derive(Default)]
630 : struct CompactLevel0Phase1Result {
// Layers produced by phase 1.
631 : new_layers: Vec<ResidentLayer>,
// The level 0 input layers that were compacted.
632 : deltas_to_compact: Vec<Layer>,
633 : }
634 :
/// Incrementally filled statistics for one run of `compact_level0_phase1`.
///
/// Every field starts out unset via `Default` and is assigned as the phase
/// progresses. Converting into [`CompactLevel0Phase1Stats`] with `TryFrom`
/// fails if any field was left unset.
///
/// Each `DurationRecorder` after the first is obtained by calling `till_now()`
/// on an earlier one, presumably recording the time elapsed since that
/// earlier measurement — TODO confirm against `DurationRecorder`.
635 : #[derive(Default)]
636 : struct CompactLevel0Phase1StatsBuilder {
637 : version: Option<u64>,
638 : tenant_id: Option<TenantShardId>,
639 : timeline_id: Option<TimelineId>,
// Set by the caller of phase 1: time spent waiting for the layer map read lock.
640 : read_lock_acquisition_micros: DurationRecorder,
// Recorded first thing on entry to phase 1.
641 : read_lock_held_spawn_blocking_startup_micros: DurationRecorder,
// NOTE: recorded *after* `read_lock_held_prerequisites_micros`, once all keys are loaded and sorted.
642 : read_lock_held_key_sort_micros: DurationRecorder,
// Recorded once the input layers have been selected and made resident.
643 : read_lock_held_prerequisites_micros: DurationRecorder,
// Recorded once the holes have been computed.
644 : read_lock_held_compute_holes_micros: DurationRecorder,
// Recorded right after the read guard is released.
645 : read_lock_drop_micros: DurationRecorder,
// Recorded after all new layers are written and the timeline directory is synced.
646 : write_layer_files_micros: DurationRecorder,
// Number of level 0 deltas found in the layer map.
647 : level0_deltas_count: Option<usize>,
// Number of layers written by phase 1.
648 : new_deltas_count: Option<usize>,
// Sum of `file_size` over the layers written by phase 1.
649 : new_deltas_size: Option<u64>,
650 : }
651 :
/// Complete statistics of one `compact_level0_phase1` run, with every field set.
///
/// Built from [`CompactLevel0Phase1StatsBuilder`] via `TryFrom`; phase 1
/// serializes it to JSON and logs it.
652 : #[derive(serde::Serialize)]
653 : struct CompactLevel0Phase1Stats {
654 : version: u64,
655 : tenant_id: TenantShardId,
656 : timeline_id: TimelineId,
657 : read_lock_acquisition_micros: RecordedDuration,
658 : read_lock_held_spawn_blocking_startup_micros: RecordedDuration,
659 : read_lock_held_key_sort_micros: RecordedDuration,
660 : read_lock_held_prerequisites_micros: RecordedDuration,
661 : read_lock_held_compute_holes_micros: RecordedDuration,
662 : read_lock_drop_micros: RecordedDuration,
663 : write_layer_files_micros: RecordedDuration,
664 : level0_deltas_count: usize,
665 : new_deltas_count: usize,
666 : new_deltas_size: u64,
667 : }
668 :
669 : impl TryFrom<CompactLevel0Phase1StatsBuilder> for CompactLevel0Phase1Stats {
670 : type Error = anyhow::Error;
671 :
672 42 : fn try_from(value: CompactLevel0Phase1StatsBuilder) -> Result<Self, Self::Error> {
673 42 : Ok(Self {
674 42 : version: value.version.ok_or_else(|| anyhow!("version not set"))?,
675 42 : tenant_id: value
676 42 : .tenant_id
677 42 : .ok_or_else(|| anyhow!("tenant_id not set"))?,
678 42 : timeline_id: value
679 42 : .timeline_id
680 42 : .ok_or_else(|| anyhow!("timeline_id not set"))?,
681 42 : read_lock_acquisition_micros: value
682 42 : .read_lock_acquisition_micros
683 42 : .into_recorded()
684 42 : .ok_or_else(|| anyhow!("read_lock_acquisition_micros not set"))?,
685 42 : read_lock_held_spawn_blocking_startup_micros: value
686 42 : .read_lock_held_spawn_blocking_startup_micros
687 42 : .into_recorded()
688 42 : .ok_or_else(|| anyhow!("read_lock_held_spawn_blocking_startup_micros not set"))?,
689 42 : read_lock_held_key_sort_micros: value
690 42 : .read_lock_held_key_sort_micros
691 42 : .into_recorded()
692 42 : .ok_or_else(|| anyhow!("read_lock_held_key_sort_micros not set"))?,
693 42 : read_lock_held_prerequisites_micros: value
694 42 : .read_lock_held_prerequisites_micros
695 42 : .into_recorded()
696 42 : .ok_or_else(|| anyhow!("read_lock_held_prerequisites_micros not set"))?,
697 42 : read_lock_held_compute_holes_micros: value
698 42 : .read_lock_held_compute_holes_micros
699 42 : .into_recorded()
700 42 : .ok_or_else(|| anyhow!("read_lock_held_compute_holes_micros not set"))?,
701 42 : read_lock_drop_micros: value
702 42 : .read_lock_drop_micros
703 42 : .into_recorded()
704 42 : .ok_or_else(|| anyhow!("read_lock_drop_micros not set"))?,
705 42 : write_layer_files_micros: value
706 42 : .write_layer_files_micros
707 42 : .into_recorded()
708 42 : .ok_or_else(|| anyhow!("write_layer_files_micros not set"))?,
709 42 : level0_deltas_count: value
710 42 : .level0_deltas_count
711 42 : .ok_or_else(|| anyhow!("level0_deltas_count not set"))?,
712 42 : new_deltas_count: value
713 42 : .new_deltas_count
714 42 : .ok_or_else(|| anyhow!("new_deltas_count not set"))?,
715 42 : new_deltas_size: value
716 42 : .new_deltas_size
717 42 : .ok_or_else(|| anyhow!("new_deltas_size not set"))?,
718 : })
719 42 : }
720 : }
721 :
722 : impl Timeline {
723 : /// Entry point for new tiered compaction algorithm.
724 : ///
725 : /// All the real work is in the implementation in the pageserver_compaction
726 : /// crate. The code here would apply to any algorithm implemented by the
727 : /// same interface, but tiered is the only one at the moment.
728 : ///
729 : /// TODO: cancellation
730 0 : pub(crate) async fn compact_tiered(
731 0 : self: &Arc<Self>,
732 0 : _cancel: &CancellationToken,
733 0 : ctx: &RequestContext,
734 0 : ) -> Result<(), CompactionError> {
735 0 : let fanout = self.get_compaction_threshold() as u64;
736 0 : let target_file_size = self.get_checkpoint_distance();
737 :
738 : // Find the top of the historical layers
739 0 : let end_lsn = {
740 0 : let guard = self.layers.read().await;
741 0 : let layers = guard.layer_map();
742 :
743 0 : let l0_deltas = layers.get_level0_deltas()?;
744 0 : drop(guard);
745 0 :
746 0 : // As an optimization, if we find that there are too few L0 layers,
747 0 : // bail out early. We know that the compaction algorithm would do
748 0 : // nothing in that case.
749 0 : if l0_deltas.len() < fanout as usize {
750 : // doesn't need compacting
751 0 : return Ok(());
752 0 : }
753 0 : l0_deltas.iter().map(|l| l.lsn_range.end).max().unwrap()
754 0 : };
755 0 :
756 0 : // Is the timeline being deleted?
757 0 : if self.is_stopping() {
758 0 : trace!("Dropping out of compaction on timeline shutdown");
759 0 : return Err(CompactionError::ShuttingDown);
760 0 : }
761 :
762 0 : let keyspace = self.collect_keyspace(end_lsn, ctx).await?;
763 0 : let mut adaptor = TimelineAdaptor::new(self, (end_lsn, keyspace));
764 0 :
765 0 : pageserver_compaction::compact_tiered::compact_tiered(
766 0 : &mut adaptor,
767 0 : end_lsn,
768 0 : target_file_size,
769 0 : fanout,
770 0 : ctx,
771 0 : )
772 0 : .await?;
773 :
774 0 : adaptor.flush_updates().await?;
775 0 : Ok(())
776 0 : }
777 : }
778 :
779 : struct TimelineAdaptor {
780 : timeline: Arc<Timeline>,
781 :
782 : keyspace: (Lsn, KeySpace),
783 :
784 : new_deltas: Vec<ResidentLayer>,
785 : new_images: Vec<ResidentLayer>,
786 : layers_to_delete: Vec<Arc<PersistentLayerDesc>>,
787 : }
788 :
789 : impl TimelineAdaptor {
790 0 : pub fn new(timeline: &Arc<Timeline>, keyspace: (Lsn, KeySpace)) -> Self {
791 0 : Self {
792 0 : timeline: timeline.clone(),
793 0 : keyspace,
794 0 : new_images: Vec::new(),
795 0 : new_deltas: Vec::new(),
796 0 : layers_to_delete: Vec::new(),
797 0 : }
798 0 : }
799 :
800 0 : pub async fn flush_updates(&mut self) -> anyhow::Result<()> {
801 0 : let layers_to_delete = {
802 0 : let guard = self.timeline.layers.read().await;
803 0 : self.layers_to_delete
804 0 : .iter()
805 0 : .map(|x| guard.get_from_desc(x))
806 0 : .collect::<Vec<Layer>>()
807 0 : };
808 0 : self.timeline
809 0 : .finish_compact_batch(&self.new_deltas, &self.new_images, &layers_to_delete)
810 0 : .await?;
811 :
812 0 : self.timeline
813 0 : .upload_new_image_layers(std::mem::take(&mut self.new_images))?;
814 :
815 0 : self.new_deltas.clear();
816 0 : self.layers_to_delete.clear();
817 0 : Ok(())
818 0 : }
819 : }
820 :
821 : #[derive(Clone)]
822 : struct ResidentDeltaLayer(ResidentLayer);
823 : #[derive(Clone)]
824 : struct ResidentImageLayer(ResidentLayer);
825 :
826 : impl CompactionJobExecutor for TimelineAdaptor {
827 : type Key = crate::repository::Key;
828 :
829 : type Layer = OwnArc<PersistentLayerDesc>;
830 : type DeltaLayer = ResidentDeltaLayer;
831 : type ImageLayer = ResidentImageLayer;
832 :
833 : type RequestContext = crate::context::RequestContext;
834 :
835 0 : async fn get_layers(
836 0 : &mut self,
837 0 : key_range: &Range<Key>,
838 0 : lsn_range: &Range<Lsn>,
839 0 : _ctx: &RequestContext,
840 0 : ) -> anyhow::Result<Vec<OwnArc<PersistentLayerDesc>>> {
841 0 : self.flush_updates().await?;
842 :
843 0 : let guard = self.timeline.layers.read().await;
844 0 : let layer_map = guard.layer_map();
845 0 :
846 0 : let result = layer_map
847 0 : .iter_historic_layers()
848 0 : .filter(|l| {
849 0 : overlaps_with(&l.lsn_range, lsn_range) && overlaps_with(&l.key_range, key_range)
850 0 : })
851 0 : .map(OwnArc)
852 0 : .collect();
853 0 : Ok(result)
854 0 : }
855 :
856 0 : async fn get_keyspace(
857 0 : &mut self,
858 0 : key_range: &Range<Key>,
859 0 : lsn: Lsn,
860 0 : _ctx: &RequestContext,
861 0 : ) -> anyhow::Result<Vec<Range<Key>>> {
862 0 : if lsn == self.keyspace.0 {
863 0 : Ok(pageserver_compaction::helpers::intersect_keyspace(
864 0 : &self.keyspace.1.ranges,
865 0 : key_range,
866 0 : ))
867 : } else {
868 : // The current compaction implementatin only ever requests the key space
869 : // at the compaction end LSN.
870 0 : anyhow::bail!("keyspace not available for requested lsn");
871 : }
872 0 : }
873 :
874 0 : async fn downcast_delta_layer(
875 0 : &self,
876 0 : layer: &OwnArc<PersistentLayerDesc>,
877 0 : ) -> anyhow::Result<Option<ResidentDeltaLayer>> {
878 0 : // this is a lot more complex than a simple downcast...
879 0 : if layer.is_delta() {
880 0 : let l = {
881 0 : let guard = self.timeline.layers.read().await;
882 0 : guard.get_from_desc(layer)
883 : };
884 0 : let result = l.download_and_keep_resident().await?;
885 :
886 0 : Ok(Some(ResidentDeltaLayer(result)))
887 : } else {
888 0 : Ok(None)
889 : }
890 0 : }
891 :
892 0 : async fn create_image(
893 0 : &mut self,
894 0 : lsn: Lsn,
895 0 : key_range: &Range<Key>,
896 0 : ctx: &RequestContext,
897 0 : ) -> anyhow::Result<()> {
898 0 : Ok(self.create_image_impl(lsn, key_range, ctx).await?)
899 0 : }
900 :
901 0 : async fn create_delta(
902 0 : &mut self,
903 0 : lsn_range: &Range<Lsn>,
904 0 : key_range: &Range<Key>,
905 0 : input_layers: &[ResidentDeltaLayer],
906 0 : ctx: &RequestContext,
907 0 : ) -> anyhow::Result<()> {
908 0 : debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end);
909 :
910 0 : let mut all_entries = Vec::new();
911 0 : for dl in input_layers.iter() {
912 0 : all_entries.extend(dl.load_keys(ctx).await?);
913 : }
914 :
915 : // The current stdlib sorting implementation is designed in a way where it is
916 : // particularly fast where the slice is made up of sorted sub-ranges.
917 0 : all_entries.sort_by_key(|DeltaEntry { key, lsn, .. }| (*key, *lsn));
918 :
919 0 : let mut writer = DeltaLayerWriter::new(
920 0 : self.timeline.conf,
921 0 : self.timeline.timeline_id,
922 0 : self.timeline.tenant_shard_id,
923 0 : key_range.start,
924 0 : lsn_range.clone(),
925 0 : )
926 0 : .await?;
927 :
928 0 : let mut dup_values = 0;
929 0 :
930 0 : // This iterator walks through all key-value pairs from all the layers
931 0 : // we're compacting, in key, LSN order.
932 0 : let mut prev: Option<(Key, Lsn)> = None;
933 : for &DeltaEntry {
934 0 : key, lsn, ref val, ..
935 0 : } in all_entries.iter()
936 : {
937 0 : if prev == Some((key, lsn)) {
938 : // This is a duplicate. Skip it.
939 : //
940 : // It can happen if compaction is interrupted after writing some
941 : // layers but not all, and we are compacting the range again.
942 : // The calculations in the algorithm assume that there are no
943 : // duplicates, so the math on targeted file size is likely off,
944 : // and we will create smaller files than expected.
945 0 : dup_values += 1;
946 0 : continue;
947 0 : }
948 :
949 0 : let value = val.load(ctx).await?;
950 :
951 0 : writer.put_value(key, lsn, value).await?;
952 :
953 0 : prev = Some((key, lsn));
954 : }
955 :
956 0 : if dup_values > 0 {
957 0 : warn!("delta layer created with {} duplicate values", dup_values);
958 0 : }
959 :
960 0 : fail_point!("delta-layer-writer-fail-before-finish", |_| {
961 0 : Err(anyhow::anyhow!(
962 0 : "failpoint delta-layer-writer-fail-before-finish"
963 0 : ))
964 0 : });
965 :
966 0 : let new_delta_layer = writer
967 0 : .finish(prev.unwrap().0.next(), &self.timeline)
968 0 : .await?;
969 :
970 0 : self.new_deltas.push(new_delta_layer);
971 0 : Ok(())
972 0 : }
973 :
974 0 : async fn delete_layer(
975 0 : &mut self,
976 0 : layer: &OwnArc<PersistentLayerDesc>,
977 0 : _ctx: &RequestContext,
978 0 : ) -> anyhow::Result<()> {
979 0 : self.layers_to_delete.push(layer.clone().0);
980 0 : Ok(())
981 0 : }
982 : }
983 :
984 : impl TimelineAdaptor {
985 0 : async fn create_image_impl(
986 0 : &mut self,
987 0 : lsn: Lsn,
988 0 : key_range: &Range<Key>,
989 0 : ctx: &RequestContext,
990 0 : ) -> Result<(), PageReconstructError> {
991 0 : let timer = self.timeline.metrics.create_images_time_histo.start_timer();
992 :
993 0 : let mut image_layer_writer = ImageLayerWriter::new(
994 0 : self.timeline.conf,
995 0 : self.timeline.timeline_id,
996 0 : self.timeline.tenant_shard_id,
997 0 : key_range,
998 0 : lsn,
999 0 : )
1000 0 : .await?;
1001 :
1002 0 : fail_point!("image-layer-writer-fail-before-finish", |_| {
1003 0 : Err(PageReconstructError::Other(anyhow::anyhow!(
1004 0 : "failpoint image-layer-writer-fail-before-finish"
1005 0 : )))
1006 0 : });
1007 0 : let keyspace_ranges = self.get_keyspace(key_range, lsn, ctx).await?;
1008 0 : for range in &keyspace_ranges {
1009 0 : let mut key = range.start;
1010 0 : while key < range.end {
1011 0 : let img = match self.timeline.get(key, lsn, ctx).await {
1012 0 : Ok(img) => img,
1013 0 : Err(err) => {
1014 0 : // If we fail to reconstruct a VM or FSM page, we can zero the
1015 0 : // page without losing any actual user data. That seems better
1016 0 : // than failing repeatedly and getting stuck.
1017 0 : //
1018 0 : // We had a bug at one point, where we truncated the FSM and VM
1019 0 : // in the pageserver, but the Postgres didn't know about that
1020 0 : // and continued to generate incremental WAL records for pages
1021 0 : // that didn't exist in the pageserver. Trying to replay those
1022 0 : // WAL records failed to find the previous image of the page.
1023 0 : // This special case allows us to recover from that situation.
1024 0 : // See https://github.com/neondatabase/neon/issues/2601.
1025 0 : //
1026 0 : // Unfortunately we cannot do this for the main fork, or for
1027 0 : // any metadata keys, keys, as that would lead to actual data
1028 0 : // loss.
1029 0 : if is_rel_fsm_block_key(key) || is_rel_vm_block_key(key) {
1030 0 : warn!("could not reconstruct FSM or VM key {key}, filling with zeros: {err:?}");
1031 0 : ZERO_PAGE.clone()
1032 : } else {
1033 0 : return Err(err);
1034 : }
1035 : }
1036 : };
1037 0 : image_layer_writer.put_image(key, img).await?;
1038 0 : key = key.next();
1039 : }
1040 : }
1041 0 : let image_layer = image_layer_writer.finish(&self.timeline).await?;
1042 :
1043 0 : self.new_images.push(image_layer);
1044 0 :
1045 0 : timer.stop_and_record();
1046 0 :
1047 0 : Ok(())
1048 0 : }
1049 : }
1050 :
1051 : impl CompactionRequestContext for crate::context::RequestContext {}
1052 :
/// Thin public newtype around [`Arc`].
///
/// It dereferences to the shared value and can be borrowed as `&T`, so it can
/// be used much like the `Arc` it wraps.
#[derive(Debug, Clone)]
pub struct OwnArc<T>(pub Arc<T>);

impl<T> Deref for OwnArc<T> {
    type Target = T;

    /// Borrows the value behind the wrapped `Arc`.
    fn deref(&self) -> &T {
        &*self.0
    }
}

impl<T> AsRef<T> for OwnArc<T> {
    /// Borrows the value behind the wrapped `Arc`.
    fn as_ref(&self) -> &T {
        Deref::deref(&self.0)
    }
}
1068 :
1069 : impl CompactionLayer<Key> for OwnArc<PersistentLayerDesc> {
1070 0 : fn key_range(&self) -> &Range<Key> {
1071 0 : &self.key_range
1072 0 : }
1073 0 : fn lsn_range(&self) -> &Range<Lsn> {
1074 0 : &self.lsn_range
1075 0 : }
1076 0 : fn file_size(&self) -> u64 {
1077 0 : self.file_size
1078 0 : }
1079 0 : fn short_id(&self) -> std::string::String {
1080 0 : self.as_ref().short_id().to_string()
1081 0 : }
1082 0 : fn is_delta(&self) -> bool {
1083 0 : self.as_ref().is_delta()
1084 0 : }
1085 : }
1086 :
1087 : impl CompactionLayer<Key> for OwnArc<DeltaLayer> {
1088 0 : fn key_range(&self) -> &Range<Key> {
1089 0 : &self.layer_desc().key_range
1090 0 : }
1091 0 : fn lsn_range(&self) -> &Range<Lsn> {
1092 0 : &self.layer_desc().lsn_range
1093 0 : }
1094 0 : fn file_size(&self) -> u64 {
1095 0 : self.layer_desc().file_size
1096 0 : }
1097 0 : fn short_id(&self) -> std::string::String {
1098 0 : self.layer_desc().short_id().to_string()
1099 0 : }
1100 0 : fn is_delta(&self) -> bool {
1101 0 : true
1102 0 : }
1103 : }
1104 :
1105 : use crate::tenant::timeline::DeltaEntry;
1106 :
1107 : impl CompactionLayer<Key> for ResidentDeltaLayer {
1108 0 : fn key_range(&self) -> &Range<Key> {
1109 0 : &self.0.layer_desc().key_range
1110 0 : }
1111 0 : fn lsn_range(&self) -> &Range<Lsn> {
1112 0 : &self.0.layer_desc().lsn_range
1113 0 : }
1114 0 : fn file_size(&self) -> u64 {
1115 0 : self.0.layer_desc().file_size
1116 0 : }
1117 0 : fn short_id(&self) -> std::string::String {
1118 0 : self.0.layer_desc().short_id().to_string()
1119 0 : }
1120 0 : fn is_delta(&self) -> bool {
1121 0 : true
1122 0 : }
1123 : }
1124 :
1125 : #[async_trait]
1126 : impl CompactionDeltaLayer<TimelineAdaptor> for ResidentDeltaLayer {
1127 : type DeltaEntry<'a> = DeltaEntry<'a>;
1128 :
1129 0 : async fn load_keys<'a>(&self, ctx: &RequestContext) -> anyhow::Result<Vec<DeltaEntry<'_>>> {
1130 0 : self.0.load_keys(ctx).await
1131 0 : }
1132 : }
1133 :
1134 : impl CompactionLayer<Key> for ResidentImageLayer {
1135 0 : fn key_range(&self) -> &Range<Key> {
1136 0 : &self.0.layer_desc().key_range
1137 0 : }
1138 0 : fn lsn_range(&self) -> &Range<Lsn> {
1139 0 : &self.0.layer_desc().lsn_range
1140 0 : }
1141 0 : fn file_size(&self) -> u64 {
1142 0 : self.0.layer_desc().file_size
1143 0 : }
1144 0 : fn short_id(&self) -> std::string::String {
1145 0 : self.0.layer_desc().short_id().to_string()
1146 0 : }
1147 0 : fn is_delta(&self) -> bool {
1148 0 : false
1149 0 : }
1150 : }
1151 : impl CompactionImageLayer<TimelineAdaptor> for ResidentImageLayer {}
|