Line data Source code
1 : use std::cmp::Ordering;
2 : use std::collections::{BinaryHeap, binary_heap};
3 : use std::sync::Arc;
4 :
5 : use anyhow::bail;
6 : use pageserver_api::key::Key;
7 : use pageserver_api::value::Value;
8 : use utils::lsn::Lsn;
9 :
10 : use super::delta_layer::{DeltaLayerInner, DeltaLayerIterator};
11 : use super::image_layer::{ImageLayerInner, ImageLayerIterator};
12 : use super::{PersistentLayerDesc, PersistentLayerKey};
13 : use crate::context::RequestContext;
14 :
/// A borrowed handle to either an image layer or a delta layer.
///
/// Stored inside `IteratorWrapper::NotLoaded` so the real layer iterator can be created
/// later, via `LayerRef::iter_with_options`, only when it is actually needed.
#[derive(Clone, Copy)]
pub(crate) enum LayerRef<'a> {
    Image(&'a ImageLayerInner),
    Delta(&'a DeltaLayerInner),
}
20 :
21 : impl<'a> LayerRef<'a> {
22 295 : fn iter_with_options(
23 295 : self,
24 295 : ctx: &'a RequestContext,
25 295 : max_read_size: u64,
26 295 : max_batch_size: usize,
27 295 : ) -> LayerIterRef<'a> {
28 295 : match self {
29 35 : Self::Image(x) => {
30 35 : LayerIterRef::Image(x.iter_with_options(ctx, max_read_size, max_batch_size))
31 : }
32 260 : Self::Delta(x) => {
33 260 : LayerIterRef::Delta(x.iter_with_options(ctx, max_read_size, max_batch_size))
34 : }
35 : }
36 295 : }
37 :
38 0 : fn layer_dbg_info(&self) -> String {
39 0 : match self {
40 0 : Self::Image(x) => x.layer_dbg_info(),
41 0 : Self::Delta(x) => x.layer_dbg_info(),
42 : }
43 0 : }
44 : }
45 :
/// An already-created iterator over either an image layer or a delta layer.
enum LayerIterRef<'a> {
    Image(ImageLayerIterator<'a>),
    Delta(DeltaLayerIterator<'a>),
}
50 :
51 : impl LayerIterRef<'_> {
52 1036307 : async fn next(&mut self) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
53 1036307 : match self {
54 1035907 : Self::Delta(x) => x.next().await,
55 400 : Self::Image(x) => x.next().await,
56 : }
57 1036307 : }
58 :
59 0 : fn layer_dbg_info(&self) -> String {
60 0 : match self {
61 0 : Self::Image(x) => x.layer_dbg_info(),
62 0 : Self::Delta(x) => x.layer_dbg_info(),
63 : }
64 0 : }
65 : }
66 :
/// This type plays several roles at once:
/// 1. Unified iterator for image and delta layers.
/// 2. `Ord` for use in [`MergeIterator::heap`] (for the k-merge).
/// 3. Lazy creation of the real delta/image iterator.
// NOTE(review): the lint below is suppressed with a placeholder reason; either box the
// larger variant or record why the size difference is acceptable.
#[allow(clippy::large_enum_variant, reason = "TODO")]
pub(crate) enum IteratorWrapper<'a> {
    /// The layer has not been read yet; only its lower bound is known.
    NotLoaded {
        ctx: &'a RequestContext,
        /// A `(key, lsn)` that no entry of the layer may sort below (checked in `load`).
        /// Until loading, the wrapper is ordered in the heap by this value.
        first_key_lower_bound: (Key, Lsn),
        layer: LayerRef<'a>,
        /// Identity of the source layer, returned by `trace_source`.
        source_desc: Arc<PersistentLayerKey>,
        max_read_size: u64,
        max_batch_size: usize,
    },
    /// The real layer iterator exists and its next entry is buffered for comparison.
    Loaded {
        iter: PeekableLayerIterRef<'a>,
        source_desc: Arc<PersistentLayerKey>,
    },
}
86 :
/// A layer iterator with a one-entry look-ahead buffer.
///
/// The buffered entry lets the merge heap compare iterators by their next entry without
/// consuming it.
pub(crate) struct PeekableLayerIterRef<'a> {
    iter: LayerIterRef<'a>,
    peeked: Option<(Key, Lsn, Value)>, // None == end
}
91 :
92 : impl<'a> PeekableLayerIterRef<'a> {
93 295 : async fn create(mut iter: LayerIterRef<'a>) -> anyhow::Result<Self> {
94 295 : let peeked = iter.next().await?;
95 295 : Ok(Self { iter, peeked })
96 295 : }
97 :
98 4225120 : fn peek(&self) -> &Option<(Key, Lsn, Value)> {
99 4225120 : &self.peeked
100 4225120 : }
101 :
102 1036012 : async fn next(&mut self) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
103 1036012 : let result = self.peeked.take();
104 1036012 : self.peeked = self.iter.next().await?;
105 1036012 : if let (Some((k1, l1, _)), Some((k2, l2, _))) = (&self.peeked, &result) {
106 1035428 : if (k1, l1) < (k2, l2) {
107 0 : bail!("iterator is not ordered: {}", self.iter.layer_dbg_info());
108 1035428 : }
109 584 : }
110 1036012 : Ok(result)
111 1036012 : }
112 : }
113 :
114 : impl std::cmp::PartialEq for IteratorWrapper<'_> {
115 0 : fn eq(&self, other: &Self) -> bool {
116 0 : self.cmp(other) == Ordering::Equal
117 0 : }
118 : }
119 :
120 : impl std::cmp::Eq for IteratorWrapper<'_> {}
121 :
122 : impl std::cmp::PartialOrd for IteratorWrapper<'_> {
123 2114691 : fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
124 2114691 : Some(self.cmp(other))
125 2114691 : }
126 : }
127 :
128 : impl std::cmp::Ord for IteratorWrapper<'_> {
129 2114691 : fn cmp(&self, other: &Self) -> std::cmp::Ordering {
130 : use std::cmp::Ordering;
131 2114691 : let a = self.peek_next_key_lsn_value();
132 2114691 : let b = other.peek_next_key_lsn_value();
133 2114691 : match (a, b) {
134 1510627 : (Some((k1, l1, v1)), Some((k2, l2, v2))) => {
135 3021254 : fn map_value_to_num(val: &Option<&Value>) -> usize {
136 3016781 : match val {
137 4473 : None => 0,
138 3016523 : Some(Value::Image(_)) => 1,
139 258 : Some(Value::WalRecord(_)) => 2,
140 : }
141 3021254 : }
142 1510627 : let order_1 = map_value_to_num(&v1);
143 1510627 : let order_2 = map_value_to_num(&v2);
144 1510627 : // When key_lsn are the same, the unloaded iter will always appear before the loaded one.
145 1510627 : // And note that we do a reverse at the end of the comparison, so it works with the max heap.
146 1510627 : (k1, l1, order_1).cmp(&(k2, l2, order_2))
147 : }
148 501465 : (Some(_), None) => Ordering::Less,
149 707 : (None, Some(_)) => Ordering::Greater,
150 101892 : (None, None) => Ordering::Equal,
151 : }
152 2114691 : .reverse()
153 2114691 : }
154 : }
155 :
impl<'a> IteratorWrapper<'a> {
    /// Wrap an image layer without reading it.
    ///
    /// The heap-ordering lower bound is the start of the layer's key range at the image LSN.
    pub fn create_from_image_layer(
        image_layer: &'a ImageLayerInner,
        ctx: &'a RequestContext,
        max_read_size: u64,
        max_batch_size: usize,
    ) -> Self {
        Self::NotLoaded {
            layer: LayerRef::Image(image_layer),
            first_key_lower_bound: (image_layer.key_range().start, image_layer.lsn()),
            ctx,
            source_desc: PersistentLayerKey {
                key_range: image_layer.key_range().clone(),
                lsn_range: PersistentLayerDesc::image_layer_lsn_range(image_layer.lsn()),
                is_delta: false,
            }
            .into(),
            max_read_size,
            max_batch_size,
        }
    }

