use std::cmp::Ordering;
use std::collections::{BinaryHeap, binary_heap};
use std::sync::Arc;

use anyhow::bail;
use pageserver_api::key::Key;
use utils::lsn::Lsn;
use wal_decoder::models::value::Value;

use super::delta_layer::{DeltaLayerInner, DeltaLayerIterator};
use super::image_layer::{ImageLayerInner, ImageLayerIterator};
use super::{PersistentLayerDesc, PersistentLayerKey};
use crate::context::RequestContext;

#[derive(Clone, Copy)]
pub(crate) enum LayerRef<'a> {
    Image(&'a ImageLayerInner),
    Delta(&'a DeltaLayerInner),
}

impl<'a> LayerRef<'a> {
    fn iter_with_options(
        self,
        ctx: &'a RequestContext,
        max_read_size: u64,
        max_batch_size: usize,
    ) -> LayerIterRef<'a> {
        match self {
            Self::Image(x) => {
                LayerIterRef::Image(x.iter_with_options(ctx, max_read_size, max_batch_size))
            }
            Self::Delta(x) => {
                LayerIterRef::Delta(x.iter_with_options(ctx, max_read_size, max_batch_size))
            }
        }
    }

    fn layer_dbg_info(&self) -> String {
        match self {
            Self::Image(x) => x.layer_dbg_info(),
            Self::Delta(x) => x.layer_dbg_info(),
        }
    }
}

enum LayerIterRef<'a> {
    Image(ImageLayerIterator<'a>),
    Delta(DeltaLayerIterator<'a>),
}

impl LayerIterRef<'_> {
    async fn next(&mut self) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
        match self {
            Self::Delta(x) => x.next().await,
            Self::Image(x) => x.next().await,
        }
    }

    fn layer_dbg_info(&self) -> String {
        match self {
            Self::Image(x) => x.layer_dbg_info(),
            Self::Delta(x) => x.layer_dbg_info(),
        }
    }
}

/// This type plays several roles at once:
/// 1. Unified iterator for image and delta layers.
/// 2. `Ord` for use in [`MergeIterator::heap`] (for the k-merge).
/// 3. Lazy creation of the real delta/image iterator.
#[allow(clippy::large_enum_variant, reason = "TODO")]
pub(crate) enum IteratorWrapper<'a> {
    NotLoaded {
        ctx: &'a RequestContext,
        first_key_lower_bound: (Key, Lsn),
        layer: LayerRef<'a>,
        source_desc: Arc<PersistentLayerKey>,
        max_read_size: u64,
        max_batch_size: usize,
    },
    Loaded {
        iter: PeekableLayerIterRef<'a>,
        source_desc: Arc<PersistentLayerKey>,
    },
}

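/// A one-item lookahead over [`LayerIterRef`]: `peeked` always holds the next item
/// (or `None` at the end of the stream), so `peek` is free and `next` refills the
/// lookahead while verifying that items arrive in non-decreasing (key, lsn) order.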
pub(crate) struct PeekableLayerIterRef<'a> {
    iter: LayerIterRef<'a>,
    peeked: Option<(Key, Lsn, Value)>, // None == end
}

impl<'a> PeekableLayerIterRef<'a> {
    async fn create(mut iter: LayerIterRef<'a>) -> anyhow::Result<Self> {
        let peeked = iter.next().await?;
        Ok(Self { iter, peeked })
    }

    fn peek(&self) -> &Option<(Key, Lsn, Value)> {
        &self.peeked
    }

    async fn next(&mut self) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
        let result = self.peeked.take();
        self.peeked = self.iter.next().await?;
        if let (Some((k1, l1, _)), Some((k2, l2, _))) = (&self.peeked, &result) {
            if (k1, l1) < (k2, l2) {
                bail!("iterator is not ordered: {}", self.iter.layer_dbg_info());
            }
        }
        Ok(result)
    }
}

impl std::cmp::PartialEq for IteratorWrapper<'_> {
    fn eq(&self, other: &Self) -> bool {
        self.cmp(other) == Ordering::Equal
    }
}

impl std::cmp::Eq for IteratorWrapper<'_> {}

impl std::cmp::PartialOrd for IteratorWrapper<'_> {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

impl std::cmp::Ord for IteratorWrapper<'_> {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        use std::cmp::Ordering;
        let a = self.peek_next_key_lsn_value();
        let b = other.peek_next_key_lsn_value();
        match (a, b) {
            (Some((k1, l1, v1)), Some((k2, l2, v2))) => {
                fn map_value_to_num(val: &Option<&Value>) -> usize {
                    match val {
                        None => 0,
                        Some(Value::Image(_)) => 1,
                        Some(Value::WalRecord(_)) => 2,
                    }
                }
                let order_1 = map_value_to_num(&v1);
                let order_2 = map_value_to_num(&v2);
                // When the (key, lsn) pairs are equal, an unloaded iterator always sorts
                // before a loaded one, and an image before a WAL record. Note that we
                // reverse the ordering at the end so that it works with the max-heap.
                (k1, l1, order_1).cmp(&(k2, l2, order_2))
            }
            (Some(_), None) => Ordering::Less,
            (None, Some(_)) => Ordering::Greater,
            (None, None) => Ordering::Equal,
        }
        .reverse()
    }
}

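// A sketch of the resulting pop order (illustrative values only): for wrappers that
// tie on (key, lsn), the heap pops the forward-least tuple first because of the
// final `.reverse()`, i.e.
//
//   NotLoaded { first_key_lower_bound: (k, Lsn(0x10)), .. }   // order 0: popped first,
//                                                             //   forcing a `load()`
//   Loaded, peeking Some((k, Lsn(0x10), Value::Image(..)))    // order 1: image next
//   Loaded, peeking Some((k, Lsn(0x10), Value::WalRecord(..))) // order 2: delta last
//
// Exhausted iterators (peek == None) sort after everything else and are popped last.
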
impl<'a> IteratorWrapper<'a> {
    pub fn create_from_image_layer(
        image_layer: &'a ImageLayerInner,
        ctx: &'a RequestContext,
        max_read_size: u64,
        max_batch_size: usize,
    ) -> Self {
        Self::NotLoaded {
            layer: LayerRef::Image(image_layer),
            first_key_lower_bound: (image_layer.key_range().start, image_layer.lsn()),
            ctx,
            source_desc: PersistentLayerKey {
                key_range: image_layer.key_range().clone(),
                lsn_range: PersistentLayerDesc::image_layer_lsn_range(image_layer.lsn()),
                is_delta: false,
            }
            .into(),
            max_read_size,
            max_batch_size,
        }
    }

    pub fn create_from_delta_layer(
        delta_layer: &'a DeltaLayerInner,
        ctx: &'a RequestContext,
        max_read_size: u64,
        max_batch_size: usize,
    ) -> Self {
        Self::NotLoaded {
            layer: LayerRef::Delta(delta_layer),
            first_key_lower_bound: (delta_layer.key_range().start, delta_layer.lsn_range().start),
            ctx,
            source_desc: PersistentLayerKey {
                key_range: delta_layer.key_range().clone(),
                lsn_range: delta_layer.lsn_range().clone(),
                is_delta: true,
            }
            .into(),
            max_read_size,
            max_batch_size,
        }
    }

    fn peek_next_key_lsn_value(&self) -> Option<(&Key, Lsn, Option<&Value>)> {
        match self {
            Self::Loaded { iter, .. } => iter
                .peek()
                .as_ref()
                .map(|(key, lsn, val)| (key, *lsn, Some(val))),
            Self::NotLoaded {
                first_key_lower_bound: (key, lsn),
                ..
            } => Some((key, *lsn, None)),
        }
    }

    // CORRECTNESS: this function must always take `&mut self`, never `&self`.
    //
    // The reason is that `impl Ord for Self` evaluates differently after this function
    // returns. We're called through a `PeekMut::deref_mut`, which causes heap repair when
    // the `PeekMut` gets returned, so it's critical that we actually run through
    // `PeekMut::deref_mut` and not just `PeekMut::deref`. If we took `&self`, the heap
    // would not be repaired and its ordering invariant would be silently violated.
    async fn load(&mut self) -> anyhow::Result<()> {
        assert!(!self.is_loaded());
        let Self::NotLoaded {
            ctx,
            first_key_lower_bound,
            layer,
            source_desc,
            max_read_size,
            max_batch_size,
        } = self
        else {
            unreachable!()
        };
        let iter = layer.iter_with_options(ctx, *max_read_size, *max_batch_size);
        let iter = PeekableLayerIterRef::create(iter).await?;
        if let Some((k1, l1, _)) = iter.peek() {
            let (k2, l2) = first_key_lower_bound;
            if (k1, l1) < (k2, l2) {
                bail!(
                    "layer key range did not include the first key in the layer: {}",
                    layer.layer_dbg_info()
                );
            }
        }
        *self = Self::Loaded {
            iter,
            source_desc: source_desc.clone(),
        };
        Ok(())
    }

    fn is_loaded(&self) -> bool {
        matches!(self, Self::Loaded { .. })
    }

    /// Correctness: the iterator must be loaded before use.
    ///
    /// Given that this iterator wrapper is private to the merge iterator, users won't be able to misuse it.
    /// The public interfaces are [`crate::tenant::storage_layer::delta_layer::DeltaLayerIterator`] and
    /// [`crate::tenant::storage_layer::image_layer::ImageLayerIterator`].
    async fn next(&mut self) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
        let Self::Loaded { iter, .. } = self else {
            panic!("must load the iterator before using")
        };
        iter.next().await
    }

    /// Get the persistent layer key corresponding to this iterator
    fn trace_source(&self) -> Arc<PersistentLayerKey> {
        match self {
            Self::Loaded { source_desc, .. } => source_desc.clone(),
            Self::NotLoaded { source_desc, .. } => source_desc.clone(),
        }
    }
}

/// A merge iterator over delta/image layer iterators.
///
/// When duplicate records are found, the iterator does not perform any
/// deduplication; the caller should handle these situations. Duplicate
/// records can arise in several ways:
///
/// * Two identical deltas at the same LSN.
/// * Two identical images at the same LSN.
/// * A delta and an image at the same LSN, where the image has already applied the delta.
///
/// The iterator will always put the image before the delta.
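///
/// # Example
///
/// A minimal usage sketch (marked `ignore` since the layer inners, context, and
/// option values are assumed to come from the caller):
///
/// ```ignore
/// let mut iter = MergeIterator::create_with_options(&deltas, &images, &ctx, 1024 * 8192, 1024);
/// while let Some((key, lsn, value)) = iter.next().await? {
///     // Items arrive ordered by (key, lsn); duplicates are returned as-is.
/// }
/// ```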
pub struct MergeIterator<'a> {
    heap: BinaryHeap<IteratorWrapper<'a>>,
}

pub(crate) trait MergeIteratorItem {
    fn new(item: (Key, Lsn, Value), iterator: &IteratorWrapper<'_>) -> Self;

    fn key_lsn_value(&self) -> &(Key, Lsn, Value);
}

impl MergeIteratorItem for (Key, Lsn, Value) {
    fn new(item: (Key, Lsn, Value), _: &IteratorWrapper<'_>) -> Self {
        item
    }

    fn key_lsn_value(&self) -> &(Key, Lsn, Value) {
        self
    }
}

impl MergeIteratorItem for ((Key, Lsn, Value), Arc<PersistentLayerKey>) {
    fn new(item: (Key, Lsn, Value), iter: &IteratorWrapper<'_>) -> Self {
        (item, iter.trace_source().clone())
    }

    fn key_lsn_value(&self) -> &(Key, Lsn, Value) {
        &self.0
    }
}

impl<'a> MergeIterator<'a> {
    #[cfg(test)]
    pub(crate) fn create_for_testing(
        deltas: &[&'a DeltaLayerInner],
        images: &[&'a ImageLayerInner],
        ctx: &'a RequestContext,
    ) -> Self {
        Self::create_with_options(deltas, images, ctx, 1024 * 8192, 1024)
    }

    /// Create a new merge iterator with custom options.
    ///
    /// Adjust `max_read_size` and `max_batch_size` to trade memory usage for performance.
    /// Total buffer memory scales with the number of layers to compact, so if there are
    /// many layers, consider reducing these values to keep memory usage bounded.
    ///
    /// The default options for L0 compactions are:
    /// - max_read_size: 1024 * 8192 (8MB)
    /// - max_batch_size: 1024
    ///
    /// The default options for gc-compaction are:
    /// - max_read_size: 128 * 8192 (1MB)
    /// - max_batch_size: 128
338 : /// - max_batch_size: 128
339 56 : pub fn create_with_options(
340 56 : deltas: &[&'a DeltaLayerInner],
341 56 : images: &[&'a ImageLayerInner],
342 56 : ctx: &'a RequestContext,
343 56 : max_read_size: u64,
344 56 : max_batch_size: usize,
345 56 : ) -> Self {
346 56 : let mut heap = Vec::with_capacity(images.len() + deltas.len());
347 91 : for image in images {
348 35 : heap.push(IteratorWrapper::create_from_image_layer(
349 35 : image,
350 35 : ctx,
351 35 : max_read_size,
352 35 : max_batch_size,
353 35 : ));
354 35 : }
355 316 : for delta in deltas {
356 260 : heap.push(IteratorWrapper::create_from_delta_layer(
357 260 : delta,
358 260 : ctx,
359 260 : max_read_size,
360 260 : max_batch_size,
361 260 : ));
362 260 : }
363 56 : Self {
364 56 : heap: BinaryHeap::from(heap),
365 56 : }
366 56 : }
367 :
368 1035775 : pub(crate) async fn next_inner<R: MergeIteratorItem>(&mut self) -> anyhow::Result<Option<R>> {
369 1036361 : while let Some(mut iter) = self.heap.peek_mut() {
370 1036307 : if !iter.is_loaded() {
                // Once we load the iterator, we know the real first key-value pair in it.
                // We put it back into the heap (repaired when the `PeekMut` is released)
                // because a still-unloaded layer could contain a key in the range
                // [potential_first_key, loaded_first_key).
                iter.load().await?;
                continue;
            }
            let Some(item) = iter.next().await? else {
                // If the iterator returns None, pop it. Note that in the pre-reversed
                // ordering None sorts after Some, so if the top of the heap is
                // exhausted, all remaining iterators must be exhausted as well.
                binary_heap::PeekMut::pop(iter);
                continue;
            };
            return Ok(Some(R::new(item, &iter)));
        }
        Ok(None)
    }

    /// Get the next key-value pair from the iterator.
    pub async fn next(&mut self) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
        self.next_inner().await
    }

    /// Get the next key-value pair from the iterator, and trace where the key comes from.
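    ///
    /// A sketch (hypothetical caller; `merge_iter` built via [`MergeIterator::create_with_options`]):
    ///
    /// ```ignore
    /// while let Some(((key, lsn, value), layer_key)) = merge_iter.next_with_trace().await? {
    ///     // `layer_key` identifies the delta/image layer this value came from.
    /// }
    /// ```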
    pub async fn next_with_trace(
        &mut self,
    ) -> anyhow::Result<Option<((Key, Lsn, Value), Arc<PersistentLayerKey>)>> {
        self.next_inner().await
    }
}

#[cfg(test)]
mod tests {
    use itertools::Itertools;
    use pageserver_api::key::Key;
    use utils::lsn::Lsn;
    #[cfg(feature = "testing")]
    use wal_decoder::models::record::NeonWalRecord;

    use super::*;
    use crate::DEFAULT_PG_VERSION;
    use crate::tenant::harness::{TIMELINE_ID, TenantHarness};
    #[cfg(feature = "testing")]
    use crate::tenant::storage_layer::delta_layer::test::sort_delta_value;
    use crate::tenant::storage_layer::delta_layer::test::{produce_delta_layer, sort_delta};

    async fn assert_merge_iter_equal(
        merge_iter: &mut MergeIterator<'_>,
        expect: &[(Key, Lsn, Value)],
    ) {
        let mut expect_iter = expect.iter();
        loop {
            let o1 = merge_iter.next().await.unwrap();
            let o2 = expect_iter.next();
            assert_eq!(o1.is_some(), o2.is_some());
            if o1.is_none() && o2.is_none() {
                break;
            }
            let (k1, l1, v1) = o1.unwrap();
            let (k2, l2, v2) = o2.unwrap();
            assert_eq!(&k1, k2);
            assert_eq!(l1, *l2);
            assert_eq!(&v1, v2);
        }
    }

    #[tokio::test]
    async fn merge_in_between() {
        use bytes::Bytes;

        let harness = TenantHarness::create("merge_iterator_merge_in_between")
            .await
            .unwrap();
        let (tenant, ctx) = harness.load().await;

        let tline = tenant
            .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
            .await
            .unwrap();

        fn get_key(id: u32) -> Key {
            let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
            key.field6 = id;
            key
        }
        let test_deltas1 = vec![
            (
                get_key(0),
                Lsn(0x10),
                Value::Image(Bytes::copy_from_slice(b"test")),
            ),
            (
                get_key(5),
                Lsn(0x10),
                Value::Image(Bytes::copy_from_slice(b"test")),
            ),
        ];
        let resident_layer_1 = produce_delta_layer(&tenant, &tline, test_deltas1.clone(), &ctx)
            .await
            .unwrap();
        let test_deltas2 = vec![
            (
                get_key(3),
                Lsn(0x10),
                Value::Image(Bytes::copy_from_slice(b"test")),
            ),
            (
                get_key(4),
                Lsn(0x10),
                Value::Image(Bytes::copy_from_slice(b"test")),
            ),
        ];
        let resident_layer_2 = produce_delta_layer(&tenant, &tline, test_deltas2.clone(), &ctx)
            .await
            .unwrap();
        let mut merge_iter = MergeIterator::create_for_testing(
            &[
                resident_layer_2.get_as_delta(&ctx).await.unwrap(),
                resident_layer_1.get_as_delta(&ctx).await.unwrap(),
            ],
            &[],
            &ctx,
        );
        let mut expect = Vec::new();
        expect.extend(test_deltas1);
        expect.extend(test_deltas2);
        expect.sort_by(sort_delta);
        assert_merge_iter_equal(&mut merge_iter, &expect).await;
    }

    #[tokio::test]
    async fn delta_merge() {
        use bytes::Bytes;

        let harness = TenantHarness::create("merge_iterator_delta_merge")
            .await
            .unwrap();
        let (tenant, ctx) = harness.load().await;

        let tline = tenant
            .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
            .await
            .unwrap();

        fn get_key(id: u32) -> Key {
            let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
            key.field6 = id;
            key
        }
        const N: usize = 1000;
        let test_deltas1 = (0..N)
            .map(|idx| {
                (
                    get_key(idx as u32 / 10),
                    Lsn(0x20 * ((idx as u64) % 10 + 1)),
                    Value::Image(Bytes::from(format!("img{idx:05}"))),
                )
            })
            .collect_vec();
        let resident_layer_1 = produce_delta_layer(&tenant, &tline, test_deltas1.clone(), &ctx)
            .await
            .unwrap();
        let test_deltas2 = (0..N)
            .map(|idx| {
                (
                    get_key(idx as u32 / 10),
                    Lsn(0x20 * ((idx as u64) % 10 + 1) + 0x10),
                    Value::Image(Bytes::from(format!("img{idx:05}"))),
                )
            })
            .collect_vec();
        let resident_layer_2 = produce_delta_layer(&tenant, &tline, test_deltas2.clone(), &ctx)
            .await
            .unwrap();
        let test_deltas3 = (0..N)
            .map(|idx| {
                (
                    get_key(idx as u32 / 10 + N as u32),
                    Lsn(0x10 * ((idx as u64) % 10 + 1)),
                    Value::Image(Bytes::from(format!("img{idx:05}"))),
                )
            })
            .collect_vec();
        let resident_layer_3 = produce_delta_layer(&tenant, &tline, test_deltas3.clone(), &ctx)
            .await
            .unwrap();
        let mut merge_iter = MergeIterator::create_for_testing(
            &[
                resident_layer_1.get_as_delta(&ctx).await.unwrap(),
                resident_layer_2.get_as_delta(&ctx).await.unwrap(),
                resident_layer_3.get_as_delta(&ctx).await.unwrap(),
            ],
            &[],
            &ctx,
        );
        let mut expect = Vec::new();
        expect.extend(test_deltas1);
        expect.extend(test_deltas2);
        expect.extend(test_deltas3);
        expect.sort_by(sort_delta);
        assert_merge_iter_equal(&mut merge_iter, &expect).await;

        // TODO: test layers are loaded only when needed, reducing num of active iterators in k-merge
    }

    #[cfg(feature = "testing")]
    #[tokio::test]
    async fn delta_image_mixed_merge() {
        use bytes::Bytes;

        let harness = TenantHarness::create("merge_iterator_delta_image_mixed_merge")
            .await
            .unwrap();
        let (tenant, ctx) = harness.load().await;

        let tline = tenant
            .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
            .await
            .unwrap();

        fn get_key(id: u32) -> Key {
            let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
            key.field6 = id;
            key
        }
        // In this test case, we want to test if the iterator still works correctly with multiple copies
        // of a delta+image at the same LSN, for example, the following sequence a@10=+a, a@10=+a, a@10=ab, a@10=ab.
        // Duplicated deltas/images are possible for old tenants before the full L0 compaction file name fix.
        // An incomplete compaction could produce multiple exactly-the-same delta layers. Force image generation
        // could produce overlapping images. Apart from duplicated deltas/images, in the current storage implementation
        // one key-lsn could have a delta in the delta layer and one image in the image layer. The iterator should
        // correctly process these situations and return everything as-is, and the upper layer of the system
        // will handle duplicated LSNs.
        let test_deltas1 = vec![
            (
                get_key(0),
                Lsn(0x10),
                Value::WalRecord(NeonWalRecord::wal_init("")),
            ),
            (
                get_key(0),
                Lsn(0x18),
                Value::WalRecord(NeonWalRecord::wal_append("a")),
            ),
            (
                get_key(5),
                Lsn(0x10),
                Value::WalRecord(NeonWalRecord::wal_init("")),
            ),
            (
                get_key(5),
                Lsn(0x18),
                Value::WalRecord(NeonWalRecord::wal_append("b")),
            ),
        ];
        let resident_layer_1 = produce_delta_layer(&tenant, &tline, test_deltas1.clone(), &ctx)
            .await
            .unwrap();
        let mut test_deltas2 = test_deltas1.clone();
        test_deltas2.push((
            get_key(10),
            Lsn(0x20),
            Value::Image(Bytes::copy_from_slice(b"test")),
        ));
        let resident_layer_2 = produce_delta_layer(&tenant, &tline, test_deltas2.clone(), &ctx)
            .await
            .unwrap();
        let test_deltas3 = vec![
            (
                get_key(0),
                Lsn(0x10),
                Value::Image(Bytes::copy_from_slice(b"")),
            ),
            (
                get_key(5),
                Lsn(0x18),
                Value::Image(Bytes::copy_from_slice(b"b")),
            ),
            (
                get_key(15),
                Lsn(0x20),
                Value::Image(Bytes::copy_from_slice(b"test")),
            ),
        ];
        let resident_layer_3 = produce_delta_layer(&tenant, &tline, test_deltas3.clone(), &ctx)
            .await
            .unwrap();
        let mut test_deltas4 = test_deltas3.clone();
        test_deltas4.push((
            get_key(20),
            Lsn(0x20),
            Value::Image(Bytes::copy_from_slice(b"test")),
        ));
        let resident_layer_4 = produce_delta_layer(&tenant, &tline, test_deltas4.clone(), &ctx)
            .await
            .unwrap();
        let mut expect = Vec::new();
        expect.extend(test_deltas1);
        expect.extend(test_deltas2);
        expect.extend(test_deltas3);
        expect.extend(test_deltas4);
        expect.sort_by(sort_delta_value);

        // Test with different layer order for MergeIterator::create to ensure the order
        // is stable.

        let mut merge_iter = MergeIterator::create_for_testing(
            &[
                resident_layer_4.get_as_delta(&ctx).await.unwrap(),
                resident_layer_1.get_as_delta(&ctx).await.unwrap(),
                resident_layer_3.get_as_delta(&ctx).await.unwrap(),
                resident_layer_2.get_as_delta(&ctx).await.unwrap(),
            ],
            &[],
            &ctx,
        );
        assert_merge_iter_equal(&mut merge_iter, &expect).await;

        let mut merge_iter = MergeIterator::create_for_testing(
            &[
                resident_layer_1.get_as_delta(&ctx).await.unwrap(),
                resident_layer_4.get_as_delta(&ctx).await.unwrap(),
                resident_layer_3.get_as_delta(&ctx).await.unwrap(),
                resident_layer_2.get_as_delta(&ctx).await.unwrap(),
            ],
            &[],
            &ctx,
        );
        assert_merge_iter_equal(&mut merge_iter, &expect).await;

        is_send(merge_iter);
    }

    #[cfg(feature = "testing")]
    fn is_send(_: impl Send) {}
}