LCOV - code coverage report
Current view: top level - pageserver/src/tenant/storage_layer - merge_iterator.rs (source / functions) Coverage Total Hit
Test: bb522999b2ee0ee028df22bb188d3a84170ba700.info Lines: 98.6 % 416 410
Test Date: 2024-07-21 16:16:09 Functions: 97.4 % 39 38

            Line data    Source code
       1              : use std::{
       2              :     cmp::Ordering,
       3              :     collections::{binary_heap, BinaryHeap},
       4              : };
       5              : 
       6              : use pageserver_api::key::Key;
       7              : use utils::lsn::Lsn;
       8              : 
       9              : use crate::{context::RequestContext, repository::Value};
      10              : 
      11              : use super::{
      12              :     delta_layer::{DeltaLayerInner, DeltaLayerIterator},
      13              :     image_layer::{ImageLayerInner, ImageLayerIterator},
      14              : };
      15              : 
      16              : #[derive(Clone, Copy)]
      17              : enum LayerRef<'a> {
      18              :     Image(&'a ImageLayerInner),
      19              :     Delta(&'a DeltaLayerInner),
      20              : }
      21              : 
      22              : impl<'a> LayerRef<'a> {
      23           42 :     fn iter(self, ctx: &'a RequestContext) -> LayerIterRef<'a> {
      24           42 :         match self {
      25            8 :             Self::Image(x) => LayerIterRef::Image(x.iter(ctx)),
      26           34 :             Self::Delta(x) => LayerIterRef::Delta(x.iter(ctx)),
      27              :         }
      28           42 :     }
      29              : }
      30              : 
      31              : enum LayerIterRef<'a> {
      32              :     Image(ImageLayerIterator<'a>),
      33              :     Delta(DeltaLayerIterator<'a>),
      34              : }
      35              : 
      36              : impl LayerIterRef<'_> {
      37         6252 :     async fn next(&mut self) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
      38         6252 :         match self {
      39         6164 :             Self::Delta(x) => x.next().await,
      40           88 :             Self::Image(x) => x.next().await,
      41              :         }
      42         6252 :     }
      43              : }
      44              : 
      45              : /// This type plays several roles at once
      46              : /// 1. Unified iterator for image and delta layers.
      47              : /// 2. `Ord` for use in [`MergeIterator::heap`] (for the k-merge).
      48              : /// 3. Lazy creation of the real delta/image iterator.
      49              : enum IteratorWrapper<'a> {
      50              :     NotLoaded {
      51              :         ctx: &'a RequestContext,
      52              :         first_key_lower_bound: (Key, Lsn),
      53              :         layer: LayerRef<'a>,
      54              :     },
      55              :     Loaded {
      56              :         iter: PeekableLayerIterRef<'a>,
      57              :     },
      58              : }
      59              : 
      60              : struct PeekableLayerIterRef<'a> {
      61              :     iter: LayerIterRef<'a>,
      62              :     peeked: Option<(Key, Lsn, Value)>, // None == end
      63              : }
      64              : 
      65              : impl<'a> PeekableLayerIterRef<'a> {
      66           42 :     async fn create(mut iter: LayerIterRef<'a>) -> anyhow::Result<Self> {
      67           42 :         let peeked = iter.next().await?;
      68           42 :         Ok(Self { iter, peeked })
      69           42 :     }
      70              : 
      71        20812 :     fn peek(&self) -> &Option<(Key, Lsn, Value)> {
      72        20812 :         &self.peeked
      73        20812 :     }
      74              : 
      75         6210 :     async fn next(&mut self) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
      76         6210 :         let result = self.peeked.take();
      77         6210 :         self.peeked = self.iter.next().await?;
      78         6210 :         Ok(result)
      79         6210 :     }
      80              : }
      81              : 
      82              : impl<'a> std::cmp::PartialEq for IteratorWrapper<'a> {
      83            0 :     fn eq(&self, other: &Self) -> bool {
      84            0 :         self.cmp(other) == Ordering::Equal
      85            0 :     }
      86              : }
      87              : 
      88              : impl<'a> std::cmp::Eq for IteratorWrapper<'a> {}
      89              : 
      90              : impl<'a> std::cmp::PartialOrd for IteratorWrapper<'a> {
      91        12532 :     fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
      92        12532 :         Some(self.cmp(other))
      93        12532 :     }
      94              : }
      95              : 
      96              : impl<'a> std::cmp::Ord for IteratorWrapper<'a> {
      97        12532 :     fn cmp(&self, other: &Self) -> std::cmp::Ordering {
      98        12532 :         use std::cmp::Ordering;
      99        12532 :         let a = self.peek_next_key_lsn_value();
     100        12532 :         let b = other.peek_next_key_lsn_value();
     101        12532 :         match (a, b) {
     102         8368 :             (Some((k1, l1, v1)), Some((k2, l2, v2))) => {
     103        16736 :                 fn map_value_to_num(val: &Option<&Value>) -> usize {
     104        12460 :                     match val {
     105         8368 :                         None => 0,
     106        12290 :                         Some(Value::Image(_)) => 1,
     107         8368 :                         Some(Value::WalRecord(_)) => 2,
     108         8368 :                     }
     109        16736 :                 }
     110         8368 :                 let order_1 = map_value_to_num(&v1);
     111         8368 :                 let order_2 = map_value_to_num(&v2);
     112         8368 :                 // When (key, lsn) are equal, an unloaded iter (0) sorts before a loaded one, and an image (1) before a WAL record (2).
     113         8368 :                 // The result is reversed at the end of the comparison so that the smallest entry sits on top of the max heap.
     114         8368 :                 (k1, l1, order_1).cmp(&(k2, l2, order_2))
     115              :             }
     116         2060 :             (Some(_), None) => Ordering::Less,
     117           40 :             (None, Some(_)) => Ordering::Greater,
     118         2064 :             (None, None) => Ordering::Equal,
     119              :         }
     120        12532 :         .reverse()
     121        12532 :     }
     122              : }
     123              : 
     124              : impl<'a> IteratorWrapper<'a> {
     125            8 :     pub fn create_from_image_layer(
     126            8 :         image_layer: &'a ImageLayerInner,
     127            8 :         ctx: &'a RequestContext,
     128            8 :     ) -> Self {
     129            8 :         Self::NotLoaded {
     130            8 :             layer: LayerRef::Image(image_layer),
     131            8 :             first_key_lower_bound: (image_layer.key_range().start, image_layer.lsn()),
     132            8 :             ctx,
     133            8 :         }
     134            8 :     }
     135              : 
     136           34 :     pub fn create_from_delta_layer(
     137           34 :         delta_layer: &'a DeltaLayerInner,
     138           34 :         ctx: &'a RequestContext,
     139           34 :     ) -> Self {
     140           34 :         Self::NotLoaded {
     141           34 :             layer: LayerRef::Delta(delta_layer),
     142           34 :             first_key_lower_bound: (delta_layer.key_range().start, delta_layer.lsn_range().start),
     143           34 :             ctx,
     144           34 :         }
     145           34 :     }
     146              : 
     147        25064 :     fn peek_next_key_lsn_value(&self) -> Option<(&Key, Lsn, Option<&Value>)> {
     148        25064 :         match self {
     149        20770 :             Self::Loaded { iter } => iter
     150        20770 :                 .peek()
     151        20770 :                 .as_ref()
     152        20770 :                 .map(|(key, lsn, val)| (key, *lsn, Some(val))),
     153         4294 :             Self::NotLoaded {
     154         4294 :                 first_key_lower_bound: (key, lsn),
     155         4294 :                 ..
     156         4294 :             } => Some((key, *lsn, None)),
     157              :         }
     158        25064 :     }
     159              : 
     160              :     // CORRECTNESS: this function must always take `&mut self`, never `&self`.
     161              :     //
     162              :     // The reason is that `impl Ord for Self` evaluates differently after this function
     163              :     // returns. We're called through a `PeekMut::deref_mut`, which causes heap repair when
     164              :     // the PeekMut gets returned. So, it's critical that we actually run through `PeekMut::deref_mut`
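     165              :     // and not just `PeekMut::deref`. If we only took `&self`, the call would go through
     166              :     // `PeekMut::deref`, the heap would not be repaired, and its ordering would be silently broken.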
     167           42 :     async fn load(&mut self) -> anyhow::Result<()> {
     168           42 :         assert!(!self.is_loaded());
     169           42 :         let Self::NotLoaded {
     170           42 :             ctx,
     171           42 :             first_key_lower_bound,
     172           42 :             layer,
     173           42 :         } = self
     174              :         else {
     175            0 :             unreachable!()
     176              :         };
     177           42 :         let iter = layer.iter(ctx);
     178           42 :         let iter = PeekableLayerIterRef::create(iter).await?;
     179           42 :         if let Some((k1, l1, _)) = iter.peek() {
     180           42 :             let (k2, l2) = first_key_lower_bound;
     181           42 :             debug_assert!((k1, l1) >= (k2, l2));
     182            0 :         }
     183           42 :         *self = Self::Loaded { iter };
     184           42 :         Ok(())
     185           42 :     }
     186              : 
     187         6294 :     fn is_loaded(&self) -> bool {
     188         6294 :         matches!(self, Self::Loaded { .. })
     189         6294 :     }
     190              : 
     191              :     /// Correctness: must load the iterator before using.
     192              :     ///
     193              :     /// Given this iterator wrapper is private to the merge iterator, users won't be able to mis-use it.
     194              :     /// The public interfaces to use are [`crate::tenant::storage_layer::delta_layer::DeltaLayerIterator`] and
     195              :     /// [`crate::tenant::storage_layer::image_layer::ImageLayerIterator`].
     196         6210 :     async fn next(&mut self) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
     197         6210 :         let Self::Loaded { iter } = self else {
     198            0 :             panic!("must load the iterator before using")
     199              :         };
     200         6210 :         iter.next().await
     201         6210 :     }
     202              : }
     203              : 
     204              : /// A merge iterator over delta/image layer iterators. When duplicated records are
     205              : /// found, the iterator does not perform any deduplication; the caller must handle
     206              : /// these situations. Duplicated records can arise in several ways:
     207              : /// * Two identical deltas at the same LSN.
     208              : /// * Two identical images at the same LSN.
     209              : /// * A delta and an image at the same LSN, where the image has already applied the delta.
     210              : /// The iterator will always put the image before the delta.
     211              : pub struct MergeIterator<'a> {
     212              :     heap: BinaryHeap<IteratorWrapper<'a>>,
     213              : }
     214              : 
     215              : impl<'a> MergeIterator<'a> {
     216           12 :     pub fn create(
     217           12 :         deltas: &[&'a DeltaLayerInner],
     218           12 :         images: &[&'a ImageLayerInner],
     219           12 :         ctx: &'a RequestContext,
     220           12 :     ) -> Self {
     221           12 :         let mut heap = Vec::with_capacity(images.len() + deltas.len());
     222           20 :         for image in images {
     223            8 :             heap.push(IteratorWrapper::create_from_image_layer(image, ctx));
     224            8 :         }
     225           46 :         for delta in deltas {
     226           34 :             heap.push(IteratorWrapper::create_from_delta_layer(delta, ctx));
     227           34 :         }
     228           12 :         Self {
     229           12 :             heap: BinaryHeap::from(heap),
     230           12 :         }
     231           12 :     }
     232              : 
     233         6180 :     pub async fn next(&mut self) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
     234         6264 :         while let Some(mut iter) = self.heap.peek_mut() {
     235         6252 :             if !iter.is_loaded() {
     236              :                 // Once the iterator is loaded, we know its real first key-value pair.
     237              :                 // We put it back into the heap because another, still unloaded layer may hold a key in
     238              :                 // [potential_first_key, loaded_first_key).
     239           42 :                 iter.load().await?;
     240           42 :                 continue;
     241         6210 :             }
     242         6210 :             let Some(item) = iter.next().await? else {
     243              :                 // If the iterator returns None, it is exhausted, so we pop it. Since exhausted iterators are
     244              :                 // ordered after all others, every iterator still in the heap should return None as well.
     245           42 :                 binary_heap::PeekMut::pop(iter);
     246           42 :                 continue;
     247              :             };
     248         6168 :             return Ok(Some(item));
     249              :         }
     250           12 :         Ok(None)
     251         6180 :     }
     252              : }
     253              : 
     254              : #[cfg(test)]
     255              : mod tests {
     256              :     use super::*;
     257              : 
     258              :     use itertools::Itertools;
     259              :     use pageserver_api::key::Key;
     260              :     use utils::lsn::Lsn;
     261              : 
     262              :     use crate::{
     263              :         tenant::{
     264              :             harness::{TenantHarness, TIMELINE_ID},
     265              :             storage_layer::delta_layer::test::{produce_delta_layer, sort_delta, sort_delta_value},
     266              :         },
     267              :         walrecord::NeonWalRecord,
     268              :         DEFAULT_PG_VERSION,
     269              :     };
     270              : 
     271            8 :     async fn assert_merge_iter_equal(
     272            8 :         merge_iter: &mut MergeIterator<'_>,
     273            8 :         expect: &[(Key, Lsn, Value)],
     274            8 :     ) {
     275            8 :         let mut expect_iter = expect.iter();
     276              :         loop {
     277         6080 :             let o1 = merge_iter.next().await.unwrap();
     278         6080 :             let o2 = expect_iter.next();
     279         6080 :             assert_eq!(o1.is_some(), o2.is_some());
     280         6080 :             if o1.is_none() && o2.is_none() {
     281            8 :                 break;
     282         6072 :             }
     283         6072 :             let (k1, l1, v1) = o1.unwrap();
     284         6072 :             let (k2, l2, v2) = o2.unwrap();
     285         6072 :             assert_eq!(&k1, k2);
     286         6072 :             assert_eq!(l1, *l2);
     287         6072 :             assert_eq!(&v1, v2);
     288              :         }
     289            8 :     }
     290              : 
     291              :     #[tokio::test]
     292            2 :     async fn merge_in_between() {
     293            2 :         use crate::repository::Value;
     294            2 :         use bytes::Bytes;
     295            2 : 
     296            2 :         let harness = TenantHarness::create("merge_iterator_merge_in_between")
     297            2 :             .await
     298            2 :             .unwrap();
     299            8 :         let (tenant, ctx) = harness.load().await;
     300            2 : 
     301            2 :         let tline = tenant
     302            2 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
     303            6 :             .await
     304            2 :             .unwrap();
     305            2 : 
     306            8 :         fn get_key(id: u32) -> Key {
     307            8 :             let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
     308            8 :             key.field6 = id;
     309            8 :             key
     310            8 :         }
     311            2 :         let test_deltas1 = vec![
     312            2 :             (
     313            2 :                 get_key(0),
     314            2 :                 Lsn(0x10),
     315            2 :                 Value::Image(Bytes::copy_from_slice(b"test")),
     316            2 :             ),
     317            2 :             (
     318            2 :                 get_key(5),
     319            2 :                 Lsn(0x10),
     320            2 :                 Value::Image(Bytes::copy_from_slice(b"test")),
     321            2 :             ),
     322            2 :         ];
     323            2 :         let resident_layer_1 = produce_delta_layer(&tenant, &tline, test_deltas1.clone(), &ctx)
     324            6 :             .await
     325            2 :             .unwrap();
     326            2 :         let test_deltas2 = vec![
     327            2 :             (
     328            2 :                 get_key(3),
     329            2 :                 Lsn(0x10),
     330            2 :                 Value::Image(Bytes::copy_from_slice(b"test")),
     331            2 :             ),
     332            2 :             (
     333            2 :                 get_key(4),
     334            2 :                 Lsn(0x10),
     335            2 :                 Value::Image(Bytes::copy_from_slice(b"test")),
     336            2 :             ),
     337            2 :         ];
     338            2 :         let resident_layer_2 = produce_delta_layer(&tenant, &tline, test_deltas2.clone(), &ctx)
     339            6 :             .await
     340            2 :             .unwrap();
     341            2 :         let mut merge_iter = MergeIterator::create(
     342            2 :             &[
     343            2 :                 resident_layer_2.get_as_delta(&ctx).await.unwrap(),
     344            2 :                 resident_layer_1.get_as_delta(&ctx).await.unwrap(),
     345            2 :             ],
     346            2 :             &[],
     347            2 :             &ctx,
     348            2 :         );
     349            2 :         let mut expect = Vec::new();
     350            2 :         expect.extend(test_deltas1);
     351            2 :         expect.extend(test_deltas2);
     352            2 :         expect.sort_by(sort_delta);
     353            4 :         assert_merge_iter_equal(&mut merge_iter, &expect).await;
     354            2 :     }
     355              : 
     356              :     #[tokio::test]
     357            2 :     async fn delta_merge() {
     358            2 :         use crate::repository::Value;
     359            2 :         use bytes::Bytes;
     360            2 : 
     361            2 :         let harness = TenantHarness::create("merge_iterator_delta_merge")
     362            2 :             .await
     363            2 :             .unwrap();
     364            8 :         let (tenant, ctx) = harness.load().await;
     365            2 : 
     366            2 :         let tline = tenant
     367            2 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
     368            6 :             .await
     369            2 :             .unwrap();
     370            2 : 
     371         6000 :         fn get_key(id: u32) -> Key {
     372         6000 :             let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
     373         6000 :             key.field6 = id;
     374         6000 :             key
     375         6000 :         }
     376            2 :         const N: usize = 1000;
     377            2 :         let test_deltas1 = (0..N)
     378         2000 :             .map(|idx| {
     379         2000 :                 (
     380         2000 :                     get_key(idx as u32 / 10),
     381         2000 :                     Lsn(0x20 * ((idx as u64) % 10 + 1)),
     382         2000 :                     Value::Image(Bytes::from(format!("img{idx:05}"))),
     383         2000 :                 )
     384         2000 :             })
     385            2 :             .collect_vec();
     386            2 :         let resident_layer_1 = produce_delta_layer(&tenant, &tline, test_deltas1.clone(), &ctx)
     387            8 :             .await
     388            2 :             .unwrap();
     389            2 :         let test_deltas2 = (0..N)
     390         2000 :             .map(|idx| {
     391         2000 :                 (
     392         2000 :                     get_key(idx as u32 / 10),
     393         2000 :                     Lsn(0x20 * ((idx as u64) % 10 + 1) + 0x10),
     394         2000 :                     Value::Image(Bytes::from(format!("img{idx:05}"))),
     395         2000 :                 )
     396         2000 :             })
     397            2 :             .collect_vec();
     398            2 :         let resident_layer_2 = produce_delta_layer(&tenant, &tline, test_deltas2.clone(), &ctx)
     399            8 :             .await
     400            2 :             .unwrap();
     401            2 :         let test_deltas3 = (0..N)
     402         2000 :             .map(|idx| {
     403         2000 :                 (
     404         2000 :                     get_key(idx as u32 / 10 + N as u32),
     405         2000 :                     Lsn(0x10 * ((idx as u64) % 10 + 1)),
     406         2000 :                     Value::Image(Bytes::from(format!("img{idx:05}"))),
     407         2000 :                 )
     408         2000 :             })
     409            2 :             .collect_vec();
     410            2 :         let resident_layer_3 = produce_delta_layer(&tenant, &tline, test_deltas3.clone(), &ctx)
     411            8 :             .await
     412            2 :             .unwrap();
     413            2 :         let mut merge_iter = MergeIterator::create(
     414            2 :             &[
     415            2 :                 resident_layer_1.get_as_delta(&ctx).await.unwrap(),
     416            2 :                 resident_layer_2.get_as_delta(&ctx).await.unwrap(),
     417            2 :                 resident_layer_3.get_as_delta(&ctx).await.unwrap(),
     418            2 :             ],
     419            2 :             &[],
     420            2 :             &ctx,
     421            2 :         );
     422            2 :         let mut expect = Vec::new();
     423            2 :         expect.extend(test_deltas1);
     424            2 :         expect.extend(test_deltas2);
     425            2 :         expect.extend(test_deltas3);
     426            2 :         expect.sort_by(sort_delta);
     427           12 :         assert_merge_iter_equal(&mut merge_iter, &expect).await;
     428            2 : 
     429            2 :         // TODO: test layers are loaded only when needed, reducing num of active iterators in k-merge
     430            2 :     }
     431              : 
     432              :     #[tokio::test]
     433            2 :     async fn delta_image_mixed_merge() {
     434            2 :         use crate::repository::Value;
     435            2 :         use bytes::Bytes;
     436            2 : 
     437            2 :         let harness = TenantHarness::create("merge_iterator_delta_image_mixed_merge")
     438            2 :             .await
     439            2 :             .unwrap();
     440            8 :         let (tenant, ctx) = harness.load().await;
     441            2 : 
     442            2 :         let tline = tenant
     443            2 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
     444            6 :             .await
     445            2 :             .unwrap();
     446            2 : 
     447           18 :         fn get_key(id: u32) -> Key {
     448           18 :             let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
     449           18 :             key.field6 = id;
     450           18 :             key
     451           18 :         }
     452            2 :         // In this test case, we want to test if the iterator still works correctly with multiple copies
     453            2 :         // of a delta+image at the same LSN, for example, the following sequence a@10=+a, a@10=+a, a@10=ab, a@10=ab.
     454            2 :         // Duplicated deltas/images are possible for old tenants before the full L0 compaction file name fix.
     455            2 :         // An incomplete compaction could produce multiple exactly-the-same delta layers. Force image generation
     456            2 :         // could produce overlapping images. Apart from duplicated deltas/images, in the current storage implementation
     457            2 :         // one key-lsn could have a delta in the delta layer and one image in the image layer. The iterator should
     458            2 :         // correctly process these situations and return everything as-is, and the upper layer of the system
     459            2 :         // will handle duplicated LSNs.
     460            2 :         let test_deltas1 = vec![
     461            2 :             (
     462            2 :                 get_key(0),
     463            2 :                 Lsn(0x10),
     464            2 :                 Value::WalRecord(NeonWalRecord::wal_init()),
     465            2 :             ),
     466            2 :             (
     467            2 :                 get_key(0),
     468            2 :                 Lsn(0x18),
     469            2 :                 Value::WalRecord(NeonWalRecord::wal_append("a")),
     470            2 :             ),
     471            2 :             (
     472            2 :                 get_key(5),
     473            2 :                 Lsn(0x10),
     474            2 :                 Value::WalRecord(NeonWalRecord::wal_init()),
     475            2 :             ),
     476            2 :             (
     477            2 :                 get_key(5),
     478            2 :                 Lsn(0x18),
     479            2 :                 Value::WalRecord(NeonWalRecord::wal_append("b")),
     480            2 :             ),
     481            2 :         ];
     482            2 :         let resident_layer_1 = produce_delta_layer(&tenant, &tline, test_deltas1.clone(), &ctx)
     483            6 :             .await
     484            2 :             .unwrap();
     485            2 :         let mut test_deltas2 = test_deltas1.clone();
     486            2 :         test_deltas2.push((
     487            2 :             get_key(10),
     488            2 :             Lsn(0x20),
     489            2 :             Value::Image(Bytes::copy_from_slice(b"test")),
     490            2 :         ));
     491            2 :         let resident_layer_2 = produce_delta_layer(&tenant, &tline, test_deltas2.clone(), &ctx)
     492            6 :             .await
     493            2 :             .unwrap();
     494            2 :         let test_deltas3 = vec![
     495            2 :             (
     496            2 :                 get_key(0),
     497            2 :                 Lsn(0x10),
     498            2 :                 Value::Image(Bytes::copy_from_slice(b"")),
     499            2 :             ),
     500            2 :             (
     501            2 :                 get_key(5),
     502            2 :                 Lsn(0x18),
     503            2 :                 Value::Image(Bytes::copy_from_slice(b"b")),
     504            2 :             ),
     505            2 :             (
     506            2 :                 get_key(15),
     507            2 :                 Lsn(0x20),
     508            2 :                 Value::Image(Bytes::copy_from_slice(b"test")),
     509            2 :             ),
     510            2 :         ];
     511            2 :         let resident_layer_3 = produce_delta_layer(&tenant, &tline, test_deltas3.clone(), &ctx)
     512            6 :             .await
     513            2 :             .unwrap();
     514            2 :         let mut test_deltas4 = test_deltas3.clone();
     515            2 :         test_deltas4.push((
     516            2 :             get_key(20),
     517            2 :             Lsn(0x20),
     518            2 :             Value::Image(Bytes::copy_from_slice(b"test")),
     519            2 :         ));
     520            2 :         let resident_layer_4 = produce_delta_layer(&tenant, &tline, test_deltas4.clone(), &ctx)
     521            6 :             .await
     522            2 :             .unwrap();
     523            2 :         let mut expect = Vec::new();
     524            2 :         expect.extend(test_deltas1);
     525            2 :         expect.extend(test_deltas2);
     526            2 :         expect.extend(test_deltas3);
     527            2 :         expect.extend(test_deltas4);
     528            2 :         expect.sort_by(sort_delta_value);
     529            2 : 
     530            2 :         // Test with different layer order for MergeIterator::create to ensure the order
     531            2 :         // is stable.
     532            2 : 
     533            2 :         let mut merge_iter = MergeIterator::create(
     534            2 :             &[
     535            2 :                 resident_layer_4.get_as_delta(&ctx).await.unwrap(),
     536            2 :                 resident_layer_1.get_as_delta(&ctx).await.unwrap(),
     537            2 :                 resident_layer_3.get_as_delta(&ctx).await.unwrap(),
     538            2 :                 resident_layer_2.get_as_delta(&ctx).await.unwrap(),
     539            2 :             ],
     540            2 :             &[],
     541            2 :             &ctx,
     542            2 :         );
     543            8 :         assert_merge_iter_equal(&mut merge_iter, &expect).await;
     544            2 : 
     545            2 :         let mut merge_iter = MergeIterator::create(
     546            2 :             &[
     547            2 :                 resident_layer_1.get_as_delta(&ctx).await.unwrap(),
     548            2 :                 resident_layer_4.get_as_delta(&ctx).await.unwrap(),
     549            2 :                 resident_layer_3.get_as_delta(&ctx).await.unwrap(),
     550            2 :                 resident_layer_2.get_as_delta(&ctx).await.unwrap(),
     551            2 :             ],
     552            2 :             &[],
     553            2 :             &ctx,
     554            2 :         );
     555            4 :         assert_merge_iter_equal(&mut merge_iter, &expect).await;
     556            2 : 
     557            2 :         is_send(merge_iter);
     558            2 :     }
     559              : 
     560            2 :     fn is_send(_: impl Send) {}
     561              : }
        

Generated by: LCOV version 2.1-beta