    /// Wrap a delta layer without reading it.
    ///
    /// The heap-ordering lower bound is the start of the layer's key range at the start of
    /// its LSN range.
    pub fn create_from_delta_layer(
        delta_layer: &'a DeltaLayerInner,
        ctx: &'a RequestContext,
        max_read_size: u64,
        max_batch_size: usize,
    ) -> Self {
        Self::NotLoaded {
            layer: LayerRef::Delta(delta_layer),
            first_key_lower_bound: (delta_layer.key_range().start, delta_layer.lsn_range().start),
            ctx,
            source_desc: PersistentLayerKey {
                key_range: delta_layer.key_range().clone(),
                lsn_range: delta_layer.lsn_range().clone(),
                is_delta: true,
            }
            .into(),
            max_read_size,
            max_batch_size,
        }
    }

    /// The `(key, lsn, value)` this wrapper is ordered by in the heap.
    ///
    /// When loaded, this is the buffered next entry (`None` once exhausted). When not loaded,
    /// it is the layer's lower bound with no value.
    fn peek_next_key_lsn_value(&self) -> Option<(&Key, Lsn, Option<&Value>)> {
        match self {
            Self::Loaded { iter, .. } => iter
                .peek()
                .as_ref()
                .map(|(key, lsn, val)| (key, *lsn, Some(val))),
            Self::NotLoaded {
                first_key_lower_bound: (key, lsn),
                ..
            } => Some((key, *lsn, None)),
        }
    }

