Line data Source code
1 : use std::{
2 : cmp::Ordering,
3 : collections::{binary_heap, BinaryHeap},
4 : sync::Arc,
5 : };
6 :
7 : use anyhow::bail;
8 : use pageserver_api::key::Key;
9 : use utils::lsn::Lsn;
10 :
11 : use crate::context::RequestContext;
12 : use pageserver_api::value::Value;
13 :
14 : use super::{
15 : delta_layer::{DeltaLayerInner, DeltaLayerIterator},
16 : image_layer::{ImageLayerInner, ImageLayerIterator},
17 : PersistentLayerDesc, PersistentLayerKey,
18 : };
19 :
20 : #[derive(Clone, Copy)]
21 : pub(crate) enum LayerRef<'a> {
22 : Image(&'a ImageLayerInner),
23 : Delta(&'a DeltaLayerInner),
24 : }
25 :
26 : impl<'a> LayerRef<'a> {
27 1168 : fn iter(self, ctx: &'a RequestContext) -> LayerIterRef<'a> {
28 1168 : match self {
29 132 : Self::Image(x) => LayerIterRef::Image(x.iter(ctx)),
30 1036 : Self::Delta(x) => LayerIterRef::Delta(x.iter(ctx)),
31 : }
32 1168 : }
33 :
34 0 : fn layer_dbg_info(&self) -> String {
35 0 : match self {
36 0 : Self::Image(x) => x.layer_dbg_info(),
37 0 : Self::Delta(x) => x.layer_dbg_info(),
38 : }
39 0 : }
40 : }
41 :
42 : enum LayerIterRef<'a> {
43 : Image(ImageLayerIterator<'a>),
44 : Delta(DeltaLayerIterator<'a>),
45 : }
46 :
47 : impl LayerIterRef<'_> {
48 4145164 : async fn next(&mut self) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
49 4145164 : match self {
50 4143612 : Self::Delta(x) => x.next().await,
51 1552 : Self::Image(x) => x.next().await,
52 : }
53 4145164 : }
54 :
55 0 : fn layer_dbg_info(&self) -> String {
56 0 : match self {
57 0 : Self::Image(x) => x.layer_dbg_info(),
58 0 : Self::Delta(x) => x.layer_dbg_info(),
59 : }
60 0 : }
61 : }
62 :
63 : /// This type plays several roles at once
64 : /// 1. Unified iterator for image and delta layers.
65 : /// 2. `Ord` for use in [`MergeIterator::heap`] (for the k-merge).
66 : /// 3. Lazy creation of the real delta/image iterator.
67 : pub(crate) enum IteratorWrapper<'a> {
68 : NotLoaded {
69 : ctx: &'a RequestContext,
70 : first_key_lower_bound: (Key, Lsn),
71 : layer: LayerRef<'a>,
72 : source_desc: Arc<PersistentLayerKey>,
73 : },
74 : Loaded {
75 : iter: PeekableLayerIterRef<'a>,
76 : source_desc: Arc<PersistentLayerKey>,
77 : },
78 : }
79 :
80 : pub(crate) struct PeekableLayerIterRef<'a> {
81 : iter: LayerIterRef<'a>,
82 : peeked: Option<(Key, Lsn, Value)>, // None == end
83 : }
84 :
85 : impl<'a> PeekableLayerIterRef<'a> {
86 1168 : async fn create(mut iter: LayerIterRef<'a>) -> anyhow::Result<Self> {
87 1168 : let peeked = iter.next().await?;
88 1168 : Ok(Self { iter, peeked })
89 1168 : }
90 :
91 16903246 : fn peek(&self) -> &Option<(Key, Lsn, Value)> {
92 16903246 : &self.peeked
93 16903246 : }
94 :
95 4143996 : async fn next(&mut self) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
96 4143996 : let result = self.peeked.take();
97 4143996 : self.peeked = self.iter.next().await?;
98 4143996 : if let (Some((k1, l1, _)), Some((k2, l2, _))) = (&self.peeked, &result) {
99 4141668 : if (k1, l1) < (k2, l2) {
100 0 : bail!("iterator is not ordered: {}", self.iter.layer_dbg_info());
101 4141668 : }
102 2328 : }
103 4143996 : Ok(result)
104 4143996 : }
105 : }
106 :
107 : impl std::cmp::PartialEq for IteratorWrapper<'_> {
108 0 : fn eq(&self, other: &Self) -> bool {
109 0 : self.cmp(other) == Ordering::Equal
110 0 : }
111 : }
112 :
113 : impl std::cmp::Eq for IteratorWrapper<'_> {}
114 :
115 : impl std::cmp::PartialOrd for IteratorWrapper<'_> {
116 8460085 : fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
117 8460085 : Some(self.cmp(other))
118 8460085 : }
119 : }
120 :
121 : impl std::cmp::Ord for IteratorWrapper<'_> {
122 8460085 : fn cmp(&self, other: &Self) -> std::cmp::Ordering {
123 : use std::cmp::Ordering;
124 8460085 : let a = self.peek_next_key_lsn_value();
125 8460085 : let b = other.peek_next_key_lsn_value();
126 8460085 : match (a, b) {
127 6043839 : (Some((k1, l1, v1)), Some((k2, l2, v2))) => {
128 12087678 : fn map_value_to_num(val: &Option<&Value>) -> usize {
129 12069906 : match val {
130 17772 : None => 0,
131 12068886 : Some(Value::Image(_)) => 1,
132 1020 : Some(Value::WalRecord(_)) => 2,
133 : }
134 12087678 : }
135 6043839 : let order_1 = map_value_to_num(&v1);
136 6043839 : let order_2 = map_value_to_num(&v2);
137 6043839 : // When key_lsn are the same, the unloaded iter will always appear before the loaded one.
138 6043839 : // And note that we do a reverse at the end of the comparison, so it works with the max heap.
139 6043839 : (k1, l1, order_1).cmp(&(k2, l2, order_2))
140 : }
141 2005898 : (Some(_), None) => Ordering::Less,
142 2780 : (None, Some(_)) => Ordering::Greater,
143 407568 : (None, None) => Ordering::Equal,
144 : }
145 8460085 : .reverse()
146 8460085 : }
147 : }
148 :
149 : impl<'a> IteratorWrapper<'a> {
150 132 : pub fn create_from_image_layer(
151 132 : image_layer: &'a ImageLayerInner,
152 132 : ctx: &'a RequestContext,
153 132 : ) -> Self {
154 132 : Self::NotLoaded {
155 132 : layer: LayerRef::Image(image_layer),
156 132 : first_key_lower_bound: (image_layer.key_range().start, image_layer.lsn()),
157 132 : ctx,
158 132 : source_desc: PersistentLayerKey {
159 132 : key_range: image_layer.key_range().clone(),
160 132 : lsn_range: PersistentLayerDesc::image_layer_lsn_range(image_layer.lsn()),
161 132 : is_delta: false,
162 132 : }
163 132 : .into(),
164 132 : }
165 132 : }
166 :
167 1036 : pub fn create_from_delta_layer(
168 1036 : delta_layer: &'a DeltaLayerInner,
169 1036 : ctx: &'a RequestContext,
170 1036 : ) -> Self {
171 1036 : Self::NotLoaded {
172 1036 : layer: LayerRef::Delta(delta_layer),
173 1036 : first_key_lower_bound: (delta_layer.key_range().start, delta_layer.lsn_range().start),
174 1036 : ctx,
175 1036 : source_desc: PersistentLayerKey {
176 1036 : key_range: delta_layer.key_range().clone(),
177 1036 : lsn_range: delta_layer.lsn_range().clone(),
178 1036 : is_delta: true,
179 1036 : }
180 1036 : .into(),
181 1036 : }
182 1036 : }
183 :
184 16920170 : fn peek_next_key_lsn_value(&self) -> Option<(&Key, Lsn, Option<&Value>)> {
185 16920170 : match self {
186 16902078 : Self::Loaded { iter, .. } => iter
187 16902078 : .peek()
188 16902078 : .as_ref()
189 16902078 : .map(|(key, lsn, val)| (key, *lsn, Some(val))),
190 18092 : Self::NotLoaded {
191 18092 : first_key_lower_bound: (key, lsn),
192 18092 : ..
193 18092 : } => Some((key, *lsn, None)),
194 : }
195 16920170 : }
196 :
197 : // CORRECTNESS: this function must always take `&mut self`, never `&self`.
198 : //
199 : // The reason is that `impl Ord for Self` evaluates differently after this function
200 : // returns. We're called through a `PeekMut::deref_mut`, which causes heap repair when
201 : // the PeekMut gets returned. So, it's critical that we actually run through `PeekMut::deref_mut`
202 : // and not just `PeekMut::deref`
203 : // If we don't take `&mut self`
204 1168 : async fn load(&mut self) -> anyhow::Result<()> {
205 1168 : assert!(!self.is_loaded());
206 1168 : let Self::NotLoaded {
207 1168 : ctx,
208 1168 : first_key_lower_bound,
209 1168 : layer,
210 1168 : source_desc,
211 1168 : } = self
212 : else {
213 0 : unreachable!()
214 : };
215 1168 : let iter = layer.iter(ctx);
216 1168 : let iter = PeekableLayerIterRef::create(iter).await?;
217 1168 : if let Some((k1, l1, _)) = iter.peek() {
218 1168 : let (k2, l2) = first_key_lower_bound;
219 1168 : if (k1, l1) < (k2, l2) {
220 0 : bail!(
221 0 : "layer key range did not include the first key in the layer: {}",
222 0 : layer.layer_dbg_info()
223 0 : );
224 1168 : }
225 0 : }
226 1168 : *self = Self::Loaded {
227 1168 : iter,
228 1168 : source_desc: source_desc.clone(),
229 1168 : };
230 1168 : Ok(())
231 1168 : }
232 :
233 4146332 : fn is_loaded(&self) -> bool {
234 4146332 : matches!(self, Self::Loaded { .. })
235 4146332 : }
236 :
237 : /// Correctness: must load the iterator before using.
238 : ///
239 : /// Given this iterator wrapper is private to the merge iterator, users won't be able to mis-use it.
240 : /// The public interfaces to use are [`crate::tenant::storage_layer::delta_layer::DeltaLayerIterator`] and
241 : /// [`crate::tenant::storage_layer::image_layer::ImageLayerIterator`].
242 4143996 : async fn next(&mut self) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
243 4143996 : let Self::Loaded { iter, .. } = self else {
244 0 : panic!("must load the iterator before using")
245 : };
246 4143996 : iter.next().await
247 4143996 : }
248 :
249 : /// Get the persistent layer key corresponding to this iterator
250 1828 : fn trace_source(&self) -> Arc<PersistentLayerKey> {
251 1828 : match self {
252 1828 : Self::Loaded { source_desc, .. } => source_desc.clone(),
253 0 : Self::NotLoaded { source_desc, .. } => source_desc.clone(),
254 : }
255 1828 : }
256 : }
257 :
258 : /// A merge iterator over delta/image layer iterators.
259 : ///
260 : /// When duplicated records are found, the iterator will not perform any
261 : /// deduplication, and the caller should handle these situation. By saying
262 : /// duplicated records, there are many possibilities:
263 : ///
264 : /// * Two same delta at the same LSN.
265 : /// * Two same image at the same LSN.
266 : /// * Delta/image at the same LSN where the image has already applied the delta.
267 : ///
268 : /// The iterator will always put the image before the delta.
269 : pub struct MergeIterator<'a> {
270 : heap: BinaryHeap<IteratorWrapper<'a>>,
271 : }
272 :
273 : pub(crate) trait MergeIteratorItem {
274 : fn new(item: (Key, Lsn, Value), iterator: &IteratorWrapper<'_>) -> Self;
275 :
276 : fn key_lsn_value(&self) -> &(Key, Lsn, Value);
277 : }
278 :
279 : impl MergeIteratorItem for (Key, Lsn, Value) {
280 4141004 : fn new(item: (Key, Lsn, Value), _: &IteratorWrapper<'_>) -> Self {
281 4141004 : item
282 4141004 : }
283 :
284 1580 : fn key_lsn_value(&self) -> &(Key, Lsn, Value) {
285 1580 : self
286 1580 : }
287 : }
288 :
289 : impl MergeIteratorItem for ((Key, Lsn, Value), Arc<PersistentLayerKey>) {
290 1828 : fn new(item: (Key, Lsn, Value), iter: &IteratorWrapper<'_>) -> Self {
291 1828 : (item, iter.trace_source().clone())
292 1828 : }
293 :
294 3848 : fn key_lsn_value(&self) -> &(Key, Lsn, Value) {
295 3848 : &self.0
296 3848 : }
297 : }
298 :
299 : impl<'a> MergeIterator<'a> {
300 184 : pub fn create(
301 184 : deltas: &[&'a DeltaLayerInner],
302 184 : images: &[&'a ImageLayerInner],
303 184 : ctx: &'a RequestContext,
304 184 : ) -> Self {
305 184 : let mut heap = Vec::with_capacity(images.len() + deltas.len());
306 316 : for image in images {
307 132 : heap.push(IteratorWrapper::create_from_image_layer(image, ctx));
308 132 : }
309 1220 : for delta in deltas {
310 1036 : heap.push(IteratorWrapper::create_from_delta_layer(delta, ctx));
311 1036 : }
312 184 : Self {
313 184 : heap: BinaryHeap::from(heap),
314 184 : }
315 184 : }
316 :
317 4143012 : pub(crate) async fn next_inner<R: MergeIteratorItem>(&mut self) -> anyhow::Result<Option<R>> {
318 4145344 : while let Some(mut iter) = self.heap.peek_mut() {
319 4145164 : if !iter.is_loaded() {
320 : // Once we load the iterator, we can know the real first key-value pair in the iterator.
321 : // We put it back into the heap so that a potentially unloaded layer may have a key between
322 : // [potential_first_key, loaded_first_key).
323 1168 : iter.load().await?;
324 1168 : continue;
325 4143996 : }
326 4143996 : let Some(item) = iter.next().await? else {
327 : // If the iterator returns None, we pop this iterator. Actually, in the current implementation,
328 : // we order None > Some, and all the rest of the iterators should return None.
329 1164 : binary_heap::PeekMut::pop(iter);
330 1164 : continue;
331 : };
332 4142832 : return Ok(Some(R::new(item, &iter)));
333 : }
334 180 : Ok(None)
335 4143012 : }
336 :
337 : /// Get the next key-value pair from the iterator.
338 4140292 : pub async fn next(&mut self) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
339 4140292 : self.next_inner().await
340 4140292 : }
341 :
342 : /// Get the next key-value pair from the iterator, and trace where the key comes from.
343 0 : pub async fn next_with_trace(
344 0 : &mut self,
345 0 : ) -> anyhow::Result<Option<((Key, Lsn, Value), Arc<PersistentLayerKey>)>> {
346 0 : self.next_inner().await
347 0 : }
348 : }
349 :
350 : #[cfg(test)]
351 : mod tests {
352 : use super::*;
353 :
354 : use itertools::Itertools;
355 : use pageserver_api::key::Key;
356 : use utils::lsn::Lsn;
357 :
358 : use crate::{
359 : tenant::{
360 : harness::{TenantHarness, TIMELINE_ID},
361 : storage_layer::delta_layer::test::{produce_delta_layer, sort_delta},
362 : },
363 : DEFAULT_PG_VERSION,
364 : };
365 :
366 : #[cfg(feature = "testing")]
367 : use crate::tenant::storage_layer::delta_layer::test::sort_delta_value;
368 : #[cfg(feature = "testing")]
369 : use pageserver_api::record::NeonWalRecord;
370 :
371 16 : async fn assert_merge_iter_equal(
372 16 : merge_iter: &mut MergeIterator<'_>,
373 16 : expect: &[(Key, Lsn, Value)],
374 16 : ) {
375 16 : let mut expect_iter = expect.iter();
376 : loop {
377 12160 : let o1 = merge_iter.next().await.unwrap();
378 12160 : let o2 = expect_iter.next();
379 12160 : assert_eq!(o1.is_some(), o2.is_some());
380 12160 : if o1.is_none() && o2.is_none() {
381 16 : break;
382 12144 : }
383 12144 : let (k1, l1, v1) = o1.unwrap();
384 12144 : let (k2, l2, v2) = o2.unwrap();
385 12144 : assert_eq!(&k1, k2);
386 12144 : assert_eq!(l1, *l2);
387 12144 : assert_eq!(&v1, v2);
388 : }
389 16 : }
390 :
391 : #[tokio::test]
392 4 : async fn merge_in_between() {
393 4 : use bytes::Bytes;
394 4 : use pageserver_api::value::Value;
395 4 :
396 4 : let harness = TenantHarness::create("merge_iterator_merge_in_between")
397 4 : .await
398 4 : .unwrap();
399 4 : let (tenant, ctx) = harness.load().await;
400 4 :
401 4 : let tline = tenant
402 4 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
403 4 : .await
404 4 : .unwrap();
405 4 :
406 16 : fn get_key(id: u32) -> Key {
407 16 : let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
408 16 : key.field6 = id;
409 16 : key
410 16 : }
411 4 : let test_deltas1 = vec![
412 4 : (
413 4 : get_key(0),
414 4 : Lsn(0x10),
415 4 : Value::Image(Bytes::copy_from_slice(b"test")),
416 4 : ),
417 4 : (
418 4 : get_key(5),
419 4 : Lsn(0x10),
420 4 : Value::Image(Bytes::copy_from_slice(b"test")),
421 4 : ),
422 4 : ];
423 4 : let resident_layer_1 = produce_delta_layer(&tenant, &tline, test_deltas1.clone(), &ctx)
424 4 : .await
425 4 : .unwrap();
426 4 : let test_deltas2 = vec![
427 4 : (
428 4 : get_key(3),
429 4 : Lsn(0x10),
430 4 : Value::Image(Bytes::copy_from_slice(b"test")),
431 4 : ),
432 4 : (
433 4 : get_key(4),
434 4 : Lsn(0x10),
435 4 : Value::Image(Bytes::copy_from_slice(b"test")),
436 4 : ),
437 4 : ];
438 4 : let resident_layer_2 = produce_delta_layer(&tenant, &tline, test_deltas2.clone(), &ctx)
439 4 : .await
440 4 : .unwrap();
441 4 : let mut merge_iter = MergeIterator::create(
442 4 : &[
443 4 : resident_layer_2.get_as_delta(&ctx).await.unwrap(),
444 4 : resident_layer_1.get_as_delta(&ctx).await.unwrap(),
445 4 : ],
446 4 : &[],
447 4 : &ctx,
448 4 : );
449 4 : let mut expect = Vec::new();
450 4 : expect.extend(test_deltas1);
451 4 : expect.extend(test_deltas2);
452 4 : expect.sort_by(sort_delta);
453 4 : assert_merge_iter_equal(&mut merge_iter, &expect).await;
454 4 : }
455 :
456 : #[tokio::test]
457 4 : async fn delta_merge() {
458 4 : use bytes::Bytes;
459 4 : use pageserver_api::value::Value;
460 4 :
461 4 : let harness = TenantHarness::create("merge_iterator_delta_merge")
462 4 : .await
463 4 : .unwrap();
464 4 : let (tenant, ctx) = harness.load().await;
465 4 :
466 4 : let tline = tenant
467 4 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
468 4 : .await
469 4 : .unwrap();
470 4 :
471 12000 : fn get_key(id: u32) -> Key {
472 12000 : let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
473 12000 : key.field6 = id;
474 12000 : key
475 12000 : }
476 4 : const N: usize = 1000;
477 4 : let test_deltas1 = (0..N)
478 4000 : .map(|idx| {
479 4000 : (
480 4000 : get_key(idx as u32 / 10),
481 4000 : Lsn(0x20 * ((idx as u64) % 10 + 1)),
482 4000 : Value::Image(Bytes::from(format!("img{idx:05}"))),
483 4000 : )
484 4000 : })
485 4 : .collect_vec();
486 4 : let resident_layer_1 = produce_delta_layer(&tenant, &tline, test_deltas1.clone(), &ctx)
487 4 : .await
488 4 : .unwrap();
489 4 : let test_deltas2 = (0..N)
490 4000 : .map(|idx| {
491 4000 : (
492 4000 : get_key(idx as u32 / 10),
493 4000 : Lsn(0x20 * ((idx as u64) % 10 + 1) + 0x10),
494 4000 : Value::Image(Bytes::from(format!("img{idx:05}"))),
495 4000 : )
496 4000 : })
497 4 : .collect_vec();
498 4 : let resident_layer_2 = produce_delta_layer(&tenant, &tline, test_deltas2.clone(), &ctx)
499 4 : .await
500 4 : .unwrap();
501 4 : let test_deltas3 = (0..N)
502 4000 : .map(|idx| {
503 4000 : (
504 4000 : get_key(idx as u32 / 10 + N as u32),
505 4000 : Lsn(0x10 * ((idx as u64) % 10 + 1)),
506 4000 : Value::Image(Bytes::from(format!("img{idx:05}"))),
507 4000 : )
508 4000 : })
509 4 : .collect_vec();
510 4 : let resident_layer_3 = produce_delta_layer(&tenant, &tline, test_deltas3.clone(), &ctx)
511 4 : .await
512 4 : .unwrap();
513 4 : let mut merge_iter = MergeIterator::create(
514 4 : &[
515 4 : resident_layer_1.get_as_delta(&ctx).await.unwrap(),
516 4 : resident_layer_2.get_as_delta(&ctx).await.unwrap(),
517 4 : resident_layer_3.get_as_delta(&ctx).await.unwrap(),
518 4 : ],
519 4 : &[],
520 4 : &ctx,
521 4 : );
522 4 : let mut expect = Vec::new();
523 4 : expect.extend(test_deltas1);
524 4 : expect.extend(test_deltas2);
525 4 : expect.extend(test_deltas3);
526 4 : expect.sort_by(sort_delta);
527 4 : assert_merge_iter_equal(&mut merge_iter, &expect).await;
528 4 :
529 4 : // TODO: test layers are loaded only when needed, reducing num of active iterators in k-merge
530 4 : }
531 :
532 : #[cfg(feature = "testing")]
533 : #[tokio::test]
534 4 : async fn delta_image_mixed_merge() {
535 4 : use bytes::Bytes;
536 4 : use pageserver_api::value::Value;
537 4 :
538 4 : let harness = TenantHarness::create("merge_iterator_delta_image_mixed_merge")
539 4 : .await
540 4 : .unwrap();
541 4 : let (tenant, ctx) = harness.load().await;
542 4 :
543 4 : let tline = tenant
544 4 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
545 4 : .await
546 4 : .unwrap();
547 4 :
548 36 : fn get_key(id: u32) -> Key {
549 36 : let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
550 36 : key.field6 = id;
551 36 : key
552 36 : }
553 4 : // In this test case, we want to test if the iterator still works correctly with multiple copies
554 4 : // of a delta+image at the same LSN, for example, the following sequence a@10=+a, a@10=+a, a@10=ab, a@10=ab.
555 4 : // Duplicated deltas/images are possible for old tenants before the full L0 compaction file name fix.
556 4 : // An incomplete compaction could produce multiple exactly-the-same delta layers. Force image generation
557 4 : // could produce overlapping images. Apart from duplicated deltas/images, in the current storage implementation
558 4 : // one key-lsn could have a delta in the delta layer and one image in the image layer. The iterator should
559 4 : // correctly process these situations and return everything as-is, and the upper layer of the system
560 4 : // will handle duplicated LSNs.
561 4 : let test_deltas1 = vec![
562 4 : (
563 4 : get_key(0),
564 4 : Lsn(0x10),
565 4 : Value::WalRecord(NeonWalRecord::wal_init("")),
566 4 : ),
567 4 : (
568 4 : get_key(0),
569 4 : Lsn(0x18),
570 4 : Value::WalRecord(NeonWalRecord::wal_append("a")),
571 4 : ),
572 4 : (
573 4 : get_key(5),
574 4 : Lsn(0x10),
575 4 : Value::WalRecord(NeonWalRecord::wal_init("")),
576 4 : ),
577 4 : (
578 4 : get_key(5),
579 4 : Lsn(0x18),
580 4 : Value::WalRecord(NeonWalRecord::wal_append("b")),
581 4 : ),
582 4 : ];
583 4 : let resident_layer_1 = produce_delta_layer(&tenant, &tline, test_deltas1.clone(), &ctx)
584 4 : .await
585 4 : .unwrap();
586 4 : let mut test_deltas2 = test_deltas1.clone();
587 4 : test_deltas2.push((
588 4 : get_key(10),
589 4 : Lsn(0x20),
590 4 : Value::Image(Bytes::copy_from_slice(b"test")),
591 4 : ));
592 4 : let resident_layer_2 = produce_delta_layer(&tenant, &tline, test_deltas2.clone(), &ctx)
593 4 : .await
594 4 : .unwrap();
595 4 : let test_deltas3 = vec![
596 4 : (
597 4 : get_key(0),
598 4 : Lsn(0x10),
599 4 : Value::Image(Bytes::copy_from_slice(b"")),
600 4 : ),
601 4 : (
602 4 : get_key(5),
603 4 : Lsn(0x18),
604 4 : Value::Image(Bytes::copy_from_slice(b"b")),
605 4 : ),
606 4 : (
607 4 : get_key(15),
608 4 : Lsn(0x20),
609 4 : Value::Image(Bytes::copy_from_slice(b"test")),
610 4 : ),
611 4 : ];
612 4 : let resident_layer_3 = produce_delta_layer(&tenant, &tline, test_deltas3.clone(), &ctx)
613 4 : .await
614 4 : .unwrap();
615 4 : let mut test_deltas4 = test_deltas3.clone();
616 4 : test_deltas4.push((
617 4 : get_key(20),
618 4 : Lsn(0x20),
619 4 : Value::Image(Bytes::copy_from_slice(b"test")),
620 4 : ));
621 4 : let resident_layer_4 = produce_delta_layer(&tenant, &tline, test_deltas4.clone(), &ctx)
622 4 : .await
623 4 : .unwrap();
624 4 : let mut expect = Vec::new();
625 4 : expect.extend(test_deltas1);
626 4 : expect.extend(test_deltas2);
627 4 : expect.extend(test_deltas3);
628 4 : expect.extend(test_deltas4);
629 4 : expect.sort_by(sort_delta_value);
630 4 :
631 4 : // Test with different layer order for MergeIterator::create to ensure the order
632 4 : // is stable.
633 4 :
634 4 : let mut merge_iter = MergeIterator::create(
635 4 : &[
636 4 : resident_layer_4.get_as_delta(&ctx).await.unwrap(),
637 4 : resident_layer_1.get_as_delta(&ctx).await.unwrap(),
638 4 : resident_layer_3.get_as_delta(&ctx).await.unwrap(),
639 4 : resident_layer_2.get_as_delta(&ctx).await.unwrap(),
640 4 : ],
641 4 : &[],
642 4 : &ctx,
643 4 : );
644 4 : assert_merge_iter_equal(&mut merge_iter, &expect).await;
645 4 :
646 4 : let mut merge_iter = MergeIterator::create(
647 4 : &[
648 4 : resident_layer_1.get_as_delta(&ctx).await.unwrap(),
649 4 : resident_layer_4.get_as_delta(&ctx).await.unwrap(),
650 4 : resident_layer_3.get_as_delta(&ctx).await.unwrap(),
651 4 : resident_layer_2.get_as_delta(&ctx).await.unwrap(),
652 4 : ],
653 4 : &[],
654 4 : &ctx,
655 4 : );
656 4 : assert_merge_iter_equal(&mut merge_iter, &expect).await;
657 4 :
658 4 : is_send(merge_iter);
659 4 : }
660 :
661 : #[cfg(feature = "testing")]
662 4 : fn is_send(_: impl Send) {}
663 : }
|