Line data Source code
1 : //!
2 : //! The layer map tracks what layers exist in a timeline.
3 : //!
4 : //! When the timeline is first accessed, the server lists all layer files
5 : //! in the timelines/<timeline_id> directory, and populates this map with
6 : //! ImageLayer and DeltaLayer structs corresponding to each file. When the first
7 : //! new WAL record is received, we create an InMemoryLayer to hold the incoming
8 : //! records. Now and then, in the checkpoint() function, the in-memory layer
9 : //! is frozen and split up into new image and delta layers, and the
10 : //! corresponding files are written to disk.
11 : //!
12 : //! Design overview:
13 : //!
14 : //! The `search` method of the layer map is on the read critical path, so we've
15 : //! built an efficient data structure for fast reads, stored in `LayerMap::historic`.
16 : //! Other read methods are less critical but still impact performance of background tasks.
17 : //!
18 : //! This data structure relies on a persistent/immutable binary search tree. See the
19 : //! following lecture for an introduction <https://www.youtube.com/watch?v=WqCWghETNDc&t=581s>
20 : //! Summary: A persistent/immutable BST (and persistent data structures in general) allows
21 : //! you to modify the tree in such a way that each modification creates a new "version"
22 : //! of the tree. When you modify it, you get a new version, but all previous versions are
23 : //! still accessible too. So if someone is still holding a reference to an older version,
24 : //! they continue to see the tree as it was then. The persistent BST stores all the
25 : //! different versions in an efficient way.
26 : //!
27 : //! Our persistent BST maintains a map of which layer file "covers" each key. It has only
28 : //! one dimension, the key. See `layer_coverage.rs`. We use the persistent/immutable property
29 : //! to handle the LSN dimension.
30 : //!
31 : //! To build the layer map, we insert each layer into the persistent BST in LSN.start order,
32 : //! starting from the oldest one. After each insertion, we grab a reference to that "version"
33 : //! of the tree, and store it in another tree, a BTreeMap keyed by the LSN. See
34 : //! `historic_layer_coverage.rs`.
35 : //!
36 : //! To search for a particular key-LSN pair, you first look up the right "version" in the
37 : //! BTreeMap. Then you search that version of the BST with the key.
38 : //!
39 : //! The persistent BST keeps all the versions, but there is no way to change the old versions
40 : //! afterwards. We can add layers as long as they have larger LSNs than any previous layer in
41 : //! the map, but if we need to remove a layer, or insert anything with an older LSN, we need
42 : //! to throw away most of the persistent BST and build a new one, starting from the oldest
43 : //! LSN. See [`LayerMap::flush_updates()`].
44 : //!
45 :
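To make the versioning scheme above concrete, here is a minimal, self-contained sketch of the idea. It is illustrative only: `ToyHistoric` and its types are hypothetical, and the real implementation (`layer_coverage.rs`, `historic_layer_coverage.rs`) uses a persistent tree with structural sharing instead of cloning whole snapshots.

```rust
use std::collections::BTreeMap;

/// Toy stand-in for the real coverage index: one immutable "version" of
/// the key coverage per insertion LSN, indexed by a BTreeMap keyed by LSN.
struct ToyHistoric {
    versions: BTreeMap<u64, BTreeMap<i128, &'static str>>,
}

impl ToyHistoric {
    fn new() -> Self {
        Self { versions: BTreeMap::new() }
    }

    /// Insert layers in lsn.start order. Each insertion snapshots the
    /// coverage as of that LSN (the real code shares structure between
    /// versions instead of cloning).
    fn insert(&mut self, lsn_start: u64, key_start: i128, layer: &'static str) {
        let mut version = self
            .versions
            .values()
            .next_back()
            .cloned()
            .unwrap_or_default();
        version.insert(key_start, layer);
        self.versions.insert(lsn_start, version);
    }

    /// Search a key-LSN pair: pick the right version by LSN, then query
    /// that version by key.
    fn search(&self, key: i128, end_lsn: u64) -> Option<&'static str> {
        let (_, version) = self.versions.range(..end_lsn).next_back()?;
        version.range(..=key).next_back().map(|(_, layer)| *layer)
    }
}

fn main() {
    let mut historic = ToyHistoric::new();
    historic.insert(10, 0, "layer-a");
    historic.insert(20, 5, "layer-b");
    // A reader at end_lsn = 15 still sees the version before "layer-b".
    assert_eq!(historic.search(7, 15), Some("layer-a"));
    assert_eq!(historic.search(7, 25), Some("layer-b"));
}
```

Cloning makes each insertion O(n) here; a persistent BST creates only O(log n) new nodes per insertion while keeping every older version readable.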
46 : mod historic_layer_coverage;
47 : mod layer_coverage;
48 :
49 : use crate::context::RequestContext;
50 : use crate::keyspace::KeyPartitioning;
51 : use crate::repository::Key;
52 : use crate::tenant::storage_layer::InMemoryLayer;
53 : use anyhow::Result;
54 : use pageserver_api::keyspace::KeySpaceAccum;
55 : use std::collections::{HashMap, VecDeque};
56 : use std::iter::Peekable;
57 : use std::ops::Range;
58 : use std::sync::Arc;
59 : use utils::lsn::Lsn;
60 :
61 : use historic_layer_coverage::BufferedHistoricLayerCoverage;
62 : pub use historic_layer_coverage::LayerKey;
63 :
64 : use super::storage_layer::PersistentLayerDesc;
65 :
66 : ///
67 : /// LayerMap tracks what layers exist on a timeline.
68 : ///
69 300 : #[derive(Default)]
70 : pub struct LayerMap {
71 : //
72 : // 'open_layer' holds the current InMemoryLayer that is accepting new
73 : // records. If it is None, 'next_open_layer_at' will be set instead, indicating
74 : // the start LSN at which the next InMemoryLayer is to be created.
75 : //
76 : pub open_layer: Option<Arc<InMemoryLayer>>,
77 : pub next_open_layer_at: Option<Lsn>,
78 :
79 : ///
80 : /// Frozen layers, if any. Frozen layers are in-memory layers that
81 : /// are no longer added to, but haven't been written out to disk
82 : /// yet. They contain WAL older than the current 'open_layer' or
83 : /// 'next_open_layer_at', but newer than any historic layer.
84 : /// The frozen layers are in order from oldest to newest, so that
85 : /// the newest one is in the 'back' of the VecDeque, and the oldest
86 : /// in the 'front'.
87 : ///
88 : pub frozen_layers: VecDeque<Arc<InMemoryLayer>>,
89 :
90 : /// Index of the historic layers optimized for search
91 : historic: BufferedHistoricLayerCoverage<Arc<PersistentLayerDesc>>,
92 :
93 : /// L0 layers have key range Key::MIN..Key::MAX, and locating them using R-tree search is very inefficient.
94 : /// So L0 layers are held in the l0_delta_layers vector, in addition to the R-tree.
95 : l0_delta_layers: Vec<Arc<PersistentLayerDesc>>,
96 : }
97 :
98 : /// The primary update API for the layer map.
99 : ///
100 : /// Batching historic layer insertions and removals is good for
101 : /// performance, and this struct helps us do that correctly.
102 : #[must_use]
103 : pub struct BatchedUpdates<'a> {
104 : // While we hold this exclusive reference to the layer map, the type checker
105 : // will prevent us from accidentally reading any unflushed updates.
106 : layer_map: &'a mut LayerMap,
107 : }
108 :
109 : /// Provide ability to batch more updates while hiding the read
110 : /// API so we don't accidentally read without flushing.
111 : impl BatchedUpdates<'_> {
112 : ///
113 : /// Insert an on-disk layer.
114 : ///
115 : // TODO remove the `layer` argument when `mapping` is refactored out of `LayerMap`
116 570 : pub fn insert_historic(&mut self, layer_desc: PersistentLayerDesc) {
117 570 : self.layer_map.insert_historic_noflush(layer_desc)
118 570 : }
119 :
120 : ///
121 : /// Remove an on-disk layer from the map.
122 : ///
123 : /// This should be called when the corresponding file on disk has been deleted.
124 : ///
125 302 : pub fn remove_historic(&mut self, layer_desc: &PersistentLayerDesc) {
126 302 : self.layer_map.remove_historic_noflush(layer_desc)
127 302 : }
128 :
129 : // We will flush on drop anyway, but this method makes it
130 : // more explicit that there is some work being done.
131 : /// Apply all updates
132 970 : pub fn flush(self) {
133 970 : // Flush happens on drop
134 970 : }
135 : }
136 :
137 : // Ideally the flush() method should be called explicitly for more
138 : // controlled execution. But if we forget we'd rather flush on drop
139 : // than panic later or read without flushing.
140 : //
141 : // TODO maybe warn if flush hasn't explicitly been called
142 : impl Drop for BatchedUpdates<'_> {
143 970 : fn drop(&mut self) {
144 970 : self.layer_map.flush_updates();
145 970 : }
146 : }
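A minimal usage sketch of the batching API. The helper and its inputs are hypothetical; only `batch_update`, `insert_historic`, `remove_historic`, and `flush` come from this module.

```rust
// Hypothetical helper: apply a set of layer additions and removals in one
// batch, so the search index is rebuilt once instead of after every call.
fn apply_layer_changes(
    layer_map: &mut LayerMap,
    added: Vec<PersistentLayerDesc>,
    removed: Vec<PersistentLayerDesc>,
) {
    let mut updates = layer_map.batch_update();
    for desc in added {
        updates.insert_historic(desc);
    }
    for desc in &removed {
        updates.remove_historic(desc);
    }
    // Flushes the buffered updates; dropping `updates` would do the same,
    // but calling flush() makes the rebuild explicit.
    updates.flush();
}
```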
147 :
148 : /// Return value of LayerMap::search
149 99900 : #[derive(Eq, PartialEq, Debug, Hash)]
150 : pub struct SearchResult {
151 : pub layer: Arc<PersistentLayerDesc>,
152 : pub lsn_floor: Lsn,
153 : }
154 :
155 : /// Return value of [`LayerMap::range_search`]
156 : ///
157 : /// Contains a mapping from a layer description to a keyspace
158 : /// accumulator that contains all the keys which intersect the layer
159 : /// from the original search space. Keys that were not found are accumulated
160 : /// in a separate key space accumulator.
161 0 : #[derive(Debug)]
162 : pub struct RangeSearchResult {
163 : pub found: HashMap<SearchResult, KeySpaceAccum>,
164 : pub not_found: KeySpaceAccum,
165 : }
166 :
167 : impl RangeSearchResult {
168 7090 : fn new() -> Self {
169 7090 : Self {
170 7090 : found: HashMap::new(),
171 7090 : not_found: KeySpaceAccum::new(),
172 7090 : }
173 7090 : }
174 : }
175 :
176 : /// Collector for results of range search queries on the LayerMap.
177 : /// It should be provided with two iterators for the delta and image coverage
178 : /// that contain all the changes for layers which intersect the range.
179 : struct RangeSearchCollector<Iter>
180 : where
181 : Iter: Iterator<Item = (i128, Option<Arc<PersistentLayerDesc>>)>,
182 : {
183 : delta_coverage: Peekable<Iter>,
184 : image_coverage: Peekable<Iter>,
185 : key_range: Range<Key>,
186 : end_lsn: Lsn,
187 :
188 : current_delta: Option<Arc<PersistentLayerDesc>>,
189 : current_image: Option<Arc<PersistentLayerDesc>>,
190 :
191 : result: RangeSearchResult,
192 : }
193 :
194 0 : #[derive(Debug)]
195 : enum NextLayerType {
196 : Delta(i128),
197 : Image(i128),
198 : Both(i128),
199 : }
200 :
201 : impl NextLayerType {
202 13856 : fn next_change_at_key(&self) -> Key {
203 13856 : match self {
204 4600 : NextLayerType::Delta(at) => Key::from_i128(*at),
205 2484 : NextLayerType::Image(at) => Key::from_i128(*at),
206 6772 : NextLayerType::Both(at) => Key::from_i128(*at),
207 : }
208 13856 : }
209 : }
210 :
211 : impl<Iter> RangeSearchCollector<Iter>
212 : where
213 : Iter: Iterator<Item = (i128, Option<Arc<PersistentLayerDesc>>)>,
214 : {
215 3550 : fn new(
216 3550 : key_range: Range<Key>,
217 3550 : end_lsn: Lsn,
218 3550 : delta_coverage: Iter,
219 3550 : image_coverage: Iter,
220 3550 : ) -> Self {
221 3550 : Self {
222 3550 : delta_coverage: delta_coverage.peekable(),
223 3550 : image_coverage: image_coverage.peekable(),
224 3550 : key_range,
225 3550 : end_lsn,
226 3550 : current_delta: None,
227 3550 : current_image: None,
228 3550 : result: RangeSearchResult::new(),
229 3550 : }
230 3550 : }
231 :
232 : /// Run the collector. Collection is implemented via a two-pointer algorithm.
233 : /// One pointer tracks the start of the current range and the other tracks
234 : /// the start of the next range, which begins at the next change in
235 : /// either the image or the delta coverage.
236 3550 : fn collect(mut self) -> RangeSearchResult {
237 3550 : let next_layer_type = self.choose_next_layer_type();
238 3368 : let mut current_range_start = match next_layer_type {
239 : None => {
240 : // No changes for the range
241 182 : self.pad_range(self.key_range.clone());
242 182 : return self.result;
243 : }
244 3368 : Some(layer_type) if self.key_range.end <= layer_type.next_change_at_key() => {
245 0 : // Changes only after the end of the range
246 0 : self.pad_range(self.key_range.clone());
247 0 : return self.result;
248 : }
249 3368 : Some(layer_type) => {
250 3368 : // Changes for the range exist. Record anything before the first
251 3368 : // coverage change as not found.
252 3368 : let coverage_start = layer_type.next_change_at_key();
253 3368 : let range_before = self.key_range.start..coverage_start;
254 3368 : self.pad_range(range_before);
255 3368 :
256 3368 : self.advance(&layer_type);
257 3368 : coverage_start
258 : }
259 : };
260 :
261 13856 : while current_range_start < self.key_range.end {
262 10488 : let next_layer_type = self.choose_next_layer_type();
263 10488 : match next_layer_type {
264 7120 : Some(t) => {
265 7120 : let current_range_end = t.next_change_at_key();
266 7120 : self.add_range(current_range_start..current_range_end);
267 7120 : current_range_start = current_range_end;
268 7120 :
269 7120 : self.advance(&t);
270 7120 : }
271 3368 : None => {
272 3368 : self.add_range(current_range_start..self.key_range.end);
273 3368 : current_range_start = self.key_range.end;
274 3368 : }
275 : }
276 : }
277 :
278 3368 : self.result
279 3550 : }
280 :
281 : /// Mark a range as not found (i.e. no layers intersect it)
282 4468 : fn pad_range(&mut self, key_range: Range<Key>) {
283 4468 : if !key_range.is_empty() {
284 2080 : self.result.not_found.add_range(key_range);
285 2388 : }
286 4468 : }
287 :
288 : /// Select the appropriate layer for the given range and update
289 : /// the collector.
290 10488 : fn add_range(&mut self, covered_range: Range<Key>) {
291 10488 : let selected = LayerMap::select_layer(
292 10488 : self.current_delta.clone(),
293 10488 : self.current_image.clone(),
294 10488 : self.end_lsn,
295 10488 : );
296 10488 :
297 10488 : match selected {
298 9570 : Some(search_result) => self
299 9570 : .result
300 9570 : .found
301 9570 : .entry(search_result)
302 9570 : .or_default()
303 9570 : .add_range(covered_range),
304 918 : None => self.pad_range(covered_range),
305 : }
306 10488 : }
307 :
308 : /// Move to the next coverage change.
309 10488 : fn advance(&mut self, layer_type: &NextLayerType) {
310 10488 : match layer_type {
311 3150 : NextLayerType::Delta(_) => {
312 3150 : let (_, layer) = self.delta_coverage.next().unwrap();
313 3150 : self.current_delta = layer;
314 3150 : }
315 1692 : NextLayerType::Image(_) => {
316 1692 : let (_, layer) = self.image_coverage.next().unwrap();
317 1692 : self.current_image = layer;
318 1692 : }
319 5646 : NextLayerType::Both(_) => {
320 5646 : let (_, image_layer) = self.image_coverage.next().unwrap();
321 5646 : let (_, delta_layer) = self.delta_coverage.next().unwrap();
322 5646 :
323 5646 : self.current_image = image_layer;
324 5646 : self.current_delta = delta_layer;
325 5646 : }
326 : }
327 10488 : }
328 :
329 : /// Pick the next coverage change: the one at the lesser key, or both if they're aligned.
330 14038 : fn choose_next_layer_type(&mut self) -> Option<NextLayerType> {
331 14038 : let next_delta_at = self.delta_coverage.peek().map(|(key, _)| key);
332 14038 : let next_image_at = self.image_coverage.peek().map(|(key, _)| key);
333 14038 :
334 14038 : match (next_delta_at, next_image_at) {
335 3550 : (None, None) => None,
336 630 : (Some(next_delta_at), None) => Some(NextLayerType::Delta(*next_delta_at)),
337 1260 : (None, Some(next_image_at)) => Some(NextLayerType::Image(*next_image_at)),
338 8598 : (Some(next_delta_at), Some(next_image_at)) if next_image_at < next_delta_at => {
339 432 : Some(NextLayerType::Image(*next_image_at))
340 : }
341 8166 : (Some(next_delta_at), Some(next_image_at)) if next_delta_at < next_image_at => {
342 2520 : Some(NextLayerType::Delta(*next_delta_at))
343 : }
344 5646 : (Some(next_delta_at), Some(_)) => Some(NextLayerType::Both(*next_delta_at)),
345 : }
346 14038 : }
347 : }
348 :
349 0 : #[derive(PartialEq, Eq, Hash, Debug, Clone)]
350 : pub enum InMemoryLayerHandle {
351 : Open {
352 : lsn_floor: Lsn,
353 : end_lsn: Lsn,
354 : },
355 : Frozen {
356 : idx: usize,
357 : lsn_floor: Lsn,
358 : end_lsn: Lsn,
359 : },
360 : }
361 :
362 : impl InMemoryLayerHandle {
363 0 : pub fn get_lsn_floor(&self) -> Lsn {
364 0 : match self {
365 0 : InMemoryLayerHandle::Open { lsn_floor, .. } => *lsn_floor,
366 0 : InMemoryLayerHandle::Frozen { lsn_floor, .. } => *lsn_floor,
367 : }
368 0 : }
369 :
370 0 : pub fn get_end_lsn(&self) -> Lsn {
371 0 : match self {
372 0 : InMemoryLayerHandle::Open { end_lsn, .. } => *end_lsn,
373 0 : InMemoryLayerHandle::Frozen { end_lsn, .. } => *end_lsn,
374 : }
375 0 : }
376 : }
377 :
378 : impl LayerMap {
379 : ///
380 : /// Find the latest layer (by lsn.end) that covers the given
381 : /// 'key', with lsn.start < 'end_lsn'.
382 : ///
383 : /// The caller of this function is the page reconstruction
384 : /// algorithm looking for the next relevant delta layer, or
385 : /// the terminal image layer. The caller will pass the lsn_floor
386 : /// value as end_lsn in the next call to search.
387 : ///
388 : /// If there's an image layer exactly below the given end_lsn,
389 : /// search should return that layer regardless of whether there are
390 : /// overlapping deltas.
391 : ///
392 : /// If the latest layer is a delta and there is an overlapping
393 : /// image with it below, the lsn_floor returned should be right
394 : /// above that image so we don't skip it in the search. Otherwise
395 : /// the lsn_floor returned should be the bottom of the delta layer
396 : /// because we should make as much progress down the lsn axis
397 : /// as possible. It's fine if we skip some overlapping deltas this
398 : /// way, because the delta we returned would contain the same
399 : /// WAL content.
400 : ///
401 : /// TODO: This API is convoluted and inefficient. If the caller
402 : /// makes N search calls, we'll end up finding the same latest
403 : /// image layer N times. We should either cache the latest image
404 : /// layer result, or simplify the api to `get_latest_image` and
405 : /// `get_latest_delta`, and only call `get_latest_image` once.
406 : ///
407 : /// NOTE: This only searches the 'historic' layers, *not* the
408 : /// 'open' and 'frozen' layers!
409 : ///
410 196110 : pub fn search(&self, key: Key, end_lsn: Lsn) -> Option<SearchResult> {
411 196110 : let version = self.historic.get().unwrap().get_version(end_lsn.0 - 1)?;
412 196008 : let latest_delta = version.delta_coverage.query(key.to_i128());
413 196008 : let latest_image = version.image_coverage.query(key.to_i128());
414 196008 :
415 196008 : Self::select_layer(latest_delta, latest_image, end_lsn)
416 196110 : }
417 :
418 206496 : fn select_layer(
419 206496 : delta_layer: Option<Arc<PersistentLayerDesc>>,
420 206496 : image_layer: Option<Arc<PersistentLayerDesc>>,
421 206496 : end_lsn: Lsn,
422 206496 : ) -> Option<SearchResult> {
423 206496 : assert!(delta_layer.as_ref().map_or(true, |l| l.is_delta()));
424 206496 : assert!(image_layer.as_ref().map_or(true, |l| !l.is_delta()));
425 :
426 206496 : match (delta_layer, image_layer) {
427 11578 : (None, None) => None,
428 35472 : (None, Some(image)) => {
429 35472 : let lsn_floor = image.get_lsn_range().start;
430 35472 : Some(SearchResult {
431 35472 : layer: image,
432 35472 : lsn_floor,
433 35472 : })
434 : }
435 77143 : (Some(delta), None) => {
436 77143 : let lsn_floor = delta.get_lsn_range().start;
437 77143 : Some(SearchResult {
438 77143 : layer: delta,
439 77143 : lsn_floor,
440 77143 : })
441 : }
442 82303 : (Some(delta), Some(image)) => {
443 82303 : let img_lsn = image.get_lsn_range().start;
444 82303 : let image_is_newer = image.get_lsn_range().end >= delta.get_lsn_range().end;
445 82303 : let image_exact_match = img_lsn + 1 == end_lsn;
446 82303 : if image_is_newer || image_exact_match {
447 10240 : Some(SearchResult {
448 10240 : layer: image,
449 10240 : lsn_floor: img_lsn,
450 10240 : })
451 : } else {
452 72063 : let lsn_floor =
453 72063 : std::cmp::max(delta.get_lsn_range().start, image.get_lsn_range().start + 1);
454 72063 : Some(SearchResult {
455 72063 : layer: delta,
456 72063 : lsn_floor,
457 72063 : })
458 : }
459 : }
460 : }
461 206496 : }
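To see the lsn_floor rules in action, here is a test-style sketch. The key and LSN ranges are hypothetical values chosen for illustration; `new_test` is the same constructor the tests module below uses.

```rust
#[test]
fn search_stops_above_underlying_image() {
    let mut map = LayerMap::default();
    // A delta covering Lsn(10)..Lsn(30) and an image taken at Lsn(20).
    map.insert_historic_noflush(PersistentLayerDesc::new_test(
        Key::from_i128(0)..Key::from_i128(100),
        Lsn(10)..Lsn(30),
        true,
    ));
    map.insert_historic_noflush(PersistentLayerDesc::new_test(
        Key::from_i128(0)..Key::from_i128(100),
        Lsn(20)..Lsn(21),
        false,
    ));
    map.flush_updates();

    // The delta is newest, but the image underneath must not be skipped:
    // lsn_floor stops right above the image (Lsn(21)), not at the delta's
    // bottom (Lsn(10)).
    let first = map.search(Key::from_i128(5), Lsn(30)).unwrap();
    assert!(first.layer.is_delta());
    assert_eq!(first.lsn_floor, Lsn(21));

    // Continuing the walk with end_lsn = lsn_floor is an exact image match.
    let second = map.search(Key::from_i128(5), Lsn(21)).unwrap();
    assert!(!second.layer.is_delta());
    assert_eq!(second.lsn_floor, Lsn(20));
}
```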
462 :
463 3552 : pub fn range_search(&self, key_range: Range<Key>, end_lsn: Lsn) -> Option<RangeSearchResult> {
464 3552 : let version = self.historic.get().unwrap().get_version(end_lsn.0 - 1)?;
465 :
466 3550 : let raw_range = key_range.start.to_i128()..key_range.end.to_i128();
467 3550 : let delta_changes = version.delta_coverage.range_overlaps(&raw_range);
468 3550 : let image_changes = version.image_coverage.range_overlaps(&raw_range);
469 3550 :
470 3550 : let collector = RangeSearchCollector::new(key_range, end_lsn, delta_changes, image_changes);
471 3550 : Some(collector.collect())
472 3552 : }
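A sketch of how a caller might consume `range_search` results; the helper below is hypothetical and not part of this crate.

```rust
// Hypothetical: plan reads for a whole key range at once instead of
// calling `search` once per key.
fn plan_reads(layer_map: &LayerMap, key_range: Range<Key>, end_lsn: Lsn) {
    // None means the layer map has no version at or below end_lsn.
    if let Some(result) = layer_map.range_search(key_range, end_lsn) {
        for (search_result, keys) in result.found {
            // All keys in `keys` are served by `search_result.layer`; the
            // next step of the walk would use `search_result.lsn_floor`.
            let _ = (search_result.layer, keys.to_keyspace());
        }
        // Keys with no covering layer at this LSN.
        let _missing = result.not_found.to_keyspace();
    }
}
```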
473 :
474 : /// Start a batch of updates, applied on drop
475 970 : pub fn batch_update(&mut self) -> BatchedUpdates<'_> {
476 970 : BatchedUpdates { layer_map: self }
477 970 : }
478 :
479 : ///
480 : /// Insert an on-disk layer
481 : ///
482 : /// Helper function for BatchedUpdates::insert_historic
483 : ///
484 : /// TODO(chi): remove L generic so that we do not need to pass layer object.
485 580 : pub(self) fn insert_historic_noflush(&mut self, layer_desc: PersistentLayerDesc) {
486 580 : // TODO: See #3869, resulting #4088, attempted fix and repro #4094
487 580 :
488 580 : if Self::is_l0(&layer_desc) {
489 462 : self.l0_delta_layers.push(layer_desc.clone().into());
490 462 : }
491 :
492 580 : self.historic.insert(
493 580 : historic_layer_coverage::LayerKey::from(&layer_desc),
494 580 : layer_desc.into(),
495 580 : );
496 580 : }
497 :
498 : ///
499 : /// Remove an on-disk layer from the map.
500 : ///
501 : /// Helper function for BatchedUpdates::remove_historic
502 : ///
503 302 : pub fn remove_historic_noflush(&mut self, layer_desc: &PersistentLayerDesc) {
504 302 : self.historic
505 302 : .remove(historic_layer_coverage::LayerKey::from(layer_desc));
506 302 : let layer_key = layer_desc.key();
507 302 : if Self::is_l0(layer_desc) {
508 300 : let len_before = self.l0_delta_layers.len();
509 300 : let mut l0_delta_layers = std::mem::take(&mut self.l0_delta_layers);
510 1650 : l0_delta_layers.retain(|other| other.key() != layer_key);
511 300 : self.l0_delta_layers = l0_delta_layers;
512 300 : // this assertion is related to use of Arc::ptr_eq in Self::compare_arced_layers,
513 300 : // there's a chance that the comparison fails at runtime due to it comparing (pointer,
514 300 : // vtable) pairs.
515 300 : assert_eq!(
516 300 : self.l0_delta_layers.len(),
517 300 : len_before - 1,
518 0 : "failed to locate removed historic layer from l0_delta_layers"
519 : );
520 2 : }
521 302 : }
522 :
523 : /// Helper function for BatchedUpdates::drop.
524 972 : pub(self) fn flush_updates(&mut self) {
525 972 : self.historic.rebuild();
526 972 : }
527 :
528 : /// Is there a newer image layer for given key- and LSN-range? Or a set
529 : /// of image layers within the specified lsn range that cover the entire
530 : /// specified key range?
531 : ///
532 : /// This is used for garbage collection, to determine if an old layer can
533 : /// be deleted.
534 1984 : pub fn image_layer_exists(&self, key: &Range<Key>, lsn: &Range<Lsn>) -> bool {
535 1984 : if key.is_empty() {
536 : // Vacuously true. There's a newer image for all 0 of the keys in the range.
537 0 : return true;
538 1984 : }
539 :
540 1984 : let version = match self.historic.get().unwrap().get_version(lsn.end.0 - 1) {
541 1984 : Some(v) => v,
542 0 : None => return false,
543 : };
544 :
545 1984 : let start = key.start.to_i128();
546 1984 : let end = key.end.to_i128();
547 1984 :
548 1984 : let layer_covers = |layer: Option<Arc<PersistentLayerDesc>>| match layer {
549 1984 : Some(layer) => layer.get_lsn_range().start >= lsn.start,
550 0 : None => false,
551 1984 : };
552 :
553 : // Check the start is covered
554 1984 : if !layer_covers(version.image_coverage.query(start)) {
555 1984 : return false;
556 0 : }
557 :
558 : // Check after all changes of coverage
559 0 : for (_, change_val) in version.image_coverage.range(start..end) {
560 0 : if !layer_covers(change_val) {
561 0 : return false;
562 0 : }
563 : }
564 :
565 0 : true
566 1984 : }
567 :
568 422 : pub fn iter_historic_layers(&self) -> impl '_ + Iterator<Item = Arc<PersistentLayerDesc>> {
569 422 : self.historic.iter()
570 422 : }
571 :
572 : /// Get a handle for the first in-memory layer that matches the provided predicate.
573 : /// The handle should be used with [`Self::get_in_memory_layer`] to retrieve the actual layer.
574 : ///
575 : /// Note: [`Self::find_in_memory_layer`] and [`Self::get_in_memory_layer`] should be called during
576 : /// the same exclusive region established by holding the layer manager lock.
577 20 : pub fn find_in_memory_layer<Pred>(&self, mut pred: Pred) -> Option<InMemoryLayerHandle>
578 20 : where
579 20 : Pred: FnMut(&Arc<InMemoryLayer>) -> bool,
580 20 : {
581 20 : if let Some(open) = &self.open_layer {
582 0 : if pred(open) {
583 0 : return Some(InMemoryLayerHandle::Open {
584 0 : lsn_floor: open.get_lsn_range().start,
585 0 : end_lsn: open.get_lsn_range().end,
586 0 : });
587 0 : }
588 20 : }
589 :
590 20 : let pos = self.frozen_layers.iter().rev().position(pred);
591 20 : pos.map(|rev_idx| {
592 0 : let idx = self.frozen_layers.len() - 1 - rev_idx;
593 0 : InMemoryLayerHandle::Frozen {
594 0 : idx,
595 0 : lsn_floor: self.frozen_layers[idx].get_lsn_range().start,
596 0 : end_lsn: self.frozen_layers[idx].get_lsn_range().end,
597 0 : }
598 20 : })
599 20 : }
600 :
601 : /// Get the layer pointed to by the provided handle.
602 0 : pub fn get_in_memory_layer(&self, handle: &InMemoryLayerHandle) -> Option<Arc<InMemoryLayer>> {
603 0 : match handle {
604 0 : InMemoryLayerHandle::Open { .. } => self.open_layer.clone(),
605 0 : InMemoryLayerHandle::Frozen { idx, .. } => self.frozen_layers.get(*idx).cloned(),
606 : }
607 0 : }
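A sketch of the intended two-step lookup; the helper is hypothetical, and per the note above both calls must happen while holding the layer manager lock.

```rust
// Hypothetical: fetch the newest in-memory layer that starts below end_lsn.
fn newest_in_memory_layer_below(
    map: &LayerMap,
    end_lsn: Lsn,
) -> Option<Arc<InMemoryLayer>> {
    let handle = map.find_in_memory_layer(|layer| layer.get_lsn_range().start < end_lsn)?;
    // The handle records lsn_floor/end_lsn, so callers can plan the next
    // search step before materializing the layer.
    map.get_in_memory_layer(&handle)
}
```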
608 :
609 : ///
610 : /// Divide the given key range into sub-ranges based on the latest
611 : /// image layer that covers each sub-range at the specified lsn (inclusive).
612 : /// This is used when creating new image layers.
613 2448 : pub fn image_coverage(
614 2448 : &self,
615 2448 : key_range: &Range<Key>,
616 2448 : lsn: Lsn,
617 2448 : ) -> Vec<(Range<Key>, Option<Arc<PersistentLayerDesc>>)> {
618 2448 : let version = match self.historic.get().unwrap().get_version(lsn.0) {
619 2448 : Some(v) => v,
620 0 : None => return vec![],
621 : };
622 :
623 2448 : let start = key_range.start.to_i128();
624 2448 : let end = key_range.end.to_i128();
625 2448 :
626 2448 : // Initialize loop variables
627 2448 : let mut coverage: Vec<(Range<Key>, Option<Arc<PersistentLayerDesc>>)> = vec![];
628 2448 : let mut current_key = start;
629 2448 : let mut current_val = version.image_coverage.query(start);
630 :
631 : // Loop through the change events and push intervals
632 2448 : for (change_key, change_val) in version.image_coverage.range(start..end) {
633 308 : let kr = Key::from_i128(current_key)..Key::from_i128(change_key);
634 308 : coverage.push((kr, current_val.take()));
635 308 : current_key = change_key;
636 308 : current_val = change_val.clone();
637 308 : }
638 :
639 : // Add the final interval
640 2448 : let kr = Key::from_i128(current_key)..Key::from_i128(end);
641 2448 : coverage.push((kr, current_val.take()));
642 2448 :
643 2448 : coverage
644 2448 : }
645 :
646 1512 : pub fn is_l0(layer: &PersistentLayerDesc) -> bool {
647 1512 : layer.get_key_range() == (Key::MIN..Key::MAX)
648 1512 : }
649 :
650 : /// This function determines which layers are counted in `count_deltas`:
651 : /// layers that should count towards deciding whether or not to reimage
652 : /// a certain partition range.
653 : ///
654 : /// There are two kinds of layers we currently consider reimage-worthy:
655 : ///
656 : /// Case 1: Non-L0 layers are currently reimage-worthy by default.
657 : /// TODO Some of these layers are very sparse and cover the entire key
658 : /// range. Replacing 256MB of data (or less!) with terabytes of
659 : /// images doesn't seem wise. We need a better heuristic, possibly
660 : /// based on some of these factors:
661 : /// a) whether this layer has any wal in this partition range
662 : /// b) the size of the layer
663 : /// c) the number of images needed to cover it
664 : /// d) the estimated time until we'll have to reimage over it for GC
665 : ///
666 : /// Case 2: Since L0 layers by definition cover the entire key space, we consider
667 : /// them reimage-worthy only when the entire key space can be covered by very few
668 : /// images (currently 1).
669 : /// TODO The optimal number should probably be slightly higher than 1, but to
670 : /// implement that we need to plumb a lot more context into this function
671 : /// than just the current partition_range.
672 600 : pub fn is_reimage_worthy(layer: &PersistentLayerDesc, partition_range: &Range<Key>) -> bool {
673 600 : // Case 1
674 600 : if !Self::is_l0(layer) {
675 0 : return true;
676 600 : }
677 600 :
678 600 : // Case 2
679 600 : if partition_range == &(Key::MIN..Key::MAX) {
680 0 : return true;
681 600 : }
682 600 :
683 600 : false
684 600 : }
685 :
686 : /// Count the height of the tallest stack of reimage-worthy deltas
687 : /// in this 2d region.
688 : ///
689 : /// If `limit` is provided we don't try to count above that number.
690 : ///
691 : /// This number is used to compute the largest number of deltas that
692 : /// we'll need to visit for any page reconstruction in this region.
693 : /// We use this heuristic to decide whether to create an image layer.
694 1200 : pub fn count_deltas(&self, key: &Range<Key>, lsn: &Range<Lsn>, limit: Option<usize>) -> usize {
695 1200 : // We get the delta coverage of the region, and for each part of the coverage
696 1200 : // we recurse right underneath the delta. The recursion depth is limited by
697 1200 : // the largest result this function could return, which is in practice between
698 1200 : // 3 and 10 (since we usually try to create an image when the number gets larger).
699 1200 :
700 1200 : if lsn.is_empty() || key.is_empty() || limit == Some(0) {
701 0 : return 0;
702 1200 : }
703 :
704 1200 : let version = match self.historic.get().unwrap().get_version(lsn.end.0 - 1) {
705 600 : Some(v) => v,
706 600 : None => return 0,
707 : };
708 :
709 600 : let start = key.start.to_i128();
710 600 : let end = key.end.to_i128();
711 600 :
712 600 : // Initialize loop variables
713 600 : let mut max_stacked_deltas = 0;
714 600 : let mut current_key = start;
715 600 : let mut current_val = version.delta_coverage.query(start);
716 :
717 : // Loop through the delta coverage and recurse on each part
718 600 : for (change_key, change_val) in version.delta_coverage.range(start..end) {
719 : // If there's a relevant delta in this part, add 1 and recurse down
720 100 : if let Some(val) = current_val {
721 100 : if val.get_lsn_range().end > lsn.start {
722 100 : let kr = Key::from_i128(current_key)..Key::from_i128(change_key);
723 100 : let lr = lsn.start..val.get_lsn_range().start;
724 100 : if !kr.is_empty() {
725 0 : let base_count = Self::is_reimage_worthy(&val, key) as usize;
726 0 : let new_limit = limit.map(|l| l - base_count);
727 0 : let max_stacked_deltas_underneath = self.count_deltas(&kr, &lr, new_limit);
728 0 : max_stacked_deltas = std::cmp::max(
729 0 : max_stacked_deltas,
730 0 : base_count + max_stacked_deltas_underneath,
731 0 : );
732 100 : }
733 0 : }
734 0 : }
735 :
736 100 : current_key = change_key;
737 100 : current_val = change_val.clone();
738 : }
739 :
740 : // Consider the last part
741 600 : if let Some(val) = current_val {
742 600 : if val.get_lsn_range().end > lsn.start {
743 600 : let kr = Key::from_i128(current_key)..Key::from_i128(end);
744 600 : let lr = lsn.start..val.get_lsn_range().start;
745 600 :
746 600 : if !kr.is_empty() {
747 600 : let base_count = Self::is_reimage_worthy(&val, key) as usize;
748 600 : let new_limit = limit.map(|l| l - base_count);
749 600 : let max_stacked_deltas_underneath = self.count_deltas(&kr, &lr, new_limit);
750 600 : max_stacked_deltas = std::cmp::max(
751 600 : max_stacked_deltas,
752 600 : base_count + max_stacked_deltas_underneath,
753 600 : );
754 600 : }
755 0 : }
756 0 : }
757 :
758 600 : max_stacked_deltas
759 1200 : }
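As a sketch of how `count_deltas` can drive image creation, the hypothetical helper below mirrors the per-partition logic of `get_difficulty_map` further down. The threshold value is an assumption for illustration, not a constant from this crate.

```rust
// Hypothetical threshold: create a new image once this many reimage-worthy
// deltas are stacked on top of the latest image.
const IMAGE_CREATION_THRESHOLD: usize = 3;

fn needs_new_image(map: &LayerMap, partition_range: &Range<Key>, lsn: Lsn) -> bool {
    map.image_coverage(partition_range, lsn)
        .into_iter()
        .any(|(img_range, last_img)| {
            // LSN of the latest image covering this sub-range, or 0 if none.
            let img_lsn = last_img
                .map(|img| img.get_lsn_range().end)
                .unwrap_or(Lsn(0));
            // The limit lets count_deltas stop early once the threshold
            // is reached.
            img_lsn < lsn
                && map.count_deltas(
                    &img_range,
                    &(img_lsn..lsn),
                    Some(IMAGE_CREATION_THRESHOLD),
                ) >= IMAGE_CREATION_THRESHOLD
        })
}
```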
760 :
761 : /// Count how many reimage-worthy layers we need to visit for given key-lsn pair.
762 : ///
763 : /// The `partition_range` argument is used as context for the reimage-worthiness decision.
764 : ///
765 : /// Used as a helper for correctness checks only. Performance not critical.
766 0 : pub fn get_difficulty(&self, lsn: Lsn, key: Key, partition_range: &Range<Key>) -> usize {
767 0 : match self.search(key, lsn) {
768 0 : Some(search_result) => {
769 0 : if search_result.layer.is_incremental() {
770 0 : (Self::is_reimage_worthy(&search_result.layer, partition_range) as usize)
771 0 : + self.get_difficulty(search_result.lsn_floor, key, partition_range)
772 : } else {
773 0 : 0
774 : }
775 : }
776 0 : None => 0,
777 : }
778 0 : }
779 :
780 : /// Used for correctness checking. Results are expected to be identical to
781 : /// self.get_difficulty_map. Assumes self.search is correct.
782 0 : pub fn get_difficulty_map_bruteforce(
783 0 : &self,
784 0 : lsn: Lsn,
785 0 : partitioning: &KeyPartitioning,
786 0 : ) -> Vec<usize> {
787 0 : // Looking at the difficulty as a function of key, it could only increase
788 0 : // when a delta layer starts or an image layer ends. Therefore it's sufficient
789 0 : // to check the difficulties at:
790 0 : // - the key.start for each non-empty part range
791 0 : // - the key.start for each delta
792 0 : // - the key.end for each image
793 0 : let keys_iter: Box<dyn Iterator<Item = Key>> = {
794 0 : let mut keys: Vec<Key> = self
795 0 : .iter_historic_layers()
796 0 : .map(|layer| {
797 0 : if layer.is_incremental() {
798 0 : layer.get_key_range().start
799 : } else {
800 0 : layer.get_key_range().end
801 : }
802 0 : })
803 0 : .collect();
804 0 : keys.sort();
805 0 : Box::new(keys.into_iter())
806 0 : };
807 0 : let mut keys_iter = keys_iter.peekable();
808 0 :
809 0 : // Iter the partition and keys together and query all the necessary
810 0 : // keys, computing the max difficulty for each part.
811 0 : partitioning
812 0 : .parts
813 0 : .iter()
814 0 : .map(|part| {
815 0 : let mut difficulty = 0;
816 : // Partition ranges are assumed to be sorted and disjoint
817 : // TODO assert it
818 0 : for range in &part.ranges {
819 0 : if !range.is_empty() {
820 0 : difficulty =
821 0 : std::cmp::max(difficulty, self.get_difficulty(lsn, range.start, range));
822 0 : }
823 0 : while let Some(key) = keys_iter.peek() {
824 0 : if key >= &range.end {
825 0 : break;
826 0 : }
827 0 : let key = keys_iter.next().unwrap();
828 0 : if key < range.start {
829 0 : continue;
830 0 : }
831 0 : difficulty =
832 0 : std::cmp::max(difficulty, self.get_difficulty(lsn, key, range));
833 : }
834 : }
835 0 : difficulty
836 0 : })
837 0 : .collect()
838 0 : }
839 :
840 : /// For each part of a keyspace partitioning, return the maximum number of layers
841 : /// that would be needed for page reconstruction in that part at the given LSN.
842 : ///
843 : /// If `limit` is provided we don't try to count above that number.
844 : ///
845 : /// This method is used to decide where to create new image layers. Computing the
846 : /// result for the entire partitioning at once allows this function to be more
847 : /// efficient, and further optimization is possible by using iterators instead,
848 : /// to allow early return.
849 : ///
850 : /// TODO actually use this method instead of count_deltas. Currently we only use
851 : /// it for benchmarks.
852 0 : pub fn get_difficulty_map(
853 0 : &self,
854 0 : lsn: Lsn,
855 0 : partitioning: &KeyPartitioning,
856 0 : limit: Option<usize>,
857 0 : ) -> Vec<usize> {
858 0 : // TODO This is a naive implementation. Perf improvements to do:
859 0 : // 1. Instead of calling self.image_coverage and self.count_deltas,
860 0 : // iterate the image and delta coverage only once.
861 0 : partitioning
862 0 : .parts
863 0 : .iter()
864 0 : .map(|part| {
865 0 : let mut difficulty = 0;
866 0 : for range in &part.ranges {
867 0 : if limit == Some(difficulty) {
868 0 : break;
869 0 : }
870 0 : for (img_range, last_img) in self.image_coverage(range, lsn) {
871 0 : if limit == Some(difficulty) {
872 0 : break;
873 0 : }
874 0 : let img_lsn = if let Some(last_img) = last_img {
875 0 : last_img.get_lsn_range().end
876 : } else {
877 0 : Lsn(0)
878 : };
879 :
880 0 : if img_lsn < lsn {
881 0 : let num_deltas = self.count_deltas(&img_range, &(img_lsn..lsn), limit);
882 0 : difficulty = std::cmp::max(difficulty, num_deltas);
883 0 : }
884 : }
885 : }
886 0 : difficulty
887 0 : })
888 0 : .collect()
889 0 : }
890 :
891 : /// Return all L0 delta layers
892 410 : pub fn get_level0_deltas(&self) -> Result<Vec<Arc<PersistentLayerDesc>>> {
893 410 : Ok(self.l0_delta_layers.to_vec())
894 410 : }
895 :
896 : /// debugging function to print out the contents of the layer map
897 : #[allow(unused)]
898 2 : pub async fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
899 2 : println!("Begin dump LayerMap");
900 2 :
901 2 : println!("open_layer:");
902 2 : if let Some(open_layer) = &self.open_layer {
903 0 : open_layer.dump(verbose, ctx).await?;
904 2 : }
905 :
906 2 : println!("frozen_layers:");
907 2 : for frozen_layer in self.frozen_layers.iter() {
908 0 : frozen_layer.dump(verbose, ctx).await?;
909 : }
910 :
911 2 : println!("historic_layers:");
912 12 : for desc in self.iter_historic_layers() {
913 12 : desc.dump();
914 12 : }
915 2 : println!("End dump LayerMap");
916 2 : Ok(())
917 2 : }
918 : }
919 :
920 : #[cfg(test)]
921 : mod tests {
922 : use pageserver_api::keyspace::KeySpace;
923 :
924 : use super::*;
925 :
926 10 : #[derive(Clone)]
927 : struct LayerDesc {
928 : key_range: Range<Key>,
929 : lsn_range: Range<Lsn>,
930 : is_delta: bool,
931 : }
932 :
933 2 : fn create_layer_map(layers: Vec<LayerDesc>) -> LayerMap {
934 2 : let mut layer_map = LayerMap::default();
935 :
936 12 : for layer in layers {
937 10 : layer_map.insert_historic_noflush(PersistentLayerDesc::new_test(
938 10 : layer.key_range,
939 10 : layer.lsn_range,
940 10 : layer.is_delta,
941 10 : ));
942 10 : }
943 :
944 2 : layer_map.flush_updates();
945 2 : layer_map
946 2 : }
947 :
948 3540 : fn assert_range_search_result_eq(lhs: RangeSearchResult, rhs: RangeSearchResult) {
949 3540 : assert_eq!(lhs.not_found.to_keyspace(), rhs.not_found.to_keyspace());
950 3540 : let lhs: HashMap<SearchResult, KeySpace> = lhs
951 3540 : .found
952 3540 : .into_iter()
953 8230 : .map(|(search_result, accum)| (search_result, accum.to_keyspace()))
954 3540 : .collect();
955 3540 : let rhs: HashMap<SearchResult, KeySpace> = rhs
956 3540 : .found
957 3540 : .into_iter()
958 8230 : .map(|(search_result, accum)| (search_result, accum.to_keyspace()))
959 3540 : .collect();
960 3540 :
961 3540 : assert_eq!(lhs, rhs);
962 3540 : }
963 :
964 3540 : fn brute_force_range_search(
965 3540 : layer_map: &LayerMap,
966 3540 : key_range: Range<Key>,
967 3540 : end_lsn: Lsn,
968 3540 : ) -> RangeSearchResult {
969 3540 : let mut range_search_result = RangeSearchResult::new();
970 3540 :
971 3540 : let mut key = key_range.start;
972 75520 : while key != key_range.end {
973 71980 : let res = layer_map.search(key, end_lsn);
974 71980 : match res {
975 61320 : Some(res) => {
976 61320 : range_search_result
977 61320 : .found
978 61320 : .entry(res)
979 61320 : .or_default()
980 61320 : .add_key(key);
981 61320 : }
982 10660 : None => {
983 10660 : range_search_result.not_found.add_key(key);
984 10660 : }
985 : }
986 :
987 71980 : key = key.next();
988 : }
989 :
990 3540 : range_search_result
991 3540 : }
992 :
993 2 : #[test]
994 2 : fn ranged_search_on_empty_layer_map() {
995 2 : let layer_map = LayerMap::default();
996 2 : let range = Key::from_i128(100)..Key::from_i128(200);
997 2 :
998 2 : let res = layer_map.range_search(range, Lsn(100));
999 2 : assert!(res.is_none());
1000 2 : }
1001 :
1002 2 : #[test]
1003 2 : fn ranged_search() {
1004 2 : let layers = vec![
1005 2 : LayerDesc {
1006 2 : key_range: Key::from_i128(15)..Key::from_i128(50),
1007 2 : lsn_range: Lsn(0)..Lsn(5),
1008 2 : is_delta: false,
1009 2 : },
1010 2 : LayerDesc {
1011 2 : key_range: Key::from_i128(10)..Key::from_i128(20),
1012 2 : lsn_range: Lsn(5)..Lsn(20),
1013 2 : is_delta: true,
1014 2 : },
1015 2 : LayerDesc {
1016 2 : key_range: Key::from_i128(15)..Key::from_i128(25),
1017 2 : lsn_range: Lsn(20)..Lsn(30),
1018 2 : is_delta: true,
1019 2 : },
1020 2 : LayerDesc {
1021 2 : key_range: Key::from_i128(35)..Key::from_i128(40),
1022 2 : lsn_range: Lsn(25)..Lsn(35),
1023 2 : is_delta: true,
1024 2 : },
1025 2 : LayerDesc {
1026 2 : key_range: Key::from_i128(35)..Key::from_i128(40),
1027 2 : lsn_range: Lsn(35)..Lsn(40),
1028 2 : is_delta: false,
1029 2 : },
1030 2 : ];
1031 2 :
1032 2 : let layer_map = create_layer_map(layers.clone());
1033 122 : for start in 0..60 {
1034 3540 : for end in (start + 1)..60 {
1035 3540 : let range = Key::from_i128(start)..Key::from_i128(end);
1036 3540 : let result = layer_map.range_search(range.clone(), Lsn(100)).unwrap();
1037 3540 : let expected = brute_force_range_search(&layer_map, range, Lsn(100));
1038 3540 :
1039 3540 : assert_range_search_result_eq(result, expected);
1040 3540 : }
1041 : }
1042 2 : }
1043 : }