LCOV - code coverage report
Current view: top level - pageserver/src/tenant/storage_layer - merge_iterator.rs (source / functions) Coverage Total Hit
Test: 62212f4d57a7ad0f69dc82a04629a0bbd5f7c824.info Lines: 94.3 % 475 448
Test Date: 2025-03-17 10:41:39 Functions: 90.4 % 52 47

            Line data    Source code
       1              : use std::cmp::Ordering;
       2              : use std::collections::{BinaryHeap, binary_heap};
       3              : use std::sync::Arc;
       4              : 
       5              : use anyhow::bail;
       6              : use pageserver_api::key::Key;
       7              : use pageserver_api::value::Value;
       8              : use utils::lsn::Lsn;
       9              : 
      10              : use super::delta_layer::{DeltaLayerInner, DeltaLayerIterator};
      11              : use super::image_layer::{ImageLayerInner, ImageLayerIterator};
      12              : use super::{PersistentLayerDesc, PersistentLayerKey};
      13              : use crate::context::RequestContext;
      14              : 
      15              : #[derive(Clone, Copy)]
      16              : pub(crate) enum LayerRef<'a> {
      17              :     Image(&'a ImageLayerInner),
      18              :     Delta(&'a DeltaLayerInner),
      19              : }
      20              : 
      21              : impl<'a> LayerRef<'a> {
      22         1168 :     fn iter(self, ctx: &'a RequestContext) -> LayerIterRef<'a> {
      23         1168 :         match self {
      24          132 :             Self::Image(x) => LayerIterRef::Image(x.iter(ctx)),
      25         1036 :             Self::Delta(x) => LayerIterRef::Delta(x.iter(ctx)),
      26              :         }
      27         1168 :     }
      28              : 
      29            0 :     fn layer_dbg_info(&self) -> String {
      30            0 :         match self {
      31            0 :             Self::Image(x) => x.layer_dbg_info(),
      32            0 :             Self::Delta(x) => x.layer_dbg_info(),
      33              :         }
      34            0 :     }
      35              : }
      36              : 
      37              : enum LayerIterRef<'a> {
      38              :     Image(ImageLayerIterator<'a>),
      39              :     Delta(DeltaLayerIterator<'a>),
      40              : }
      41              : 
      42              : impl LayerIterRef<'_> {
      43      4145164 :     async fn next(&mut self) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
      44      4145164 :         match self {
      45      4143612 :             Self::Delta(x) => x.next().await,
      46         1552 :             Self::Image(x) => x.next().await,
      47              :         }
      48      4145164 :     }
      49              : 
      50            0 :     fn layer_dbg_info(&self) -> String {
      51            0 :         match self {
      52            0 :             Self::Image(x) => x.layer_dbg_info(),
      53            0 :             Self::Delta(x) => x.layer_dbg_info(),
      54              :         }
      55            0 :     }
      56              : }
      57              : 
      58              : /// This type plays several roles at once
      59              : /// 1. Unified iterator for image and delta layers.
      60              : /// 2. `Ord` for use in [`MergeIterator::heap`] (for the k-merge).
      61              : /// 3. Lazy creation of the real delta/image iterator.
      62              : #[allow(clippy::large_enum_variant, reason = "TODO")]
      63              : pub(crate) enum IteratorWrapper<'a> {
      64              :     NotLoaded {
      65              :         ctx: &'a RequestContext,
      66              :         first_key_lower_bound: (Key, Lsn),
      67              :         layer: LayerRef<'a>,
      68              :         source_desc: Arc<PersistentLayerKey>,
      69              :     },
      70              :     Loaded {
      71              :         iter: PeekableLayerIterRef<'a>,
      72              :         source_desc: Arc<PersistentLayerKey>,
      73              :     },
      74              : }
      75              : 
      76              : pub(crate) struct PeekableLayerIterRef<'a> {
      77              :     iter: LayerIterRef<'a>,
      78              :     peeked: Option<(Key, Lsn, Value)>, // None == end
      79              : }
      80              : 
      81              : impl<'a> PeekableLayerIterRef<'a> {
      82         1168 :     async fn create(mut iter: LayerIterRef<'a>) -> anyhow::Result<Self> {
      83         1168 :         let peeked = iter.next().await?;
      84         1168 :         Ok(Self { iter, peeked })
      85         1168 :     }
      86              : 
      87     16902210 :     fn peek(&self) -> &Option<(Key, Lsn, Value)> {
      88     16902210 :         &self.peeked
      89     16902210 :     }
      90              : 
      91      4143996 :     async fn next(&mut self) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
      92      4143996 :         let result = self.peeked.take();
      93      4143996 :         self.peeked = self.iter.next().await?;
      94      4143996 :         if let (Some((k1, l1, _)), Some((k2, l2, _))) = (&self.peeked, &result) {
      95      4141668 :             if (k1, l1) < (k2, l2) {
      96            0 :                 bail!("iterator is not ordered: {}", self.iter.layer_dbg_info());
      97      4141668 :             }
      98         2328 :         }
      99      4143996 :         Ok(result)
     100      4143996 :     }
     101              : }
     102              : 
     103              : impl std::cmp::PartialEq for IteratorWrapper<'_> {
     104            0 :     fn eq(&self, other: &Self) -> bool {
     105            0 :         self.cmp(other) == Ordering::Equal
     106            0 :     }
     107              : }
     108              : 
     109              : impl std::cmp::Eq for IteratorWrapper<'_> {}
     110              : 
     111              : impl std::cmp::PartialOrd for IteratorWrapper<'_> {
     112      8459567 :     fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
     113      8459567 :         Some(self.cmp(other))
     114      8459567 :     }
     115              : }
     116              : 
     117              : impl std::cmp::Ord for IteratorWrapper<'_> {
     118      8459567 :     fn cmp(&self, other: &Self) -> std::cmp::Ordering {
     119              :         use std::cmp::Ordering;
     120      8459567 :         let a = self.peek_next_key_lsn_value();
     121      8459567 :         let b = other.peek_next_key_lsn_value();
     122      8459567 :         match (a, b) {
     123      6043327 :             (Some((k1, l1, v1)), Some((k2, l2, v2))) => {
     124     12086654 :                 fn map_value_to_num(val: &Option<&Value>) -> usize {
     125     12068882 :                     match val {
     126        17772 :                         None => 0,
     127     12067862 :                         Some(Value::Image(_)) => 1,
     128         1020 :                         Some(Value::WalRecord(_)) => 2,
     129              :                     }
     130     12086654 :                 }
     131      6043327 :                 let order_1 = map_value_to_num(&v1);
     132      6043327 :                 let order_2 = map_value_to_num(&v2);
     133      6043327 :                 // When key_lsn are the same, the unloaded iter will always appear before the loaded one.
     134      6043327 :                 // And note that we do a reverse at the end of the comparison, so it works with the max heap.
     135      6043327 :                 (k1, l1, order_1).cmp(&(k2, l2, order_2))
     136              :             }
     137      2005896 :             (Some(_), None) => Ordering::Less,
     138         2777 :             (None, Some(_)) => Ordering::Greater,
     139       407567 :             (None, None) => Ordering::Equal,
     140              :         }
     141      8459567 :         .reverse()
     142      8459567 :     }
     143              : }
     144              : 
     145              : impl<'a> IteratorWrapper<'a> {
     146          132 :     pub fn create_from_image_layer(
     147          132 :         image_layer: &'a ImageLayerInner,
     148          132 :         ctx: &'a RequestContext,
     149          132 :     ) -> Self {
     150          132 :         Self::NotLoaded {
     151          132 :             layer: LayerRef::Image(image_layer),
     152          132 :             first_key_lower_bound: (image_layer.key_range().start, image_layer.lsn()),
     153          132 :             ctx,
     154          132 :             source_desc: PersistentLayerKey {
     155          132 :                 key_range: image_layer.key_range().clone(),
     156          132 :                 lsn_range: PersistentLayerDesc::image_layer_lsn_range(image_layer.lsn()),
     157          132 :                 is_delta: false,
     158          132 :             }
     159          132 :             .into(),
     160          132 :         }
     161          132 :     }
     162              : 
     163         1036 :     pub fn create_from_delta_layer(
     164         1036 :         delta_layer: &'a DeltaLayerInner,
     165         1036 :         ctx: &'a RequestContext,
     166         1036 :     ) -> Self {
     167         1036 :         Self::NotLoaded {
     168         1036 :             layer: LayerRef::Delta(delta_layer),
     169         1036 :             first_key_lower_bound: (delta_layer.key_range().start, delta_layer.lsn_range().start),
     170         1036 :             ctx,
     171         1036 :             source_desc: PersistentLayerKey {
     172         1036 :                 key_range: delta_layer.key_range().clone(),
     173         1036 :                 lsn_range: delta_layer.lsn_range().clone(),
     174         1036 :                 is_delta: true,
     175         1036 :             }
     176         1036 :             .into(),
     177         1036 :         }
     178         1036 :     }
     179              : 
     180     16919134 :     fn peek_next_key_lsn_value(&self) -> Option<(&Key, Lsn, Option<&Value>)> {
     181     16919134 :         match self {
     182     16901042 :             Self::Loaded { iter, .. } => iter
     183     16901042 :                 .peek()
     184     16901042 :                 .as_ref()
     185     16901042 :                 .map(|(key, lsn, val)| (key, *lsn, Some(val))),
     186        18092 :             Self::NotLoaded {
     187        18092 :                 first_key_lower_bound: (key, lsn),
     188        18092 :                 ..
     189        18092 :             } => Some((key, *lsn, None)),
     190              :         }
     191     16919134 :     }
     192              : 
     193              :     // CORRECTNESS: this function must always take `&mut self`, never `&self`.
     194              :     //
     195              :     // The reason is that `impl Ord for Self` evaluates differently after this function
     196              :     // returns. We're called through a `PeekMut::deref_mut`, which causes heap repair when
     197              :     // the PeekMut gets returned. So, it's critical that we actually run through `PeekMut::deref_mut`
     198              :     // and not just `PeekMut::deref`
     199              :     // If we don't take `&mut self`
     200         1168 :     async fn load(&mut self) -> anyhow::Result<()> {
     201         1168 :         assert!(!self.is_loaded());
     202         1168 :         let Self::NotLoaded {
     203         1168 :             ctx,
     204         1168 :             first_key_lower_bound,
     205         1168 :             layer,
     206         1168 :             source_desc,
     207         1168 :         } = self
     208              :         else {
     209            0 :             unreachable!()
     210              :         };
     211         1168 :         let iter = layer.iter(ctx);
     212         1168 :         let iter = PeekableLayerIterRef::create(iter).await?;
     213         1168 :         if let Some((k1, l1, _)) = iter.peek() {
     214         1168 :             let (k2, l2) = first_key_lower_bound;
     215         1168 :             if (k1, l1) < (k2, l2) {
     216            0 :                 bail!(
     217            0 :                     "layer key range did not include the first key in the layer: {}",
     218            0 :                     layer.layer_dbg_info()
     219            0 :                 );
     220         1168 :             }
     221            0 :         }
     222         1168 :         *self = Self::Loaded {
     223         1168 :             iter,
     224         1168 :             source_desc: source_desc.clone(),
     225         1168 :         };
     226         1168 :         Ok(())
     227         1168 :     }
     228              : 
     229      4146332 :     fn is_loaded(&self) -> bool {
     230      4146332 :         matches!(self, Self::Loaded { .. })
     231      4146332 :     }
     232              : 
     233              :     /// Correctness: must load the iterator before using.
     234              :     ///
     235              :     /// Given this iterator wrapper is private to the merge iterator, users won't be able to mis-use it.
     236              :     /// The public interfaces to use are [`crate::tenant::storage_layer::delta_layer::DeltaLayerIterator`] and
     237              :     /// [`crate::tenant::storage_layer::image_layer::ImageLayerIterator`].
     238      4143996 :     async fn next(&mut self) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
     239      4143996 :         let Self::Loaded { iter, .. } = self else {
     240            0 :             panic!("must load the iterator before using")
     241              :         };
     242      4143996 :         iter.next().await
     243      4143996 :     }
     244              : 
     245              :     /// Get the persistent layer key corresponding to this iterator
     246         1828 :     fn trace_source(&self) -> Arc<PersistentLayerKey> {
     247         1828 :         match self {
     248         1828 :             Self::Loaded { source_desc, .. } => source_desc.clone(),
     249            0 :             Self::NotLoaded { source_desc, .. } => source_desc.clone(),
     250              :         }
     251         1828 :     }
     252              : }
     253              : 
     254              : /// A merge iterator over delta/image layer iterators.
     255              : ///
     256              : /// When duplicated records are found, the iterator will not perform any
     257              : /// deduplication, and the caller should handle these situation. By saying
     258              : /// duplicated records, there are many possibilities:
     259              : ///
     260              : /// * Two same delta at the same LSN.
     261              : /// * Two same image at the same LSN.
     262              : /// * Delta/image at the same LSN where the image has already applied the delta.
     263              : ///
     264              : /// The iterator will always put the image before the delta.
     265              : pub struct MergeIterator<'a> {
     266              :     heap: BinaryHeap<IteratorWrapper<'a>>,
     267              : }
     268              : 
     269              : pub(crate) trait MergeIteratorItem {
     270              :     fn new(item: (Key, Lsn, Value), iterator: &IteratorWrapper<'_>) -> Self;
     271              : 
     272              :     fn key_lsn_value(&self) -> &(Key, Lsn, Value);
     273              : }
     274              : 
     275              : impl MergeIteratorItem for (Key, Lsn, Value) {
     276      4141004 :     fn new(item: (Key, Lsn, Value), _: &IteratorWrapper<'_>) -> Self {
     277      4141004 :         item
     278      4141004 :     }
     279              : 
     280         1580 :     fn key_lsn_value(&self) -> &(Key, Lsn, Value) {
     281         1580 :         self
     282         1580 :     }
     283              : }
     284              : 
     285              : impl MergeIteratorItem for ((Key, Lsn, Value), Arc<PersistentLayerKey>) {
     286         1828 :     fn new(item: (Key, Lsn, Value), iter: &IteratorWrapper<'_>) -> Self {
     287         1828 :         (item, iter.trace_source().clone())
     288         1828 :     }
     289              : 
     290         3848 :     fn key_lsn_value(&self) -> &(Key, Lsn, Value) {
     291         3848 :         &self.0
     292         3848 :     }
     293              : }
     294              : 
     295              : impl<'a> MergeIterator<'a> {
     296          184 :     pub fn create(
     297          184 :         deltas: &[&'a DeltaLayerInner],
     298          184 :         images: &[&'a ImageLayerInner],
     299          184 :         ctx: &'a RequestContext,
     300          184 :     ) -> Self {
     301          184 :         let mut heap = Vec::with_capacity(images.len() + deltas.len());
     302          316 :         for image in images {
     303          132 :             heap.push(IteratorWrapper::create_from_image_layer(image, ctx));
     304          132 :         }
     305         1220 :         for delta in deltas {
     306         1036 :             heap.push(IteratorWrapper::create_from_delta_layer(delta, ctx));
     307         1036 :         }
     308          184 :         Self {
     309          184 :             heap: BinaryHeap::from(heap),
     310          184 :         }
     311          184 :     }
     312              : 
     313      4143012 :     pub(crate) async fn next_inner<R: MergeIteratorItem>(&mut self) -> anyhow::Result<Option<R>> {
     314      4145344 :         while let Some(mut iter) = self.heap.peek_mut() {
     315      4145164 :             if !iter.is_loaded() {
     316              :                 // Once we load the iterator, we can know the real first key-value pair in the iterator.
     317              :                 // We put it back into the heap so that a potentially unloaded layer may have a key between
     318              :                 // [potential_first_key, loaded_first_key).
     319         1168 :                 iter.load().await?;
     320         1168 :                 continue;
     321      4143996 :             }
     322      4143996 :             let Some(item) = iter.next().await? else {
     323              :                 // If the iterator returns None, we pop this iterator. Actually, in the current implementation,
     324              :                 // we order None > Some, and all the rest of the iterators should return None.
     325         1164 :                 binary_heap::PeekMut::pop(iter);
     326         1164 :                 continue;
     327              :             };
     328      4142832 :             return Ok(Some(R::new(item, &iter)));
     329              :         }
     330          180 :         Ok(None)
     331      4143012 :     }
     332              : 
     333              :     /// Get the next key-value pair from the iterator.
     334      4140292 :     pub async fn next(&mut self) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
     335      4140292 :         self.next_inner().await
     336      4140292 :     }
     337              : 
     338              :     /// Get the next key-value pair from the iterator, and trace where the key comes from.
     339            0 :     pub async fn next_with_trace(
     340            0 :         &mut self,
     341            0 :     ) -> anyhow::Result<Option<((Key, Lsn, Value), Arc<PersistentLayerKey>)>> {
     342            0 :         self.next_inner().await
     343            0 :     }
     344              : }
     345              : 
     346              : #[cfg(test)]
     347              : mod tests {
     348              :     use itertools::Itertools;
     349              :     use pageserver_api::key::Key;
     350              :     #[cfg(feature = "testing")]
     351              :     use pageserver_api::record::NeonWalRecord;
     352              :     use utils::lsn::Lsn;
     353              : 
     354              :     use super::*;
     355              :     use crate::DEFAULT_PG_VERSION;
     356              :     use crate::tenant::harness::{TIMELINE_ID, TenantHarness};
     357              :     #[cfg(feature = "testing")]
     358              :     use crate::tenant::storage_layer::delta_layer::test::sort_delta_value;
     359              :     use crate::tenant::storage_layer::delta_layer::test::{produce_delta_layer, sort_delta};
     360              : 
     361           16 :     async fn assert_merge_iter_equal(
     362           16 :         merge_iter: &mut MergeIterator<'_>,
     363           16 :         expect: &[(Key, Lsn, Value)],
     364           16 :     ) {
     365           16 :         let mut expect_iter = expect.iter();
     366              :         loop {
     367        12160 :             let o1 = merge_iter.next().await.unwrap();
     368        12160 :             let o2 = expect_iter.next();
     369        12160 :             assert_eq!(o1.is_some(), o2.is_some());
     370        12160 :             if o1.is_none() && o2.is_none() {
     371           16 :                 break;
     372        12144 :             }
     373        12144 :             let (k1, l1, v1) = o1.unwrap();
     374        12144 :             let (k2, l2, v2) = o2.unwrap();
     375        12144 :             assert_eq!(&k1, k2);
     376        12144 :             assert_eq!(l1, *l2);
     377        12144 :             assert_eq!(&v1, v2);
     378              :         }
     379           16 :     }
     380              : 
     381              :     #[tokio::test]
     382            4 :     async fn merge_in_between() {
     383            4 :         use bytes::Bytes;
     384            4 :         use pageserver_api::value::Value;
     385            4 : 
     386            4 :         let harness = TenantHarness::create("merge_iterator_merge_in_between")
     387            4 :             .await
     388            4 :             .unwrap();
     389            4 :         let (tenant, ctx) = harness.load().await;
     390            4 : 
     391            4 :         let tline = tenant
     392            4 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
     393            4 :             .await
     394            4 :             .unwrap();
     395            4 : 
     396           16 :         fn get_key(id: u32) -> Key {
     397           16 :             let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
     398           16 :             key.field6 = id;
     399           16 :             key
     400           16 :         }
     401            4 :         let test_deltas1 = vec![
     402            4 :             (
     403            4 :                 get_key(0),
     404            4 :                 Lsn(0x10),
     405            4 :                 Value::Image(Bytes::copy_from_slice(b"test")),
     406            4 :             ),
     407            4 :             (
     408            4 :                 get_key(5),
     409            4 :                 Lsn(0x10),
     410            4 :                 Value::Image(Bytes::copy_from_slice(b"test")),
     411            4 :             ),
     412            4 :         ];
     413            4 :         let resident_layer_1 = produce_delta_layer(&tenant, &tline, test_deltas1.clone(), &ctx)
     414            4 :             .await
     415            4 :             .unwrap();
     416            4 :         let test_deltas2 = vec![
     417            4 :             (
     418            4 :                 get_key(3),
     419            4 :                 Lsn(0x10),
     420            4 :                 Value::Image(Bytes::copy_from_slice(b"test")),
     421            4 :             ),
     422            4 :             (
     423            4 :                 get_key(4),
     424            4 :                 Lsn(0x10),
     425            4 :                 Value::Image(Bytes::copy_from_slice(b"test")),
     426            4 :             ),
     427            4 :         ];
     428            4 :         let resident_layer_2 = produce_delta_layer(&tenant, &tline, test_deltas2.clone(), &ctx)
     429            4 :             .await
     430            4 :             .unwrap();
     431            4 :         let mut merge_iter = MergeIterator::create(
     432            4 :             &[
     433            4 :                 resident_layer_2.get_as_delta(&ctx).await.unwrap(),
     434            4 :                 resident_layer_1.get_as_delta(&ctx).await.unwrap(),
     435            4 :             ],
     436            4 :             &[],
     437            4 :             &ctx,
     438            4 :         );
     439            4 :         let mut expect = Vec::new();
     440            4 :         expect.extend(test_deltas1);
     441            4 :         expect.extend(test_deltas2);
     442            4 :         expect.sort_by(sort_delta);
     443            4 :         assert_merge_iter_equal(&mut merge_iter, &expect).await;
     444            4 :     }
     445              : 
     446              :     #[tokio::test]
     447            4 :     async fn delta_merge() {
     448            4 :         use bytes::Bytes;
     449            4 :         use pageserver_api::value::Value;
     450            4 : 
     451            4 :         let harness = TenantHarness::create("merge_iterator_delta_merge")
     452            4 :             .await
     453            4 :             .unwrap();
     454            4 :         let (tenant, ctx) = harness.load().await;
     455            4 : 
     456            4 :         let tline = tenant
     457            4 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
     458            4 :             .await
     459            4 :             .unwrap();
     460            4 : 
     461        12000 :         fn get_key(id: u32) -> Key {
     462        12000 :             let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
     463        12000 :             key.field6 = id;
     464        12000 :             key
     465        12000 :         }
     466            4 :         const N: usize = 1000;
     467            4 :         let test_deltas1 = (0..N)
     468         4000 :             .map(|idx| {
     469         4000 :                 (
     470         4000 :                     get_key(idx as u32 / 10),
     471         4000 :                     Lsn(0x20 * ((idx as u64) % 10 + 1)),
     472         4000 :                     Value::Image(Bytes::from(format!("img{idx:05}"))),
     473         4000 :                 )
     474         4000 :             })
     475            4 :             .collect_vec();
     476            4 :         let resident_layer_1 = produce_delta_layer(&tenant, &tline, test_deltas1.clone(), &ctx)
     477            4 :             .await
     478            4 :             .unwrap();
     479            4 :         let test_deltas2 = (0..N)
     480         4000 :             .map(|idx| {
     481         4000 :                 (
     482         4000 :                     get_key(idx as u32 / 10),
     483         4000 :                     Lsn(0x20 * ((idx as u64) % 10 + 1) + 0x10),
     484         4000 :                     Value::Image(Bytes::from(format!("img{idx:05}"))),
     485         4000 :                 )
     486         4000 :             })
     487            4 :             .collect_vec();
     488            4 :         let resident_layer_2 = produce_delta_layer(&tenant, &tline, test_deltas2.clone(), &ctx)
     489            4 :             .await
     490            4 :             .unwrap();
     491            4 :         let test_deltas3 = (0..N)
     492         4000 :             .map(|idx| {
     493         4000 :                 (
     494         4000 :                     get_key(idx as u32 / 10 + N as u32),
     495         4000 :                     Lsn(0x10 * ((idx as u64) % 10 + 1)),
     496         4000 :                     Value::Image(Bytes::from(format!("img{idx:05}"))),
     497         4000 :                 )
     498         4000 :             })
     499            4 :             .collect_vec();
     500            4 :         let resident_layer_3 = produce_delta_layer(&tenant, &tline, test_deltas3.clone(), &ctx)
     501            4 :             .await
     502            4 :             .unwrap();
     503            4 :         let mut merge_iter = MergeIterator::create(
     504            4 :             &[
     505            4 :                 resident_layer_1.get_as_delta(&ctx).await.unwrap(),
     506            4 :                 resident_layer_2.get_as_delta(&ctx).await.unwrap(),
     507            4 :                 resident_layer_3.get_as_delta(&ctx).await.unwrap(),
     508            4 :             ],
     509            4 :             &[],
     510            4 :             &ctx,
     511            4 :         );
     512            4 :         let mut expect = Vec::new();
     513            4 :         expect.extend(test_deltas1);
     514            4 :         expect.extend(test_deltas2);
     515            4 :         expect.extend(test_deltas3);
     516            4 :         expect.sort_by(sort_delta);
     517            4 :         assert_merge_iter_equal(&mut merge_iter, &expect).await;
     518            4 : 
     519            4 :         // TODO: test layers are loaded only when needed, reducing num of active iterators in k-merge
     520            4 :     }
     521              : 
     522              :     #[cfg(feature = "testing")]
     523              :     #[tokio::test]
     524            4 :     async fn delta_image_mixed_merge() {
     525            4 :         use bytes::Bytes;
     526            4 :         use pageserver_api::value::Value;
     527            4 : 
     528            4 :         let harness = TenantHarness::create("merge_iterator_delta_image_mixed_merge")
     529            4 :             .await
     530            4 :             .unwrap();
     531            4 :         let (tenant, ctx) = harness.load().await;
     532            4 : 
     533            4 :         let tline = tenant
     534            4 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
     535            4 :             .await
     536            4 :             .unwrap();
     537            4 : 
     538           36 :         fn get_key(id: u32) -> Key {
     539           36 :             let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
     540           36 :             key.field6 = id;
     541           36 :             key
     542           36 :         }
     543            4 :         // In this test case, we want to test if the iterator still works correctly with multiple copies
     544            4 :         // of a delta+image at the same LSN, for example, the following sequence a@10=+a, a@10=+a, a@10=ab, a@10=ab.
     545            4 :         // Duplicated deltas/images are possible for old tenants before the full L0 compaction file name fix.
     546            4 :         // An incomplete compaction could produce multiple exactly-the-same delta layers. Force image generation
     547            4 :         // could produce overlapping images. Apart from duplicated deltas/images, in the current storage implementation
     548            4 :         // one key-lsn could have a delta in the delta layer and one image in the image layer. The iterator should
     549            4 :         // correctly process these situations and return everything as-is, and the upper layer of the system
     550            4 :         // will handle duplicated LSNs.
     551            4 :         let test_deltas1 = vec![
     552            4 :             (
     553            4 :                 get_key(0),
     554            4 :                 Lsn(0x10),
     555            4 :                 Value::WalRecord(NeonWalRecord::wal_init("")),
     556            4 :             ),
     557            4 :             (
     558            4 :                 get_key(0),
     559            4 :                 Lsn(0x18),
     560            4 :                 Value::WalRecord(NeonWalRecord::wal_append("a")),
     561            4 :             ),
     562            4 :             (
     563            4 :                 get_key(5),
     564            4 :                 Lsn(0x10),
     565            4 :                 Value::WalRecord(NeonWalRecord::wal_init("")),
     566            4 :             ),
     567            4 :             (
     568            4 :                 get_key(5),
     569            4 :                 Lsn(0x18),
     570            4 :                 Value::WalRecord(NeonWalRecord::wal_append("b")),
     571            4 :             ),
     572            4 :         ];
     573            4 :         let resident_layer_1 = produce_delta_layer(&tenant, &tline, test_deltas1.clone(), &ctx)
     574            4 :             .await
     575            4 :             .unwrap();
     576            4 :         let mut test_deltas2 = test_deltas1.clone();
     577            4 :         test_deltas2.push((
     578            4 :             get_key(10),
     579            4 :             Lsn(0x20),
     580            4 :             Value::Image(Bytes::copy_from_slice(b"test")),
     581            4 :         ));
     582            4 :         let resident_layer_2 = produce_delta_layer(&tenant, &tline, test_deltas2.clone(), &ctx)
     583            4 :             .await
     584            4 :             .unwrap();
     585            4 :         let test_deltas3 = vec![
     586            4 :             (
     587            4 :                 get_key(0),
     588            4 :                 Lsn(0x10),
     589            4 :                 Value::Image(Bytes::copy_from_slice(b"")),
     590            4 :             ),
     591            4 :             (
     592            4 :                 get_key(5),
     593            4 :                 Lsn(0x18),
     594            4 :                 Value::Image(Bytes::copy_from_slice(b"b")),
     595            4 :             ),
     596            4 :             (
     597            4 :                 get_key(15),
     598            4 :                 Lsn(0x20),
     599            4 :                 Value::Image(Bytes::copy_from_slice(b"test")),
     600            4 :             ),
     601            4 :         ];
     602            4 :         let resident_layer_3 = produce_delta_layer(&tenant, &tline, test_deltas3.clone(), &ctx)
     603            4 :             .await
     604            4 :             .unwrap();
     605            4 :         let mut test_deltas4 = test_deltas3.clone();
     606            4 :         test_deltas4.push((
     607            4 :             get_key(20),
     608            4 :             Lsn(0x20),
     609            4 :             Value::Image(Bytes::copy_from_slice(b"test")),
     610            4 :         ));
     611            4 :         let resident_layer_4 = produce_delta_layer(&tenant, &tline, test_deltas4.clone(), &ctx)
     612            4 :             .await
     613            4 :             .unwrap();
     614            4 :         let mut expect = Vec::new();
     615            4 :         expect.extend(test_deltas1);
     616            4 :         expect.extend(test_deltas2);
     617            4 :         expect.extend(test_deltas3);
     618            4 :         expect.extend(test_deltas4);
     619            4 :         expect.sort_by(sort_delta_value);
     620            4 : 
     621            4 :         // Test with different layer order for MergeIterator::create to ensure the order
     622            4 :         // is stable.
     623            4 : 
     624            4 :         let mut merge_iter = MergeIterator::create(
     625            4 :             &[
     626            4 :                 resident_layer_4.get_as_delta(&ctx).await.unwrap(),
     627            4 :                 resident_layer_1.get_as_delta(&ctx).await.unwrap(),
     628            4 :                 resident_layer_3.get_as_delta(&ctx).await.unwrap(),
     629            4 :                 resident_layer_2.get_as_delta(&ctx).await.unwrap(),
     630            4 :             ],
     631            4 :             &[],
     632            4 :             &ctx,
     633            4 :         );
     634            4 :         assert_merge_iter_equal(&mut merge_iter, &expect).await;
     635            4 : 
     636            4 :         let mut merge_iter = MergeIterator::create(
     637            4 :             &[
     638            4 :                 resident_layer_1.get_as_delta(&ctx).await.unwrap(),
     639            4 :                 resident_layer_4.get_as_delta(&ctx).await.unwrap(),
     640            4 :                 resident_layer_3.get_as_delta(&ctx).await.unwrap(),
     641            4 :                 resident_layer_2.get_as_delta(&ctx).await.unwrap(),
     642            4 :             ],
     643            4 :             &[],
     644            4 :             &ctx,
     645            4 :         );
     646            4 :         assert_merge_iter_equal(&mut merge_iter, &expect).await;
     647            4 : 
     648            4 :         is_send(merge_iter);
     649            4 :     }
     650              : 
     651              :     #[cfg(feature = "testing")]
     652            4 :     fn is_send(_: impl Send) {}
     653              : }
        

Generated by: LCOV version 2.1-beta