//!
//! The layer map tracks what layers exist in a timeline.
//!
//! When the timeline is first accessed, the server lists all layer files
//! in the timelines/<timeline_id> directory, and populates this map with
//! ImageLayer and DeltaLayer structs corresponding to each file. When the first
//! new WAL record is received, we create an InMemoryLayer to hold the incoming
//! records. Now and then, in the checkpoint() function, the in-memory layer
//! is frozen and split up into new image and delta layers, and the
//! corresponding files are written to disk.
//!
//! Design overview:
//!
//! The `search` method of the layer map is on the read critical path, so we've
//! built an efficient data structure for fast reads, stored in `LayerMap::historic`.
//! Other read methods are less critical but still impact performance of background tasks.
//!
//! This data structure relies on a persistent/immutable binary search tree. See the
//! following lecture for an introduction <https://www.youtube.com/watch?v=WqCWghETNDc&t=581s>
//! Summary: A persistent/immutable BST (and persistent data structures in general) allows
//! you to modify the tree in such a way that each modification creates a new "version"
//! of the tree. When you modify it, you get a new version, but all previous versions are
//! still accessible too. So if someone is still holding a reference to an older version,
//! they continue to see the tree as it was then. The persistent BST stores all the
//! different versions in an efficient way.
//!
//! Our persistent BST maintains a map of which layer file "covers" each key. It has only
//! one dimension, the key. See `layer_coverage.rs`. We use the persistent/immutable property
//! to handle the LSN dimension.
//!
//! To build the layer map, we insert each layer into the persistent BST in LSN.start order,
//! starting from the oldest one. After each insertion, we grab a reference to that "version"
//! of the tree, and store it in another tree, a BTreeMap keyed by the LSN. See
//! `historic_layer_coverage.rs`.
//!
//! To search for a particular key-LSN pair, you first look up the right "version" in the
//! BTreeMap. Then you search that version of the BST with the key.
//!
//! The persistent BST keeps all the versions, but there is no way to change the old versions
//! afterwards. We can add layers as long as they have larger LSNs than any previous layer in
//! the map, but if we need to remove a layer, or insert anything with an older LSN, we need
//! to throw away most of the persistent BST and build a new one, starting from the oldest
//! LSN. See [`LayerMap::flush_updates()`].
//!
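//! For illustration, here is a minimal, self-contained sketch of the versioned-lookup
//! idea using plain std collections instead of this module's real types (the names
//! `historic` and `coverage` below are hypothetical, not part of this API):
//!
//! ```
//! use std::collections::BTreeMap;
//! use std::sync::Arc;
//!
//! // Each "version" is an immutable snapshot of the key coverage. Sharing snapshots
//! // via Arc keeps older versions readable while newer ones are built.
//! let mut historic: BTreeMap<u64, Arc<BTreeMap<i128, &str>>> = BTreeMap::new();
//! let mut coverage = BTreeMap::new();
//! coverage.insert(0, "layer-A");
//! historic.insert(10, Arc::new(coverage.clone())); // coverage as of LSN 10
//! coverage.insert(100, "layer-B");
//! historic.insert(20, Arc::new(coverage.clone())); // coverage as of LSN 20
//!
//! // To search for (key = 150, end_lsn = 15): pick the version at or below LSN 14,
//! // then query that immutable version by key. Only "layer-A" exists in it.
//! let version = historic.range(..=14_u64).next_back().map(|(_, v)| v).unwrap();
//! let covering = version.range(..=150_i128).next_back().map(|(_, layer)| *layer);
//! assert_eq!(covering, Some("layer-A"));
//! ```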

mod historic_layer_coverage;
mod layer_coverage;

use crate::context::RequestContext;
use crate::keyspace::KeyPartitioning;
use crate::repository::Key;
use crate::tenant::storage_layer::InMemoryLayer;
use anyhow::Result;
use pageserver_api::keyspace::KeySpaceAccum;
use std::cmp::Ordering;
use std::collections::{BTreeMap, VecDeque};
use std::iter::Peekable;
use std::ops::Range;
use std::sync::Arc;
use utils::lsn::Lsn;

use historic_layer_coverage::BufferedHistoricLayerCoverage;
pub use historic_layer_coverage::LayerKey;

use super::storage_layer::PersistentLayerDesc;

///
/// LayerMap tracks what layers exist on a timeline.
///
#[derive(Default)]
pub struct LayerMap {
    //
    // 'open_layer' holds the current InMemoryLayer that is accepting new
    // records. If it is None, 'next_open_layer_at' will be set instead, indicating
    // the start LSN of the next InMemoryLayer that is to be created.
    //
    pub open_layer: Option<Arc<InMemoryLayer>>,
    pub next_open_layer_at: Option<Lsn>,

    ///
    /// Frozen layers, if any. Frozen layers are in-memory layers that
    /// are no longer added to, but haven't been written out to disk
    /// yet. They contain WAL older than the current 'open_layer' or
    /// 'next_open_layer_at', but newer than any historic layer.
    /// The frozen layers are in order from oldest to newest, so that
    /// the newest one is in the 'back' of the VecDeque, and the oldest
    /// in the 'front'.
    ///
    pub frozen_layers: VecDeque<Arc<InMemoryLayer>>,

    /// Index of the historic layers optimized for search
    historic: BufferedHistoricLayerCoverage<Arc<PersistentLayerDesc>>,

    /// L0 layers have key range Key::MIN..Key::MAX, and locating them through a
    /// coverage search is very inefficient. So L0 layers are held in the
    /// l0_delta_layers vector, in addition to the historic index.
    l0_delta_layers: Vec<Arc<PersistentLayerDesc>>,
}

/// The primary update API for the layer map.
///
/// Batching historic layer insertions and removals is good for
/// performance and this struct helps us do that correctly.
#[must_use]
pub struct BatchedUpdates<'a> {
    // While we hold this exclusive reference to the layer map the type checker
    // will prevent us from accidentally reading any unflushed updates.
    layer_map: &'a mut LayerMap,
}

/// Provide ability to batch more updates while hiding the read
/// API so we don't accidentally read without flushing.
impl BatchedUpdates<'_> {
    ///
    /// Insert an on-disk layer.
    ///
    // TODO remove the `layer` argument when `mapping` is refactored out of `LayerMap`
    pub fn insert_historic(&mut self, layer_desc: PersistentLayerDesc) {
        self.layer_map.insert_historic_noflush(layer_desc)
    }

    ///
    /// Remove an on-disk layer from the map.
    ///
    /// This should be called when the corresponding file on disk has been deleted.
    ///
    pub fn remove_historic(&mut self, layer_desc: &PersistentLayerDesc) {
        self.layer_map.remove_historic_noflush(layer_desc)
    }

    // We will flush on drop anyway, but this method makes it
    // more explicit that there is some work being done.
    /// Apply all updates
    pub fn flush(self) {
        // Flush happens on drop
    }
}

// Ideally the flush() method should be called explicitly for more
// controlled execution. But if we forget, we'd rather flush on drop
// than panic later or read without flushing.
//
// TODO maybe warn if flush hasn't explicitly been called
impl Drop for BatchedUpdates<'_> {
    fn drop(&mut self) {
        self.layer_map.flush_updates();
    }
}

/// Return value of LayerMap::search
#[derive(Eq, PartialEq, Debug)]
pub struct SearchResult {
    pub layer: Arc<PersistentLayerDesc>,
    pub lsn_floor: Lsn,
}

pub struct OrderedSearchResult(SearchResult);

impl Ord for OrderedSearchResult {
    fn cmp(&self, other: &Self) -> Ordering {
        self.0.lsn_floor.cmp(&other.0.lsn_floor)
    }
}

impl PartialOrd for OrderedSearchResult {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl PartialEq for OrderedSearchResult {
    fn eq(&self, other: &Self) -> bool {
        self.0.lsn_floor == other.0.lsn_floor
    }
}

impl Eq for OrderedSearchResult {}

pub struct RangeSearchResult {
    pub found: BTreeMap<OrderedSearchResult, KeySpaceAccum>,
    pub not_found: KeySpaceAccum,
}

impl RangeSearchResult {
    fn new() -> Self {
        Self {
            found: BTreeMap::new(),
            not_found: KeySpaceAccum::new(),
        }
    }
}

/// Collector for results of range search queries on the LayerMap.
/// It should be provided with two iterators for the delta and image coverage
/// that contain all the changes for layers which intersect the range.
struct RangeSearchCollector<Iter>
where
    Iter: Iterator<Item = (i128, Option<Arc<PersistentLayerDesc>>)>,
{
    delta_coverage: Peekable<Iter>,
    image_coverage: Peekable<Iter>,
    key_range: Range<Key>,
    end_lsn: Lsn,

    current_delta: Option<Arc<PersistentLayerDesc>>,
    current_image: Option<Arc<PersistentLayerDesc>>,

    result: RangeSearchResult,
}

#[derive(Debug)]
enum NextLayerType {
    Delta(i128),
    Image(i128),
    Both(i128),
}

impl NextLayerType {
    fn next_change_at_key(&self) -> Key {
        match self {
            NextLayerType::Delta(at) => Key::from_i128(*at),
            NextLayerType::Image(at) => Key::from_i128(*at),
            NextLayerType::Both(at) => Key::from_i128(*at),
        }
    }
}

impl<Iter> RangeSearchCollector<Iter>
where
    Iter: Iterator<Item = (i128, Option<Arc<PersistentLayerDesc>>)>,
{
    fn new(
        key_range: Range<Key>,
        end_lsn: Lsn,
        delta_coverage: Iter,
        image_coverage: Iter,
    ) -> Self {
        Self {
            delta_coverage: delta_coverage.peekable(),
            image_coverage: image_coverage.peekable(),
            key_range,
            end_lsn,
            current_delta: None,
            current_image: None,
            result: RangeSearchResult::new(),
        }
    }

    /// Run the collector. Collection is implemented via a two-pointer algorithm.
    /// One pointer tracks the start of the current range and the other tracks
    /// the beginning of the next range which will overlap with the next change
    /// in coverage across both image and delta.
    fn collect(mut self) -> RangeSearchResult {
        let next_layer_type = self.choose_next_layer_type();
        let mut current_range_start = match next_layer_type {
            None => {
                // No changes for the range
                self.pad_range(self.key_range.clone());
                return self.result;
            }
            Some(layer_type) if self.key_range.end <= layer_type.next_change_at_key() => {
                // Changes only after the end of the range
                self.pad_range(self.key_range.clone());
                return self.result;
            }
            Some(layer_type) => {
                // Changes for the range exist. Record anything before the first
                // coverage change as not found.
                let coverage_start = layer_type.next_change_at_key();
                let range_before = self.key_range.start..coverage_start;
                self.pad_range(range_before);

                self.advance(&layer_type);
                coverage_start
            }
        };

        while current_range_start < self.key_range.end {
            let next_layer_type = self.choose_next_layer_type();
            match next_layer_type {
                Some(t) => {
                    let current_range_end = t.next_change_at_key();
                    self.add_range(current_range_start..current_range_end);
                    current_range_start = current_range_end;

                    self.advance(&t);
                }
                None => {
                    self.add_range(current_range_start..self.key_range.end);
                    current_range_start = self.key_range.end;
                }
            }
        }

        self.result
    }

    /// Mark a range as not found (i.e. no layers intersect it)
    fn pad_range(&mut self, key_range: Range<Key>) {
        if !key_range.is_empty() {
            self.result.not_found.add_range(key_range);
        }
    }

    /// Select the appropriate layer for the given range and update
    /// the collector.
    fn add_range(&mut self, covered_range: Range<Key>) {
        let selected = LayerMap::select_layer(
            self.current_delta.clone(),
            self.current_image.clone(),
            self.end_lsn,
        );

        match selected {
            Some(search_result) => self
                .result
                .found
                .entry(OrderedSearchResult(search_result))
                .or_default()
                .add_range(covered_range),
            None => self.pad_range(covered_range),
        }
    }

    /// Move to the next coverage change.
    fn advance(&mut self, layer_type: &NextLayerType) {
        match layer_type {
            NextLayerType::Delta(_) => {
                let (_, layer) = self.delta_coverage.next().unwrap();
                self.current_delta = layer;
            }
            NextLayerType::Image(_) => {
                let (_, layer) = self.image_coverage.next().unwrap();
                self.current_image = layer;
            }
            NextLayerType::Both(_) => {
                let (_, image_layer) = self.image_coverage.next().unwrap();
                let (_, delta_layer) = self.delta_coverage.next().unwrap();

                self.current_image = image_layer;
                self.current_delta = delta_layer;
            }
        }
    }

    /// Pick the next coverage change: the one at the lesser key, or both if they're aligned.
    fn choose_next_layer_type(&mut self) -> Option<NextLayerType> {
        let next_delta_at = self.delta_coverage.peek().map(|(key, _)| key);
        let next_image_at = self.image_coverage.peek().map(|(key, _)| key);

        match (next_delta_at, next_image_at) {
            (None, None) => None,
            (Some(next_delta_at), None) => Some(NextLayerType::Delta(*next_delta_at)),
            (None, Some(next_image_at)) => Some(NextLayerType::Image(*next_image_at)),
            (Some(next_delta_at), Some(next_image_at)) if next_image_at < next_delta_at => {
                Some(NextLayerType::Image(*next_image_at))
            }
            (Some(next_delta_at), Some(next_image_at)) if next_delta_at < next_image_at => {
                Some(NextLayerType::Delta(*next_delta_at))
            }
            (Some(next_delta_at), Some(_)) => Some(NextLayerType::Both(*next_delta_at)),
        }
    }
}
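
// Illustrative walk-through of the two-pointer collection (hypothetical coverage):
// if the delta coverage changes at keys {10, 20} and the image coverage changes at
// key {15}, choose_next_layer_type yields Delta(10), Image(15), then Delta(20). At
// each step the collector closes the key range from the previous change up to the
// next one against whichever delta/image layers are current, then advances the
// corresponding coverage iterator(s).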

impl LayerMap {
    ///
    /// Find the latest layer (by lsn.end) that covers the given
    /// 'key', with lsn.start < 'end_lsn'.
    ///
    /// The caller of this function is the page reconstruction
    /// algorithm looking for the next relevant delta layer, or
    /// the terminal image layer. The caller will pass the lsn_floor
    /// value as end_lsn in the next call to search.
    ///
    /// If there's an image layer exactly below the given end_lsn,
    /// search should return that layer regardless of whether there are
    /// overlapping deltas.
    ///
    /// If the latest layer is a delta and there is an overlapping
    /// image below it, the lsn_floor returned should be right
    /// above that image so we don't skip it in the search. Otherwise
    /// the lsn_floor returned should be the bottom of the delta layer,
    /// because we should make as much progress down the lsn axis
    /// as possible. It's fine if this way we skip some overlapping
    /// deltas, because the delta we returned would contain the same
    /// wal content.
    ///
    /// TODO: This API is convoluted and inefficient. If the caller
    /// makes N search calls, we'll end up finding the same latest
    /// image layer N times. We should either cache the latest image
    /// layer result, or simplify the API to `get_latest_image` and
    /// `get_latest_delta`, and only call `get_latest_image` once.
    ///
    /// NOTE: This only searches the 'historic' layers, *not* the
    /// 'open' and 'frozen' layers!
    ///
    pub fn search(&self, key: Key, end_lsn: Lsn) -> Option<SearchResult> {
        let version = self.historic.get().unwrap().get_version(end_lsn.0 - 1)?;
        let latest_delta = version.delta_coverage.query(key.to_i128());
        let latest_image = version.image_coverage.query(key.to_i128());

        Self::select_layer(latest_delta, latest_image, end_lsn)
    }

    fn select_layer(
        delta_layer: Option<Arc<PersistentLayerDesc>>,
        image_layer: Option<Arc<PersistentLayerDesc>>,
        end_lsn: Lsn,
    ) -> Option<SearchResult> {
        assert!(delta_layer.as_ref().map_or(true, |l| l.is_delta()));
        assert!(image_layer.as_ref().map_or(true, |l| !l.is_delta()));

        match (delta_layer, image_layer) {
            (None, None) => None,
            (None, Some(image)) => {
                let lsn_floor = image.get_lsn_range().start;
                Some(SearchResult {
                    layer: image,
                    lsn_floor,
                })
            }
            (Some(delta), None) => {
                let lsn_floor = delta.get_lsn_range().start;
                Some(SearchResult {
                    layer: delta,
                    lsn_floor,
                })
            }
            (Some(delta), Some(image)) => {
                let img_lsn = image.get_lsn_range().start;
                let image_is_newer = image.get_lsn_range().end >= delta.get_lsn_range().end;
                let image_exact_match = img_lsn + 1 == end_lsn;
                if image_is_newer || image_exact_match {
                    Some(SearchResult {
                        layer: image,
                        lsn_floor: img_lsn,
                    })
                } else {
                    let lsn_floor =
                        std::cmp::max(delta.get_lsn_range().start, image.get_lsn_range().start + 1);
                    Some(SearchResult {
                        layer: delta,
                        lsn_floor,
                    })
                }
            }
        }
    }
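
    // Worked example for select_layer (hypothetical layers): given a delta covering
    // LSN 10..20 and an image with LSN range 15..16, a search with end_lsn = 25 takes
    // the `else` branch above (the image is neither newer nor an exact match) and
    // returns the delta with lsn_floor = max(10, 15 + 1) = 16. The caller's next
    // search, with end_lsn = 16, then satisfies `image_exact_match` (15 + 1 == 16)
    // and terminates at the image, so the image is not skipped.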

    pub fn range_search(&self, key_range: Range<Key>, end_lsn: Lsn) -> Option<RangeSearchResult> {
        let version = self.historic.get().unwrap().get_version(end_lsn.0 - 1)?;

        let raw_range = key_range.start.to_i128()..key_range.end.to_i128();
        let delta_changes = version.delta_coverage.range_overlaps(&raw_range);
        let image_changes = version.image_coverage.range_overlaps(&raw_range);

        let collector = RangeSearchCollector::new(key_range, end_lsn, delta_changes, image_changes);
        Some(collector.collect())
    }

    /// Start a batch of updates, applied on drop
    pub fn batch_update(&mut self) -> BatchedUpdates<'_> {
        BatchedUpdates { layer_map: self }
    }

    ///
    /// Insert an on-disk layer
    ///
    /// Helper function for BatchedUpdates::insert_historic
    ///
    /// TODO(chi): remove L generic so that we do not need to pass layer object.
    pub(self) fn insert_historic_noflush(&mut self, layer_desc: PersistentLayerDesc) {
        // TODO: See #3869, resulting #4088, attempted fix and repro #4094

        if Self::is_l0(&layer_desc) {
            self.l0_delta_layers.push(layer_desc.clone().into());
        }

        self.historic.insert(
            historic_layer_coverage::LayerKey::from(&layer_desc),
            layer_desc.into(),
        );
    }

    ///
    /// Remove an on-disk layer from the map.
    ///
    /// Helper function for BatchedUpdates::remove_historic
    ///
    pub fn remove_historic_noflush(&mut self, layer_desc: &PersistentLayerDesc) {
        self.historic
            .remove(historic_layer_coverage::LayerKey::from(layer_desc));
        let layer_key = layer_desc.key();
        if Self::is_l0(layer_desc) {
            let len_before = self.l0_delta_layers.len();
            let mut l0_delta_layers = std::mem::take(&mut self.l0_delta_layers);
            l0_delta_layers.retain(|other| other.key() != layer_key);
            self.l0_delta_layers = l0_delta_layers;
            // this assertion is related to use of Arc::ptr_eq in Self::compare_arced_layers,
            // there's a chance that the comparison fails at runtime due to it comparing (pointer,
            // vtable) pairs.
            assert_eq!(
                self.l0_delta_layers.len(),
                len_before - 1,
                "failed to locate removed historic layer from l0_delta_layers"
            );
        }
    }

    /// Helper function for BatchedUpdates::drop.
    pub(self) fn flush_updates(&mut self) {
        self.historic.rebuild();
    }

    /// Is there a newer image layer for given key- and LSN-range? Or a set
    /// of image layers within the specified lsn range that cover the entire
    /// specified key range?
    ///
    /// This is used for garbage collection, to determine if an old layer can
    /// be deleted.
    pub fn image_layer_exists(&self, key: &Range<Key>, lsn: &Range<Lsn>) -> bool {
        if key.is_empty() {
            // Vacuously true. There's a newer image for all 0 of the keys in the range.
            return true;
        }

        let version = match self.historic.get().unwrap().get_version(lsn.end.0 - 1) {
            Some(v) => v,
            None => return false,
        };

        let start = key.start.to_i128();
        let end = key.end.to_i128();

        let layer_covers = |layer: Option<Arc<PersistentLayerDesc>>| match layer {
            Some(layer) => layer.get_lsn_range().start >= lsn.start,
            None => false,
        };

        // Check the start is covered
        if !layer_covers(version.image_coverage.query(start)) {
            return false;
        }

        // Check after all changes of coverage
        for (_, change_val) in version.image_coverage.range(start..end) {
            if !layer_covers(change_val) {
                return false;
            }
        }

        true
    }
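
    // Example reading of image_layer_exists (hypothetical): for key range 0..100 and
    // LSN range 10..20, the answer is true only if, in the coverage version as of
    // LSN 19, every point of 0..100 is covered by some image whose start LSN is >= 10.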

    pub fn iter_historic_layers(&self) -> impl '_ + Iterator<Item = Arc<PersistentLayerDesc>> {
        self.historic.iter()
    }

    ///
    /// Divide the whole given range of keys into sub-ranges based on the latest
    /// image layer that covers each range at the specified lsn (inclusive).
    /// This is used when creating new image layers.
    pub fn image_coverage(
        &self,
        key_range: &Range<Key>,
        lsn: Lsn,
    ) -> Vec<(Range<Key>, Option<Arc<PersistentLayerDesc>>)> {
        let version = match self.historic.get().unwrap().get_version(lsn.0) {
            Some(v) => v,
            None => return vec![],
        };

        let start = key_range.start.to_i128();
        let end = key_range.end.to_i128();

        // Initialize loop variables
        let mut coverage: Vec<(Range<Key>, Option<Arc<PersistentLayerDesc>>)> = vec![];
        let mut current_key = start;
        let mut current_val = version.image_coverage.query(start);

        // Loop through the change events and push intervals
        for (change_key, change_val) in version.image_coverage.range(start..end) {
            let kr = Key::from_i128(current_key)..Key::from_i128(change_key);
            coverage.push((kr, current_val.take()));
            current_key = change_key;
            current_val = change_val.clone();
        }

        // Add the final interval
        let kr = Key::from_i128(current_key)..Key::from_i128(end);
        coverage.push((kr, current_val.take()));

        coverage
    }

    pub fn is_l0(layer: &PersistentLayerDesc) -> bool {
        layer.get_key_range() == (Key::MIN..Key::MAX)
    }

    /// This function determines which layers are counted in `count_deltas`:
    /// layers that should count towards deciding whether or not to reimage
    /// a certain partition range.
    ///
    /// There are two kinds of layers we currently consider reimage-worthy:
    ///
    /// Case 1: Non-L0 layers are currently reimage-worthy by default.
    /// TODO Some of these layers are very sparse and cover the entire key
    ///      range. Replacing 256MB of data (or less!) with terabytes of
    ///      images doesn't seem wise. We need a better heuristic, possibly
    ///      based on some of these factors:
    ///      a) whether this layer has any wal in this partition range
    ///      b) the size of the layer
    ///      c) the number of images needed to cover it
    ///      d) the estimated time until we'll have to reimage over it for GC
    ///
    /// Case 2: Since L0 layers by definition cover the entire key space, we consider
    /// them reimage-worthy only when the entire key space can be covered by very few
    /// images (currently 1).
    /// TODO The optimal number should probably be slightly higher than 1, but to
    ///      implement that we need to plumb a lot more context into this function
    ///      than just the current partition_range.
    pub fn is_reimage_worthy(layer: &PersistentLayerDesc, partition_range: &Range<Key>) -> bool {
        // Case 1
        if !Self::is_l0(layer) {
            return true;
        }

        // Case 2
        if partition_range == &(Key::MIN..Key::MAX) {
            return true;
        }

        false
    }

    /// Count the height of the tallest stack of reimage-worthy deltas
    /// in this 2d region.
    ///
    /// If `limit` is provided we don't try to count above that number.
    ///
    /// This number is used to compute the largest number of deltas that
    /// we'll need to visit for any page reconstruction in this region.
    /// We use this heuristic to decide whether to create an image layer.
    pub fn count_deltas(&self, key: &Range<Key>, lsn: &Range<Lsn>, limit: Option<usize>) -> usize {
        // We get the delta coverage of the region, and for each part of the coverage
        // we recurse right underneath the delta. The recursion depth is limited by
        // the largest result this function could return, which is in practice between
        // 3 and 10 (since we usually try to create an image when the number gets larger).

        if lsn.is_empty() || key.is_empty() || limit == Some(0) {
            return 0;
        }

        let version = match self.historic.get().unwrap().get_version(lsn.end.0 - 1) {
            Some(v) => v,
            None => return 0,
        };

        let start = key.start.to_i128();
        let end = key.end.to_i128();

        // Initialize loop variables
        let mut max_stacked_deltas = 0;
        let mut current_key = start;
        let mut current_val = version.delta_coverage.query(start);

        // Loop through the delta coverage and recurse on each part
        for (change_key, change_val) in version.delta_coverage.range(start..end) {
            // If there's a relevant delta in this part, add 1 and recurse down
            if let Some(val) = current_val {
                if val.get_lsn_range().end > lsn.start {
                    let kr = Key::from_i128(current_key)..Key::from_i128(change_key);
                    let lr = lsn.start..val.get_lsn_range().start;
                    if !kr.is_empty() {
                        let base_count = Self::is_reimage_worthy(&val, key) as usize;
                        let new_limit = limit.map(|l| l - base_count);
                        let max_stacked_deltas_underneath = self.count_deltas(&kr, &lr, new_limit);
                        max_stacked_deltas = std::cmp::max(
                            max_stacked_deltas,
                            base_count + max_stacked_deltas_underneath,
                        );
                    }
                }
            }

            current_key = change_key;
            current_val = change_val.clone();
        }

        // Consider the last part
        if let Some(val) = current_val {
            if val.get_lsn_range().end > lsn.start {
                let kr = Key::from_i128(current_key)..Key::from_i128(end);
                let lr = lsn.start..val.get_lsn_range().start;

                if !kr.is_empty() {
                    let base_count = Self::is_reimage_worthy(&val, key) as usize;
                    let new_limit = limit.map(|l| l - base_count);
                    let max_stacked_deltas_underneath = self.count_deltas(&kr, &lr, new_limit);
                    max_stacked_deltas = std::cmp::max(
                        max_stacked_deltas,
                        base_count + max_stacked_deltas_underneath,
                    );
                }
            }
        }

        max_stacked_deltas
    }
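
    // Worked example for count_deltas (hypothetical layers): with two reimage-worthy
    // deltas stacked over the same key range, one at LSN 10..20 and one at LSN 20..30,
    // and a query over that key range with lsn = 0..40, the top-level call sees the
    // LSN 20..30 delta in the coverage, counts 1, and recurses with lsn = 0..20; that
    // call sees the LSN 10..20 delta, counts 1, and recurses below LSN 10, where
    // nothing remains. The result is 2.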

    /// Count how many reimage-worthy layers we need to visit for given key-lsn pair.
    ///
    /// The `partition_range` argument is used as context for the reimage-worthiness decision.
    ///
    /// Used as a helper for correctness checks only. Performance not critical.
    pub fn get_difficulty(&self, lsn: Lsn, key: Key, partition_range: &Range<Key>) -> usize {
        match self.search(key, lsn) {
            Some(search_result) => {
                if search_result.layer.is_incremental() {
                    (Self::is_reimage_worthy(&search_result.layer, partition_range) as usize)
                        + self.get_difficulty(search_result.lsn_floor, key, partition_range)
                } else {
                    0
                }
            }
            None => 0,
        }
    }

    /// Used for correctness checking. Results are expected to be identical to
    /// self.get_difficulty_map. Assumes self.search is correct.
    pub fn get_difficulty_map_bruteforce(
        &self,
        lsn: Lsn,
        partitioning: &KeyPartitioning,
    ) -> Vec<usize> {
        // Looking at the difficulty as a function of key, it could only increase
        // when a delta layer starts or an image layer ends. Therefore it's sufficient
        // to check the difficulties at:
        // - the key.start for each non-empty part range
        // - the key.start for each delta
        // - the key.end for each image
        let keys_iter: Box<dyn Iterator<Item = Key>> = {
            let mut keys: Vec<Key> = self
                .iter_historic_layers()
                .map(|layer| {
                    if layer.is_incremental() {
                        layer.get_key_range().start
                    } else {
                        layer.get_key_range().end
                    }
                })
                .collect();
            keys.sort();
            Box::new(keys.into_iter())
        };
        let mut keys_iter = keys_iter.peekable();

        // Iter the partition and keys together and query all the necessary
        // keys, computing the max difficulty for each part.
        partitioning
            .parts
            .iter()
            .map(|part| {
                let mut difficulty = 0;
                // Partition ranges are assumed to be sorted and disjoint
                // TODO assert it
                for range in &part.ranges {
                    if !range.is_empty() {
                        difficulty =
                            std::cmp::max(difficulty, self.get_difficulty(lsn, range.start, range));
                    }
                    while let Some(key) = keys_iter.peek() {
                        if key >= &range.end {
                            break;
                        }
                        let key = keys_iter.next().unwrap();
                        if key < range.start {
                            continue;
                        }
                        difficulty =
                            std::cmp::max(difficulty, self.get_difficulty(lsn, key, range));
                    }
                }
                difficulty
            })
            .collect()
    }

    /// For each part of a keyspace partitioning, return the maximum number of layers
    /// that would be needed for page reconstruction in that part at the given LSN.
    ///
    /// If `limit` is provided we don't try to count above that number.
    ///
    /// This method is used to decide where to create new image layers. Computing the
    /// result for the entire partitioning at once allows this function to be more
    /// efficient, and further optimization is possible by using iterators instead,
    /// to allow early return.
    ///
    /// TODO actually use this method instead of count_deltas. Currently we only use
    /// it for benchmarks.
    pub fn get_difficulty_map(
        &self,
        lsn: Lsn,
        partitioning: &KeyPartitioning,
        limit: Option<usize>,
    ) -> Vec<usize> {
        // TODO This is a naive implementation. Perf improvements to do:
        // 1. Instead of calling self.image_coverage and self.count_deltas,
        //    iterate the image and delta coverage only once.
        partitioning
            .parts
            .iter()
            .map(|part| {
                let mut difficulty = 0;
                for range in &part.ranges {
                    if limit == Some(difficulty) {
                        break;
                    }
                    for (img_range, last_img) in self.image_coverage(range, lsn) {
                        if limit == Some(difficulty) {
                            break;
                        }
                        let img_lsn = if let Some(last_img) = last_img {
                            last_img.get_lsn_range().end
                        } else {
                            Lsn(0)
                        };

                        if img_lsn < lsn {
                            let num_deltas = self.count_deltas(&img_range, &(img_lsn..lsn), limit);
                            difficulty = std::cmp::max(difficulty, num_deltas);
                        }
                    }
                }
                difficulty
            })
            .collect()
    }

    /// Return all L0 delta layers
    pub fn get_level0_deltas(&self) -> Result<Vec<Arc<PersistentLayerDesc>>> {
        Ok(self.l0_delta_layers.to_vec())
    }

    /// Debugging function to print out the contents of the layer map
    #[allow(unused)]
    pub async fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
        println!("Begin dump LayerMap");

        println!("open_layer:");
        if let Some(open_layer) = &self.open_layer {
            open_layer.dump(verbose, ctx).await?;
        }

        println!("frozen_layers:");
        for frozen_layer in self.frozen_layers.iter() {
            frozen_layer.dump(verbose, ctx).await?;
        }

        println!("historic_layers:");
        for desc in self.iter_historic_layers() {
            desc.dump();
        }
        println!("End dump LayerMap");
        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[derive(Clone)]
    struct LayerDesc {
        key_range: Range<Key>,
        lsn_range: Range<Lsn>,
        is_delta: bool,
    }

    fn create_layer_map(layers: Vec<LayerDesc>) -> LayerMap {
        let mut layer_map = LayerMap::default();

        for layer in layers {
            layer_map.insert_historic_noflush(PersistentLayerDesc::new_test(
                layer.key_range,
                layer.lsn_range,
                layer.is_delta,
            ));
        }

        layer_map.flush_updates();
        layer_map
    }

    fn assert_range_search_result_eq(lhs: RangeSearchResult, rhs: RangeSearchResult) {
        assert_eq!(lhs.not_found.to_keyspace(), rhs.not_found.to_keyspace());
        let lhs: Vec<_> = lhs
            .found
            .into_iter()
            .map(|(search_result, accum)| (search_result.0, accum.to_keyspace()))
            .collect();
        let rhs: Vec<_> = rhs
            .found
            .into_iter()
            .map(|(search_result, accum)| (search_result.0, accum.to_keyspace()))
            .collect();

        assert_eq!(lhs, rhs);
    }

    fn brute_force_range_search(
        layer_map: &LayerMap,
        key_range: Range<Key>,
        end_lsn: Lsn,
    ) -> RangeSearchResult {
        let mut range_search_result = RangeSearchResult::new();

        let mut key = key_range.start;
        while key != key_range.end {
            let res = layer_map.search(key, end_lsn);
            match res {
                Some(res) => {
                    range_search_result
                        .found
                        .entry(OrderedSearchResult(res))
                        .or_default()
                        .add_key(key);
                }
                None => {
                    range_search_result.not_found.add_key(key);
                }
            }

            key = key.next();
        }

        range_search_result
    }

    #[test]
    fn ranged_search_on_empty_layer_map() {
        let layer_map = LayerMap::default();
        let range = Key::from_i128(100)..Key::from_i128(200);

        let res = layer_map.range_search(range, Lsn(100));
        assert!(res.is_none());
    }

    #[test]
    fn ranged_search() {
        let layers = vec![
            LayerDesc {
                key_range: Key::from_i128(15)..Key::from_i128(50),
                lsn_range: Lsn(0)..Lsn(5),
                is_delta: false,
            },
            LayerDesc {
                key_range: Key::from_i128(10)..Key::from_i128(20),
                lsn_range: Lsn(5)..Lsn(20),
                is_delta: true,
            },
            LayerDesc {
                key_range: Key::from_i128(15)..Key::from_i128(25),
                lsn_range: Lsn(20)..Lsn(30),
                is_delta: true,
            },
            LayerDesc {
                key_range: Key::from_i128(35)..Key::from_i128(40),
                lsn_range: Lsn(25)..Lsn(35),
                is_delta: true,
            },
            LayerDesc {
                key_range: Key::from_i128(35)..Key::from_i128(40),
                lsn_range: Lsn(35)..Lsn(40),
                is_delta: false,
            },
        ];

        let layer_map = create_layer_map(layers.clone());
        for start in 0..60 {
            for end in (start + 1)..60 {
                let range = Key::from_i128(start)..Key::from_i128(end);
                let result = layer_map.range_search(range.clone(), Lsn(100)).unwrap();
                let expected = brute_force_range_search(&layer_map, range, Lsn(100));

                assert_range_search_result_eq(result, expected);
            }
        }
    }
}
|