LCOV - code coverage report
Current view: top level - pageserver/src/tenant/storage_layer - merge_iterator.rs (source / functions) Coverage Total Hit
Test: 90b23405d17e36048d3bb64e314067f397803f1b.info Lines: 95.2 % 434 413
Test Date: 2024-09-20 13:14:58 Functions: 92.7 % 41 38

            Line data    Source code
       1              : use std::{
       2              :     cmp::Ordering,
       3              :     collections::{binary_heap, BinaryHeap},
       4              : };
       5              : 
       6              : use anyhow::bail;
       7              : use pageserver_api::key::Key;
       8              : use utils::lsn::Lsn;
       9              : 
      10              : use crate::{context::RequestContext, repository::Value};
      11              : 
      12              : use super::{
      13              :     delta_layer::{DeltaLayerInner, DeltaLayerIterator},
      14              :     image_layer::{ImageLayerInner, ImageLayerIterator},
      15              : };
      16              : 
      17              : #[derive(Clone, Copy)]
      18              : enum LayerRef<'a> {
      19              :     Image(&'a ImageLayerInner),
      20              :     Delta(&'a DeltaLayerInner),
      21              : }
      22              : 
      23              : impl<'a> LayerRef<'a> {
      24         1530 :     fn iter(self, ctx: &'a RequestContext) -> LayerIterRef<'a> {
      25         1530 :         match self {
      26          108 :             Self::Image(x) => LayerIterRef::Image(x.iter(ctx)),
      27         1422 :             Self::Delta(x) => LayerIterRef::Delta(x.iter(ctx)),
      28              :         }
      29         1530 :     }
      30              : 
      31            0 :     fn layer_dbg_info(&self) -> String {
      32            0 :         match self {
      33            0 :             Self::Image(x) => x.layer_dbg_info(),
      34            0 :             Self::Delta(x) => x.layer_dbg_info(),
      35              :         }
      36            0 :     }
      37              : }
      38              : 
      39              : enum LayerIterRef<'a> {
      40              :     Image(ImageLayerIterator<'a>),
      41              :     Delta(DeltaLayerIterator<'a>),
      42              : }
      43              : 
      44              : impl LayerIterRef<'_> {
      45      6215070 :     async fn next(&mut self) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
      46      6215070 :         match self {
      47      6213630 :             Self::Delta(x) => x.next().await,
      48         1440 :             Self::Image(x) => x.next().await,
      49              :         }
      50      6215070 :     }
      51              : 
      52            0 :     fn layer_dbg_info(&self) -> String {
      53            0 :         match self {
      54            0 :             Self::Image(x) => x.layer_dbg_info(),
      55            0 :             Self::Delta(x) => x.layer_dbg_info(),
      56              :         }
      57            0 :     }
      58              : }
      59              : 
      60              : /// This type plays several roles at once
      61              : /// 1. Unified iterator for image and delta layers.
      62              : /// 2. `Ord` for use in [`MergeIterator::heap`] (for the k-merge).
      63              : /// 3. Lazy creation of the real delta/image iterator.
      64              : enum IteratorWrapper<'a> {
      65              :     NotLoaded {
      66              :         ctx: &'a RequestContext,
      67              :         first_key_lower_bound: (Key, Lsn),
      68              :         layer: LayerRef<'a>,
      69              :     },
      70              :     Loaded {
      71              :         iter: PeekableLayerIterRef<'a>,
      72              :     },
      73              : }
      74              : 
      75              : struct PeekableLayerIterRef<'a> {
      76              :     iter: LayerIterRef<'a>,
      77              :     peeked: Option<(Key, Lsn, Value)>, // None == end
      78              : }
      79              : 
      80              : impl<'a> PeekableLayerIterRef<'a> {
      81         1530 :     async fn create(mut iter: LayerIterRef<'a>) -> anyhow::Result<Self> {
      82         1896 :         let peeked = iter.next().await?;
      83         1530 :         Ok(Self { iter, peeked })
      84         1530 :     }
      85              : 
      86     25351092 :     fn peek(&self) -> &Option<(Key, Lsn, Value)> {
      87     25351092 :         &self.peeked
      88     25351092 :     }
      89              : 
      90      6213540 :     async fn next(&mut self) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
      91      6213540 :         let result = self.peeked.take();
      92      6213540 :         self.peeked = self.iter.next().await?;
      93      6213540 :         if let (Some((k1, l1, _)), Some((k2, l2, _))) = (&self.peeked, &result) {
      94      6210480 :             if (k1, l1) < (k2, l2) {
      95            0 :                 bail!("iterator is not ordered: {}", self.iter.layer_dbg_info());
      96      6210480 :             }
      97         3060 :         }
      98      6213540 :         Ok(result)
      99      6213540 :     }
     100              : }
     101              : 
     102              : impl<'a> std::cmp::PartialEq for IteratorWrapper<'a> {
     103            0 :     fn eq(&self, other: &Self) -> bool {
     104            0 :         self.cmp(other) == Ordering::Equal
     105            0 :     }
     106              : }
     107              : 
     108              : impl<'a> std::cmp::Eq for IteratorWrapper<'a> {}
     109              : 
     110              : impl<'a> std::cmp::PartialOrd for IteratorWrapper<'a> {
     111     12687504 :     fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
     112     12687504 :         Some(self.cmp(other))
     113     12687504 :     }
     114              : }
     115              : 
     116              : impl<'a> std::cmp::Ord for IteratorWrapper<'a> {
     117     12687504 :     fn cmp(&self, other: &Self) -> std::cmp::Ordering {
     118              :         use std::cmp::Ordering;
     119     12687504 :         let a = self.peek_next_key_lsn_value();
     120     12687504 :         let b = other.peek_next_key_lsn_value();
     121     12687504 :         match (a, b) {
     122      9064088 :             (Some((k1, l1, v1)), Some((k2, l2, v2))) => {
     123     18128176 :                 fn map_value_to_num(val: &Option<&Value>) -> usize {
     124     18102982 :                     match val {
     125        25194 :                         None => 0,
     126     18101812 :                         Some(Value::Image(_)) => 1,
     127         1170 :                         Some(Value::WalRecord(_)) => 2,
     128              :                     }
     129     18128176 :                 }
     130      9064088 :                 let order_1 = map_value_to_num(&v1);
     131      9064088 :                 let order_2 = map_value_to_num(&v2);
     132      9064088 :                 // When key_lsn are the same, the unloaded iter will always appear before the loaded one.
     133      9064088 :                 // And note that we do a reverse at the end of the comparison, so it works with the max heap.
     134      9064088 :                 (k1, l1, order_1).cmp(&(k2, l2, order_2))
     135              :             }
     136      3008484 :             (Some(_), None) => Ordering::Less,
     137         3841 :             (None, Some(_)) => Ordering::Greater,
     138       611091 :             (None, None) => Ordering::Equal,
     139              :         }
     140     12687504 :         .reverse()
     141     12687504 :     }
     142              : }
     143              : 
     144              : impl<'a> IteratorWrapper<'a> {
     145          108 :     pub fn create_from_image_layer(
     146          108 :         image_layer: &'a ImageLayerInner,
     147          108 :         ctx: &'a RequestContext,
     148          108 :     ) -> Self {
     149          108 :         Self::NotLoaded {
     150          108 :             layer: LayerRef::Image(image_layer),
     151          108 :             first_key_lower_bound: (image_layer.key_range().start, image_layer.lsn()),
     152          108 :             ctx,
     153          108 :         }
     154          108 :     }
     155              : 
     156         1422 :     pub fn create_from_delta_layer(
     157         1422 :         delta_layer: &'a DeltaLayerInner,
     158         1422 :         ctx: &'a RequestContext,
     159         1422 :     ) -> Self {
     160         1422 :         Self::NotLoaded {
     161         1422 :             layer: LayerRef::Delta(delta_layer),
     162         1422 :             first_key_lower_bound: (delta_layer.key_range().start, delta_layer.lsn_range().start),
     163         1422 :             ctx,
     164         1422 :         }
     165         1422 :     }
     166              : 
     167     25375008 :     fn peek_next_key_lsn_value(&self) -> Option<(&Key, Lsn, Option<&Value>)> {
     168     25375008 :         match self {
     169     25349562 :             Self::Loaded { iter } => iter
     170     25349562 :                 .peek()
     171     25349562 :                 .as_ref()
     172     25349562 :                 .map(|(key, lsn, val)| (key, *lsn, Some(val))),
     173        25446 :             Self::NotLoaded {
     174        25446 :                 first_key_lower_bound: (key, lsn),
     175        25446 :                 ..
     176        25446 :             } => Some((key, *lsn, None)),
     177              :         }
     178     25375008 :     }
     179              : 
     180              :     // CORRECTNESS: this function must always take `&mut self`, never `&self`.
     181              :     //
     182              :     // The reason is that `impl Ord for Self` evaluates differently after this function
     183              :     // returns. We're called through a `PeekMut::deref_mut`, which causes heap repair when
     184              :     // the PeekMut gets returned. So, it's critical that we actually run through `PeekMut::deref_mut`
     185              :     // and not just `PeekMut::deref`
     186              :     // If we don't take `&mut self`
     187         1530 :     async fn load(&mut self) -> anyhow::Result<()> {
     188         1530 :         assert!(!self.is_loaded());
     189         1530 :         let Self::NotLoaded {
     190         1530 :             ctx,
     191         1530 :             first_key_lower_bound,
     192         1530 :             layer,
     193         1530 :         } = self
     194              :         else {
     195            0 :             unreachable!()
     196              :         };
     197         1530 :         let iter = layer.iter(ctx);
     198         1896 :         let iter = PeekableLayerIterRef::create(iter).await?;
     199         1530 :         if let Some((k1, l1, _)) = iter.peek() {
     200         1530 :             let (k2, l2) = first_key_lower_bound;
     201         1530 :             if (k1, l1) < (k2, l2) {
     202            0 :                 bail!(
     203            0 :                     "layer key range did not include the first key in the layer: {}",
     204            0 :                     layer.layer_dbg_info()
     205            0 :                 );
     206         1530 :             }
     207            0 :         }
     208         1530 :         *self = Self::Loaded { iter };
     209         1530 :         Ok(())
     210         1530 :     }
     211              : 
     212      6216600 :     fn is_loaded(&self) -> bool {
     213      6216600 :         matches!(self, Self::Loaded { .. })
     214      6216600 :     }
     215              : 
     216              :     /// Correctness: must load the iterator before using.
     217              :     ///
     218              :     /// Given this iterator wrapper is private to the merge iterator, users won't be able to mis-use it.
     219              :     /// The public interfaces to use are [`crate::tenant::storage_layer::delta_layer::DeltaLayerIterator`] and
     220              :     /// [`crate::tenant::storage_layer::image_layer::ImageLayerIterator`].
     221      6213540 :     async fn next(&mut self) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
     222      6213540 :         let Self::Loaded { iter } = self else {
     223            0 :             panic!("must load the iterator before using")
     224              :         };
     225      6213540 :         iter.next().await
     226      6213540 :     }
     227              : }
     228              : 
     229              : /// A merge iterator over delta/image layer iterators.
     230              : ///
     231              : /// When duplicated records are found, the iterator will not perform any
     232              : /// deduplication, and the caller should handle these situation. By saying
     233              : /// duplicated records, there are many possibilities:
     234              : ///
     235              : /// * Two same delta at the same LSN.
     236              : /// * Two same image at the same LSN.
     237              : /// * Delta/image at the same LSN where the image has already applied the delta.
     238              : ///
     239              : /// The iterator will always put the image before the delta.
     240              : pub struct MergeIterator<'a> {
     241              :     heap: BinaryHeap<IteratorWrapper<'a>>,
     242              : }
     243              : 
     244              : impl<'a> MergeIterator<'a> {
     245          186 :     pub fn create(
     246          186 :         deltas: &[&'a DeltaLayerInner],
     247          186 :         images: &[&'a ImageLayerInner],
     248          186 :         ctx: &'a RequestContext,
     249          186 :     ) -> Self {
     250          186 :         let mut heap = Vec::with_capacity(images.len() + deltas.len());
     251          294 :         for image in images {
     252          108 :             heap.push(IteratorWrapper::create_from_image_layer(image, ctx));
     253          108 :         }
     254         1608 :         for delta in deltas {
     255         1422 :             heap.push(IteratorWrapper::create_from_delta_layer(delta, ctx));
     256         1422 :         }
     257          186 :         Self {
     258          186 :             heap: BinaryHeap::from(heap),
     259          186 :         }
     260          186 :     }
     261              : 
     262      6212196 :     pub async fn next(&mut self) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
     263      6215256 :         while let Some(mut iter) = self.heap.peek_mut() {
     264      6215070 :             if !iter.is_loaded() {
     265              :                 // Once we load the iterator, we can know the real first key-value pair in the iterator.
     266              :                 // We put it back into the heap so that a potentially unloaded layer may have a key between
     267              :                 // [potential_first_key, loaded_first_key).
     268         1896 :                 iter.load().await?;
     269         1530 :                 continue;
     270      6213540 :             }
     271      6213540 :             let Some(item) = iter.next().await? else {
     272              :                 // If the iterator returns None, we pop this iterator. Actually, in the current implementation,
     273              :                 // we order None > Some, and all the rest of the iterators should return None.
     274         1530 :                 binary_heap::PeekMut::pop(iter);
     275         1530 :                 continue;
     276              :             };
     277      6212010 :             return Ok(Some(item));
     278              :         }
     279          186 :         Ok(None)
     280      6212196 :     }
     281              : }
     282              : 
     283              : #[cfg(test)]
     284              : mod tests {
     285              :     use super::*;
     286              : 
     287              :     use itertools::Itertools;
     288              :     use pageserver_api::key::Key;
     289              :     use utils::lsn::Lsn;
     290              : 
     291              :     use crate::{
     292              :         tenant::{
     293              :             harness::{TenantHarness, TIMELINE_ID},
     294              :             storage_layer::delta_layer::test::{produce_delta_layer, sort_delta, sort_delta_value},
     295              :         },
     296              :         walrecord::NeonWalRecord,
     297              :         DEFAULT_PG_VERSION,
     298              :     };
     299              : 
     300           24 :     async fn assert_merge_iter_equal(
     301           24 :         merge_iter: &mut MergeIterator<'_>,
     302           24 :         expect: &[(Key, Lsn, Value)],
     303           24 :     ) {
     304           24 :         let mut expect_iter = expect.iter();
     305              :         loop {
     306        18240 :             let o1 = merge_iter.next().await.unwrap();
     307        18240 :             let o2 = expect_iter.next();
     308        18240 :             assert_eq!(o1.is_some(), o2.is_some());
     309        18240 :             if o1.is_none() && o2.is_none() {
     310           24 :                 break;
     311        18216 :             }
     312        18216 :             let (k1, l1, v1) = o1.unwrap();
     313        18216 :             let (k2, l2, v2) = o2.unwrap();
     314        18216 :             assert_eq!(&k1, k2);
     315        18216 :             assert_eq!(l1, *l2);
     316        18216 :             assert_eq!(&v1, v2);
     317              :         }
     318           24 :     }
     319              : 
     320              :     #[tokio::test]
     321            6 :     async fn merge_in_between() {
     322            6 :         use crate::repository::Value;
     323            6 :         use bytes::Bytes;
     324            6 : 
     325            6 :         let harness = TenantHarness::create("merge_iterator_merge_in_between")
     326            6 :             .await
     327            6 :             .unwrap();
     328           24 :         let (tenant, ctx) = harness.load().await;
     329            6 : 
     330            6 :         let tline = tenant
     331            6 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
     332           12 :             .await
     333            6 :             .unwrap();
     334            6 : 
     335           24 :         fn get_key(id: u32) -> Key {
     336           24 :             let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
     337           24 :             key.field6 = id;
     338           24 :             key
     339           24 :         }
     340            6 :         let test_deltas1 = vec![
     341            6 :             (
     342            6 :                 get_key(0),
     343            6 :                 Lsn(0x10),
     344            6 :                 Value::Image(Bytes::copy_from_slice(b"test")),
     345            6 :             ),
     346            6 :             (
     347            6 :                 get_key(5),
     348            6 :                 Lsn(0x10),
     349            6 :                 Value::Image(Bytes::copy_from_slice(b"test")),
     350            6 :             ),
     351            6 :         ];
     352            6 :         let resident_layer_1 = produce_delta_layer(&tenant, &tline, test_deltas1.clone(), &ctx)
     353           18 :             .await
     354            6 :             .unwrap();
     355            6 :         let test_deltas2 = vec![
     356            6 :             (
     357            6 :                 get_key(3),
     358            6 :                 Lsn(0x10),
     359            6 :                 Value::Image(Bytes::copy_from_slice(b"test")),
     360            6 :             ),
     361            6 :             (
     362            6 :                 get_key(4),
     363            6 :                 Lsn(0x10),
     364            6 :                 Value::Image(Bytes::copy_from_slice(b"test")),
     365            6 :             ),
     366            6 :         ];
     367            6 :         let resident_layer_2 = produce_delta_layer(&tenant, &tline, test_deltas2.clone(), &ctx)
     368           18 :             .await
     369            6 :             .unwrap();
     370            6 :         let mut merge_iter = MergeIterator::create(
     371            6 :             &[
     372            6 :                 resident_layer_2.get_as_delta(&ctx).await.unwrap(),
     373            6 :                 resident_layer_1.get_as_delta(&ctx).await.unwrap(),
     374            6 :             ],
     375            6 :             &[],
     376            6 :             &ctx,
     377            6 :         );
     378            6 :         let mut expect = Vec::new();
     379            6 :         expect.extend(test_deltas1);
     380            6 :         expect.extend(test_deltas2);
     381            6 :         expect.sort_by(sort_delta);
     382           12 :         assert_merge_iter_equal(&mut merge_iter, &expect).await;
     383            6 :     }
     384              : 
     385              :     #[tokio::test]
     386            6 :     async fn delta_merge() {
     387            6 :         use crate::repository::Value;
     388            6 :         use bytes::Bytes;
     389            6 : 
     390            6 :         let harness = TenantHarness::create("merge_iterator_delta_merge")
     391            6 :             .await
     392            6 :             .unwrap();
     393           24 :         let (tenant, ctx) = harness.load().await;
     394            6 : 
     395            6 :         let tline = tenant
     396            6 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
     397           12 :             .await
     398            6 :             .unwrap();
     399            6 : 
     400        18000 :         fn get_key(id: u32) -> Key {
     401        18000 :             let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
     402        18000 :             key.field6 = id;
     403        18000 :             key
     404        18000 :         }
     405            6 :         const N: usize = 1000;
     406            6 :         let test_deltas1 = (0..N)
     407         6000 :             .map(|idx| {
     408         6000 :                 (
     409         6000 :                     get_key(idx as u32 / 10),
     410         6000 :                     Lsn(0x20 * ((idx as u64) % 10 + 1)),
     411         6000 :                     Value::Image(Bytes::from(format!("img{idx:05}"))),
     412         6000 :                 )
     413         6000 :             })
     414            6 :             .collect_vec();
     415            6 :         let resident_layer_1 = produce_delta_layer(&tenant, &tline, test_deltas1.clone(), &ctx)
     416           24 :             .await
     417            6 :             .unwrap();
     418            6 :         let test_deltas2 = (0..N)
     419         6000 :             .map(|idx| {
     420         6000 :                 (
     421         6000 :                     get_key(idx as u32 / 10),
     422         6000 :                     Lsn(0x20 * ((idx as u64) % 10 + 1) + 0x10),
     423         6000 :                     Value::Image(Bytes::from(format!("img{idx:05}"))),
     424         6000 :                 )
     425         6000 :             })
     426            6 :             .collect_vec();
     427            6 :         let resident_layer_2 = produce_delta_layer(&tenant, &tline, test_deltas2.clone(), &ctx)
     428           24 :             .await
     429            6 :             .unwrap();
     430            6 :         let test_deltas3 = (0..N)
     431         6000 :             .map(|idx| {
     432         6000 :                 (
     433         6000 :                     get_key(idx as u32 / 10 + N as u32),
     434         6000 :                     Lsn(0x10 * ((idx as u64) % 10 + 1)),
     435         6000 :                     Value::Image(Bytes::from(format!("img{idx:05}"))),
     436         6000 :                 )
     437         6000 :             })
     438            6 :             .collect_vec();
     439            6 :         let resident_layer_3 = produce_delta_layer(&tenant, &tline, test_deltas3.clone(), &ctx)
     440           24 :             .await
     441            6 :             .unwrap();
     442            6 :         let mut merge_iter = MergeIterator::create(
     443            6 :             &[
     444            6 :                 resident_layer_1.get_as_delta(&ctx).await.unwrap(),
     445            6 :                 resident_layer_2.get_as_delta(&ctx).await.unwrap(),
     446            6 :                 resident_layer_3.get_as_delta(&ctx).await.unwrap(),
     447            6 :             ],
     448            6 :             &[],
     449            6 :             &ctx,
     450            6 :         );
     451            6 :         let mut expect = Vec::new();
     452            6 :         expect.extend(test_deltas1);
     453            6 :         expect.extend(test_deltas2);
     454            6 :         expect.extend(test_deltas3);
     455            6 :         expect.sort_by(sort_delta);
     456           36 :         assert_merge_iter_equal(&mut merge_iter, &expect).await;
     457            6 : 
     458            6 :         // TODO: test layers are loaded only when needed, reducing num of active iterators in k-merge
     459            6 :     }
     460              : 
     461              :     #[tokio::test]
     462            6 :     async fn delta_image_mixed_merge() {
     463            6 :         use crate::repository::Value;
     464            6 :         use bytes::Bytes;
     465            6 : 
     466            6 :         let harness = TenantHarness::create("merge_iterator_delta_image_mixed_merge")
     467            6 :             .await
     468            6 :             .unwrap();
     469           24 :         let (tenant, ctx) = harness.load().await;
     470            6 : 
     471            6 :         let tline = tenant
     472            6 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
     473           12 :             .await
     474            6 :             .unwrap();
     475            6 : 
     476           54 :         fn get_key(id: u32) -> Key {
     477           54 :             let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
     478           54 :             key.field6 = id;
     479           54 :             key
     480           54 :         }
     481            6 :         // In this test case, we want to test if the iterator still works correctly with multiple copies
     482            6 :         // of a delta+image at the same LSN, for example, the following sequence a@10=+a, a@10=+a, a@10=ab, a@10=ab.
     483            6 :         // Duplicated deltas/images are possible for old tenants before the full L0 compaction file name fix.
     484            6 :         // An incomplete compaction could produce multiple exactly-the-same delta layers. Force image generation
     485            6 :         // could produce overlapping images. Apart from duplicated deltas/images, in the current storage implementation
     486            6 :         // one key-lsn could have a delta in the delta layer and one image in the image layer. The iterator should
     487            6 :         // correctly process these situations and return everything as-is, and the upper layer of the system
     488            6 :         // will handle duplicated LSNs.
     489            6 :         let test_deltas1 = vec![
     490            6 :             (
     491            6 :                 get_key(0),
     492            6 :                 Lsn(0x10),
     493            6 :                 Value::WalRecord(NeonWalRecord::wal_init()),
     494            6 :             ),
     495            6 :             (
     496            6 :                 get_key(0),
     497            6 :                 Lsn(0x18),
     498            6 :                 Value::WalRecord(NeonWalRecord::wal_append("a")),
     499            6 :             ),
     500            6 :             (
     501            6 :                 get_key(5),
     502            6 :                 Lsn(0x10),
     503            6 :                 Value::WalRecord(NeonWalRecord::wal_init()),
     504            6 :             ),
     505            6 :             (
     506            6 :                 get_key(5),
     507            6 :                 Lsn(0x18),
     508            6 :                 Value::WalRecord(NeonWalRecord::wal_append("b")),
     509            6 :             ),
     510            6 :         ];
     511            6 :         let resident_layer_1 = produce_delta_layer(&tenant, &tline, test_deltas1.clone(), &ctx)
     512           18 :             .await
     513            6 :             .unwrap();
     514            6 :         let mut test_deltas2 = test_deltas1.clone();
     515            6 :         test_deltas2.push((
     516            6 :             get_key(10),
     517            6 :             Lsn(0x20),
     518            6 :             Value::Image(Bytes::copy_from_slice(b"test")),
     519            6 :         ));
     520            6 :         let resident_layer_2 = produce_delta_layer(&tenant, &tline, test_deltas2.clone(), &ctx)
     521           18 :             .await
     522            6 :             .unwrap();
     523            6 :         let test_deltas3 = vec![
     524            6 :             (
     525            6 :                 get_key(0),
     526            6 :                 Lsn(0x10),
     527            6 :                 Value::Image(Bytes::copy_from_slice(b"")),
     528            6 :             ),
     529            6 :             (
     530            6 :                 get_key(5),
     531            6 :                 Lsn(0x18),
     532            6 :                 Value::Image(Bytes::copy_from_slice(b"b")),
     533            6 :             ),
     534            6 :             (
     535            6 :                 get_key(15),
     536            6 :                 Lsn(0x20),
     537            6 :                 Value::Image(Bytes::copy_from_slice(b"test")),
     538            6 :             ),
     539            6 :         ];
     540            6 :         let resident_layer_3 = produce_delta_layer(&tenant, &tline, test_deltas3.clone(), &ctx)
     541           18 :             .await
     542            6 :             .unwrap();
     543            6 :         let mut test_deltas4 = test_deltas3.clone();
     544            6 :         test_deltas4.push((
     545            6 :             get_key(20),
     546            6 :             Lsn(0x20),
     547            6 :             Value::Image(Bytes::copy_from_slice(b"test")),
     548            6 :         ));
     549            6 :         let resident_layer_4 = produce_delta_layer(&tenant, &tline, test_deltas4.clone(), &ctx)
     550           18 :             .await
     551            6 :             .unwrap();
     552            6 :         let mut expect = Vec::new();
     553            6 :         expect.extend(test_deltas1);
     554            6 :         expect.extend(test_deltas2);
     555            6 :         expect.extend(test_deltas3);
     556            6 :         expect.extend(test_deltas4);
     557            6 :         expect.sort_by(sort_delta_value);
     558            6 : 
     559            6 :         // Test with different layer order for MergeIterator::create to ensure the order
     560            6 :         // is stable.
     561            6 : 
     562            6 :         let mut merge_iter = MergeIterator::create(
     563            6 :             &[
     564            6 :                 resident_layer_4.get_as_delta(&ctx).await.unwrap(),
     565            6 :                 resident_layer_1.get_as_delta(&ctx).await.unwrap(),
     566            6 :                 resident_layer_3.get_as_delta(&ctx).await.unwrap(),
     567            6 :                 resident_layer_2.get_as_delta(&ctx).await.unwrap(),
     568            6 :             ],
     569            6 :             &[],
     570            6 :             &ctx,
     571            6 :         );
     572           24 :         assert_merge_iter_equal(&mut merge_iter, &expect).await;
     573            6 : 
     574            6 :         let mut merge_iter = MergeIterator::create(
     575            6 :             &[
     576            6 :                 resident_layer_1.get_as_delta(&ctx).await.unwrap(),
     577            6 :                 resident_layer_4.get_as_delta(&ctx).await.unwrap(),
     578            6 :                 resident_layer_3.get_as_delta(&ctx).await.unwrap(),
     579            6 :                 resident_layer_2.get_as_delta(&ctx).await.unwrap(),
     580            6 :             ],
     581            6 :             &[],
     582            6 :             &ctx,
     583            6 :         );
     584           12 :         assert_merge_iter_equal(&mut merge_iter, &expect).await;
     585            6 : 
     586            6 :         is_send(merge_iter);
     587            6 :     }
     588              : 
     589            6 :     fn is_send(_: impl Send) {}
     590              : }
        

Generated by: LCOV version 2.1-beta