    /// Create the real layer iterator, buffer its first entry and switch to `Loaded`.
    ///
    /// Panics if already loaded. Fails if reading the first entry fails, or if that entry
    /// sorts before `first_key_lower_bound`, because the heap relied on that bound.
    //
    // CORRECTNESS: this function must always take `&mut self`, never `&self`.
    //
    // The reason is that `impl Ord for Self` evaluates differently after this function
    // returns. We're called through a `PeekMut::deref_mut`, which causes heap repair when
    // the PeekMut is dropped. So, it's critical that we actually run through
    // `PeekMut::deref_mut` and not just `PeekMut::deref`.
    // If we don't take `&mut self`, the heap would not be repaired after this wrapper
    // switches from `NotLoaded` to `Loaded`, and the heap order could silently break.
    async fn load(&mut self) -> anyhow::Result<()> {
        assert!(!self.is_loaded());
        let Self::NotLoaded {
            ctx,
            first_key_lower_bound,
            layer,
            source_desc,
            max_read_size,
            max_batch_size,
        } = self
        else {
            unreachable!()
        };
        let iter = layer.iter_with_options(ctx, *max_read_size, *max_batch_size);
        let iter = PeekableLayerIterRef::create(iter).await?;
        // An empty layer has nothing to check.
        if let Some((k1, l1, _)) = iter.peek() {
            let (k2, l2) = first_key_lower_bound;
            if (k1, l1) < (k2, l2) {
                bail!(
                    "layer key range did not include the first key in the layer: {}",
                    layer.layer_dbg_info()
                );
            }
        }
        *self = Self::Loaded {
            iter,
            source_desc: source_desc.clone(),
        };
        Ok(())
    }

    /// Whether the real layer iterator has been created.
    fn is_loaded(&self) -> bool {
        matches!(self, Self::Loaded { .. })
    }

