LCOV - code coverage report
Current view: top level - pageserver/src/tenant/storage_layer - merge_iterator.rs (source / functions) Coverage Total Hit
Test: 1e20c4f2b28aa592527961bb32170ebbd2c9172f.info Lines: 94.3 % 460 434
Test Date: 2025-07-16 12:29:03 Functions: 90.6 % 53 48

            Line data    Source code
       1              : use std::cmp::Ordering;
       2              : use std::collections::{BinaryHeap, binary_heap};
       3              : use std::sync::Arc;
       4              : 
       5              : use anyhow::bail;
       6              : use pageserver_api::key::Key;
       7              : use utils::lsn::Lsn;
       8              : use wal_decoder::models::value::Value;
       9              : 
      10              : use super::delta_layer::{DeltaLayerInner, DeltaLayerIterator};
      11              : use super::image_layer::{ImageLayerInner, ImageLayerIterator};
      12              : use super::{PersistentLayerDesc, PersistentLayerKey};
      13              : use crate::context::RequestContext;
      14              : 
      15              : #[derive(Clone, Copy)]
      16              : pub(crate) enum LayerRef<'a> {
      17              :     Image(&'a ImageLayerInner),
      18              :     Delta(&'a DeltaLayerInner),
      19              : }
      20              : 
      21              : impl<'a> LayerRef<'a> {
      22          295 :     fn iter_with_options(
      23          295 :         self,
      24          295 :         ctx: &'a RequestContext,
      25          295 :         max_read_size: u64,
      26          295 :         max_batch_size: usize,
      27          295 :     ) -> LayerIterRef<'a> {
      28          295 :         match self {
      29           35 :             Self::Image(x) => {
      30           35 :                 LayerIterRef::Image(x.iter_with_options(ctx, max_read_size, max_batch_size))
      31              :             }
      32          260 :             Self::Delta(x) => {
      33          260 :                 LayerIterRef::Delta(x.iter_with_options(ctx, max_read_size, max_batch_size))
      34              :             }
      35              :         }
      36          295 :     }
      37              : 
      38            0 :     fn layer_dbg_info(&self) -> String {
      39            0 :         match self {
      40            0 :             Self::Image(x) => x.layer_dbg_info(),
      41            0 :             Self::Delta(x) => x.layer_dbg_info(),
      42              :         }
      43            0 :     }
      44              : }
      45              : 
      46              : enum LayerIterRef<'a> {
      47              :     Image(ImageLayerIterator<'a>),
      48              :     Delta(DeltaLayerIterator<'a>),
      49              : }
      50              : 
      51              : impl LayerIterRef<'_> {
      52      1036307 :     async fn next(&mut self) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
      53      1036307 :         match self {
      54      1035907 :             Self::Delta(x) => x.next().await,
      55          400 :             Self::Image(x) => x.next().await,
      56              :         }
      57      1036307 :     }
      58              : 
      59            0 :     fn layer_dbg_info(&self) -> String {
      60            0 :         match self {
      61            0 :             Self::Image(x) => x.layer_dbg_info(),
      62            0 :             Self::Delta(x) => x.layer_dbg_info(),
      63              :         }
      64            0 :     }
      65              : }
      66              : 
      67              : /// This type plays several roles at once
      68              : /// 1. Unified iterator for image and delta layers.
      69              : /// 2. `Ord` for use in [`MergeIterator::heap`] (for the k-merge).
      70              : /// 3. Lazy creation of the real delta/image iterator.
      71              : #[allow(clippy::large_enum_variant, reason = "TODO")]
      72              : pub(crate) enum IteratorWrapper<'a> {
      73              :     NotLoaded {
      74              :         ctx: &'a RequestContext,
      75              :         first_key_lower_bound: (Key, Lsn),
      76              :         layer: LayerRef<'a>,
      77              :         source_desc: Arc<PersistentLayerKey>,
      78              :         max_read_size: u64,
      79              :         max_batch_size: usize,
      80              :     },
      81              :     Loaded {
      82              :         iter: PeekableLayerIterRef<'a>,
      83              :         source_desc: Arc<PersistentLayerKey>,
      84              :     },
      85              : }
      86              : 
      87              : pub(crate) struct PeekableLayerIterRef<'a> {
      88              :     iter: LayerIterRef<'a>,
      89              :     peeked: Option<(Key, Lsn, Value)>, // None == end
      90              : }
      91              : 
      92              : impl<'a> PeekableLayerIterRef<'a> {
      93          295 :     async fn create(mut iter: LayerIterRef<'a>) -> anyhow::Result<Self> {
      94          295 :         let peeked = iter.next().await?;
      95          295 :         Ok(Self { iter, peeked })
      96          295 :     }
      97              : 
      98      4140847 :     fn peek(&self) -> &Option<(Key, Lsn, Value)> {
      99      4140847 :         &self.peeked
     100      4140847 :     }
     101              : 
     102      1036012 :     async fn next(&mut self) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
     103      1036012 :         let result = self.peeked.take();
     104      1036012 :         self.peeked = self.iter.next().await?;
     105      1036012 :         if let (Some((k1, l1, _)), Some((k2, l2, _))) = (&self.peeked, &result) {
     106      1035428 :             if (k1, l1) < (k2, l2) {
     107            0 :                 bail!("iterator is not ordered: {}", self.iter.layer_dbg_info());
     108      1035428 :             }
     109          584 :         }
     110      1036012 :         Ok(result)
     111      1036012 :     }
     112              : }
     113              : 
     114              : impl std::cmp::PartialEq for IteratorWrapper<'_> {
     115            0 :     fn eq(&self, other: &Self) -> bool {
     116            0 :         self.cmp(other) == Ordering::Equal
     117            0 :     }
     118              : }
     119              : 
     120              : impl std::cmp::Eq for IteratorWrapper<'_> {}
     121              : 
     122              : impl std::cmp::PartialOrd for IteratorWrapper<'_> {
     123      2072521 :     fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
     124      2072521 :         Some(self.cmp(other))
     125      2072521 :     }
     126              : }
     127              : 
     128              : impl std::cmp::Ord for IteratorWrapper<'_> {
     129      2072521 :     fn cmp(&self, other: &Self) -> std::cmp::Ordering {
     130              :         use std::cmp::Ordering;
     131      2072521 :         let a = self.peek_next_key_lsn_value();
     132      2072521 :         let b = other.peek_next_key_lsn_value();
     133      2072521 :         match (a, b) {
     134      1468509 :             (Some((k1, l1, v1)), Some((k2, l2, v2))) => {
     135      2937018 :                 fn map_value_to_num(val: &Option<&Value>) -> usize {
     136      2932612 :                     match val {
     137         4406 :                         None => 0,
     138      2932354 :                         Some(Value::Image(_)) => 1,
     139          258 :                         Some(Value::WalRecord(_)) => 2,
     140              :                     }
     141      2937018 :                 }
     142      1468509 :                 let order_1 = map_value_to_num(&v1);
     143      1468509 :                 let order_2 = map_value_to_num(&v2);
     144              :                 // When key_lsn are the same, the unloaded iter will always appear before the loaded one.
     145              :                 // And note that we do a reverse at the end of the comparison, so it works with the max heap.
     146      1468509 :                 (k1, l1, order_1).cmp(&(k2, l2, order_2))
     147              :             }
     148       501462 :             (Some(_), None) => Ordering::Less,
     149          685 :             (None, Some(_)) => Ordering::Greater,
     150       101865 :             (None, None) => Ordering::Equal,
     151              :         }
     152      2072521 :         .reverse()
     153      2072521 :     }
     154              : }
     155              : 
     156              : impl<'a> IteratorWrapper<'a> {
     157           35 :     pub fn create_from_image_layer(
     158           35 :         image_layer: &'a ImageLayerInner,
     159           35 :         ctx: &'a RequestContext,
     160           35 :         max_read_size: u64,
     161           35 :         max_batch_size: usize,
     162           35 :     ) -> Self {
     163           35 :         Self::NotLoaded {
     164           35 :             layer: LayerRef::Image(image_layer),
     165           35 :             first_key_lower_bound: (image_layer.key_range().start, image_layer.lsn()),
     166           35 :             ctx,
     167           35 :             source_desc: PersistentLayerKey {
     168           35 :                 key_range: image_layer.key_range().clone(),
     169           35 :                 lsn_range: PersistentLayerDesc::image_layer_lsn_range(image_layer.lsn()),
     170           35 :                 is_delta: false,
     171           35 :             }
     172           35 :             .into(),
     173           35 :             max_read_size,
     174           35 :             max_batch_size,
     175           35 :         }
     176           35 :     }
     177              : 
     178          260 :     pub fn create_from_delta_layer(
     179          260 :         delta_layer: &'a DeltaLayerInner,
     180          260 :         ctx: &'a RequestContext,
     181          260 :         max_read_size: u64,
     182          260 :         max_batch_size: usize,
     183          260 :     ) -> Self {
     184          260 :         Self::NotLoaded {
     185          260 :             layer: LayerRef::Delta(delta_layer),
     186          260 :             first_key_lower_bound: (delta_layer.key_range().start, delta_layer.lsn_range().start),
     187          260 :             ctx,
     188          260 :             source_desc: PersistentLayerKey {
     189          260 :                 key_range: delta_layer.key_range().clone(),
     190          260 :                 lsn_range: delta_layer.lsn_range().clone(),
     191          260 :                 is_delta: true,
     192          260 :             }
     193          260 :             .into(),
     194          260 :             max_read_size,
     195          260 :             max_batch_size,
     196          260 :         }
     197          260 :     }
     198              : 
     199      4145042 :     fn peek_next_key_lsn_value(&self) -> Option<(&Key, Lsn, Option<&Value>)> {
     200      4145042 :         match self {
     201      4140552 :             Self::Loaded { iter, .. } => iter
     202      4140552 :                 .peek()
     203      4140552 :                 .as_ref()
     204      4140552 :                 .map(|(key, lsn, val)| (key, *lsn, Some(val))),
     205         4490 :             Self::NotLoaded {
     206         4490 :                 first_key_lower_bound: (key, lsn),
     207         4490 :                 ..
     208         4490 :             } => Some((key, *lsn, None)),
     209              :         }
     210      4145042 :     }
     211              : 
     212              :     // CORRECTNESS: this function must always take `&mut self`, never `&self`.
     213              :     //
     214              :     // The reason is that `impl Ord for Self` evaluates differently after this function
     215              :     // returns. We're called through a `PeekMut::deref_mut`, which causes heap repair when
     216              :     // the PeekMut gets returned. So, it's critical that we actually run through `PeekMut::deref_mut`
     217              :     // and not just `PeekMut::deref`
     218              :     // If we don't take `&mut self`
     219          295 :     async fn load(&mut self) -> anyhow::Result<()> {
     220          295 :         assert!(!self.is_loaded());
     221          295 :         let Self::NotLoaded {
     222          295 :             ctx,
     223          295 :             first_key_lower_bound,
     224          295 :             layer,
     225          295 :             source_desc,
     226          295 :             max_read_size,
     227          295 :             max_batch_size,
     228          295 :         } = self
     229              :         else {
     230            0 :             unreachable!()
     231              :         };
     232          295 :         let iter = layer.iter_with_options(ctx, *max_read_size, *max_batch_size);
     233          295 :         let iter = PeekableLayerIterRef::create(iter).await?;
     234          295 :         if let Some((k1, l1, _)) = iter.peek() {
     235          295 :             let (k2, l2) = first_key_lower_bound;
     236          295 :             if (k1, l1) < (k2, l2) {
     237            0 :                 bail!(
     238            0 :                     "layer key range did not include the first key in the layer: {}",
     239            0 :                     layer.layer_dbg_info()
     240              :                 );
     241          295 :             }
     242            0 :         }
     243          295 :         *self = Self::Loaded {
     244          295 :             iter,
     245          295 :             source_desc: source_desc.clone(),
     246          295 :         };
     247          295 :         Ok(())
     248          295 :     }
     249              : 
     250      1036602 :     fn is_loaded(&self) -> bool {
     251      1036602 :         matches!(self, Self::Loaded { .. })
     252      1036602 :     }
     253              : 
     254              :     /// Correctness: must load the iterator before using.
     255              :     ///
     256              :     /// Given this iterator wrapper is private to the merge iterator, users won't be able to mis-use it.
     257              :     /// The public interfaces to use are [`crate::tenant::storage_layer::delta_layer::DeltaLayerIterator`] and
     258              :     /// [`crate::tenant::storage_layer::image_layer::ImageLayerIterator`].
     259      1036012 :     async fn next(&mut self) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
     260      1036012 :         let Self::Loaded { iter, .. } = self else {
     261            0 :             panic!("must load the iterator before using")
     262              :         };
     263      1036012 :         iter.next().await
     264      1036012 :     }
     265              : 
     266              :     /// Get the persistent layer key corresponding to this iterator
     267          470 :     fn trace_source(&self) -> Arc<PersistentLayerKey> {
     268          470 :         match self {
     269          470 :             Self::Loaded { source_desc, .. } => source_desc.clone(),
     270            0 :             Self::NotLoaded { source_desc, .. } => source_desc.clone(),
     271              :         }
     272          470 :     }
     273              : }
     274              : 
     275              : /// A merge iterator over delta/image layer iterators.
     276              : ///
     277              : /// When duplicated records are found, the iterator will not perform any
     278              : /// deduplication, and the caller should handle these situation. By saying
     279              : /// duplicated records, there are many possibilities:
     280              : ///
     281              : /// * Two same delta at the same LSN.
     282              : /// * Two same image at the same LSN.
     283              : /// * Delta/image at the same LSN where the image has already applied the delta.
     284              : ///
     285              : /// The iterator will always put the image before the delta.
     286              : pub struct MergeIterator<'a> {
     287              :     heap: BinaryHeap<IteratorWrapper<'a>>,
     288              : }
     289              : 
     290              : pub(crate) trait MergeIteratorItem {
     291              :     fn new(item: (Key, Lsn, Value), iterator: &IteratorWrapper<'_>) -> Self;
     292              : 
     293              :     fn key_lsn_value(&self) -> &(Key, Lsn, Value);
     294              : }
     295              : 
     296              : impl MergeIteratorItem for (Key, Lsn, Value) {
     297      1035251 :     fn new(item: (Key, Lsn, Value), _: &IteratorWrapper<'_>) -> Self {
     298      1035251 :         item
     299      1035251 :     }
     300              : 
     301          395 :     fn key_lsn_value(&self) -> &(Key, Lsn, Value) {
     302          395 :         self
     303          395 :     }
     304              : }
     305              : 
     306              : impl MergeIteratorItem for ((Key, Lsn, Value), Arc<PersistentLayerKey>) {
     307          470 :     fn new(item: (Key, Lsn, Value), iter: &IteratorWrapper<'_>) -> Self {
     308          470 :         (item, iter.trace_source().clone())
     309          470 :     }
     310              : 
     311          990 :     fn key_lsn_value(&self) -> &(Key, Lsn, Value) {
     312          990 :         &self.0
     313          990 :     }
     314              : }
     315              : 
     316              : impl<'a> MergeIterator<'a> {
     317              :     #[cfg(test)]
     318            6 :     pub(crate) fn create_for_testing(
     319            6 :         deltas: &[&'a DeltaLayerInner],
     320            6 :         images: &[&'a ImageLayerInner],
     321            6 :         ctx: &'a RequestContext,
     322            6 :     ) -> Self {
     323            6 :         Self::create_with_options(deltas, images, ctx, 1024 * 8192, 1024)
     324            6 :     }
     325              : 
     326              :     /// Create a new merge iterator with custom options.
     327              :     ///
     328              :     /// Adjust `max_read_size` and `max_batch_size` to trade memory usage for performance. The size should scale
     329              :     /// with the number of layers to compact. If there are a lot of layers, consider reducing the values, so that
     330              :     /// the buffer does not take too much memory.
     331              :     ///
     332              :     /// The default options for L0 compactions are:
     333              :     /// - max_read_size: 1024 * 8192 (8MB)
     334              :     /// - max_batch_size: 1024
     335              :     ///
     336              :     /// The default options for gc-compaction are:
     337              :     /// - max_read_size: 128 * 8192 (1MB)
     338              :     /// - max_batch_size: 128
     339           56 :     pub fn create_with_options(
     340           56 :         deltas: &[&'a DeltaLayerInner],
     341           56 :         images: &[&'a ImageLayerInner],
     342           56 :         ctx: &'a RequestContext,
     343           56 :         max_read_size: u64,
     344           56 :         max_batch_size: usize,
     345           56 :     ) -> Self {
     346           56 :         let mut heap = Vec::with_capacity(images.len() + deltas.len());
     347           91 :         for image in images {
     348           35 :             heap.push(IteratorWrapper::create_from_image_layer(
     349           35 :                 image,
     350           35 :                 ctx,
     351           35 :                 max_read_size,
     352           35 :                 max_batch_size,
     353           35 :             ));
     354           35 :         }
     355          316 :         for delta in deltas {
     356          260 :             heap.push(IteratorWrapper::create_from_delta_layer(
     357          260 :                 delta,
     358          260 :                 ctx,
     359          260 :                 max_read_size,
     360          260 :                 max_batch_size,
     361          260 :             ));
     362          260 :         }
     363           56 :         Self {
     364           56 :             heap: BinaryHeap::from(heap),
     365           56 :         }
     366           56 :     }
     367              : 
     368      1035775 :     pub(crate) async fn next_inner<R: MergeIteratorItem>(&mut self) -> anyhow::Result<Option<R>> {
     369      1036361 :         while let Some(mut iter) = self.heap.peek_mut() {
     370      1036307 :             if !iter.is_loaded() {
     371              :                 // Once we load the iterator, we can know the real first key-value pair in the iterator.
     372              :                 // We put it back into the heap so that a potentially unloaded layer may have a key between
     373              :                 // [potential_first_key, loaded_first_key).
     374          295 :                 iter.load().await?;
     375          295 :                 continue;
     376      1036012 :             }
     377      1036012 :             let Some(item) = iter.next().await? else {
     378              :                 // If the iterator returns None, we pop this iterator. Actually, in the current implementation,
     379              :                 // we order None > Some, and all the rest of the iterators should return None.
     380          291 :                 binary_heap::PeekMut::pop(iter);
     381          291 :                 continue;
     382              :             };
     383      1035721 :             return Ok(Some(R::new(item, &iter)));
     384              :         }
     385           54 :         Ok(None)
     386      1035775 :     }
     387              : 
     388              :     /// Get the next key-value pair from the iterator.
     389      1035082 :     pub async fn next(&mut self) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
     390      1035082 :         self.next_inner().await
     391      1035082 :     }
     392              : 
     393              :     /// Get the next key-value pair from the iterator, and trace where the key comes from.
     394            0 :     pub async fn next_with_trace(
     395            0 :         &mut self,
     396            0 :     ) -> anyhow::Result<Option<((Key, Lsn, Value), Arc<PersistentLayerKey>)>> {
     397            0 :         self.next_inner().await
     398            0 :     }
     399              : }
     400              : 
     401              : #[cfg(test)]
     402              : mod tests {
     403              :     use itertools::Itertools;
     404              :     use pageserver_api::key::Key;
     405              :     use utils::lsn::Lsn;
     406              :     #[cfg(feature = "testing")]
     407              :     use wal_decoder::models::record::NeonWalRecord;
     408              : 
     409              :     use super::*;
     410              :     use crate::DEFAULT_PG_VERSION;
     411              :     use crate::tenant::harness::{TIMELINE_ID, TenantHarness};
     412              :     #[cfg(feature = "testing")]
     413              :     use crate::tenant::storage_layer::delta_layer::test::sort_delta_value;
     414              :     use crate::tenant::storage_layer::delta_layer::test::{produce_delta_layer, sort_delta};
     415              : 
     416            4 :     async fn assert_merge_iter_equal(
     417            4 :         merge_iter: &mut MergeIterator<'_>,
     418            4 :         expect: &[(Key, Lsn, Value)],
     419            4 :     ) {
     420            4 :         let mut expect_iter = expect.iter();
     421              :         loop {
     422         3040 :             let o1 = merge_iter.next().await.unwrap();
     423         3040 :             let o2 = expect_iter.next();
     424         3040 :             assert_eq!(o1.is_some(), o2.is_some());
     425         3040 :             if o1.is_none() && o2.is_none() {
     426            4 :                 break;
     427         3036 :             }
     428         3036 :             let (k1, l1, v1) = o1.unwrap();
     429         3036 :             let (k2, l2, v2) = o2.unwrap();
     430         3036 :             assert_eq!(&k1, k2);
     431         3036 :             assert_eq!(l1, *l2);
     432         3036 :             assert_eq!(&v1, v2);
     433              :         }
     434            4 :     }
     435              : 
     436              :     #[tokio::test]
     437            1 :     async fn merge_in_between() {
     438              :         use bytes::Bytes;
     439              : 
     440            1 :         let harness = TenantHarness::create("merge_iterator_merge_in_between")
     441            1 :             .await
     442            1 :             .unwrap();
     443            1 :         let (tenant, ctx) = harness.load().await;
     444              : 
     445            1 :         let tline = tenant
     446            1 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
     447            1 :             .await
     448            1 :             .unwrap();
     449              : 
     450            4 :         fn get_key(id: u32) -> Key {
     451            4 :             let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
     452            4 :             key.field6 = id;
     453            4 :             key
     454            4 :         }
     455            1 :         let test_deltas1 = vec![
     456            1 :             (
     457            1 :                 get_key(0),
     458            1 :                 Lsn(0x10),
     459            1 :                 Value::Image(Bytes::copy_from_slice(b"test")),
     460            1 :             ),
     461            1 :             (
     462            1 :                 get_key(5),
     463            1 :                 Lsn(0x10),
     464            1 :                 Value::Image(Bytes::copy_from_slice(b"test")),
     465            1 :             ),
     466              :         ];
     467            1 :         let resident_layer_1 = produce_delta_layer(&tenant, &tline, test_deltas1.clone(), &ctx)
     468            1 :             .await
     469            1 :             .unwrap();
     470            1 :         let test_deltas2 = vec![
     471            1 :             (
     472            1 :                 get_key(3),
     473            1 :                 Lsn(0x10),
     474            1 :                 Value::Image(Bytes::copy_from_slice(b"test")),
     475            1 :             ),
     476            1 :             (
     477            1 :                 get_key(4),
     478            1 :                 Lsn(0x10),
     479            1 :                 Value::Image(Bytes::copy_from_slice(b"test")),
     480            1 :             ),
     481              :         ];
     482            1 :         let resident_layer_2 = produce_delta_layer(&tenant, &tline, test_deltas2.clone(), &ctx)
     483            1 :             .await
     484            1 :             .unwrap();
     485            1 :         let mut merge_iter = MergeIterator::create_for_testing(
     486              :             &[
     487            1 :                 resident_layer_2.get_as_delta(&ctx).await.unwrap(),
     488            1 :                 resident_layer_1.get_as_delta(&ctx).await.unwrap(),
     489              :             ],
     490            1 :             &[],
     491            1 :             &ctx,
     492              :         );
     493            1 :         let mut expect = Vec::new();
     494            1 :         expect.extend(test_deltas1);
     495            1 :         expect.extend(test_deltas2);
     496            1 :         expect.sort_by(sort_delta);
     497            1 :         assert_merge_iter_equal(&mut merge_iter, &expect).await;
     498            1 :     }
     499              : 
     500              :     #[tokio::test]
     501            1 :     async fn delta_merge() {
     502              :         use bytes::Bytes;
     503              : 
     504            1 :         let harness = TenantHarness::create("merge_iterator_delta_merge")
     505            1 :             .await
     506            1 :             .unwrap();
     507            1 :         let (tenant, ctx) = harness.load().await;
     508              : 
     509            1 :         let tline = tenant
     510            1 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
     511            1 :             .await
     512            1 :             .unwrap();
     513              : 
     514         3000 :         fn get_key(id: u32) -> Key {
     515         3000 :             let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
     516         3000 :             key.field6 = id;
     517         3000 :             key
     518         3000 :         }
     519              :         const N: usize = 1000;
     520            1 :         let test_deltas1 = (0..N)
     521         1000 :             .map(|idx| {
     522         1000 :                 (
     523         1000 :                     get_key(idx as u32 / 10),
     524         1000 :                     Lsn(0x20 * ((idx as u64) % 10 + 1)),
     525         1000 :                     Value::Image(Bytes::from(format!("img{idx:05}"))),
     526         1000 :                 )
     527         1000 :             })
     528            1 :             .collect_vec();
     529            1 :         let resident_layer_1 = produce_delta_layer(&tenant, &tline, test_deltas1.clone(), &ctx)
     530            1 :             .await
     531            1 :             .unwrap();
     532            1 :         let test_deltas2 = (0..N)
     533         1000 :             .map(|idx| {
     534         1000 :                 (
     535         1000 :                     get_key(idx as u32 / 10),
     536         1000 :                     Lsn(0x20 * ((idx as u64) % 10 + 1) + 0x10),
     537         1000 :                     Value::Image(Bytes::from(format!("img{idx:05}"))),
     538         1000 :                 )
     539         1000 :             })
     540            1 :             .collect_vec();
     541            1 :         let resident_layer_2 = produce_delta_layer(&tenant, &tline, test_deltas2.clone(), &ctx)
     542            1 :             .await
     543            1 :             .unwrap();
     544            1 :         let test_deltas3 = (0..N)
     545         1000 :             .map(|idx| {
     546         1000 :                 (
     547         1000 :                     get_key(idx as u32 / 10 + N as u32),
     548         1000 :                     Lsn(0x10 * ((idx as u64) % 10 + 1)),
     549         1000 :                     Value::Image(Bytes::from(format!("img{idx:05}"))),
     550         1000 :                 )
     551         1000 :             })
     552            1 :             .collect_vec();
     553            1 :         let resident_layer_3 = produce_delta_layer(&tenant, &tline, test_deltas3.clone(), &ctx)
     554            1 :             .await
     555            1 :             .unwrap();
     556            1 :         let mut merge_iter = MergeIterator::create_for_testing(
     557              :             &[
     558            1 :                 resident_layer_1.get_as_delta(&ctx).await.unwrap(),
     559            1 :                 resident_layer_2.get_as_delta(&ctx).await.unwrap(),
     560            1 :                 resident_layer_3.get_as_delta(&ctx).await.unwrap(),
     561              :             ],
     562            1 :             &[],
     563            1 :             &ctx,
     564              :         );
     565            1 :         let mut expect = Vec::new();
     566            1 :         expect.extend(test_deltas1);
     567            1 :         expect.extend(test_deltas2);
     568            1 :         expect.extend(test_deltas3);
     569            1 :         expect.sort_by(sort_delta);
     570            1 :         assert_merge_iter_equal(&mut merge_iter, &expect).await;
     571              : 
     572              :         // TODO: test that layers are loaded only when needed, reducing the number of active iterators in the k-merge
     573            1 :     }
     574              : 
     575              :     #[cfg(feature = "testing")]
     576              :     #[tokio::test]
     577            1 :     async fn delta_image_mixed_merge() {
     578              :         use bytes::Bytes;
     579              : 
     580            1 :         let harness = TenantHarness::create("merge_iterator_delta_image_mixed_merge")
     581            1 :             .await
     582            1 :             .unwrap();
     583            1 :         let (tenant, ctx) = harness.load().await;
     584              : 
     585            1 :         let tline = tenant
     586            1 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
     587            1 :             .await
     588            1 :             .unwrap();
     589              : 
     590            9 :         fn get_key(id: u32) -> Key {
     591            9 :             let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
     592            9 :             key.field6 = id; // only `field6` differs between the keys used in this test
     593            9 :             key
     594            9 :         }
     595              :         // This test checks that the iterator still works correctly when there are multiple copies of a
     596              :         // delta+image at the same LSN, for example the sequence a@10=+a, a@10=+a, a@10=ab, a@10=ab.
     597              :         // Duplicated deltas/images are possible for old tenants before the full L0 compaction file name fix:
     598              :         // an incomplete compaction could produce multiple exactly-the-same delta layers, and forced image
     599              :         // generation could produce overlapping images. Apart from duplicated deltas/images, in the current
     600              :         // storage implementation one key-lsn could have a delta in the delta layer and an image in the image
     601              :         // layer. The iterator should process these situations correctly and return everything as-is; the
     602              :         // upper layer of the system will handle duplicated LSNs.
     603            1 :         let test_deltas1 = vec![
     604            1 :             (
     605            1 :                 get_key(0),
     606            1 :                 Lsn(0x10),
     607            1 :                 Value::WalRecord(NeonWalRecord::wal_init("")),
     608            1 :             ),
     609            1 :             (
     610            1 :                 get_key(0),
     611            1 :                 Lsn(0x18),
     612            1 :                 Value::WalRecord(NeonWalRecord::wal_append("a")),
     613            1 :             ),
     614            1 :             (
     615            1 :                 get_key(5),
     616            1 :                 Lsn(0x10),
     617            1 :                 Value::WalRecord(NeonWalRecord::wal_init("")),
     618            1 :             ),
     619            1 :             (
     620            1 :                 get_key(5),
     621            1 :                 Lsn(0x18),
     622            1 :                 Value::WalRecord(NeonWalRecord::wal_append("b")),
     623            1 :             ),
     624              :         ];
     625            1 :         let resident_layer_1 = produce_delta_layer(&tenant, &tline, test_deltas1.clone(), &ctx)
     626            1 :             .await
     627            1 :             .unwrap();
     628            1 :         let mut test_deltas2 = test_deltas1.clone(); // layer 2 repeats every entry of layer 1 ...
     629            1 :         test_deltas2.push(( // ... and adds one extra image entry at key 10
     630            1 :             get_key(10),
     631            1 :             Lsn(0x20),
     632            1 :             Value::Image(Bytes::copy_from_slice(b"test")),
     633            1 :         ));
     634            1 :         let resident_layer_2 = produce_delta_layer(&tenant, &tline, test_deltas2.clone(), &ctx)
     635            1 :             .await
     636            1 :             .unwrap();
     637            1 :         let test_deltas3 = vec![
     638            1 :             (
     639            1 :                 get_key(0),
     640            1 :                 Lsn(0x10), // same key and LSN as the first WAL record in `test_deltas1`
     641            1 :                 Value::Image(Bytes::copy_from_slice(b"")),
     642            1 :             ),
     643            1 :             (
     644            1 :                 get_key(5),
     645            1 :                 Lsn(0x18), // same key and LSN as the last WAL record in `test_deltas1`
     646            1 :                 Value::Image(Bytes::copy_from_slice(b"b")),
     647            1 :             ),
     648            1 :             (
     649            1 :                 get_key(15),
     650            1 :                 Lsn(0x20),
     651            1 :                 Value::Image(Bytes::copy_from_slice(b"test")),
     652            1 :             ),
     653              :         ];
     654            1 :         let resident_layer_3 = produce_delta_layer(&tenant, &tline, test_deltas3.clone(), &ctx)
     655            1 :             .await
     656            1 :             .unwrap();
     657            1 :         let mut test_deltas4 = test_deltas3.clone(); // layer 4 repeats every entry of layer 3 ...
     658            1 :         test_deltas4.push(( // ... and adds one extra image entry at key 20
     659            1 :             get_key(20),
     660            1 :             Lsn(0x20),
     661            1 :             Value::Image(Bytes::copy_from_slice(b"test")),
     662            1 :         ));
     663            1 :         let resident_layer_4 = produce_delta_layer(&tenant, &tline, test_deltas4.clone(), &ctx)
     664            1 :             .await
     665            1 :             .unwrap();
     666            1 :         let mut expect = Vec::new();
     667            1 :         expect.extend(test_deltas1);
     668            1 :         expect.extend(test_deltas2);
     669            1 :         expect.extend(test_deltas3);
     670            1 :         expect.extend(test_deltas4);
     671            1 :         expect.sort_by(sort_delta_value); // expected output: every entry of all four layers, duplicates kept
     672              : 
     673              :         // Build the merge iterator twice with the layers passed in a different order and compare both
     674              :         // runs against the same `expect`, to ensure the output order is stable.
     675              : 
     676            1 :         let mut merge_iter = MergeIterator::create_for_testing(
     677              :             &[
     678            1 :                 resident_layer_4.get_as_delta(&ctx).await.unwrap(),
     679            1 :                 resident_layer_1.get_as_delta(&ctx).await.unwrap(),
     680            1 :                 resident_layer_3.get_as_delta(&ctx).await.unwrap(),
     681            1 :                 resident_layer_2.get_as_delta(&ctx).await.unwrap(),
     682              :             ],
     683            1 :             &[],
     684            1 :             &ctx,
     685              :         );
     686            1 :         assert_merge_iter_equal(&mut merge_iter, &expect).await;
     687              : 
     688            1 :         let mut merge_iter = MergeIterator::create_for_testing(
     689              :             &[
     690            1 :                 resident_layer_1.get_as_delta(&ctx).await.unwrap(),
     691            1 :                 resident_layer_4.get_as_delta(&ctx).await.unwrap(),
     692            1 :                 resident_layer_3.get_as_delta(&ctx).await.unwrap(),
     693            1 :                 resident_layer_2.get_as_delta(&ctx).await.unwrap(),
     694              :             ],
     695            1 :             &[],
     696            1 :             &ctx,
     697              :         );
     698            1 :         assert_merge_iter_equal(&mut merge_iter, &expect).await;
     699              : 
     700            1 :         is_send(merge_iter); // compile-time check that the merge iterator is `Send`
     701            1 :     }
     702              : 
     703              :     #[cfg(feature = "testing")]
     704            1 :     fn is_send(_: impl Send) {} // no-op at runtime; a call compiles only if the argument's type is `Send`
     705              : }
        

Generated by: LCOV version 2.1-beta