Line data Source code
1 : use std::{
2 : cmp::Ordering,
3 : collections::{binary_heap, BinaryHeap},
4 : sync::Arc,
5 : };
6 :
7 : use anyhow::bail;
8 : use pageserver_api::key::Key;
9 : use utils::lsn::Lsn;
10 :
11 : use crate::context::RequestContext;
12 : use pageserver_api::value::Value;
13 :
14 : use super::{
15 : delta_layer::{DeltaLayerInner, DeltaLayerIterator},
16 : image_layer::{ImageLayerInner, ImageLayerIterator},
17 : PersistentLayerDesc, PersistentLayerKey,
18 : };
19 :
20 : #[derive(Clone, Copy)]
21 : pub(crate) enum LayerRef<'a> {
22 : Image(&'a ImageLayerInner),
23 : Delta(&'a DeltaLayerInner),
24 : }
25 :
26 : impl<'a> LayerRef<'a> {
27 552 : fn iter(self, ctx: &'a RequestContext) -> LayerIterRef<'a> {
28 552 : match self {
29 58 : Self::Image(x) => LayerIterRef::Image(x.iter(ctx)),
30 494 : Self::Delta(x) => LayerIterRef::Delta(x.iter(ctx)),
31 : }
32 552 : }
33 :
34 0 : fn layer_dbg_info(&self) -> String {
35 0 : match self {
36 0 : Self::Image(x) => x.layer_dbg_info(),
37 0 : Self::Delta(x) => x.layer_dbg_info(),
38 : }
39 0 : }
40 : }
41 :
42 : enum LayerIterRef<'a> {
43 : Image(ImageLayerIterator<'a>),
44 : Delta(DeltaLayerIterator<'a>),
45 : }
46 :
47 : impl LayerIterRef<'_> {
48 2072386 : async fn next(&mut self) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
49 2072386 : match self {
50 2071694 : Self::Delta(x) => x.next().await,
51 692 : Self::Image(x) => x.next().await,
52 : }
53 2072386 : }
54 :
55 0 : fn layer_dbg_info(&self) -> String {
56 0 : match self {
57 0 : Self::Image(x) => x.layer_dbg_info(),
58 0 : Self::Delta(x) => x.layer_dbg_info(),
59 : }
60 0 : }
61 : }
62 :
63 : /// This type plays several roles at once
64 : /// 1. Unified iterator for image and delta layers.
65 : /// 2. `Ord` for use in [`MergeIterator::heap`] (for the k-merge).
66 : /// 3. Lazy creation of the real delta/image iterator.
67 : pub(crate) enum IteratorWrapper<'a> {
68 : NotLoaded {
69 : ctx: &'a RequestContext,
70 : first_key_lower_bound: (Key, Lsn),
71 : layer: LayerRef<'a>,
72 : source_desc: Arc<PersistentLayerKey>,
73 : },
74 : Loaded {
75 : iter: PeekableLayerIterRef<'a>,
76 : source_desc: Arc<PersistentLayerKey>,
77 : },
78 : }
79 :
80 : pub(crate) struct PeekableLayerIterRef<'a> {
81 : iter: LayerIterRef<'a>,
82 : peeked: Option<(Key, Lsn, Value)>, // None == end
83 : }
84 :
85 : impl<'a> PeekableLayerIterRef<'a> {
86 552 : async fn create(mut iter: LayerIterRef<'a>) -> anyhow::Result<Self> {
87 670 : let peeked = iter.next().await?;
88 552 : Ok(Self { iter, peeked })
89 552 : }
90 :
91 8451506 : fn peek(&self) -> &Option<(Key, Lsn, Value)> {
92 8451506 : &self.peeked
93 8451506 : }
94 :
95 2071834 : async fn next(&mut self) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
96 2071834 : let result = self.peeked.take();
97 2071834 : self.peeked = self.iter.next().await?;
98 2071834 : if let (Some((k1, l1, _)), Some((k2, l2, _))) = (&self.peeked, &result) {
99 2070734 : if (k1, l1) < (k2, l2) {
100 0 : bail!("iterator is not ordered: {}", self.iter.layer_dbg_info());
101 2070734 : }
102 1100 : }
103 2071834 : Ok(result)
104 2071834 : }
105 : }
106 :
107 : impl std::cmp::PartialEq for IteratorWrapper<'_> {
108 0 : fn eq(&self, other: &Self) -> bool {
109 0 : self.cmp(other) == Ordering::Equal
110 0 : }
111 : }
112 :
113 : impl std::cmp::Eq for IteratorWrapper<'_> {}
114 :
115 : impl std::cmp::PartialOrd for IteratorWrapper<'_> {
116 4229872 : fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
117 4229872 : Some(self.cmp(other))
118 4229872 : }
119 : }
120 :
121 : impl std::cmp::Ord for IteratorWrapper<'_> {
122 4229872 : fn cmp(&self, other: &Self) -> std::cmp::Ordering {
123 : use std::cmp::Ordering;
124 4229872 : let a = self.peek_next_key_lsn_value();
125 4229872 : let b = other.peek_next_key_lsn_value();
126 4229872 : match (a, b) {
127 3021888 : (Some((k1, l1, v1)), Some((k2, l2, v2))) => {
128 6043776 : fn map_value_to_num(val: &Option<&Value>) -> usize {
129 6035108 : match val {
130 8668 : None => 0,
131 6034648 : Some(Value::Image(_)) => 1,
132 460 : Some(Value::WalRecord(_)) => 2,
133 : }
134 6043776 : }
135 3021888 : let order_1 = map_value_to_num(&v1);
136 3021888 : let order_2 = map_value_to_num(&v2);
137 3021888 : // When key_lsn are the same, the unloaded iter will always appear before the loaded one.
138 3021888 : // And note that we do a reverse at the end of the comparison, so it works with the max heap.
139 3021888 : (k1, l1, order_1).cmp(&(k2, l2, order_2))
140 : }
141 1002914 : (Some(_), None) => Ordering::Less,
142 1326 : (None, Some(_)) => Ordering::Greater,
143 203744 : (None, None) => Ordering::Equal,
144 : }
145 4229872 : .reverse()
146 4229872 : }
147 : }
148 :
149 : impl<'a> IteratorWrapper<'a> {
150 58 : pub fn create_from_image_layer(
151 58 : image_layer: &'a ImageLayerInner,
152 58 : ctx: &'a RequestContext,
153 58 : ) -> Self {
154 58 : Self::NotLoaded {
155 58 : layer: LayerRef::Image(image_layer),
156 58 : first_key_lower_bound: (image_layer.key_range().start, image_layer.lsn()),
157 58 : ctx,
158 58 : source_desc: PersistentLayerKey {
159 58 : key_range: image_layer.key_range().clone(),
160 58 : lsn_range: PersistentLayerDesc::image_layer_lsn_range(image_layer.lsn()),
161 58 : is_delta: false,
162 58 : }
163 58 : .into(),
164 58 : }
165 58 : }
166 :
167 494 : pub fn create_from_delta_layer(
168 494 : delta_layer: &'a DeltaLayerInner,
169 494 : ctx: &'a RequestContext,
170 494 : ) -> Self {
171 494 : Self::NotLoaded {
172 494 : layer: LayerRef::Delta(delta_layer),
173 494 : first_key_lower_bound: (delta_layer.key_range().start, delta_layer.lsn_range().start),
174 494 : ctx,
175 494 : source_desc: PersistentLayerKey {
176 494 : key_range: delta_layer.key_range().clone(),
177 494 : lsn_range: delta_layer.lsn_range().clone(),
178 494 : is_delta: true,
179 494 : }
180 494 : .into(),
181 494 : }
182 494 : }
183 :
184 8459744 : fn peek_next_key_lsn_value(&self) -> Option<(&Key, Lsn, Option<&Value>)> {
185 8459744 : match self {
186 8450954 : Self::Loaded { iter, .. } => iter
187 8450954 : .peek()
188 8450954 : .as_ref()
189 8450954 : .map(|(key, lsn, val)| (key, *lsn, Some(val))),
190 8790 : Self::NotLoaded {
191 8790 : first_key_lower_bound: (key, lsn),
192 8790 : ..
193 8790 : } => Some((key, *lsn, None)),
194 : }
195 8459744 : }
196 :
197 : // CORRECTNESS: this function must always take `&mut self`, never `&self`.
198 : //
199 : // The reason is that `impl Ord for Self` evaluates differently after this function
200 : // returns. We're called through a `PeekMut::deref_mut`, which causes heap repair when
201 : // the PeekMut gets returned. So, it's critical that we actually run through `PeekMut::deref_mut`
202 : // and not just `PeekMut::deref`
203 : // If we don't take `&mut self`
204 552 : async fn load(&mut self) -> anyhow::Result<()> {
205 552 : assert!(!self.is_loaded());
206 552 : let Self::NotLoaded {
207 552 : ctx,
208 552 : first_key_lower_bound,
209 552 : layer,
210 552 : source_desc,
211 552 : } = self
212 : else {
213 0 : unreachable!()
214 : };
215 552 : let iter = layer.iter(ctx);
216 670 : let iter = PeekableLayerIterRef::create(iter).await?;
217 552 : if let Some((k1, l1, _)) = iter.peek() {
218 552 : let (k2, l2) = first_key_lower_bound;
219 552 : if (k1, l1) < (k2, l2) {
220 0 : bail!(
221 0 : "layer key range did not include the first key in the layer: {}",
222 0 : layer.layer_dbg_info()
223 0 : );
224 552 : }
225 0 : }
226 552 : *self = Self::Loaded {
227 552 : iter,
228 552 : source_desc: source_desc.clone(),
229 552 : };
230 552 : Ok(())
231 552 : }
232 :
233 2072938 : fn is_loaded(&self) -> bool {
234 2072938 : matches!(self, Self::Loaded { .. })
235 2072938 : }
236 :
237 : /// Correctness: must load the iterator before using.
238 : ///
239 : /// Given this iterator wrapper is private to the merge iterator, users won't be able to mis-use it.
240 : /// The public interfaces to use are [`crate::tenant::storage_layer::delta_layer::DeltaLayerIterator`] and
241 : /// [`crate::tenant::storage_layer::image_layer::ImageLayerIterator`].
242 2071834 : async fn next(&mut self) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
243 2071834 : let Self::Loaded { iter, .. } = self else {
244 0 : panic!("must load the iterator before using")
245 : };
246 2071834 : iter.next().await
247 2071834 : }
248 :
249 : /// Get the persistent layer key corresponding to this iterator
250 782 : fn trace_source(&self) -> Arc<PersistentLayerKey> {
251 782 : match self {
252 782 : Self::Loaded { source_desc, .. } => source_desc.clone(),
253 0 : Self::NotLoaded { source_desc, .. } => source_desc.clone(),
254 : }
255 782 : }
256 : }
257 :
258 : /// A merge iterator over delta/image layer iterators.
259 : ///
260 : /// When duplicated records are found, the iterator will not perform any
261 : /// deduplication, and the caller should handle these situation. By saying
262 : /// duplicated records, there are many possibilities:
263 : ///
264 : /// * Two same delta at the same LSN.
265 : /// * Two same image at the same LSN.
266 : /// * Delta/image at the same LSN where the image has already applied the delta.
267 : ///
268 : /// The iterator will always put the image before the delta.
269 : pub struct MergeIterator<'a> {
270 : heap: BinaryHeap<IteratorWrapper<'a>>,
271 : }
272 :
273 : pub(crate) trait MergeIteratorItem {
274 : fn new(item: (Key, Lsn, Value), iterator: &IteratorWrapper<'_>) -> Self;
275 :
276 : fn key_lsn_value(&self) -> &(Key, Lsn, Value);
277 : }
278 :
279 : impl MergeIteratorItem for (Key, Lsn, Value) {
280 2070502 : fn new(item: (Key, Lsn, Value), _: &IteratorWrapper<'_>) -> Self {
281 2070502 : item
282 2070502 : }
283 :
284 790 : fn key_lsn_value(&self) -> &(Key, Lsn, Value) {
285 790 : self
286 790 : }
287 : }
288 :
289 : impl MergeIteratorItem for ((Key, Lsn, Value), Arc<PersistentLayerKey>) {
290 782 : fn new(item: (Key, Lsn, Value), iter: &IteratorWrapper<'_>) -> Self {
291 782 : (item, iter.trace_source().clone())
292 782 : }
293 :
294 1640 : fn key_lsn_value(&self) -> &(Key, Lsn, Value) {
295 1640 : &self.0
296 1640 : }
297 : }
298 :
299 : impl<'a> MergeIterator<'a> {
300 80 : pub fn create(
301 80 : deltas: &[&'a DeltaLayerInner],
302 80 : images: &[&'a ImageLayerInner],
303 80 : ctx: &'a RequestContext,
304 80 : ) -> Self {
305 80 : let mut heap = Vec::with_capacity(images.len() + deltas.len());
306 138 : for image in images {
307 58 : heap.push(IteratorWrapper::create_from_image_layer(image, ctx));
308 58 : }
309 574 : for delta in deltas {
310 494 : heap.push(IteratorWrapper::create_from_delta_layer(delta, ctx));
311 494 : }
312 80 : Self {
313 80 : heap: BinaryHeap::from(heap),
314 80 : }
315 80 : }
316 :
317 2071362 : pub(crate) async fn next_inner<R: MergeIteratorItem>(&mut self) -> anyhow::Result<Option<R>> {
318 2072464 : while let Some(mut iter) = self.heap.peek_mut() {
319 2072386 : if !iter.is_loaded() {
320 : // Once we load the iterator, we can know the real first key-value pair in the iterator.
321 : // We put it back into the heap so that a potentially unloaded layer may have a key between
322 : // [potential_first_key, loaded_first_key).
323 670 : iter.load().await?;
324 552 : continue;
325 2071834 : }
326 2071834 : let Some(item) = iter.next().await? else {
327 : // If the iterator returns None, we pop this iterator. Actually, in the current implementation,
328 : // we order None > Some, and all the rest of the iterators should return None.
329 550 : binary_heap::PeekMut::pop(iter);
330 550 : continue;
331 : };
332 2071284 : return Ok(Some(R::new(item, &iter)));
333 : }
334 78 : Ok(None)
335 2071362 : }
336 :
337 : /// Get the next key-value pair from the iterator.
338 2070146 : pub async fn next(&mut self) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
339 2070146 : self.next_inner().await
340 2070146 : }
341 :
342 : /// Get the next key-value pair from the iterator, and trace where the key comes from.
343 0 : pub async fn next_with_trace(
344 0 : &mut self,
345 0 : ) -> anyhow::Result<Option<((Key, Lsn, Value), Arc<PersistentLayerKey>)>> {
346 0 : self.next_inner().await
347 0 : }
348 : }
349 :
350 : #[cfg(test)]
351 : mod tests {
352 : use super::*;
353 :
354 : use itertools::Itertools;
355 : use pageserver_api::key::Key;
356 : use utils::lsn::Lsn;
357 :
358 : use crate::{
359 : tenant::{
360 : harness::{TenantHarness, TIMELINE_ID},
361 : storage_layer::delta_layer::test::{produce_delta_layer, sort_delta},
362 : },
363 : DEFAULT_PG_VERSION,
364 : };
365 :
366 : #[cfg(feature = "testing")]
367 : use crate::tenant::storage_layer::delta_layer::test::sort_delta_value;
368 : #[cfg(feature = "testing")]
369 : use pageserver_api::record::NeonWalRecord;
370 :
371 8 : async fn assert_merge_iter_equal(
372 8 : merge_iter: &mut MergeIterator<'_>,
373 8 : expect: &[(Key, Lsn, Value)],
374 8 : ) {
375 8 : let mut expect_iter = expect.iter();
376 : loop {
377 6080 : let o1 = merge_iter.next().await.unwrap();
378 6080 : let o2 = expect_iter.next();
379 6080 : assert_eq!(o1.is_some(), o2.is_some());
380 6080 : if o1.is_none() && o2.is_none() {
381 8 : break;
382 6072 : }
383 6072 : let (k1, l1, v1) = o1.unwrap();
384 6072 : let (k2, l2, v2) = o2.unwrap();
385 6072 : assert_eq!(&k1, k2);
386 6072 : assert_eq!(l1, *l2);
387 6072 : assert_eq!(&v1, v2);
388 : }
389 8 : }
390 :
391 : #[tokio::test]
392 2 : async fn merge_in_between() {
393 2 : use bytes::Bytes;
394 2 : use pageserver_api::value::Value;
395 2 :
396 2 : let harness = TenantHarness::create("merge_iterator_merge_in_between")
397 2 : .await
398 2 : .unwrap();
399 20 : let (tenant, ctx) = harness.load().await;
400 2 :
401 2 : let tline = tenant
402 2 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
403 6 : .await
404 2 : .unwrap();
405 2 :
406 8 : fn get_key(id: u32) -> Key {
407 8 : let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
408 8 : key.field6 = id;
409 8 : key
410 8 : }
411 2 : let test_deltas1 = vec![
412 2 : (
413 2 : get_key(0),
414 2 : Lsn(0x10),
415 2 : Value::Image(Bytes::copy_from_slice(b"test")),
416 2 : ),
417 2 : (
418 2 : get_key(5),
419 2 : Lsn(0x10),
420 2 : Value::Image(Bytes::copy_from_slice(b"test")),
421 2 : ),
422 2 : ];
423 2 : let resident_layer_1 = produce_delta_layer(&tenant, &tline, test_deltas1.clone(), &ctx)
424 6 : .await
425 2 : .unwrap();
426 2 : let test_deltas2 = vec![
427 2 : (
428 2 : get_key(3),
429 2 : Lsn(0x10),
430 2 : Value::Image(Bytes::copy_from_slice(b"test")),
431 2 : ),
432 2 : (
433 2 : get_key(4),
434 2 : Lsn(0x10),
435 2 : Value::Image(Bytes::copy_from_slice(b"test")),
436 2 : ),
437 2 : ];
438 2 : let resident_layer_2 = produce_delta_layer(&tenant, &tline, test_deltas2.clone(), &ctx)
439 6 : .await
440 2 : .unwrap();
441 2 : let mut merge_iter = MergeIterator::create(
442 2 : &[
443 2 : resident_layer_2.get_as_delta(&ctx).await.unwrap(),
444 2 : resident_layer_1.get_as_delta(&ctx).await.unwrap(),
445 2 : ],
446 2 : &[],
447 2 : &ctx,
448 2 : );
449 2 : let mut expect = Vec::new();
450 2 : expect.extend(test_deltas1);
451 2 : expect.extend(test_deltas2);
452 2 : expect.sort_by(sort_delta);
453 4 : assert_merge_iter_equal(&mut merge_iter, &expect).await;
454 2 : }
455 :
456 : #[tokio::test]
457 2 : async fn delta_merge() {
458 2 : use bytes::Bytes;
459 2 : use pageserver_api::value::Value;
460 2 :
461 2 : let harness = TenantHarness::create("merge_iterator_delta_merge")
462 2 : .await
463 2 : .unwrap();
464 20 : let (tenant, ctx) = harness.load().await;
465 2 :
466 2 : let tline = tenant
467 2 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
468 6 : .await
469 2 : .unwrap();
470 2 :
471 6000 : fn get_key(id: u32) -> Key {
472 6000 : let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
473 6000 : key.field6 = id;
474 6000 : key
475 6000 : }
476 2 : const N: usize = 1000;
477 2 : let test_deltas1 = (0..N)
478 2000 : .map(|idx| {
479 2000 : (
480 2000 : get_key(idx as u32 / 10),
481 2000 : Lsn(0x20 * ((idx as u64) % 10 + 1)),
482 2000 : Value::Image(Bytes::from(format!("img{idx:05}"))),
483 2000 : )
484 2000 : })
485 2 : .collect_vec();
486 2 : let resident_layer_1 = produce_delta_layer(&tenant, &tline, test_deltas1.clone(), &ctx)
487 8 : .await
488 2 : .unwrap();
489 2 : let test_deltas2 = (0..N)
490 2000 : .map(|idx| {
491 2000 : (
492 2000 : get_key(idx as u32 / 10),
493 2000 : Lsn(0x20 * ((idx as u64) % 10 + 1) + 0x10),
494 2000 : Value::Image(Bytes::from(format!("img{idx:05}"))),
495 2000 : )
496 2000 : })
497 2 : .collect_vec();
498 2 : let resident_layer_2 = produce_delta_layer(&tenant, &tline, test_deltas2.clone(), &ctx)
499 8 : .await
500 2 : .unwrap();
501 2 : let test_deltas3 = (0..N)
502 2000 : .map(|idx| {
503 2000 : (
504 2000 : get_key(idx as u32 / 10 + N as u32),
505 2000 : Lsn(0x10 * ((idx as u64) % 10 + 1)),
506 2000 : Value::Image(Bytes::from(format!("img{idx:05}"))),
507 2000 : )
508 2000 : })
509 2 : .collect_vec();
510 2 : let resident_layer_3 = produce_delta_layer(&tenant, &tline, test_deltas3.clone(), &ctx)
511 8 : .await
512 2 : .unwrap();
513 2 : let mut merge_iter = MergeIterator::create(
514 2 : &[
515 2 : resident_layer_1.get_as_delta(&ctx).await.unwrap(),
516 2 : resident_layer_2.get_as_delta(&ctx).await.unwrap(),
517 2 : resident_layer_3.get_as_delta(&ctx).await.unwrap(),
518 2 : ],
519 2 : &[],
520 2 : &ctx,
521 2 : );
522 2 : let mut expect = Vec::new();
523 2 : expect.extend(test_deltas1);
524 2 : expect.extend(test_deltas2);
525 2 : expect.extend(test_deltas3);
526 2 : expect.sort_by(sort_delta);
527 12 : assert_merge_iter_equal(&mut merge_iter, &expect).await;
528 2 :
529 2 : // TODO: test layers are loaded only when needed, reducing num of active iterators in k-merge
530 2 : }
531 :
532 : #[cfg(feature = "testing")]
533 : #[tokio::test]
534 2 : async fn delta_image_mixed_merge() {
535 2 : use bytes::Bytes;
536 2 : use pageserver_api::value::Value;
537 2 :
538 2 : let harness = TenantHarness::create("merge_iterator_delta_image_mixed_merge")
539 2 : .await
540 2 : .unwrap();
541 20 : let (tenant, ctx) = harness.load().await;
542 2 :
543 2 : let tline = tenant
544 2 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
545 5 : .await
546 2 : .unwrap();
547 2 :
548 18 : fn get_key(id: u32) -> Key {
549 18 : let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
550 18 : key.field6 = id;
551 18 : key
552 18 : }
553 2 : // In this test case, we want to test if the iterator still works correctly with multiple copies
554 2 : // of a delta+image at the same LSN, for example, the following sequence a@10=+a, a@10=+a, a@10=ab, a@10=ab.
555 2 : // Duplicated deltas/images are possible for old tenants before the full L0 compaction file name fix.
556 2 : // An incomplete compaction could produce multiple exactly-the-same delta layers. Force image generation
557 2 : // could produce overlapping images. Apart from duplicated deltas/images, in the current storage implementation
558 2 : // one key-lsn could have a delta in the delta layer and one image in the image layer. The iterator should
559 2 : // correctly process these situations and return everything as-is, and the upper layer of the system
560 2 : // will handle duplicated LSNs.
561 2 : let test_deltas1 = vec![
562 2 : (
563 2 : get_key(0),
564 2 : Lsn(0x10),
565 2 : Value::WalRecord(NeonWalRecord::wal_init("")),
566 2 : ),
567 2 : (
568 2 : get_key(0),
569 2 : Lsn(0x18),
570 2 : Value::WalRecord(NeonWalRecord::wal_append("a")),
571 2 : ),
572 2 : (
573 2 : get_key(5),
574 2 : Lsn(0x10),
575 2 : Value::WalRecord(NeonWalRecord::wal_init("")),
576 2 : ),
577 2 : (
578 2 : get_key(5),
579 2 : Lsn(0x18),
580 2 : Value::WalRecord(NeonWalRecord::wal_append("b")),
581 2 : ),
582 2 : ];
583 2 : let resident_layer_1 = produce_delta_layer(&tenant, &tline, test_deltas1.clone(), &ctx)
584 6 : .await
585 2 : .unwrap();
586 2 : let mut test_deltas2 = test_deltas1.clone();
587 2 : test_deltas2.push((
588 2 : get_key(10),
589 2 : Lsn(0x20),
590 2 : Value::Image(Bytes::copy_from_slice(b"test")),
591 2 : ));
592 2 : let resident_layer_2 = produce_delta_layer(&tenant, &tline, test_deltas2.clone(), &ctx)
593 6 : .await
594 2 : .unwrap();
595 2 : let test_deltas3 = vec![
596 2 : (
597 2 : get_key(0),
598 2 : Lsn(0x10),
599 2 : Value::Image(Bytes::copy_from_slice(b"")),
600 2 : ),
601 2 : (
602 2 : get_key(5),
603 2 : Lsn(0x18),
604 2 : Value::Image(Bytes::copy_from_slice(b"b")),
605 2 : ),
606 2 : (
607 2 : get_key(15),
608 2 : Lsn(0x20),
609 2 : Value::Image(Bytes::copy_from_slice(b"test")),
610 2 : ),
611 2 : ];
612 2 : let resident_layer_3 = produce_delta_layer(&tenant, &tline, test_deltas3.clone(), &ctx)
613 6 : .await
614 2 : .unwrap();
615 2 : let mut test_deltas4 = test_deltas3.clone();
616 2 : test_deltas4.push((
617 2 : get_key(20),
618 2 : Lsn(0x20),
619 2 : Value::Image(Bytes::copy_from_slice(b"test")),
620 2 : ));
621 2 : let resident_layer_4 = produce_delta_layer(&tenant, &tline, test_deltas4.clone(), &ctx)
622 6 : .await
623 2 : .unwrap();
624 2 : let mut expect = Vec::new();
625 2 : expect.extend(test_deltas1);
626 2 : expect.extend(test_deltas2);
627 2 : expect.extend(test_deltas3);
628 2 : expect.extend(test_deltas4);
629 2 : expect.sort_by(sort_delta_value);
630 2 :
631 2 : // Test with different layer order for MergeIterator::create to ensure the order
632 2 : // is stable.
633 2 :
634 2 : let mut merge_iter = MergeIterator::create(
635 2 : &[
636 2 : resident_layer_4.get_as_delta(&ctx).await.unwrap(),
637 2 : resident_layer_1.get_as_delta(&ctx).await.unwrap(),
638 2 : resident_layer_3.get_as_delta(&ctx).await.unwrap(),
639 2 : resident_layer_2.get_as_delta(&ctx).await.unwrap(),
640 2 : ],
641 2 : &[],
642 2 : &ctx,
643 2 : );
644 8 : assert_merge_iter_equal(&mut merge_iter, &expect).await;
645 2 :
646 2 : let mut merge_iter = MergeIterator::create(
647 2 : &[
648 2 : resident_layer_1.get_as_delta(&ctx).await.unwrap(),
649 2 : resident_layer_4.get_as_delta(&ctx).await.unwrap(),
650 2 : resident_layer_3.get_as_delta(&ctx).await.unwrap(),
651 2 : resident_layer_2.get_as_delta(&ctx).await.unwrap(),
652 2 : ],
653 2 : &[],
654 2 : &ctx,
655 2 : );
656 4 : assert_merge_iter_equal(&mut merge_iter, &expect).await;
657 2 :
658 2 : is_send(merge_iter);
659 2 : }
660 :
661 : #[cfg(feature = "testing")]
662 2 : fn is_send(_: impl Send) {}
663 : }
|