    /// Return the buffered entry and advance the underlying layer iterator.
    ///
    /// Correctness: must load the iterator before using; panics on a `NotLoaded` wrapper.
    ///
    /// Given this iterator wrapper is private to the merge iterator, users won't be able to mis-use it.
    /// The public interfaces to use are [`crate::tenant::storage_layer::delta_layer::DeltaLayerIterator`] and
    /// [`crate::tenant::storage_layer::image_layer::ImageLayerIterator`].
    async fn next(&mut self) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
        let Self::Loaded { iter, .. } = self else {
            panic!("must load the iterator before using")
        };
        iter.next().await
    }

    /// Get the persistent layer key corresponding to this iterator (works in either state).
    fn trace_source(&self) -> Arc<PersistentLayerKey> {
        match self {
            Self::Loaded { source_desc, .. } => source_desc.clone(),
            Self::NotLoaded { source_desc, .. } => source_desc.clone(),
        }
    }
}
274 :
/// A merge iterator over delta/image layer iterators.
///
/// Entries are yielded in `(key, lsn)` order across all layers. Layers are only read once
/// their lower bound reaches the top of the internal heap.
///
/// The iterator does not deduplicate. When duplicated records are found, they are all
/// returned and the caller must handle these situations. Duplicates can arise in several
/// ways:
///
/// * Two identical deltas at the same LSN.
/// * Two identical images at the same LSN.
/// * A delta and an image at the same LSN, where the image already has the delta applied.
///
/// For the same key and LSN, the iterator always yields the image before the delta.
pub struct MergeIterator<'a> {
    heap: BinaryHeap<IteratorWrapper<'a>>,
}
289 :
/// An item type that `MergeIterator::next_inner` can produce from a raw
/// `(key, lsn, value)` and the wrapper that yielded it.
pub(crate) trait MergeIteratorItem {
    /// Build the item from `item`, optionally recording information taken from `iterator`.
    fn new(item: (Key, Lsn, Value), iterator: &IteratorWrapper<'_>) -> Self;

    /// Borrow the underlying `(key, lsn, value)` triple.
    fn key_lsn_value(&self) -> &(Key, Lsn, Value);
}
295 :
296 : impl MergeIteratorItem for (Key, Lsn, Value) {
297 1035251 : fn new(item: (Key, Lsn, Value), _: &IteratorWrapper<'_>) -> Self {
298 1035251 : item
299 1035251 : }
300 :
301 395 : fn key_lsn_value(&self) -> &(Key, Lsn, Value) {
302 395 : self
303 395 : }
304 : }
305 :
306 : impl MergeIteratorItem for ((Key, Lsn, Value), Arc<PersistentLayerKey>) {
307 470 : fn new(item: (Key, Lsn, Value), iter: &IteratorWrapper<'_>) -> Self {
308 470 : (item, iter.trace_source().clone())
309 470 : }
310 :
311 990 : fn key_lsn_value(&self) -> &(Key, Lsn, Value) {
312 990 : &self.0
313 990 : }
314 : }
315 :
316 : impl<'a> MergeIterator<'a> {
317 : #[cfg(test)]
318 6 : pub(crate) fn create_for_testing(
319 6 : deltas: &[&'a DeltaLayerInner],
320 6 : images: &[&'a ImageLayerInner],
321 6 : ctx: &'a RequestContext,
322 6 : ) -> Self {
323 6 : Self::create_with_options(deltas, images, ctx, 1024 * 8192, 1024)
324 6 : }
325 :
326 : /// Create a new merge iterator with custom options.
327 : ///
328 : /// Adjust `max_read_size` and `max_batch_size` to trade memory usage for performance. The size should scale
329 : /// with the number of layers to compact. If there are a lot of layers, consider reducing the values, so that
330 : /// the buffer does not take too much memory.
331 : ///
332 : /// The default options for L0 compactions are:
333 : /// - max_read_size: 1024 * 8192 (8MB)
334 : /// - max_batch_size: 1024
335 : ///
336 : /// The default options for gc-compaction are:
337 : /// - max_read_size: 128 * 8192 (1MB)
338 : /// - max_batch_size: 128
339 47 : pub fn create_with_options(
340 47 : deltas: &[&'a DeltaLayerInner],
341 47 : images: &[&'a ImageLayerInner],
342 47 : ctx: &'a RequestContext,
343 47 : max_read_size: u64,
344 47 : max_batch_size: usize,
345 47 : ) -> Self {
346 47 : let mut heap = Vec::with_capacity(images.len() + deltas.len());
347 82 : for image in images {
348 35 : heap.push(IteratorWrapper::create_from_image_layer(
349 35 : image,
350 35 : ctx,
351 35 : max_read_size,
352 35 : max_batch_size,
353 35 : ));
354 35 : }
355 307 : for delta in deltas {
356 260 : heap.push(IteratorWrapper::create_from_delta_layer(
357 260 : delta,
358 260 : ctx,
359 260 : max_read_size,
360 260 : max_batch_size,
361 260 : ));
362 260 : }
363 47 : Self {
364 47 : heap: BinaryHeap::from(heap),
365 47 : }
366 47 : }
367 :
368 1035766 : pub(crate) async fn next_inner<R: MergeIteratorItem>(&mut self) -> anyhow::Result<Option<R>> {
369 1036352 : while let Some(mut iter) = self.heap.peek_mut() {
370 1036307 : if !iter.is_loaded() {
371 : // Once we load the iterator, we can know the real first key-value pair in the iterator.
372 : // We put it back into the heap so that a potentially unloaded layer may have a key between
373 : // [potential_first_key, loaded_first_key).
374 295 : iter.load().await?;
375 295 : continue;
376 1036012 : }
377 1036012 : let Some(item) = iter.next().await? else {
378 : // If the iterator returns None, we pop this iterator. Actually, in the current implementation,
379 : // we order None > Some, and all the rest of the iterators should return None.
380 291 : binary_heap::PeekMut::pop(iter);
381 291 : continue;
382 : };
383 1035721 : return Ok(Some(R::new(item, &iter)));
384 : }
385 45 : Ok(None)
386 1035766 : }
387 :
388 : /// Get the next key-value pair from the iterator.
389 1035073 : pub async fn next(&mut self) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
390 1035073 : self.next_inner().await
391 1035073 : }
392 :
393 : /// Get the next key-value pair from the iterator, and trace where the key comes from.
394 0 : pub async fn next_with_trace(
395 0 : &mut self,
396 0 : ) -> anyhow::Result<Option<((Key, Lsn, Value), Arc<PersistentLayerKey>)>> {
397 0 : self.next_inner().await
398 0 : }
399 : }
400 :
#[cfg(test)]
mod tests {
    use itertools::Itertools;
    use pageserver_api::key::Key;
    #[cfg(feature = "testing")]
    use pageserver_api::record::NeonWalRecord;
    use utils::lsn::Lsn;

    use super::*;
    use crate::DEFAULT_PG_VERSION;
    use crate::tenant::harness::{TIMELINE_ID, TenantHarness};
    #[cfg(feature = "testing")]
    use crate::tenant::storage_layer::delta_layer::test::sort_delta_value;
    use crate::tenant::storage_layer::delta_layer::test::{produce_delta_layer, sort_delta};

    /// Drain `merge_iter` and assert that it yields exactly `expect`, in order, and then ends.
    async fn assert_merge_iter_equal(
        merge_iter: &mut MergeIterator<'_>,
        expect: &[(Key, Lsn, Value)],
    ) {
        let mut expect_iter = expect.iter();
        loop {
            let o1 = merge_iter.next().await.unwrap();
            let o2 = expect_iter.next();
            // Both sides must run out at the same time.
            assert_eq!(o1.is_some(), o2.is_some());
            if o1.is_none() && o2.is_none() {
                break;
            }
            let (k1, l1, v1) = o1.unwrap();
            let (k2, l2, v2) = o2.unwrap();
            assert_eq!(&k1, k2);
            assert_eq!(l1, *l2);
            assert_eq!(&v1, v2);
        }
    }

    /// One layer's keys (3, 4) fall strictly between another layer's keys (0, 5). The merge
    /// must interleave them even though the layer with the larger range is passed second.
    #[tokio::test]
    async fn merge_in_between() {
        use bytes::Bytes;
        use pageserver_api::value::Value;

        let harness = TenantHarness::create("merge_iterator_merge_in_between")
            .await
            .unwrap();
        let (tenant, ctx) = harness.load().await;

        let tline = tenant
            .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
            .await
            .unwrap();

        // Build a key that differs only in `field6`.
        fn get_key(id: u32) -> Key {
            let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
            key.field6 = id;
            key
        }
        let test_deltas1 = vec![
            (
                get_key(0),
                Lsn(0x10),
                Value::Image(Bytes::copy_from_slice(b"test")),
            ),
            (
                get_key(5),
                Lsn(0x10),
                Value::Image(Bytes::copy_from_slice(b"test")),
            ),
        ];
        let resident_layer_1 = produce_delta_layer(&tenant, &tline, test_deltas1.clone(), &ctx)
            .await
            .unwrap();
        let test_deltas2 = vec![
            (
                get_key(3),
                Lsn(0x10),
                Value::Image(Bytes::copy_from_slice(b"test")),
            ),
            (
                get_key(4),
                Lsn(0x10),
                Value::Image(Bytes::copy_from_slice(b"test")),
            ),
        ];
        let resident_layer_2 = produce_delta_layer(&tenant, &tline, test_deltas2.clone(), &ctx)
            .await
            .unwrap();
        let mut merge_iter = MergeIterator::create_for_testing(
            &[
                resident_layer_2.get_as_delta(&ctx).await.unwrap(),
                resident_layer_1.get_as_delta(&ctx).await.unwrap(),
            ],
            &[],
            &ctx,
        );
        let mut expect = Vec::new();
        expect.extend(test_deltas1);
        expect.extend(test_deltas2);
        expect.sort_by(sort_delta);
        assert_merge_iter_equal(&mut merge_iter, &expect).await;
    }

    /// Three delta layers: layers 1 and 2 cover the same 100 keys with interleaved LSNs
    /// (layer 2 is offset by 0x10). Layer 3 covers a disjoint, higher key range.
    #[tokio::test]
    async fn delta_merge() {
        use bytes::Bytes;
        use pageserver_api::value::Value;

        let harness = TenantHarness::create("merge_iterator_delta_merge")
            .await
            .unwrap();
        let (tenant, ctx) = harness.load().await;

        let tline = tenant
            .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
            .await
            .unwrap();

        // Build a key that differs only in `field6`.
        fn get_key(id: u32) -> Key {
            let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
            key.field6 = id;
            key
        }
        const N: usize = 1000;
        // 10 entries per key, at LSNs 0x20, 0x40, ..., 0x140.
        let test_deltas1 = (0..N)
            .map(|idx| {
                (
                    get_key(idx as u32 / 10),
                    Lsn(0x20 * ((idx as u64) % 10 + 1)),
                    Value::Image(Bytes::from(format!("img{idx:05}"))),
                )
            })
            .collect_vec();
        let resident_layer_1 = produce_delta_layer(&tenant, &tline, test_deltas1.clone(), &ctx)
            .await
            .unwrap();
        // Same keys as layer 1, at LSNs 0x30, 0x50, ..., 0x150.
        let test_deltas2 = (0..N)
            .map(|idx| {
                (
                    get_key(idx as u32 / 10),
                    Lsn(0x20 * ((idx as u64) % 10 + 1) + 0x10),
                    Value::Image(Bytes::from(format!("img{idx:05}"))),
                )
            })
            .collect_vec();
        let resident_layer_2 = produce_delta_layer(&tenant, &tline, test_deltas2.clone(), &ctx)
            .await
            .unwrap();
        // Keys starting at N, disjoint from layers 1 and 2.
        let test_deltas3 = (0..N)
            .map(|idx| {
                (
                    get_key(idx as u32 / 10 + N as u32),
                    Lsn(0x10 * ((idx as u64) % 10 + 1)),
                    Value::Image(Bytes::from(format!("img{idx:05}"))),
                )
            })
            .collect_vec();
        let resident_layer_3 = produce_delta_layer(&tenant, &tline, test_deltas3.clone(), &ctx)
            .await
            .unwrap();
        let mut merge_iter = MergeIterator::create_for_testing(
            &[
                resident_layer_1.get_as_delta(&ctx).await.unwrap(),
                resident_layer_2.get_as_delta(&ctx).await.unwrap(),
                resident_layer_3.get_as_delta(&ctx).await.unwrap(),
            ],
            &[],
            &ctx,
        );
        let mut expect = Vec::new();
        expect.extend(test_deltas1);
        expect.extend(test_deltas2);
        expect.extend(test_deltas3);
        expect.sort_by(sort_delta);
        assert_merge_iter_equal(&mut merge_iter, &expect).await;

        // TODO: test layers are loaded only when needed, reducing num of active iterators in k-merge
    }

    /// Duplicated deltas and images at the same key/LSN must all be returned, and the result
    /// must not depend on the order in which layers are passed in.
    #[cfg(feature = "testing")]
    #[tokio::test]
    async fn delta_image_mixed_merge() {
        use bytes::Bytes;
        use pageserver_api::value::Value;

        let harness = TenantHarness::create("merge_iterator_delta_image_mixed_merge")
            .await
            .unwrap();
        let (tenant, ctx) = harness.load().await;

        let tline = tenant
            .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
            .await
            .unwrap();

        // Build a key that differs only in `field6`.
        fn get_key(id: u32) -> Key {
            let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
            key.field6 = id;
            key
        }
        // In this test case, we want to test if the iterator still works correctly with multiple copies
        // of a delta+image at the same LSN, for example, the following sequence a@10=+a, a@10=+a, a@10=ab, a@10=ab.
        // Duplicated deltas/images are possible for old tenants before the full L0 compaction file name fix.
        // An incomplete compaction could produce multiple exactly-the-same delta layers. Force image generation
        // could produce overlapping images. Apart from duplicated deltas/images, in the current storage implementation
        // one key-lsn could have a delta in the delta layer and one image in the image layer. The iterator should
        // correctly process these situations and return everything as-is, and the upper layer of the system
        // will handle duplicated LSNs.
        let test_deltas1 = vec![
            (
                get_key(0),
                Lsn(0x10),
                Value::WalRecord(NeonWalRecord::wal_init("")),
            ),
            (
                get_key(0),
                Lsn(0x18),
                Value::WalRecord(NeonWalRecord::wal_append("a")),
            ),
            (
                get_key(5),
                Lsn(0x10),
                Value::WalRecord(NeonWalRecord::wal_init("")),
            ),
            (
                get_key(5),
                Lsn(0x18),
                Value::WalRecord(NeonWalRecord::wal_append("b")),
            ),
        ];
        let resident_layer_1 = produce_delta_layer(&tenant, &tline, test_deltas1.clone(), &ctx)
            .await
            .unwrap();
        // Layer 2 duplicates every record of layer 1 and adds one image at key 10.
        let mut test_deltas2 = test_deltas1.clone();
        test_deltas2.push((
            get_key(10),
            Lsn(0x20),
            Value::Image(Bytes::copy_from_slice(b"test")),
        ));
        let resident_layer_2 = produce_delta_layer(&tenant, &tline, test_deltas2.clone(), &ctx)
            .await
            .unwrap();
        // Layer 3 holds images at some of the same key/LSN pairs as the WAL records above.
        let test_deltas3 = vec![
            (
                get_key(0),
                Lsn(0x10),
                Value::Image(Bytes::copy_from_slice(b"")),
            ),
            (
                get_key(5),
                Lsn(0x18),
                Value::Image(Bytes::copy_from_slice(b"b")),
            ),
            (
                get_key(15),
                Lsn(0x20),
                Value::Image(Bytes::copy_from_slice(b"test")),
            ),
        ];
        let resident_layer_3 = produce_delta_layer(&tenant, &tline, test_deltas3.clone(), &ctx)
            .await
            .unwrap();
        // Layer 4 duplicates layer 3 and adds one image at key 20.
        let mut test_deltas4 = test_deltas3.clone();
        test_deltas4.push((
            get_key(20),
            Lsn(0x20),
            Value::Image(Bytes::copy_from_slice(b"test")),
        ));
        let resident_layer_4 = produce_delta_layer(&tenant, &tline, test_deltas4.clone(), &ctx)
            .await
            .unwrap();
        let mut expect = Vec::new();
        expect.extend(test_deltas1);
        expect.extend(test_deltas2);
        expect.extend(test_deltas3);
        expect.extend(test_deltas4);
        expect.sort_by(sort_delta_value);

        // Test with different layer order for MergeIterator::create to ensure the order
        // is stable.

        let mut merge_iter = MergeIterator::create_for_testing(
            &[
                resident_layer_4.get_as_delta(&ctx).await.unwrap(),
                resident_layer_1.get_as_delta(&ctx).await.unwrap(),
                resident_layer_3.get_as_delta(&ctx).await.unwrap(),
                resident_layer_2.get_as_delta(&ctx).await.unwrap(),
            ],
            &[],
            &ctx,
        );
        assert_merge_iter_equal(&mut merge_iter, &expect).await;

        let mut merge_iter = MergeIterator::create_for_testing(
            &[
                resident_layer_1.get_as_delta(&ctx).await.unwrap(),
                resident_layer_4.get_as_delta(&ctx).await.unwrap(),
                resident_layer_3.get_as_delta(&ctx).await.unwrap(),
                resident_layer_2.get_as_delta(&ctx).await.unwrap(),
            ],
            &[],
            &ctx,
        );
        assert_merge_iter_equal(&mut merge_iter, &expect).await;

        // Compile-time check that the merge iterator can be sent across threads.
        is_send(merge_iter);
    }

    /// Accepts only `Send` values; used as a compile-time assertion.
    #[cfg(feature = "testing")]
    fn is_send(_: impl Send) {}
}
|