LCOV - code coverage report
Current view: top level - pageserver/src/tenant/storage_layer - merge_iterator.rs (source / functions) Coverage Total Hit
Test: 2620485e474b48c32427149a5d91ef8fc2cd649e.info Lines: 93.8 % 516 484
Test Date: 2025-05-01 22:50:11 Functions: 88.9 % 54 48

            Line data    Source code
       1              : use std::cmp::Ordering;
       2              : use std::collections::{BinaryHeap, binary_heap};
       3              : use std::sync::Arc;
       4              : 
       5              : use anyhow::bail;
       6              : use pageserver_api::key::Key;
       7              : use pageserver_api::value::Value;
       8              : use utils::lsn::Lsn;
       9              : 
      10              : use super::delta_layer::{DeltaLayerInner, DeltaLayerIterator};
      11              : use super::image_layer::{ImageLayerInner, ImageLayerIterator};
      12              : use super::{PersistentLayerDesc, PersistentLayerKey};
      13              : use crate::context::RequestContext;
      14              : 
      15              : #[derive(Clone, Copy)]
      16              : pub(crate) enum LayerRef<'a> {
      17              :     Image(&'a ImageLayerInner),
      18              :     Delta(&'a DeltaLayerInner),
      19              : }
      20              : 
      21              : impl<'a> LayerRef<'a> {
      22              :     #[allow(dead_code)]
      23            0 :     fn iter(self, ctx: &'a RequestContext) -> LayerIterRef<'a> {
      24            0 :         match self {
      25            0 :             Self::Image(x) => LayerIterRef::Image(x.iter(ctx)),
      26            0 :             Self::Delta(x) => LayerIterRef::Delta(x.iter(ctx)),
      27              :         }
      28            0 :     }
      29              : 
      30         3540 :     fn iter_with_options(
      31         3540 :         self,
      32         3540 :         ctx: &'a RequestContext,
      33         3540 :         max_read_size: u64,
      34         3540 :         max_batch_size: usize,
      35         3540 :     ) -> LayerIterRef<'a> {
      36         3540 :         match self {
      37          420 :             Self::Image(x) => {
      38          420 :                 LayerIterRef::Image(x.iter_with_options(ctx, max_read_size, max_batch_size))
      39              :             }
      40         3120 :             Self::Delta(x) => {
      41         3120 :                 LayerIterRef::Delta(x.iter_with_options(ctx, max_read_size, max_batch_size))
      42              :             }
      43              :         }
      44         3540 :     }
      45              : 
      46            0 :     fn layer_dbg_info(&self) -> String {
      47            0 :         match self {
      48            0 :             Self::Image(x) => x.layer_dbg_info(),
      49            0 :             Self::Delta(x) => x.layer_dbg_info(),
      50              :         }
      51            0 :     }
      52              : }
      53              : 
      54              : enum LayerIterRef<'a> {
      55              :     Image(ImageLayerIterator<'a>),
      56              :     Delta(DeltaLayerIterator<'a>),
      57              : }
      58              : 
      59              : impl LayerIterRef<'_> {
      60     12435684 :     async fn next(&mut self) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
      61     12435684 :         match self {
      62     12430884 :             Self::Delta(x) => x.next().await,
      63         4800 :             Self::Image(x) => x.next().await,
      64              :         }
      65     12435684 :     }
      66              : 
      67            0 :     fn layer_dbg_info(&self) -> String {
      68            0 :         match self {
      69            0 :             Self::Image(x) => x.layer_dbg_info(),
      70            0 :             Self::Delta(x) => x.layer_dbg_info(),
      71              :         }
      72            0 :     }
      73              : }
      74              : 
