LCOV - code coverage report
Current view: top level - pageserver/src/tenant/storage_layer - merge_iterator.rs (source / functions) Coverage Total Hit
Test: 52d9d4a58355424a48c56cb9ba9670a073f618b9.info Lines: 94.3 % 475 448
Test Date: 2024-11-21 08:31:22 Functions: 90.4 % 52 47

            Line data    Source code
       1              : use std::{
       2              :     cmp::Ordering,
       3              :     collections::{binary_heap, BinaryHeap},
       4              :     sync::Arc,
       5              : };
       6              : 
       7              : use anyhow::bail;
       8              : use pageserver_api::key::Key;
       9              : use utils::lsn::Lsn;
      10              : 
      11              : use crate::context::RequestContext;
      12              : use pageserver_api::value::Value;
      13              : 
      14              : use super::{
      15              :     delta_layer::{DeltaLayerInner, DeltaLayerIterator},
      16              :     image_layer::{ImageLayerInner, ImageLayerIterator},
      17              :     PersistentLayerDesc, PersistentLayerKey,
      18              : };
      19              : 
      20              : #[derive(Clone, Copy)]
      21              : pub(crate) enum LayerRef<'a> {
      22              :     Image(&'a ImageLayerInner),
      23              :     Delta(&'a DeltaLayerInner),
      24              : }
      25              : 
      26              : impl<'a> LayerRef<'a> {
      27          552 :     fn iter(self, ctx: &'a RequestContext) -> LayerIterRef<'a> {
      28          552 :         match self {
      29           58 :             Self::Image(x) => LayerIterRef::Image(x.iter(ctx)),
      30          494 :             Self::Delta(x) => LayerIterRef::Delta(x.iter(ctx)),
      31              :         }
      32          552 :     }
      33              : 
      34            0 :     fn layer_dbg_info(&self) -> String {
      35            0 :         match self {
      36            0 :             Self::Image(x) => x.layer_dbg_info(),
      37            0 :             Self::Delta(x) => x.layer_dbg_info(),
      38              :         }
      39            0 :     }
      40              : }
      41              : 
      42              : enum LayerIterRef<'a> {
      43              :     Image(ImageLayerIterator<'a>),
      44              :     Delta(DeltaLayerIterator<'a>),
      45              : }
      46              : 
      47              : impl LayerIterRef<'_> {
      48      2072386 :     async fn next(&mut self) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
      49      2072386 :         match self {
      50      2071694 :             Self::Delta(x) => x.next().await,
      51          692 :             Self::Image(x) => x.next().await,
      52              :         }
      53      2072386 :     }
      54              : 
      55            0 :     fn layer_dbg_info(&self) -> String {
      56            0 :         match self {
      57            0 :             Self::Image(x) => x.layer_dbg_info(),
      58            0 :             Self::Delta(x) => x.layer_dbg_info(),
      59              :         }
      60            0 :     }
      61              : }
      62              : 
      63              : /// This type plays several roles at once
      64              : /// 1. Unified iterator for image and delta layers.
      65              : /// 2. `Ord` for use in [`MergeIterator::heap`] (for the k-merge).
      66              : /// 3. Lazy creation of the real delta/image iterator.
      67              : pub(crate) enum IteratorWrapper<'a> {
      68              :     NotLoaded {
      69              :         ctx: &'a RequestContext,
      70              :         first_key_lower_bound: (Key, Lsn),
      71              :         layer: LayerRef<'a>,
      72              :         source_desc: Arc<PersistentLayerKey>,
      73              :     },
      74              :     Loaded {
      75              :         iter: PeekableLayerIterRef<'a>,
      76              :         source_desc: Arc<PersistentLayerKey>,
      77              :     },
      78              : }
      79              : 
      80              : pub(crate) struct PeekableLayerIterRef<'a> {
      81              :     iter: LayerIterRef<'a>,
      82              :     peeked: Option<(Key, Lsn, Value)>, // None == end
      83              : }
      84              : 
      85              : impl<'a> PeekableLayerIterRef<'a> {
      86          552 :     async fn create(mut iter: LayerIterRef<'a>) -> anyhow::Result<Self> {
      87          670 :         let peeked = iter.next().await?;
      88          552 :         Ok(Self { iter, peeked })
      89          552 :     }
      90              : 
      91      8450516 :     fn peek(&self) -> &Option<(Key, Lsn, Value)> {
      92      8450516 :         &self.peeked
      93      8450516 :     }
      94              : 
      95      2071834 :     async fn next(&mut self) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
      96      2071834 :         let result = self.peeked.take();
      97      2071834 :         self.peeked = self.iter.next().await?;
      98      2071834 :         if let (Some((k1, l1, _)), Some((k2, l2, _))) = (&self.peeked, &result) {
      99      2070734 :             if (k1, l1) < (k2, l2) {
     100            0 :                 bail!("iterator is not ordered: {}", self.iter.layer_dbg_info());
     101      2070734 :             }
     102         1100 :         }
     103      2071834 :         Ok(result)
     104      2071834 :     }
     105              : }
     106              : 
     107              : impl std::cmp::PartialEq for IteratorWrapper<'_> {
     108            0 :     fn eq(&self, other: &Self) -> bool {
     109            0 :         self.cmp(other) == Ordering::Equal
     110            0 :     }
     111              : }
     112              : 
     113              : impl std::cmp::Eq for IteratorWrapper<'_> {}
     114              : 
     115              : impl std::cmp::PartialOrd for IteratorWrapper<'_> {
     116      4229377 :     fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
     117      4229377 :         Some(self.cmp(other))
     118      4229377 :     }
     119              : }
     120              : 
     121              : impl std::cmp::Ord for IteratorWrapper<'_> {
     122      4229377 :     fn cmp(&self, other: &Self) -> std::cmp::Ordering {
     123              :         use std::cmp::Ordering;
     124      4229377 :         let a = self.peek_next_key_lsn_value();
     125      4229377 :         let b = other.peek_next_key_lsn_value();
     126      4229377 :         match (a, b) {
     127      3021393 :             (Some((k1, l1, v1)), Some((k2, l2, v2))) => {
     128      6042786 :                 fn map_value_to_num(val: &Option<&Value>) -> usize {
     129      6034118 :                     match val {
     130         8668 :                         None => 0,
     131      6033658 :                         Some(Value::Image(_)) => 1,
     132          460 :                         Some(Value::WalRecord(_)) => 2,
     133              :                     }
     134      6042786 :                 }
     135      3021393 :                 let order_1 = map_value_to_num(&v1);
     136      3021393 :                 let order_2 = map_value_to_num(&v2);
     137      3021393 :                 // When key_lsn are the same, the unloaded iter will always appear before the loaded one.
     138      3021393 :                 // And note that we do a reverse at the end of the comparison, so it works with the max heap.
     139      3021393 :                 (k1, l1, order_1).cmp(&(k2, l2, order_2))
     140              :             }
     141      1002911 :             (Some(_), None) => Ordering::Less,
     142         1327 :             (None, Some(_)) => Ordering::Greater,
     143       203746 :             (None, None) => Ordering::Equal,
     144              :         }
     145      4229377 :         .reverse()
     146      4229377 :     }
     147              : }
     148              : 
     149              : impl<'a> IteratorWrapper<'a> {
     150           58 :     pub fn create_from_image_layer(
     151           58 :         image_layer: &'a ImageLayerInner,
     152           58 :         ctx: &'a RequestContext,
     153           58 :     ) -> Self {
     154           58 :         Self::NotLoaded {
     155           58 :             layer: LayerRef::Image(image_layer),
     156           58 :             first_key_lower_bound: (image_layer.key_range().start, image_layer.lsn()),
     157           58 :             ctx,
     158           58 :             source_desc: PersistentLayerKey {
     159           58 :                 key_range: image_layer.key_range().clone(),
     160           58 :                 lsn_range: PersistentLayerDesc::image_layer_lsn_range(image_layer.lsn()),
     161           58 :                 is_delta: false,
     162           58 :             }
     163           58 :             .into(),
     164           58 :         }
     165           58 :     }
     166              : 
     167          494 :     pub fn create_from_delta_layer(
     168          494 :         delta_layer: &'a DeltaLayerInner,
     169          494 :         ctx: &'a RequestContext,
     170          494 :     ) -> Self {
     171          494 :         Self::NotLoaded {
     172          494 :             layer: LayerRef::Delta(delta_layer),
     173          494 :             first_key_lower_bound: (delta_layer.key_range().start, delta_layer.lsn_range().start),
     174          494 :             ctx,
     175          494 :             source_desc: PersistentLayerKey {
     176          494 :                 key_range: delta_layer.key_range().clone(),
     177          494 :                 lsn_range: delta_layer.lsn_range().clone(),
     178          494 :                 is_delta: true,
     179          494 :             }
     180          494 :             .into(),
     181          494 :         }
     182          494 :     }
     183              : 
     184      8458754 :     fn peek_next_key_lsn_value(&self) -> Option<(&Key, Lsn, Option<&Value>)> {
     185      8458754 :         match self {
     186      8449964 :             Self::Loaded { iter, .. } => iter
     187      8449964 :                 .peek()
     188      8449964 :                 .as_ref()
     189      8449964 :                 .map(|(key, lsn, val)| (key, *lsn, Some(val))),
     190         8790 :             Self::NotLoaded {
     191         8790 :                 first_key_lower_bound: (key, lsn),
     192         8790 :                 ..
     193         8790 :             } => Some((key, *lsn, None)),
     194              :         }
     195      8458754 :     }
     196              : 
     197              :     // CORRECTNESS: this function must always take `&mut self`, never `&self`.
     198              :     //
     199              :     // The reason is that `impl Ord for Self` evaluates differently after this function
     200              :     // returns. We're called through a `PeekMut::deref_mut`, which causes heap repair when
     201              :     // the PeekMut gets returned. So, it's critical that we actually run through `PeekMut::deref_mut`
     202              :     // and not just `PeekMut::deref`
     203              :     // If we don't take `&mut self`
     204          552 :     async fn load(&mut self) -> anyhow::Result<()> {
     205          552 :         assert!(!self.is_loaded());
     206          552 :         let Self::NotLoaded {
     207          552 :             ctx,
     208          552 :             first_key_lower_bound,
     209          552 :             layer,
     210          552 :             source_desc,
     211          552 :         } = self
     212              :         else {
     213            0 :             unreachable!()
     214              :         };
     215          552 :         let iter = layer.iter(ctx);
     216          670 :         let iter = PeekableLayerIterRef::create(iter).await?;
     217          552 :         if let Some((k1, l1, _)) = iter.peek() {
     218          552 :             let (k2, l2) = first_key_lower_bound;
     219          552 :             if (k1, l1) < (k2, l2) {
     220            0 :                 bail!(
     221            0 :                     "layer key range did not include the first key in the layer: {}",
     222            0 :                     layer.layer_dbg_info()
     223            0 :                 );
     224          552 :             }
     225            0 :         }
     226          552 :         *self = Self::Loaded {
     227          552 :             iter,
     228          552 :             source_desc: source_desc.clone(),
     229          552 :         };
     230          552 :         Ok(())
     231          552 :     }
     232              : 
     233      2072938 :     fn is_loaded(&self) -> bool {
     234      2072938 :         matches!(self, Self::Loaded { .. })
     235      2072938 :     }
     236              : 
     237              :     /// Correctness: must load the iterator before using.
     238              :     ///
     239              :     /// Given this iterator wrapper is private to the merge iterator, users won't be able to mis-use it.
     240              :     /// The public interfaces to use are [`crate::tenant::storage_layer::delta_layer::DeltaLayerIterator`] and
     241              :     /// [`crate::tenant::storage_layer::image_layer::ImageLayerIterator`].
     242      2071834 :     async fn next(&mut self) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
     243      2071834 :         let Self::Loaded { iter, .. } = self else {
     244            0 :             panic!("must load the iterator before using")
     245              :         };
     246      2071834 :         iter.next().await
     247      2071834 :     }
     248              : 
     249              :     /// Get the persistent layer key corresponding to this iterator
     250          782 :     fn trace_source(&self) -> Arc<PersistentLayerKey> {
     251          782 :         match self {
     252          782 :             Self::Loaded { source_desc, .. } => source_desc.clone(),
     253            0 :             Self::NotLoaded { source_desc, .. } => source_desc.clone(),
     254              :         }
     255          782 :     }
     256              : }
     257              : 
     258              : /// A merge iterator over delta/image layer iterators.
     259              : ///
     260              : /// When duplicated records are found, the iterator will not perform any
     261              : /// deduplication, and the caller should handle these situation. By saying
     262              : /// duplicated records, there are many possibilities:
     263              : ///
     264              : /// * Two same delta at the same LSN.
     265              : /// * Two same image at the same LSN.
     266              : /// * Delta/image at the same LSN where the image has already applied the delta.
     267              : ///
     268              : /// The iterator will always put the image before the delta.
     269              : pub struct MergeIterator<'a> {
     270              :     heap: BinaryHeap<IteratorWrapper<'a>>,
     271              : }
     272              : 
     273              : pub(crate) trait MergeIteratorItem {
     274              :     fn new(item: (Key, Lsn, Value), iterator: &IteratorWrapper<'_>) -> Self;
     275              : 
     276              :     fn key_lsn_value(&self) -> &(Key, Lsn, Value);
     277              : }
     278              : 
     279              : impl MergeIteratorItem for (Key, Lsn, Value) {
     280      2070502 :     fn new(item: (Key, Lsn, Value), _: &IteratorWrapper<'_>) -> Self {
     281      2070502 :         item
     282      2070502 :     }
     283              : 
     284          790 :     fn key_lsn_value(&self) -> &(Key, Lsn, Value) {
     285          790 :         self
     286          790 :     }
     287              : }
     288              : 
     289              : impl MergeIteratorItem for ((Key, Lsn, Value), Arc<PersistentLayerKey>) {
     290          782 :     fn new(item: (Key, Lsn, Value), iter: &IteratorWrapper<'_>) -> Self {
     291          782 :         (item, iter.trace_source().clone())
     292          782 :     }
     293              : 
     294         1640 :     fn key_lsn_value(&self) -> &(Key, Lsn, Value) {
     295         1640 :         &self.0
     296         1640 :     }
     297              : }
     298              : 
     299              : impl<'a> MergeIterator<'a> {
     300           80 :     pub fn create(
     301           80 :         deltas: &[&'a DeltaLayerInner],
     302           80 :         images: &[&'a ImageLayerInner],
     303           80 :         ctx: &'a RequestContext,
     304           80 :     ) -> Self {
     305           80 :         let mut heap = Vec::with_capacity(images.len() + deltas.len());
     306          138 :         for image in images {
     307           58 :             heap.push(IteratorWrapper::create_from_image_layer(image, ctx));
     308           58 :         }
     309          574 :         for delta in deltas {
     310          494 :             heap.push(IteratorWrapper::create_from_delta_layer(delta, ctx));
     311          494 :         }
     312           80 :         Self {
     313           80 :             heap: BinaryHeap::from(heap),
     314           80 :         }
     315           80 :     }
     316              : 
     317      2071362 :     pub(crate) async fn next_inner<R: MergeIteratorItem>(&mut self) -> anyhow::Result<Option<R>> {
     318      2072464 :         while let Some(mut iter) = self.heap.peek_mut() {
     319      2072386 :             if !iter.is_loaded() {
     320              :                 // Once we load the iterator, we can know the real first key-value pair in the iterator.
     321              :                 // We put it back into the heap so that a potentially unloaded layer may have a key between
     322              :                 // [potential_first_key, loaded_first_key).
     323          670 :                 iter.load().await?;
     324          552 :                 continue;
     325      2071834 :             }
     326      2071834 :             let Some(item) = iter.next().await? else {
     327              :                 // If the iterator returns None, we pop this iterator. Actually, in the current implementation,
     328              :                 // we order None > Some, and all the rest of the iterators should return None.
     329          550 :                 binary_heap::PeekMut::pop(iter);
     330          550 :                 continue;
     331              :             };
     332      2071284 :             return Ok(Some(R::new(item, &iter)));
     333              :         }
     334           78 :         Ok(None)
     335      2071362 :     }
     336              : 
     337              :     /// Get the next key-value pair from the iterator.
     338      2070146 :     pub async fn next(&mut self) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
     339      2070146 :         self.next_inner().await
     340      2070146 :     }
     341              : 
     342              :     /// Get the next key-value pair from the iterator, and trace where the key comes from.
     343            0 :     pub async fn next_with_trace(
     344            0 :         &mut self,
     345            0 :     ) -> anyhow::Result<Option<((Key, Lsn, Value), Arc<PersistentLayerKey>)>> {
     346            0 :         self.next_inner().await
     347            0 :     }
     348              : }
     349              : 
     350              : #[cfg(test)]
     351              : mod tests {
     352              :     use super::*;
     353              : 
     354              :     use itertools::Itertools;
     355              :     use pageserver_api::key::Key;
     356              :     use utils::lsn::Lsn;
     357              : 
     358              :     use crate::{
     359              :         tenant::{
     360              :             harness::{TenantHarness, TIMELINE_ID},
     361              :             storage_layer::delta_layer::test::{produce_delta_layer, sort_delta},
     362              :         },
     363              :         DEFAULT_PG_VERSION,
     364              :     };
     365              : 
     366              :     #[cfg(feature = "testing")]
     367              :     use crate::tenant::storage_layer::delta_layer::test::sort_delta_value;
     368              :     #[cfg(feature = "testing")]
     369              :     use pageserver_api::record::NeonWalRecord;
     370              : 
     371            8 :     async fn assert_merge_iter_equal(
     372            8 :         merge_iter: &mut MergeIterator<'_>,
     373            8 :         expect: &[(Key, Lsn, Value)],
     374            8 :     ) {
     375            8 :         let mut expect_iter = expect.iter();
     376              :         loop {
     377         6080 :             let o1 = merge_iter.next().await.unwrap();
     378         6080 :             let o2 = expect_iter.next();
     379         6080 :             assert_eq!(o1.is_some(), o2.is_some());
     380         6080 :             if o1.is_none() && o2.is_none() {
     381            8 :                 break;
     382         6072 :             }
     383         6072 :             let (k1, l1, v1) = o1.unwrap();
     384         6072 :             let (k2, l2, v2) = o2.unwrap();
     385         6072 :             assert_eq!(&k1, k2);
     386         6072 :             assert_eq!(l1, *l2);
     387         6072 :             assert_eq!(&v1, v2);
     388              :         }
     389            8 :     }
     390              : 
     391              :     #[tokio::test]
     392            2 :     async fn merge_in_between() {
     393            2 :         use bytes::Bytes;
     394            2 :         use pageserver_api::value::Value;
     395            2 : 
     396            2 :         let harness = TenantHarness::create("merge_iterator_merge_in_between")
     397            2 :             .await
     398            2 :             .unwrap();
     399           20 :         let (tenant, ctx) = harness.load().await;
     400            2 : 
     401            2 :         let tline = tenant
     402            2 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
     403            6 :             .await
     404            2 :             .unwrap();
     405            2 : 
     406            8 :         fn get_key(id: u32) -> Key {
     407            8 :             let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
     408            8 :             key.field6 = id;
     409            8 :             key
     410            8 :         }
     411            2 :         let test_deltas1 = vec![
     412            2 :             (
     413            2 :                 get_key(0),
     414            2 :                 Lsn(0x10),
     415            2 :                 Value::Image(Bytes::copy_from_slice(b"test")),
     416            2 :             ),
     417            2 :             (
     418            2 :                 get_key(5),
     419            2 :                 Lsn(0x10),
     420            2 :                 Value::Image(Bytes::copy_from_slice(b"test")),
     421            2 :             ),
     422            2 :         ];
     423            2 :         let resident_layer_1 = produce_delta_layer(&tenant, &tline, test_deltas1.clone(), &ctx)
     424            6 :             .await
     425            2 :             .unwrap();
     426            2 :         let test_deltas2 = vec![
     427            2 :             (
     428            2 :                 get_key(3),
     429            2 :                 Lsn(0x10),
     430            2 :                 Value::Image(Bytes::copy_from_slice(b"test")),
     431            2 :             ),
     432            2 :             (
     433            2 :                 get_key(4),
     434            2 :                 Lsn(0x10),
     435            2 :                 Value::Image(Bytes::copy_from_slice(b"test")),
     436            2 :             ),
     437            2 :         ];
     438            2 :         let resident_layer_2 = produce_delta_layer(&tenant, &tline, test_deltas2.clone(), &ctx)
     439            6 :             .await
     440            2 :             .unwrap();
     441            2 :         let mut merge_iter = MergeIterator::create(
     442            2 :             &[
     443            2 :                 resident_layer_2.get_as_delta(&ctx).await.unwrap(),
     444            2 :                 resident_layer_1.get_as_delta(&ctx).await.unwrap(),
     445            2 :             ],
     446            2 :             &[],
     447            2 :             &ctx,
     448            2 :         );
     449            2 :         let mut expect = Vec::new();
     450            2 :         expect.extend(test_deltas1);
     451            2 :         expect.extend(test_deltas2);
     452            2 :         expect.sort_by(sort_delta);
     453            4 :         assert_merge_iter_equal(&mut merge_iter, &expect).await;
     454            2 :     }
     455              : 
     456              :     #[tokio::test]
     457            2 :     async fn delta_merge() {
     458            2 :         use bytes::Bytes;
     459            2 :         use pageserver_api::value::Value;
     460            2 : 
     461            2 :         let harness = TenantHarness::create("merge_iterator_delta_merge")
     462            2 :             .await
     463            2 :             .unwrap();
     464           20 :         let (tenant, ctx) = harness.load().await;
     465            2 : 
     466            2 :         let tline = tenant
     467            2 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
     468            6 :             .await
     469            2 :             .unwrap();
     470            2 : 
     471         6000 :         fn get_key(id: u32) -> Key {
     472         6000 :             let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
     473         6000 :             key.field6 = id;
     474         6000 :             key
     475         6000 :         }
     476            2 :         const N: usize = 1000;
     477            2 :         let test_deltas1 = (0..N)
     478         2000 :             .map(|idx| {
     479         2000 :                 (
     480         2000 :                     get_key(idx as u32 / 10),
     481         2000 :                     Lsn(0x20 * ((idx as u64) % 10 + 1)),
     482         2000 :                     Value::Image(Bytes::from(format!("img{idx:05}"))),
     483         2000 :                 )
     484         2000 :             })
     485            2 :             .collect_vec();
     486            2 :         let resident_layer_1 = produce_delta_layer(&tenant, &tline, test_deltas1.clone(), &ctx)
     487            8 :             .await
     488            2 :             .unwrap();
     489            2 :         let test_deltas2 = (0..N)
     490         2000 :             .map(|idx| {
     491         2000 :                 (
     492         2000 :                     get_key(idx as u32 / 10),
     493         2000 :                     Lsn(0x20 * ((idx as u64) % 10 + 1) + 0x10),
     494         2000 :                     Value::Image(Bytes::from(format!("img{idx:05}"))),
     495         2000 :                 )
     496         2000 :             })
     497            2 :             .collect_vec();
     498            2 :         let resident_layer_2 = produce_delta_layer(&tenant, &tline, test_deltas2.clone(), &ctx)
     499            8 :             .await
     500            2 :             .unwrap();
     501            2 :         let test_deltas3 = (0..N)
     502         2000 :             .map(|idx| {
     503         2000 :                 (
     504         2000 :                     get_key(idx as u32 / 10 + N as u32),
     505         2000 :                     Lsn(0x10 * ((idx as u64) % 10 + 1)),
     506         2000 :                     Value::Image(Bytes::from(format!("img{idx:05}"))),
     507         2000 :                 )
     508         2000 :             })
     509            2 :             .collect_vec();
     510            2 :         let resident_layer_3 = produce_delta_layer(&tenant, &tline, test_deltas3.clone(), &ctx)
     511            8 :             .await
     512            2 :             .unwrap();
     513            2 :         let mut merge_iter = MergeIterator::create(
     514            2 :             &[
     515            2 :                 resident_layer_1.get_as_delta(&ctx).await.unwrap(),
     516            2 :                 resident_layer_2.get_as_delta(&ctx).await.unwrap(),
     517            2 :                 resident_layer_3.get_as_delta(&ctx).await.unwrap(),
     518            2 :             ],
     519            2 :             &[],
     520            2 :             &ctx,
     521            2 :         );
     522            2 :         let mut expect = Vec::new();
     523            2 :         expect.extend(test_deltas1);
     524            2 :         expect.extend(test_deltas2);
     525            2 :         expect.extend(test_deltas3);
     526            2 :         expect.sort_by(sort_delta);
     527           12 :         assert_merge_iter_equal(&mut merge_iter, &expect).await;
     528            2 : 
     529            2 :         // TODO: test layers are loaded only when needed, reducing num of active iterators in k-merge
     530            2 :     }
     531              : 
     532              :     #[cfg(feature = "testing")]
     533              :     #[tokio::test]
     534            2 :     async fn delta_image_mixed_merge() {
     535            2 :         use bytes::Bytes;
     536            2 :         use pageserver_api::value::Value;
     537            2 : 
     538            2 :         let harness = TenantHarness::create("merge_iterator_delta_image_mixed_merge")
     539            2 :             .await
     540            2 :             .unwrap();
     541           20 :         let (tenant, ctx) = harness.load().await;
     542            2 : 
     543            2 :         let tline = tenant
     544            2 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
     545            6 :             .await
     546            2 :             .unwrap();
     547            2 : 
     548           18 :         fn get_key(id: u32) -> Key {
     549           18 :             let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
     550           18 :             key.field6 = id;
     551           18 :             key
     552           18 :         }
     553            2 :         // In this test case, we want to test if the iterator still works correctly with multiple copies
     554            2 :         // of a delta+image at the same LSN, for example, the following sequence a@10=+a, a@10=+a, a@10=ab, a@10=ab.
     555            2 :         // Duplicated deltas/images are possible for old tenants before the full L0 compaction file name fix.
     556            2 :         // An incomplete compaction could produce multiple exactly-the-same delta layers. Force image generation
     557            2 :         // could produce overlapping images. Apart from duplicated deltas/images, in the current storage implementation
     558            2 :         // one key-lsn could have a delta in the delta layer and one image in the image layer. The iterator should
     559            2 :         // correctly process these situations and return everything as-is, and the upper layer of the system
     560            2 :         // will handle duplicated LSNs.
     561            2 :         let test_deltas1 = vec![
     562            2 :             (
     563            2 :                 get_key(0),
     564            2 :                 Lsn(0x10),
     565            2 :                 Value::WalRecord(NeonWalRecord::wal_init("")),
     566            2 :             ),
     567            2 :             (
     568            2 :                 get_key(0),
     569            2 :                 Lsn(0x18),
     570            2 :                 Value::WalRecord(NeonWalRecord::wal_append("a")),
     571            2 :             ),
     572            2 :             (
     573            2 :                 get_key(5),
     574            2 :                 Lsn(0x10),
     575            2 :                 Value::WalRecord(NeonWalRecord::wal_init("")),
     576            2 :             ),
     577            2 :             (
     578            2 :                 get_key(5),
     579            2 :                 Lsn(0x18),
     580            2 :                 Value::WalRecord(NeonWalRecord::wal_append("b")),
     581            2 :             ),
     582            2 :         ];
     583            2 :         let resident_layer_1 = produce_delta_layer(&tenant, &tline, test_deltas1.clone(), &ctx)
     584            6 :             .await
     585            2 :             .unwrap();
     586            2 :         let mut test_deltas2 = test_deltas1.clone();
     587            2 :         test_deltas2.push((
     588            2 :             get_key(10),
     589            2 :             Lsn(0x20),
     590            2 :             Value::Image(Bytes::copy_from_slice(b"test")),
     591            2 :         ));
     592            2 :         let resident_layer_2 = produce_delta_layer(&tenant, &tline, test_deltas2.clone(), &ctx)
     593            6 :             .await
     594            2 :             .unwrap();
     595            2 :         let test_deltas3 = vec![
     596            2 :             (
     597            2 :                 get_key(0),
     598            2 :                 Lsn(0x10),
     599            2 :                 Value::Image(Bytes::copy_from_slice(b"")),
     600            2 :             ),
     601            2 :             (
     602            2 :                 get_key(5),
     603            2 :                 Lsn(0x18),
     604            2 :                 Value::Image(Bytes::copy_from_slice(b"b")),
     605            2 :             ),
     606            2 :             (
     607            2 :                 get_key(15),
     608            2 :                 Lsn(0x20),
     609            2 :                 Value::Image(Bytes::copy_from_slice(b"test")),
     610            2 :             ),
     611            2 :         ];
     612            2 :         let resident_layer_3 = produce_delta_layer(&tenant, &tline, test_deltas3.clone(), &ctx)
     613            6 :             .await
     614            2 :             .unwrap();
     615            2 :         let mut test_deltas4 = test_deltas3.clone();
     616            2 :         test_deltas4.push((
     617            2 :             get_key(20),
     618            2 :             Lsn(0x20),
     619            2 :             Value::Image(Bytes::copy_from_slice(b"test")),
     620            2 :         ));
     621            2 :         let resident_layer_4 = produce_delta_layer(&tenant, &tline, test_deltas4.clone(), &ctx)
     622            6 :             .await
     623            2 :             .unwrap();
     624            2 :         let mut expect = Vec::new();
     625            2 :         expect.extend(test_deltas1);
     626            2 :         expect.extend(test_deltas2);
     627            2 :         expect.extend(test_deltas3);
     628            2 :         expect.extend(test_deltas4);
     629            2 :         expect.sort_by(sort_delta_value);
     630            2 : 
     631            2 :         // Test with different layer order for MergeIterator::create to ensure the order
     632            2 :         // is stable.
     633            2 : 
     634            2 :         let mut merge_iter = MergeIterator::create(
     635            2 :             &[
     636            2 :                 resident_layer_4.get_as_delta(&ctx).await.unwrap(),
     637            2 :                 resident_layer_1.get_as_delta(&ctx).await.unwrap(),
     638            2 :                 resident_layer_3.get_as_delta(&ctx).await.unwrap(),
     639            2 :                 resident_layer_2.get_as_delta(&ctx).await.unwrap(),
     640            2 :             ],
     641            2 :             &[],
     642            2 :             &ctx,
     643            2 :         );
     644            8 :         assert_merge_iter_equal(&mut merge_iter, &expect).await;
     645            2 : 
     646            2 :         let mut merge_iter = MergeIterator::create(
     647            2 :             &[
     648            2 :                 resident_layer_1.get_as_delta(&ctx).await.unwrap(),
     649            2 :                 resident_layer_4.get_as_delta(&ctx).await.unwrap(),
     650            2 :                 resident_layer_3.get_as_delta(&ctx).await.unwrap(),
     651            2 :                 resident_layer_2.get_as_delta(&ctx).await.unwrap(),
     652            2 :             ],
     653            2 :             &[],
     654            2 :             &ctx,
     655            2 :         );
     656            4 :         assert_merge_iter_equal(&mut merge_iter, &expect).await;
     657            2 : 
     658            2 :         is_send(merge_iter);
     659            2 :     }
     660              : 
     661              :     #[cfg(feature = "testing")]
     662            2 :     fn is_send(_: impl Send) {}
     663              : }
        

Generated by: LCOV version 2.1-beta