//!
//! The layer map tracks what layers exist in a timeline.
//!
//! When the timeline is first accessed, the server lists all layer files
//! in the timelines/<timeline_id> directory, and populates this map with
//! ImageLayer and DeltaLayer structs corresponding to each file. When the first
//! new WAL record is received, we create an InMemoryLayer to hold the incoming
//! records. Now and then, in the checkpoint() function, the in-memory layer
//! is frozen and split up into new image and delta layers, and the
//! corresponding files are written to disk.
//!
//! Design overview:
//!
//! The `search` method of the layer map is on the read critical path, so we've
//! built an efficient data structure for fast reads, stored in `LayerMap::historic`.
//! Other read methods are less critical but still impact performance of background tasks.
//!
//! This data structure relies on a persistent/immutable binary search tree. See the
//! following lecture for an introduction <https://www.youtube.com/watch?v=WqCWghETNDc&t=581s>
//! Summary: A persistent/immutable BST (and persistent data structures in general) allows
//! you to modify the tree in such a way that each modification creates a new "version"
//! of the tree. When you modify it, you get a new version, but all previous versions are
//! still accessible too. So if someone is still holding a reference to an older version,
//! they continue to see the tree as it was then. The persistent BST stores all the
//! different versions in an efficient way.
//!
//! Our persistent BST maintains a map of which layer file "covers" each key. It has only
//! one dimension, the key. See `layer_coverage.rs`. We use the persistent/immutable property
//! to handle the LSN dimension.
//!
//! To build the layer map, we insert each layer into the persistent BST in LSN.start order,
//! starting from the oldest one. After each insertion, we grab a reference to that "version"
//! of the tree, and store it in another tree, a BTreeMap keyed by the LSN. See
//! `historic_layer_coverage.rs`.
//!
//! To search for a particular key-LSN pair, you first look up the right "version" in the
//! BTreeMap. Then you search that version of the BST with the key.
//!
//! The persistent BST keeps all the versions, but there is no way to change the old versions
//! afterwards. We can add layers as long as they have larger LSNs than any previous layer in
//! the map, but if we need to remove a layer, or insert anything with an older LSN, we need
//! to throw away most of the persistent BST and build a new one, starting from the oldest
//! LSN. See [`LayerMap::flush_updates()`].
//!
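//! As a minimal sketch of the two-step lookup, this is roughly what `search`
//! does internally (illustrative only; the real types live in this file and in
//! `historic_layer_coverage.rs`):
//!
//! ```ignore
//! // 1. Pick the coverage "version" that was current just below `end_lsn`.
//! let version = historic.get_version(end_lsn.0 - 1)?;
//! // 2. Query that version's one-dimensional key coverage structures.
//! let latest_delta = version.delta_coverage.query(key.to_i128());
//! let latest_image = version.image_coverage.query(key.to_i128());
//! ```
//!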

mod historic_layer_coverage;
mod layer_coverage;

use crate::context::RequestContext;
use crate::keyspace::KeyPartitioning;
use crate::repository::Key;
use crate::tenant::storage_layer::InMemoryLayer;
use anyhow::Result;
use std::collections::VecDeque;
use std::ops::Range;
use std::sync::Arc;
use utils::lsn::Lsn;

use historic_layer_coverage::BufferedHistoricLayerCoverage;
pub use historic_layer_coverage::LayerKey;

use super::storage_layer::PersistentLayerDesc;

///
/// LayerMap tracks what layers exist on a timeline.
///
#[derive(Default)]
pub struct LayerMap {
    //
    // 'open_layer' holds the current InMemoryLayer that is accepting new
    // records. If it is None, 'next_open_layer_at' will be set instead, indicating
    // the start LSN of the next InMemoryLayer that is to be created.
    //
    pub open_layer: Option<Arc<InMemoryLayer>>,
    pub next_open_layer_at: Option<Lsn>,

    ///
    /// Frozen layers, if any. Frozen layers are in-memory layers that
    /// are no longer added to, but haven't been written out to disk
    /// yet. They contain WAL older than the current 'open_layer' or
    /// 'next_open_layer_at', but newer than any historic layer.
    /// The frozen layers are in order from oldest to newest, so that
    /// the newest one is in the 'back' of the VecDeque, and the oldest
    /// in the 'front'.
    ///
    pub frozen_layers: VecDeque<Arc<InMemoryLayer>>,

    /// Index of the historic layers optimized for search
    historic: BufferedHistoricLayerCoverage<Arc<PersistentLayerDesc>>,

    /// L0 layers have the full key range Key::MIN..Key::MAX, so locating them
    /// through the key-based coverage search is very inefficient. For that
    /// reason, L0 layers are also held in the `l0_delta_layers` vector, in
    /// addition to the main historic index.
    l0_delta_layers: Vec<Arc<PersistentLayerDesc>>,
}

/// The primary update API for the layer map.
///
/// Batching historic layer insertions and removals is good for
/// performance and this struct helps us do that correctly.
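///
/// A minimal usage sketch (hypothetical caller code, using only names from
/// this module):
///
/// ```ignore
/// let mut updates = layer_map.batch_update();
/// updates.insert_historic(new_layer_desc);
/// updates.remove_historic(&old_layer_desc);
/// updates.flush(); // optional: flushing also happens when `updates` is dropped
/// ```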
#[must_use]
pub struct BatchedUpdates<'a> {
    // While we hold this exclusive reference to the layer map the type checker
    // will prevent us from accidentally reading any unflushed updates.
    layer_map: &'a mut LayerMap,
}

/// Provide ability to batch more updates while hiding the read
/// API so we don't accidentally read without flushing.
impl BatchedUpdates<'_> {
    ///
    /// Insert an on-disk layer.
    ///
    // TODO remove the `layer` argument when `mapping` is refactored out of `LayerMap`
    pub fn insert_historic(&mut self, layer_desc: PersistentLayerDesc) {
        self.layer_map.insert_historic_noflush(layer_desc)
    }

    ///
    /// Remove an on-disk layer from the map.
    ///
    /// This should be called when the corresponding file on disk has been deleted.
    ///
    pub fn remove_historic(&mut self, layer_desc: &PersistentLayerDesc) {
        self.layer_map.remove_historic_noflush(layer_desc)
    }

    // We will flush on drop anyway, but this method makes it
    // more explicit that there is some work being done.
    /// Apply all updates
    pub fn flush(self) {
        // Flush happens on drop
    }
}

// Ideally the flush() method should be called explicitly for more
// controlled execution. But if we forget, we'd rather flush on drop
// than panic later or read without flushing.
//
// TODO maybe warn if flush hasn't explicitly been called
impl Drop for BatchedUpdates<'_> {
    fn drop(&mut self) {
        self.layer_map.flush_updates();
    }
}

/// Return value of LayerMap::search
pub struct SearchResult {
    pub layer: Arc<PersistentLayerDesc>,
    pub lsn_floor: Lsn,
}

impl LayerMap {
    ///
    /// Find the latest layer (by lsn.end) that covers the given
    /// 'key', with lsn.start < 'end_lsn'.
    ///
    /// The caller of this function is the page reconstruction
    /// algorithm looking for the next relevant delta layer, or
    /// the terminal image layer. The caller will pass the lsn_floor
    /// value as end_lsn in the next call to search.
    ///
    /// If there's an image layer exactly below the given end_lsn,
    /// search should return that layer regardless of whether there
    /// are overlapping deltas.
    ///
    /// If the latest layer is a delta and there is an overlapping
    /// image below it, the lsn_floor returned should be right
    /// above that image so we don't skip it in the search. Otherwise
    /// the lsn_floor returned should be the bottom of the delta layer,
    /// because we should make as much progress down the lsn axis
    /// as possible. It's fine if this way we skip some overlapping
    /// deltas, because the delta we returned would contain the same
    /// WAL content.
    ///
    /// TODO: This API is convoluted and inefficient. If the caller
    /// makes N search calls, we'll end up finding the same latest
    /// image layer N times. We should either cache the latest image
    /// layer result, or simplify the API to `get_latest_image` and
    /// `get_latest_delta`, and only call `get_latest_image` once.
    ///
    /// NOTE: This only searches the 'historic' layers, *not* the
    /// 'open' and 'frozen' layers!
    ///
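    /// As a sketch of the intended calling pattern (a hypothetical caller, not
    /// the actual reconstruction code):
    ///
    /// ```ignore
    /// let mut end_lsn = request_lsn + 1;
    /// while let Some(SearchResult { layer, lsn_floor }) = layer_map.search(key, end_lsn) {
    ///     // ... read records for `key` from `layer` ...
    ///     if !layer.is_incremental() {
    ///         break; // an image layer terminates the search
    ///     }
    ///     end_lsn = lsn_floor; // continue below this layer
    /// }
    /// ```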
    pub fn search(&self, key: Key, end_lsn: Lsn) -> Option<SearchResult> {
        let version = self.historic.get().unwrap().get_version(end_lsn.0 - 1)?;
        let latest_delta = version.delta_coverage.query(key.to_i128());
        let latest_image = version.image_coverage.query(key.to_i128());

        match (latest_delta, latest_image) {
            (None, None) => None,
            (None, Some(image)) => {
                let lsn_floor = image.get_lsn_range().start;
                Some(SearchResult {
                    layer: image,
                    lsn_floor,
                })
            }
            (Some(delta), None) => {
                let lsn_floor = delta.get_lsn_range().start;
                Some(SearchResult {
                    layer: delta,
                    lsn_floor,
                })
            }
            (Some(delta), Some(image)) => {
                let img_lsn = image.get_lsn_range().start;
                let image_is_newer = image.get_lsn_range().end >= delta.get_lsn_range().end;
                let image_exact_match = img_lsn + 1 == end_lsn;
                if image_is_newer || image_exact_match {
                    Some(SearchResult {
                        layer: image,
                        lsn_floor: img_lsn,
                    })
                } else {
                    let lsn_floor =
                        std::cmp::max(delta.get_lsn_range().start, image.get_lsn_range().start + 1);
                    Some(SearchResult {
                        layer: delta,
                        lsn_floor,
                    })
                }
            }
        }
    }

    /// Start a batch of updates, applied on drop
    pub fn batch_update(&mut self) -> BatchedUpdates<'_> {
        BatchedUpdates { layer_map: self }
    }

    ///
    /// Insert an on-disk layer
    ///
    /// Helper function for BatchedUpdates::insert_historic
    ///
    /// TODO(chi): remove L generic so that we do not need to pass layer object.
    pub(self) fn insert_historic_noflush(&mut self, layer_desc: PersistentLayerDesc) {
        // TODO: See #3869, resulting #4088, attempted fix and repro #4094

        if Self::is_l0(&layer_desc) {
            self.l0_delta_layers.push(layer_desc.clone().into());
        }

        self.historic.insert(
            historic_layer_coverage::LayerKey::from(&layer_desc),
            layer_desc.into(),
        );
    }

    ///
    /// Remove an on-disk layer from the map.
    ///
    /// Helper function for BatchedUpdates::remove_historic
    ///
    pub fn remove_historic_noflush(&mut self, layer_desc: &PersistentLayerDesc) {
        self.historic
            .remove(historic_layer_coverage::LayerKey::from(layer_desc));
        let layer_key = layer_desc.key();
        if Self::is_l0(layer_desc) {
            let len_before = self.l0_delta_layers.len();
            let mut l0_delta_layers = std::mem::take(&mut self.l0_delta_layers);
            l0_delta_layers.retain(|other| other.key() != layer_key);
            self.l0_delta_layers = l0_delta_layers;
            // This assertion is related to the use of Arc::ptr_eq in
            // Self::compare_arced_layers: there's a chance that the comparison
            // fails at runtime because it compares (pointer, vtable) pairs.
            assert_eq!(
                self.l0_delta_layers.len(),
                len_before - 1,
                "failed to locate removed historic layer from l0_delta_layers"
            );
        }
    }

    /// Helper function for BatchedUpdates::drop.
    pub(self) fn flush_updates(&mut self) {
        self.historic.rebuild();
    }

    /// Is there a newer image layer for the given key- and LSN-range? That is,
    /// is there a set of image layers within the specified LSN range that
    /// together cover the entire specified key range?
    ///
    /// This is used for garbage collection, to determine if an old layer can
    /// be deleted.
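    ///
    /// A hypothetical GC-style check (sketch only; `old_layer` and `cutoff_lsn`
    /// are assumed names, not part of this module):
    ///
    /// ```ignore
    /// // Can `old_layer` be deleted? Only if newer images fully shadow it.
    /// let shadowed = layer_map.image_layer_exists(
    ///     &old_layer.get_key_range(),
    ///     &(old_layer.get_lsn_range().end..cutoff_lsn),
    /// )?;
    /// ```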
    pub fn image_layer_exists(&self, key: &Range<Key>, lsn: &Range<Lsn>) -> Result<bool> {
        if key.is_empty() {
            // Vacuously true. There's a newer image for all 0 of the keys in the range.
            return Ok(true);
        }

        let version = match self.historic.get().unwrap().get_version(lsn.end.0 - 1) {
            Some(v) => v,
            None => return Ok(false),
        };

        let start = key.start.to_i128();
        let end = key.end.to_i128();

        let layer_covers = |layer: Option<Arc<PersistentLayerDesc>>| match layer {
            Some(layer) => layer.get_lsn_range().start >= lsn.start,
            None => false,
        };

        // Check the start is covered
        if !layer_covers(version.image_coverage.query(start)) {
            return Ok(false);
        }

        // Check after all changes of coverage
        for (_, change_val) in version.image_coverage.range(start..end) {
            if !layer_covers(change_val) {
                return Ok(false);
            }
        }

        Ok(true)
    }

    pub fn iter_historic_layers(&self) -> impl '_ + Iterator<Item = Arc<PersistentLayerDesc>> {
        self.historic.iter()
    }

    ///
    /// Divide the whole given range of keys into sub-ranges based on the latest
    /// image layer that covers each range at the specified lsn (inclusive).
    /// This is used when creating new image layers.
    ///
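    /// For example, the result might have this shape (a sketch with
    /// hypothetical keys and layers):
    ///
    /// ```ignore
    /// let coverage = layer_map.image_coverage(&key_range, lsn)?;
    /// // [(key_a..key_b, Some(image_1)), (key_b..key_c, None), (key_c..key_d, Some(image_2))]
    /// ```
    ///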
    // FIXME: clippy complains that the result type is very complex. She's probably
    // right...
    #[allow(clippy::type_complexity)]
    pub fn image_coverage(
        &self,
        key_range: &Range<Key>,
        lsn: Lsn,
    ) -> Result<Vec<(Range<Key>, Option<Arc<PersistentLayerDesc>>)>> {
        let version = match self.historic.get().unwrap().get_version(lsn.0) {
            Some(v) => v,
            None => return Ok(vec![]),
        };

        let start = key_range.start.to_i128();
        let end = key_range.end.to_i128();

        // Initialize loop variables
        let mut coverage: Vec<(Range<Key>, Option<Arc<PersistentLayerDesc>>)> = vec![];
        let mut current_key = start;
        let mut current_val = version.image_coverage.query(start);

        // Loop through the change events and push intervals
        for (change_key, change_val) in version.image_coverage.range(start..end) {
            let kr = Key::from_i128(current_key)..Key::from_i128(change_key);
            coverage.push((kr, current_val.take()));
            current_key = change_key;
            current_val = change_val.clone();
        }

        // Add the final interval
        let kr = Key::from_i128(current_key)..Key::from_i128(end);
        coverage.push((kr, current_val.take()));

        Ok(coverage)
    }

    pub fn is_l0(layer: &PersistentLayerDesc) -> bool {
        layer.get_key_range() == (Key::MIN..Key::MAX)
    }

    /// This function determines which layers are counted in `count_deltas`:
    /// layers that should count towards deciding whether or not to reimage
    /// a certain partition range.
    ///
    /// There are two kinds of layers we currently consider reimage-worthy:
    ///
    /// Case 1: Non-L0 layers are currently reimage-worthy by default.
    /// TODO Some of these layers are very sparse and cover the entire key
    /// range. Replacing 256MB of data (or less!) with terabytes of
    /// images doesn't seem wise. We need a better heuristic, possibly
    /// based on some of these factors:
    /// a) whether this layer has any wal in this partition range
    /// b) the size of the layer
    /// c) the number of images needed to cover it
    /// d) the estimated time until we'll have to reimage over it for GC
    ///
    /// Case 2: Since L0 layers by definition cover the entire key space, we consider
    /// them reimage-worthy only when the entire key space can be covered by very few
    /// images (currently 1).
    /// TODO The optimal number should probably be slightly higher than 1, but to
    /// implement that we need to plumb a lot more context into this function
    /// than just the current partition_range.
    pub fn is_reimage_worthy(layer: &PersistentLayerDesc, partition_range: &Range<Key>) -> bool {
        // Case 1
        if !Self::is_l0(layer) {
            return true;
        }

        // Case 2
        if partition_range == &(Key::MIN..Key::MAX) {
            return true;
        }

        false
    }

    /// Count the height of the tallest stack of reimage-worthy deltas
    /// in this 2d region.
    ///
    /// If `limit` is provided we don't try to count above that number.
    ///
    /// This number is used to compute the largest number of deltas that
    /// we'll need to visit for any page reconstruction in this region.
    /// We use this heuristic to decide whether to create an image layer.
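    ///
    /// A sketch of the intended use (the threshold is hypothetical, not from
    /// this module):
    ///
    /// ```ignore
    /// // Create a new image layer if some page in `partition` would need more
    /// // than 3 reimage-worthy deltas for reconstruction.
    /// let wanted = 3;
    /// let needs_image =
    ///     layer_map.count_deltas(&partition, &(cutoff_lsn..lsn), Some(wanted))? >= wanted;
    /// ```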
    pub fn count_deltas(
        &self,
        key: &Range<Key>,
        lsn: &Range<Lsn>,
        limit: Option<usize>,
    ) -> Result<usize> {
        // We get the delta coverage of the region, and for each part of the coverage
        // we recurse right underneath the delta. The recursion depth is limited by
        // the largest result this function could return, which is in practice between
        // 3 and 10 (since we usually try to create an image when the number gets larger).

        if lsn.is_empty() || key.is_empty() || limit == Some(0) {
            return Ok(0);
        }

        let version = match self.historic.get().unwrap().get_version(lsn.end.0 - 1) {
            Some(v) => v,
            None => return Ok(0),
        };

        let start = key.start.to_i128();
        let end = key.end.to_i128();

        // Initialize loop variables
        let mut max_stacked_deltas = 0;
        let mut current_key = start;
        let mut current_val = version.delta_coverage.query(start);

        // Loop through the delta coverage and recurse on each part
        for (change_key, change_val) in version.delta_coverage.range(start..end) {
            // If there's a relevant delta in this part, add 1 and recurse down
            if let Some(val) = current_val {
                if val.get_lsn_range().end > lsn.start {
                    let kr = Key::from_i128(current_key)..Key::from_i128(change_key);
                    let lr = lsn.start..val.get_lsn_range().start;
                    if !kr.is_empty() {
                        let base_count = Self::is_reimage_worthy(&val, key) as usize;
                        let new_limit = limit.map(|l| l - base_count);
                        let max_stacked_deltas_underneath =
                            self.count_deltas(&kr, &lr, new_limit)?;
                        max_stacked_deltas = std::cmp::max(
                            max_stacked_deltas,
                            base_count + max_stacked_deltas_underneath,
                        );
                    }
                }
            }

            current_key = change_key;
            current_val = change_val.clone();
        }

        // Consider the last part
        if let Some(val) = current_val {
            if val.get_lsn_range().end > lsn.start {
                let kr = Key::from_i128(current_key)..Key::from_i128(end);
                let lr = lsn.start..val.get_lsn_range().start;

                if !kr.is_empty() {
                    let base_count = Self::is_reimage_worthy(&val, key) as usize;
                    let new_limit = limit.map(|l| l - base_count);
                    let max_stacked_deltas_underneath = self.count_deltas(&kr, &lr, new_limit)?;
                    max_stacked_deltas = std::cmp::max(
                        max_stacked_deltas,
                        base_count + max_stacked_deltas_underneath,
                    );
                }
            }
        }

        Ok(max_stacked_deltas)
    }

    /// Count how many reimage-worthy layers we need to visit for given key-lsn pair.
    ///
    /// The `partition_range` argument is used as context for the reimage-worthiness decision.
    ///
    /// Used as a helper for correctness checks only. Performance not critical.
    pub fn get_difficulty(&self, lsn: Lsn, key: Key, partition_range: &Range<Key>) -> usize {
        match self.search(key, lsn) {
            Some(search_result) => {
                if search_result.layer.is_incremental() {
                    (Self::is_reimage_worthy(&search_result.layer, partition_range) as usize)
                        + self.get_difficulty(search_result.lsn_floor, key, partition_range)
                } else {
                    0
                }
            }
            None => 0,
        }
    }

    /// Used for correctness checking. Results are expected to be identical to
    /// self.get_difficulty_map. Assumes self.search is correct.
    pub fn get_difficulty_map_bruteforce(
        &self,
        lsn: Lsn,
        partitioning: &KeyPartitioning,
    ) -> Vec<usize> {
        // Looking at the difficulty as a function of key, it could only increase
        // when a delta layer starts or an image layer ends. Therefore it's sufficient
        // to check the difficulties at:
        // - the key.start for each non-empty part range
        // - the key.start for each delta
        // - the key.end for each image
        let keys_iter: Box<dyn Iterator<Item = Key>> = {
            let mut keys: Vec<Key> = self
                .iter_historic_layers()
                .map(|layer| {
                    if layer.is_incremental() {
                        layer.get_key_range().start
                    } else {
                        layer.get_key_range().end
                    }
                })
                .collect();
            keys.sort();
            Box::new(keys.into_iter())
        };
        let mut keys_iter = keys_iter.peekable();

        // Iter the partition and keys together and query all the necessary
        // keys, computing the max difficulty for each part.
        partitioning
            .parts
            .iter()
            .map(|part| {
                let mut difficulty = 0;
                // Partition ranges are assumed to be sorted and disjoint
                // TODO assert it
                for range in &part.ranges {
                    if !range.is_empty() {
                        difficulty =
                            std::cmp::max(difficulty, self.get_difficulty(lsn, range.start, range));
                    }
                    while let Some(key) = keys_iter.peek() {
                        if key >= &range.end {
                            break;
                        }
                        let key = keys_iter.next().unwrap();
                        if key < range.start {
                            continue;
                        }
                        difficulty =
                            std::cmp::max(difficulty, self.get_difficulty(lsn, key, range));
                    }
                }
                difficulty
            })
            .collect()
    }

    /// For each part of a keyspace partitioning, return the maximum number of layers
    /// that would be needed for page reconstruction in that part at the given LSN.
    ///
    /// If `limit` is provided we don't try to count above that number.
    ///
    /// This method is used to decide where to create new image layers. Computing the
    /// result for the entire partitioning at once allows this function to be more
    /// efficient, and further optimization is possible by using iterators instead,
    /// to allow early return.
    ///
    /// TODO actually use this method instead of count_deltas. Currently we only use
    /// it for benchmarks.
    pub fn get_difficulty_map(
        &self,
        lsn: Lsn,
        partitioning: &KeyPartitioning,
        limit: Option<usize>,
    ) -> Vec<usize> {
        // TODO This is a naive implementation. Perf improvements to do:
        // 1. Instead of calling self.image_coverage and self.count_deltas,
        //    iterate the image and delta coverage only once.
        partitioning
            .parts
            .iter()
            .map(|part| {
                let mut difficulty = 0;
                for range in &part.ranges {
                    if limit == Some(difficulty) {
                        break;
                    }
                    for (img_range, last_img) in self
                        .image_coverage(range, lsn)
                        .expect("why would this err?")
                    {
                        if limit == Some(difficulty) {
                            break;
                        }
                        let img_lsn = if let Some(last_img) = last_img {
                            last_img.get_lsn_range().end
                        } else {
                            Lsn(0)
                        };

                        if img_lsn < lsn {
                            let num_deltas = self
                                .count_deltas(&img_range, &(img_lsn..lsn), limit)
                                .expect("why would this err lol?");
                            difficulty = std::cmp::max(difficulty, num_deltas);
                        }
                    }
                }
                difficulty
            })
            .collect()
    }

    /// Return all L0 delta layers
    pub fn get_level0_deltas(&self) -> Result<Vec<Arc<PersistentLayerDesc>>> {
        Ok(self.l0_delta_layers.to_vec())
    }

    /// Debugging function to print out the contents of the layer map
    #[allow(unused)]
    pub async fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
        println!("Begin dump LayerMap");

        println!("open_layer:");
        if let Some(open_layer) = &self.open_layer {
            open_layer.dump(verbose, ctx).await?;
        }

        println!("frozen_layers:");
        for frozen_layer in self.frozen_layers.iter() {
            frozen_layer.dump(verbose, ctx).await?;
        }

        println!("historic_layers:");
        for layer in self.iter_historic_layers() {
            layer.dump(verbose, ctx)?;
        }
        println!("End dump LayerMap");
        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use super::LayerMap;
    use crate::tenant::storage_layer::LayerFileName;
    use std::str::FromStr;
    use std::sync::Arc;

    mod l0_delta_layers_updated {

        use crate::tenant::{
            storage_layer::{AsLayerDesc, PersistentLayerDesc},
            timeline::layer_manager::LayerFileManager,
        };

        use super::*;

        struct LayerObject(PersistentLayerDesc);

        impl AsLayerDesc for LayerObject {
            fn layer_desc(&self) -> &PersistentLayerDesc {
                &self.0
            }
        }

        impl LayerObject {
            fn new(desc: PersistentLayerDesc) -> Self {
                LayerObject(desc)
            }
        }

        type TestLayerFileManager = LayerFileManager<LayerObject>;

        #[test]
        fn for_full_range_delta() {
            // l0_delta_layers are used by compaction, and should observe all buffered updates
            l0_delta_layers_updated_scenario(
                "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000053423C21-0000000053424D69",
                true
            )
        }

        #[test]
        fn for_non_full_range_delta() {
            // has minimal uncovered areas compared to l0_delta_layers_updated_on_insert_replace_remove_for_full_range_delta
            l0_delta_layers_updated_scenario(
                "000000000000000000000000000000000001-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFE__0000000053423C21-0000000053424D69",
                // because not full range
                false
            )
        }

        #[test]
        fn for_image() {
            l0_delta_layers_updated_scenario(
                "000000000000000000000000000000000000-000000000000000000000000000000010000__0000000053424D69",
                // the code only checks whether the layer covers the full key range; it
                // doesn't care about images, which must mean we should in practice
                // never have full range images
                false
            )
        }

        #[test]
        fn replacing_missing_l0_is_notfound() {
            // The original impl had an oversight: a missing L0 was an anyhow::Error.
            // anyhow::Error should however only happen for precondition failures.

            let layer = "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000053423C21-0000000053424D69";
            let layer = LayerFileName::from_str(layer).unwrap();
            let layer = PersistentLayerDesc::from(layer);

            // same skeleton construction; see the scenario below
            let not_found = Arc::new(LayerObject::new(layer.clone()));
            let new_version = Arc::new(LayerObject::new(layer));

            // After the immutable storage state refactor, the replace operation
            // no longer uses the layer map. We keep it here for consistency in
            // test cases and can remove it in the future.
            let _map = LayerMap::default();

            let mut mapping = TestLayerFileManager::new();

            mapping
                .replace_and_verify(not_found, new_version)
                .unwrap_err();
        }

        fn l0_delta_layers_updated_scenario(layer_name: &str, expected_l0: bool) {
            let name = LayerFileName::from_str(layer_name).unwrap();
            let skeleton = PersistentLayerDesc::from(name);

            let remote = Arc::new(LayerObject::new(skeleton.clone()));
            let downloaded = Arc::new(LayerObject::new(skeleton));

            let mut map = LayerMap::default();
            let mut mapping = LayerFileManager::new();

            // Two disjoint Arcs in different lifecycle phases. Even if it seems they must be
            // the same layer, we use LayerMap::compare_arced_layers as the identity of layers.
            assert_eq!(remote.layer_desc(), downloaded.layer_desc());

            let expected_in_counts = (1, usize::from(expected_l0));

            map.batch_update()
                .insert_historic(remote.layer_desc().clone());
            mapping.insert(remote.clone());
            assert_eq!(
                count_layer_in(&map, remote.layer_desc()),
                expected_in_counts
            );

            mapping
                .replace_and_verify(remote, downloaded.clone())
                .expect("name derived attributes are the same");
            assert_eq!(
                count_layer_in(&map, downloaded.layer_desc()),
                expected_in_counts
            );

            map.batch_update().remove_historic(downloaded.layer_desc());
            assert_eq!(count_layer_in(&map, downloaded.layer_desc()), (0, 0));
        }

        fn count_layer_in(map: &LayerMap, layer: &PersistentLayerDesc) -> (usize, usize) {
            let historic = map
                .iter_historic_layers()
                .filter(|x| x.key() == layer.key())
                .count();
            let l0s = map
                .get_level0_deltas()
                .expect("why does this return a result");
            let l0 = l0s.iter().filter(|x| x.key() == layer.key()).count();

            (historic, l0)
        }
    }
}