Line data Source code
1 : use std::cmp::Ordering;
2 : use std::collections::{BinaryHeap, binary_heap};
3 : use std::sync::Arc;
4 :
5 : use anyhow::bail;
6 : use pageserver_api::key::Key;
7 : use pageserver_api::value::Value;
8 : use utils::lsn::Lsn;
9 :
10 : use super::delta_layer::{DeltaLayerInner, DeltaLayerIterator};
11 : use super::image_layer::{ImageLayerInner, ImageLayerIterator};
12 : use super::{PersistentLayerDesc, PersistentLayerKey};
13 : use crate::context::RequestContext;
14 :
15 : #[derive(Clone, Copy)]
16 : pub(crate) enum LayerRef<'a> {
17 : Image(&'a ImageLayerInner),
18 : Delta(&'a DeltaLayerInner),
19 : }
20 :
21 : impl<'a> LayerRef<'a> {
22 1168 : fn iter(self, ctx: &'a RequestContext) -> LayerIterRef<'a> {
23 1168 : match self {
24 132 : Self::Image(x) => LayerIterRef::Image(x.iter(ctx)),
25 1036 : Self::Delta(x) => LayerIterRef::Delta(x.iter(ctx)),
26 : }
27 1168 : }
28 :
29 0 : fn layer_dbg_info(&self) -> String {
30 0 : match self {
31 0 : Self::Image(x) => x.layer_dbg_info(),
32 0 : Self::Delta(x) => x.layer_dbg_info(),
33 : }
34 0 : }
35 : }
36 :
37 : enum LayerIterRef<'a> {
38 : Image(ImageLayerIterator<'a>),
39 : Delta(DeltaLayerIterator<'a>),
40 : }
41 :
42 : impl LayerIterRef<'_> {
43 4145164 : async fn next(&mut self) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
44 4145164 : match self {
45 4143612 : Self::Delta(x) => x.next().await,
46 1552 : Self::Image(x) => x.next().await,
47 : }
48 4145164 : }
49 :
50 0 : fn layer_dbg_info(&self) -> String {
51 0 : match self {
52 0 : Self::Image(x) => x.layer_dbg_info(),
53 0 : Self::Delta(x) => x.layer_dbg_info(),
54 : }
55 0 : }
56 : }
57 :
58 : /// This type plays several roles at once
59 : /// 1. Unified iterator for image and delta layers.
60 : /// 2. `Ord` for use in [`MergeIterator::heap`] (for the k-merge).
61 : /// 3. Lazy creation of the real delta/image iterator.
62 : pub(crate) enum IteratorWrapper<'a> {
63 : NotLoaded {
64 : ctx: &'a RequestContext,
65 : first_key_lower_bound: (Key, Lsn),
66 : layer: LayerRef<'a>,
67 : source_desc: Arc<PersistentLayerKey>,
68 : },
69 : Loaded {
70 : iter: PeekableLayerIterRef<'a>,
71 : source_desc: Arc<PersistentLayerKey>,
72 : },
73 : }
74 :
75 : pub(crate) struct PeekableLayerIterRef<'a> {
76 : iter: LayerIterRef<'a>,
77 : peeked: Option<(Key, Lsn, Value)>, // None == end
78 : }
79 :
80 : impl<'a> PeekableLayerIterRef<'a> {
81 1168 : async fn create(mut iter: LayerIterRef<'a>) -> anyhow::Result<Self> {
82 1168 : let peeked = iter.next().await?;
83 1168 : Ok(Self { iter, peeked })
84 1168 : }
85 :
86 16902654 : fn peek(&self) -> &Option<(Key, Lsn, Value)> {
87 16902654 : &self.peeked
88 16902654 : }
89 :
90 4143996 : async fn next(&mut self) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
91 4143996 : let result = self.peeked.take();
92 4143996 : self.peeked = self.iter.next().await?;
93 4143996 : if let (Some((k1, l1, _)), Some((k2, l2, _))) = (&self.peeked, &result) {
94 4141668 : if (k1, l1) < (k2, l2) {
95 0 : bail!("iterator is not ordered: {}", self.iter.layer_dbg_info());
96 4141668 : }
97 2328 : }
98 4143996 : Ok(result)
99 4143996 : }
100 : }
101 :
102 : impl std::cmp::PartialEq for IteratorWrapper<'_> {
103 0 : fn eq(&self, other: &Self) -> bool {
104 0 : self.cmp(other) == Ordering::Equal
105 0 : }
106 : }
107 :
108 : impl std::cmp::Eq for IteratorWrapper<'_> {}
109 :
110 : impl std::cmp::PartialOrd for IteratorWrapper<'_> {
111 8459789 : fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
112 8459789 : Some(self.cmp(other))
113 8459789 : }
114 : }
115 :
116 : impl std::cmp::Ord for IteratorWrapper<'_> {
117 8459789 : fn cmp(&self, other: &Self) -> std::cmp::Ordering {
118 : use std::cmp::Ordering;
119 8459789 : let a = self.peek_next_key_lsn_value();
120 8459789 : let b = other.peek_next_key_lsn_value();
121 8459789 : match (a, b) {
122 6043558 : (Some((k1, l1, v1)), Some((k2, l2, v2))) => {
123 12087116 : fn map_value_to_num(val: &Option<&Value>) -> usize {
124 12069344 : match val {
125 17772 : None => 0,
126 12068324 : Some(Value::Image(_)) => 1,
127 1020 : Some(Value::WalRecord(_)) => 2,
128 : }
129 12087116 : }
130 6043558 : let order_1 = map_value_to_num(&v1);
131 6043558 : let order_2 = map_value_to_num(&v2);
132 6043558 : // When key_lsn are the same, the unloaded iter will always appear before the loaded one.
133 6043558 : // And note that we do a reverse at the end of the comparison, so it works with the max heap.
134 6043558 : (k1, l1, order_1).cmp(&(k2, l2, order_2))
135 : }
136 2005895 : (Some(_), None) => Ordering::Less,
137 2768 : (None, Some(_)) => Ordering::Greater,
138 407568 : (None, None) => Ordering::Equal,
139 : }
140 8459789 : .reverse()
141 8459789 : }
142 : }
143 :
144 : impl<'a> IteratorWrapper<'a> {
145 132 : pub fn create_from_image_layer(
146 132 : image_layer: &'a ImageLayerInner,
147 132 : ctx: &'a RequestContext,
148 132 : ) -> Self {
149 132 : Self::NotLoaded {
150 132 : layer: LayerRef::Image(image_layer),
151 132 : first_key_lower_bound: (image_layer.key_range().start, image_layer.lsn()),
152 132 : ctx,
153 132 : source_desc: PersistentLayerKey {
154 132 : key_range: image_layer.key_range().clone(),
155 132 : lsn_range: PersistentLayerDesc::image_layer_lsn_range(image_layer.lsn()),
156 132 : is_delta: false,
157 132 : }
158 132 : .into(),
159 132 : }
160 132 : }
161 :
162 1036 : pub fn create_from_delta_layer(
163 1036 : delta_layer: &'a DeltaLayerInner,
164 1036 : ctx: &'a RequestContext,
165 1036 : ) -> Self {
166 1036 : Self::NotLoaded {
167 1036 : layer: LayerRef::Delta(delta_layer),
168 1036 : first_key_lower_bound: (delta_layer.key_range().start, delta_layer.lsn_range().start),
169 1036 : ctx,
170 1036 : source_desc: PersistentLayerKey {
171 1036 : key_range: delta_layer.key_range().clone(),
172 1036 : lsn_range: delta_layer.lsn_range().clone(),
173 1036 : is_delta: true,
174 1036 : }
175 1036 : .into(),
176 1036 : }
177 1036 : }
178 :
179 16919578 : fn peek_next_key_lsn_value(&self) -> Option<(&Key, Lsn, Option<&Value>)> {
180 16919578 : match self {
181 16901486 : Self::Loaded { iter, .. } => iter
182 16901486 : .peek()
183 16901486 : .as_ref()
184 16901486 : .map(|(key, lsn, val)| (key, *lsn, Some(val))),
185 18092 : Self::NotLoaded {
186 18092 : first_key_lower_bound: (key, lsn),
187 18092 : ..
188 18092 : } => Some((key, *lsn, None)),
189 : }
190 16919578 : }
191 :
192 : // CORRECTNESS: this function must always take `&mut self`, never `&self`.
193 : //
194 : // The reason is that `impl Ord for Self` evaluates differently after this function
195 : // returns. We're called through a `PeekMut::deref_mut`, which causes heap repair when
196 : // the PeekMut gets returned. So, it's critical that we actually run through `PeekMut::deref_mut`
197 : // and not just `PeekMut::deref`
198 : // If we don't take `&mut self`
199 1168 : async fn load(&mut self) -> anyhow::Result<()> {
200 1168 : assert!(!self.is_loaded());
201 1168 : let Self::NotLoaded {
202 1168 : ctx,
203 1168 : first_key_lower_bound,
204 1168 : layer,
205 1168 : source_desc,
206 1168 : } = self
207 : else {
208 0 : unreachable!()
209 : };
210 1168 : let iter = layer.iter(ctx);
211 1168 : let iter = PeekableLayerIterRef::create(iter).await?;
212 1168 : if let Some((k1, l1, _)) = iter.peek() {
213 1168 : let (k2, l2) = first_key_lower_bound;
214 1168 : if (k1, l1) < (k2, l2) {
215 0 : bail!(
216 0 : "layer key range did not include the first key in the layer: {}",
217 0 : layer.layer_dbg_info()
218 0 : );
219 1168 : }
220 0 : }
221 1168 : *self = Self::Loaded {
222 1168 : iter,
223 1168 : source_desc: source_desc.clone(),
224 1168 : };
225 1168 : Ok(())
226 1168 : }
227 :
228 4146332 : fn is_loaded(&self) -> bool {
229 4146332 : matches!(self, Self::Loaded { .. })
230 4146332 : }
231 :
232 : /// Correctness: must load the iterator before using.
233 : ///
234 : /// Given this iterator wrapper is private to the merge iterator, users won't be able to mis-use it.
235 : /// The public interfaces to use are [`crate::tenant::storage_layer::delta_layer::DeltaLayerIterator`] and
236 : /// [`crate::tenant::storage_layer::image_layer::ImageLayerIterator`].
237 4143996 : async fn next(&mut self) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
238 4143996 : let Self::Loaded { iter, .. } = self else {
239 0 : panic!("must load the iterator before using")
240 : };
241 4143996 : iter.next().await
242 4143996 : }
243 :
244 : /// Get the persistent layer key corresponding to this iterator
245 1828 : fn trace_source(&self) -> Arc<PersistentLayerKey> {
246 1828 : match self {
247 1828 : Self::Loaded { source_desc, .. } => source_desc.clone(),
248 0 : Self::NotLoaded { source_desc, .. } => source_desc.clone(),
249 : }
250 1828 : }
251 : }
252 :
253 : /// A merge iterator over delta/image layer iterators.
254 : ///
255 : /// When duplicated records are found, the iterator will not perform any
256 : /// deduplication, and the caller should handle these situation. By saying
257 : /// duplicated records, there are many possibilities:
258 : ///
259 : /// * Two same delta at the same LSN.
260 : /// * Two same image at the same LSN.
261 : /// * Delta/image at the same LSN where the image has already applied the delta.
262 : ///
263 : /// The iterator will always put the image before the delta.
264 : pub struct MergeIterator<'a> {
265 : heap: BinaryHeap<IteratorWrapper<'a>>,
266 : }
267 :
268 : pub(crate) trait MergeIteratorItem {
269 : fn new(item: (Key, Lsn, Value), iterator: &IteratorWrapper<'_>) -> Self;
270 :
271 : fn key_lsn_value(&self) -> &(Key, Lsn, Value);
272 : }
273 :
274 : impl MergeIteratorItem for (Key, Lsn, Value) {
275 4141004 : fn new(item: (Key, Lsn, Value), _: &IteratorWrapper<'_>) -> Self {
276 4141004 : item
277 4141004 : }
278 :
279 1580 : fn key_lsn_value(&self) -> &(Key, Lsn, Value) {
280 1580 : self
281 1580 : }
282 : }
283 :
284 : impl MergeIteratorItem for ((Key, Lsn, Value), Arc<PersistentLayerKey>) {
285 1828 : fn new(item: (Key, Lsn, Value), iter: &IteratorWrapper<'_>) -> Self {
286 1828 : (item, iter.trace_source().clone())
287 1828 : }
288 :
289 3848 : fn key_lsn_value(&self) -> &(Key, Lsn, Value) {
290 3848 : &self.0
291 3848 : }
292 : }
293 :
294 : impl<'a> MergeIterator<'a> {
295 184 : pub fn create(
296 184 : deltas: &[&'a DeltaLayerInner],
297 184 : images: &[&'a ImageLayerInner],
298 184 : ctx: &'a RequestContext,
299 184 : ) -> Self {
300 184 : let mut heap = Vec::with_capacity(images.len() + deltas.len());
301 316 : for image in images {
302 132 : heap.push(IteratorWrapper::create_from_image_layer(image, ctx));
303 132 : }
304 1220 : for delta in deltas {
305 1036 : heap.push(IteratorWrapper::create_from_delta_layer(delta, ctx));
306 1036 : }
307 184 : Self {
308 184 : heap: BinaryHeap::from(heap),
309 184 : }
310 184 : }
311 :
312 4143012 : pub(crate) async fn next_inner<R: MergeIteratorItem>(&mut self) -> anyhow::Result<Option<R>> {
313 4145344 : while let Some(mut iter) = self.heap.peek_mut() {
314 4145164 : if !iter.is_loaded() {
315 : // Once we load the iterator, we can know the real first key-value pair in the iterator.
316 : // We put it back into the heap so that a potentially unloaded layer may have a key between
317 : // [potential_first_key, loaded_first_key).
318 1168 : iter.load().await?;
319 1168 : continue;
320 4143996 : }
321 4143996 : let Some(item) = iter.next().await? else {
322 : // If the iterator returns None, we pop this iterator. Actually, in the current implementation,
323 : // we order None > Some, and all the rest of the iterators should return None.
324 1164 : binary_heap::PeekMut::pop(iter);
325 1164 : continue;
326 : };
327 4142832 : return Ok(Some(R::new(item, &iter)));
328 : }
329 180 : Ok(None)
330 4143012 : }
331 :
332 : /// Get the next key-value pair from the iterator.
333 4140292 : pub async fn next(&mut self) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
334 4140292 : self.next_inner().await
335 4140292 : }
336 :
337 : /// Get the next key-value pair from the iterator, and trace where the key comes from.
338 0 : pub async fn next_with_trace(
339 0 : &mut self,
340 0 : ) -> anyhow::Result<Option<((Key, Lsn, Value), Arc<PersistentLayerKey>)>> {
341 0 : self.next_inner().await
342 0 : }
343 : }
344 :
345 : #[cfg(test)]
346 : mod tests {
347 : use itertools::Itertools;
348 : use pageserver_api::key::Key;
349 : #[cfg(feature = "testing")]
350 : use pageserver_api::record::NeonWalRecord;
351 : use utils::lsn::Lsn;
352 :
353 : use super::*;
354 : use crate::DEFAULT_PG_VERSION;
355 : use crate::tenant::harness::{TIMELINE_ID, TenantHarness};
356 : #[cfg(feature = "testing")]
357 : use crate::tenant::storage_layer::delta_layer::test::sort_delta_value;
358 : use crate::tenant::storage_layer::delta_layer::test::{produce_delta_layer, sort_delta};
359 :
360 16 : async fn assert_merge_iter_equal(
361 16 : merge_iter: &mut MergeIterator<'_>,
362 16 : expect: &[(Key, Lsn, Value)],
363 16 : ) {
364 16 : let mut expect_iter = expect.iter();
365 : loop {
366 12160 : let o1 = merge_iter.next().await.unwrap();
367 12160 : let o2 = expect_iter.next();
368 12160 : assert_eq!(o1.is_some(), o2.is_some());
369 12160 : if o1.is_none() && o2.is_none() {
370 16 : break;
371 12144 : }
372 12144 : let (k1, l1, v1) = o1.unwrap();
373 12144 : let (k2, l2, v2) = o2.unwrap();
374 12144 : assert_eq!(&k1, k2);
375 12144 : assert_eq!(l1, *l2);
376 12144 : assert_eq!(&v1, v2);
377 : }
378 16 : }
379 :
380 : #[tokio::test]
381 4 : async fn merge_in_between() {
382 4 : use bytes::Bytes;
383 4 : use pageserver_api::value::Value;
384 4 :
385 4 : let harness = TenantHarness::create("merge_iterator_merge_in_between")
386 4 : .await
387 4 : .unwrap();
388 4 : let (tenant, ctx) = harness.load().await;
389 4 :
390 4 : let tline = tenant
391 4 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
392 4 : .await
393 4 : .unwrap();
394 4 :
395 16 : fn get_key(id: u32) -> Key {
396 16 : let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
397 16 : key.field6 = id;
398 16 : key
399 16 : }
400 4 : let test_deltas1 = vec![
401 4 : (
402 4 : get_key(0),
403 4 : Lsn(0x10),
404 4 : Value::Image(Bytes::copy_from_slice(b"test")),
405 4 : ),
406 4 : (
407 4 : get_key(5),
408 4 : Lsn(0x10),
409 4 : Value::Image(Bytes::copy_from_slice(b"test")),
410 4 : ),
411 4 : ];
412 4 : let resident_layer_1 = produce_delta_layer(&tenant, &tline, test_deltas1.clone(), &ctx)
413 4 : .await
414 4 : .unwrap();
415 4 : let test_deltas2 = vec![
416 4 : (
417 4 : get_key(3),
418 4 : Lsn(0x10),
419 4 : Value::Image(Bytes::copy_from_slice(b"test")),
420 4 : ),
421 4 : (
422 4 : get_key(4),
423 4 : Lsn(0x10),
424 4 : Value::Image(Bytes::copy_from_slice(b"test")),
425 4 : ),
426 4 : ];
427 4 : let resident_layer_2 = produce_delta_layer(&tenant, &tline, test_deltas2.clone(), &ctx)
428 4 : .await
429 4 : .unwrap();
430 4 : let mut merge_iter = MergeIterator::create(
431 4 : &[
432 4 : resident_layer_2.get_as_delta(&ctx).await.unwrap(),
433 4 : resident_layer_1.get_as_delta(&ctx).await.unwrap(),
434 4 : ],
435 4 : &[],
436 4 : &ctx,
437 4 : );
438 4 : let mut expect = Vec::new();
439 4 : expect.extend(test_deltas1);
440 4 : expect.extend(test_deltas2);
441 4 : expect.sort_by(sort_delta);
442 4 : assert_merge_iter_equal(&mut merge_iter, &expect).await;
443 4 : }
444 :
445 : #[tokio::test]
446 4 : async fn delta_merge() {
447 4 : use bytes::Bytes;
448 4 : use pageserver_api::value::Value;
449 4 :
450 4 : let harness = TenantHarness::create("merge_iterator_delta_merge")
451 4 : .await
452 4 : .unwrap();
453 4 : let (tenant, ctx) = harness.load().await;
454 4 :
455 4 : let tline = tenant
456 4 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
457 4 : .await
458 4 : .unwrap();
459 4 :
460 12000 : fn get_key(id: u32) -> Key {
461 12000 : let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
462 12000 : key.field6 = id;
463 12000 : key
464 12000 : }
465 4 : const N: usize = 1000;
466 4 : let test_deltas1 = (0..N)
467 4000 : .map(|idx| {
468 4000 : (
469 4000 : get_key(idx as u32 / 10),
470 4000 : Lsn(0x20 * ((idx as u64) % 10 + 1)),
471 4000 : Value::Image(Bytes::from(format!("img{idx:05}"))),
472 4000 : )
473 4000 : })
474 4 : .collect_vec();
475 4 : let resident_layer_1 = produce_delta_layer(&tenant, &tline, test_deltas1.clone(), &ctx)
476 4 : .await
477 4 : .unwrap();
478 4 : let test_deltas2 = (0..N)
479 4000 : .map(|idx| {
480 4000 : (
481 4000 : get_key(idx as u32 / 10),
482 4000 : Lsn(0x20 * ((idx as u64) % 10 + 1) + 0x10),
483 4000 : Value::Image(Bytes::from(format!("img{idx:05}"))),
484 4000 : )
485 4000 : })
486 4 : .collect_vec();
487 4 : let resident_layer_2 = produce_delta_layer(&tenant, &tline, test_deltas2.clone(), &ctx)
488 4 : .await
489 4 : .unwrap();
490 4 : let test_deltas3 = (0..N)
491 4000 : .map(|idx| {
492 4000 : (
493 4000 : get_key(idx as u32 / 10 + N as u32),
494 4000 : Lsn(0x10 * ((idx as u64) % 10 + 1)),
495 4000 : Value::Image(Bytes::from(format!("img{idx:05}"))),
496 4000 : )
497 4000 : })
498 4 : .collect_vec();
499 4 : let resident_layer_3 = produce_delta_layer(&tenant, &tline, test_deltas3.clone(), &ctx)
500 4 : .await
501 4 : .unwrap();
502 4 : let mut merge_iter = MergeIterator::create(
503 4 : &[
504 4 : resident_layer_1.get_as_delta(&ctx).await.unwrap(),
505 4 : resident_layer_2.get_as_delta(&ctx).await.unwrap(),
506 4 : resident_layer_3.get_as_delta(&ctx).await.unwrap(),
507 4 : ],
508 4 : &[],
509 4 : &ctx,
510 4 : );
511 4 : let mut expect = Vec::new();
512 4 : expect.extend(test_deltas1);
513 4 : expect.extend(test_deltas2);
514 4 : expect.extend(test_deltas3);
515 4 : expect.sort_by(sort_delta);
516 4 : assert_merge_iter_equal(&mut merge_iter, &expect).await;
517 4 :
518 4 : // TODO: test layers are loaded only when needed, reducing num of active iterators in k-merge
519 4 : }
520 :
521 : #[cfg(feature = "testing")]
522 : #[tokio::test]
523 4 : async fn delta_image_mixed_merge() {
524 4 : use bytes::Bytes;
525 4 : use pageserver_api::value::Value;
526 4 :
527 4 : let harness = TenantHarness::create("merge_iterator_delta_image_mixed_merge")
528 4 : .await
529 4 : .unwrap();
530 4 : let (tenant, ctx) = harness.load().await;
531 4 :
532 4 : let tline = tenant
533 4 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
534 4 : .await
535 4 : .unwrap();
536 4 :
537 36 : fn get_key(id: u32) -> Key {
538 36 : let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
539 36 : key.field6 = id;
540 36 : key
541 36 : }
542 4 : // In this test case, we want to test if the iterator still works correctly with multiple copies
543 4 : // of a delta+image at the same LSN, for example, the following sequence a@10=+a, a@10=+a, a@10=ab, a@10=ab.
544 4 : // Duplicated deltas/images are possible for old tenants before the full L0 compaction file name fix.
545 4 : // An incomplete compaction could produce multiple exactly-the-same delta layers. Force image generation
546 4 : // could produce overlapping images. Apart from duplicated deltas/images, in the current storage implementation
547 4 : // one key-lsn could have a delta in the delta layer and one image in the image layer. The iterator should
548 4 : // correctly process these situations and return everything as-is, and the upper layer of the system
549 4 : // will handle duplicated LSNs.
550 4 : let test_deltas1 = vec![
551 4 : (
552 4 : get_key(0),
553 4 : Lsn(0x10),
554 4 : Value::WalRecord(NeonWalRecord::wal_init("")),
555 4 : ),
556 4 : (
557 4 : get_key(0),
558 4 : Lsn(0x18),
559 4 : Value::WalRecord(NeonWalRecord::wal_append("a")),
560 4 : ),
561 4 : (
562 4 : get_key(5),
563 4 : Lsn(0x10),
564 4 : Value::WalRecord(NeonWalRecord::wal_init("")),
565 4 : ),
566 4 : (
567 4 : get_key(5),
568 4 : Lsn(0x18),
569 4 : Value::WalRecord(NeonWalRecord::wal_append("b")),
570 4 : ),
571 4 : ];
572 4 : let resident_layer_1 = produce_delta_layer(&tenant, &tline, test_deltas1.clone(), &ctx)
573 4 : .await
574 4 : .unwrap();
575 4 : let mut test_deltas2 = test_deltas1.clone();
576 4 : test_deltas2.push((
577 4 : get_key(10),
578 4 : Lsn(0x20),
579 4 : Value::Image(Bytes::copy_from_slice(b"test")),
580 4 : ));
581 4 : let resident_layer_2 = produce_delta_layer(&tenant, &tline, test_deltas2.clone(), &ctx)
582 4 : .await
583 4 : .unwrap();
584 4 : let test_deltas3 = vec![
585 4 : (
586 4 : get_key(0),
587 4 : Lsn(0x10),
588 4 : Value::Image(Bytes::copy_from_slice(b"")),
589 4 : ),
590 4 : (
591 4 : get_key(5),
592 4 : Lsn(0x18),
593 4 : Value::Image(Bytes::copy_from_slice(b"b")),
594 4 : ),
595 4 : (
596 4 : get_key(15),
597 4 : Lsn(0x20),
598 4 : Value::Image(Bytes::copy_from_slice(b"test")),
599 4 : ),
600 4 : ];
601 4 : let resident_layer_3 = produce_delta_layer(&tenant, &tline, test_deltas3.clone(), &ctx)
602 4 : .await
603 4 : .unwrap();
604 4 : let mut test_deltas4 = test_deltas3.clone();
605 4 : test_deltas4.push((
606 4 : get_key(20),
607 4 : Lsn(0x20),
608 4 : Value::Image(Bytes::copy_from_slice(b"test")),
609 4 : ));
610 4 : let resident_layer_4 = produce_delta_layer(&tenant, &tline, test_deltas4.clone(), &ctx)
611 4 : .await
612 4 : .unwrap();
613 4 : let mut expect = Vec::new();
614 4 : expect.extend(test_deltas1);
615 4 : expect.extend(test_deltas2);
616 4 : expect.extend(test_deltas3);
617 4 : expect.extend(test_deltas4);
618 4 : expect.sort_by(sort_delta_value);
619 4 :
620 4 : // Test with different layer order for MergeIterator::create to ensure the order
621 4 : // is stable.
622 4 :
623 4 : let mut merge_iter = MergeIterator::create(
624 4 : &[
625 4 : resident_layer_4.get_as_delta(&ctx).await.unwrap(),
626 4 : resident_layer_1.get_as_delta(&ctx).await.unwrap(),
627 4 : resident_layer_3.get_as_delta(&ctx).await.unwrap(),
628 4 : resident_layer_2.get_as_delta(&ctx).await.unwrap(),
629 4 : ],
630 4 : &[],
631 4 : &ctx,
632 4 : );
633 4 : assert_merge_iter_equal(&mut merge_iter, &expect).await;
634 4 :
635 4 : let mut merge_iter = MergeIterator::create(
636 4 : &[
637 4 : resident_layer_1.get_as_delta(&ctx).await.unwrap(),
638 4 : resident_layer_4.get_as_delta(&ctx).await.unwrap(),
639 4 : resident_layer_3.get_as_delta(&ctx).await.unwrap(),
640 4 : resident_layer_2.get_as_delta(&ctx).await.unwrap(),
641 4 : ],
642 4 : &[],
643 4 : &ctx,
644 4 : );
645 4 : assert_merge_iter_equal(&mut merge_iter, &expect).await;
646 4 :
647 4 : is_send(merge_iter);
648 4 : }
649 :
650 : #[cfg(feature = "testing")]
651 4 : fn is_send(_: impl Send) {}
652 : }
|