Line data Source code
1 : //!
2 : //! The layer map tracks what layers exist in a timeline.
3 : //!
4 : //! When the timeline is first accessed, the server lists of all layer files
5 : //! in the timelines/<timeline_id> directory, and populates this map with
6 : //! ImageLayer and DeltaLayer structs corresponding to each file. When the first
7 : //! new WAL record is received, we create an InMemoryLayer to hold the incoming
8 : //! records. Now and then, in the checkpoint() function, the in-memory layer is
9 : //! are frozen, and it is split up into new image and delta layers and the
10 : //! corresponding files are written to disk.
11 : //!
12 : //! Design overview:
13 : //!
14 : //! The `search` method of the layer map is on the read critical path, so we've
15 : //! built an efficient data structure for fast reads, stored in `LayerMap::historic`.
16 : //! Other read methods are less critical but still impact performance of background tasks.
17 : //!
18 : //! This data structure relies on a persistent/immutable binary search tree. See the
19 : //! following lecture for an introduction <https://www.youtube.com/watch?v=WqCWghETNDc&t=581s>
20 : //! Summary: A persistent/immutable BST (and persistent data structures in general) allows
21 : //! you to modify the tree in such a way that each modification creates a new "version"
22 : //! of the tree. When you modify it, you get a new version, but all previous versions are
23 : //! still accessible too. So if someone is still holding a reference to an older version,
24 : //! they continue to see the tree as it was then. The persistent BST stores all the
25 : //! different versions in an efficient way.
26 : //!
27 : //! Our persistent BST maintains a map of which layer file "covers" each key. It has only
28 : //! one dimension, the key. See `layer_coverage.rs`. We use the persistent/immutable property
29 : //! to handle the LSN dimension.
30 : //!
31 : //! To build the layer map, we insert each layer to the persistent BST in LSN.start order,
32 : //! starting from the oldest one. After each insertion, we grab a reference to that "version"
33 : //! of the tree, and store it in another tree, a BtreeMap keyed by the LSN. See
34 : //! `historic_layer_coverage.rs`.
35 : //!
36 : //! To search for a particular key-LSN pair, you first look up the right "version" in the
37 : //! BTreeMap. Then you search that version of the BST with the key.
38 : //!
39 : //! The persistent BST keeps all the versions, but there is no way to change the old versions
40 : //! afterwards. We can add layers as long as they have larger LSNs than any previous layer in
41 : //! the map, but if we need to remove a layer, or insert anything with an older LSN, we need
42 : //! to throw away most of the persistent BST and build a new one, starting from the oldest
43 : //! LSN. See [`LayerMap::flush_updates()`].
44 : //!
45 :
46 : mod historic_layer_coverage;
47 : mod layer_coverage;
48 :
49 : use crate::context::RequestContext;
50 : use crate::keyspace::KeyPartitioning;
51 : use crate::repository::Key;
52 : use crate::tenant::storage_layer::InMemoryLayer;
53 : use anyhow::Result;
54 : use pageserver_api::keyspace::KeySpaceAccum;
55 : use std::collections::{HashMap, VecDeque};
56 : use std::iter::Peekable;
57 : use std::ops::Range;
58 : use std::sync::Arc;
59 : use utils::lsn::Lsn;
60 :
61 : use historic_layer_coverage::BufferedHistoricLayerCoverage;
62 : pub use historic_layer_coverage::LayerKey;
63 :
64 : use super::storage_layer::PersistentLayerDesc;
65 :
66 : ///
67 : /// LayerMap tracks what layers exist on a timeline.
68 : ///
69 : #[derive(Default)]
70 : pub struct LayerMap {
71 : //
72 : // 'open_layer' holds the current InMemoryLayer that is accepting new
73 : // records. If it is None, 'next_open_layer_at' will be set instead, indicating
74 : // where the start LSN of the next InMemoryLayer that is to be created.
75 : //
76 : pub open_layer: Option<Arc<InMemoryLayer>>,
77 : pub next_open_layer_at: Option<Lsn>,
78 :
79 : ///
80 : /// Frozen layers, if any. Frozen layers are in-memory layers that
81 : /// are no longer added to, but haven't been written out to disk
82 : /// yet. They contain WAL older than the current 'open_layer' or
83 : /// 'next_open_layer_at', but newer than any historic layer.
84 : /// The frozen layers are in order from oldest to newest, so that
85 : /// the newest one is in the 'back' of the VecDeque, and the oldest
86 : /// in the 'front'.
87 : ///
88 : pub frozen_layers: VecDeque<Arc<InMemoryLayer>>,
89 :
90 : /// Index of the historic layers optimized for search
91 : historic: BufferedHistoricLayerCoverage<Arc<PersistentLayerDesc>>,
92 :
93 : /// L0 layers have key range Key::MIN..Key::MAX, and locating them using R-Tree search is very inefficient.
94 : /// So L0 layers are held in l0_delta_layers vector, in addition to the R-tree.
95 : l0_delta_layers: Vec<Arc<PersistentLayerDesc>>,
96 : }
97 :
98 : /// The primary update API for the layer map.
99 : ///
100 : /// Batching historic layer insertions and removals is good for
101 : /// performance and this struct helps us do that correctly.
102 : #[must_use]
103 : pub struct BatchedUpdates<'a> {
104 : // While we hold this exclusive reference to the layer map the type checker
105 : // will prevent us from accidentally reading any unflushed updates.
106 : layer_map: &'a mut LayerMap,
107 : }
108 :
109 : /// Provide ability to batch more updates while hiding the read
110 : /// API so we don't accidentally read without flushing.
111 : impl BatchedUpdates<'_> {
112 : ///
113 : /// Insert an on-disk layer.
114 : ///
115 : // TODO remove the `layer` argument when `mapping` is refactored out of `LayerMap`
116 1540 : pub fn insert_historic(&mut self, layer_desc: PersistentLayerDesc) {
117 1540 : self.layer_map.insert_historic_noflush(layer_desc)
118 1540 : }
119 :
120 : ///
121 : /// Remove an on-disk layer from the map.
122 : ///
123 : /// This should be called when the corresponding file on disk has been deleted.
124 : ///
125 426 : pub fn remove_historic(&mut self, layer_desc: &PersistentLayerDesc) {
126 426 : self.layer_map.remove_historic_noflush(layer_desc)
127 426 : }
128 :
129 : // We will flush on drop anyway, but this method makes it
130 : // more explicit that there is some work being done.
131 : /// Apply all updates
132 1584 : pub fn flush(self) {
133 1584 : // Flush happens on drop
134 1584 : }
135 : }
136 :
137 : // Ideally the flush() method should be called explicitly for more
138 : // controlled execution. But if we forget we'd rather flush on drop
139 : // than panic later or read without flushing.
140 : //
141 : // TODO maybe warn if flush hasn't explicitly been called
142 : impl Drop for BatchedUpdates<'_> {
143 1584 : fn drop(&mut self) {
144 1584 : self.layer_map.flush_updates();
145 1584 : }
146 : }
147 :
148 : /// Return value of LayerMap::search
149 : #[derive(Eq, PartialEq, Debug, Hash)]
150 : pub struct SearchResult {
151 : pub layer: Arc<PersistentLayerDesc>,
152 : pub lsn_floor: Lsn,
153 : }
154 :
155 : /// Return value of [`LayerMap::range_search`]
156 : ///
157 : /// Contains a mapping from a layer description to a keyspace
158 : /// accumulator that contains all the keys which intersect the layer
159 : /// from the original search space. Keys that were not found are accumulated
160 : /// in a separate key space accumulator.
161 : #[derive(Debug)]
162 : pub struct RangeSearchResult {
163 : pub found: HashMap<SearchResult, KeySpaceAccum>,
164 : pub not_found: KeySpaceAccum,
165 : }
166 :
167 : impl RangeSearchResult {
168 79080 : fn new() -> Self {
169 79080 : Self {
170 79080 : found: HashMap::new(),
171 79080 : not_found: KeySpaceAccum::new(),
172 79080 : }
173 79080 : }
174 : }
175 :
176 : /// Collector for results of range search queries on the LayerMap.
177 : /// It should be provided with two iterators for the delta and image coverage
178 : /// that contain all the changes for layers which intersect the range.
179 : struct RangeSearchCollector<Iter>
180 : where
181 : Iter: Iterator<Item = (i128, Option<Arc<PersistentLayerDesc>>)>,
182 : {
183 : delta_coverage: Peekable<Iter>,
184 : image_coverage: Peekable<Iter>,
185 : key_range: Range<Key>,
186 : end_lsn: Lsn,
187 :
188 : current_delta: Option<Arc<PersistentLayerDesc>>,
189 : current_image: Option<Arc<PersistentLayerDesc>>,
190 :
191 : result: RangeSearchResult,
192 : }
193 :
194 : #[derive(Debug)]
195 : enum NextLayerType {
196 : Delta(i128),
197 : Image(i128),
198 : Both(i128),
199 : }
200 :
201 : impl NextLayerType {
202 141699 : fn next_change_at_key(&self) -> Key {
203 141699 : match self {
204 32031 : NextLayerType::Delta(at) => Key::from_i128(*at),
205 2568 : NextLayerType::Image(at) => Key::from_i128(*at),
206 107100 : NextLayerType::Both(at) => Key::from_i128(*at),
207 : }
208 141699 : }
209 : }
210 :
211 : impl<Iter> RangeSearchCollector<Iter>
212 : where
213 : Iter: Iterator<Item = (i128, Option<Arc<PersistentLayerDesc>>)>,
214 : {
215 75502 : fn new(
216 75502 : key_range: Range<Key>,
217 75502 : end_lsn: Lsn,
218 75502 : delta_coverage: Iter,
219 75502 : image_coverage: Iter,
220 75502 : ) -> Self {
221 75502 : Self {
222 75502 : delta_coverage: delta_coverage.peekable(),
223 75502 : image_coverage: image_coverage.peekable(),
224 75502 : key_range,
225 75502 : end_lsn,
226 75502 : current_delta: None,
227 75502 : current_image: None,
228 75502 : result: RangeSearchResult::new(),
229 75502 : }
230 75502 : }
231 :
232 : /// Run the collector. Collection is implemented via a two pointer algorithm.
233 : /// One pointer tracks the start of the current range and the other tracks
234 : /// the beginning of the next range which will overlap with the next change
235 : /// in coverage across both image and delta.
236 75502 : fn collect(mut self) -> RangeSearchResult {
237 75502 : let next_layer_type = self.choose_next_layer_type();
238 67272 : let mut current_range_start = match next_layer_type {
239 : None => {
240 : // No changes for the range
241 8230 : self.pad_range(self.key_range.clone());
242 8230 : return self.result;
243 : }
244 67272 : Some(layer_type) if self.key_range.end <= layer_type.next_change_at_key() => {
245 0 : // Changes only after the end of the range
246 0 : self.pad_range(self.key_range.clone());
247 0 : return self.result;
248 : }
249 67272 : Some(layer_type) => {
250 67272 : // Changes for the range exist. Record anything before the first
251 67272 : // coverage change as not found.
252 67272 : let coverage_start = layer_type.next_change_at_key();
253 67272 : let range_before = self.key_range.start..coverage_start;
254 67272 : self.pad_range(range_before);
255 67272 :
256 67272 : self.advance(&layer_type);
257 67272 : coverage_start
258 : }
259 : };
260 :
261 141699 : while current_range_start < self.key_range.end {
262 74427 : let next_layer_type = self.choose_next_layer_type();
263 74427 : match next_layer_type {
264 7155 : Some(t) => {
265 7155 : let current_range_end = t.next_change_at_key();
266 7155 : self.add_range(current_range_start..current_range_end);
267 7155 : current_range_start = current_range_end;
268 7155 :
269 7155 : self.advance(&t);
270 7155 : }
271 67272 : None => {
272 67272 : self.add_range(current_range_start..self.key_range.end);
273 67272 : current_range_start = self.key_range.end;
274 67272 : }
275 : }
276 : }
277 :
278 67272 : self.result
279 75502 : }
280 :
281 : /// Mark a range as not found (i.e. no layers intersect it)
282 76434 : fn pad_range(&mut self, key_range: Range<Key>) {
283 76434 : if !key_range.is_empty() {
284 10146 : self.result.not_found.add_range(key_range);
285 66288 : }
286 76434 : }
287 :
288 : /// Select the appropiate layer for the given range and update
289 : /// the collector.
290 74427 : fn add_range(&mut self, covered_range: Range<Key>) {
291 74427 : let selected = LayerMap::select_layer(
292 74427 : self.current_delta.clone(),
293 74427 : self.current_image.clone(),
294 74427 : self.end_lsn,
295 74427 : );
296 74427 :
297 74427 : match selected {
298 73495 : Some(search_result) => self
299 73495 : .result
300 73495 : .found
301 73495 : .entry(search_result)
302 73495 : .or_default()
303 73495 : .add_range(covered_range),
304 932 : None => self.pad_range(covered_range),
305 : }
306 74427 : }
307 :
308 : /// Move to the next coverage change.
309 74427 : fn advance(&mut self, layer_type: &NextLayerType) {
310 74427 : match layer_type {
311 16873 : NextLayerType::Delta(_) => {
312 16873 : let (_, layer) = self.delta_coverage.next().unwrap();
313 16873 : self.current_delta = layer;
314 16873 : }
315 1742 : NextLayerType::Image(_) => {
316 1742 : let (_, layer) = self.image_coverage.next().unwrap();
317 1742 : self.current_image = layer;
318 1742 : }
319 55812 : NextLayerType::Both(_) => {
320 55812 : let (_, image_layer) = self.image_coverage.next().unwrap();
321 55812 : let (_, delta_layer) = self.delta_coverage.next().unwrap();
322 55812 :
323 55812 : self.current_image = image_layer;
324 55812 : self.current_delta = delta_layer;
325 55812 : }
326 : }
327 74427 : }
328 :
329 : /// Pick the next coverage change: the one at the lesser key or both if they're alligned.
330 149929 : fn choose_next_layer_type(&mut self) -> Option<NextLayerType> {
331 149929 : let next_delta_at = self.delta_coverage.peek().map(|(key, _)| key);
332 149929 : let next_image_at = self.image_coverage.peek().map(|(key, _)| key);
333 149929 :
334 149929 : match (next_delta_at, next_image_at) {
335 75502 : (None, None) => None,
336 14350 : (Some(next_delta_at), None) => Some(NextLayerType::Delta(*next_delta_at)),
337 1304 : (None, Some(next_image_at)) => Some(NextLayerType::Image(*next_image_at)),
338 58773 : (Some(next_delta_at), Some(next_image_at)) if next_image_at < next_delta_at => {
339 438 : Some(NextLayerType::Image(*next_image_at))
340 : }
341 58335 : (Some(next_delta_at), Some(next_image_at)) if next_delta_at < next_image_at => {
342 2523 : Some(NextLayerType::Delta(*next_delta_at))
343 : }
344 55812 : (Some(next_delta_at), Some(_)) => Some(NextLayerType::Both(*next_delta_at)),
345 : }
346 149929 : }
347 : }
348 :
349 : impl LayerMap {
350 : ///
351 : /// Find the latest layer (by lsn.end) that covers the given
352 : /// 'key', with lsn.start < 'end_lsn'.
353 : ///
354 : /// The caller of this function is the page reconstruction
355 : /// algorithm looking for the next relevant delta layer, or
356 : /// the terminal image layer. The caller will pass the lsn_floor
357 : /// value as end_lsn in the next call to search.
358 : ///
359 : /// If there's an image layer exactly below the given end_lsn,
360 : /// search should return that layer regardless if there are
361 : /// overlapping deltas.
362 : ///
363 : /// If the latest layer is a delta and there is an overlapping
364 : /// image with it below, the lsn_floor returned should be right
365 : /// above that image so we don't skip it in the search. Otherwise
366 : /// the lsn_floor returned should be the bottom of the delta layer
367 : /// because we should make as much progress down the lsn axis
368 : /// as possible. It's fine if this way we skip some overlapping
369 : /// deltas, because the delta we returned would contain the same
370 : /// wal content.
371 : ///
372 : /// TODO: This API is convoluted and inefficient. If the caller
373 : /// makes N search calls, we'll end up finding the same latest
374 : /// image layer N times. We should either cache the latest image
375 : /// layer result, or simplify the api to `get_latest_image` and
376 : /// `get_latest_delta`, and only call `get_latest_image` once.
377 : ///
378 : /// NOTE: This only searches the 'historic' layers, *not* the
379 : /// 'open' and 'frozen' layers!
380 : ///
381 284059 : pub fn search(&self, key: Key, end_lsn: Lsn) -> Option<SearchResult> {
382 284059 : let version = self.historic.get().unwrap().get_version(end_lsn.0 - 1)?;
383 283943 : let latest_delta = version.delta_coverage.query(key.to_i128());
384 283943 : let latest_image = version.image_coverage.query(key.to_i128());
385 283943 :
386 283943 : Self::select_layer(latest_delta, latest_image, end_lsn)
387 284059 : }
388 :
389 358370 : fn select_layer(
390 358370 : delta_layer: Option<Arc<PersistentLayerDesc>>,
391 358370 : image_layer: Option<Arc<PersistentLayerDesc>>,
392 358370 : end_lsn: Lsn,
393 358370 : ) -> Option<SearchResult> {
394 358370 : assert!(delta_layer.as_ref().map_or(true, |l| l.is_delta()));
395 358370 : assert!(image_layer.as_ref().map_or(true, |l| !l.is_delta()));
396 :
397 358370 : match (delta_layer, image_layer) {
398 11600 : (None, None) => None,
399 35692 : (None, Some(image)) => {
400 35692 : let lsn_floor = image.get_lsn_range().start;
401 35692 : Some(SearchResult {
402 35692 : layer: image,
403 35692 : lsn_floor,
404 35692 : })
405 : }
406 91632 : (Some(delta), None) => {
407 91632 : let lsn_floor = delta.get_lsn_range().start;
408 91632 : Some(SearchResult {
409 91632 : layer: delta,
410 91632 : lsn_floor,
411 91632 : })
412 : }
413 219446 : (Some(delta), Some(image)) => {
414 219446 : let img_lsn = image.get_lsn_range().start;
415 219446 : let image_is_newer = image.get_lsn_range().end >= delta.get_lsn_range().end;
416 219446 : let image_exact_match = img_lsn + 1 == end_lsn;
417 219446 : if image_is_newer || image_exact_match {
418 38219 : Some(SearchResult {
419 38219 : layer: image,
420 38219 : lsn_floor: img_lsn,
421 38219 : })
422 : } else {
423 181227 : let lsn_floor =
424 181227 : std::cmp::max(delta.get_lsn_range().start, image.get_lsn_range().start + 1);
425 181227 : Some(SearchResult {
426 181227 : layer: delta,
427 181227 : lsn_floor,
428 181227 : })
429 : }
430 : }
431 : }
432 358370 : }
433 :
434 75540 : pub fn range_search(&self, key_range: Range<Key>, end_lsn: Lsn) -> RangeSearchResult {
435 75540 : let version = match self.historic.get().unwrap().get_version(end_lsn.0 - 1) {
436 75502 : Some(version) => version,
437 : None => {
438 38 : let mut result = RangeSearchResult::new();
439 38 : result.not_found.add_range(key_range);
440 38 : return result;
441 : }
442 : };
443 :
444 75502 : let raw_range = key_range.start.to_i128()..key_range.end.to_i128();
445 75502 : let delta_changes = version.delta_coverage.range_overlaps(&raw_range);
446 75502 : let image_changes = version.image_coverage.range_overlaps(&raw_range);
447 75502 :
448 75502 : let collector = RangeSearchCollector::new(key_range, end_lsn, delta_changes, image_changes);
449 75502 : collector.collect()
450 75540 : }
451 :
452 : /// Start a batch of updates, applied on drop
453 1584 : pub fn batch_update(&mut self) -> BatchedUpdates<'_> {
454 1584 : BatchedUpdates { layer_map: self }
455 1584 : }
456 :
457 : ///
458 : /// Insert an on-disk layer
459 : ///
460 : /// Helper function for BatchedUpdates::insert_historic
461 : ///
462 : /// TODO(chi): remove L generic so that we do not need to pass layer object.
463 1550 : pub(self) fn insert_historic_noflush(&mut self, layer_desc: PersistentLayerDesc) {
464 1550 : // TODO: See #3869, resulting #4088, attempted fix and repro #4094
465 1550 :
466 1550 : if Self::is_l0(&layer_desc) {
467 980 : self.l0_delta_layers.push(layer_desc.clone().into());
468 980 : }
469 :
470 1550 : self.historic.insert(
471 1550 : historic_layer_coverage::LayerKey::from(&layer_desc),
472 1550 : layer_desc.into(),
473 1550 : );
474 1550 : }
475 :
476 : ///
477 : /// Remove an on-disk layer from the map.
478 : ///
479 : /// Helper function for BatchedUpdates::remove_historic
480 : ///
481 426 : pub fn remove_historic_noflush(&mut self, layer_desc: &PersistentLayerDesc) {
482 426 : self.historic
483 426 : .remove(historic_layer_coverage::LayerKey::from(layer_desc));
484 426 : let layer_key = layer_desc.key();
485 426 : if Self::is_l0(layer_desc) {
486 404 : let len_before = self.l0_delta_layers.len();
487 404 : let mut l0_delta_layers = std::mem::take(&mut self.l0_delta_layers);
488 5524 : l0_delta_layers.retain(|other| other.key() != layer_key);
489 404 : self.l0_delta_layers = l0_delta_layers;
490 404 : // this assertion is related to use of Arc::ptr_eq in Self::compare_arced_layers,
491 404 : // there's a chance that the comparison fails at runtime due to it comparing (pointer,
492 404 : // vtable) pairs.
493 404 : assert_eq!(
494 404 : self.l0_delta_layers.len(),
495 404 : len_before - 1,
496 0 : "failed to locate removed historic layer from l0_delta_layers"
497 : );
498 22 : }
499 426 : }
500 :
501 : /// Helper function for BatchedUpdates::drop.
502 1586 : pub(self) fn flush_updates(&mut self) {
503 1586 : self.historic.rebuild();
504 1586 : }
505 :
506 : /// Is there a newer image layer for given key- and LSN-range? Or a set
507 : /// of image layers within the specified lsn range that cover the entire
508 : /// specified key range?
509 : ///
510 : /// This is used for garbage collection, to determine if an old layer can
511 : /// be deleted.
512 11650 : pub fn image_layer_exists(&self, key: &Range<Key>, lsn: &Range<Lsn>) -> bool {
513 11650 : if key.is_empty() {
514 : // Vacuously true. There's a newer image for all 0 of the kerys in the range.
515 0 : return true;
516 11650 : }
517 :
518 11650 : let version = match self.historic.get().unwrap().get_version(lsn.end.0 - 1) {
519 11650 : Some(v) => v,
520 0 : None => return false,
521 : };
522 :
523 11650 : let start = key.start.to_i128();
524 11650 : let end = key.end.to_i128();
525 11650 :
526 11748 : let layer_covers = |layer: Option<Arc<PersistentLayerDesc>>| match layer {
527 11718 : Some(layer) => layer.get_lsn_range().start >= lsn.start,
528 30 : None => false,
529 11748 : };
530 :
531 : // Check the start is covered
532 11650 : if !layer_covers(version.image_coverage.query(start)) {
533 11612 : return false;
534 38 : }
535 :
536 : // Check after all changes of coverage
537 98 : for (_, change_val) in version.image_coverage.range(start..end) {
538 98 : if !layer_covers(change_val) {
539 30 : return false;
540 68 : }
541 : }
542 :
543 8 : true
544 11650 : }
545 :
546 770 : pub fn iter_historic_layers(&self) -> impl '_ + Iterator<Item = Arc<PersistentLayerDesc>> {
547 770 : self.historic.iter()
548 770 : }
549 :
550 : /// Get a ref counted pointer for the first in memory layer that matches the provided predicate.
551 288 : pub fn find_in_memory_layer<Pred>(&self, mut pred: Pred) -> Option<Arc<InMemoryLayer>>
552 288 : where
553 288 : Pred: FnMut(&Arc<InMemoryLayer>) -> bool,
554 288 : {
555 288 : if let Some(open) = &self.open_layer {
556 28 : if pred(open) {
557 14 : return Some(open.clone());
558 14 : }
559 260 : }
560 :
561 274 : self.frozen_layers.iter().rfind(|l| pred(l)).cloned()
562 288 : }
563 :
564 : ///
565 : /// Divide the whole given range of keys into sub-ranges based on the latest
566 : /// image layer that covers each range at the specified lsn (inclusive).
567 : /// This is used when creating new image layers.
568 14 : pub fn image_coverage(
569 14 : &self,
570 14 : key_range: &Range<Key>,
571 14 : lsn: Lsn,
572 14 : ) -> Vec<(Range<Key>, Option<Arc<PersistentLayerDesc>>)> {
573 14 : let version = match self.historic.get().unwrap().get_version(lsn.0) {
574 14 : Some(v) => v,
575 0 : None => return vec![],
576 : };
577 :
578 14 : let start = key_range.start.to_i128();
579 14 : let end = key_range.end.to_i128();
580 14 :
581 14 : // Initialize loop variables
582 14 : let mut coverage: Vec<(Range<Key>, Option<Arc<PersistentLayerDesc>>)> = vec![];
583 14 : let mut current_key = start;
584 14 : let mut current_val = version.image_coverage.query(start);
585 :
586 : // Loop through the change events and push intervals
587 14 : for (change_key, change_val) in version.image_coverage.range(start..end) {
588 0 : let kr = Key::from_i128(current_key)..Key::from_i128(change_key);
589 0 : coverage.push((kr, current_val.take()));
590 0 : current_key = change_key;
591 0 : current_val.clone_from(&change_val);
592 0 : }
593 :
594 : // Add the final interval
595 14 : let kr = Key::from_i128(current_key)..Key::from_i128(end);
596 14 : coverage.push((kr, current_val.take()));
597 14 :
598 14 : coverage
599 14 : }
600 :
601 2284 : pub fn is_l0(layer: &PersistentLayerDesc) -> bool {
602 2284 : layer.get_key_range() == (Key::MIN..Key::MAX)
603 2284 : }
604 :
605 : /// This function determines which layers are counted in `count_deltas`:
606 : /// layers that should count towards deciding whether or not to reimage
607 : /// a certain partition range.
608 : ///
609 : /// There are two kinds of layers we currently consider reimage-worthy:
610 : ///
611 : /// Case 1: Non-L0 layers are currently reimage-worthy by default.
612 : /// TODO Some of these layers are very sparse and cover the entire key
613 : /// range. Replacing 256MB of data (or less!) with terabytes of
614 : /// images doesn't seem wise. We need a better heuristic, possibly
615 : /// based on some of these factors:
616 : /// a) whether this layer has any wal in this partition range
617 : /// b) the size of the layer
618 : /// c) the number of images needed to cover it
619 : /// d) the estimated time until we'll have to reimage over it for GC
620 : ///
621 : /// Case 2: Since L0 layers by definition cover the entire key space, we consider
622 : /// them reimage-worthy only when the entire key space can be covered by very few
623 : /// images (currently 1).
624 : /// TODO The optimal number should probably be slightly higher than 1, but to
625 : /// implement that we need to plumb a lot more context into this function
626 : /// than just the current partition_range.
627 0 : pub fn is_reimage_worthy(layer: &PersistentLayerDesc, partition_range: &Range<Key>) -> bool {
628 0 : // Case 1
629 0 : if !Self::is_l0(layer) {
630 0 : return true;
631 0 : }
632 0 :
633 0 : // Case 2
634 0 : if partition_range == &(Key::MIN..Key::MAX) {
635 0 : return true;
636 0 : }
637 0 :
638 0 : false
639 0 : }
640 :
641 : /// Count the height of the tallest stack of reimage-worthy deltas
642 : /// in this 2d region.
643 : ///
644 : /// If `limit` is provided we don't try to count above that number.
645 : ///
646 : /// This number is used to compute the largest number of deltas that
647 : /// we'll need to visit for any page reconstruction in this region.
648 : /// We use this heuristic to decide whether to create an image layer.
649 14 : pub fn count_deltas(&self, key: &Range<Key>, lsn: &Range<Lsn>, limit: Option<usize>) -> usize {
650 14 : // We get the delta coverage of the region, and for each part of the coverage
651 14 : // we recurse right underneath the delta. The recursion depth is limited by
652 14 : // the largest result this function could return, which is in practice between
653 14 : // 3 and 10 (since we usually try to create an image when the number gets larger).
654 14 :
655 14 : if lsn.is_empty() || key.is_empty() || limit == Some(0) {
656 0 : return 0;
657 14 : }
658 :
659 14 : let version = match self.historic.get().unwrap().get_version(lsn.end.0 - 1) {
660 14 : Some(v) => v,
661 0 : None => return 0,
662 : };
663 :
664 14 : let start = key.start.to_i128();
665 14 : let end = key.end.to_i128();
666 14 :
667 14 : // Initialize loop variables
668 14 : let mut max_stacked_deltas = 0;
669 14 : let mut current_key = start;
670 14 : let mut current_val = version.delta_coverage.query(start);
671 :
672 : // Loop through the delta coverage and recurse on each part
673 14 : for (change_key, change_val) in version.delta_coverage.range(start..end) {
674 : // If there's a relevant delta in this part, add 1 and recurse down
675 0 : if let Some(val) = ¤t_val {
676 0 : if val.get_lsn_range().end > lsn.start {
677 0 : let kr = Key::from_i128(current_key)..Key::from_i128(change_key);
678 0 : let lr = lsn.start..val.get_lsn_range().start;
679 0 : if !kr.is_empty() {
680 0 : let base_count = Self::is_reimage_worthy(val, key) as usize;
681 0 : let new_limit = limit.map(|l| l - base_count);
682 0 : let max_stacked_deltas_underneath = self.count_deltas(&kr, &lr, new_limit);
683 0 : max_stacked_deltas = std::cmp::max(
684 0 : max_stacked_deltas,
685 0 : base_count + max_stacked_deltas_underneath,
686 0 : );
687 0 : }
688 0 : }
689 0 : }
690 :
691 0 : current_key = change_key;
692 0 : current_val.clone_from(&change_val);
693 : }
694 :
695 : // Consider the last part
696 14 : if let Some(val) = ¤t_val {
697 0 : if val.get_lsn_range().end > lsn.start {
698 0 : let kr = Key::from_i128(current_key)..Key::from_i128(end);
699 0 : let lr = lsn.start..val.get_lsn_range().start;
700 0 :
701 0 : if !kr.is_empty() {
702 0 : let base_count = Self::is_reimage_worthy(val, key) as usize;
703 0 : let new_limit = limit.map(|l| l - base_count);
704 0 : let max_stacked_deltas_underneath = self.count_deltas(&kr, &lr, new_limit);
705 0 : max_stacked_deltas = std::cmp::max(
706 0 : max_stacked_deltas,
707 0 : base_count + max_stacked_deltas_underneath,
708 0 : );
709 0 : }
710 0 : }
711 14 : }
712 :
713 14 : max_stacked_deltas
714 14 : }
715 :
716 : /// Count how many reimage-worthy layers we need to visit for given key-lsn pair.
717 : ///
718 : /// The `partition_range` argument is used as context for the reimage-worthiness decision.
719 : ///
720 : /// Used as a helper for correctness checks only. Performance not critical.
721 0 : pub fn get_difficulty(&self, lsn: Lsn, key: Key, partition_range: &Range<Key>) -> usize {
722 0 : match self.search(key, lsn) {
723 0 : Some(search_result) => {
724 0 : if search_result.layer.is_incremental() {
725 0 : (Self::is_reimage_worthy(&search_result.layer, partition_range) as usize)
726 0 : + self.get_difficulty(search_result.lsn_floor, key, partition_range)
727 : } else {
728 0 : 0
729 : }
730 : }
731 0 : None => 0,
732 : }
733 0 : }
734 :
735 : /// Used for correctness checking. Results are expected to be identical to
736 : /// self.get_difficulty_map. Assumes self.search is correct.
737 0 : pub fn get_difficulty_map_bruteforce(
738 0 : &self,
739 0 : lsn: Lsn,
740 0 : partitioning: &KeyPartitioning,
741 0 : ) -> Vec<usize> {
742 0 : // Looking at the difficulty as a function of key, it could only increase
743 0 : // when a delta layer starts or an image layer ends. Therefore it's sufficient
744 0 : // to check the difficulties at:
745 0 : // - the key.start for each non-empty part range
746 0 : // - the key.start for each delta
747 0 : // - the key.end for each image
748 0 : let keys_iter: Box<dyn Iterator<Item = Key>> = {
749 0 : let mut keys: Vec<Key> = self
750 0 : .iter_historic_layers()
751 0 : .map(|layer| {
752 0 : if layer.is_incremental() {
753 0 : layer.get_key_range().start
754 : } else {
755 0 : layer.get_key_range().end
756 : }
757 0 : })
758 0 : .collect();
759 0 : keys.sort();
760 0 : Box::new(keys.into_iter())
761 0 : };
762 0 : let mut keys_iter = keys_iter.peekable();
763 0 :
764 0 : // Iter the partition and keys together and query all the necessary
765 0 : // keys, computing the max difficulty for each part.
766 0 : partitioning
767 0 : .parts
768 0 : .iter()
769 0 : .map(|part| {
770 0 : let mut difficulty = 0;
771 : // Partition ranges are assumed to be sorted and disjoint
772 : // TODO assert it
773 0 : for range in &part.ranges {
774 0 : if !range.is_empty() {
775 0 : difficulty =
776 0 : std::cmp::max(difficulty, self.get_difficulty(lsn, range.start, range));
777 0 : }
778 0 : while let Some(key) = keys_iter.peek() {
779 0 : if key >= &range.end {
780 0 : break;
781 0 : }
782 0 : let key = keys_iter.next().unwrap();
783 0 : if key < range.start {
784 0 : continue;
785 0 : }
786 0 : difficulty =
787 0 : std::cmp::max(difficulty, self.get_difficulty(lsn, key, range));
788 : }
789 : }
790 0 : difficulty
791 0 : })
792 0 : .collect()
793 0 : }
794 :
795 : /// For each part of a keyspace partitioning, return the maximum number of layers
796 : /// that would be needed for page reconstruction in that part at the given LSN.
797 : ///
798 : /// If `limit` is provided we don't try to count above that number.
799 : ///
800 : /// This method is used to decide where to create new image layers. Computing the
801 : /// result for the entire partitioning at once allows this function to be more
802 : /// efficient, and further optimization is possible by using iterators instead,
803 : /// to allow early return.
804 : ///
805 : /// TODO actually use this method instead of count_deltas. Currently we only use
806 : /// it for benchmarks.
807 0 : pub fn get_difficulty_map(
808 0 : &self,
809 0 : lsn: Lsn,
810 0 : partitioning: &KeyPartitioning,
811 0 : limit: Option<usize>,
812 0 : ) -> Vec<usize> {
813 0 : // TODO This is a naive implementation. Perf improvements to do:
814 0 : // 1. Instead of calling self.image_coverage and self.count_deltas,
815 0 : // iterate the image and delta coverage only once.
816 0 : partitioning
817 0 : .parts
818 0 : .iter()
819 0 : .map(|part| {
820 0 : let mut difficulty = 0;
821 0 : for range in &part.ranges {
822 0 : if limit == Some(difficulty) {
823 0 : break;
824 0 : }
825 0 : for (img_range, last_img) in self.image_coverage(range, lsn) {
826 0 : if limit == Some(difficulty) {
827 0 : break;
828 0 : }
829 0 : let img_lsn = if let Some(last_img) = last_img {
830 0 : last_img.get_lsn_range().end
831 : } else {
832 0 : Lsn(0)
833 : };
834 :
835 0 : if img_lsn < lsn {
836 0 : let num_deltas = self.count_deltas(&img_range, &(img_lsn..lsn), limit);
837 0 : difficulty = std::cmp::max(difficulty, num_deltas);
838 0 : }
839 : }
840 : }
841 0 : difficulty
842 0 : })
843 0 : .collect()
844 0 : }
845 :
846 : /// Return all L0 delta layers
847 370 : pub fn get_level0_deltas(&self) -> Result<Vec<Arc<PersistentLayerDesc>>> {
848 370 : Ok(self.l0_delta_layers.to_vec())
849 370 : }
850 :
851 : /// debugging function to print out the contents of the layer map
852 : #[allow(unused)]
853 2 : pub async fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
854 2 : println!("Begin dump LayerMap");
855 2 :
856 2 : println!("open_layer:");
857 2 : if let Some(open_layer) = &self.open_layer {
858 0 : open_layer.dump(verbose, ctx).await?;
859 2 : }
860 :
861 2 : println!("frozen_layers:");
862 2 : for frozen_layer in self.frozen_layers.iter() {
863 0 : frozen_layer.dump(verbose, ctx).await?;
864 : }
865 :
866 2 : println!("historic_layers:");
867 12 : for desc in self.iter_historic_layers() {
868 12 : desc.dump();
869 12 : }
870 2 : println!("End dump LayerMap");
871 2 : Ok(())
872 2 : }
873 : }
874 :
875 : #[cfg(test)]
876 : mod tests {
877 : use pageserver_api::keyspace::KeySpace;
878 :
879 : use super::*;
880 :
881 : #[derive(Clone)]
882 : struct LayerDesc {
883 : key_range: Range<Key>,
884 : lsn_range: Range<Lsn>,
885 : is_delta: bool,
886 : }
887 :
888 2 : fn create_layer_map(layers: Vec<LayerDesc>) -> LayerMap {
889 2 : let mut layer_map = LayerMap::default();
890 :
891 12 : for layer in layers {
892 10 : layer_map.insert_historic_noflush(PersistentLayerDesc::new_test(
893 10 : layer.key_range,
894 10 : layer.lsn_range,
895 10 : layer.is_delta,
896 10 : ));
897 10 : }
898 :
899 2 : layer_map.flush_updates();
900 2 : layer_map
901 2 : }
902 :
903 3540 : fn assert_range_search_result_eq(lhs: RangeSearchResult, rhs: RangeSearchResult) {
904 3540 : assert_eq!(lhs.not_found.to_keyspace(), rhs.not_found.to_keyspace());
905 3540 : let lhs: HashMap<SearchResult, KeySpace> = lhs
906 3540 : .found
907 3540 : .into_iter()
908 8230 : .map(|(search_result, accum)| (search_result, accum.to_keyspace()))
909 3540 : .collect();
910 3540 : let rhs: HashMap<SearchResult, KeySpace> = rhs
911 3540 : .found
912 3540 : .into_iter()
913 8230 : .map(|(search_result, accum)| (search_result, accum.to_keyspace()))
914 3540 : .collect();
915 3540 :
916 3540 : assert_eq!(lhs, rhs);
917 3540 : }
918 :
919 : #[cfg(test)]
920 3540 : fn brute_force_range_search(
921 3540 : layer_map: &LayerMap,
922 3540 : key_range: Range<Key>,
923 3540 : end_lsn: Lsn,
924 3540 : ) -> RangeSearchResult {
925 3540 : let mut range_search_result = RangeSearchResult::new();
926 3540 :
927 3540 : let mut key = key_range.start;
928 75520 : while key != key_range.end {
929 71980 : let res = layer_map.search(key, end_lsn);
930 71980 : match res {
931 61320 : Some(res) => {
932 61320 : range_search_result
933 61320 : .found
934 61320 : .entry(res)
935 61320 : .or_default()
936 61320 : .add_key(key);
937 61320 : }
938 10660 : None => {
939 10660 : range_search_result.not_found.add_key(key);
940 10660 : }
941 : }
942 :
943 71980 : key = key.next();
944 : }
945 :
946 3540 : range_search_result
947 3540 : }
948 :
949 : #[test]
950 2 : fn ranged_search_on_empty_layer_map() {
951 2 : let layer_map = LayerMap::default();
952 2 : let range = Key::from_i128(100)..Key::from_i128(200);
953 2 :
954 2 : let res = layer_map.range_search(range.clone(), Lsn(100));
955 2 : assert_eq!(
956 2 : res.not_found.to_keyspace(),
957 2 : KeySpace {
958 2 : ranges: vec![range]
959 2 : }
960 2 : );
961 2 : }
962 :
963 : #[test]
964 2 : fn ranged_search() {
965 2 : let layers = vec![
966 2 : LayerDesc {
967 2 : key_range: Key::from_i128(15)..Key::from_i128(50),
968 2 : lsn_range: Lsn(0)..Lsn(5),
969 2 : is_delta: false,
970 2 : },
971 2 : LayerDesc {
972 2 : key_range: Key::from_i128(10)..Key::from_i128(20),
973 2 : lsn_range: Lsn(5)..Lsn(20),
974 2 : is_delta: true,
975 2 : },
976 2 : LayerDesc {
977 2 : key_range: Key::from_i128(15)..Key::from_i128(25),
978 2 : lsn_range: Lsn(20)..Lsn(30),
979 2 : is_delta: true,
980 2 : },
981 2 : LayerDesc {
982 2 : key_range: Key::from_i128(35)..Key::from_i128(40),
983 2 : lsn_range: Lsn(25)..Lsn(35),
984 2 : is_delta: true,
985 2 : },
986 2 : LayerDesc {
987 2 : key_range: Key::from_i128(35)..Key::from_i128(40),
988 2 : lsn_range: Lsn(35)..Lsn(40),
989 2 : is_delta: false,
990 2 : },
991 2 : ];
992 2 :
993 2 : let layer_map = create_layer_map(layers.clone());
994 122 : for start in 0..60 {
995 3540 : for end in (start + 1)..60 {
996 3540 : let range = Key::from_i128(start)..Key::from_i128(end);
997 3540 : let result = layer_map.range_search(range.clone(), Lsn(100));
998 3540 : let expected = brute_force_range_search(&layer_map, range, Lsn(100));
999 3540 :
1000 3540 : assert_range_search_result_eq(result, expected);
1001 3540 : }
1002 : }
1003 2 : }
1004 : }
|