use std::cmp::Ordering;
use std::collections::{BinaryHeap, binary_heap};
use std::sync::Arc;

use anyhow::bail;
use pageserver_api::key::Key;
use pageserver_api::value::Value;
use utils::lsn::Lsn;

use super::delta_layer::{DeltaLayerInner, DeltaLayerIterator};
use super::image_layer::{ImageLayerInner, ImageLayerIterator};
use super::{PersistentLayerDesc, PersistentLayerKey};
use crate::context::RequestContext;

#[derive(Clone, Copy)]
pub(crate) enum LayerRef<'a> {
    Image(&'a ImageLayerInner),
    Delta(&'a DeltaLayerInner),
}

impl<'a> LayerRef<'a> {
    #[allow(dead_code)]
    fn iter(self, ctx: &'a RequestContext) -> LayerIterRef<'a> {
        match self {
            Self::Image(x) => LayerIterRef::Image(x.iter(ctx)),
            Self::Delta(x) => LayerIterRef::Delta(x.iter(ctx)),
        }
    }

    fn iter_with_options(
        self,
        ctx: &'a RequestContext,
        max_read_size: u64,
        max_batch_size: usize,
    ) -> LayerIterRef<'a> {
        match self {
            Self::Image(x) => {
                LayerIterRef::Image(x.iter_with_options(ctx, max_read_size, max_batch_size))
            }
            Self::Delta(x) => {
                LayerIterRef::Delta(x.iter_with_options(ctx, max_read_size, max_batch_size))
            }
        }
    }

    fn layer_dbg_info(&self) -> String {
        match self {
            Self::Image(x) => x.layer_dbg_info(),
            Self::Delta(x) => x.layer_dbg_info(),
        }
    }
}

enum LayerIterRef<'a> {
    Image(ImageLayerIterator<'a>),
    Delta(DeltaLayerIterator<'a>),
}

impl LayerIterRef<'_> {
    async fn next(&mut self) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
        match self {
            Self::Delta(x) => x.next().await,
            Self::Image(x) => x.next().await,
        }
    }

    fn layer_dbg_info(&self) -> String {
        match self {
            Self::Image(x) => x.layer_dbg_info(),
            Self::Delta(x) => x.layer_dbg_info(),
        }
    }
}

/// This type plays several roles at once
/// 1. Unified iterator for image and delta layers.
/// 2. `Ord` for use in [`MergeIterator::heap`] (for the k-merge).
/// 3. Lazy creation of the real delta/image iterator.
#[allow(clippy::large_enum_variant, reason = "TODO")]
pub(crate) enum IteratorWrapper<'a> {
    NotLoaded {
        ctx: &'a RequestContext,
        first_key_lower_bound: (Key, Lsn),
        layer: LayerRef<'a>,
        source_desc: Arc<PersistentLayerKey>,
        max_read_size: u64,
        max_batch_size: usize,
    },
    Loaded {
        iter: PeekableLayerIterRef<'a>,
        source_desc: Arc<PersistentLayerKey>,
    },
}

pub(crate) struct PeekableLayerIterRef<'a> {
    iter: LayerIterRef<'a>,
    peeked: Option<(Key, Lsn, Value)>, // None == end
}

impl<'a> PeekableLayerIterRef<'a> {
    async fn create(mut iter: LayerIterRef<'a>) -> anyhow::Result<Self> {
        let peeked = iter.next().await?;
        Ok(Self { iter, peeked })
    }

    fn peek(&self) -> &Option<(Key, Lsn, Value)> {
        &self.peeked
    }

    async fn next(&mut self) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
        let result = self.peeked.take();
        self.peeked = self.iter.next().await?;
        if let (Some((k1, l1, _)), Some((k2, l2, _))) = (&self.peeked, &result) {
            if (k1, l1) < (k2, l2) {
                bail!("iterator is not ordered: {}", self.iter.layer_dbg_info());
            }
        }
        Ok(result)
    }
}
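
// Hedged, illustrative aside (an addition for clarity, not used by the merge itself):
// this hand-rolled peekable plays the same role as `std::iter::Peekable` does for
// synchronous iterators -- the k-merge heap must order every source by its *next*
// element without consuming it.
#[cfg(test)]
#[test]
fn peekable_lookahead_sketch() {
    let mut it = [1, 2, 3].into_iter().peekable();
    assert_eq!(it.peek(), Some(&1)); // look at the next element...
    assert_eq!(it.next(), Some(1)); // ...then consume it
}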

impl std::cmp::PartialEq for IteratorWrapper<'_> {
    fn eq(&self, other: &Self) -> bool {
        self.cmp(other) == Ordering::Equal
    }
}

impl std::cmp::Eq for IteratorWrapper<'_> {}

impl std::cmp::PartialOrd for IteratorWrapper<'_> {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

impl std::cmp::Ord for IteratorWrapper<'_> {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        use std::cmp::Ordering;
        let a = self.peek_next_key_lsn_value();
        let b = other.peek_next_key_lsn_value();
        match (a, b) {
            (Some((k1, l1, v1)), Some((k2, l2, v2))) => {
                fn map_value_to_num(val: &Option<&Value>) -> usize {
                    match val {
                        None => 0,
                        Some(Value::Image(_)) => 1,
                        Some(Value::WalRecord(_)) => 2,
                    }
                }
                let order_1 = map_value_to_num(&v1);
                let order_2 = map_value_to_num(&v2);
                // When key_lsn are the same, the unloaded iter will always appear before the loaded one.
                // And note that we do a reverse at the end of the comparison, so it works with the max heap.
                (k1, l1, order_1).cmp(&(k2, l2, order_2))
            }
            (Some(_), None) => Ordering::Less,
            (None, Some(_)) => Ordering::Greater,
            (None, None) => Ordering::Equal,
        }
        .reverse()
    }
}
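
// Hedged, illustrative aside: `BinaryHeap` is a max-heap, so the `.reverse()` above
// makes the entry with the smallest (key, lsn, kind) triple surface first, which is
// what the k-merge needs. The same min-heap behaviour can be obtained for ordinary
// types with `std::cmp::Reverse`:
#[cfg(test)]
#[test]
fn binary_heap_reverse_sketch() {
    use std::cmp::Reverse;
    let mut heap = BinaryHeap::from(vec![Reverse(5u32), Reverse(1), Reverse(3)]);
    assert_eq!(heap.pop(), Some(Reverse(1))); // smallest first, like a min-heap
    assert_eq!(heap.pop(), Some(Reverse(3)));
    assert_eq!(heap.pop(), Some(Reverse(5)));
}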

impl<'a> IteratorWrapper<'a> {
    pub fn create_from_image_layer(
        image_layer: &'a ImageLayerInner,
        ctx: &'a RequestContext,
        max_read_size: u64,
        max_batch_size: usize,
    ) -> Self {
        Self::NotLoaded {
            layer: LayerRef::Image(image_layer),
            first_key_lower_bound: (image_layer.key_range().start, image_layer.lsn()),
            ctx,
            source_desc: PersistentLayerKey {
                key_range: image_layer.key_range().clone(),
                lsn_range: PersistentLayerDesc::image_layer_lsn_range(image_layer.lsn()),
                is_delta: false,
            }
            .into(),
            max_read_size,
            max_batch_size,
        }
    }

    pub fn create_from_delta_layer(
        delta_layer: &'a DeltaLayerInner,
        ctx: &'a RequestContext,
        max_read_size: u64,
        max_batch_size: usize,
    ) -> Self {
        Self::NotLoaded {
            layer: LayerRef::Delta(delta_layer),
            first_key_lower_bound: (delta_layer.key_range().start, delta_layer.lsn_range().start),
            ctx,
            source_desc: PersistentLayerKey {
                key_range: delta_layer.key_range().clone(),
                lsn_range: delta_layer.lsn_range().clone(),
                is_delta: true,
            }
            .into(),
            max_read_size,
            max_batch_size,
        }
    }

    fn peek_next_key_lsn_value(&self) -> Option<(&Key, Lsn, Option<&Value>)> {
        match self {
            Self::Loaded { iter, .. } => iter
                .peek()
                .as_ref()
                .map(|(key, lsn, val)| (key, *lsn, Some(val))),
            Self::NotLoaded {
                first_key_lower_bound: (key, lsn),
                ..
            } => Some((key, *lsn, None)),
        }
    }

    // CORRECTNESS: this function must always take `&mut self`, never `&self`.
    //
    // The reason is that `impl Ord for Self` evaluates differently after this function
    // returns. We're called through a `PeekMut::deref_mut`, which causes heap repair when
    // the PeekMut gets returned. So, it's critical that we actually run through `PeekMut::deref_mut`
    // and not just `PeekMut::deref`. If we only took `&self`, the call could go through
    // `PeekMut::deref` instead, and the heap would not be repaired after loading changes
    // this wrapper's first key-value pair (and hence its ordering).
    async fn load(&mut self) -> anyhow::Result<()> {
        assert!(!self.is_loaded());
        let Self::NotLoaded {
            ctx,
            first_key_lower_bound,
            layer,
            source_desc,
            max_read_size,
            max_batch_size,
        } = self
        else {
            unreachable!()
        };
        let iter = layer.iter_with_options(ctx, *max_read_size, *max_batch_size);
        let iter = PeekableLayerIterRef::create(iter).await?;
        if let Some((k1, l1, _)) = iter.peek() {
            let (k2, l2) = first_key_lower_bound;
            if (k1, l1) < (k2, l2) {
                bail!(
                    "layer key range did not include the first key in the layer: {}",
                    layer.layer_dbg_info()
                );
            }
        }
        *self = Self::Loaded {
            iter,
            source_desc: source_desc.clone(),
        };
        Ok(())
    }

    fn is_loaded(&self) -> bool {
        matches!(self, Self::Loaded { .. })
    }

    /// Correctness: the iterator must be loaded before use.
    ///
    /// Because this iterator wrapper is private to the merge iterator, users cannot misuse it.
    /// The public interfaces to use are [`crate::tenant::storage_layer::delta_layer::DeltaLayerIterator`] and
    /// [`crate::tenant::storage_layer::image_layer::ImageLayerIterator`].
    async fn next(&mut self) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
        let Self::Loaded { iter, .. } = self else {
            panic!("must load the iterator before using")
        };
        iter.next().await
    }

    /// Get the persistent layer key corresponding to this iterator
    fn trace_source(&self) -> Arc<PersistentLayerKey> {
        match self {
            Self::Loaded { source_desc, .. } => source_desc.clone(),
            Self::NotLoaded { source_desc, .. } => source_desc.clone(),
        }
    }
}
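
// Hedged, illustrative aside on the heap-repair property `load` relies on: mutating
// an element through `binary_heap::PeekMut` re-establishes the heap invariant when
// the guard is dropped, so the mutated element sinks or rises to its correct position.
#[cfg(test)]
#[test]
fn peek_mut_repairs_heap_sketch() {
    let mut heap = BinaryHeap::from(vec![3u32, 2, 1]);
    {
        let mut top = heap.peek_mut().expect("heap is non-empty");
        *top = 0; // the largest element becomes the smallest...
    } // ...and the heap is repaired here, when the `PeekMut` guard is dropped
    assert_eq!(heap.peek(), Some(&2)); // 2 is now the maximum
}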

/// A merge iterator over delta/image layer iterators.
///
/// When duplicate records are found, the iterator does not perform any
/// deduplication; the caller should handle these situations. Duplicate
/// records can take several forms:
///
/// * Two identical deltas at the same LSN.
/// * Two identical images at the same LSN.
/// * A delta and an image at the same LSN, where the image has already applied the delta.
///
/// The iterator will always put the image before the delta.
pub struct MergeIterator<'a> {
    heap: BinaryHeap<IteratorWrapper<'a>>,
}
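
// Hedged usage sketch (not called anywhere in this module; the helper name is ours):
// since the merge does not deduplicate, a consumer can collapse repeated key@lsn
// entries itself, keeping the first value it sees -- which, per the ordering above,
// is the image when both an image and a delta exist at the same key@lsn.
#[allow(dead_code)]
async fn drain_dedup_sketch(
    iter: &mut MergeIterator<'_>,
) -> anyhow::Result<Vec<(Key, Lsn, Value)>> {
    let mut out: Vec<(Key, Lsn, Value)> = Vec::new();
    while let Some((key, lsn, value)) = iter.next().await? {
        if let Some((last_key, last_lsn, _)) = out.last() {
            if *last_key == key && *last_lsn == lsn {
                // Duplicate key@lsn: keep the first value we saw.
                continue;
            }
        }
        out.push((key, lsn, value));
    }
    Ok(out)
}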

pub(crate) trait MergeIteratorItem {
    fn new(item: (Key, Lsn, Value), iterator: &IteratorWrapper<'_>) -> Self;

    fn key_lsn_value(&self) -> &(Key, Lsn, Value);
}

impl MergeIteratorItem for (Key, Lsn, Value) {
    fn new(item: (Key, Lsn, Value), _: &IteratorWrapper<'_>) -> Self {
        item
    }

    fn key_lsn_value(&self) -> &(Key, Lsn, Value) {
        self
    }
}

impl MergeIteratorItem for ((Key, Lsn, Value), Arc<PersistentLayerKey>) {
    fn new(item: (Key, Lsn, Value), iter: &IteratorWrapper<'_>) -> Self {
        (item, iter.trace_source().clone())
    }

    fn key_lsn_value(&self) -> &(Key, Lsn, Value) {
        &self.0
    }
}

impl<'a> MergeIterator<'a> {
    pub fn create_with_options(
        deltas: &[&'a DeltaLayerInner],
        images: &[&'a ImageLayerInner],
        ctx: &'a RequestContext,
        max_read_size: u64,
        max_batch_size: usize,
    ) -> Self {
        let mut heap = Vec::with_capacity(images.len() + deltas.len());
        for image in images {
            heap.push(IteratorWrapper::create_from_image_layer(
                image,
                ctx,
                max_read_size,
                max_batch_size,
            ));
        }
        for delta in deltas {
            heap.push(IteratorWrapper::create_from_delta_layer(
                delta,
                ctx,
                max_read_size,
                max_batch_size,
            ));
        }
        Self {
            heap: BinaryHeap::from(heap),
        }
    }

    pub fn create(
        deltas: &[&'a DeltaLayerInner],
        images: &[&'a ImageLayerInner],
        ctx: &'a RequestContext,
    ) -> Self {
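        // Note (explanatory addition): the defaults below are a max read size of
        // 1024 * 8192 bytes = 8 MiB and a max batch size of 1024 entries.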
        Self::create_with_options(deltas, images, ctx, 1024 * 8192, 1024)
    }

    pub(crate) async fn next_inner<R: MergeIteratorItem>(&mut self) -> anyhow::Result<Option<R>> {
        while let Some(mut iter) = self.heap.peek_mut() {
            if !iter.is_loaded() {
                // Once we load the iterator, we learn its real first key-value pair. We then
                // `continue` so the heap re-sorts: another (potentially unloaded) layer may
                // have a key in [potential_first_key, loaded_first_key).
                iter.load().await?;
                continue;
            }
            let Some(item) = iter.next().await? else {
                // If the iterator returns None, we pop this iterator. Actually, in the current implementation,
                // we order None > Some, and all the rest of the iterators should return None.
                binary_heap::PeekMut::pop(iter);
                continue;
            };
            return Ok(Some(R::new(item, &iter)));
        }
        Ok(None)
    }

    /// Get the next key-value pair from the iterator.
    pub async fn next(&mut self) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
        self.next_inner().await
    }

    /// Get the next key-value pair from the iterator, and trace where the key comes from.
    pub async fn next_with_trace(
        &mut self,
    ) -> anyhow::Result<Option<((Key, Lsn, Value), Arc<PersistentLayerKey>)>> {
        self.next_inner().await
    }
}

#[cfg(test)]
mod tests {
    use itertools::Itertools;
    use pageserver_api::key::Key;
    #[cfg(feature = "testing")]
    use pageserver_api::record::NeonWalRecord;
    use utils::lsn::Lsn;

    use super::*;
    use crate::DEFAULT_PG_VERSION;
    use crate::tenant::harness::{TIMELINE_ID, TenantHarness};
    #[cfg(feature = "testing")]
    use crate::tenant::storage_layer::delta_layer::test::sort_delta_value;
    use crate::tenant::storage_layer::delta_layer::test::{produce_delta_layer, sort_delta};

    async fn assert_merge_iter_equal(
        merge_iter: &mut MergeIterator<'_>,
        expect: &[(Key, Lsn, Value)],
    ) {
        let mut expect_iter = expect.iter();
        loop {
            let o1 = merge_iter.next().await.unwrap();
            let o2 = expect_iter.next();
            assert_eq!(o1.is_some(), o2.is_some());
            if o1.is_none() && o2.is_none() {
                break;
            }
            let (k1, l1, v1) = o1.unwrap();
            let (k2, l2, v2) = o2.unwrap();
            assert_eq!(&k1, k2);
            assert_eq!(l1, *l2);
            assert_eq!(&v1, v2);
        }
    }

    #[tokio::test]
    async fn merge_in_between() {
        use bytes::Bytes;
        use pageserver_api::value::Value;

        let harness = TenantHarness::create("merge_iterator_merge_in_between")
            .await
            .unwrap();
        let (tenant, ctx) = harness.load().await;

        let tline = tenant
            .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
            .await
            .unwrap();

        fn get_key(id: u32) -> Key {
            let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
            key.field6 = id;
            key
        }
        let test_deltas1 = vec![
            (
                get_key(0),
                Lsn(0x10),
                Value::Image(Bytes::copy_from_slice(b"test")),
            ),
            (
                get_key(5),
                Lsn(0x10),
                Value::Image(Bytes::copy_from_slice(b"test")),
            ),
        ];
        let resident_layer_1 = produce_delta_layer(&tenant, &tline, test_deltas1.clone(), &ctx)
            .await
            .unwrap();
        let test_deltas2 = vec![
            (
                get_key(3),
                Lsn(0x10),
                Value::Image(Bytes::copy_from_slice(b"test")),
            ),
            (
                get_key(4),
                Lsn(0x10),
                Value::Image(Bytes::copy_from_slice(b"test")),
            ),
        ];
        let resident_layer_2 = produce_delta_layer(&tenant, &tline, test_deltas2.clone(), &ctx)
            .await
            .unwrap();
        let mut merge_iter = MergeIterator::create(
            &[
                resident_layer_2.get_as_delta(&ctx).await.unwrap(),
                resident_layer_1.get_as_delta(&ctx).await.unwrap(),
            ],
            &[],
            &ctx,
        );
        let mut expect = Vec::new();
        expect.extend(test_deltas1);
        expect.extend(test_deltas2);
        expect.sort_by(sort_delta);
        assert_merge_iter_equal(&mut merge_iter, &expect).await;
    }

    #[tokio::test]
    async fn delta_merge() {
        use bytes::Bytes;
        use pageserver_api::value::Value;

        let harness = TenantHarness::create("merge_iterator_delta_merge")
            .await
            .unwrap();
        let (tenant, ctx) = harness.load().await;

        let tline = tenant
            .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
            .await
            .unwrap();

        fn get_key(id: u32) -> Key {
            let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
            key.field6 = id;
            key
        }
        const N: usize = 1000;
        let test_deltas1 = (0..N)
            .map(|idx| {
                (
                    get_key(idx as u32 / 10),
                    Lsn(0x20 * ((idx as u64) % 10 + 1)),
                    Value::Image(Bytes::from(format!("img{idx:05}"))),
                )
            })
            .collect_vec();
        let resident_layer_1 = produce_delta_layer(&tenant, &tline, test_deltas1.clone(), &ctx)
            .await
            .unwrap();
        let test_deltas2 = (0..N)
            .map(|idx| {
                (
                    get_key(idx as u32 / 10),
                    Lsn(0x20 * ((idx as u64) % 10 + 1) + 0x10),
                    Value::Image(Bytes::from(format!("img{idx:05}"))),
                )
            })
            .collect_vec();
        let resident_layer_2 = produce_delta_layer(&tenant, &tline, test_deltas2.clone(), &ctx)
            .await
            .unwrap();
        let test_deltas3 = (0..N)
            .map(|idx| {
                (
                    get_key(idx as u32 / 10 + N as u32),
                    Lsn(0x10 * ((idx as u64) % 10 + 1)),
                    Value::Image(Bytes::from(format!("img{idx:05}"))),
                )
            })
            .collect_vec();
        let resident_layer_3 = produce_delta_layer(&tenant, &tline, test_deltas3.clone(), &ctx)
            .await
            .unwrap();
        let mut merge_iter = MergeIterator::create(
            &[
                resident_layer_1.get_as_delta(&ctx).await.unwrap(),
                resident_layer_2.get_as_delta(&ctx).await.unwrap(),
                resident_layer_3.get_as_delta(&ctx).await.unwrap(),
            ],
            &[],
            &ctx,
        );
        let mut expect = Vec::new();
        expect.extend(test_deltas1);
        expect.extend(test_deltas2);
        expect.extend(test_deltas3);
        expect.sort_by(sort_delta);
        assert_merge_iter_equal(&mut merge_iter, &expect).await;

        // TODO: test layers are loaded only when needed, reducing num of active iterators in k-merge
    }

    #[cfg(feature = "testing")]
    #[tokio::test]
    async fn delta_image_mixed_merge() {
        use bytes::Bytes;
        use pageserver_api::value::Value;

        let harness = TenantHarness::create("merge_iterator_delta_image_mixed_merge")
            .await
            .unwrap();
        let (tenant, ctx) = harness.load().await;

        let tline = tenant
            .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
            .await
            .unwrap();

        fn get_key(id: u32) -> Key {
            let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
            key.field6 = id;
            key
        }
        // In this test case, we want to test if the iterator still works correctly with multiple copies
        // of a delta+image at the same LSN, for example, the following sequence a@10=+a, a@10=+a, a@10=ab, a@10=ab.
        // Duplicated deltas/images are possible for old tenants before the full L0 compaction file name fix.
        // An incomplete compaction could produce multiple exactly-the-same delta layers. Force image generation
        // could produce overlapping images. Apart from duplicated deltas/images, in the current storage implementation
        // one key-lsn could have a delta in the delta layer and one image in the image layer. The iterator should
        // correctly process these situations and return everything as-is, and the upper layer of the system
        // will handle duplicated LSNs.
        let test_deltas1 = vec![
            (
                get_key(0),
                Lsn(0x10),
                Value::WalRecord(NeonWalRecord::wal_init("")),
            ),
            (
                get_key(0),
                Lsn(0x18),
                Value::WalRecord(NeonWalRecord::wal_append("a")),
            ),
            (
                get_key(5),
                Lsn(0x10),
                Value::WalRecord(NeonWalRecord::wal_init("")),
            ),
            (
                get_key(5),
                Lsn(0x18),
                Value::WalRecord(NeonWalRecord::wal_append("b")),
            ),
        ];
        let resident_layer_1 = produce_delta_layer(&tenant, &tline, test_deltas1.clone(), &ctx)
            .await
            .unwrap();
        let mut test_deltas2 = test_deltas1.clone();
        test_deltas2.push((
            get_key(10),
            Lsn(0x20),
            Value::Image(Bytes::copy_from_slice(b"test")),
        ));
        let resident_layer_2 = produce_delta_layer(&tenant, &tline, test_deltas2.clone(), &ctx)
            .await
            .unwrap();
        let test_deltas3 = vec![
            (
                get_key(0),
                Lsn(0x10),
                Value::Image(Bytes::copy_from_slice(b"")),
            ),
            (
                get_key(5),
                Lsn(0x18),
                Value::Image(Bytes::copy_from_slice(b"b")),
            ),
            (
                get_key(15),
                Lsn(0x20),
                Value::Image(Bytes::copy_from_slice(b"test")),
            ),
        ];
        let resident_layer_3 = produce_delta_layer(&tenant, &tline, test_deltas3.clone(), &ctx)
            .await
            .unwrap();
        let mut test_deltas4 = test_deltas3.clone();
        test_deltas4.push((
            get_key(20),
            Lsn(0x20),
            Value::Image(Bytes::copy_from_slice(b"test")),
        ));
        let resident_layer_4 = produce_delta_layer(&tenant, &tline, test_deltas4.clone(), &ctx)
            .await
            .unwrap();
        let mut expect = Vec::new();
        expect.extend(test_deltas1);
        expect.extend(test_deltas2);
        expect.extend(test_deltas3);
        expect.extend(test_deltas4);
        expect.sort_by(sort_delta_value);

        // Test with different layer order for MergeIterator::create to ensure the order
        // is stable.

        let mut merge_iter = MergeIterator::create(
            &[
                resident_layer_4.get_as_delta(&ctx).await.unwrap(),
                resident_layer_1.get_as_delta(&ctx).await.unwrap(),
                resident_layer_3.get_as_delta(&ctx).await.unwrap(),
                resident_layer_2.get_as_delta(&ctx).await.unwrap(),
            ],
            &[],
            &ctx,
        );
        assert_merge_iter_equal(&mut merge_iter, &expect).await;

        let mut merge_iter = MergeIterator::create(
            &[
                resident_layer_1.get_as_delta(&ctx).await.unwrap(),
                resident_layer_4.get_as_delta(&ctx).await.unwrap(),
                resident_layer_3.get_as_delta(&ctx).await.unwrap(),
                resident_layer_2.get_as_delta(&ctx).await.unwrap(),
            ],
            &[],
            &ctx,
        );
        assert_merge_iter_equal(&mut merge_iter, &expect).await;

        is_send(merge_iter);
    }

    #[cfg(feature = "testing")]
    fn is_send(_: impl Send) {}
}