Line data Source code
1 : use std::cmp::Ordering;
2 : use std::collections::{BinaryHeap, binary_heap};
3 : use std::sync::Arc;
4 :
5 : use anyhow::bail;
6 : use pageserver_api::key::Key;
7 : use pageserver_api::value::Value;
8 : use utils::lsn::Lsn;
9 :
10 : use super::delta_layer::{DeltaLayerInner, DeltaLayerIterator};
11 : use super::image_layer::{ImageLayerInner, ImageLayerIterator};
12 : use super::{PersistentLayerDesc, PersistentLayerKey};
13 : use crate::context::RequestContext;
14 :
15 : #[derive(Clone, Copy)]
16 : pub(crate) enum LayerRef<'a> {
17 : Image(&'a ImageLayerInner),
18 : Delta(&'a DeltaLayerInner),
19 : }
20 :
21 : impl<'a> LayerRef<'a> {
22 1168 : fn iter(self, ctx: &'a RequestContext) -> LayerIterRef<'a> {
23 1168 : match self {
24 132 : Self::Image(x) => LayerIterRef::Image(x.iter(ctx)),
25 1036 : Self::Delta(x) => LayerIterRef::Delta(x.iter(ctx)),
26 : }
27 1168 : }
28 :
29 0 : fn layer_dbg_info(&self) -> String {
30 0 : match self {
31 0 : Self::Image(x) => x.layer_dbg_info(),
32 0 : Self::Delta(x) => x.layer_dbg_info(),
33 : }
34 0 : }
35 : }
36 :
37 : enum LayerIterRef<'a> {
38 : Image(ImageLayerIterator<'a>),
39 : Delta(DeltaLayerIterator<'a>),
40 : }
41 :
42 : impl LayerIterRef<'_> {
43 4145164 : async fn next(&mut self) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
44 4145164 : match self {
45 4143612 : Self::Delta(x) => x.next().await,
46 1552 : Self::Image(x) => x.next().await,
47 : }
48 4145164 : }
49 :
50 0 : fn layer_dbg_info(&self) -> String {
51 0 : match self {
52 0 : Self::Image(x) => x.layer_dbg_info(),
53 0 : Self::Delta(x) => x.layer_dbg_info(),
54 : }
55 0 : }
56 : }
57 :
58 : /// This type plays several roles at once
59 : /// 1. Unified iterator for image and delta layers.
60 : /// 2. `Ord` for use in [`MergeIterator::heap`] (for the k-merge).
61 : /// 3. Lazy creation of the real delta/image iterator.
62 : #[allow(clippy::large_enum_variant, reason = "TODO")]
63 : pub(crate) enum IteratorWrapper<'a> {
64 : NotLoaded {
65 : ctx: &'a RequestContext,
66 : first_key_lower_bound: (Key, Lsn),
67 : layer: LayerRef<'a>,
68 : source_desc: Arc<PersistentLayerKey>,
69 : },
70 : Loaded {
71 : iter: PeekableLayerIterRef<'a>,
72 : source_desc: Arc<PersistentLayerKey>,
73 : },
74 : }
75 :
76 : pub(crate) struct PeekableLayerIterRef<'a> {
77 : iter: LayerIterRef<'a>,
78 : peeked: Option<(Key, Lsn, Value)>, // None == end
79 : }
80 :
81 : impl<'a> PeekableLayerIterRef<'a> {
82 1168 : async fn create(mut iter: LayerIterRef<'a>) -> anyhow::Result<Self> {
83 1168 : let peeked = iter.next().await?;
84 1168 : Ok(Self { iter, peeked })
85 1168 : }
86 :
87 16902210 : fn peek(&self) -> &Option<(Key, Lsn, Value)> {
88 16902210 : &self.peeked
89 16902210 : }
90 :
91 4143996 : async fn next(&mut self) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
92 4143996 : let result = self.peeked.take();
93 4143996 : self.peeked = self.iter.next().await?;
94 4143996 : if let (Some((k1, l1, _)), Some((k2, l2, _))) = (&self.peeked, &result) {
95 4141668 : if (k1, l1) < (k2, l2) {
96 0 : bail!("iterator is not ordered: {}", self.iter.layer_dbg_info());
97 4141668 : }
98 2328 : }
99 4143996 : Ok(result)
100 4143996 : }
101 : }
102 :
103 : impl std::cmp::PartialEq for IteratorWrapper<'_> {
104 0 : fn eq(&self, other: &Self) -> bool {
105 0 : self.cmp(other) == Ordering::Equal
106 0 : }
107 : }
108 :
109 : impl std::cmp::Eq for IteratorWrapper<'_> {}
110 :
111 : impl std::cmp::PartialOrd for IteratorWrapper<'_> {
112 8459567 : fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
113 8459567 : Some(self.cmp(other))
114 8459567 : }
115 : }
116 :
117 : impl std::cmp::Ord for IteratorWrapper<'_> {
118 8459567 : fn cmp(&self, other: &Self) -> std::cmp::Ordering {
119 : use std::cmp::Ordering;
120 8459567 : let a = self.peek_next_key_lsn_value();
121 8459567 : let b = other.peek_next_key_lsn_value();
122 8459567 : match (a, b) {
123 6043327 : (Some((k1, l1, v1)), Some((k2, l2, v2))) => {
124 12086654 : fn map_value_to_num(val: &Option<&Value>) -> usize {
125 12068882 : match val {
126 17772 : None => 0,
127 12067862 : Some(Value::Image(_)) => 1,
128 1020 : Some(Value::WalRecord(_)) => 2,
129 : }
130 12086654 : }
131 6043327 : let order_1 = map_value_to_num(&v1);
132 6043327 : let order_2 = map_value_to_num(&v2);
133 6043327 : // When key_lsn are the same, the unloaded iter will always appear before the loaded one.
134 6043327 : // And note that we do a reverse at the end of the comparison, so it works with the max heap.
135 6043327 : (k1, l1, order_1).cmp(&(k2, l2, order_2))
136 : }
137 2005896 : (Some(_), None) => Ordering::Less,
138 2777 : (None, Some(_)) => Ordering::Greater,
139 407567 : (None, None) => Ordering::Equal,
140 : }
141 8459567 : .reverse()
142 8459567 : }
143 : }
144 :
145 : impl<'a> IteratorWrapper<'a> {
146 132 : pub fn create_from_image_layer(
147 132 : image_layer: &'a ImageLayerInner,
148 132 : ctx: &'a RequestContext,
149 132 : ) -> Self {
150 132 : Self::NotLoaded {
151 132 : layer: LayerRef::Image(image_layer),
152 132 : first_key_lower_bound: (image_layer.key_range().start, image_layer.lsn()),
153 132 : ctx,
154 132 : source_desc: PersistentLayerKey {
155 132 : key_range: image_layer.key_range().clone(),
156 132 : lsn_range: PersistentLayerDesc::image_layer_lsn_range(image_layer.lsn()),
157 132 : is_delta: false,
158 132 : }
159 132 : .into(),
160 132 : }
161 132 : }
162 :
163 1036 : pub fn create_from_delta_layer(
164 1036 : delta_layer: &'a DeltaLayerInner,
165 1036 : ctx: &'a RequestContext,
166 1036 : ) -> Self {
167 1036 : Self::NotLoaded {
168 1036 : layer: LayerRef::Delta(delta_layer),
169 1036 : first_key_lower_bound: (delta_layer.key_range().start, delta_layer.lsn_range().start),
170 1036 : ctx,
171 1036 : source_desc: PersistentLayerKey {
172 1036 : key_range: delta_layer.key_range().clone(),
173 1036 : lsn_range: delta_layer.lsn_range().clone(),
174 1036 : is_delta: true,
175 1036 : }
176 1036 : .into(),
177 1036 : }
178 1036 : }
179 :
180 16919134 : fn peek_next_key_lsn_value(&self) -> Option<(&Key, Lsn, Option<&Value>)> {
181 16919134 : match self {
182 16901042 : Self::Loaded { iter, .. } => iter
183 16901042 : .peek()
184 16901042 : .as_ref()
185 16901042 : .map(|(key, lsn, val)| (key, *lsn, Some(val))),
186 18092 : Self::NotLoaded {
187 18092 : first_key_lower_bound: (key, lsn),
188 18092 : ..
189 18092 : } => Some((key, *lsn, None)),
190 : }
191 16919134 : }
192 :
193 : // CORRECTNESS: this function must always take `&mut self`, never `&self`.
194 : //
195 : // The reason is that `impl Ord for Self` evaluates differently after this function
196 : // returns. We're called through a `PeekMut::deref_mut`, which causes heap repair when
197 : // the PeekMut gets returned. So, it's critical that we actually run through `PeekMut::deref_mut`
198 : // and not just `PeekMut::deref`
199 : // If we don't take `&mut self`
200 1168 : async fn load(&mut self) -> anyhow::Result<()> {
201 1168 : assert!(!self.is_loaded());
202 1168 : let Self::NotLoaded {
203 1168 : ctx,
204 1168 : first_key_lower_bound,
205 1168 : layer,
206 1168 : source_desc,
207 1168 : } = self
208 : else {
209 0 : unreachable!()
210 : };
211 1168 : let iter = layer.iter(ctx);
212 1168 : let iter = PeekableLayerIterRef::create(iter).await?;
213 1168 : if let Some((k1, l1, _)) = iter.peek() {
214 1168 : let (k2, l2) = first_key_lower_bound;
215 1168 : if (k1, l1) < (k2, l2) {
216 0 : bail!(
217 0 : "layer key range did not include the first key in the layer: {}",
218 0 : layer.layer_dbg_info()
219 0 : );
220 1168 : }
221 0 : }
222 1168 : *self = Self::Loaded {
223 1168 : iter,
224 1168 : source_desc: source_desc.clone(),
225 1168 : };
226 1168 : Ok(())
227 1168 : }
228 :
229 4146332 : fn is_loaded(&self) -> bool {
230 4146332 : matches!(self, Self::Loaded { .. })
231 4146332 : }
232 :
233 : /// Correctness: must load the iterator before using.
234 : ///
235 : /// Given this iterator wrapper is private to the merge iterator, users won't be able to mis-use it.
236 : /// The public interfaces to use are [`crate::tenant::storage_layer::delta_layer::DeltaLayerIterator`] and
237 : /// [`crate::tenant::storage_layer::image_layer::ImageLayerIterator`].
238 4143996 : async fn next(&mut self) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
239 4143996 : let Self::Loaded { iter, .. } = self else {
240 0 : panic!("must load the iterator before using")
241 : };
242 4143996 : iter.next().await
243 4143996 : }
244 :
245 : /// Get the persistent layer key corresponding to this iterator
246 1828 : fn trace_source(&self) -> Arc<PersistentLayerKey> {
247 1828 : match self {
248 1828 : Self::Loaded { source_desc, .. } => source_desc.clone(),
249 0 : Self::NotLoaded { source_desc, .. } => source_desc.clone(),
250 : }
251 1828 : }
252 : }
253 :
254 : /// A merge iterator over delta/image layer iterators.
255 : ///
256 : /// When duplicated records are found, the iterator will not perform any
257 : /// deduplication, and the caller should handle these situation. By saying
258 : /// duplicated records, there are many possibilities:
259 : ///
260 : /// * Two same delta at the same LSN.
261 : /// * Two same image at the same LSN.
262 : /// * Delta/image at the same LSN where the image has already applied the delta.
263 : ///
264 : /// The iterator will always put the image before the delta.
265 : pub struct MergeIterator<'a> {
266 : heap: BinaryHeap<IteratorWrapper<'a>>,
267 : }
268 :
269 : pub(crate) trait MergeIteratorItem {
270 : fn new(item: (Key, Lsn, Value), iterator: &IteratorWrapper<'_>) -> Self;
271 :
272 : fn key_lsn_value(&self) -> &(Key, Lsn, Value);
273 : }
274 :
275 : impl MergeIteratorItem for (Key, Lsn, Value) {
276 4141004 : fn new(item: (Key, Lsn, Value), _: &IteratorWrapper<'_>) -> Self {
277 4141004 : item
278 4141004 : }
279 :
280 1580 : fn key_lsn_value(&self) -> &(Key, Lsn, Value) {
281 1580 : self
282 1580 : }
283 : }
284 :
285 : impl MergeIteratorItem for ((Key, Lsn, Value), Arc<PersistentLayerKey>) {
286 1828 : fn new(item: (Key, Lsn, Value), iter: &IteratorWrapper<'_>) -> Self {
287 1828 : (item, iter.trace_source().clone())
288 1828 : }
289 :
290 3848 : fn key_lsn_value(&self) -> &(Key, Lsn, Value) {
291 3848 : &self.0
292 3848 : }
293 : }
294 :
295 : impl<'a> MergeIterator<'a> {
296 184 : pub fn create(
297 184 : deltas: &[&'a DeltaLayerInner],
298 184 : images: &[&'a ImageLayerInner],
299 184 : ctx: &'a RequestContext,
300 184 : ) -> Self {
301 184 : let mut heap = Vec::with_capacity(images.len() + deltas.len());
302 316 : for image in images {
303 132 : heap.push(IteratorWrapper::create_from_image_layer(image, ctx));
304 132 : }
305 1220 : for delta in deltas {
306 1036 : heap.push(IteratorWrapper::create_from_delta_layer(delta, ctx));
307 1036 : }
308 184 : Self {
309 184 : heap: BinaryHeap::from(heap),
310 184 : }
311 184 : }
312 :
313 4143012 : pub(crate) async fn next_inner<R: MergeIteratorItem>(&mut self) -> anyhow::Result<Option<R>> {
314 4145344 : while let Some(mut iter) = self.heap.peek_mut() {
315 4145164 : if !iter.is_loaded() {
316 : // Once we load the iterator, we can know the real first key-value pair in the iterator.
317 : // We put it back into the heap so that a potentially unloaded layer may have a key between
318 : // [potential_first_key, loaded_first_key).
319 1168 : iter.load().await?;
320 1168 : continue;
321 4143996 : }
322 4143996 : let Some(item) = iter.next().await? else {
323 : // If the iterator returns None, we pop this iterator. Actually, in the current implementation,
324 : // we order None > Some, and all the rest of the iterators should return None.
325 1164 : binary_heap::PeekMut::pop(iter);
326 1164 : continue;
327 : };
328 4142832 : return Ok(Some(R::new(item, &iter)));
329 : }
330 180 : Ok(None)
331 4143012 : }
332 :
333 : /// Get the next key-value pair from the iterator.
334 4140292 : pub async fn next(&mut self) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
335 4140292 : self.next_inner().await
336 4140292 : }
337 :
338 : /// Get the next key-value pair from the iterator, and trace where the key comes from.
339 0 : pub async fn next_with_trace(
340 0 : &mut self,
341 0 : ) -> anyhow::Result<Option<((Key, Lsn, Value), Arc<PersistentLayerKey>)>> {
342 0 : self.next_inner().await
343 0 : }
344 : }
345 :
#[cfg(test)]
mod tests {
    use itertools::Itertools;
    use pageserver_api::key::Key;
    #[cfg(feature = "testing")]
    use pageserver_api::record::NeonWalRecord;
    use utils::lsn::Lsn;

    use super::*;
    use crate::DEFAULT_PG_VERSION;
    use crate::tenant::harness::{TIMELINE_ID, TenantHarness};
    #[cfg(feature = "testing")]
    use crate::tenant::storage_layer::delta_layer::test::sort_delta_value;
    use crate::tenant::storage_layer::delta_layer::test::{produce_delta_layer, sort_delta};

    /// Drains `merge_iter` and asserts that it yields exactly the entries of `expect`, in
    /// order, and then ends at the same point.
    async fn assert_merge_iter_equal(
        merge_iter: &mut MergeIterator<'_>,
        expect: &[(Key, Lsn, Value)],
    ) {
        let mut expect_iter = expect.iter();
        loop {
            let o1 = merge_iter.next().await.unwrap();
            let o2 = expect_iter.next();
            // Both sequences must end together.
            assert_eq!(o1.is_some(), o2.is_some());
            if o1.is_none() && o2.is_none() {
                break;
            }
            let (k1, l1, v1) = o1.unwrap();
            let (k2, l2, v2) = o2.unwrap();
            assert_eq!(&k1, k2);
            assert_eq!(l1, *l2);
            assert_eq!(&v1, v2);
        }
    }

    /// Two delta layers with interleaving key ranges. Layer 1 holds keys 0 and 5; layer 2
    /// holds keys 3 and 4, which fall between them. The merged output must match the
    /// concatenation of both inputs sorted with `sort_delta`.
    #[tokio::test]
    async fn merge_in_between() {
        use bytes::Bytes;
        use pageserver_api::value::Value;

        let harness = TenantHarness::create("merge_iterator_merge_in_between")
            .await
            .unwrap();
        let (tenant, ctx) = harness.load().await;

        let tline = tenant
            .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
            .await
            .unwrap();

        /// Builds a test key whose last field is `id`.
        fn get_key(id: u32) -> Key {
            let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
            key.field6 = id;
            key
        }
        let test_deltas1 = vec![
            (
                get_key(0),
                Lsn(0x10),
                Value::Image(Bytes::copy_from_slice(b"test")),
            ),
            (
                get_key(5),
                Lsn(0x10),
                Value::Image(Bytes::copy_from_slice(b"test")),
            ),
        ];
        let resident_layer_1 = produce_delta_layer(&tenant, &tline, test_deltas1.clone(), &ctx)
            .await
            .unwrap();
        let test_deltas2 = vec![
            (
                get_key(3),
                Lsn(0x10),
                Value::Image(Bytes::copy_from_slice(b"test")),
            ),
            (
                get_key(4),
                Lsn(0x10),
                Value::Image(Bytes::copy_from_slice(b"test")),
            ),
        ];
        let resident_layer_2 = produce_delta_layer(&tenant, &tline, test_deltas2.clone(), &ctx)
            .await
            .unwrap();
        let mut merge_iter = MergeIterator::create(
            &[
                resident_layer_2.get_as_delta(&ctx).await.unwrap(),
                resident_layer_1.get_as_delta(&ctx).await.unwrap(),
            ],
            &[],
            &ctx,
        );
        let mut expect = Vec::new();
        expect.extend(test_deltas1);
        expect.extend(test_deltas2);
        expect.sort_by(sort_delta);
        assert_merge_iter_equal(&mut merge_iter, &expect).await;
    }

    /// Three delta layers of 1000 entries each. Layers 1 and 2 cover the same keys (0..100)
    /// with interleaved LSNs (layer 2 is offset by 0x10). Layer 3 covers the disjoint keys
    /// 1000..1100. The merged output must match all inputs sorted with `sort_delta`.
    #[tokio::test]
    async fn delta_merge() {
        use bytes::Bytes;
        use pageserver_api::value::Value;

        let harness = TenantHarness::create("merge_iterator_delta_merge")
            .await
            .unwrap();
        let (tenant, ctx) = harness.load().await;

        let tline = tenant
            .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
            .await
            .unwrap();

        /// Builds a test key whose last field is `id`.
        fn get_key(id: u32) -> Key {
            let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
            key.field6 = id;
            key
        }
        const N: usize = 1000;
        // Ten entries per key, at LSNs 0x20, 0x40, ..., 0x140.
        let test_deltas1 = (0..N)
            .map(|idx| {
                (
                    get_key(idx as u32 / 10),
                    Lsn(0x20 * ((idx as u64) % 10 + 1)),
                    Value::Image(Bytes::from(format!("img{idx:05}"))),
                )
            })
            .collect_vec();
        let resident_layer_1 = produce_delta_layer(&tenant, &tline, test_deltas1.clone(), &ctx)
            .await
            .unwrap();
        // Same keys, LSNs shifted by 0x10 so they interleave with layer 1.
        let test_deltas2 = (0..N)
            .map(|idx| {
                (
                    get_key(idx as u32 / 10),
                    Lsn(0x20 * ((idx as u64) % 10 + 1) + 0x10),
                    Value::Image(Bytes::from(format!("img{idx:05}"))),
                )
            })
            .collect_vec();
        let resident_layer_2 = produce_delta_layer(&tenant, &tline, test_deltas2.clone(), &ctx)
            .await
            .unwrap();
        // Keys starting at N, disjoint from layers 1 and 2.
        let test_deltas3 = (0..N)
            .map(|idx| {
                (
                    get_key(idx as u32 / 10 + N as u32),
                    Lsn(0x10 * ((idx as u64) % 10 + 1)),
                    Value::Image(Bytes::from(format!("img{idx:05}"))),
                )
            })
            .collect_vec();
        let resident_layer_3 = produce_delta_layer(&tenant, &tline, test_deltas3.clone(), &ctx)
            .await
            .unwrap();
        let mut merge_iter = MergeIterator::create(
            &[
                resident_layer_1.get_as_delta(&ctx).await.unwrap(),
                resident_layer_2.get_as_delta(&ctx).await.unwrap(),
                resident_layer_3.get_as_delta(&ctx).await.unwrap(),
            ],
            &[],
            &ctx,
        );
        let mut expect = Vec::new();
        expect.extend(test_deltas1);
        expect.extend(test_deltas2);
        expect.extend(test_deltas3);
        expect.sort_by(sort_delta);
        assert_merge_iter_equal(&mut merge_iter, &expect).await;

        // TODO: test layers are loaded only when needed, reducing num of active iterators in k-merge
    }

    /// Duplicated deltas and images at identical `(key, lsn)` across four layers. The merged
    /// output must match all inputs sorted with `sort_delta_value`, regardless of the order
    /// in which the layers are passed to `MergeIterator::create`.
    #[cfg(feature = "testing")]
    #[tokio::test]
    async fn delta_image_mixed_merge() {
        use bytes::Bytes;
        use pageserver_api::value::Value;

        let harness = TenantHarness::create("merge_iterator_delta_image_mixed_merge")
            .await
            .unwrap();
        let (tenant, ctx) = harness.load().await;

        let tline = tenant
            .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
            .await
            .unwrap();

        /// Builds a test key whose last field is `id`.
        fn get_key(id: u32) -> Key {
            let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
            key.field6 = id;
            key
        }
        // In this test case, we want to test if the iterator still works correctly with multiple copies
        // of a delta+image at the same LSN, for example, the following sequence a@10=+a, a@10=+a, a@10=ab, a@10=ab.
        // Duplicated deltas/images are possible for old tenants before the full L0 compaction file name fix.
        // An incomplete compaction could produce multiple exactly-the-same delta layers. Force image generation
        // could produce overlapping images. Apart from duplicated deltas/images, in the current storage implementation
        // one key-lsn could have a delta in the delta layer and one image in the image layer. The iterator should
        // correctly process these situations and return everything as-is, and the upper layer of the system
        // will handle duplicated LSNs.
        let test_deltas1 = vec![
            (
                get_key(0),
                Lsn(0x10),
                Value::WalRecord(NeonWalRecord::wal_init("")),
            ),
            (
                get_key(0),
                Lsn(0x18),
                Value::WalRecord(NeonWalRecord::wal_append("a")),
            ),
            (
                get_key(5),
                Lsn(0x10),
                Value::WalRecord(NeonWalRecord::wal_init("")),
            ),
            (
                get_key(5),
                Lsn(0x18),
                Value::WalRecord(NeonWalRecord::wal_append("b")),
            ),
        ];
        let resident_layer_1 = produce_delta_layer(&tenant, &tline, test_deltas1.clone(), &ctx)
            .await
            .unwrap();
        // Layer 2 duplicates every record of layer 1 and adds one extra image.
        let mut test_deltas2 = test_deltas1.clone();
        test_deltas2.push((
            get_key(10),
            Lsn(0x20),
            Value::Image(Bytes::copy_from_slice(b"test")),
        ));
        let resident_layer_2 = produce_delta_layer(&tenant, &tline, test_deltas2.clone(), &ctx)
            .await
            .unwrap();
        // Images sharing (key, lsn) with some of the WAL records above.
        let test_deltas3 = vec![
            (
                get_key(0),
                Lsn(0x10),
                Value::Image(Bytes::copy_from_slice(b"")),
            ),
            (
                get_key(5),
                Lsn(0x18),
                Value::Image(Bytes::copy_from_slice(b"b")),
            ),
            (
                get_key(15),
                Lsn(0x20),
                Value::Image(Bytes::copy_from_slice(b"test")),
            ),
        ];
        let resident_layer_3 = produce_delta_layer(&tenant, &tline, test_deltas3.clone(), &ctx)
            .await
            .unwrap();
        // Layer 4 duplicates every record of layer 3 and adds one extra image.
        let mut test_deltas4 = test_deltas3.clone();
        test_deltas4.push((
            get_key(20),
            Lsn(0x20),
            Value::Image(Bytes::copy_from_slice(b"test")),
        ));
        let resident_layer_4 = produce_delta_layer(&tenant, &tline, test_deltas4.clone(), &ctx)
            .await
            .unwrap();
        let mut expect = Vec::new();
        expect.extend(test_deltas1);
        expect.extend(test_deltas2);
        expect.extend(test_deltas3);
        expect.extend(test_deltas4);
        expect.sort_by(sort_delta_value);

        // Test with different layer order for MergeIterator::create to ensure the order
        // is stable.

        let mut merge_iter = MergeIterator::create(
            &[
                resident_layer_4.get_as_delta(&ctx).await.unwrap(),
                resident_layer_1.get_as_delta(&ctx).await.unwrap(),
                resident_layer_3.get_as_delta(&ctx).await.unwrap(),
                resident_layer_2.get_as_delta(&ctx).await.unwrap(),
            ],
            &[],
            &ctx,
        );
        assert_merge_iter_equal(&mut merge_iter, &expect).await;

        let mut merge_iter = MergeIterator::create(
            &[
                resident_layer_1.get_as_delta(&ctx).await.unwrap(),
                resident_layer_4.get_as_delta(&ctx).await.unwrap(),
                resident_layer_3.get_as_delta(&ctx).await.unwrap(),
                resident_layer_2.get_as_delta(&ctx).await.unwrap(),
            ],
            &[],
            &ctx,
        );
        assert_merge_iter_equal(&mut merge_iter, &expect).await;

        // Compile-time check that a MergeIterator can be moved across threads.
        is_send(merge_iter);
    }

    /// Compiles only if the argument's type is `Send`.
    #[cfg(feature = "testing")]
    fn is_send(_: impl Send) {}
}
|