Line data Source code
1 : use std::cmp::Ordering;
2 : use std::collections::{BinaryHeap, binary_heap};
3 : use std::sync::Arc;
4 :
5 : use anyhow::bail;
6 : use pageserver_api::key::Key;
7 : use pageserver_api::value::Value;
8 : use utils::lsn::Lsn;
9 :
10 : use super::delta_layer::{DeltaLayerInner, DeltaLayerIterator};
11 : use super::image_layer::{ImageLayerInner, ImageLayerIterator};
12 : use super::{PersistentLayerDesc, PersistentLayerKey};
13 : use crate::context::RequestContext;
14 :
15 : #[derive(Clone, Copy)]
16 : pub(crate) enum LayerRef<'a> {
17 : Image(&'a ImageLayerInner),
18 : Delta(&'a DeltaLayerInner),
19 : }
20 :
21 : impl<'a> LayerRef<'a> {
22 3540 : fn iter(self, ctx: &'a RequestContext) -> LayerIterRef<'a> {
23 3540 : match self {
24 420 : Self::Image(x) => LayerIterRef::Image(x.iter(ctx)),
25 3120 : Self::Delta(x) => LayerIterRef::Delta(x.iter(ctx)),
26 : }
27 3540 : }
28 :
29 0 : fn layer_dbg_info(&self) -> String {
30 0 : match self {
31 0 : Self::Image(x) => x.layer_dbg_info(),
32 0 : Self::Delta(x) => x.layer_dbg_info(),
33 : }
34 0 : }
35 : }
36 :
37 : enum LayerIterRef<'a> {
38 : Image(ImageLayerIterator<'a>),
39 : Delta(DeltaLayerIterator<'a>),
40 : }
41 :
42 : impl LayerIterRef<'_> {
43 12435684 : async fn next(&mut self) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
44 12435684 : match self {
45 12430884 : Self::Delta(x) => x.next().await,
46 4800 : Self::Image(x) => x.next().await,
47 : }
48 12435684 : }
49 :
50 0 : fn layer_dbg_info(&self) -> String {
51 0 : match self {
52 0 : Self::Image(x) => x.layer_dbg_info(),
53 0 : Self::Delta(x) => x.layer_dbg_info(),
54 : }
55 0 : }
56 : }
57 :
58 : /// This type plays several roles at once:
59 : /// 1. Unified iterator for image and delta layers.
60 : /// 2. `Ord` for use in [`MergeIterator::heap`] (for the k-merge).
61 : /// 3. Lazy creation of the real delta/image iterator.
62 : #[allow(clippy::large_enum_variant, reason = "TODO")]
63 : pub(crate) enum IteratorWrapper<'a> {
64 : NotLoaded {
65 : ctx: &'a RequestContext,
66 : first_key_lower_bound: (Key, Lsn),
67 : layer: LayerRef<'a>,
68 : source_desc: Arc<PersistentLayerKey>,
69 : },
70 : Loaded {
71 : iter: PeekableLayerIterRef<'a>,
72 : source_desc: Arc<PersistentLayerKey>,
73 : },
74 : }
75 :
76 : pub(crate) struct PeekableLayerIterRef<'a> {
77 : iter: LayerIterRef<'a>,
78 : peeked: Option<(Key, Lsn, Value)>, // None == end
79 : }
80 :
81 : impl<'a> PeekableLayerIterRef<'a> {
82 3540 : async fn create(mut iter: LayerIterRef<'a>) -> anyhow::Result<Self> {
83 3540 : let peeked = iter.next().await?;
84 3540 : Ok(Self { iter, peeked })
85 3540 : }
86 :
87 50708942 : fn peek(&self) -> &Option<(Key, Lsn, Value)> {
88 50708942 : &self.peeked
89 50708942 : }
90 :
91 12432144 : async fn next(&mut self) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
92 12432144 : let result = self.peeked.take();
93 12432144 : self.peeked = self.iter.next().await?;
94 12432144 : if let (Some((k1, l1, _)), Some((k2, l2, _))) = (&self.peeked, &result) {
95 12425136 : if (k1, l1) < (k2, l2) {
96 0 : bail!("iterator is not ordered: {}", self.iter.layer_dbg_info());
97 12425136 : }
98 7008 : }
99 12432144 : Ok(result)
100 12432144 : }
101 : }
102 :
103 : impl std::cmp::PartialEq for IteratorWrapper<'_> {
104 0 : fn eq(&self, other: &Self) -> bool {
105 0 : self.cmp(other) == Ordering::Equal
106 0 : }
107 : }
108 :
109 : impl std::cmp::Eq for IteratorWrapper<'_> {}
110 :
111 : impl std::cmp::PartialOrd for IteratorWrapper<'_> {
112 25380043 : fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
113 25380043 : Some(self.cmp(other))
114 25380043 : }
115 : }
116 :
117 : impl std::cmp::Ord for IteratorWrapper<'_> {
118 25380043 : fn cmp(&self, other: &Self) -> std::cmp::Ordering {
119 : use std::cmp::Ordering;
120 25380043 : let a = self.peek_next_key_lsn_value();
121 25380043 : let b = other.peek_next_key_lsn_value();
122 25380043 : match (a, b) {
123 18131185 : (Some((k1, l1, v1)), Some((k2, l2, v2))) => {
124 36262370 : fn map_value_to_num(val: &Option<&Value>) -> usize {
125 36208694 : match val {
126 53676 : None => 0,
127 36205598 : Some(Value::Image(_)) => 1,
128 3096 : Some(Value::WalRecord(_)) => 2,
129 : }
130 36262370 : }
131 18131185 : let order_1 = map_value_to_num(&v1);
132 18131185 : let order_2 = map_value_to_num(&v2);
133 18131185 : // When the (key, lsn) pairs are equal, an unloaded iterator always sorts before a loaded one.
134 18131185 : // Note that we reverse the comparison at the end, so that it works with the max-heap.
135 18131185 : (k1, l1, order_1).cmp(&(k2, l2, order_2))
136 : }
137 6017729 : (Some(_), None) => Ordering::Less,
138 8415 : (None, Some(_)) => Ordering::Greater,
139 1222714 : (None, None) => Ordering::Equal,
140 : }
141 25380043 : .reverse()
142 25380043 : }
143 : }
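// The `.reverse()` at the end of `cmp` is what turns std's max-heap `BinaryHeap` into a
// min-heap over (key, lsn, value-kind): the entry with the smallest key/LSN is popped
// first and, on ties, a not-yet-loaded iterator comes out before an image, which comes
// out before a WAL record. A minimal, self-contained sketch of the same trick using
// `std::cmp::Reverse` instead of a hand-written `Ord` (illustrative only, not part of
// this module):
//
//     use std::cmp::Reverse;
//     use std::collections::BinaryHeap;
//
//     let mut heap = BinaryHeap::from(vec![Reverse(3), Reverse(1), Reverse(2)]);
//     assert_eq!(heap.pop(), Some(Reverse(1))); // smallest value comes out first
//     assert_eq!(heap.pop(), Some(Reverse(2)));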
144 :
145 : impl<'a> IteratorWrapper<'a> {
146 420 : pub fn create_from_image_layer(
147 420 : image_layer: &'a ImageLayerInner,
148 420 : ctx: &'a RequestContext,
149 420 : ) -> Self {
150 420 : Self::NotLoaded {
151 420 : layer: LayerRef::Image(image_layer),
152 420 : first_key_lower_bound: (image_layer.key_range().start, image_layer.lsn()),
153 420 : ctx,
154 420 : source_desc: PersistentLayerKey {
155 420 : key_range: image_layer.key_range().clone(),
156 420 : lsn_range: PersistentLayerDesc::image_layer_lsn_range(image_layer.lsn()),
157 420 : is_delta: false,
158 420 : }
159 420 : .into(),
160 420 : }
161 420 : }
162 :
163 3120 : pub fn create_from_delta_layer(
164 3120 : delta_layer: &'a DeltaLayerInner,
165 3120 : ctx: &'a RequestContext,
166 3120 : ) -> Self {
167 3120 : Self::NotLoaded {
168 3120 : layer: LayerRef::Delta(delta_layer),
169 3120 : first_key_lower_bound: (delta_layer.key_range().start, delta_layer.lsn_range().start),
170 3120 : ctx,
171 3120 : source_desc: PersistentLayerKey {
172 3120 : key_range: delta_layer.key_range().clone(),
173 3120 : lsn_range: delta_layer.lsn_range().clone(),
174 3120 : is_delta: true,
175 3120 : }
176 3120 : .into(),
177 3120 : }
178 3120 : }
179 :
180 50760086 : fn peek_next_key_lsn_value(&self) -> Option<(&Key, Lsn, Option<&Value>)> {
181 50760086 : match self {
182 50705402 : Self::Loaded { iter, .. } => iter
183 50705402 : .peek()
184 50705402 : .as_ref()
185 50705402 : .map(|(key, lsn, val)| (key, *lsn, Some(val))),
186 54684 : Self::NotLoaded {
187 54684 : first_key_lower_bound: (key, lsn),
188 54684 : ..
189 54684 : } => Some((key, *lsn, None)),
190 : }
191 50760086 : }
192 :
193 : // CORRECTNESS: this function must always take `&mut self`, never `&self`.
194 : //
195 : // The reason is that `impl Ord for Self` evaluates differently after this function
196 : // returns. We're called through `PeekMut::deref_mut`, which causes heap repair when
197 : // the `PeekMut` guard is dropped. So it's critical that we actually run through
198 : // `PeekMut::deref_mut` and not just `PeekMut::deref`: with `&self` we would only get
199 : // `PeekMut::deref`, the heap would never be repaired, and the merge could yield keys out of order.
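//
// A minimal sketch of the `peek_mut` behavior this relies on (illustrative only, not
// part of this module): the heap only re-establishes its ordering when the `PeekMut`
// guard is dropped, not at the moment the element is mutated.
//
//     use std::collections::BinaryHeap;
//
//     let mut heap = BinaryHeap::from(vec![3, 1, 2]);
//     if let Some(mut top) = heap.peek_mut() {
//         *top = 0; // mutate the current maximum through `DerefMut`...
//     } // ...the heap is repaired here, when the guard is dropped
//     assert_eq!(heap.peek(), Some(&2));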
200 3540 : async fn load(&mut self) -> anyhow::Result<()> {
201 3540 : assert!(!self.is_loaded());
202 3540 : let Self::NotLoaded {
203 3540 : ctx,
204 3540 : first_key_lower_bound,
205 3540 : layer,
206 3540 : source_desc,
207 3540 : } = self
208 : else {
209 0 : unreachable!()
210 : };
211 3540 : let iter = layer.iter(ctx);
212 3540 : let iter = PeekableLayerIterRef::create(iter).await?;
213 3540 : if let Some((k1, l1, _)) = iter.peek() {
214 3540 : let (k2, l2) = first_key_lower_bound;
215 3540 : if (k1, l1) < (k2, l2) {
216 0 : bail!(
217 0 : "layer key range did not include the first key in the layer: {}",
218 0 : layer.layer_dbg_info()
219 0 : );
220 3540 : }
221 0 : }
222 3540 : *self = Self::Loaded {
223 3540 : iter,
224 3540 : source_desc: source_desc.clone(),
225 3540 : };
226 3540 : Ok(())
227 3540 : }
228 :
229 12439224 : fn is_loaded(&self) -> bool {
230 12439224 : matches!(self, Self::Loaded { .. })
231 12439224 : }
232 :
233 : /// Correctness: must load the iterator before using.
234 : ///
235 : /// Given that this iterator wrapper is private to the merge iterator, users won't be able to misuse it.
236 : /// The public interfaces to use are [`crate::tenant::storage_layer::delta_layer::DeltaLayerIterator`] and
237 : /// [`crate::tenant::storage_layer::image_layer::ImageLayerIterator`].
238 12432144 : async fn next(&mut self) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
239 12432144 : let Self::Loaded { iter, .. } = self else {
240 0 : panic!("must load the iterator before using")
241 : };
242 12432144 : iter.next().await
243 12432144 : }
244 :
245 : /// Get the persistent layer key corresponding to this iterator
246 5640 : fn trace_source(&self) -> Arc<PersistentLayerKey> {
247 5640 : match self {
248 5640 : Self::Loaded { source_desc, .. } => source_desc.clone(),
249 0 : Self::NotLoaded { source_desc, .. } => source_desc.clone(),
250 : }
251 5640 : }
252 : }
253 :
254 : /// A merge iterator over delta/image layer iterators.
255 : ///
256 : /// When duplicated records are found, the iterator will not perform any
257 : /// deduplication, and the caller should handle these situations. Duplicated
258 : /// records can arise in several ways:
259 : ///
260 : /// * Two identical deltas at the same LSN.
261 : /// * Two identical images at the same LSN.
262 : /// * A delta and an image at the same LSN, where the image has already applied the delta.
263 : ///
264 : /// The iterator will always put the image before the delta.
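/// Typical usage is a k-merge scan over a set of resident layers. A sketch (assuming
/// `deltas: &[&DeltaLayerInner]`, `images: &[&ImageLayerInner]`, and a `ctx: RequestContext`
/// are already in scope; not a doctest):
///
/// ```ignore
/// let mut iter = MergeIterator::create(deltas, images, &ctx);
/// while let Some((key, lsn, value)) = iter.next().await? {
///     // entries arrive ordered by (key, lsn); duplicates are not collapsed here
/// }
/// ```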
265 : pub struct MergeIterator<'a> {
266 : heap: BinaryHeap<IteratorWrapper<'a>>,
267 : }
268 :
269 : pub(crate) trait MergeIteratorItem {
270 : fn new(item: (Key, Lsn, Value), iterator: &IteratorWrapper<'_>) -> Self;
271 :
272 : fn key_lsn_value(&self) -> &(Key, Lsn, Value);
273 : }
274 :
275 : impl MergeIteratorItem for (Key, Lsn, Value) {
276 12423012 : fn new(item: (Key, Lsn, Value), _: &IteratorWrapper<'_>) -> Self {
277 12423012 : item
278 12423012 : }
279 :
280 4740 : fn key_lsn_value(&self) -> &(Key, Lsn, Value) {
281 4740 : self
282 4740 : }
283 : }
284 :
285 : impl MergeIteratorItem for ((Key, Lsn, Value), Arc<PersistentLayerKey>) {
286 5640 : fn new(item: (Key, Lsn, Value), iter: &IteratorWrapper<'_>) -> Self {
287 5640 : (item, iter.trace_source().clone())
288 5640 : }
289 :
290 11880 : fn key_lsn_value(&self) -> &(Key, Lsn, Value) {
291 11880 : &self.0
292 11880 : }
293 : }
294 :
295 : impl<'a> MergeIterator<'a> {
296 564 : pub fn create(
297 564 : deltas: &[&'a DeltaLayerInner],
298 564 : images: &[&'a ImageLayerInner],
299 564 : ctx: &'a RequestContext,
300 564 : ) -> Self {
301 564 : let mut heap = Vec::with_capacity(images.len() + deltas.len());
302 984 : for image in images {
303 420 : heap.push(IteratorWrapper::create_from_image_layer(image, ctx));
304 420 : }
305 3684 : for delta in deltas {
306 3120 : heap.push(IteratorWrapper::create_from_delta_layer(delta, ctx));
307 3120 : }
308 564 : Self {
309 564 : heap: BinaryHeap::from(heap),
310 564 : }
311 564 : }
312 :
313 12429192 : pub(crate) async fn next_inner<R: MergeIteratorItem>(&mut self) -> anyhow::Result<Option<R>> {
314 12436224 : while let Some(mut iter) = self.heap.peek_mut() {
315 12435684 : if !iter.is_loaded() {
316 : // Once we load the iterator, we know the real first key-value pair of the iterator.
317 : // We put it back into the heap because another, potentially unloaded, layer may have a key
318 : // in the range [potential_first_key, loaded_first_key).
319 3540 : iter.load().await?;
320 3540 : continue;
321 12432144 : }
322 12432144 : let Some(item) = iter.next().await? else {
323 : // If the iterator returns None, it is exhausted and we pop it. Exhausted iterators sort
324 : // last in the heap, so if the top of the heap is exhausted, all remaining iterators are too.
325 3492 : binary_heap::PeekMut::pop(iter);
326 3492 : continue;
327 : };
328 12428652 : return Ok(Some(R::new(item, &iter)));
329 : }
330 540 : Ok(None)
331 12429192 : }
332 :
333 : /// Get the next key-value pair from the iterator.
334 12420876 : pub async fn next(&mut self) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
335 12420876 : self.next_inner().await
336 12420876 : }
337 :
338 : /// Get the next key-value pair from the iterator, and trace where the key comes from.
339 0 : pub async fn next_with_trace(
340 0 : &mut self,
341 0 : ) -> anyhow::Result<Option<((Key, Lsn, Value), Arc<PersistentLayerKey>)>> {
342 0 : self.next_inner().await
343 0 : }
344 : }
345 :
346 : #[cfg(test)]
347 : mod tests {
348 : use itertools::Itertools;
349 : use pageserver_api::key::Key;
350 : #[cfg(feature = "testing")]
351 : use pageserver_api::record::NeonWalRecord;
352 : use utils::lsn::Lsn;
353 :
354 : use super::*;
355 : use crate::DEFAULT_PG_VERSION;
356 : use crate::tenant::harness::{TIMELINE_ID, TenantHarness};
357 : #[cfg(feature = "testing")]
358 : use crate::tenant::storage_layer::delta_layer::test::sort_delta_value;
359 : use crate::tenant::storage_layer::delta_layer::test::{produce_delta_layer, sort_delta};
360 :
361 48 : async fn assert_merge_iter_equal(
362 48 : merge_iter: &mut MergeIterator<'_>,
363 48 : expect: &[(Key, Lsn, Value)],
364 48 : ) {
365 48 : let mut expect_iter = expect.iter();
366 : loop {
367 36480 : let o1 = merge_iter.next().await.unwrap();
368 36480 : let o2 = expect_iter.next();
369 36480 : assert_eq!(o1.is_some(), o2.is_some());
370 36480 : if o1.is_none() && o2.is_none() {
371 48 : break;
372 36432 : }
373 36432 : let (k1, l1, v1) = o1.unwrap();
374 36432 : let (k2, l2, v2) = o2.unwrap();
375 36432 : assert_eq!(&k1, k2);
376 36432 : assert_eq!(l1, *l2);
377 36432 : assert_eq!(&v1, v2);
378 : }
379 48 : }
380 :
381 : #[tokio::test]
382 12 : async fn merge_in_between() {
383 12 : use bytes::Bytes;
384 12 : use pageserver_api::value::Value;
385 12 :
386 12 : let harness = TenantHarness::create("merge_iterator_merge_in_between")
387 12 : .await
388 12 : .unwrap();
389 12 : let (tenant, ctx) = harness.load().await;
390 12 :
391 12 : let tline = tenant
392 12 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
393 12 : .await
394 12 : .unwrap();
395 12 :
396 48 : fn get_key(id: u32) -> Key {
397 48 : let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
398 48 : key.field6 = id;
399 48 : key
400 48 : }
401 12 : let test_deltas1 = vec![
402 12 : (
403 12 : get_key(0),
404 12 : Lsn(0x10),
405 12 : Value::Image(Bytes::copy_from_slice(b"test")),
406 12 : ),
407 12 : (
408 12 : get_key(5),
409 12 : Lsn(0x10),
410 12 : Value::Image(Bytes::copy_from_slice(b"test")),
411 12 : ),
412 12 : ];
413 12 : let resident_layer_1 = produce_delta_layer(&tenant, &tline, test_deltas1.clone(), &ctx)
414 12 : .await
415 12 : .unwrap();
416 12 : let test_deltas2 = vec![
417 12 : (
418 12 : get_key(3),
419 12 : Lsn(0x10),
420 12 : Value::Image(Bytes::copy_from_slice(b"test")),
421 12 : ),
422 12 : (
423 12 : get_key(4),
424 12 : Lsn(0x10),
425 12 : Value::Image(Bytes::copy_from_slice(b"test")),
426 12 : ),
427 12 : ];
428 12 : let resident_layer_2 = produce_delta_layer(&tenant, &tline, test_deltas2.clone(), &ctx)
429 12 : .await
430 12 : .unwrap();
431 12 : let mut merge_iter = MergeIterator::create(
432 12 : &[
433 12 : resident_layer_2.get_as_delta(&ctx).await.unwrap(),
434 12 : resident_layer_1.get_as_delta(&ctx).await.unwrap(),
435 12 : ],
436 12 : &[],
437 12 : &ctx,
438 12 : );
439 12 : let mut expect = Vec::new();
440 12 : expect.extend(test_deltas1);
441 12 : expect.extend(test_deltas2);
442 12 : expect.sort_by(sort_delta);
443 12 : assert_merge_iter_equal(&mut merge_iter, &expect).await;
444 12 : }
445 :
446 : #[tokio::test]
447 12 : async fn delta_merge() {
448 12 : use bytes::Bytes;
449 12 : use pageserver_api::value::Value;
450 12 :
451 12 : let harness = TenantHarness::create("merge_iterator_delta_merge")
452 12 : .await
453 12 : .unwrap();
454 12 : let (tenant, ctx) = harness.load().await;
455 12 :
456 12 : let tline = tenant
457 12 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
458 12 : .await
459 12 : .unwrap();
460 12 :
461 36000 : fn get_key(id: u32) -> Key {
462 36000 : let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
463 36000 : key.field6 = id;
464 36000 : key
465 36000 : }
466 12 : const N: usize = 1000;
467 12 : let test_deltas1 = (0..N)
468 12000 : .map(|idx| {
469 12000 : (
470 12000 : get_key(idx as u32 / 10),
471 12000 : Lsn(0x20 * ((idx as u64) % 10 + 1)),
472 12000 : Value::Image(Bytes::from(format!("img{idx:05}"))),
473 12000 : )
474 12000 : })
475 12 : .collect_vec();
476 12 : let resident_layer_1 = produce_delta_layer(&tenant, &tline, test_deltas1.clone(), &ctx)
477 12 : .await
478 12 : .unwrap();
479 12 : let test_deltas2 = (0..N)
480 12000 : .map(|idx| {
481 12000 : (
482 12000 : get_key(idx as u32 / 10),
483 12000 : Lsn(0x20 * ((idx as u64) % 10 + 1) + 0x10),
484 12000 : Value::Image(Bytes::from(format!("img{idx:05}"))),
485 12000 : )
486 12000 : })
487 12 : .collect_vec();
488 12 : let resident_layer_2 = produce_delta_layer(&tenant, &tline, test_deltas2.clone(), &ctx)
489 12 : .await
490 12 : .unwrap();
491 12 : let test_deltas3 = (0..N)
492 12000 : .map(|idx| {
493 12000 : (
494 12000 : get_key(idx as u32 / 10 + N as u32),
495 12000 : Lsn(0x10 * ((idx as u64) % 10 + 1)),
496 12000 : Value::Image(Bytes::from(format!("img{idx:05}"))),
497 12000 : )
498 12000 : })
499 12 : .collect_vec();
500 12 : let resident_layer_3 = produce_delta_layer(&tenant, &tline, test_deltas3.clone(), &ctx)
501 12 : .await
502 12 : .unwrap();
503 12 : let mut merge_iter = MergeIterator::create(
504 12 : &[
505 12 : resident_layer_1.get_as_delta(&ctx).await.unwrap(),
506 12 : resident_layer_2.get_as_delta(&ctx).await.unwrap(),
507 12 : resident_layer_3.get_as_delta(&ctx).await.unwrap(),
508 12 : ],
509 12 : &[],
510 12 : &ctx,
511 12 : );
512 12 : let mut expect = Vec::new();
513 12 : expect.extend(test_deltas1);
514 12 : expect.extend(test_deltas2);
515 12 : expect.extend(test_deltas3);
516 12 : expect.sort_by(sort_delta);
517 12 : assert_merge_iter_equal(&mut merge_iter, &expect).await;
518 12 :
519 12 : // TODO: test layers are loaded only when needed, reducing num of active iterators in k-merge
520 12 : }
521 :
522 : #[cfg(feature = "testing")]
523 : #[tokio::test]
524 12 : async fn delta_image_mixed_merge() {
525 12 : use bytes::Bytes;
526 12 : use pageserver_api::value::Value;
527 12 :
528 12 : let harness = TenantHarness::create("merge_iterator_delta_image_mixed_merge")
529 12 : .await
530 12 : .unwrap();
531 12 : let (tenant, ctx) = harness.load().await;
532 12 :
533 12 : let tline = tenant
534 12 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
535 12 : .await
536 12 : .unwrap();
537 12 :
538 108 : fn get_key(id: u32) -> Key {
539 108 : let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
540 108 : key.field6 = id;
541 108 : key
542 108 : }
543 12 : // In this test case, we want to test if the iterator still works correctly with multiple copies
544 12 : // of a delta+image at the same LSN, for example, the following sequence a@10=+a, a@10=+a, a@10=ab, a@10=ab.
545 12 : // Duplicated deltas/images are possible for old tenants before the full L0 compaction file name fix.
546 12 : // An incomplete compaction could produce multiple exactly-the-same delta layers. Force image generation
547 12 : // could produce overlapping images. Apart from duplicated deltas/images, in the current storage implementation
548 12 : // one key-lsn could have a delta in the delta layer and one image in the image layer. The iterator should
549 12 : // correctly process these situations and return everything as-is, and the upper layer of the system
550 12 : // will handle duplicated LSNs.
551 12 : let test_deltas1 = vec![
552 12 : (
553 12 : get_key(0),
554 12 : Lsn(0x10),
555 12 : Value::WalRecord(NeonWalRecord::wal_init("")),
556 12 : ),
557 12 : (
558 12 : get_key(0),
559 12 : Lsn(0x18),
560 12 : Value::WalRecord(NeonWalRecord::wal_append("a")),
561 12 : ),
562 12 : (
563 12 : get_key(5),
564 12 : Lsn(0x10),
565 12 : Value::WalRecord(NeonWalRecord::wal_init("")),
566 12 : ),
567 12 : (
568 12 : get_key(5),
569 12 : Lsn(0x18),
570 12 : Value::WalRecord(NeonWalRecord::wal_append("b")),
571 12 : ),
572 12 : ];
573 12 : let resident_layer_1 = produce_delta_layer(&tenant, &tline, test_deltas1.clone(), &ctx)
574 12 : .await
575 12 : .unwrap();
576 12 : let mut test_deltas2 = test_deltas1.clone();
577 12 : test_deltas2.push((
578 12 : get_key(10),
579 12 : Lsn(0x20),
580 12 : Value::Image(Bytes::copy_from_slice(b"test")),
581 12 : ));
582 12 : let resident_layer_2 = produce_delta_layer(&tenant, &tline, test_deltas2.clone(), &ctx)
583 12 : .await
584 12 : .unwrap();
585 12 : let test_deltas3 = vec![
586 12 : (
587 12 : get_key(0),
588 12 : Lsn(0x10),
589 12 : Value::Image(Bytes::copy_from_slice(b"")),
590 12 : ),
591 12 : (
592 12 : get_key(5),
593 12 : Lsn(0x18),
594 12 : Value::Image(Bytes::copy_from_slice(b"b")),
595 12 : ),
596 12 : (
597 12 : get_key(15),
598 12 : Lsn(0x20),
599 12 : Value::Image(Bytes::copy_from_slice(b"test")),
600 12 : ),
601 12 : ];
602 12 : let resident_layer_3 = produce_delta_layer(&tenant, &tline, test_deltas3.clone(), &ctx)
603 12 : .await
604 12 : .unwrap();
605 12 : let mut test_deltas4 = test_deltas3.clone();
606 12 : test_deltas4.push((
607 12 : get_key(20),
608 12 : Lsn(0x20),
609 12 : Value::Image(Bytes::copy_from_slice(b"test")),
610 12 : ));
611 12 : let resident_layer_4 = produce_delta_layer(&tenant, &tline, test_deltas4.clone(), &ctx)
612 12 : .await
613 12 : .unwrap();
614 12 : let mut expect = Vec::new();
615 12 : expect.extend(test_deltas1);
616 12 : expect.extend(test_deltas2);
617 12 : expect.extend(test_deltas3);
618 12 : expect.extend(test_deltas4);
619 12 : expect.sort_by(sort_delta_value);
620 12 :
621 12 : // Test with different layer order for MergeIterator::create to ensure the order
622 12 : // is stable.
623 12 :
624 12 : let mut merge_iter = MergeIterator::create(
625 12 : &[
626 12 : resident_layer_4.get_as_delta(&ctx).await.unwrap(),
627 12 : resident_layer_1.get_as_delta(&ctx).await.unwrap(),
628 12 : resident_layer_3.get_as_delta(&ctx).await.unwrap(),
629 12 : resident_layer_2.get_as_delta(&ctx).await.unwrap(),
630 12 : ],
631 12 : &[],
632 12 : &ctx,
633 12 : );
634 12 : assert_merge_iter_equal(&mut merge_iter, &expect).await;
635 12 :
636 12 : let mut merge_iter = MergeIterator::create(
637 12 : &[
638 12 : resident_layer_1.get_as_delta(&ctx).await.unwrap(),
639 12 : resident_layer_4.get_as_delta(&ctx).await.unwrap(),
640 12 : resident_layer_3.get_as_delta(&ctx).await.unwrap(),
641 12 : resident_layer_2.get_as_delta(&ctx).await.unwrap(),
642 12 : ],
643 12 : &[],
644 12 : &ctx,
645 12 : );
646 12 : assert_merge_iter_equal(&mut merge_iter, &expect).await;
647 12 :
648 12 : is_send(merge_iter);
649 12 : }
650 :
651 : #[cfg(feature = "testing")]
652 12 : fn is_send(_: impl Send) {}
653 : }