Line data Source code
1 : //!
2 : //! The layer map tracks what layers exist in a timeline.
3 : //!
//! When the timeline is first accessed, the server lists all layer files
//! in the timelines/<timeline_id> directory, and populates this map with
//! ImageLayer and DeltaLayer structs corresponding to each file. When the first
//! new WAL record is received, we create an InMemoryLayer to hold the incoming
//! records. Now and then, in the checkpoint() function, the in-memory layer is
//! frozen, and it is split up into new image and delta layers and the
//! corresponding files are written to disk.
11 : //!
12 : //! Design overview:
13 : //!
14 : //! The `search` method of the layer map is on the read critical path, so we've
15 : //! built an efficient data structure for fast reads, stored in `LayerMap::historic`.
16 : //! Other read methods are less critical but still impact performance of background tasks.
17 : //!
18 : //! This data structure relies on a persistent/immutable binary search tree. See the
19 : //! following lecture for an introduction <https://www.youtube.com/watch?v=WqCWghETNDc&t=581s>
20 : //! Summary: A persistent/immutable BST (and persistent data structures in general) allows
21 : //! you to modify the tree in such a way that each modification creates a new "version"
22 : //! of the tree. When you modify it, you get a new version, but all previous versions are
23 : //! still accessible too. So if someone is still holding a reference to an older version,
24 : //! they continue to see the tree as it was then. The persistent BST stores all the
25 : //! different versions in an efficient way.
26 : //!
27 : //! Our persistent BST maintains a map of which layer file "covers" each key. It has only
28 : //! one dimension, the key. See `layer_coverage.rs`. We use the persistent/immutable property
29 : //! to handle the LSN dimension.
30 : //!
//! To build the layer map, we insert each layer into the persistent BST in LSN.start order,
//! starting from the oldest one. After each insertion, we grab a reference to that "version"
//! of the tree, and store it in another tree, a BTreeMap keyed by the LSN. See
//! `historic_layer_coverage.rs`.
35 : //!
36 : //! To search for a particular key-LSN pair, you first look up the right "version" in the
37 : //! BTreeMap. Then you search that version of the BST with the key.
38 : //!
39 : //! The persistent BST keeps all the versions, but there is no way to change the old versions
40 : //! afterwards. We can add layers as long as they have larger LSNs than any previous layer in
41 : //! the map, but if we need to remove a layer, or insert anything with an older LSN, we need
42 : //! to throw away most of the persistent BST and build a new one, starting from the oldest
43 : //! LSN. See [`LayerMap::flush_updates()`].
44 : //!
45 :
46 : mod historic_layer_coverage;
47 : mod layer_coverage;
48 :
49 : use crate::context::RequestContext;
50 : use crate::keyspace::KeyPartitioning;
51 : use crate::tenant::storage_layer::InMemoryLayer;
52 : use anyhow::Result;
53 : use pageserver_api::key::Key;
54 : use pageserver_api::keyspace::{KeySpace, KeySpaceAccum};
55 : use range_set_blaze::{CheckSortedDisjoint, RangeSetBlaze};
56 : use std::collections::{HashMap, VecDeque};
57 : use std::iter::Peekable;
58 : use std::ops::Range;
59 : use std::sync::Arc;
60 : use tokio::sync::watch;
61 : use utils::lsn::Lsn;
62 :
63 : use historic_layer_coverage::BufferedHistoricLayerCoverage;
64 : pub use historic_layer_coverage::LayerKey;
65 :
66 : use super::storage_layer::{LayerVisibilityHint, PersistentLayerDesc};
67 :
68 : ///
69 : /// LayerMap tracks what layers exist on a timeline.
70 : ///
71 : pub struct LayerMap {
72 : //
73 : // 'open_layer' holds the current InMemoryLayer that is accepting new
74 : // records. If it is None, 'next_open_layer_at' will be set instead, indicating
75 : // where the start LSN of the next InMemoryLayer that is to be created.
76 : //
77 : pub open_layer: Option<Arc<InMemoryLayer>>,
78 : pub next_open_layer_at: Option<Lsn>,
79 :
80 : ///
81 : /// Frozen layers, if any. Frozen layers are in-memory layers that
82 : /// are no longer added to, but haven't been written out to disk
83 : /// yet. They contain WAL older than the current 'open_layer' or
84 : /// 'next_open_layer_at', but newer than any historic layer.
85 : /// The frozen layers are in order from oldest to newest, so that
86 : /// the newest one is in the 'back' of the VecDeque, and the oldest
87 : /// in the 'front'.
88 : ///
89 : pub frozen_layers: VecDeque<Arc<InMemoryLayer>>,
90 :
91 : /// Index of the historic layers optimized for search
92 : historic: BufferedHistoricLayerCoverage<Arc<PersistentLayerDesc>>,
93 :
94 : /// L0 layers have key range Key::MIN..Key::MAX, and locating them using R-Tree search is very inefficient.
95 : /// So L0 layers are held in l0_delta_layers vector, in addition to the R-tree.
96 : ///
97 : /// NB: make sure to notify `watch_l0_deltas` on changes.
98 : l0_delta_layers: Vec<Arc<PersistentLayerDesc>>,
99 :
100 : /// Notifies about L0 delta layer changes, sending the current number of L0 layers.
101 : watch_l0_deltas: watch::Sender<usize>,
102 : }
103 :
104 : impl Default for LayerMap {
105 912 : fn default() -> Self {
106 912 : Self {
107 912 : open_layer: Default::default(),
108 912 : next_open_layer_at: Default::default(),
109 912 : frozen_layers: Default::default(),
110 912 : historic: Default::default(),
111 912 : l0_delta_layers: Default::default(),
112 912 : watch_l0_deltas: watch::channel(0).0,
113 912 : }
114 912 : }
115 : }
116 :
117 : /// The primary update API for the layer map.
118 : ///
119 : /// Batching historic layer insertions and removals is good for
120 : /// performance and this struct helps us do that correctly.
121 : #[must_use]
122 : pub struct BatchedUpdates<'a> {
123 : // While we hold this exclusive reference to the layer map the type checker
124 : // will prevent us from accidentally reading any unflushed updates.
125 : layer_map: &'a mut LayerMap,
126 : }
127 :
128 : /// Provide ability to batch more updates while hiding the read
129 : /// API so we don't accidentally read without flushing.
130 : impl BatchedUpdates<'_> {
131 : ///
132 : /// Insert an on-disk layer.
133 : ///
134 : // TODO remove the `layer` argument when `mapping` is refactored out of `LayerMap`
135 9792 : pub fn insert_historic(&mut self, layer_desc: PersistentLayerDesc) {
136 9792 : self.layer_map.insert_historic_noflush(layer_desc)
137 9792 : }
138 :
139 : ///
140 : /// Remove an on-disk layer from the map.
141 : ///
142 : /// This should be called when the corresponding file on disk has been deleted.
143 : ///
144 1032 : pub fn remove_historic(&mut self, layer_desc: &PersistentLayerDesc) {
145 1032 : self.layer_map.remove_historic_noflush(layer_desc)
146 1032 : }
147 :
148 : // We will flush on drop anyway, but this method makes it
149 : // more explicit that there is some work being done.
150 : /// Apply all updates
151 3568 : pub fn flush(self) {
152 3568 : // Flush happens on drop
153 3568 : }
154 : }
155 :
156 : // Ideally the flush() method should be called explicitly for more
157 : // controlled execution. But if we forget we'd rather flush on drop
158 : // than panic later or read without flushing.
159 : //
160 : // TODO maybe warn if flush hasn't explicitly been called
161 : impl Drop for BatchedUpdates<'_> {
162 3568 : fn drop(&mut self) {
163 3568 : self.layer_map.flush_updates();
164 3568 : }
165 : }
166 :
167 : /// Return value of LayerMap::search
168 : #[derive(Eq, PartialEq, Debug, Hash)]
169 : pub struct SearchResult {
170 : pub layer: Arc<PersistentLayerDesc>,
171 : pub lsn_floor: Lsn,
172 : }
173 :
174 : /// Return value of [`LayerMap::range_search`]
175 : ///
176 : /// Contains a mapping from a layer description to a keyspace
177 : /// accumulator that contains all the keys which intersect the layer
178 : /// from the original search space. Keys that were not found are accumulated
179 : /// in a separate key space accumulator.
180 : #[derive(Debug)]
181 : pub struct RangeSearchResult {
182 : pub found: HashMap<SearchResult, KeySpaceAccum>,
183 : pub not_found: KeySpaceAccum,
184 : }
185 :
186 : impl RangeSearchResult {
187 951168 : fn new() -> Self {
188 951168 : Self {
189 951168 : found: HashMap::new(),
190 951168 : not_found: KeySpaceAccum::new(),
191 951168 : }
192 951168 : }
193 : }
194 :
195 : /// Collector for results of range search queries on the LayerMap.
196 : /// It should be provided with two iterators for the delta and image coverage
197 : /// that contain all the changes for layers which intersect the range.
198 : struct RangeSearchCollector<Iter>
199 : where
200 : Iter: Iterator<Item = (i128, Option<Arc<PersistentLayerDesc>>)>,
201 : {
202 : delta_coverage: Peekable<Iter>,
203 : image_coverage: Peekable<Iter>,
204 : key_range: Range<Key>,
205 : end_lsn: Lsn,
206 :
207 : current_delta: Option<Arc<PersistentLayerDesc>>,
208 : current_image: Option<Arc<PersistentLayerDesc>>,
209 :
210 : result: RangeSearchResult,
211 : }
212 :
/// Which coverage stream(s) change next, and at which raw (`i128`) key.
#[derive(Debug)]
enum NextLayerType {
    /// Only the delta coverage changes at this key.
    Delta(i128),
    /// Only the image coverage changes at this key.
    Image(i128),
    /// Both coverages change at this same key.
    Both(i128),
}
219 :
220 : impl NextLayerType {
221 986732 : fn next_change_at_key(&self) -> Key {
222 986732 : match self {
223 287434 : NextLayerType::Delta(at) => Key::from_i128(*at),
224 11844 : NextLayerType::Image(at) => Key::from_i128(*at),
225 687454 : NextLayerType::Both(at) => Key::from_i128(*at),
226 : }
227 986732 : }
228 : }
229 :
230 : impl<Iter> RangeSearchCollector<Iter>
231 : where
232 : Iter: Iterator<Item = (i128, Option<Arc<PersistentLayerDesc>>)>,
233 : {
234 490724 : fn new(
235 490724 : key_range: Range<Key>,
236 490724 : end_lsn: Lsn,
237 490724 : delta_coverage: Iter,
238 490724 : image_coverage: Iter,
239 490724 : ) -> Self {
240 490724 : Self {
241 490724 : delta_coverage: delta_coverage.peekable(),
242 490724 : image_coverage: image_coverage.peekable(),
243 490724 : key_range,
244 490724 : end_lsn,
245 490724 : current_delta: None,
246 490724 : current_image: None,
247 490724 : result: RangeSearchResult::new(),
248 490724 : }
249 490724 : }
250 :
251 : /// Run the collector. Collection is implemented via a two pointer algorithm.
252 : /// One pointer tracks the start of the current range and the other tracks
253 : /// the beginning of the next range which will overlap with the next change
254 : /// in coverage across both image and delta.
255 490724 : fn collect(mut self) -> RangeSearchResult {
256 490724 : let next_layer_type = self.choose_next_layer_type();
257 486212 : let mut current_range_start = match next_layer_type {
258 : None => {
259 : // No changes for the range
260 4512 : self.pad_range(self.key_range.clone());
261 4512 : return self.result;
262 : }
263 486212 : Some(layer_type) if self.key_range.end <= layer_type.next_change_at_key() => {
264 0 : // Changes only after the end of the range
265 0 : self.pad_range(self.key_range.clone());
266 0 : return self.result;
267 : }
268 486212 : Some(layer_type) => {
269 486212 : // Changes for the range exist. Record anything before the first
270 486212 : // coverage change as not found.
271 486212 : let coverage_start = layer_type.next_change_at_key();
272 486212 : let range_before = self.key_range.start..coverage_start;
273 486212 : self.pad_range(range_before);
274 486212 :
275 486212 : self.advance(&layer_type);
276 486212 : coverage_start
277 : }
278 : };
279 :
280 986732 : while current_range_start < self.key_range.end {
281 500520 : let next_layer_type = self.choose_next_layer_type();
282 500520 : match next_layer_type {
283 14308 : Some(t) => {
284 14308 : let current_range_end = t.next_change_at_key();
285 14308 : self.add_range(current_range_start..current_range_end);
286 14308 : current_range_start = current_range_end;
287 14308 :
288 14308 : self.advance(&t);
289 14308 : }
290 486212 : None => {
291 486212 : self.add_range(current_range_start..self.key_range.end);
292 486212 : current_range_start = self.key_range.end;
293 486212 : }
294 : }
295 : }
296 :
297 486212 : self.result
298 490724 : }
299 :
300 : /// Mark a range as not found (i.e. no layers intersect it)
301 492600 : fn pad_range(&mut self, key_range: Range<Key>) {
302 492600 : if !key_range.is_empty() {
303 8372 : self.result.not_found.add_range(key_range);
304 484228 : }
305 492600 : }
306 :
307 : /// Select the appropiate layer for the given range and update
308 : /// the collector.
309 500520 : fn add_range(&mut self, covered_range: Range<Key>) {
310 500520 : let selected = LayerMap::select_layer(
311 500520 : self.current_delta.clone(),
312 500520 : self.current_image.clone(),
313 500520 : self.end_lsn,
314 500520 : );
315 500520 :
316 500520 : match selected {
317 498644 : Some(search_result) => self
318 498644 : .result
319 498644 : .found
320 498644 : .entry(search_result)
321 498644 : .or_default()
322 498644 : .add_range(covered_range),
323 1876 : None => self.pad_range(covered_range),
324 : }
325 500520 : }
326 :
327 : /// Move to the next coverage change.
328 500520 : fn advance(&mut self, layer_type: &NextLayerType) {
329 500520 : match layer_type {
330 145437 : NextLayerType::Delta(_) => {
331 145437 : let (_, layer) = self.delta_coverage.next().unwrap();
332 145437 : self.current_delta = layer;
333 145437 : }
334 6832 : NextLayerType::Image(_) => {
335 6832 : let (_, layer) = self.image_coverage.next().unwrap();
336 6832 : self.current_image = layer;
337 6832 : }
338 348251 : NextLayerType::Both(_) => {
339 348251 : let (_, image_layer) = self.image_coverage.next().unwrap();
340 348251 : let (_, delta_layer) = self.delta_coverage.next().unwrap();
341 348251 :
342 348251 : self.current_image = image_layer;
343 348251 : self.current_delta = delta_layer;
344 348251 : }
345 : }
346 500520 : }
347 :
348 : /// Pick the next coverage change: the one at the lesser key or both if they're alligned.
349 991244 : fn choose_next_layer_type(&mut self) -> Option<NextLayerType> {
350 991244 : let next_delta_at = self.delta_coverage.peek().map(|(key, _)| key);
351 991244 : let next_image_at = self.image_coverage.peek().map(|(key, _)| key);
352 991244 :
353 991244 : match (next_delta_at, next_image_at) {
354 490724 : (None, None) => None,
355 140393 : (Some(next_delta_at), None) => Some(NextLayerType::Delta(*next_delta_at)),
356 5952 : (None, Some(next_image_at)) => Some(NextLayerType::Image(*next_image_at)),
357 354175 : (Some(next_delta_at), Some(next_image_at)) if next_image_at < next_delta_at => {
358 880 : Some(NextLayerType::Image(*next_image_at))
359 : }
360 353295 : (Some(next_delta_at), Some(next_image_at)) if next_delta_at < next_image_at => {
361 5044 : Some(NextLayerType::Delta(*next_delta_at))
362 : }
363 348251 : (Some(next_delta_at), Some(_)) => Some(NextLayerType::Both(*next_delta_at)),
364 : }
365 991244 : }
366 : }
367 :
368 : impl LayerMap {
369 : ///
370 : /// Find the latest layer (by lsn.end) that covers the given
371 : /// 'key', with lsn.start < 'end_lsn'.
372 : ///
373 : /// The caller of this function is the page reconstruction
374 : /// algorithm looking for the next relevant delta layer, or
375 : /// the terminal image layer. The caller will pass the lsn_floor
376 : /// value as end_lsn in the next call to search.
377 : ///
378 : /// If there's an image layer exactly below the given end_lsn,
379 : /// search should return that layer regardless if there are
380 : /// overlapping deltas.
381 : ///
382 : /// If the latest layer is a delta and there is an overlapping
383 : /// image with it below, the lsn_floor returned should be right
384 : /// above that image so we don't skip it in the search. Otherwise
385 : /// the lsn_floor returned should be the bottom of the delta layer
386 : /// because we should make as much progress down the lsn axis
387 : /// as possible. It's fine if this way we skip some overlapping
388 : /// deltas, because the delta we returned would contain the same
389 : /// wal content.
390 : ///
391 : /// TODO: This API is convoluted and inefficient. If the caller
392 : /// makes N search calls, we'll end up finding the same latest
393 : /// image layer N times. We should either cache the latest image
394 : /// layer result, or simplify the api to `get_latest_image` and
395 : /// `get_latest_delta`, and only call `get_latest_image` once.
396 : ///
397 : /// NOTE: This only searches the 'historic' layers, *not* the
398 : /// 'open' and 'frozen' layers!
399 : ///
400 143964 : pub fn search(&self, key: Key, end_lsn: Lsn) -> Option<SearchResult> {
401 143964 : let version = self.historic.get().unwrap().get_version(end_lsn.0 - 1)?;
402 143964 : let latest_delta = version.delta_coverage.query(key.to_i128());
403 143964 : let latest_image = version.image_coverage.query(key.to_i128());
404 143964 :
405 143964 : Self::select_layer(latest_delta, latest_image, end_lsn)
406 143964 : }
407 :
408 644484 : fn select_layer(
409 644484 : delta_layer: Option<Arc<PersistentLayerDesc>>,
410 644484 : image_layer: Option<Arc<PersistentLayerDesc>>,
411 644484 : end_lsn: Lsn,
412 644484 : ) -> Option<SearchResult> {
413 644484 : assert!(delta_layer.as_ref().is_none_or(|l| l.is_delta()));
414 644484 : assert!(image_layer.as_ref().is_none_or(|l| !l.is_delta()));
415 :
416 644484 : match (delta_layer, image_layer) {
417 23196 : (None, None) => None,
418 73640 : (None, Some(image)) => {
419 73640 : let lsn_floor = image.get_lsn_range().start;
420 73640 : Some(SearchResult {
421 73640 : layer: image,
422 73640 : lsn_floor,
423 73640 : })
424 : }
425 154197 : (Some(delta), None) => {
426 154197 : let lsn_floor = delta.get_lsn_range().start;
427 154197 : Some(SearchResult {
428 154197 : layer: delta,
429 154197 : lsn_floor,
430 154197 : })
431 : }
432 393451 : (Some(delta), Some(image)) => {
433 393451 : let img_lsn = image.get_lsn_range().start;
434 393451 : let image_is_newer = image.get_lsn_range().end >= delta.get_lsn_range().end;
435 393451 : let image_exact_match = img_lsn + 1 == end_lsn;
436 393451 : if image_is_newer || image_exact_match {
437 61888 : Some(SearchResult {
438 61888 : layer: image,
439 61888 : lsn_floor: img_lsn,
440 61888 : })
441 : } else {
442 331563 : let lsn_floor =
443 331563 : std::cmp::max(delta.get_lsn_range().start, image.get_lsn_range().start + 1);
444 331563 : Some(SearchResult {
445 331563 : layer: delta,
446 331563 : lsn_floor,
447 331563 : })
448 : }
449 : }
450 : }
451 644484 : }
452 :
453 944088 : pub fn range_search(&self, key_range: Range<Key>, end_lsn: Lsn) -> RangeSearchResult {
454 944088 : let version = match self.historic.get().unwrap().get_version(end_lsn.0 - 1) {
455 490724 : Some(version) => version,
456 : None => {
457 453364 : let mut result = RangeSearchResult::new();
458 453364 : result.not_found.add_range(key_range);
459 453364 : return result;
460 : }
461 : };
462 :
463 490724 : let raw_range = key_range.start.to_i128()..key_range.end.to_i128();
464 490724 : let delta_changes = version.delta_coverage.range_overlaps(&raw_range);
465 490724 : let image_changes = version.image_coverage.range_overlaps(&raw_range);
466 490724 :
467 490724 : let collector = RangeSearchCollector::new(key_range, end_lsn, delta_changes, image_changes);
468 490724 : collector.collect()
469 944088 : }
470 :
471 : /// Start a batch of updates, applied on drop
472 3568 : pub fn batch_update(&mut self) -> BatchedUpdates<'_> {
473 3568 : BatchedUpdates { layer_map: self }
474 3568 : }
475 :
476 : ///
477 : /// Insert an on-disk layer
478 : ///
479 : /// Helper function for BatchedUpdates::insert_historic
480 : ///
481 : /// TODO(chi): remove L generic so that we do not need to pass layer object.
482 9812 : pub(self) fn insert_historic_noflush(&mut self, layer_desc: PersistentLayerDesc) {
483 9812 : // TODO: See #3869, resulting #4088, attempted fix and repro #4094
484 9812 :
485 9812 : if Self::is_l0(&layer_desc.key_range, layer_desc.is_delta) {
486 1996 : self.l0_delta_layers.push(layer_desc.clone().into());
487 1996 : self.watch_l0_deltas
488 1996 : .send_replace(self.l0_delta_layers.len());
489 7816 : }
490 :
491 9812 : self.historic.insert(
492 9812 : historic_layer_coverage::LayerKey::from(&layer_desc),
493 9812 : layer_desc.into(),
494 9812 : );
495 9812 : }
496 :
497 : ///
498 : /// Remove an on-disk layer from the map.
499 : ///
500 : /// Helper function for BatchedUpdates::remove_historic
501 : ///
502 1032 : pub fn remove_historic_noflush(&mut self, layer_desc: &PersistentLayerDesc) {
503 1032 : self.historic
504 1032 : .remove(historic_layer_coverage::LayerKey::from(layer_desc));
505 1032 : let layer_key = layer_desc.key();
506 1032 : if Self::is_l0(&layer_desc.key_range, layer_desc.is_delta) {
507 808 : let len_before = self.l0_delta_layers.len();
508 808 : let mut l0_delta_layers = std::mem::take(&mut self.l0_delta_layers);
509 11048 : l0_delta_layers.retain(|other| other.key() != layer_key);
510 808 : self.l0_delta_layers = l0_delta_layers;
511 808 : self.watch_l0_deltas
512 808 : .send_replace(self.l0_delta_layers.len());
513 808 : // this assertion is related to use of Arc::ptr_eq in Self::compare_arced_layers,
514 808 : // there's a chance that the comparison fails at runtime due to it comparing (pointer,
515 808 : // vtable) pairs.
516 808 : assert_eq!(
517 808 : self.l0_delta_layers.len(),
518 808 : len_before - 1,
519 0 : "failed to locate removed historic layer from l0_delta_layers"
520 : );
521 224 : }
522 1032 : }
523 :
524 : /// Helper function for BatchedUpdates::drop.
525 3572 : pub(self) fn flush_updates(&mut self) {
526 3572 : self.historic.rebuild();
527 3572 : }
528 :
529 : /// Is there a newer image layer for given key- and LSN-range? Or a set
530 : /// of image layers within the specified lsn range that cover the entire
531 : /// specified key range?
532 : ///
533 : /// This is used for garbage collection, to determine if an old layer can
534 : /// be deleted.
535 16 : pub fn image_layer_exists(&self, key: &Range<Key>, lsn: &Range<Lsn>) -> bool {
536 16 : if key.is_empty() {
537 : // Vacuously true. There's a newer image for all 0 of the kerys in the range.
538 0 : return true;
539 16 : }
540 :
541 16 : let version = match self.historic.get().unwrap().get_version(lsn.end.0 - 1) {
542 16 : Some(v) => v,
543 0 : None => return false,
544 : };
545 :
546 16 : let start = key.start.to_i128();
547 16 : let end = key.end.to_i128();
548 16 :
549 20 : let layer_covers = |layer: Option<Arc<PersistentLayerDesc>>| match layer {
550 20 : Some(layer) => layer.get_lsn_range().start >= lsn.start,
551 0 : None => false,
552 20 : };
553 :
554 : // Check the start is covered
555 16 : if !layer_covers(version.image_coverage.query(start)) {
556 12 : return false;
557 4 : }
558 :
559 : // Check after all changes of coverage
560 4 : for (_, change_val) in version.image_coverage.range(start..end) {
561 4 : if !layer_covers(change_val) {
562 0 : return false;
563 4 : }
564 : }
565 :
566 4 : true
567 16 : }
568 :
569 7324 : pub fn iter_historic_layers(&self) -> impl '_ + Iterator<Item = Arc<PersistentLayerDesc>> {
570 7324 : self.historic.iter()
571 7324 : }
572 :
573 : /// Get a ref counted pointer for the first in memory layer that matches the provided predicate.
574 2149811 : pub fn find_in_memory_layer<Pred>(&self, mut pred: Pred) -> Option<Arc<InMemoryLayer>>
575 2149811 : where
576 2149811 : Pred: FnMut(&Arc<InMemoryLayer>) -> bool,
577 2149811 : {
578 2149811 : if let Some(open) = &self.open_layer {
579 1823122 : if pred(open) {
580 1208471 : return Some(open.clone());
581 614651 : }
582 326689 : }
583 :
584 941340 : self.frozen_layers.iter().rfind(|l| pred(l)).cloned()
585 2149811 : }
586 :
587 : ///
588 : /// Divide the whole given range of keys into sub-ranges based on the latest
589 : /// image layer that covers each range at the specified lsn (inclusive).
590 : /// This is used when creating new image layers.
591 28 : pub fn image_coverage(
592 28 : &self,
593 28 : key_range: &Range<Key>,
594 28 : lsn: Lsn,
595 28 : ) -> Vec<(Range<Key>, Option<Arc<PersistentLayerDesc>>)> {
596 28 : let version = match self.historic.get().unwrap().get_version(lsn.0) {
597 28 : Some(v) => v,
598 0 : None => return vec![],
599 : };
600 :
601 28 : let start = key_range.start.to_i128();
602 28 : let end = key_range.end.to_i128();
603 28 :
604 28 : // Initialize loop variables
605 28 : let mut coverage: Vec<(Range<Key>, Option<Arc<PersistentLayerDesc>>)> = vec![];
606 28 : let mut current_key = start;
607 28 : let mut current_val = version.image_coverage.query(start);
608 :
609 : // Loop through the change events and push intervals
610 28 : for (change_key, change_val) in version.image_coverage.range(start..end) {
611 0 : let kr = Key::from_i128(current_key)..Key::from_i128(change_key);
612 0 : coverage.push((kr, current_val.take()));
613 0 : current_key = change_key;
614 0 : current_val.clone_from(&change_val);
615 0 : }
616 :
617 : // Add the final interval
618 28 : let kr = Key::from_i128(current_key)..Key::from_i128(end);
619 28 : coverage.push((kr, current_val.take()));
620 28 :
621 28 : coverage
622 28 : }
623 :
624 : /// Check if the key range resembles that of an L0 layer.
625 17094 : pub fn is_l0(key_range: &Range<Key>, is_delta_layer: bool) -> bool {
626 17094 : is_delta_layer && key_range == &(Key::MIN..Key::MAX)
627 17094 : }
628 :
629 : /// This function determines which layers are counted in `count_deltas`:
630 : /// layers that should count towards deciding whether or not to reimage
631 : /// a certain partition range.
632 : ///
633 : /// There are two kinds of layers we currently consider reimage-worthy:
634 : ///
635 : /// Case 1: Non-L0 layers are currently reimage-worthy by default.
636 : /// TODO Some of these layers are very sparse and cover the entire key
637 : /// range. Replacing 256MB of data (or less!) with terabytes of
638 : /// images doesn't seem wise. We need a better heuristic, possibly
639 : /// based on some of these factors:
640 : /// a) whether this layer has any wal in this partition range
641 : /// b) the size of the layer
642 : /// c) the number of images needed to cover it
643 : /// d) the estimated time until we'll have to reimage over it for GC
644 : ///
645 : /// Case 2: Since L0 layers by definition cover the entire key space, we consider
646 : /// them reimage-worthy only when the entire key space can be covered by very few
647 : /// images (currently 1).
648 : /// TODO The optimal number should probably be slightly higher than 1, but to
649 : /// implement that we need to plumb a lot more context into this function
650 : /// than just the current partition_range.
651 0 : pub fn is_reimage_worthy(layer: &PersistentLayerDesc, partition_range: &Range<Key>) -> bool {
652 0 : // Case 1
653 0 : if !Self::is_l0(&layer.key_range, layer.is_delta) {
654 0 : return true;
655 0 : }
656 0 :
657 0 : // Case 2
658 0 : if partition_range == &(Key::MIN..Key::MAX) {
659 0 : return true;
660 0 : }
661 0 :
662 0 : false
663 0 : }
664 :
665 : /// Count the height of the tallest stack of reimage-worthy deltas
666 : /// in this 2d region.
667 : ///
668 : /// If `limit` is provided we don't try to count above that number.
669 : ///
670 : /// This number is used to compute the largest number of deltas that
671 : /// we'll need to visit for any page reconstruction in this region.
672 : /// We use this heuristic to decide whether to create an image layer.
673 28 : pub fn count_deltas(&self, key: &Range<Key>, lsn: &Range<Lsn>, limit: Option<usize>) -> usize {
674 28 : // We get the delta coverage of the region, and for each part of the coverage
675 28 : // we recurse right underneath the delta. The recursion depth is limited by
676 28 : // the largest result this function could return, which is in practice between
677 28 : // 3 and 10 (since we usually try to create an image when the number gets larger).
678 28 :
679 28 : if lsn.is_empty() || key.is_empty() || limit == Some(0) {
680 0 : return 0;
681 28 : }
682 :
683 28 : let version = match self.historic.get().unwrap().get_version(lsn.end.0 - 1) {
684 28 : Some(v) => v,
685 0 : None => return 0,
686 : };
687 :
688 28 : let start = key.start.to_i128();
689 28 : let end = key.end.to_i128();
690 28 :
691 28 : // Initialize loop variables
692 28 : let mut max_stacked_deltas = 0;
693 28 : let mut current_key = start;
694 28 : let mut current_val = version.delta_coverage.query(start);
695 :
696 : // Loop through the delta coverage and recurse on each part
697 28 : for (change_key, change_val) in version.delta_coverage.range(start..end) {
698 : // If there's a relevant delta in this part, add 1 and recurse down
699 0 : if let Some(val) = ¤t_val {
700 0 : if val.get_lsn_range().end > lsn.start {
701 0 : let kr = Key::from_i128(current_key)..Key::from_i128(change_key);
702 0 : let lr = lsn.start..val.get_lsn_range().start;
703 0 : if !kr.is_empty() {
704 0 : let base_count = Self::is_reimage_worthy(val, key) as usize;
705 0 : let new_limit = limit.map(|l| l - base_count);
706 0 : let max_stacked_deltas_underneath = self.count_deltas(&kr, &lr, new_limit);
707 0 : max_stacked_deltas = std::cmp::max(
708 0 : max_stacked_deltas,
709 0 : base_count + max_stacked_deltas_underneath,
710 0 : );
711 0 : }
712 0 : }
713 0 : }
714 :
715 0 : current_key = change_key;
716 0 : current_val.clone_from(&change_val);
717 : }
718 :
719 : // Consider the last part
720 28 : if let Some(val) = ¤t_val {
721 0 : if val.get_lsn_range().end > lsn.start {
722 0 : let kr = Key::from_i128(current_key)..Key::from_i128(end);
723 0 : let lr = lsn.start..val.get_lsn_range().start;
724 0 :
725 0 : if !kr.is_empty() {
726 0 : let base_count = Self::is_reimage_worthy(val, key) as usize;
727 0 : let new_limit = limit.map(|l| l - base_count);
728 0 : let max_stacked_deltas_underneath = self.count_deltas(&kr, &lr, new_limit);
729 0 : max_stacked_deltas = std::cmp::max(
730 0 : max_stacked_deltas,
731 0 : base_count + max_stacked_deltas_underneath,
732 0 : );
733 0 : }
734 0 : }
735 28 : }
736 :
737 28 : max_stacked_deltas
738 28 : }
739 :
740 : /// Count how many reimage-worthy layers we need to visit for given key-lsn pair.
741 : ///
742 : /// The `partition_range` argument is used as context for the reimage-worthiness decision.
743 : ///
744 : /// Used as a helper for correctness checks only. Performance not critical.
745 0 : pub fn get_difficulty(&self, lsn: Lsn, key: Key, partition_range: &Range<Key>) -> usize {
746 0 : match self.search(key, lsn) {
747 0 : Some(search_result) => {
748 0 : if search_result.layer.is_incremental() {
749 0 : (Self::is_reimage_worthy(&search_result.layer, partition_range) as usize)
750 0 : + self.get_difficulty(search_result.lsn_floor, key, partition_range)
751 : } else {
752 0 : 0
753 : }
754 : }
755 0 : None => 0,
756 : }
757 0 : }
758 :
759 : /// Used for correctness checking. Results are expected to be identical to
760 : /// self.get_difficulty_map. Assumes self.search is correct.
761 0 : pub fn get_difficulty_map_bruteforce(
762 0 : &self,
763 0 : lsn: Lsn,
764 0 : partitioning: &KeyPartitioning,
765 0 : ) -> Vec<usize> {
766 0 : // Looking at the difficulty as a function of key, it could only increase
767 0 : // when a delta layer starts or an image layer ends. Therefore it's sufficient
768 0 : // to check the difficulties at:
769 0 : // - the key.start for each non-empty part range
770 0 : // - the key.start for each delta
771 0 : // - the key.end for each image
772 0 : let keys_iter: Box<dyn Iterator<Item = Key>> = {
773 0 : let mut keys: Vec<Key> = self
774 0 : .iter_historic_layers()
775 0 : .map(|layer| {
776 0 : if layer.is_incremental() {
777 0 : layer.get_key_range().start
778 : } else {
779 0 : layer.get_key_range().end
780 : }
781 0 : })
782 0 : .collect();
783 0 : keys.sort();
784 0 : Box::new(keys.into_iter())
785 0 : };
786 0 : let mut keys_iter = keys_iter.peekable();
787 0 :
788 0 : // Iter the partition and keys together and query all the necessary
789 0 : // keys, computing the max difficulty for each part.
790 0 : partitioning
791 0 : .parts
792 0 : .iter()
793 0 : .map(|part| {
794 0 : let mut difficulty = 0;
795 : // Partition ranges are assumed to be sorted and disjoint
796 : // TODO assert it
797 0 : for range in &part.ranges {
798 0 : if !range.is_empty() {
799 0 : difficulty =
800 0 : std::cmp::max(difficulty, self.get_difficulty(lsn, range.start, range));
801 0 : }
802 0 : while let Some(key) = keys_iter.peek() {
803 0 : if key >= &range.end {
804 0 : break;
805 0 : }
806 0 : let key = keys_iter.next().unwrap();
807 0 : if key < range.start {
808 0 : continue;
809 0 : }
810 0 : difficulty =
811 0 : std::cmp::max(difficulty, self.get_difficulty(lsn, key, range));
812 : }
813 : }
814 0 : difficulty
815 0 : })
816 0 : .collect()
817 0 : }
818 :
819 : /// For each part of a keyspace partitioning, return the maximum number of layers
820 : /// that would be needed for page reconstruction in that part at the given LSN.
821 : ///
822 : /// If `limit` is provided we don't try to count above that number.
823 : ///
824 : /// This method is used to decide where to create new image layers. Computing the
825 : /// result for the entire partitioning at once allows this function to be more
826 : /// efficient, and further optimization is possible by using iterators instead,
827 : /// to allow early return.
828 : ///
829 : /// TODO actually use this method instead of count_deltas. Currently we only use
830 : /// it for benchmarks.
831 0 : pub fn get_difficulty_map(
832 0 : &self,
833 0 : lsn: Lsn,
834 0 : partitioning: &KeyPartitioning,
835 0 : limit: Option<usize>,
836 0 : ) -> Vec<usize> {
837 0 : // TODO This is a naive implementation. Perf improvements to do:
838 0 : // 1. Instead of calling self.image_coverage and self.count_deltas,
839 0 : // iterate the image and delta coverage only once.
840 0 : partitioning
841 0 : .parts
842 0 : .iter()
843 0 : .map(|part| {
844 0 : let mut difficulty = 0;
845 0 : for range in &part.ranges {
846 0 : if limit == Some(difficulty) {
847 0 : break;
848 0 : }
849 0 : for (img_range, last_img) in self.image_coverage(range, lsn) {
850 0 : if limit == Some(difficulty) {
851 0 : break;
852 0 : }
853 0 : let img_lsn = if let Some(last_img) = last_img {
854 0 : last_img.get_lsn_range().end
855 : } else {
856 0 : Lsn(0)
857 : };
858 :
859 0 : if img_lsn < lsn {
860 0 : let num_deltas = self.count_deltas(&img_range, &(img_lsn..lsn), limit);
861 0 : difficulty = std::cmp::max(difficulty, num_deltas);
862 0 : }
863 : }
864 : }
865 0 : difficulty
866 0 : })
867 0 : .collect()
868 0 : }
869 :
870 : /// Return all L0 delta layers
871 3304 : pub fn level0_deltas(&self) -> &Vec<Arc<PersistentLayerDesc>> {
872 3304 : &self.l0_delta_layers
873 3304 : }
874 :
875 : /// Subscribes to L0 delta layer changes, sending the current number of L0 delta layers.
876 884 : pub fn watch_level0_deltas(&self) -> watch::Receiver<usize> {
877 884 : self.watch_l0_deltas.subscribe()
878 884 : }
879 :
880 : /// debugging function to print out the contents of the layer map
881 : #[allow(unused)]
882 4 : pub async fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
883 4 : println!("Begin dump LayerMap");
884 4 :
885 4 : println!("open_layer:");
886 4 : if let Some(open_layer) = &self.open_layer {
887 0 : open_layer.dump(verbose, ctx).await?;
888 4 : }
889 :
890 4 : println!("frozen_layers:");
891 4 : for frozen_layer in self.frozen_layers.iter() {
892 0 : frozen_layer.dump(verbose, ctx).await?;
893 : }
894 :
895 4 : println!("historic_layers:");
896 24 : for desc in self.iter_historic_layers() {
897 24 : desc.dump();
898 24 : }
899 4 : println!("End dump LayerMap");
900 4 : Ok(())
901 4 : }
902 :
903 : /// `read_points` represent the tip of a timeline and any branch points, i.e. the places
904 : /// where we expect to serve reads.
905 : ///
906 : /// This function is O(N) and should be called infrequently. The caller is responsible for
907 : /// looking up and updating the Layer objects for these layer descriptors.
908 468 : pub fn get_visibility(
909 468 : &self,
910 468 : mut read_points: Vec<Lsn>,
911 468 : ) -> (
912 468 : Vec<(Arc<PersistentLayerDesc>, LayerVisibilityHint)>,
913 468 : KeySpace,
914 468 : ) {
915 : // This is like a KeySpace, but this type is intended for efficient unions with image layer ranges, whereas
916 : // KeySpace is intended to be composed statically and iterated over.
917 : struct KeyShadow {
918 : // Map of range start to range end
919 : inner: RangeSetBlaze<i128>,
920 : }
921 :
922 : impl KeyShadow {
923 468 : fn new() -> Self {
924 468 : Self {
925 468 : inner: Default::default(),
926 468 : }
927 468 : }
928 :
929 3204 : fn contains(&self, range: Range<Key>) -> bool {
930 3204 : let range_incl = range.start.to_i128()..=range.end.to_i128() - 1;
931 3204 : self.inner.is_superset(&RangeSetBlaze::from_sorted_disjoint(
932 3204 : CheckSortedDisjoint::from([range_incl]),
933 3204 : ))
934 3204 : }
935 :
936 : /// Add the input range to the keys covered by self.
937 : ///
938 : /// Return true if inserting this range covered some keys that were previously not covered
939 3812 : fn cover(&mut self, insert: Range<Key>) -> bool {
940 3812 : let range_incl = insert.start.to_i128()..=insert.end.to_i128() - 1;
941 3812 : self.inner.ranges_insert(range_incl)
942 3812 : }
943 :
944 476 : fn reset(&mut self) {
945 476 : self.inner = Default::default();
946 476 : }
947 :
948 468 : fn to_keyspace(&self) -> KeySpace {
949 468 : let mut accum = KeySpaceAccum::new();
950 476 : for range_incl in self.inner.ranges() {
951 476 : let range = Range {
952 476 : start: Key::from_i128(*range_incl.start()),
953 476 : end: Key::from_i128(range_incl.end() + 1),
954 476 : };
955 476 : accum.add_range(range)
956 : }
957 :
958 468 : accum.to_keyspace()
959 468 : }
960 : }
961 :
962 : // The 'shadow' will be updated as we sweep through the layers: an image layer subtracts from the shadow,
963 : // and a ReadPoint
964 468 : read_points.sort_by_key(|rp| rp.0);
965 468 : let mut shadow = KeyShadow::new();
966 :
967 : // We will interleave all our read points and layers into a sorted collection
968 : enum Item {
969 : ReadPoint { lsn: Lsn },
970 : Layer(Arc<PersistentLayerDesc>),
971 : }
972 :
973 468 : let mut items = Vec::with_capacity(self.historic.len() + read_points.len());
974 468 : items.extend(self.iter_historic_layers().map(Item::Layer));
975 468 : items.extend(
976 468 : read_points
977 468 : .into_iter()
978 476 : .map(|rp| Item::ReadPoint { lsn: rp }),
979 468 : );
980 468 :
981 468 : // Ordering: we want to iterate like this:
982 468 : // 1. Highest LSNs first
983 468 : // 2. Consider images before deltas if they end at the same LSNs (images cover deltas)
984 468 : // 3. Consider ReadPoints before image layers if they're at the same LSN (readpoints make that image visible)
985 127160 : items.sort_by_key(|item| {
986 127160 : std::cmp::Reverse(match item {
987 126272 : Item::Layer(layer) => {
988 126272 : if layer.is_delta() {
989 59256 : (Lsn(layer.get_lsn_range().end.0 - 1), 0)
990 : } else {
991 67016 : (layer.image_layer_lsn(), 1)
992 : }
993 : }
994 888 : Item::ReadPoint { lsn } => (*lsn, 2),
995 : })
996 127160 : });
997 468 :
998 468 : let mut results = Vec::with_capacity(self.historic.len());
999 468 :
1000 468 : let mut maybe_covered_deltas: Vec<Arc<PersistentLayerDesc>> = Vec::new();
1001 :
1002 7960 : for item in items {
1003 7492 : let (reached_lsn, is_readpoint) = match &item {
1004 476 : Item::ReadPoint { lsn } => (lsn, true),
1005 7016 : Item::Layer(layer) => (&layer.lsn_range.start, false),
1006 : };
1007 7492 : maybe_covered_deltas.retain(|d| {
1008 212 : if *reached_lsn >= d.lsn_range.start && is_readpoint {
1009 : // We encountered a readpoint within the delta layer: it is visible
1010 :
1011 4 : results.push((d.clone(), LayerVisibilityHint::Visible));
1012 4 : false
1013 208 : } else if *reached_lsn < d.lsn_range.start {
1014 : // We passed the layer's range without encountering a read point: it is not visible
1015 64 : results.push((d.clone(), LayerVisibilityHint::Covered));
1016 64 : false
1017 : } else {
1018 : // We're still in the delta layer: continue iterating
1019 144 : true
1020 : }
1021 7492 : });
1022 7492 :
1023 7492 : match item {
1024 476 : Item::ReadPoint { lsn: _lsn } => {
1025 476 : // TODO: propagate the child timeline's shadow from their own run of this function, so that we don't have
1026 476 : // to assume that the whole key range is visible at the branch point.
1027 476 : shadow.reset();
1028 476 : }
1029 7016 : Item::Layer(layer) => {
1030 7016 : let visibility = if layer.is_delta() {
1031 3204 : if shadow.contains(layer.get_key_range()) {
1032 : // If a layer isn't visible based on current state, we must defer deciding whether
1033 : // it is truly not visible until we have advanced past the delta's range: we might
1034 : // encounter another branch point within this delta layer's LSN range.
1035 76 : maybe_covered_deltas.push(layer);
1036 76 : continue;
1037 : } else {
1038 3128 : LayerVisibilityHint::Visible
1039 : }
1040 : } else {
1041 3812 : let modified = shadow.cover(layer.get_key_range());
1042 3812 : if modified {
1043 : // An image layer in a region which wasn't fully covered yet: this layer is visible, but layers below it will be covered
1044 3740 : LayerVisibilityHint::Visible
1045 : } else {
1046 : // An image layer in a region that was already covered
1047 72 : LayerVisibilityHint::Covered
1048 : }
1049 : };
1050 :
1051 6940 : results.push((layer, visibility));
1052 : }
1053 : }
1054 : }
1055 :
1056 : // Drain any remaining maybe_covered deltas
1057 468 : results.extend(
1058 468 : maybe_covered_deltas
1059 468 : .into_iter()
1060 468 : .map(|d| (d, LayerVisibilityHint::Covered)),
1061 468 : );
1062 468 :
1063 468 : (results, shadow.to_keyspace())
1064 468 : }
1065 : }
1066 :
1067 : #[cfg(test)]
1068 : mod tests {
1069 : use crate::tenant::{storage_layer::LayerName, IndexPart};
1070 : use pageserver_api::{
1071 : key::DBDIR_KEY,
1072 : keyspace::{KeySpace, KeySpaceRandomAccum},
1073 : };
1074 : use std::{collections::HashMap, path::PathBuf};
1075 : use utils::{
1076 : id::{TenantId, TimelineId},
1077 : shard::TenantShardId,
1078 : };
1079 :
1080 : use super::*;
1081 :
1082 : #[derive(Clone)]
1083 : struct LayerDesc {
1084 : key_range: Range<Key>,
1085 : lsn_range: Range<Lsn>,
1086 : is_delta: bool,
1087 : }
1088 :
1089 4 : fn create_layer_map(layers: Vec<LayerDesc>) -> LayerMap {
1090 4 : let mut layer_map = LayerMap::default();
1091 :
1092 24 : for layer in layers {
1093 20 : layer_map.insert_historic_noflush(PersistentLayerDesc::new_test(
1094 20 : layer.key_range,
1095 20 : layer.lsn_range,
1096 20 : layer.is_delta,
1097 20 : ));
1098 20 : }
1099 :
1100 4 : layer_map.flush_updates();
1101 4 : layer_map
1102 4 : }
1103 :
1104 7080 : fn assert_range_search_result_eq(lhs: RangeSearchResult, rhs: RangeSearchResult) {
1105 7080 : assert_eq!(lhs.not_found.to_keyspace(), rhs.not_found.to_keyspace());
1106 7080 : let lhs: HashMap<SearchResult, KeySpace> = lhs
1107 7080 : .found
1108 7080 : .into_iter()
1109 16460 : .map(|(search_result, accum)| (search_result, accum.to_keyspace()))
1110 7080 : .collect();
1111 7080 : let rhs: HashMap<SearchResult, KeySpace> = rhs
1112 7080 : .found
1113 7080 : .into_iter()
1114 16460 : .map(|(search_result, accum)| (search_result, accum.to_keyspace()))
1115 7080 : .collect();
1116 7080 :
1117 7080 : assert_eq!(lhs, rhs);
1118 7080 : }
1119 :
1120 : #[cfg(test)]
1121 7080 : fn brute_force_range_search(
1122 7080 : layer_map: &LayerMap,
1123 7080 : key_range: Range<Key>,
1124 7080 : end_lsn: Lsn,
1125 7080 : ) -> RangeSearchResult {
1126 7080 : let mut range_search_result = RangeSearchResult::new();
1127 7080 :
1128 7080 : let mut key = key_range.start;
1129 151040 : while key != key_range.end {
1130 143960 : let res = layer_map.search(key, end_lsn);
1131 143960 : match res {
1132 122640 : Some(res) => {
1133 122640 : range_search_result
1134 122640 : .found
1135 122640 : .entry(res)
1136 122640 : .or_default()
1137 122640 : .add_key(key);
1138 122640 : }
1139 21320 : None => {
1140 21320 : range_search_result.not_found.add_key(key);
1141 21320 : }
1142 : }
1143 :
1144 143960 : key = key.next();
1145 : }
1146 :
1147 7080 : range_search_result
1148 7080 : }
1149 :
1150 : #[test]
1151 4 : fn ranged_search_on_empty_layer_map() {
1152 4 : let layer_map = LayerMap::default();
1153 4 : let range = Key::from_i128(100)..Key::from_i128(200);
1154 4 :
1155 4 : let res = layer_map.range_search(range.clone(), Lsn(100));
1156 4 : assert_eq!(
1157 4 : res.not_found.to_keyspace(),
1158 4 : KeySpace {
1159 4 : ranges: vec![range]
1160 4 : }
1161 4 : );
1162 4 : }
1163 :
1164 : #[test]
1165 4 : fn ranged_search() {
1166 4 : let layers = vec![
1167 4 : LayerDesc {
1168 4 : key_range: Key::from_i128(15)..Key::from_i128(50),
1169 4 : lsn_range: Lsn(0)..Lsn(5),
1170 4 : is_delta: false,
1171 4 : },
1172 4 : LayerDesc {
1173 4 : key_range: Key::from_i128(10)..Key::from_i128(20),
1174 4 : lsn_range: Lsn(5)..Lsn(20),
1175 4 : is_delta: true,
1176 4 : },
1177 4 : LayerDesc {
1178 4 : key_range: Key::from_i128(15)..Key::from_i128(25),
1179 4 : lsn_range: Lsn(20)..Lsn(30),
1180 4 : is_delta: true,
1181 4 : },
1182 4 : LayerDesc {
1183 4 : key_range: Key::from_i128(35)..Key::from_i128(40),
1184 4 : lsn_range: Lsn(25)..Lsn(35),
1185 4 : is_delta: true,
1186 4 : },
1187 4 : LayerDesc {
1188 4 : key_range: Key::from_i128(35)..Key::from_i128(40),
1189 4 : lsn_range: Lsn(35)..Lsn(40),
1190 4 : is_delta: false,
1191 4 : },
1192 4 : ];
1193 4 :
1194 4 : let layer_map = create_layer_map(layers.clone());
1195 244 : for start in 0..60 {
1196 7080 : for end in (start + 1)..60 {
1197 7080 : let range = Key::from_i128(start)..Key::from_i128(end);
1198 7080 : let result = layer_map.range_search(range.clone(), Lsn(100));
1199 7080 : let expected = brute_force_range_search(&layer_map, range, Lsn(100));
1200 7080 :
1201 7080 : assert_range_search_result_eq(result, expected);
1202 7080 : }
1203 : }
1204 4 : }
1205 :
1206 : #[test]
1207 4 : fn layer_visibility_basic() {
1208 4 : // A simple synthetic input, as a smoke test.
1209 4 : let tenant_shard_id = TenantShardId::unsharded(TenantId::generate());
1210 4 : let timeline_id = TimelineId::generate();
1211 4 : let mut layer_map = LayerMap::default();
1212 4 : let mut updates = layer_map.batch_update();
1213 :
1214 : const FAKE_LAYER_SIZE: u64 = 1024;
1215 :
1216 4 : let inject_delta = |updates: &mut BatchedUpdates,
1217 : key_start: i128,
1218 : key_end: i128,
1219 : lsn_start: u64,
1220 28 : lsn_end: u64| {
1221 28 : let desc = PersistentLayerDesc::new_delta(
1222 28 : tenant_shard_id,
1223 28 : timeline_id,
1224 28 : Range {
1225 28 : start: Key::from_i128(key_start),
1226 28 : end: Key::from_i128(key_end),
1227 28 : },
1228 28 : Range {
1229 28 : start: Lsn(lsn_start),
1230 28 : end: Lsn(lsn_end),
1231 28 : },
1232 28 : 1024,
1233 28 : );
1234 28 : updates.insert_historic(desc.clone());
1235 28 : desc
1236 28 : };
1237 :
1238 4 : let inject_image =
1239 24 : |updates: &mut BatchedUpdates, key_start: i128, key_end: i128, lsn: u64| {
1240 24 : let desc = PersistentLayerDesc::new_img(
1241 24 : tenant_shard_id,
1242 24 : timeline_id,
1243 24 : Range {
1244 24 : start: Key::from_i128(key_start),
1245 24 : end: Key::from_i128(key_end),
1246 24 : },
1247 24 : Lsn(lsn),
1248 24 : FAKE_LAYER_SIZE,
1249 24 : );
1250 24 : updates.insert_historic(desc.clone());
1251 24 : desc
1252 24 : };
1253 :
1254 : //
1255 : // Construct our scenario: the following lines go in backward-LSN order, constructing the various scenarios
1256 : // we expect to handle. You can follow these examples through in the same order as they would be processed
1257 : // by the function under test.
1258 : //
1259 :
1260 4 : let mut read_points = vec![Lsn(1000)];
1261 4 :
1262 4 : // A delta ahead of any image layer
1263 4 : let ahead_layer = inject_delta(&mut updates, 10, 20, 101, 110);
1264 4 :
1265 4 : // An image layer is visible and covers some layers beneath itself
1266 4 : let visible_covering_img = inject_image(&mut updates, 5, 25, 99);
1267 4 :
1268 4 : // A delta layer covered by the image layer: should be covered
1269 4 : let covered_delta = inject_delta(&mut updates, 10, 20, 90, 100);
1270 4 :
1271 4 : // A delta layer partially covered by an image layer: should be visible
1272 4 : let partially_covered_delta = inject_delta(&mut updates, 1, 7, 90, 100);
1273 4 :
1274 4 : // A delta layer not covered by an image layer: should be visible
1275 4 : let not_covered_delta = inject_delta(&mut updates, 1, 4, 90, 100);
1276 4 :
1277 4 : // An image layer covered by the image layer above: should be covered
1278 4 : let covered_image = inject_image(&mut updates, 10, 20, 89);
1279 4 :
1280 4 : // An image layer partially covered by an image layer: should be visible
1281 4 : let partially_covered_image = inject_image(&mut updates, 1, 7, 89);
1282 4 :
1283 4 : // An image layer not covered by an image layer: should be visible
1284 4 : let not_covered_image = inject_image(&mut updates, 1, 4, 89);
1285 4 :
1286 4 : // A read point: this will make subsequent layers below here visible, even if there are
1287 4 : // more recent layers covering them.
1288 4 : read_points.push(Lsn(80));
1289 4 :
1290 4 : // A delta layer covered by an earlier image layer, but visible to a readpoint below that covering layer
1291 4 : let covered_delta_below_read_point = inject_delta(&mut updates, 10, 20, 70, 79);
1292 4 :
1293 4 : // A delta layer whose end LSN is covered, but where a read point is present partway through its LSN range:
1294 4 : // the read point should make it visible, even though its end LSN is covered
1295 4 : let covering_img_between_read_points = inject_image(&mut updates, 10, 20, 69);
1296 4 : let covered_delta_between_read_points = inject_delta(&mut updates, 10, 15, 67, 69);
1297 4 : read_points.push(Lsn(65));
1298 4 : let covered_delta_intersects_read_point = inject_delta(&mut updates, 15, 20, 60, 69);
1299 4 :
1300 4 : let visible_img_after_last_read_point = inject_image(&mut updates, 10, 20, 65);
1301 4 :
1302 4 : updates.flush();
1303 4 :
1304 4 : let (layer_visibilities, shadow) = layer_map.get_visibility(read_points);
1305 4 : let layer_visibilities = layer_visibilities.into_iter().collect::<HashMap<_, _>>();
1306 4 :
1307 4 : assert_eq!(
1308 4 : layer_visibilities.get(&ahead_layer),
1309 4 : Some(&LayerVisibilityHint::Visible)
1310 4 : );
1311 4 : assert_eq!(
1312 4 : layer_visibilities.get(&visible_covering_img),
1313 4 : Some(&LayerVisibilityHint::Visible)
1314 4 : );
1315 4 : assert_eq!(
1316 4 : layer_visibilities.get(&covered_delta),
1317 4 : Some(&LayerVisibilityHint::Covered)
1318 4 : );
1319 4 : assert_eq!(
1320 4 : layer_visibilities.get(&partially_covered_delta),
1321 4 : Some(&LayerVisibilityHint::Visible)
1322 4 : );
1323 4 : assert_eq!(
1324 4 : layer_visibilities.get(¬_covered_delta),
1325 4 : Some(&LayerVisibilityHint::Visible)
1326 4 : );
1327 4 : assert_eq!(
1328 4 : layer_visibilities.get(&covered_image),
1329 4 : Some(&LayerVisibilityHint::Covered)
1330 4 : );
1331 4 : assert_eq!(
1332 4 : layer_visibilities.get(&partially_covered_image),
1333 4 : Some(&LayerVisibilityHint::Visible)
1334 4 : );
1335 4 : assert_eq!(
1336 4 : layer_visibilities.get(¬_covered_image),
1337 4 : Some(&LayerVisibilityHint::Visible)
1338 4 : );
1339 4 : assert_eq!(
1340 4 : layer_visibilities.get(&covered_delta_below_read_point),
1341 4 : Some(&LayerVisibilityHint::Visible)
1342 4 : );
1343 4 : assert_eq!(
1344 4 : layer_visibilities.get(&covering_img_between_read_points),
1345 4 : Some(&LayerVisibilityHint::Visible)
1346 4 : );
1347 4 : assert_eq!(
1348 4 : layer_visibilities.get(&covered_delta_between_read_points),
1349 4 : Some(&LayerVisibilityHint::Covered)
1350 4 : );
1351 4 : assert_eq!(
1352 4 : layer_visibilities.get(&covered_delta_intersects_read_point),
1353 4 : Some(&LayerVisibilityHint::Visible)
1354 4 : );
1355 4 : assert_eq!(
1356 4 : layer_visibilities.get(&visible_img_after_last_read_point),
1357 4 : Some(&LayerVisibilityHint::Visible)
1358 4 : );
1359 :
1360 : // Shadow should include all the images below the last read point
1361 4 : let expected_shadow = KeySpace {
1362 4 : ranges: vec![Key::from_i128(10)..Key::from_i128(20)],
1363 4 : };
1364 4 : assert_eq!(shadow, expected_shadow);
1365 4 : }
1366 :
1367 4 : fn fixture_path(relative: &str) -> PathBuf {
1368 4 : PathBuf::from(env!("CARGO_MANIFEST_DIR")).join(relative)
1369 4 : }
1370 :
1371 : #[test]
1372 4 : fn layer_visibility_realistic() {
1373 4 : // Load a large example layermap
1374 4 : let index_raw = std::fs::read_to_string(fixture_path(
1375 4 : "test_data/indices/mixed_workload/index_part.json",
1376 4 : ))
1377 4 : .unwrap();
1378 4 : let index: IndexPart = serde_json::from_str::<IndexPart>(&index_raw).unwrap();
1379 4 :
1380 4 : let tenant_id = TenantId::generate();
1381 4 : let tenant_shard_id = TenantShardId::unsharded(tenant_id);
1382 4 : let timeline_id = TimelineId::generate();
1383 4 :
1384 4 : let mut layer_map = LayerMap::default();
1385 4 : let mut updates = layer_map.batch_update();
1386 6260 : for (layer_name, layer_metadata) in index.layer_metadata {
1387 6256 : let layer_desc = match layer_name {
1388 3208 : LayerName::Image(layer_name) => PersistentLayerDesc {
1389 3208 : key_range: layer_name.key_range.clone(),
1390 3208 : lsn_range: layer_name.lsn_as_range(),
1391 3208 : tenant_shard_id,
1392 3208 : timeline_id,
1393 3208 : is_delta: false,
1394 3208 : file_size: layer_metadata.file_size,
1395 3208 : },
1396 3048 : LayerName::Delta(layer_name) => PersistentLayerDesc {
1397 3048 : key_range: layer_name.key_range,
1398 3048 : lsn_range: layer_name.lsn_range,
1399 3048 : tenant_shard_id,
1400 3048 : timeline_id,
1401 3048 : is_delta: true,
1402 3048 : file_size: layer_metadata.file_size,
1403 3048 : },
1404 : };
1405 6256 : updates.insert_historic(layer_desc);
1406 : }
1407 4 : updates.flush();
1408 4 :
1409 4 : let read_points = vec![index.metadata.disk_consistent_lsn()];
1410 4 : let (layer_visibilities, shadow) = layer_map.get_visibility(read_points);
1411 6260 : for (layer_desc, visibility) in &layer_visibilities {
1412 6256 : tracing::info!("{layer_desc:?}: {visibility:?}");
1413 6256 : eprintln!("{layer_desc:?}: {visibility:?}");
1414 : }
1415 :
1416 : // The shadow should be non-empty, since there were some image layers
1417 4 : assert!(!shadow.ranges.is_empty());
1418 :
1419 : // At least some layers should be marked covered
1420 4 : assert!(layer_visibilities
1421 4 : .iter()
1422 76 : .any(|i| matches!(i.1, LayerVisibilityHint::Covered)));
1423 :
1424 4 : let layer_visibilities = layer_visibilities.into_iter().collect::<HashMap<_, _>>();
1425 :
1426 : // Brute force validation: a layer should be marked covered if and only if there are image layers above it in LSN order which cover it
1427 6260 : for (layer_desc, visible) in &layer_visibilities {
1428 6256 : let mut coverage = KeySpaceRandomAccum::new();
1429 6256 : let mut covered_by = Vec::new();
1430 :
1431 9784384 : for other_layer in layer_map.iter_historic_layers() {
1432 9784384 : if &other_layer == layer_desc {
1433 6256 : continue;
1434 9778128 : }
1435 9778128 : if !other_layer.is_delta()
1436 5014104 : && other_layer.image_layer_lsn() >= Lsn(layer_desc.get_lsn_range().end.0 - 1)
1437 2527024 : && other_layer.key_range.start <= layer_desc.key_range.end
1438 930608 : && layer_desc.key_range.start <= other_layer.key_range.end
1439 171292 : {
1440 171292 : coverage.add_range(other_layer.get_key_range());
1441 171292 : covered_by.push((*other_layer).clone());
1442 9606836 : }
1443 : }
1444 6256 : let coverage = coverage.to_keyspace();
1445 :
1446 6256 : let expect_visible = if coverage.ranges.len() == 1
1447 1512 : && coverage.contains(&layer_desc.key_range.start)
1448 68 : && coverage.contains(&Key::from_i128(layer_desc.key_range.end.to_i128() - 1))
1449 : {
1450 40 : LayerVisibilityHint::Covered
1451 : } else {
1452 6216 : LayerVisibilityHint::Visible
1453 : };
1454 :
1455 6256 : if expect_visible != *visible {
1456 0 : eprintln!(
1457 0 : "Layer {}..{} @ {}..{} (delta={}) is {visible:?}, should be {expect_visible:?}",
1458 0 : layer_desc.key_range.start,
1459 0 : layer_desc.key_range.end,
1460 0 : layer_desc.lsn_range.start,
1461 0 : layer_desc.lsn_range.end,
1462 0 : layer_desc.is_delta()
1463 0 : );
1464 0 : if expect_visible == LayerVisibilityHint::Covered {
1465 0 : eprintln!("Covered by:");
1466 0 : for other in covered_by {
1467 0 : eprintln!(
1468 0 : " {}..{} @ {}",
1469 0 : other.get_key_range().start,
1470 0 : other.get_key_range().end,
1471 0 : other.image_layer_lsn()
1472 0 : );
1473 0 : }
1474 0 : if let Some(range) = coverage.ranges.first() {
1475 0 : eprintln!(
1476 0 : "Total coverage from contributing layers: {}..{}",
1477 0 : range.start, range.end
1478 0 : );
1479 0 : } else {
1480 0 : eprintln!(
1481 0 : "Total coverage from contributing layers: {:?}",
1482 0 : coverage.ranges
1483 0 : );
1484 0 : }
1485 0 : }
1486 6256 : }
1487 6256 : assert_eq!(expect_visible, *visible);
1488 : }
1489 :
1490 : // Sanity: the layer that holds latest data for the DBDIR key should always be visible
1491 : // (just using this key as a key that will always exist for any layermap fixture)
1492 4 : let dbdir_layer = layer_map
1493 4 : .search(DBDIR_KEY, index.metadata.disk_consistent_lsn())
1494 4 : .unwrap();
1495 4 : assert!(matches!(
1496 4 : layer_visibilities.get(&dbdir_layer.layer).unwrap(),
1497 : LayerVisibilityHint::Visible
1498 : ));
1499 4 : }
1500 : }
|