Line data Source code
1 : use std::{
2 : cmp::Ordering,
3 : collections::{binary_heap, BinaryHeap},
4 : };
5 :
6 : use pageserver_api::key::Key;
7 : use utils::lsn::Lsn;
8 :
9 : use crate::{context::RequestContext, repository::Value};
10 :
11 : use super::{
12 : delta_layer::{DeltaLayerInner, DeltaLayerIterator},
13 : image_layer::{ImageLayerInner, ImageLayerIterator},
14 : };
15 :
16 : #[derive(Clone, Copy)]
17 : enum LayerRef<'a> {
18 : Image(&'a ImageLayerInner),
19 : Delta(&'a DeltaLayerInner),
20 : }
21 :
22 : impl<'a> LayerRef<'a> {
23 500 : fn iter(self, ctx: &'a RequestContext) -> LayerIterRef<'a> {
24 500 : match self {
25 22 : Self::Image(x) => LayerIterRef::Image(x.iter(ctx)),
26 478 : Self::Delta(x) => LayerIterRef::Delta(x.iter(ctx)),
27 : }
28 500 : }
29 : }
30 :
31 : enum LayerIterRef<'a> {
32 : Image(ImageLayerIterator<'a>),
33 : Delta(DeltaLayerIterator<'a>),
34 : }
35 :
36 : impl LayerIterRef<'_> {
37 2071500 : async fn next(&mut self) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
38 2071500 : match self {
39 2071168 : Self::Delta(x) => x.next().await,
40 332 : Self::Image(x) => x.next().await,
41 : }
42 2071500 : }
43 : }
44 :
45 : /// This type plays several roles at once:
46 : /// 1. Unified iterator for image and delta layers.
47 : /// 2. `Ord` for use in [`MergeIterator::heap`] (for the k-merge).
48 : /// 3. Lazy creation of the real delta/image iterator.
49 : enum IteratorWrapper<'a> {
50 : NotLoaded {
51 : ctx: &'a RequestContext,
52 : first_key_lower_bound: (Key, Lsn),
53 : layer: LayerRef<'a>,
54 : },
55 : Loaded {
56 : iter: PeekableLayerIterRef<'a>,
57 : },
58 : }
59 :
60 : struct PeekableLayerIterRef<'a> {
61 : iter: LayerIterRef<'a>,
62 : peeked: Option<(Key, Lsn, Value)>, // None == end
63 : }
64 :
65 : impl<'a> PeekableLayerIterRef<'a> {
66 500 : async fn create(mut iter: LayerIterRef<'a>) -> anyhow::Result<Self> {
67 611 : let peeked = iter.next().await?;
68 500 : Ok(Self { iter, peeked })
69 500 : }
70 :
71 8450216 : fn peek(&self) -> &Option<(Key, Lsn, Value)> {
72 8450216 : &self.peeked
73 8450216 : }
74 :
75 2071000 : async fn next(&mut self) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
76 2071000 : let result = self.peeked.take();
77 2071000 : self.peeked = self.iter.next().await?;
78 2071000 : Ok(result)
79 2071000 : }
80 : }
81 :
82 : impl<'a> std::cmp::PartialEq for IteratorWrapper<'a> {
83 0 : fn eq(&self, other: &Self) -> bool {
84 0 : self.cmp(other) == Ordering::Equal
85 0 : }
86 : }
87 :
88 : impl<'a> std::cmp::Eq for IteratorWrapper<'a> {}
89 :
90 : impl<'a> std::cmp::PartialOrd for IteratorWrapper<'a> {
91 4229131 : fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
92 4229131 : Some(self.cmp(other))
93 4229131 : }
94 : }
95 :
96 : impl<'a> std::cmp::Ord for IteratorWrapper<'a> {
97 4229131 : fn cmp(&self, other: &Self) -> std::cmp::Ordering {
98 4229131 : use std::cmp::Ordering;
99 4229131 : let a = self.peek_next_key_lsn_value();
100 4229131 : let b = other.peek_next_key_lsn_value();
101 4229131 : match (a, b) {
102 3021349 : (Some((k1, l1, v1)), Some((k2, l2, v2))) => {
103 6042698 : fn map_value_to_num(val: &Option<&Value>) -> usize {
104 6034242 : match val {
105 3021349 : None => 0,
106 6033910 : Some(Value::Image(_)) => 1,
107 3021349 : Some(Value::WalRecord(_)) => 2,
108 3021349 : }
109 6042698 : }
110 3021349 : let order_1 = map_value_to_num(&v1);
111 3021349 : let order_2 = map_value_to_num(&v2);
112 3021349 : // When the key and LSN are the same, the unloaded iter will always appear before the loaded one.
113 3021349 : // Note that we reverse the comparison at the end so it works with the max-heap; see `k_merge_ordering_sketch` in the tests below for a standalone illustration.
114 3021349 : (k1, l1, order_1).cmp(&(k2, l2, order_2))
115 : }
116 1002791 : (Some(_), None) => Ordering::Less,
117 1294 : (None, Some(_)) => Ordering::Greater,
118 203697 : (None, None) => Ordering::Equal,
119 : }
120 4229131 : .reverse()
121 4229131 : }
122 : }
123 :
124 : impl<'a> IteratorWrapper<'a> {
125 22 : pub fn create_from_image_layer(
126 22 : image_layer: &'a ImageLayerInner,
127 22 : ctx: &'a RequestContext,
128 22 : ) -> Self {
129 22 : Self::NotLoaded {
130 22 : layer: LayerRef::Image(image_layer),
131 22 : first_key_lower_bound: (image_layer.key_range().start, image_layer.lsn()),
132 22 : ctx,
133 22 : }
134 22 : }
135 :
136 478 : pub fn create_from_delta_layer(
137 478 : delta_layer: &'a DeltaLayerInner,
138 478 : ctx: &'a RequestContext,
139 478 : ) -> Self {
140 478 : Self::NotLoaded {
141 478 : layer: LayerRef::Delta(delta_layer),
142 478 : first_key_lower_bound: (delta_layer.key_range().start, delta_layer.lsn_range().start),
143 478 : ctx,
144 478 : }
145 478 : }
146 :
147 8458262 : fn peek_next_key_lsn_value(&self) -> Option<(&Key, Lsn, Option<&Value>)> {
148 8458262 : match self {
149 8449716 : Self::Loaded { iter } => iter
150 8449716 : .peek()
151 8449716 : .as_ref()
152 8449716 : .map(|(key, lsn, val)| (key, *lsn, Some(val))),
153 8546 : Self::NotLoaded {
154 8546 : first_key_lower_bound: (key, lsn),
155 8546 : ..
156 8546 : } => Some((key, *lsn, None)),
157 : }
158 8458262 : }
159 :
160 : // CORRECTNESS: this function must always take `&mut self`, never `&self`.
161 : //
162 : // The reason is that `impl Ord for Self` evaluates differently after this function
163 : // returns. We're called through `PeekMut::deref_mut`, which causes heap repair when
164 : // the `PeekMut` guard is dropped. So it's critical that we actually run through `PeekMut::deref_mut`
165 : // and not just `PeekMut::deref`: if we didn't take `&mut self`, the call could go through
166 : // `PeekMut::deref` alone and the heap would never be repaired after loading changes this entry's ordering (see `peek_mut_repair_sketch` in the tests below).
167 500 : async fn load(&mut self) -> anyhow::Result<()> {
168 500 : assert!(!self.is_loaded());
169 500 : let Self::NotLoaded {
170 500 : ctx,
171 500 : first_key_lower_bound,
172 500 : layer,
173 500 : } = self
174 : else {
175 0 : unreachable!()
176 : };
177 500 : let iter = layer.iter(ctx);
178 611 : let iter = PeekableLayerIterRef::create(iter).await?;
179 500 : if let Some((k1, l1, _)) = iter.peek() {
180 500 : let (k2, l2) = first_key_lower_bound;
181 500 : debug_assert!((k1, l1) >= (k2, l2));
182 0 : }
183 500 : *self = Self::Loaded { iter };
184 500 : Ok(())
185 500 : }
186 :
187 2072000 : fn is_loaded(&self) -> bool {
188 2072000 : matches!(self, Self::Loaded { .. })
189 2072000 : }
190 :
191 : /// Correctness: must load the iterator before using.
192 : ///
193 : /// Given this iterator wrapper is private to the merge iterator, users won't be able to mis-use it.
194 : /// The public interfaces to use are [`crate::tenant::storage_layer::delta_layer::DeltaLayerIterator`] and
195 : /// [`crate::tenant::storage_layer::image_layer::ImageLayerIterator`].
196 2071000 : async fn next(&mut self) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
197 2071000 : let Self::Loaded { iter } = self else {
198 0 : panic!("must load the iterator before using")
199 : };
200 2071000 : iter.next().await
201 2071000 : }
202 : }
203 :
204 : /// A merge iterator over delta/image layer iterators. When duplicated records are
205 : /// found, the iterator will not perform any deduplication, and the caller should handle
206 : /// these situations. Duplicated records can arise in several ways:
207 : ///
208 : /// * Two identical deltas at the same LSN.
209 : /// * Two identical images at the same LSN.
210 : /// * A delta and an image at the same LSN, where the image has already applied the delta.
211 : ///
212 : /// When an image and a delta share the same key and LSN, the iterator will always yield the image before the delta.
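///
/// A minimal usage sketch (illustrative only, not a doc-test; `delta_a`, `delta_b`, `image`, and
/// `ctx` are placeholders for already-loaded layer references and a `RequestContext`):
///
/// ```ignore
/// let mut iter = MergeIterator::create(&[delta_a, delta_b], &[image], &ctx);
/// while let Some((key, lsn, value)) = iter.next().await? {
///     // Entries arrive ordered by (key, lsn); at equal (key, lsn) an image comes before a delta.
/// }
/// ```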
213 : pub struct MergeIterator<'a> {
214 : heap: BinaryHeap<IteratorWrapper<'a>>,
215 : }
216 :
217 : impl<'a> MergeIterator<'a> {
218 54 : pub fn create(
219 54 : deltas: &[&'a DeltaLayerInner],
220 54 : images: &[&'a ImageLayerInner],
221 54 : ctx: &'a RequestContext,
222 54 : ) -> Self {
223 54 : let mut heap = Vec::with_capacity(images.len() + deltas.len());
224 76 : for image in images {
225 22 : heap.push(IteratorWrapper::create_from_image_layer(image, ctx));
226 22 : }
227 532 : for delta in deltas {
228 478 : heap.push(IteratorWrapper::create_from_delta_layer(delta, ctx));
229 478 : }
230 54 : Self {
231 54 : heap: BinaryHeap::from(heap),
232 54 : }
233 54 : }
234 :
235 2070554 : pub async fn next(&mut self) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
236 2071554 : while let Some(mut iter) = self.heap.peek_mut() {
237 2071500 : if !iter.is_loaded() {
238 : // Once we load the iterator, we learn the real first key-value pair in the iterator.
239 : // We put it back into the heap because another, potentially unloaded, layer may have a key
240 : // within [potential_first_key, loaded_first_key); see `lazy_load_reorder_sketch` in the tests below.
241 611 : iter.load().await?;
242 500 : continue;
243 2071000 : }
244 2071000 : let Some(item) = iter.next().await? else {
245 : // If the iterator returns None, we pop this iterator. In the current implementation, an
246 : // exhausted iterator sorts after all non-exhausted ones, so all remaining iterators should return None as well.
247 500 : binary_heap::PeekMut::pop(iter);
248 500 : continue;
249 : };
250 2070500 : return Ok(Some(item));
251 : }
252 54 : Ok(None)
253 2070554 : }
254 : }
255 :
256 : #[cfg(test)]
257 : mod tests {
258 : use super::*;
259 :
260 : use itertools::Itertools;
261 : use pageserver_api::key::Key;
262 : use utils::lsn::Lsn;
263 :
264 : use crate::{
265 : tenant::{
266 : harness::{TenantHarness, TIMELINE_ID},
267 : storage_layer::delta_layer::test::{produce_delta_layer, sort_delta, sort_delta_value},
268 : },
269 : walrecord::NeonWalRecord,
270 : DEFAULT_PG_VERSION,
271 : };
272 :
273 8 : async fn assert_merge_iter_equal(
274 8 : merge_iter: &mut MergeIterator<'_>,
275 8 : expect: &[(Key, Lsn, Value)],
276 8 : ) {
277 8 : let mut expect_iter = expect.iter();
278 : loop {
279 6080 : let o1 = merge_iter.next().await.unwrap();
280 6080 : let o2 = expect_iter.next();
281 6080 : assert_eq!(o1.is_some(), o2.is_some());
282 6080 : if o1.is_none() && o2.is_none() {
283 8 : break;
284 6072 : }
285 6072 : let (k1, l1, v1) = o1.unwrap();
286 6072 : let (k2, l2, v2) = o2.unwrap();
287 6072 : assert_eq!(&k1, k2);
288 6072 : assert_eq!(l1, *l2);
289 6072 : assert_eq!(&v1, v2);
290 : }
291 8 : }
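
    // Standalone illustration of the k-merge pattern used by `MergeIterator` (a sketch over
    // plain tuples, not pageserver types): `BinaryHeap` is a max-heap, so the per-entry
    // ordering is reversed (here via `std::cmp::Reverse`) to pop the smallest (key, lsn) first.
    #[test]
    fn k_merge_ordering_sketch() {
        use std::cmp::Reverse;
        use std::collections::BinaryHeap;

        // Three "layers", each already sorted by (key, lsn) internally.
        let layers = vec![vec![(1, 10), (3, 10)], vec![(2, 10), (2, 20)], vec![(1, 20)]];
        let mut heap = BinaryHeap::new();
        for (layer_idx, layer) in layers.iter().enumerate() {
            // Seed the heap with each layer's first entry plus its position within the layer.
            heap.push(Reverse((layer[0], layer_idx, 0usize)));
        }
        let mut merged = Vec::new();
        while let Some(Reverse((entry, layer_idx, pos))) = heap.pop() {
            merged.push(entry);
            // Advance the layer this entry came from, if it has more entries.
            if let Some(&next) = layers[layer_idx].get(pos + 1) {
                heap.push(Reverse((next, layer_idx, pos + 1)));
            }
        }
        assert_eq!(merged, vec![(1, 10), (1, 20), (2, 10), (2, 20), (3, 10)]);
    }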
292 :
293 : #[tokio::test]
294 2 : async fn merge_in_between() {
295 2 : use crate::repository::Value;
296 2 : use bytes::Bytes;
297 2 :
298 2 : let harness = TenantHarness::create("merge_iterator_merge_in_between")
299 2 : .await
300 2 : .unwrap();
301 8 : let (tenant, ctx) = harness.load().await;
302 2 :
303 2 : let tline = tenant
304 2 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
305 4 : .await
306 2 : .unwrap();
307 2 :
308 8 : fn get_key(id: u32) -> Key {
309 8 : let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
310 8 : key.field6 = id;
311 8 : key
312 8 : }
313 2 : let test_deltas1 = vec![
314 2 : (
315 2 : get_key(0),
316 2 : Lsn(0x10),
317 2 : Value::Image(Bytes::copy_from_slice(b"test")),
318 2 : ),
319 2 : (
320 2 : get_key(5),
321 2 : Lsn(0x10),
322 2 : Value::Image(Bytes::copy_from_slice(b"test")),
323 2 : ),
324 2 : ];
325 2 : let resident_layer_1 = produce_delta_layer(&tenant, &tline, test_deltas1.clone(), &ctx)
326 6 : .await
327 2 : .unwrap();
328 2 : let test_deltas2 = vec![
329 2 : (
330 2 : get_key(3),
331 2 : Lsn(0x10),
332 2 : Value::Image(Bytes::copy_from_slice(b"test")),
333 2 : ),
334 2 : (
335 2 : get_key(4),
336 2 : Lsn(0x10),
337 2 : Value::Image(Bytes::copy_from_slice(b"test")),
338 2 : ),
339 2 : ];
340 2 : let resident_layer_2 = produce_delta_layer(&tenant, &tline, test_deltas2.clone(), &ctx)
341 6 : .await
342 2 : .unwrap();
343 2 : let mut merge_iter = MergeIterator::create(
344 2 : &[
345 2 : resident_layer_2.get_as_delta(&ctx).await.unwrap(),
346 2 : resident_layer_1.get_as_delta(&ctx).await.unwrap(),
347 2 : ],
348 2 : &[],
349 2 : &ctx,
350 2 : );
351 2 : let mut expect = Vec::new();
352 2 : expect.extend(test_deltas1);
353 2 : expect.extend(test_deltas2);
354 2 : expect.sort_by(sort_delta);
355 4 : assert_merge_iter_equal(&mut merge_iter, &expect).await;
356 2 : }
357 :
358 : #[tokio::test]
359 2 : async fn delta_merge() {
360 2 : use crate::repository::Value;
361 2 : use bytes::Bytes;
362 2 :
363 2 : let harness = TenantHarness::create("merge_iterator_delta_merge")
364 2 : .await
365 2 : .unwrap();
366 8 : let (tenant, ctx) = harness.load().await;
367 2 :
368 2 : let tline = tenant
369 2 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
370 4 : .await
371 2 : .unwrap();
372 2 :
373 6000 : fn get_key(id: u32) -> Key {
374 6000 : let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
375 6000 : key.field6 = id;
376 6000 : key
377 6000 : }
378 2 : const N: usize = 1000;
379 2 : let test_deltas1 = (0..N)
380 2000 : .map(|idx| {
381 2000 : (
382 2000 : get_key(idx as u32 / 10),
383 2000 : Lsn(0x20 * ((idx as u64) % 10 + 1)),
384 2000 : Value::Image(Bytes::from(format!("img{idx:05}"))),
385 2000 : )
386 2000 : })
387 2 : .collect_vec();
388 2 : let resident_layer_1 = produce_delta_layer(&tenant, &tline, test_deltas1.clone(), &ctx)
389 8 : .await
390 2 : .unwrap();
391 2 : let test_deltas2 = (0..N)
392 2000 : .map(|idx| {
393 2000 : (
394 2000 : get_key(idx as u32 / 10),
395 2000 : Lsn(0x20 * ((idx as u64) % 10 + 1) + 0x10),
396 2000 : Value::Image(Bytes::from(format!("img{idx:05}"))),
397 2000 : )
398 2000 : })
399 2 : .collect_vec();
400 2 : let resident_layer_2 = produce_delta_layer(&tenant, &tline, test_deltas2.clone(), &ctx)
401 8 : .await
402 2 : .unwrap();
403 2 : let test_deltas3 = (0..N)
404 2000 : .map(|idx| {
405 2000 : (
406 2000 : get_key(idx as u32 / 10 + N as u32),
407 2000 : Lsn(0x10 * ((idx as u64) % 10 + 1)),
408 2000 : Value::Image(Bytes::from(format!("img{idx:05}"))),
409 2000 : )
410 2000 : })
411 2 : .collect_vec();
412 2 : let resident_layer_3 = produce_delta_layer(&tenant, &tline, test_deltas3.clone(), &ctx)
413 8 : .await
414 2 : .unwrap();
415 2 : let mut merge_iter = MergeIterator::create(
416 2 : &[
417 2 : resident_layer_1.get_as_delta(&ctx).await.unwrap(),
418 2 : resident_layer_2.get_as_delta(&ctx).await.unwrap(),
419 2 : resident_layer_3.get_as_delta(&ctx).await.unwrap(),
420 2 : ],
421 2 : &[],
422 2 : &ctx,
423 2 : );
424 2 : let mut expect = Vec::new();
425 2 : expect.extend(test_deltas1);
426 2 : expect.extend(test_deltas2);
427 2 : expect.extend(test_deltas3);
428 2 : expect.sort_by(sort_delta);
429 12 : assert_merge_iter_equal(&mut merge_iter, &expect).await;
430 2 :
431 2 : // TODO: test layers are loaded only when needed, reducing num of active iterators in k-merge
432 2 : }
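
    // Standalone illustration (plain tuples, not pageserver types) of the lazy-loading step in
    // `MergeIterator::next`: an unloaded layer sits in the heap under its first-key lower bound;
    // after loading, its real first key may be larger, so it is re-ordered and another layer can
    // rightfully surface first.
    #[test]
    fn lazy_load_reorder_sketch() {
        use std::cmp::Reverse;
        use std::collections::BinaryHeap;

        // (key, loaded, name): layer "A" advertises lower bound 1 but actually starts at key 7;
        // layer "B" genuinely starts at key 3.
        let mut heap = BinaryHeap::new();
        heap.push(Reverse((1, false, "A")));
        heap.push(Reverse((3, true, "B")));

        let mut order = Vec::new();
        while let Some(Reverse((key, loaded, name))) = heap.pop() {
            if !loaded {
                // "Load" the layer: learn its real first key and put it back into the heap.
                heap.push(Reverse((7, true, name)));
                continue;
            }
            order.push((key, name));
        }
        // B's real key 3 surfaces before A's real key 7, even though A's lower bound was smaller.
        assert_eq!(order, vec![(3, "B"), (7, "A")]);
    }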
433 :
434 : #[tokio::test]
435 2 : async fn delta_image_mixed_merge() {
436 2 : use crate::repository::Value;
437 2 : use bytes::Bytes;
438 2 :
439 2 : let harness = TenantHarness::create("merge_iterator_delta_image_mixed_merge")
440 2 : .await
441 2 : .unwrap();
442 8 : let (tenant, ctx) = harness.load().await;
443 2 :
444 2 : let tline = tenant
445 2 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
446 4 : .await
447 2 : .unwrap();
448 2 :
449 18 : fn get_key(id: u32) -> Key {
450 18 : let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
451 18 : key.field6 = id;
452 18 : key
453 18 : }
454 2 : // In this test case, we want to test if the iterator still works correctly with multiple copies
455 2 : // of a delta+image at the same LSN, for example, the following sequence a@10=+a, a@10=+a, a@10=ab, a@10=ab.
456 2 : // Duplicated deltas/images are possible for old tenants before the full L0 compaction file name fix.
457 2 : // An incomplete compaction could produce multiple exactly-the-same delta layers. Force image generation
458 2 : // could produce overlapping images. Apart from duplicated deltas/images, in the current storage implementation
459 2 : // one key-lsn could have a delta in the delta layer and one image in the image layer. The iterator should
460 2 : // correctly process these situations and return everything as-is, and the upper layer of the system
461 2 : // will handle duplicated LSNs.
462 2 : let test_deltas1 = vec![
463 2 : (
464 2 : get_key(0),
465 2 : Lsn(0x10),
466 2 : Value::WalRecord(NeonWalRecord::wal_init()),
467 2 : ),
468 2 : (
469 2 : get_key(0),
470 2 : Lsn(0x18),
471 2 : Value::WalRecord(NeonWalRecord::wal_append("a")),
472 2 : ),
473 2 : (
474 2 : get_key(5),
475 2 : Lsn(0x10),
476 2 : Value::WalRecord(NeonWalRecord::wal_init()),
477 2 : ),
478 2 : (
479 2 : get_key(5),
480 2 : Lsn(0x18),
481 2 : Value::WalRecord(NeonWalRecord::wal_append("b")),
482 2 : ),
483 2 : ];
484 2 : let resident_layer_1 = produce_delta_layer(&tenant, &tline, test_deltas1.clone(), &ctx)
485 6 : .await
486 2 : .unwrap();
487 2 : let mut test_deltas2 = test_deltas1.clone();
488 2 : test_deltas2.push((
489 2 : get_key(10),
490 2 : Lsn(0x20),
491 2 : Value::Image(Bytes::copy_from_slice(b"test")),
492 2 : ));
493 2 : let resident_layer_2 = produce_delta_layer(&tenant, &tline, test_deltas2.clone(), &ctx)
494 6 : .await
495 2 : .unwrap();
496 2 : let test_deltas3 = vec![
497 2 : (
498 2 : get_key(0),
499 2 : Lsn(0x10),
500 2 : Value::Image(Bytes::copy_from_slice(b"")),
501 2 : ),
502 2 : (
503 2 : get_key(5),
504 2 : Lsn(0x18),
505 2 : Value::Image(Bytes::copy_from_slice(b"b")),
506 2 : ),
507 2 : (
508 2 : get_key(15),
509 2 : Lsn(0x20),
510 2 : Value::Image(Bytes::copy_from_slice(b"test")),
511 2 : ),
512 2 : ];
513 2 : let resident_layer_3 = produce_delta_layer(&tenant, &tline, test_deltas3.clone(), &ctx)
514 6 : .await
515 2 : .unwrap();
516 2 : let mut test_deltas4 = test_deltas3.clone();
517 2 : test_deltas4.push((
518 2 : get_key(20),
519 2 : Lsn(0x20),
520 2 : Value::Image(Bytes::copy_from_slice(b"test")),
521 2 : ));
522 2 : let resident_layer_4 = produce_delta_layer(&tenant, &tline, test_deltas4.clone(), &ctx)
523 6 : .await
524 2 : .unwrap();
525 2 : let mut expect = Vec::new();
526 2 : expect.extend(test_deltas1);
527 2 : expect.extend(test_deltas2);
528 2 : expect.extend(test_deltas3);
529 2 : expect.extend(test_deltas4);
530 2 : expect.sort_by(sort_delta_value);
531 2 :
532 2 : // Test with different layer order for MergeIterator::create to ensure the order
533 2 : // is stable.
534 2 :
535 2 : let mut merge_iter = MergeIterator::create(
536 2 : &[
537 2 : resident_layer_4.get_as_delta(&ctx).await.unwrap(),
538 2 : resident_layer_1.get_as_delta(&ctx).await.unwrap(),
539 2 : resident_layer_3.get_as_delta(&ctx).await.unwrap(),
540 2 : resident_layer_2.get_as_delta(&ctx).await.unwrap(),
541 2 : ],
542 2 : &[],
543 2 : &ctx,
544 2 : );
545 8 : assert_merge_iter_equal(&mut merge_iter, &expect).await;
546 2 :
547 2 : let mut merge_iter = MergeIterator::create(
548 2 : &[
549 2 : resident_layer_1.get_as_delta(&ctx).await.unwrap(),
550 2 : resident_layer_4.get_as_delta(&ctx).await.unwrap(),
551 2 : resident_layer_3.get_as_delta(&ctx).await.unwrap(),
552 2 : resident_layer_2.get_as_delta(&ctx).await.unwrap(),
553 2 : ],
554 2 : &[],
555 2 : &ctx,
556 2 : );
557 4 : assert_merge_iter_equal(&mut merge_iter, &expect).await;
558 2 :
559 2 : is_send(merge_iter);
560 2 : }
561 :
562 2 : fn is_send(_: impl Send) {}
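
    // Standalone illustration of why `IteratorWrapper::load` must take `&mut self`: mutating
    // the top element through `PeekMut` (i.e. via `DerefMut`) makes `BinaryHeap` restore the
    // heap invariant when the guard is dropped, so an element whose ordering changed sinks to
    // its correct position.
    #[test]
    fn peek_mut_repair_sketch() {
        use std::collections::BinaryHeap;

        let mut heap = BinaryHeap::from(vec![10, 5, 1]);
        {
            // Mutable access goes through `PeekMut::deref_mut`; dropping the guard repairs the heap.
            let mut top = heap.peek_mut().unwrap();
            *top = 0;
        }
        // The mutated element sank below the others; the former runner-up is now on top.
        assert_eq!(heap.pop(), Some(5));
        assert_eq!(heap.pop(), Some(1));
        assert_eq!(heap.pop(), Some(0));
        assert_eq!(heap.pop(), None);
    }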
563 : }