LCOV - code coverage report
Current view: top level - pageserver/src/tenant/storage_layer - merge_iterator.rs (source / functions) Coverage Total Hit
Test: 42f947419473a288706e86ecdf7c2863d760d5d7.info Lines: 98.6 % 416 410
Test Date: 2024-08-02 21:34:27 Functions: 97.4 % 39 38

            Line data    Source code
       1              : use std::{
       2              :     cmp::Ordering,
       3              :     collections::{binary_heap, BinaryHeap},
       4              : };
       5              : 
       6              : use pageserver_api::key::Key;
       7              : use utils::lsn::Lsn;
       8              : 
       9              : use crate::{context::RequestContext, repository::Value};
      10              : 
      11              : use super::{
      12              :     delta_layer::{DeltaLayerInner, DeltaLayerIterator},
      13              :     image_layer::{ImageLayerInner, ImageLayerIterator},
      14              : };
      15              : 
      16              : #[derive(Clone, Copy)]
      17              : enum LayerRef<'a> {
      18              :     Image(&'a ImageLayerInner),
      19              :     Delta(&'a DeltaLayerInner),
      20              : }
      21              : 
      22              : impl<'a> LayerRef<'a> {
      23          500 :     fn iter(self, ctx: &'a RequestContext) -> LayerIterRef<'a> {
      24          500 :         match self {
      25           22 :             Self::Image(x) => LayerIterRef::Image(x.iter(ctx)),
      26          478 :             Self::Delta(x) => LayerIterRef::Delta(x.iter(ctx)),
      27              :         }
      28          500 :     }
      29              : }
      30              : 
      31              : enum LayerIterRef<'a> {
      32              :     Image(ImageLayerIterator<'a>),
      33              :     Delta(DeltaLayerIterator<'a>),
      34              : }
      35              : 
      36              : impl LayerIterRef<'_> {
      37      2071500 :     async fn next(&mut self) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
      38      2071500 :         match self {
      39      2071168 :             Self::Delta(x) => x.next().await,
      40          332 :             Self::Image(x) => x.next().await,
      41              :         }
      42      2071500 :     }
      43              : }
      44              : 
      45              : /// This type plays several roles at once:
      46              : /// 1. Unified iterator for image and delta layers.
      47              : /// 2. `Ord` for use in [`MergeIterator::heap`] (for the k-merge).
      48              : /// 3. Lazy creation of the real delta/image iterator.
      49              : enum IteratorWrapper<'a> {
      50              :     NotLoaded {
      51              :         ctx: &'a RequestContext,
      52              :         first_key_lower_bound: (Key, Lsn),
      53              :         layer: LayerRef<'a>,
      54              :     },
      55              :     Loaded {
      56              :         iter: PeekableLayerIterRef<'a>,
      57              :     },
      58              : }
      59              : 
      60              : struct PeekableLayerIterRef<'a> {
      61              :     iter: LayerIterRef<'a>,
      62              :     peeked: Option<(Key, Lsn, Value)>, // None == end
      63              : }
      64              : 
      65              : impl<'a> PeekableLayerIterRef<'a> {
      66          500 :     async fn create(mut iter: LayerIterRef<'a>) -> anyhow::Result<Self> {
      67          611 :         let peeked = iter.next().await?;
      68          500 :         Ok(Self { iter, peeked })
      69          500 :     }
      70              : 
      71      8450216 :     fn peek(&self) -> &Option<(Key, Lsn, Value)> {
      72      8450216 :         &self.peeked
      73      8450216 :     }
      74              : 
      75      2071000 :     async fn next(&mut self) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
      76      2071000 :         let result = self.peeked.take();
      77      2071000 :         self.peeked = self.iter.next().await?;
      78      2071000 :         Ok(result)
      79      2071000 :     }
      80              : }
      81              : 
      82              : impl<'a> std::cmp::PartialEq for IteratorWrapper<'a> {
      83            0 :     fn eq(&self, other: &Self) -> bool {
      84            0 :         self.cmp(other) == Ordering::Equal
      85            0 :     }
      86              : }
      87              : 
      88              : impl<'a> std::cmp::Eq for IteratorWrapper<'a> {}
      89              : 
      90              : impl<'a> std::cmp::PartialOrd for IteratorWrapper<'a> {
      91      4229131 :     fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
      92      4229131 :         Some(self.cmp(other))
      93      4229131 :     }
      94              : }
      95              : 
      96              : impl<'a> std::cmp::Ord for IteratorWrapper<'a> {
      97      4229131 :     fn cmp(&self, other: &Self) -> std::cmp::Ordering {
      98      4229131 :         use std::cmp::Ordering;
      99      4229131 :         let a = self.peek_next_key_lsn_value();
     100      4229131 :         let b = other.peek_next_key_lsn_value();
     101      4229131 :         match (a, b) {
     102      3021349 :             (Some((k1, l1, v1)), Some((k2, l2, v2))) => {
     103      6042698 :                 fn map_value_to_num(val: &Option<&Value>) -> usize {
     104      6034242 :                     match val {
     105      3021349 :                         None => 0,
     106      6033910 :                         Some(Value::Image(_)) => 1,
     107      3021349 :                         Some(Value::WalRecord(_)) => 2,
     108      3021349 :                     }
     109      6042698 :                 }
     110      3021349 :                 let order_1 = map_value_to_num(&v1);
     111      3021349 :                 let order_2 = map_value_to_num(&v2);
     112      3021349 :                 // When (key, lsn) are equal, the not-yet-loaded iterator (rank 0) always orders before the loaded ones.
     113      3021349 :                 // Note that the result is reversed at the end of the comparison, so that it works with the max heap.
     114      3021349 :                 (k1, l1, order_1).cmp(&(k2, l2, order_2))
     115              :             }
     116      1002791 :             (Some(_), None) => Ordering::Less,
     117         1294 :             (None, Some(_)) => Ordering::Greater,
     118       203697 :             (None, None) => Ordering::Equal,
     119              :         }
     120      4229131 :         .reverse()
     121      4229131 :     }
     122              : }
     123              : 
     124              : impl<'a> IteratorWrapper<'a> {
     125           22 :     pub fn create_from_image_layer(
     126           22 :         image_layer: &'a ImageLayerInner,
     127           22 :         ctx: &'a RequestContext,
     128           22 :     ) -> Self {
     129           22 :         Self::NotLoaded {
     130           22 :             layer: LayerRef::Image(image_layer),
     131           22 :             first_key_lower_bound: (image_layer.key_range().start, image_layer.lsn()),
     132           22 :             ctx,
     133           22 :         }
     134           22 :     }
     135              : 
     136          478 :     pub fn create_from_delta_layer(
     137          478 :         delta_layer: &'a DeltaLayerInner,
     138          478 :         ctx: &'a RequestContext,
     139          478 :     ) -> Self {
     140          478 :         Self::NotLoaded {
     141          478 :             layer: LayerRef::Delta(delta_layer),
     142          478 :             first_key_lower_bound: (delta_layer.key_range().start, delta_layer.lsn_range().start),
     143          478 :             ctx,
     144          478 :         }
     145          478 :     }
     146              : 
     147      8458262 :     fn peek_next_key_lsn_value(&self) -> Option<(&Key, Lsn, Option<&Value>)> {
     148      8458262 :         match self {
     149      8449716 :             Self::Loaded { iter } => iter
     150      8449716 :                 .peek()
     151      8449716 :                 .as_ref()
     152      8449716 :                 .map(|(key, lsn, val)| (key, *lsn, Some(val))),
     153         8546 :             Self::NotLoaded {
     154         8546 :                 first_key_lower_bound: (key, lsn),
     155         8546 :                 ..
     156         8546 :             } => Some((key, *lsn, None)),
     157              :         }
     158      8458262 :     }
     159              : 
     160              :     // CORRECTNESS: this function must always take `&mut self`, never `&self`.
     161              :     //
     162              :     // The reason is that `impl Ord for Self` evaluates differently after this function
     163              :     // returns. We're called through a `PeekMut::deref_mut`, which causes heap repair when
     164              :     // the PeekMut gets returned. So, it's critical that we actually run through `PeekMut::deref_mut`
     165              :     // and not just `PeekMut::deref`.
     166              :     // If we don't take `&mut self`, the heap is not repaired and may be left incorrectly ordered.
     167          500 :     async fn load(&mut self) -> anyhow::Result<()> {
     168          500 :         assert!(!self.is_loaded());
     169          500 :         let Self::NotLoaded {
     170          500 :             ctx,
     171          500 :             first_key_lower_bound,
     172          500 :             layer,
     173          500 :         } = self
     174              :         else {
     175            0 :             unreachable!()
     176              :         };
     177          500 :         let iter = layer.iter(ctx);
     178          611 :         let iter = PeekableLayerIterRef::create(iter).await?;
     179          500 :         if let Some((k1, l1, _)) = iter.peek() {
     180          500 :             let (k2, l2) = first_key_lower_bound;
     181          500 :             debug_assert!((k1, l1) >= (k2, l2));
     182            0 :         }
     183          500 :         *self = Self::Loaded { iter };
     184          500 :         Ok(())
     185          500 :     }
     186              : 
     187      2072000 :     fn is_loaded(&self) -> bool {
     188      2072000 :         matches!(self, Self::Loaded { .. })
     189      2072000 :     }
     190              : 
     191              :     /// Correctness: must load the iterator before using.
     192              :     ///
     193              :     /// Given that this iterator wrapper is private to the merge iterator, users won't be able to misuse it.
     194              :     /// The public interfaces to use are [`crate::tenant::storage_layer::delta_layer::DeltaLayerIterator`] and
     195              :     /// [`crate::tenant::storage_layer::image_layer::ImageLayerIterator`].
     196      2071000 :     async fn next(&mut self) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
     197      2071000 :         let Self::Loaded { iter } = self else {
     198            0 :             panic!("must load the iterator before using")
     199              :         };
     200      2071000 :         iter.next().await
     201      2071000 :     }
     202              : }
     203              : 
     204              : /// A merge iterator over delta/image layer iterators. When duplicated records are
     205              : /// found, the iterator will not perform any deduplication, and the caller should handle
     206              : /// these situations. Duplicated records can take several forms:
     207              : ///
     208              : /// * Two identical deltas at the same LSN.
     209              : /// * Two identical images at the same LSN.
     210              : /// * A delta and an image at the same LSN, where the image has already applied the delta.
     211              : ///
     212              : /// The iterator will always put the image before the delta.
     213              : pub struct MergeIterator<'a> {
     214              :     heap: BinaryHeap<IteratorWrapper<'a>>,
     215              : }
     216              : 
     217              : impl<'a> MergeIterator<'a> {
     218           54 :     pub fn create(
     219           54 :         deltas: &[&'a DeltaLayerInner],
     220           54 :         images: &[&'a ImageLayerInner],
     221           54 :         ctx: &'a RequestContext,
     222           54 :     ) -> Self {
     223           54 :         let mut heap = Vec::with_capacity(images.len() + deltas.len());
     224           76 :         for image in images {
     225           22 :             heap.push(IteratorWrapper::create_from_image_layer(image, ctx));
     226           22 :         }
     227          532 :         for delta in deltas {
     228          478 :             heap.push(IteratorWrapper::create_from_delta_layer(delta, ctx));
     229          478 :         }
     230           54 :         Self {
     231           54 :             heap: BinaryHeap::from(heap),
     232           54 :         }
     233           54 :     }
     234              : 
     235      2070554 :     pub async fn next(&mut self) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
     236      2071554 :         while let Some(mut iter) = self.heap.peek_mut() {
     237      2071500 :             if !iter.is_loaded() {
     238              :                 // Once we load the iterator, we know the real first key-value pair in the iterator.
     239              :                 // We put it back into the heap because a not-yet-loaded layer may have a key in the range
     240              :                 // [potential_first_key, loaded_first_key).
     241          611 :                 iter.load().await?;
     242          500 :                 continue;
     243      2071000 :             }
     244      2071000 :             let Some(item) = iter.next().await? else {
     245              :                 // If the iterator returns None, we pop this iterator. Actually, in the current implementation,
     246              :                 // we order None > Some, and all the rest of the iterators should return None.
     247          500 :                 binary_heap::PeekMut::pop(iter);
     248          500 :                 continue;
     249              :             };
     250      2070500 :             return Ok(Some(item));
     251              :         }
     252           54 :         Ok(None)
     253      2070554 :     }
     254              : }
     255              : 
     256              : #[cfg(test)]
     257              : mod tests {
     258              :     use super::*;
     259              : 
     260              :     use itertools::Itertools;
     261              :     use pageserver_api::key::Key;
     262              :     use utils::lsn::Lsn;
     263              : 
     264              :     use crate::{
     265              :         tenant::{
     266              :             harness::{TenantHarness, TIMELINE_ID},
     267              :             storage_layer::delta_layer::test::{produce_delta_layer, sort_delta, sort_delta_value},
     268              :         },
     269              :         walrecord::NeonWalRecord,
     270              :         DEFAULT_PG_VERSION,
     271              :     };
     272              : 
     273            8 :     async fn assert_merge_iter_equal(
     274            8 :         merge_iter: &mut MergeIterator<'_>,
     275            8 :         expect: &[(Key, Lsn, Value)],
     276            8 :     ) {
     277            8 :         let mut expect_iter = expect.iter();
     278              :         loop {
     279         6080 :             let o1 = merge_iter.next().await.unwrap();
     280         6080 :             let o2 = expect_iter.next();
     281         6080 :             assert_eq!(o1.is_some(), o2.is_some());
     282         6080 :             if o1.is_none() && o2.is_none() {
     283            8 :                 break;
     284         6072 :             }
     285         6072 :             let (k1, l1, v1) = o1.unwrap();
     286         6072 :             let (k2, l2, v2) = o2.unwrap();
     287         6072 :             assert_eq!(&k1, k2);
     288         6072 :             assert_eq!(l1, *l2);
     289         6072 :             assert_eq!(&v1, v2);
     290              :         }
     291            8 :     }
     292              : 
     293              :     #[tokio::test]
     294            2 :     async fn merge_in_between() {
     295            2 :         use crate::repository::Value;
     296            2 :         use bytes::Bytes;
     297            2 : 
     298            2 :         let harness = TenantHarness::create("merge_iterator_merge_in_between")
     299            2 :             .await
     300            2 :             .unwrap();
     301            8 :         let (tenant, ctx) = harness.load().await;
     302            2 : 
     303            2 :         let tline = tenant
     304            2 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
     305            4 :             .await
     306            2 :             .unwrap();
     307            2 : 
     308            8 :         fn get_key(id: u32) -> Key {
     309            8 :             let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
     310            8 :             key.field6 = id;
     311            8 :             key
     312            8 :         }
     313            2 :         let test_deltas1 = vec![
     314            2 :             (
     315            2 :                 get_key(0),
     316            2 :                 Lsn(0x10),
     317            2 :                 Value::Image(Bytes::copy_from_slice(b"test")),
     318            2 :             ),
     319            2 :             (
     320            2 :                 get_key(5),
     321            2 :                 Lsn(0x10),
     322            2 :                 Value::Image(Bytes::copy_from_slice(b"test")),
     323            2 :             ),
     324            2 :         ];
     325            2 :         let resident_layer_1 = produce_delta_layer(&tenant, &tline, test_deltas1.clone(), &ctx)
     326            6 :             .await
     327            2 :             .unwrap();
     328            2 :         let test_deltas2 = vec![
     329            2 :             (
     330            2 :                 get_key(3),
     331            2 :                 Lsn(0x10),
     332            2 :                 Value::Image(Bytes::copy_from_slice(b"test")),
     333            2 :             ),
     334            2 :             (
     335            2 :                 get_key(4),
     336            2 :                 Lsn(0x10),
     337            2 :                 Value::Image(Bytes::copy_from_slice(b"test")),
     338            2 :             ),
     339            2 :         ];
     340            2 :         let resident_layer_2 = produce_delta_layer(&tenant, &tline, test_deltas2.clone(), &ctx)
     341            6 :             .await
     342            2 :             .unwrap();
     343            2 :         let mut merge_iter = MergeIterator::create(
     344            2 :             &[
     345            2 :                 resident_layer_2.get_as_delta(&ctx).await.unwrap(),
     346            2 :                 resident_layer_1.get_as_delta(&ctx).await.unwrap(),
     347            2 :             ],
     348            2 :             &[],
     349            2 :             &ctx,
     350            2 :         );
     351            2 :         let mut expect = Vec::new();
     352            2 :         expect.extend(test_deltas1);
     353            2 :         expect.extend(test_deltas2);
     354            2 :         expect.sort_by(sort_delta);
     355            4 :         assert_merge_iter_equal(&mut merge_iter, &expect).await;
     356            2 :     }
     357              : 
     358              :     #[tokio::test]
     359            2 :     async fn delta_merge() {
     360            2 :         use crate::repository::Value;
     361            2 :         use bytes::Bytes;
     362            2 : 
     363            2 :         let harness = TenantHarness::create("merge_iterator_delta_merge")
     364            2 :             .await
     365            2 :             .unwrap();
     366            8 :         let (tenant, ctx) = harness.load().await;
     367            2 : 
     368            2 :         let tline = tenant
     369            2 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
     370            4 :             .await
     371            2 :             .unwrap();
     372            2 : 
     373         6000 :         fn get_key(id: u32) -> Key {
     374         6000 :             let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
     375         6000 :             key.field6 = id;
     376         6000 :             key
     377         6000 :         }
     378            2 :         const N: usize = 1000;
     379            2 :         let test_deltas1 = (0..N)
     380         2000 :             .map(|idx| {
     381         2000 :                 (
     382         2000 :                     get_key(idx as u32 / 10),
     383         2000 :                     Lsn(0x20 * ((idx as u64) % 10 + 1)),
     384         2000 :                     Value::Image(Bytes::from(format!("img{idx:05}"))),
     385         2000 :                 )
     386         2000 :             })
     387            2 :             .collect_vec();
     388            2 :         let resident_layer_1 = produce_delta_layer(&tenant, &tline, test_deltas1.clone(), &ctx)
     389            8 :             .await
     390            2 :             .unwrap();
     391            2 :         let test_deltas2 = (0..N)
     392         2000 :             .map(|idx| {
     393         2000 :                 (
     394         2000 :                     get_key(idx as u32 / 10),
     395         2000 :                     Lsn(0x20 * ((idx as u64) % 10 + 1) + 0x10),
     396         2000 :                     Value::Image(Bytes::from(format!("img{idx:05}"))),
     397         2000 :                 )
     398         2000 :             })
     399            2 :             .collect_vec();
     400            2 :         let resident_layer_2 = produce_delta_layer(&tenant, &tline, test_deltas2.clone(), &ctx)
     401            8 :             .await
     402            2 :             .unwrap();
     403            2 :         let test_deltas3 = (0..N)
     404         2000 :             .map(|idx| {
     405         2000 :                 (
     406         2000 :                     get_key(idx as u32 / 10 + N as u32),
     407         2000 :                     Lsn(0x10 * ((idx as u64) % 10 + 1)),
     408         2000 :                     Value::Image(Bytes::from(format!("img{idx:05}"))),
     409         2000 :                 )
     410         2000 :             })
     411            2 :             .collect_vec();
     412            2 :         let resident_layer_3 = produce_delta_layer(&tenant, &tline, test_deltas3.clone(), &ctx)
     413            8 :             .await
     414            2 :             .unwrap();
     415            2 :         let mut merge_iter = MergeIterator::create(
     416            2 :             &[
     417            2 :                 resident_layer_1.get_as_delta(&ctx).await.unwrap(),
     418            2 :                 resident_layer_2.get_as_delta(&ctx).await.unwrap(),
     419            2 :                 resident_layer_3.get_as_delta(&ctx).await.unwrap(),
     420            2 :             ],
     421            2 :             &[],
     422            2 :             &ctx,
     423            2 :         );
     424            2 :         let mut expect = Vec::new();
     425            2 :         expect.extend(test_deltas1);
     426            2 :         expect.extend(test_deltas2);
     427            2 :         expect.extend(test_deltas3);
     428            2 :         expect.sort_by(sort_delta);
     429           12 :         assert_merge_iter_equal(&mut merge_iter, &expect).await;
     430            2 : 
     431            2 :         // TODO: test layers are loaded only when needed, reducing num of active iterators in k-merge
     432            2 :     }
     433              : 
     434              :     #[tokio::test]
     435            2 :     async fn delta_image_mixed_merge() {
     436            2 :         use crate::repository::Value;
     437            2 :         use bytes::Bytes;
     438            2 : 
     439            2 :         let harness = TenantHarness::create("merge_iterator_delta_image_mixed_merge")
     440            2 :             .await
     441            2 :             .unwrap();
     442            8 :         let (tenant, ctx) = harness.load().await;
     443            2 : 
     444            2 :         let tline = tenant
     445            2 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
     446            4 :             .await
     447            2 :             .unwrap();
     448            2 : 
     449           18 :         fn get_key(id: u32) -> Key {
     450           18 :             let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
     451           18 :             key.field6 = id;
     452           18 :             key
     453           18 :         }
     454            2 :         // In this test case, we want to test if the iterator still works correctly with multiple copies
     455            2 :         // of a delta+image at the same LSN, for example, the following sequence a@10=+a, a@10=+a, a@10=ab, a@10=ab.
     456            2 :         // Duplicated deltas/images are possible for old tenants before the full L0 compaction file name fix.
     457            2 :         // An incomplete compaction could produce multiple identical delta layers. Forced image generation
     458            2 :         // could produce overlapping images. Apart from duplicated deltas/images, in the current storage implementation
     459            2 :         // one key-lsn could have a delta in the delta layer and one image in the image layer. The iterator should
     460            2 :         // correctly process these situations and return everything as-is, and the upper layer of the system
     461            2 :         // will handle duplicated LSNs.
     462            2 :         let test_deltas1 = vec![
     463            2 :             (
     464            2 :                 get_key(0),
     465            2 :                 Lsn(0x10),
     466            2 :                 Value::WalRecord(NeonWalRecord::wal_init()),
     467            2 :             ),
     468            2 :             (
     469            2 :                 get_key(0),
     470            2 :                 Lsn(0x18),
     471            2 :                 Value::WalRecord(NeonWalRecord::wal_append("a")),
     472            2 :             ),
     473            2 :             (
     474            2 :                 get_key(5),
     475            2 :                 Lsn(0x10),
     476            2 :                 Value::WalRecord(NeonWalRecord::wal_init()),
     477            2 :             ),
     478            2 :             (
     479            2 :                 get_key(5),
     480            2 :                 Lsn(0x18),
     481            2 :                 Value::WalRecord(NeonWalRecord::wal_append("b")),
     482            2 :             ),
     483            2 :         ];
     484            2 :         let resident_layer_1 = produce_delta_layer(&tenant, &tline, test_deltas1.clone(), &ctx)
     485            6 :             .await
     486            2 :             .unwrap();
     487            2 :         let mut test_deltas2 = test_deltas1.clone();
     488            2 :         test_deltas2.push((
     489            2 :             get_key(10),
     490            2 :             Lsn(0x20),
     491            2 :             Value::Image(Bytes::copy_from_slice(b"test")),
     492            2 :         ));
     493            2 :         let resident_layer_2 = produce_delta_layer(&tenant, &tline, test_deltas2.clone(), &ctx)
     494            6 :             .await
     495            2 :             .unwrap();
     496            2 :         let test_deltas3 = vec![
     497            2 :             (
     498            2 :                 get_key(0),
     499            2 :                 Lsn(0x10),
     500            2 :                 Value::Image(Bytes::copy_from_slice(b"")),
     501            2 :             ),
     502            2 :             (
     503            2 :                 get_key(5),
     504            2 :                 Lsn(0x18),
     505            2 :                 Value::Image(Bytes::copy_from_slice(b"b")),
     506            2 :             ),
     507            2 :             (
     508            2 :                 get_key(15),
     509            2 :                 Lsn(0x20),
     510            2 :                 Value::Image(Bytes::copy_from_slice(b"test")),
     511            2 :             ),
     512            2 :         ];
     513            2 :         let resident_layer_3 = produce_delta_layer(&tenant, &tline, test_deltas3.clone(), &ctx)
     514            6 :             .await
     515            2 :             .unwrap();
     516            2 :         let mut test_deltas4 = test_deltas3.clone();
     517            2 :         test_deltas4.push((
     518            2 :             get_key(20),
     519            2 :             Lsn(0x20),
     520            2 :             Value::Image(Bytes::copy_from_slice(b"test")),
     521            2 :         ));
     522            2 :         let resident_layer_4 = produce_delta_layer(&tenant, &tline, test_deltas4.clone(), &ctx)
     523            6 :             .await
     524            2 :             .unwrap();
     525            2 :         let mut expect = Vec::new();
     526            2 :         expect.extend(test_deltas1);
     527            2 :         expect.extend(test_deltas2);
     528            2 :         expect.extend(test_deltas3);
     529            2 :         expect.extend(test_deltas4);
     530            2 :         expect.sort_by(sort_delta_value);
     531            2 : 
     532            2 :         // Test with different layer order for MergeIterator::create to ensure the order
     533            2 :         // is stable.
     534            2 : 
     535            2 :         let mut merge_iter = MergeIterator::create(
     536            2 :             &[
     537            2 :                 resident_layer_4.get_as_delta(&ctx).await.unwrap(),
     538            2 :                 resident_layer_1.get_as_delta(&ctx).await.unwrap(),
     539            2 :                 resident_layer_3.get_as_delta(&ctx).await.unwrap(),
     540            2 :                 resident_layer_2.get_as_delta(&ctx).await.unwrap(),
     541            2 :             ],
     542            2 :             &[],
     543            2 :             &ctx,
     544            2 :         );
     545            8 :         assert_merge_iter_equal(&mut merge_iter, &expect).await;
     546            2 : 
     547            2 :         let mut merge_iter = MergeIterator::create(
     548            2 :             &[
     549            2 :                 resident_layer_1.get_as_delta(&ctx).await.unwrap(),
     550            2 :                 resident_layer_4.get_as_delta(&ctx).await.unwrap(),
     551            2 :                 resident_layer_3.get_as_delta(&ctx).await.unwrap(),
     552            2 :                 resident_layer_2.get_as_delta(&ctx).await.unwrap(),
     553            2 :             ],
     554            2 :             &[],
     555            2 :             &ctx,
     556            2 :         );
     557            4 :         assert_merge_iter_equal(&mut merge_iter, &expect).await;
     558            2 : 
     559            2 :         is_send(merge_iter);
     560            2 :     }
     561              : 
     562            2 :     fn is_send(_: impl Send) {}
     563              : }
        

Generated by: LCOV version 2.1-beta