/// This type plays several roles at once:
/// 1. Unified iterator for image and delta layers.
/// 2. `Ord` for use in [`MergeIterator::heap`] (for the k-merge).
/// 3. Lazy creation of the real delta/image iterator.
// NOTE(review): `Loaded` embeds a full layer iterator and is presumably much larger than
// `NotLoaded`; boxing it is likely the outstanding TODO behind this allow — confirm.
#[allow(clippy::large_enum_variant, reason = "TODO")]
pub(crate) enum IteratorWrapper<'a> {
    /// The layer has not been opened yet; only a lower bound on its first entry is known.
    NotLoaded {
        /// Request context handed to the layer iterator when it is created.
        ctx: &'a RequestContext,
        /// `(key, lsn)` that the layer's first entry must not sort before. Used for heap
        /// ordering until the layer is loaded, then checked against the real first entry.
        first_key_lower_bound: (Key, Lsn),
        /// The layer to iterate once loaded.
        layer: LayerRef<'a>,
        /// Identity of the source layer, reported by `trace_source`.
        source_desc: Arc<PersistentLayerKey>,
        /// Forwarded to the layer's `iter_with_options` on load.
        max_read_size: u64,
        /// Forwarded to the layer's `iter_with_options` on load.
        max_batch_size: usize,
    },
    /// The layer iterator has been created and its first entry peeked.
    Loaded {
        /// Peekable iterator over the layer's entries.
        iter: PeekableLayerIterRef<'a>,
        /// Identity of the source layer, carried over from `NotLoaded`.
        source_desc: Arc<PersistentLayerKey>,
    },
}
      94              : 
      95              : pub(crate) struct PeekableLayerIterRef<'a> {
      96              :     iter: LayerIterRef<'a>,
      97              :     peeked: Option<(Key, Lsn, Value)>, // None == end
      98              : }
      99              : 
     100              : impl<'a> PeekableLayerIterRef<'a> {
     101         3540 :     async fn create(mut iter: LayerIterRef<'a>) -> anyhow::Result<Self> {
     102         3540 :         let peeked = iter.next().await?;
     103         3540 :         Ok(Self { iter, peeked })
     104         3540 :     }
     105              : 
     106     50708758 :     fn peek(&self) -> &Option<(Key, Lsn, Value)> {
     107     50708758 :         &self.peeked
     108     50708758 :     }
     109              : 
     110     12432144 :     async fn next(&mut self) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
     111     12432144 :         let result = self.peeked.take();
     112     12432144 :         self.peeked = self.iter.next().await?;
     113     12432144 :         if let (Some((k1, l1, _)), Some((k2, l2, _))) = (&self.peeked, &result) {
     114     12425136 :             if (k1, l1) < (k2, l2) {
     115            0 :                 bail!("iterator is not ordered: {}", self.iter.layer_dbg_info());
     116     12425136 :             }
     117         7008 :         }
     118     12432144 :         Ok(result)
     119     12432144 :     }
     120              : }
     121              : 
     122              : impl std::cmp::PartialEq for IteratorWrapper<'_> {
     123            0 :     fn eq(&self, other: &Self) -> bool {
     124            0 :         self.cmp(other) == Ordering::Equal
     125            0 :     }
     126              : }
     127              : 
     128              : impl std::cmp::Eq for IteratorWrapper<'_> {}
     129              : 
     130              : impl std::cmp::PartialOrd for IteratorWrapper<'_> {
     131     25379951 :     fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
     132     25379951 :         Some(self.cmp(other))
     133     25379951 :     }
     134              : }
     135              : 
     136              : impl std::cmp::Ord for IteratorWrapper<'_> {
     137     25379951 :     fn cmp(&self, other: &Self) -> std::cmp::Ordering {
     138              :         use std::cmp::Ordering;
     139     25379951 :         let a = self.peek_next_key_lsn_value();
     140     25379951 :         let b = other.peek_next_key_lsn_value();
     141     25379951 :         match (a, b) {
     142     18131072 :             (Some((k1, l1, v1)), Some((k2, l2, v2))) => {
     143     36262144 :                 fn map_value_to_num(val: &Option<&Value>) -> usize {
     144     36208468 :                     match val {
     145        53676 :                         None => 0,
     146     36205372 :                         Some(Value::Image(_)) => 1,
     147         3096 :                         Some(Value::WalRecord(_)) => 2,
     148              :                     }
     149     36262144 :                 }
     150     18131072 :                 let order_1 = map_value_to_num(&v1);
     151     18131072 :                 let order_2 = map_value_to_num(&v2);
     152     18131072 :                 // When key_lsn are the same, the unloaded iter will always appear before the loaded one.
     153     18131072 :                 // And note that we do a reverse at the end of the comparison, so it works with the max heap.
     154     18131072 :                 (k1, l1, order_1).cmp(&(k2, l2, order_2))
     155              :             }
     156      6017731 :             (Some(_), None) => Ordering::Less,
     157         8426 :             (None, Some(_)) => Ordering::Greater,
     158      1222722 :             (None, None) => Ordering::Equal,
     159              :         }
     160     25379951 :         .reverse()
     161     25379951 :     }
     162              : }
     163              : 
     164              : impl<'a> IteratorWrapper<'a> {
     165          420 :     pub fn create_from_image_layer(
     166          420 :         image_layer: &'a ImageLayerInner,
     167          420 :         ctx: &'a RequestContext,
     168          420 :         max_read_size: u64,
     169          420 :         max_batch_size: usize,
     170          420 :     ) -> Self {
     171          420 :         Self::NotLoaded {
     172          420 :             layer: LayerRef::Image(image_layer),
     173          420 :             first_key_lower_bound: (image_layer.key_range().start, image_layer.lsn()),
     174          420 :             ctx,
     175          420 :             source_desc: PersistentLayerKey {
     176          420 :                 key_range: image_layer.key_range().clone(),
     177          420 :                 lsn_range: PersistentLayerDesc::image_layer_lsn_range(image_layer.lsn()),
     178          420 :                 is_delta: false,
     179          420 :             }
     180          420 :             .into(),
     181          420 :             max_read_size,
     182          420 :             max_batch_size,
     183          420 :         }
     184          420 :     }
     185              : 
     186         3120 :     pub fn create_from_delta_layer(
     187         3120 :         delta_layer: &'a DeltaLayerInner,
     188         3120 :         ctx: &'a RequestContext,
     189         3120 :         max_read_size: u64,
     190         3120 :         max_batch_size: usize,
     191         3120 :     ) -> Self {
     192         3120 :         Self::NotLoaded {
     193         3120 :             layer: LayerRef::Delta(delta_layer),
     194         3120 :             first_key_lower_bound: (delta_layer.key_range().start, delta_layer.lsn_range().start),
     195         3120 :             ctx,
     196         3120 :             source_desc: PersistentLayerKey {
     197         3120 :                 key_range: delta_layer.key_range().clone(),
     198         3120 :                 lsn_range: delta_layer.lsn_range().clone(),
     199         3120 :                 is_delta: true,
     200         3120 :             }
     201         3120 :             .into(),
     202         3120 :             max_read_size,
     203         3120 :             max_batch_size,
     204         3120 :         }
     205         3120 :     }
     206              : 
     207     50759902 :     fn peek_next_key_lsn_value(&self) -> Option<(&Key, Lsn, Option<&Value>)> {
     208     50759902 :         match self {
     209     50705218 :             Self::Loaded { iter, .. } => iter
     210     50705218 :                 .peek()
     211     50705218 :                 .as_ref()
     212     50705218 :                 .map(|(key, lsn, val)| (key, *lsn, Some(val))),
     213        54684 :             Self::NotLoaded {
     214        54684 :                 first_key_lower_bound: (key, lsn),
     215        54684 :                 ..
     216        54684 :             } => Some((key, *lsn, None)),
     217              :         }
     218     50759902 :     }
     219              : 
     220              :     // CORRECTNESS: this function must always take `&mut self`, never `&self`.
     221              :     //
     222              :     // The reason is that `impl Ord for Self` evaluates differently after this function
     223              :     // returns. We're called through a `PeekMut::deref_mut`, which causes heap repair when
     224              :     // the PeekMut gets returned. So, it's critical that we actually run through `PeekMut::deref_mut`
     225              :     // and not just `PeekMut::deref`
     226              :     // If we don't take `&mut self`
     227         3540 :     async fn load(&mut self) -> anyhow::Result<()> {
     228         3540 :         assert!(!self.is_loaded());
     229         3540 :         let Self::NotLoaded {
     230         3540 :             ctx,
     231         3540 :             first_key_lower_bound,
     232         3540 :             layer,
     233         3540 :             source_desc,
     234         3540 :             max_read_size,
     235         3540 :             max_batch_size,
     236         3540 :         } = self
     237              :         else {
     238            0 :             unreachable!()
     239              :         };
     240         3540 :         let iter = layer.iter_with_options(ctx, *max_read_size, *max_batch_size);
     241         3540 :         let iter = PeekableLayerIterRef::create(iter).await?;
     242         3540 :         if let Some((k1, l1, _)) = iter.peek() {
     243         3540 :             let (k2, l2) = first_key_lower_bound;
     244         3540 :             if (k1, l1) < (k2, l2) {
     245            0 :                 bail!(
     246            0 :                     "layer key range did not include the first key in the layer: {}",
     247            0 :                     layer.layer_dbg_info()
     248            0 :                 );
     249         3540 :             }
     250            0 :         }
     251         3540 :         *self = Self::Loaded {
     252         3540 :             iter,
     253         3540 :             source_desc: source_desc.clone(),
     254         3540 :         };
     255         3540 :         Ok(())
     256         3540 :     }
     257              : 
     258     12439224 :     fn is_loaded(&self) -> bool {
     259     12439224 :         matches!(self, Self::Loaded { .. })
     260     12439224 :     }
     261              : 
     262              :     /// Correctness: must load the iterator before using.
     263              :     ///
     264              :     /// Given this iterator wrapper is private to the merge iterator, users won't be able to mis-use it.
     265              :     /// The public interfaces to use are [`crate::tenant::storage_layer::delta_layer::DeltaLayerIterator`] and
     266              :     /// [`crate::tenant::storage_layer::image_layer::ImageLayerIterator`].
     267     12432144 :     async fn next(&mut self) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
     268     12432144 :         let Self::Loaded { iter, .. } = self else {
     269            0 :             panic!("must load the iterator before using")
     270              :         };
     271     12432144 :         iter.next().await
     272     12432144 :     }
     273              : 
     274              :     /// Get the persistent layer key corresponding to this iterator
     275         5640 :     fn trace_source(&self) -> Arc<PersistentLayerKey> {
     276         5640 :         match self {
     277         5640 :             Self::Loaded { source_desc, .. } => source_desc.clone(),
     278            0 :             Self::NotLoaded { source_desc, .. } => source_desc.clone(),
     279              :         }
     280         5640 :     }
     281              : }
     282              : 
