Line data Source code
1 : use std::{
2 : cmp::Ordering,
3 : collections::{binary_heap, BinaryHeap},
4 : };
5 :
6 : use pageserver_api::key::Key;
7 : use utils::lsn::Lsn;
8 :
9 : use crate::{context::RequestContext, repository::Value};
10 :
11 : use super::{
12 : delta_layer::{DeltaLayerInner, DeltaLayerIterator},
13 : image_layer::{ImageLayerInner, ImageLayerIterator},
14 : };
15 :
16 : #[derive(Clone, Copy)]
17 : enum LayerRef<'a> {
18 : Image(&'a ImageLayerInner),
19 : Delta(&'a DeltaLayerInner),
20 : }
21 :
22 : impl<'a> LayerRef<'a> {
23 42 : fn iter(self, ctx: &'a RequestContext) -> LayerIterRef<'a> {
24 42 : match self {
25 8 : Self::Image(x) => LayerIterRef::Image(x.iter(ctx)),
26 34 : Self::Delta(x) => LayerIterRef::Delta(x.iter(ctx)),
27 : }
28 42 : }
29 : }
30 :
31 : enum LayerIterRef<'a> {
32 : Image(ImageLayerIterator<'a>),
33 : Delta(DeltaLayerIterator<'a>),
34 : }
35 :
36 : impl LayerIterRef<'_> {
37 6252 : async fn next(&mut self) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
38 6252 : match self {
39 6164 : Self::Delta(x) => x.next().await,
40 88 : Self::Image(x) => x.next().await,
41 : }
42 6252 : }
43 : }
44 :
45 : /// This type plays several roles at once:
46 : /// 1. Unified iterator for image and delta layers.
47 : /// 2. `Ord` for use in [`MergeIterator::heap`] (for the k-merge).
48 : /// 3. Lazy creation of the real delta/image iterator.
49 : enum IteratorWrapper<'a> {
50 : NotLoaded {
51 : ctx: &'a RequestContext,
52 : first_key_lower_bound: (Key, Lsn),
53 : layer: LayerRef<'a>,
54 : },
55 : Loaded {
56 : iter: PeekableLayerIterRef<'a>,
57 : },
58 : }
59 :
60 : struct PeekableLayerIterRef<'a> {
61 : iter: LayerIterRef<'a>,
62 : peeked: Option<(Key, Lsn, Value)>, // None == end
63 : }
64 :
65 : impl<'a> PeekableLayerIterRef<'a> {
66 42 : async fn create(mut iter: LayerIterRef<'a>) -> anyhow::Result<Self> {
67 42 : let peeked = iter.next().await?;
68 42 : Ok(Self { iter, peeked })
69 42 : }
70 :
71 20812 : fn peek(&self) -> &Option<(Key, Lsn, Value)> {
72 20812 : &self.peeked
73 20812 : }
74 :
75 6210 : async fn next(&mut self) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
76 6210 : let result = self.peeked.take();
77 6210 : self.peeked = self.iter.next().await?;
78 6210 : Ok(result)
79 6210 : }
80 : }
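// A hedged sketch (standalone, not part of this module) of the peek/next contract above,
// using a plain in-memory iterator in place of a layer iterator:
//
//     let mut inner = vec![1, 2].into_iter();
//     let mut peeked = inner.next();             // `create` primes the buffer
//     assert_eq!(peeked.as_ref(), Some(&1));     // `peek` only looks at the buffer
//     let item = peeked.take();                  // `next` hands out the buffered item...
//     peeked = inner.next();                     // ...and refills it from the inner iterator
//     assert_eq!(item, Some(1));
//     assert_eq!(peeked, Some(2));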
81 :
82 : impl<'a> std::cmp::PartialEq for IteratorWrapper<'a> {
83 0 : fn eq(&self, other: &Self) -> bool {
84 0 : self.cmp(other) == Ordering::Equal
85 0 : }
86 : }
87 :
88 : impl<'a> std::cmp::Eq for IteratorWrapper<'a> {}
89 :
90 : impl<'a> std::cmp::PartialOrd for IteratorWrapper<'a> {
91 12532 : fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
92 12532 : Some(self.cmp(other))
93 12532 : }
94 : }
95 :
96 : impl<'a> std::cmp::Ord for IteratorWrapper<'a> {
97 12532 : fn cmp(&self, other: &Self) -> std::cmp::Ordering {
98 12532 : use std::cmp::Ordering;
99 12532 : let a = self.peek_next_key_lsn_value();
100 12532 : let b = other.peek_next_key_lsn_value();
101 12532 : match (a, b) {
102 8368 : (Some((k1, l1, v1)), Some((k2, l2, v2))) => {
103 16736 : fn map_value_to_num(val: &Option<&Value>) -> usize {
104 12460 : match val {
105 8368 : None => 0,
106 12290 : Some(Value::Image(_)) => 1,
107 8368 : Some(Value::WalRecord(_)) => 2,
108 8368 : }
109 16736 : }
110 8368 : let order_1 = map_value_to_num(&v1);
111 8368 : let order_2 = map_value_to_num(&v2);
112 8368 : // When the key and LSN are equal, an unloaded iterator always sorts before a loaded one.
113 8368 : // Note that we reverse the comparison at the end so that it works with the max-heap.
114 8368 : (k1, l1, order_1).cmp(&(k2, l2, order_2))
115 : }
116 2060 : (Some(_), None) => Ordering::Less,
117 40 : (None, Some(_)) => Ordering::Greater,
118 2064 : (None, None) => Ordering::Equal,
119 : }
120 12532 : .reverse()
121 12532 : }
122 : }
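// A minimal, hedged sketch (not part of this module) of the ordering trick used above:
// `std::collections::BinaryHeap` is a max-heap, so reversing the natural tuple ordering
// makes the heap yield the smallest (key, lsn, kind) first. With toy integer keys:
//
//     use std::{cmp::Reverse, collections::BinaryHeap};
//
//     let mut heap = BinaryHeap::new();
//     heap.push(Reverse((3u32, 7u64)));
//     heap.push(Reverse((1, 9)));
//     heap.push(Reverse((1, 2)));
//     assert_eq!(heap.pop(), Some(Reverse((1, 2)))); // smallest (key, lsn) comes out first
//
// `IteratorWrapper` implements the same idea by calling `.reverse()` inside `Ord::cmp`
// instead of wrapping every element in `Reverse`.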
123 :
124 : impl<'a> IteratorWrapper<'a> {
125 8 : pub fn create_from_image_layer(
126 8 : image_layer: &'a ImageLayerInner,
127 8 : ctx: &'a RequestContext,
128 8 : ) -> Self {
129 8 : Self::NotLoaded {
130 8 : layer: LayerRef::Image(image_layer),
131 8 : first_key_lower_bound: (image_layer.key_range().start, image_layer.lsn()),
132 8 : ctx,
133 8 : }
134 8 : }
135 :
136 34 : pub fn create_from_delta_layer(
137 34 : delta_layer: &'a DeltaLayerInner,
138 34 : ctx: &'a RequestContext,
139 34 : ) -> Self {
140 34 : Self::NotLoaded {
141 34 : layer: LayerRef::Delta(delta_layer),
142 34 : first_key_lower_bound: (delta_layer.key_range().start, delta_layer.lsn_range().start),
143 34 : ctx,
144 34 : }
145 34 : }
146 :
147 25064 : fn peek_next_key_lsn_value(&self) -> Option<(&Key, Lsn, Option<&Value>)> {
148 25064 : match self {
149 20770 : Self::Loaded { iter } => iter
150 20770 : .peek()
151 20770 : .as_ref()
152 20770 : .map(|(key, lsn, val)| (key, *lsn, Some(val))),
153 4294 : Self::NotLoaded {
154 4294 : first_key_lower_bound: (key, lsn),
155 4294 : ..
156 4294 : } => Some((key, *lsn, None)),
157 : }
158 25064 : }
159 :
160 : // CORRECTNESS: this function must always take `&mut self`, never `&self`.
161 : //
162 : // The reason is that `impl Ord for Self` evaluates differently after this function
163 : // returns. We're called through a `PeekMut::deref_mut`, which causes heap repair when
164 : // the PeekMut gets returned. So, it's critical that we actually run through `PeekMut::deref_mut`
165 : // and not just `PeekMut::deref`. If we took `&self`, the heap would not be repaired
166 : // after loading and the k-merge order could be wrong.
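// A hedged illustration (standalone sketch, not part of this module) of the std behavior
// relied upon here: mutating the heap's top element through `peek_mut` makes the heap
// restore its invariant when the `PeekMut` guard is dropped.
//
//     use std::collections::BinaryHeap;
//
//     let mut heap = BinaryHeap::from(vec![3, 10, 7]);
//     if let Some(mut top) = heap.peek_mut() {
//         *top = 1; // mutable access, analogous to `load` changing the ordering key
//     } // the guard drops here and the heap sifts the modified element down
//     assert_eq!(heap.peek(), Some(&7));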
167 42 : async fn load(&mut self) -> anyhow::Result<()> {
168 42 : assert!(!self.is_loaded());
169 42 : let Self::NotLoaded {
170 42 : ctx,
171 42 : first_key_lower_bound,
172 42 : layer,
173 42 : } = self
174 : else {
175 0 : unreachable!()
176 : };
177 42 : let iter = layer.iter(ctx);
178 42 : let iter = PeekableLayerIterRef::create(iter).await?;
179 42 : if let Some((k1, l1, _)) = iter.peek() {
180 42 : let (k2, l2) = first_key_lower_bound;
181 42 : debug_assert!((k1, l1) >= (k2, l2));
182 0 : }
183 42 : *self = Self::Loaded { iter };
184 42 : Ok(())
185 42 : }
186 :
187 6294 : fn is_loaded(&self) -> bool {
188 6294 : matches!(self, Self::Loaded { .. })
189 6294 : }
190 :
191 : /// Correctness: the iterator must be loaded before use.
192 : ///
193 : /// Because this iterator wrapper is private to the merge iterator, users cannot misuse it.
194 : /// The public interfaces to use are [`crate::tenant::storage_layer::delta_layer::DeltaLayerIterator`] and
195 : /// [`crate::tenant::storage_layer::image_layer::ImageLayerIterator`].
196 6210 : async fn next(&mut self) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
197 6210 : let Self::Loaded { iter } = self else {
198 0 : panic!("must load the iterator before using")
199 : };
200 6210 : iter.next().await
201 6210 : }
202 : }
203 :
204 : /// A merge iterator over delta and image layer iterators. When duplicated records are
205 : /// found, the iterator does not perform any deduplication; the caller must handle
206 : /// these situations. Duplicates can arise in several ways:
207 : /// * Two identical deltas at the same key and LSN.
208 : /// * Two identical images at the same key and LSN.
209 : /// * A delta and an image at the same key and LSN, where the image already incorporates the delta.
210 : /// When a key-LSN pair has both an image and a delta, the iterator always yields the image before the delta.
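///
/// A sketch of intended usage (hedged; the `delta_inner` and `image_inner` bindings are
/// hypothetical references to resident layer inners):
///
/// ```ignore
/// let mut iter = MergeIterator::create(&[delta_inner], &[image_inner], &ctx);
/// while let Some((key, lsn, value)) = iter.next().await? {
///     // entries arrive ordered by (key, lsn); on ties, images come before deltas
/// }
/// ```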
211 : pub struct MergeIterator<'a> {
212 : heap: BinaryHeap<IteratorWrapper<'a>>,
213 : }
214 :
215 : impl<'a> MergeIterator<'a> {
216 12 : pub fn create(
217 12 : deltas: &[&'a DeltaLayerInner],
218 12 : images: &[&'a ImageLayerInner],
219 12 : ctx: &'a RequestContext,
220 12 : ) -> Self {
221 12 : let mut heap = Vec::with_capacity(images.len() + deltas.len());
222 20 : for image in images {
223 8 : heap.push(IteratorWrapper::create_from_image_layer(image, ctx));
224 8 : }
225 46 : for delta in deltas {
226 34 : heap.push(IteratorWrapper::create_from_delta_layer(delta, ctx));
227 34 : }
228 12 : Self {
229 12 : heap: BinaryHeap::from(heap),
230 12 : }
231 12 : }
232 :
233 6180 : pub async fn next(&mut self) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
234 6264 : while let Some(mut iter) = self.heap.peek_mut() {
235 6252 : if !iter.is_loaded() {
236 : // Once we load the iterator, we know its real first key-value pair. We let the heap
237 : // re-order this entry (when the `PeekMut` guard drops) instead of returning right away,
238 : // because another, potentially unloaded, layer may have a key in [first_key_lower_bound, loaded_first_key).
239 42 : iter.load().await?;
240 42 : continue;
241 6210 : }
242 6210 : let Some(item) = iter.next().await? else {
243 : // If the iterator returns None, we pop it from the heap. In the effective max-heap order an
244 : // exhausted iterator sorts below any non-exhausted one, so the remaining iterators must be exhausted too.
245 42 : binary_heap::PeekMut::pop(iter);
246 42 : continue;
247 : };
248 6168 : return Ok(Some(item));
249 : }
250 12 : Ok(None)
251 6180 : }
252 : }
253 :
254 : #[cfg(test)]
255 : mod tests {
256 : use super::*;
257 :
258 : use itertools::Itertools;
259 : use pageserver_api::key::Key;
260 : use utils::lsn::Lsn;
261 :
262 : use crate::{
263 : tenant::{
264 : harness::{TenantHarness, TIMELINE_ID},
265 : storage_layer::delta_layer::test::{produce_delta_layer, sort_delta, sort_delta_value},
266 : },
267 : walrecord::NeonWalRecord,
268 : DEFAULT_PG_VERSION,
269 : };
270 :
271 8 : async fn assert_merge_iter_equal(
272 8 : merge_iter: &mut MergeIterator<'_>,
273 8 : expect: &[(Key, Lsn, Value)],
274 8 : ) {
275 8 : let mut expect_iter = expect.iter();
276 : loop {
277 6080 : let o1 = merge_iter.next().await.unwrap();
278 6080 : let o2 = expect_iter.next();
279 6080 : assert_eq!(o1.is_some(), o2.is_some());
280 6080 : if o1.is_none() && o2.is_none() {
281 8 : break;
282 6072 : }
283 6072 : let (k1, l1, v1) = o1.unwrap();
284 6072 : let (k2, l2, v2) = o2.unwrap();
285 6072 : assert_eq!(&k1, k2);
286 6072 : assert_eq!(l1, *l2);
287 6072 : assert_eq!(&v1, v2);
288 : }
289 8 : }
290 :
291 : #[tokio::test]
292 2 : async fn merge_in_between() {
293 2 : use crate::repository::Value;
294 2 : use bytes::Bytes;
295 2 :
296 2 : let harness = TenantHarness::create("merge_iterator_merge_in_between")
297 2 : .await
298 2 : .unwrap();
299 8 : let (tenant, ctx) = harness.load().await;
300 2 :
301 2 : let tline = tenant
302 2 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
303 6 : .await
304 2 : .unwrap();
305 2 :
306 8 : fn get_key(id: u32) -> Key {
307 8 : let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
308 8 : key.field6 = id;
309 8 : key
310 8 : }
311 2 : let test_deltas1 = vec![
312 2 : (
313 2 : get_key(0),
314 2 : Lsn(0x10),
315 2 : Value::Image(Bytes::copy_from_slice(b"test")),
316 2 : ),
317 2 : (
318 2 : get_key(5),
319 2 : Lsn(0x10),
320 2 : Value::Image(Bytes::copy_from_slice(b"test")),
321 2 : ),
322 2 : ];
323 2 : let resident_layer_1 = produce_delta_layer(&tenant, &tline, test_deltas1.clone(), &ctx)
324 6 : .await
325 2 : .unwrap();
326 2 : let test_deltas2 = vec![
327 2 : (
328 2 : get_key(3),
329 2 : Lsn(0x10),
330 2 : Value::Image(Bytes::copy_from_slice(b"test")),
331 2 : ),
332 2 : (
333 2 : get_key(4),
334 2 : Lsn(0x10),
335 2 : Value::Image(Bytes::copy_from_slice(b"test")),
336 2 : ),
337 2 : ];
338 2 : let resident_layer_2 = produce_delta_layer(&tenant, &tline, test_deltas2.clone(), &ctx)
339 6 : .await
340 2 : .unwrap();
341 2 : let mut merge_iter = MergeIterator::create(
342 2 : &[
343 2 : resident_layer_2.get_as_delta(&ctx).await.unwrap(),
344 2 : resident_layer_1.get_as_delta(&ctx).await.unwrap(),
345 2 : ],
346 2 : &[],
347 2 : &ctx,
348 2 : );
349 2 : let mut expect = Vec::new();
350 2 : expect.extend(test_deltas1);
351 2 : expect.extend(test_deltas2);
352 2 : expect.sort_by(sort_delta);
353 4 : assert_merge_iter_equal(&mut merge_iter, &expect).await;
354 2 : }
355 :
356 : #[tokio::test]
357 2 : async fn delta_merge() {
358 2 : use crate::repository::Value;
359 2 : use bytes::Bytes;
360 2 :
361 2 : let harness = TenantHarness::create("merge_iterator_delta_merge")
362 2 : .await
363 2 : .unwrap();
364 8 : let (tenant, ctx) = harness.load().await;
365 2 :
366 2 : let tline = tenant
367 2 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
368 6 : .await
369 2 : .unwrap();
370 2 :
371 6000 : fn get_key(id: u32) -> Key {
372 6000 : let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
373 6000 : key.field6 = id;
374 6000 : key
375 6000 : }
376 2 : const N: usize = 1000;
377 2 : let test_deltas1 = (0..N)
378 2000 : .map(|idx| {
379 2000 : (
380 2000 : get_key(idx as u32 / 10),
381 2000 : Lsn(0x20 * ((idx as u64) % 10 + 1)),
382 2000 : Value::Image(Bytes::from(format!("img{idx:05}"))),
383 2000 : )
384 2000 : })
385 2 : .collect_vec();
386 2 : let resident_layer_1 = produce_delta_layer(&tenant, &tline, test_deltas1.clone(), &ctx)
387 8 : .await
388 2 : .unwrap();
389 2 : let test_deltas2 = (0..N)
390 2000 : .map(|idx| {
391 2000 : (
392 2000 : get_key(idx as u32 / 10),
393 2000 : Lsn(0x20 * ((idx as u64) % 10 + 1) + 0x10),
394 2000 : Value::Image(Bytes::from(format!("img{idx:05}"))),
395 2000 : )
396 2000 : })
397 2 : .collect_vec();
398 2 : let resident_layer_2 = produce_delta_layer(&tenant, &tline, test_deltas2.clone(), &ctx)
399 8 : .await
400 2 : .unwrap();
401 2 : let test_deltas3 = (0..N)
402 2000 : .map(|idx| {
403 2000 : (
404 2000 : get_key(idx as u32 / 10 + N as u32),
405 2000 : Lsn(0x10 * ((idx as u64) % 10 + 1)),
406 2000 : Value::Image(Bytes::from(format!("img{idx:05}"))),
407 2000 : )
408 2000 : })
409 2 : .collect_vec();
410 2 : let resident_layer_3 = produce_delta_layer(&tenant, &tline, test_deltas3.clone(), &ctx)
411 8 : .await
412 2 : .unwrap();
413 2 : let mut merge_iter = MergeIterator::create(
414 2 : &[
415 2 : resident_layer_1.get_as_delta(&ctx).await.unwrap(),
416 2 : resident_layer_2.get_as_delta(&ctx).await.unwrap(),
417 2 : resident_layer_3.get_as_delta(&ctx).await.unwrap(),
418 2 : ],
419 2 : &[],
420 2 : &ctx,
421 2 : );
422 2 : let mut expect = Vec::new();
423 2 : expect.extend(test_deltas1);
424 2 : expect.extend(test_deltas2);
425 2 : expect.extend(test_deltas3);
426 2 : expect.sort_by(sort_delta);
427 12 : assert_merge_iter_equal(&mut merge_iter, &expect).await;
428 2 :
429 2 : // TODO: test that layers are loaded only when needed, reducing the number of active iterators in the k-merge
430 2 : }
431 :
432 : #[tokio::test]
433 2 : async fn delta_image_mixed_merge() {
434 2 : use crate::repository::Value;
435 2 : use bytes::Bytes;
436 2 :
437 2 : let harness = TenantHarness::create("merge_iterator_delta_image_mixed_merge")
438 2 : .await
439 2 : .unwrap();
440 8 : let (tenant, ctx) = harness.load().await;
441 2 :
442 2 : let tline = tenant
443 2 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
444 6 : .await
445 2 : .unwrap();
446 2 :
447 18 : fn get_key(id: u32) -> Key {
448 18 : let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
449 18 : key.field6 = id;
450 18 : key
451 18 : }
452 2 : // This test case checks that the iterator works correctly when there are multiple copies
453 2 : // of a delta and an image at the same LSN, e.g., the sequence a@10=+a, a@10=+a, a@10=ab, a@10=ab.
454 2 : // Duplicated deltas/images are possible for old tenants created before the L0 compaction file-name fix:
455 2 : // an incomplete compaction could produce multiple identical delta layers, and forced image generation
456 2 : // could produce overlapping images. Apart from duplicates, in the current storage implementation a
457 2 : // single key-LSN pair can have a delta in a delta layer and an image in an image layer. The iterator
458 2 : // should handle all of these situations and return everything as-is; the upper layers of the system
459 2 : // will handle the duplicated LSNs.
460 2 : let test_deltas1 = vec![
461 2 : (
462 2 : get_key(0),
463 2 : Lsn(0x10),
464 2 : Value::WalRecord(NeonWalRecord::wal_init()),
465 2 : ),
466 2 : (
467 2 : get_key(0),
468 2 : Lsn(0x18),
469 2 : Value::WalRecord(NeonWalRecord::wal_append("a")),
470 2 : ),
471 2 : (
472 2 : get_key(5),
473 2 : Lsn(0x10),
474 2 : Value::WalRecord(NeonWalRecord::wal_init()),
475 2 : ),
476 2 : (
477 2 : get_key(5),
478 2 : Lsn(0x18),
479 2 : Value::WalRecord(NeonWalRecord::wal_append("b")),
480 2 : ),
481 2 : ];
482 2 : let resident_layer_1 = produce_delta_layer(&tenant, &tline, test_deltas1.clone(), &ctx)
483 6 : .await
484 2 : .unwrap();
485 2 : let mut test_deltas2 = test_deltas1.clone();
486 2 : test_deltas2.push((
487 2 : get_key(10),
488 2 : Lsn(0x20),
489 2 : Value::Image(Bytes::copy_from_slice(b"test")),
490 2 : ));
491 2 : let resident_layer_2 = produce_delta_layer(&tenant, &tline, test_deltas2.clone(), &ctx)
492 6 : .await
493 2 : .unwrap();
494 2 : let test_deltas3 = vec![
495 2 : (
496 2 : get_key(0),
497 2 : Lsn(0x10),
498 2 : Value::Image(Bytes::copy_from_slice(b"")),
499 2 : ),
500 2 : (
501 2 : get_key(5),
502 2 : Lsn(0x18),
503 2 : Value::Image(Bytes::copy_from_slice(b"b")),
504 2 : ),
505 2 : (
506 2 : get_key(15),
507 2 : Lsn(0x20),
508 2 : Value::Image(Bytes::copy_from_slice(b"test")),
509 2 : ),
510 2 : ];
511 2 : let resident_layer_3 = produce_delta_layer(&tenant, &tline, test_deltas3.clone(), &ctx)
512 6 : .await
513 2 : .unwrap();
514 2 : let mut test_deltas4 = test_deltas3.clone();
515 2 : test_deltas4.push((
516 2 : get_key(20),
517 2 : Lsn(0x20),
518 2 : Value::Image(Bytes::copy_from_slice(b"test")),
519 2 : ));
520 2 : let resident_layer_4 = produce_delta_layer(&tenant, &tline, test_deltas4.clone(), &ctx)
521 6 : .await
522 2 : .unwrap();
523 2 : let mut expect = Vec::new();
524 2 : expect.extend(test_deltas1);
525 2 : expect.extend(test_deltas2);
526 2 : expect.extend(test_deltas3);
527 2 : expect.extend(test_deltas4);
528 2 : expect.sort_by(sort_delta_value);
529 2 :
530 2 : // Test with different layer order for MergeIterator::create to ensure the order
531 2 : // is stable.
532 2 :
533 2 : let mut merge_iter = MergeIterator::create(
534 2 : &[
535 2 : resident_layer_4.get_as_delta(&ctx).await.unwrap(),
536 2 : resident_layer_1.get_as_delta(&ctx).await.unwrap(),
537 2 : resident_layer_3.get_as_delta(&ctx).await.unwrap(),
538 2 : resident_layer_2.get_as_delta(&ctx).await.unwrap(),
539 2 : ],
540 2 : &[],
541 2 : &ctx,
542 2 : );
543 8 : assert_merge_iter_equal(&mut merge_iter, &expect).await;
544 2 :
545 2 : let mut merge_iter = MergeIterator::create(
546 2 : &[
547 2 : resident_layer_1.get_as_delta(&ctx).await.unwrap(),
548 2 : resident_layer_4.get_as_delta(&ctx).await.unwrap(),
549 2 : resident_layer_3.get_as_delta(&ctx).await.unwrap(),
550 2 : resident_layer_2.get_as_delta(&ctx).await.unwrap(),
551 2 : ],
552 2 : &[],
553 2 : &ctx,
554 2 : );
555 4 : assert_merge_iter_equal(&mut merge_iter, &expect).await;
556 2 :
557 2 : is_send(merge_iter);
558 2 : }
559 :
560 2 : fn is_send(_: impl Send) {}
561 : }