LCOV - code coverage report
Current view: top level - pageserver/src/tenant/storage_layer - merge_iterator.rs (source / functions) Coverage Total Hit
Test: aca806cab4756d7eb6a304846130f4a73a5d5393.info Lines: 94.3 % 475 448
Test Date: 2025-04-24 20:31:15 Functions: 90.4 % 52 47

            Line data    Source code
       1              : use std::cmp::Ordering;
       2              : use std::collections::{BinaryHeap, binary_heap};
       3              : use std::sync::Arc;
       4              : 
       5              : use anyhow::bail;
       6              : use pageserver_api::key::Key;
       7              : use pageserver_api::value::Value;
       8              : use utils::lsn::Lsn;
       9              : 
      10              : use super::delta_layer::{DeltaLayerInner, DeltaLayerIterator};
      11              : use super::image_layer::{ImageLayerInner, ImageLayerIterator};
      12              : use super::{PersistentLayerDesc, PersistentLayerKey};
      13              : use crate::context::RequestContext;
      14              : 
      15              : #[derive(Clone, Copy)]
      16              : pub(crate) enum LayerRef<'a> {
      17              :     Image(&'a ImageLayerInner),
      18              :     Delta(&'a DeltaLayerInner),
      19              : }
      20              : 
      21              : impl<'a> LayerRef<'a> {
      22         3540 :     fn iter(self, ctx: &'a RequestContext) -> LayerIterRef<'a> {
      23         3540 :         match self {
      24          420 :             Self::Image(x) => LayerIterRef::Image(x.iter(ctx)),
      25         3120 :             Self::Delta(x) => LayerIterRef::Delta(x.iter(ctx)),
      26              :         }
      27         3540 :     }
      28              : 
      29            0 :     fn layer_dbg_info(&self) -> String {
      30            0 :         match self {
      31            0 :             Self::Image(x) => x.layer_dbg_info(),
      32            0 :             Self::Delta(x) => x.layer_dbg_info(),
      33              :         }
      34            0 :     }
      35              : }
      36              : 
      37              : enum LayerIterRef<'a> {
      38              :     Image(ImageLayerIterator<'a>),
      39              :     Delta(DeltaLayerIterator<'a>),
      40              : }
      41              : 
      42              : impl LayerIterRef<'_> {
      43     12435684 :     async fn next(&mut self) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
      44     12435684 :         match self {
      45     12430884 :             Self::Delta(x) => x.next().await,
      46         4800 :             Self::Image(x) => x.next().await,
      47              :         }
      48     12435684 :     }
      49              : 
      50            0 :     fn layer_dbg_info(&self) -> String {
      51            0 :         match self {
      52            0 :             Self::Image(x) => x.layer_dbg_info(),
      53            0 :             Self::Delta(x) => x.layer_dbg_info(),
      54              :         }
      55            0 :     }
      56              : }
      57              : 
      58              : /// This type plays several roles at once
      59              : /// 1. Unified iterator for image and delta layers.
      60              : /// 2. `Ord` for use in [`MergeIterator::heap`] (for the k-merge).
      61              : /// 3. Lazy creation of the real delta/image iterator.
      62              : #[allow(clippy::large_enum_variant, reason = "TODO")]
      63              : pub(crate) enum IteratorWrapper<'a> {
      64              :     NotLoaded {
      65              :         ctx: &'a RequestContext,
      66              :         first_key_lower_bound: (Key, Lsn),
      67              :         layer: LayerRef<'a>,
      68              :         source_desc: Arc<PersistentLayerKey>,
      69              :     },
      70              :     Loaded {
      71              :         iter: PeekableLayerIterRef<'a>,
      72              :         source_desc: Arc<PersistentLayerKey>,
      73              :     },
      74              : }
      75              : 
      76              : pub(crate) struct PeekableLayerIterRef<'a> {
      77              :     iter: LayerIterRef<'a>,
      78              :     peeked: Option<(Key, Lsn, Value)>, // None == end
      79              : }
      80              : 
      81              : impl<'a> PeekableLayerIterRef<'a> {
      82         3540 :     async fn create(mut iter: LayerIterRef<'a>) -> anyhow::Result<Self> {
      83         3540 :         let peeked = iter.next().await?;
      84         3540 :         Ok(Self { iter, peeked })
      85         3540 :     }
      86              : 
      87     50708942 :     fn peek(&self) -> &Option<(Key, Lsn, Value)> {
      88     50708942 :         &self.peeked
      89     50708942 :     }
      90              : 
      91     12432144 :     async fn next(&mut self) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
      92     12432144 :         let result = self.peeked.take();
      93     12432144 :         self.peeked = self.iter.next().await?;
      94     12432144 :         if let (Some((k1, l1, _)), Some((k2, l2, _))) = (&self.peeked, &result) {
      95     12425136 :             if (k1, l1) < (k2, l2) {
      96            0 :                 bail!("iterator is not ordered: {}", self.iter.layer_dbg_info());
      97     12425136 :             }
      98         7008 :         }
      99     12432144 :         Ok(result)
     100     12432144 :     }
     101              : }
     102              : 
     103              : impl std::cmp::PartialEq for IteratorWrapper<'_> {
     104            0 :     fn eq(&self, other: &Self) -> bool {
     105            0 :         self.cmp(other) == Ordering::Equal
     106            0 :     }
     107              : }
     108              : 
     109              : impl std::cmp::Eq for IteratorWrapper<'_> {}
     110              : 
     111              : impl std::cmp::PartialOrd for IteratorWrapper<'_> {
     112     25380043 :     fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
     113     25380043 :         Some(self.cmp(other))
     114     25380043 :     }
     115              : }
     116              : 
     117              : impl std::cmp::Ord for IteratorWrapper<'_> {
     118     25380043 :     fn cmp(&self, other: &Self) -> std::cmp::Ordering {
     119              :         use std::cmp::Ordering;
     120     25380043 :         let a = self.peek_next_key_lsn_value();
     121     25380043 :         let b = other.peek_next_key_lsn_value();
     122     25380043 :         match (a, b) {
     123     18131185 :             (Some((k1, l1, v1)), Some((k2, l2, v2))) => {
     124     36262370 :                 fn map_value_to_num(val: &Option<&Value>) -> usize {
     125     36208694 :                     match val {
     126        53676 :                         None => 0,
     127     36205598 :                         Some(Value::Image(_)) => 1,
     128         3096 :                         Some(Value::WalRecord(_)) => 2,
     129              :                     }
     130     36262370 :                 }
     131     18131185 :                 let order_1 = map_value_to_num(&v1);
     132     18131185 :                 let order_2 = map_value_to_num(&v2);
     133     18131185 :                 // When key_lsn are the same, the unloaded iter will always appear before the loaded one.
     134     18131185 :                 // And note that we do a reverse at the end of the comparison, so it works with the max heap.
     135     18131185 :                 (k1, l1, order_1).cmp(&(k2, l2, order_2))
     136              :             }
     137      6017729 :             (Some(_), None) => Ordering::Less,
     138         8415 :             (None, Some(_)) => Ordering::Greater,
     139      1222714 :             (None, None) => Ordering::Equal,
     140              :         }
     141     25380043 :         .reverse()
     142     25380043 :     }
     143              : }
     144              : 
     145              : impl<'a> IteratorWrapper<'a> {
     146          420 :     pub fn create_from_image_layer(
     147          420 :         image_layer: &'a ImageLayerInner,
     148          420 :         ctx: &'a RequestContext,
     149          420 :     ) -> Self {
     150          420 :         Self::NotLoaded {
     151          420 :             layer: LayerRef::Image(image_layer),
     152          420 :             first_key_lower_bound: (image_layer.key_range().start, image_layer.lsn()),
     153          420 :             ctx,
     154          420 :             source_desc: PersistentLayerKey {
     155          420 :                 key_range: image_layer.key_range().clone(),
     156          420 :                 lsn_range: PersistentLayerDesc::image_layer_lsn_range(image_layer.lsn()),
     157          420 :                 is_delta: false,
     158          420 :             }
     159          420 :             .into(),
     160          420 :         }
     161          420 :     }
     162              : 
     163         3120 :     pub fn create_from_delta_layer(
     164         3120 :         delta_layer: &'a DeltaLayerInner,
     165         3120 :         ctx: &'a RequestContext,
     166         3120 :     ) -> Self {
     167         3120 :         Self::NotLoaded {
     168         3120 :             layer: LayerRef::Delta(delta_layer),
     169         3120 :             first_key_lower_bound: (delta_layer.key_range().start, delta_layer.lsn_range().start),
     170         3120 :             ctx,
     171         3120 :             source_desc: PersistentLayerKey {
     172         3120 :                 key_range: delta_layer.key_range().clone(),
     173         3120 :                 lsn_range: delta_layer.lsn_range().clone(),
     174         3120 :                 is_delta: true,
     175         3120 :             }
     176         3120 :             .into(),
     177         3120 :         }
     178         3120 :     }
     179              : 
     180     50760086 :     fn peek_next_key_lsn_value(&self) -> Option<(&Key, Lsn, Option<&Value>)> {
     181     50760086 :         match self {
     182     50705402 :             Self::Loaded { iter, .. } => iter
     183     50705402 :                 .peek()
     184     50705402 :                 .as_ref()
     185     50705402 :                 .map(|(key, lsn, val)| (key, *lsn, Some(val))),
     186        54684 :             Self::NotLoaded {
     187        54684 :                 first_key_lower_bound: (key, lsn),
     188        54684 :                 ..
     189        54684 :             } => Some((key, *lsn, None)),
     190              :         }
     191     50760086 :     }
     192              : 
     193              :     // CORRECTNESS: this function must always take `&mut self`, never `&self`.
     194              :     //
     195              :     // The reason is that `impl Ord for Self` evaluates differently after this function
     196              :     // returns. We're called through a `PeekMut::deref_mut`, which causes heap repair when
     197              :     // the PeekMut gets returned. So, it's critical that we actually run through `PeekMut::deref_mut`
     198              :     // and not just `PeekMut::deref`
     199              :     // If we don't take `&mut self`
     200         3540 :     async fn load(&mut self) -> anyhow::Result<()> {
     201         3540 :         assert!(!self.is_loaded());
     202         3540 :         let Self::NotLoaded {
     203         3540 :             ctx,
     204         3540 :             first_key_lower_bound,
     205         3540 :             layer,
     206         3540 :             source_desc,
     207         3540 :         } = self
     208              :         else {
     209            0 :             unreachable!()
     210              :         };
     211         3540 :         let iter = layer.iter(ctx);
     212         3540 :         let iter = PeekableLayerIterRef::create(iter).await?;
     213         3540 :         if let Some((k1, l1, _)) = iter.peek() {
     214         3540 :             let (k2, l2) = first_key_lower_bound;
     215         3540 :             if (k1, l1) < (k2, l2) {
     216            0 :                 bail!(
     217            0 :                     "layer key range did not include the first key in the layer: {}",
     218            0 :                     layer.layer_dbg_info()
     219            0 :                 );
     220         3540 :             }
     221            0 :         }
     222         3540 :         *self = Self::Loaded {
     223         3540 :             iter,
     224         3540 :             source_desc: source_desc.clone(),
     225         3540 :         };
     226         3540 :         Ok(())
     227         3540 :     }
     228              : 
     229     12439224 :     fn is_loaded(&self) -> bool {
     230     12439224 :         matches!(self, Self::Loaded { .. })
     231     12439224 :     }
     232              : 
     233              :     /// Correctness: must load the iterator before using.
     234              :     ///
     235              :     /// Given this iterator wrapper is private to the merge iterator, users won't be able to mis-use it.
     236              :     /// The public interfaces to use are [`crate::tenant::storage_layer::delta_layer::DeltaLayerIterator`] and
     237              :     /// [`crate::tenant::storage_layer::image_layer::ImageLayerIterator`].
     238     12432144 :     async fn next(&mut self) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
     239     12432144 :         let Self::Loaded { iter, .. } = self else {
     240            0 :             panic!("must load the iterator before using")
     241              :         };
     242     12432144 :         iter.next().await
     243     12432144 :     }
     244              : 
     245              :     /// Get the persistent layer key corresponding to this iterator
     246         5640 :     fn trace_source(&self) -> Arc<PersistentLayerKey> {
     247         5640 :         match self {
     248         5640 :             Self::Loaded { source_desc, .. } => source_desc.clone(),
     249            0 :             Self::NotLoaded { source_desc, .. } => source_desc.clone(),
     250              :         }
     251         5640 :     }
     252              : }
     253              : 
     254              : /// A merge iterator over delta/image layer iterators.
     255              : ///
     256              : /// When duplicated records are found, the iterator will not perform any
     257              : /// deduplication, and the caller should handle these situation. By saying
     258              : /// duplicated records, there are many possibilities:
     259              : ///
     260              : /// * Two same delta at the same LSN.
     261              : /// * Two same image at the same LSN.
     262              : /// * Delta/image at the same LSN where the image has already applied the delta.
     263              : ///
     264              : /// The iterator will always put the image before the delta.
     265              : pub struct MergeIterator<'a> {
     266              :     heap: BinaryHeap<IteratorWrapper<'a>>,
     267              : }
     268              : 
     269              : pub(crate) trait MergeIteratorItem {
     270              :     fn new(item: (Key, Lsn, Value), iterator: &IteratorWrapper<'_>) -> Self;
     271              : 
     272              :     fn key_lsn_value(&self) -> &(Key, Lsn, Value);
     273              : }
     274              : 
     275              : impl MergeIteratorItem for (Key, Lsn, Value) {
     276     12423012 :     fn new(item: (Key, Lsn, Value), _: &IteratorWrapper<'_>) -> Self {
     277     12423012 :         item
     278     12423012 :     }
     279              : 
     280         4740 :     fn key_lsn_value(&self) -> &(Key, Lsn, Value) {
     281         4740 :         self
     282         4740 :     }
     283              : }
     284              : 
     285              : impl MergeIteratorItem for ((Key, Lsn, Value), Arc<PersistentLayerKey>) {
     286         5640 :     fn new(item: (Key, Lsn, Value), iter: &IteratorWrapper<'_>) -> Self {
     287         5640 :         (item, iter.trace_source().clone())
     288         5640 :     }
     289              : 
     290        11880 :     fn key_lsn_value(&self) -> &(Key, Lsn, Value) {
     291        11880 :         &self.0
     292        11880 :     }
     293              : }
     294              : 
     295              : impl<'a> MergeIterator<'a> {
     296          564 :     pub fn create(
     297          564 :         deltas: &[&'a DeltaLayerInner],
     298          564 :         images: &[&'a ImageLayerInner],
     299          564 :         ctx: &'a RequestContext,
     300          564 :     ) -> Self {
     301          564 :         let mut heap = Vec::with_capacity(images.len() + deltas.len());
     302          984 :         for image in images {
     303          420 :             heap.push(IteratorWrapper::create_from_image_layer(image, ctx));
     304          420 :         }
     305         3684 :         for delta in deltas {
     306         3120 :             heap.push(IteratorWrapper::create_from_delta_layer(delta, ctx));
     307         3120 :         }
     308          564 :         Self {
     309          564 :             heap: BinaryHeap::from(heap),
     310          564 :         }
     311          564 :     }
     312              : 
     313     12429192 :     pub(crate) async fn next_inner<R: MergeIteratorItem>(&mut self) -> anyhow::Result<Option<R>> {
     314     12436224 :         while let Some(mut iter) = self.heap.peek_mut() {
     315     12435684 :             if !iter.is_loaded() {
     316              :                 // Once we load the iterator, we can know the real first key-value pair in the iterator.
     317              :                 // We put it back into the heap so that a potentially unloaded layer may have a key between
     318              :                 // [potential_first_key, loaded_first_key).
     319         3540 :                 iter.load().await?;
     320         3540 :                 continue;
     321     12432144 :             }
     322     12432144 :             let Some(item) = iter.next().await? else {
     323              :                 // If the iterator returns None, we pop this iterator. Actually, in the current implementation,
     324              :                 // we order None > Some, and all the rest of the iterators should return None.
     325         3492 :                 binary_heap::PeekMut::pop(iter);
     326         3492 :                 continue;
     327              :             };
     328     12428652 :             return Ok(Some(R::new(item, &iter)));
     329              :         }
     330          540 :         Ok(None)
     331     12429192 :     }
     332              : 
     333              :     /// Get the next key-value pair from the iterator.
     334     12420876 :     pub async fn next(&mut self) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
     335     12420876 :         self.next_inner().await
     336     12420876 :     }
     337              : 
     338              :     /// Get the next key-value pair from the iterator, and trace where the key comes from.
     339            0 :     pub async fn next_with_trace(
     340            0 :         &mut self,
     341            0 :     ) -> anyhow::Result<Option<((Key, Lsn, Value), Arc<PersistentLayerKey>)>> {
     342            0 :         self.next_inner().await
     343            0 :     }
     344              : }
     345              : 
     346              : #[cfg(test)]
     347              : mod tests {
     348              :     use itertools::Itertools;
     349              :     use pageserver_api::key::Key;
     350              :     #[cfg(feature = "testing")]
     351              :     use pageserver_api::record::NeonWalRecord;
     352              :     use utils::lsn::Lsn;
     353              : 
     354              :     use super::*;
     355              :     use crate::DEFAULT_PG_VERSION;
     356              :     use crate::tenant::harness::{TIMELINE_ID, TenantHarness};
     357              :     #[cfg(feature = "testing")]
     358              :     use crate::tenant::storage_layer::delta_layer::test::sort_delta_value;
     359              :     use crate::tenant::storage_layer::delta_layer::test::{produce_delta_layer, sort_delta};
     360              : 
     361           48 :     async fn assert_merge_iter_equal(
     362           48 :         merge_iter: &mut MergeIterator<'_>,
     363           48 :         expect: &[(Key, Lsn, Value)],
     364           48 :     ) {
     365           48 :         let mut expect_iter = expect.iter();
     366              :         loop {
     367        36480 :             let o1 = merge_iter.next().await.unwrap();
     368        36480 :             let o2 = expect_iter.next();
     369        36480 :             assert_eq!(o1.is_some(), o2.is_some());
     370        36480 :             if o1.is_none() && o2.is_none() {
     371           48 :                 break;
     372        36432 :             }
     373        36432 :             let (k1, l1, v1) = o1.unwrap();
     374        36432 :             let (k2, l2, v2) = o2.unwrap();
     375        36432 :             assert_eq!(&k1, k2);
     376        36432 :             assert_eq!(l1, *l2);
     377        36432 :             assert_eq!(&v1, v2);
     378              :         }
     379           48 :     }
     380              : 
     381              :     #[tokio::test]
     382           12 :     async fn merge_in_between() {
     383           12 :         use bytes::Bytes;
     384           12 :         use pageserver_api::value::Value;
     385           12 : 
     386           12 :         let harness = TenantHarness::create("merge_iterator_merge_in_between")
     387           12 :             .await
     388           12 :             .unwrap();
     389           12 :         let (tenant, ctx) = harness.load().await;
     390           12 : 
     391           12 :         let tline = tenant
     392           12 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
     393           12 :             .await
     394           12 :             .unwrap();
     395           12 : 
     396           48 :         fn get_key(id: u32) -> Key {
     397           48 :             let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
     398           48 :             key.field6 = id;
     399           48 :             key
     400           48 :         }
     401           12 :         let test_deltas1 = vec![
     402           12 :             (
     403           12 :                 get_key(0),
     404           12 :                 Lsn(0x10),
     405           12 :                 Value::Image(Bytes::copy_from_slice(b"test")),
     406           12 :             ),
     407           12 :             (
     408           12 :                 get_key(5),
     409           12 :                 Lsn(0x10),
     410           12 :                 Value::Image(Bytes::copy_from_slice(b"test")),
     411           12 :             ),
     412           12 :         ];
     413           12 :         let resident_layer_1 = produce_delta_layer(&tenant, &tline, test_deltas1.clone(), &ctx)
     414           12 :             .await
     415           12 :             .unwrap();
     416           12 :         let test_deltas2 = vec![
     417           12 :             (
     418           12 :                 get_key(3),
     419           12 :                 Lsn(0x10),
     420           12 :                 Value::Image(Bytes::copy_from_slice(b"test")),
     421           12 :             ),
     422           12 :             (
     423           12 :                 get_key(4),
     424           12 :                 Lsn(0x10),
     425           12 :                 Value::Image(Bytes::copy_from_slice(b"test")),
     426           12 :             ),
     427           12 :         ];
     428           12 :         let resident_layer_2 = produce_delta_layer(&tenant, &tline, test_deltas2.clone(), &ctx)
     429           12 :             .await
     430           12 :             .unwrap();
     431           12 :         let mut merge_iter = MergeIterator::create(
     432           12 :             &[
     433           12 :                 resident_layer_2.get_as_delta(&ctx).await.unwrap(),
     434           12 :                 resident_layer_1.get_as_delta(&ctx).await.unwrap(),
     435           12 :             ],
     436           12 :             &[],
     437           12 :             &ctx,
     438           12 :         );
     439           12 :         let mut expect = Vec::new();
     440           12 :         expect.extend(test_deltas1);
     441           12 :         expect.extend(test_deltas2);
     442           12 :         expect.sort_by(sort_delta);
     443           12 :         assert_merge_iter_equal(&mut merge_iter, &expect).await;
     444           12 :     }
     445              : 
     446              :     #[tokio::test]
     447           12 :     async fn delta_merge() {
     448           12 :         use bytes::Bytes;
     449           12 :         use pageserver_api::value::Value;
     450           12 : 
     451           12 :         let harness = TenantHarness::create("merge_iterator_delta_merge")
     452           12 :             .await
     453           12 :             .unwrap();
     454           12 :         let (tenant, ctx) = harness.load().await;
     455           12 : 
     456           12 :         let tline = tenant
     457           12 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
     458           12 :             .await
     459           12 :             .unwrap();
     460           12 : 
     461        36000 :         fn get_key(id: u32) -> Key {
     462        36000 :             let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
     463        36000 :             key.field6 = id;
     464        36000 :             key
     465        36000 :         }
     466           12 :         const N: usize = 1000;
     467           12 :         let test_deltas1 = (0..N)
     468        12000 :             .map(|idx| {
     469        12000 :                 (
     470        12000 :                     get_key(idx as u32 / 10),
     471        12000 :                     Lsn(0x20 * ((idx as u64) % 10 + 1)),
     472        12000 :                     Value::Image(Bytes::from(format!("img{idx:05}"))),
     473        12000 :                 )
     474        12000 :             })
     475           12 :             .collect_vec();
     476           12 :         let resident_layer_1 = produce_delta_layer(&tenant, &tline, test_deltas1.clone(), &ctx)
     477           12 :             .await
     478           12 :             .unwrap();
     479           12 :         let test_deltas2 = (0..N)
     480        12000 :             .map(|idx| {
     481        12000 :                 (
     482        12000 :                     get_key(idx as u32 / 10),
     483        12000 :                     Lsn(0x20 * ((idx as u64) % 10 + 1) + 0x10),
     484        12000 :                     Value::Image(Bytes::from(format!("img{idx:05}"))),
     485        12000 :                 )
     486        12000 :             })
     487           12 :             .collect_vec();
     488           12 :         let resident_layer_2 = produce_delta_layer(&tenant, &tline, test_deltas2.clone(), &ctx)
     489           12 :             .await
     490           12 :             .unwrap();
     491           12 :         let test_deltas3 = (0..N)
     492        12000 :             .map(|idx| {
     493        12000 :                 (
     494        12000 :                     get_key(idx as u32 / 10 + N as u32),
     495        12000 :                     Lsn(0x10 * ((idx as u64) % 10 + 1)),
     496        12000 :                     Value::Image(Bytes::from(format!("img{idx:05}"))),
     497        12000 :                 )
     498        12000 :             })
     499           12 :             .collect_vec();
     500           12 :         let resident_layer_3 = produce_delta_layer(&tenant, &tline, test_deltas3.clone(), &ctx)
     501           12 :             .await
     502           12 :             .unwrap();
     503           12 :         let mut merge_iter = MergeIterator::create(
     504           12 :             &[
     505           12 :                 resident_layer_1.get_as_delta(&ctx).await.unwrap(),
     506           12 :                 resident_layer_2.get_as_delta(&ctx).await.unwrap(),
     507           12 :                 resident_layer_3.get_as_delta(&ctx).await.unwrap(),
     508           12 :             ],
     509           12 :             &[],
     510           12 :             &ctx,
     511           12 :         );
     512           12 :         let mut expect = Vec::new();
     513           12 :         expect.extend(test_deltas1);
     514           12 :         expect.extend(test_deltas2);
     515           12 :         expect.extend(test_deltas3);
     516           12 :         expect.sort_by(sort_delta);
     517           12 :         assert_merge_iter_equal(&mut merge_iter, &expect).await;
     518           12 : 
     519           12 :         // TODO: test layers are loaded only when needed, reducing num of active iterators in k-merge
     520           12 :     }
     521              : 
     522              :     #[cfg(feature = "testing")]
     523              :     #[tokio::test]
     524           12 :     async fn delta_image_mixed_merge() {
     525           12 :         use bytes::Bytes;
     526           12 :         use pageserver_api::value::Value;
     527           12 : 
     528           12 :         let harness = TenantHarness::create("merge_iterator_delta_image_mixed_merge")
     529           12 :             .await
     530           12 :             .unwrap();
     531           12 :         let (tenant, ctx) = harness.load().await;
     532           12 : 
     533           12 :         let tline = tenant
     534           12 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
     535           12 :             .await
     536           12 :             .unwrap();
     537           12 : 
     538          108 :         fn get_key(id: u32) -> Key { // test key: fixed hex base, only `field6` varies with `id`
     539          108 :             let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
     540          108 :             key.field6 = id;
     541          108 :             key
     542          108 :         }
     543           12 :         // This test checks that the iterator still works correctly when several copies of a delta and/or
     544           12 :         // image exist at the same key and LSN, e.g. the sequence a@10=+a, a@10=+a, a@10=ab, a@10=ab.
     545           12 :         // Duplicated deltas/images are possible for old tenants from before the full L0 compaction file name fix:
     546           12 :         // an incomplete compaction could produce multiple identical delta layers, and forced image generation
     547           12 :         // could produce overlapping images. Apart from duplicates, in the current storage implementation a single
     548           12 :         // key-LSN pair can have a delta in a delta layer and an image in an image layer. The iterator must
     549           12 :         // handle these situations and return every entry as-is; the upper layers of the system are
     550           12 :         // responsible for dealing with duplicated LSNs.
     551           12 :         let test_deltas1 = vec![ // WAL records only: init@0x10 and append@0x18 for keys 0 and 5
     552           12 :             (
     553           12 :                 get_key(0),
     554           12 :                 Lsn(0x10),
     555           12 :                 Value::WalRecord(NeonWalRecord::wal_init("")),
     556           12 :             ),
     557           12 :             (
     558           12 :                 get_key(0),
     559           12 :                 Lsn(0x18),
     560           12 :                 Value::WalRecord(NeonWalRecord::wal_append("a")),
     561           12 :             ),
     562           12 :             (
     563           12 :                 get_key(5),
     564           12 :                 Lsn(0x10),
     565           12 :                 Value::WalRecord(NeonWalRecord::wal_init("")),
     566           12 :             ),
     567           12 :             (
     568           12 :                 get_key(5),
     569           12 :                 Lsn(0x18),
     570           12 :                 Value::WalRecord(NeonWalRecord::wal_append("b")),
     571           12 :             ),
     572           12 :         ];
     573           12 :         let resident_layer_1 = produce_delta_layer(&tenant, &tline, test_deltas1.clone(), &ctx)
     574           12 :             .await
     575           12 :             .unwrap();
     576           12 :         let mut test_deltas2 = test_deltas1.clone(); // duplicates test_deltas1, then gains one extra entry below
     577           12 :         test_deltas2.push((
     578           12 :             get_key(10),
     579           12 :             Lsn(0x20),
     580           12 :             Value::Image(Bytes::copy_from_slice(b"test")),
     581           12 :         ));
     582           12 :         let resident_layer_2 = produce_delta_layer(&tenant, &tline, test_deltas2.clone(), &ctx)
     583           12 :             .await
     584           12 :             .unwrap();
     585           12 :         let test_deltas3 = vec![ // image values at key/LSN pairs that test_deltas1 also covers, plus key 15
     586           12 :             (
     587           12 :                 get_key(0),
     588           12 :                 Lsn(0x10),
     589           12 :                 Value::Image(Bytes::copy_from_slice(b"")),
     590           12 :             ),
     591           12 :             (
     592           12 :                 get_key(5),
     593           12 :                 Lsn(0x18),
     594           12 :                 Value::Image(Bytes::copy_from_slice(b"b")),
     595           12 :             ),
     596           12 :             (
     597           12 :                 get_key(15),
     598           12 :                 Lsn(0x20),
     599           12 :                 Value::Image(Bytes::copy_from_slice(b"test")),
     600           12 :             ),
     601           12 :         ];
     602           12 :         let resident_layer_3 = produce_delta_layer(&tenant, &tline, test_deltas3.clone(), &ctx) // image values go through produce_delta_layer like the other inputs
     603           12 :             .await
     604           12 :             .unwrap();
     605           12 :         let mut test_deltas4 = test_deltas3.clone(); // duplicates test_deltas3, then gains one extra entry below
     606           12 :         test_deltas4.push((
     607           12 :             get_key(20),
     608           12 :             Lsn(0x20),
     609           12 :             Value::Image(Bytes::copy_from_slice(b"test")),
     610           12 :         ));
     611           12 :         let resident_layer_4 = produce_delta_layer(&tenant, &tline, test_deltas4.clone(), &ctx)
     612           12 :             .await
     613           12 :             .unwrap();
     614           12 :         let mut expect = Vec::new(); // expected output: every entry from all four inputs, duplicates included
     615           12 :         expect.extend(test_deltas1);
     616           12 :         expect.extend(test_deltas2);
     617           12 :         expect.extend(test_deltas3);
     618           12 :         expect.extend(test_deltas4);
     619           12 :         expect.sort_by(sort_delta_value);
     620           12 : 
     621           12 :         // Run the merge twice, passing the layers to MergeIterator::create in two different orders,
     622           12 :         // to ensure the output order is stable: both runs are compared against the same `expect`.
     623           12 : 
     624           12 :         let mut merge_iter = MergeIterator::create(
     625           12 :             &[
     626           12 :                 resident_layer_4.get_as_delta(&ctx).await.unwrap(),
     627           12 :                 resident_layer_1.get_as_delta(&ctx).await.unwrap(),
     628           12 :                 resident_layer_3.get_as_delta(&ctx).await.unwrap(),
     629           12 :                 resident_layer_2.get_as_delta(&ctx).await.unwrap(),
     630           12 :             ],
     631           12 :             &[],
     632           12 :             &ctx,
     633           12 :         );
     634           12 :         assert_merge_iter_equal(&mut merge_iter, &expect).await;
     635           12 : 
     636           12 :         let mut merge_iter = MergeIterator::create(
     637           12 :             &[
     638           12 :                 resident_layer_1.get_as_delta(&ctx).await.unwrap(),
     639           12 :                 resident_layer_4.get_as_delta(&ctx).await.unwrap(),
     640           12 :                 resident_layer_3.get_as_delta(&ctx).await.unwrap(),
     641           12 :                 resident_layer_2.get_as_delta(&ctx).await.unwrap(),
     642           12 :             ],
     643           12 :             &[],
     644           12 :             &ctx,
     645           12 :         );
     646           12 :         assert_merge_iter_equal(&mut merge_iter, &expect).await;
     647           12 : 
     648           12 :         is_send(merge_iter); // compile-time check: the merge iterator value must implement `Send`
     649           12 :     }
     650              : 
     651              :     #[cfg(feature = "testing")]
     652           12 :     fn is_send(_: impl Send) {} // accepts any `Send` value and does nothing; a call compiles only if the argument's type is `Send`
     653              : }
        

Generated by: LCOV version 2.1-beta