/// A merge iterator over delta/image layer iterators.
///
/// When duplicated records are found, the iterator does not perform any
/// deduplication; the caller should handle these situations. Duplicated
/// records can arise in several ways:
///
/// * Two identical deltas at the same LSN.
/// * Two identical images at the same LSN.
/// * A delta and an image at the same LSN, where the image already has the delta applied.
///
/// For entries with the same key and LSN, the iterator always yields the image
/// before the delta.
pub struct MergeIterator<'a> {
    /// Per-layer iterators; the reversed `Ord` on [`IteratorWrapper`] puts the smallest
    /// pending `(key, lsn)` at the top of this max-heap.
    heap: BinaryHeap<IteratorWrapper<'a>>,
}

/// An item produced by [`MergeIterator::next_inner`]: a `(key, lsn, value)` entry,
/// optionally paired with information taken from the iterator that produced it.
pub(crate) trait MergeIteratorItem {
    /// Build an item from an entry and the iterator wrapper it was read from.
    fn new(item: (Key, Lsn, Value), iterator: &IteratorWrapper<'_>) -> Self;

    /// Borrow the underlying `(key, lsn, value)` entry.
    fn key_lsn_value(&self) -> &(Key, Lsn, Value);
}
     303              : 
     304              : impl MergeIteratorItem for (Key, Lsn, Value) {
     305     12423012 :     fn new(item: (Key, Lsn, Value), _: &IteratorWrapper<'_>) -> Self {
     306     12423012 :         item
     307     12423012 :     }
     308              : 
     309         4740 :     fn key_lsn_value(&self) -> &(Key, Lsn, Value) {
     310         4740 :         self
     311         4740 :     }
     312              : }
     313              : 
     314              : impl MergeIteratorItem for ((Key, Lsn, Value), Arc<PersistentLayerKey>) {
     315         5640 :     fn new(item: (Key, Lsn, Value), iter: &IteratorWrapper<'_>) -> Self {
     316         5640 :         (item, iter.trace_source().clone())
     317         5640 :     }
     318              : 
     319        11880 :     fn key_lsn_value(&self) -> &(Key, Lsn, Value) {
     320        11880 :         &self.0
     321        11880 :     }
     322              : }
     323              : 
     324              : impl<'a> MergeIterator<'a> {
     325          564 :     pub fn create_with_options(
     326          564 :         deltas: &[&'a DeltaLayerInner],
     327          564 :         images: &[&'a ImageLayerInner],
     328          564 :         ctx: &'a RequestContext,
     329          564 :         max_read_size: u64,
     330          564 :         max_batch_size: usize,
     331          564 :     ) -> Self {
     332          564 :         let mut heap = Vec::with_capacity(images.len() + deltas.len());
     333          984 :         for image in images {
     334          420 :             heap.push(IteratorWrapper::create_from_image_layer(
     335          420 :                 image,
     336          420 :                 ctx,
     337          420 :                 max_read_size,
     338          420 :                 max_batch_size,
     339          420 :             ));
     340          420 :         }
     341         3684 :         for delta in deltas {
     342         3120 :             heap.push(IteratorWrapper::create_from_delta_layer(
     343         3120 :                 delta,
     344         3120 :                 ctx,
     345         3120 :                 max_read_size,
     346         3120 :                 max_batch_size,
     347         3120 :             ));
     348         3120 :         }
     349          564 :         Self {
     350          564 :             heap: BinaryHeap::from(heap),
     351          564 :         }
     352          564 :     }
     353              : 
     354          240 :     pub fn create(
     355          240 :         deltas: &[&'a DeltaLayerInner],
     356          240 :         images: &[&'a ImageLayerInner],
     357          240 :         ctx: &'a RequestContext,
     358          240 :     ) -> Self {
     359          240 :         Self::create_with_options(deltas, images, ctx, 1024 * 8192, 1024)
     360          240 :     }
     361              : 
     362     12429192 :     pub(crate) async fn next_inner<R: MergeIteratorItem>(&mut self) -> anyhow::Result<Option<R>> {
     363     12436224 :         while let Some(mut iter) = self.heap.peek_mut() {
     364     12435684 :             if !iter.is_loaded() {
     365              :                 // Once we load the iterator, we can know the real first key-value pair in the iterator.
     366              :                 // We put it back into the heap so that a potentially unloaded layer may have a key between
     367              :                 // [potential_first_key, loaded_first_key).
     368         3540 :                 iter.load().await?;
     369         3540 :                 continue;
     370     12432144 :             }
     371     12432144 :             let Some(item) = iter.next().await? else {
     372              :                 // If the iterator returns None, we pop this iterator. Actually, in the current implementation,
     373              :                 // we order None > Some, and all the rest of the iterators should return None.
     374         3492 :                 binary_heap::PeekMut::pop(iter);
     375         3492 :                 continue;
     376              :             };
     377     12428652 :             return Ok(Some(R::new(item, &iter)));
     378              :         }
     379          540 :         Ok(None)
     380     12429192 :     }
     381              : 
     382              :     /// Get the next key-value pair from the iterator.
     383     12420876 :     pub async fn next(&mut self) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
     384     12420876 :         self.next_inner().await
     385     12420876 :     }
     386              : 
     387              :     /// Get the next key-value pair from the iterator, and trace where the key comes from.
     388            0 :     pub async fn next_with_trace(
     389            0 :         &mut self,
     390            0 :     ) -> anyhow::Result<Option<((Key, Lsn, Value), Arc<PersistentLayerKey>)>> {
     391            0 :         self.next_inner().await
     392            0 :     }
     393              : }
     394              : 
     395              : #[cfg(test)]
     396              : mod tests {
     397              :     use itertools::Itertools;
     398              :     use pageserver_api::key::Key;
     399              :     #[cfg(feature = "testing")]
     400              :     use pageserver_api::record::NeonWalRecord;
     401              :     use utils::lsn::Lsn;
     402              : 
     403              :     use super::*;
     404              :     use crate::DEFAULT_PG_VERSION;
     405              :     use crate::tenant::harness::{TIMELINE_ID, TenantHarness};
     406              :     #[cfg(feature = "testing")]
     407              :     use crate::tenant::storage_layer::delta_layer::test::sort_delta_value;
     408              :     use crate::tenant::storage_layer::delta_layer::test::{produce_delta_layer, sort_delta};
     409              : 
     410           48 :     async fn assert_merge_iter_equal(
     411           48 :         merge_iter: &mut MergeIterator<'_>,
     412           48 :         expect: &[(Key, Lsn, Value)],
     413           48 :     ) {
     414           48 :         let mut expect_iter = expect.iter();
     415              :         loop {
     416        36480 :             let o1 = merge_iter.next().await.unwrap();
     417        36480 :             let o2 = expect_iter.next();
     418        36480 :             assert_eq!(o1.is_some(), o2.is_some());
     419        36480 :             if o1.is_none() && o2.is_none() {
     420           48 :                 break;
     421        36432 :             }
     422        36432 :             let (k1, l1, v1) = o1.unwrap();
     423        36432 :             let (k2, l2, v2) = o2.unwrap();
     424        36432 :             assert_eq!(&k1, k2);
     425        36432 :             assert_eq!(l1, *l2);
     426        36432 :             assert_eq!(&v1, v2);
     427              :         }
     428           48 :     }
     429              : 
     430              :     #[tokio::test]
     431           12 :     async fn merge_in_between() {
     432           12 :         use bytes::Bytes;
     433           12 :         use pageserver_api::value::Value;
     434           12 : 
     435           12 :         let harness = TenantHarness::create("merge_iterator_merge_in_between")
     436           12 :             .await
     437           12 :             .unwrap();
     438           12 :         let (tenant, ctx) = harness.load().await;
     439           12 : 
     440           12 :         let tline = tenant
     441           12 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
     442           12 :             .await
     443           12 :             .unwrap();
     444           12 : 
     445           48 :         fn get_key(id: u32) -> Key {
     446           48 :             let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
     447           48 :             key.field6 = id;
     448           48 :             key
     449           48 :         }
     450           12 :         let test_deltas1 = vec![
     451           12 :             (
     452           12 :                 get_key(0),
     453           12 :                 Lsn(0x10),
     454           12 :                 Value::Image(Bytes::copy_from_slice(b"test")),
     455           12 :             ),
     456           12 :             (
     457           12 :                 get_key(5),
     458           12 :                 Lsn(0x10),
     459           12 :                 Value::Image(Bytes::copy_from_slice(b"test")),
     460           12 :             ),
     461           12 :         ];
     462           12 :         let resident_layer_1 = produce_delta_layer(&tenant, &tline, test_deltas1.clone(), &ctx)
     463           12 :             .await
     464           12 :             .unwrap();
     465           12 :         let test_deltas2 = vec![
     466           12 :             (
     467           12 :                 get_key(3),
     468           12 :                 Lsn(0x10),
     469           12 :                 Value::Image(Bytes::copy_from_slice(b"test")),
     470           12 :             ),
     471           12 :             (
     472           12 :                 get_key(4),
     473           12 :                 Lsn(0x10),
     474           12 :                 Value::Image(Bytes::copy_from_slice(b"test")),
     475           12 :             ),
     476           12 :         ];
     477           12 :         let resident_layer_2 = produce_delta_layer(&tenant, &tline, test_deltas2.clone(), &ctx)
     478           12 :             .await
     479           12 :             .unwrap();
     480           12 :         let mut merge_iter = MergeIterator::create(
     481           12 :             &[
     482           12 :                 resident_layer_2.get_as_delta(&ctx).await.unwrap(),
     483           12 :                 resident_layer_1.get_as_delta(&ctx).await.unwrap(),
     484           12 :             ],
     485           12 :             &[],
     486           12 :             &ctx,
     487           12 :         );
     488           12 :         let mut expect = Vec::new();
     489           12 :         expect.extend(test_deltas1);
     490           12 :         expect.extend(test_deltas2);
     491           12 :         expect.sort_by(sort_delta);
     492           12 :         assert_merge_iter_equal(&mut merge_iter, &expect).await;
     493           12 :     }
     494              : 
     495              :     #[tokio::test]
     496           12 :     async fn delta_merge() {
     497           12 :         use bytes::Bytes;
     498           12 :         use pageserver_api::value::Value;
     499           12 : 
     500           12 :         let harness = TenantHarness::create("merge_iterator_delta_merge")
     501           12 :             .await
     502           12 :             .unwrap();
     503           12 :         let (tenant, ctx) = harness.load().await;
     504           12 : 
     505           12 :         let tline = tenant
     506           12 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
     507           12 :             .await
     508           12 :             .unwrap();
     509           12 : 
     510        36000 :         fn get_key(id: u32) -> Key {
     511        36000 :             let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
     512        36000 :             key.field6 = id;
     513        36000 :             key
     514        36000 :         }
     515           12 :         const N: usize = 1000;
     516           12 :         let test_deltas1 = (0..N)
     517        12000 :             .map(|idx| {
     518        12000 :                 (
     519        12000 :                     get_key(idx as u32 / 10),
     520        12000 :                     Lsn(0x20 * ((idx as u64) % 10 + 1)),
     521        12000 :                     Value::Image(Bytes::from(format!("img{idx:05}"))),
     522        12000 :                 )
     523        12000 :             })
     524           12 :             .collect_vec();
     525           12 :         let resident_layer_1 = produce_delta_layer(&tenant, &tline, test_deltas1.clone(), &ctx)
     526           12 :             .await
     527           12 :             .unwrap();
     528           12 :         let test_deltas2 = (0..N)
     529        12000 :             .map(|idx| {
     530        12000 :                 (
     531        12000 :                     get_key(idx as u32 / 10),
     532        12000 :                     Lsn(0x20 * ((idx as u64) % 10 + 1) + 0x10),
     533        12000 :                     Value::Image(Bytes::from(format!("img{idx:05}"))),
     534        12000 :                 )
     535        12000 :             })
     536           12 :             .collect_vec();
     537           12 :         let resident_layer_2 = produce_delta_layer(&tenant, &tline, test_deltas2.clone(), &ctx)
     538           12 :             .await
     539           12 :             .unwrap();
     540           12 :         let test_deltas3 = (0..N)
     541        12000 :             .map(|idx| {
     542        12000 :                 (
     543        12000 :                     get_key(idx as u32 / 10 + N as u32),
     544        12000 :                     Lsn(0x10 * ((idx as u64) % 10 + 1)),
     545        12000 :                     Value::Image(Bytes::from(format!("img{idx:05}"))),
     546        12000 :                 )
     547        12000 :             })
     548           12 :             .collect_vec();
     549           12 :         let resident_layer_3 = produce_delta_layer(&tenant, &tline, test_deltas3.clone(), &ctx)
     550           12 :             .await
     551           12 :             .unwrap();
     552           12 :         let mut merge_iter = MergeIterator::create(
     553           12 :             &[
     554           12 :                 resident_layer_1.get_as_delta(&ctx).await.unwrap(),
     555           12 :                 resident_layer_2.get_as_delta(&ctx).await.unwrap(),
     556           12 :                 resident_layer_3.get_as_delta(&ctx).await.unwrap(),
     557           12 :             ],
     558           12 :             &[],
     559           12 :             &ctx,
     560           12 :         );
     561           12 :         let mut expect = Vec::new();
     562           12 :         expect.extend(test_deltas1);
     563           12 :         expect.extend(test_deltas2);
     564           12 :         expect.extend(test_deltas3);
     565           12 :         expect.sort_by(sort_delta);
     566           12 :         assert_merge_iter_equal(&mut merge_iter, &expect).await;
     567           12 : 
     568           12 :         // TODO: test layers are loaded only when needed, reducing num of active iterators in k-merge
     569           12 :     }
     570              : 
     571              :     #[cfg(feature = "testing")]
     572              :     #[tokio::test]
     573           12 :     async fn delta_image_mixed_merge() {
     574           12 :         use bytes::Bytes;
     575           12 :         use pageserver_api::value::Value;
     576           12 : 
     577           12 :         let harness = TenantHarness::create("merge_iterator_delta_image_mixed_merge")
     578           12 :             .await
     579           12 :             .unwrap();
     580           12 :         let (tenant, ctx) = harness.load().await;
     581           12 : 
     582           12 :         let tline = tenant
     583           12 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
     584           12 :             .await
     585           12 :             .unwrap();
     586           12 : 
     587          108 :         fn get_key(id: u32) -> Key {
     588          108 :             let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
     589          108 :             key.field6 = id;
     590          108 :             key
     591          108 :         }
     592           12 :         // In this test case, we want to test if the iterator still works correctly with multiple copies
     593           12 :         // of a delta+image at the same LSN, for example, the following sequence a@10=+a, a@10=+a, a@10=ab, a@10=ab.
     594           12 :         // Duplicated deltas/images are possible for old tenants before the full L0 compaction file name fix.
     595           12 :         // An incomplete compaction could produce multiple exactly-the-same delta layers. Force image generation
     596           12 :         // could produce overlapping images. Apart from duplicated deltas/images, in the current storage implementation
     597           12 :         // one key-lsn could have a delta in the delta layer and one image in the image layer. The iterator should
     598           12 :         // correctly process these situations and return everything as-is, and the upper layer of the system
     599           12 :         // will handle duplicated LSNs.
     600           12 :         let test_deltas1 = vec![
     601           12 :             (
     602           12 :                 get_key(0),
     603           12 :                 Lsn(0x10),
     604           12 :                 Value::WalRecord(NeonWalRecord::wal_init("")),
     605           12 :             ),
     606           12 :             (
     607           12 :                 get_key(0),
     608           12 :                 Lsn(0x18),
     609           12 :                 Value::WalRecord(NeonWalRecord::wal_append("a")),
     610           12 :             ),
     611           12 :             (
     612           12 :                 get_key(5),
     613           12 :                 Lsn(0x10),
     614           12 :                 Value::WalRecord(NeonWalRecord::wal_init("")),
     615           12 :             ),
     616           12 :             (
     617           12 :                 get_key(5),
     618           12 :                 Lsn(0x18),
     619           12 :                 Value::WalRecord(NeonWalRecord::wal_append("b")),
     620           12 :             ),
     621           12 :         ];
     622           12 :         let resident_layer_1 = produce_delta_layer(&tenant, &tline, test_deltas1.clone(), &ctx)
     623           12 :             .await
     624           12 :             .unwrap();
     625           12 :         let mut test_deltas2 = test_deltas1.clone();
     626           12 :         test_deltas2.push((
     627           12 :             get_key(10),
     628           12 :             Lsn(0x20),
     629           12 :             Value::Image(Bytes::copy_from_slice(b"test")),
     630           12 :         ));
     631           12 :         let resident_layer_2 = produce_delta_layer(&tenant, &tline, test_deltas2.clone(), &ctx)
     632           12 :             .await
     633           12 :             .unwrap();
     634           12 :         let test_deltas3 = vec![
     635           12 :             (
     636           12 :                 get_key(0),
     637           12 :                 Lsn(0x10),
     638           12 :                 Value::Image(Bytes::copy_from_slice(b"")),
     639           12 :             ),
     640           12 :             (
     641           12 :                 get_key(5),
     642           12 :                 Lsn(0x18),
     643           12 :                 Value::Image(Bytes::copy_from_slice(b"b")),
     644           12 :             ),
     645           12 :             (
     646           12 :                 get_key(15),
     647           12 :                 Lsn(0x20),
     648           12 :                 Value::Image(Bytes::copy_from_slice(b"test")),
     649           12 :             ),
     650           12 :         ];
     651           12 :         let resident_layer_3 = produce_delta_layer(&tenant, &tline, test_deltas3.clone(), &ctx)
     652           12 :             .await
     653           12 :             .unwrap();
     654           12 :         let mut test_deltas4 = test_deltas3.clone();
     655           12 :         test_deltas4.push((
     656           12 :             get_key(20),
     657           12 :             Lsn(0x20),
     658           12 :             Value::Image(Bytes::copy_from_slice(b"test")),
     659           12 :         ));
     660           12 :         let resident_layer_4 = produce_delta_layer(&tenant, &tline, test_deltas4.clone(), &ctx)
     661           12 :             .await
     662           12 :             .unwrap();
     663           12 :         let mut expect = Vec::new();
     664           12 :         expect.extend(test_deltas1);
     665           12 :         expect.extend(test_deltas2);
     666           12 :         expect.extend(test_deltas3);
     667           12 :         expect.extend(test_deltas4);
     668           12 :         expect.sort_by(sort_delta_value);
     669           12 : 
     670           12 :         // Test with different layer order for MergeIterator::create to ensure the order
     671           12 :         // is stable.
     672           12 : 
     673           12 :         let mut merge_iter = MergeIterator::create(
     674           12 :             &[
     675           12 :                 resident_layer_4.get_as_delta(&ctx).await.unwrap(),
     676           12 :                 resident_layer_1.get_as_delta(&ctx).await.unwrap(),
     677           12 :                 resident_layer_3.get_as_delta(&ctx).await.unwrap(),
     678           12 :                 resident_layer_2.get_as_delta(&ctx).await.unwrap(),
     679           12 :             ],
     680           12 :             &[],
     681           12 :             &ctx,
     682           12 :         );
     683           12 :         assert_merge_iter_equal(&mut merge_iter, &expect).await;
     684           12 : 
     685           12 :         let mut merge_iter = MergeIterator::create(
     686           12 :             &[
     687           12 :                 resident_layer_1.get_as_delta(&ctx).await.unwrap(),
     688           12 :                 resident_layer_4.get_as_delta(&ctx).await.unwrap(),
     689           12 :                 resident_layer_3.get_as_delta(&ctx).await.unwrap(),
     690           12 :                 resident_layer_2.get_as_delta(&ctx).await.unwrap(),
     691           12 :             ],
     692           12 :             &[],
     693           12 :             &ctx,
     694           12 :         );
     695           12 :         assert_merge_iter_equal(&mut merge_iter, &expect).await;
     696           12 : 
     697           12 :         is_send(merge_iter);
     698           12 :     }
     699              : 
     700              :     #[cfg(feature = "testing")]
     701           12 :     fn is_send(_: impl Send) {}
     702              : }
        

Generated by: LCOV version 2.1-beta