Line data Source code
1 : use std::{
2 : cmp::Ordering,
3 : collections::{binary_heap, BinaryHeap},
4 : };
5 :
6 : use anyhow::bail;
7 : use pageserver_api::key::Key;
8 : use utils::lsn::Lsn;
9 :
10 : use crate::{context::RequestContext, repository::Value};
11 :
12 : use super::{
13 : delta_layer::{DeltaLayerInner, DeltaLayerIterator},
14 : image_layer::{ImageLayerInner, ImageLayerIterator},
15 : };
16 :
17 : #[derive(Clone, Copy)]
18 : enum LayerRef<'a> {
19 : Image(&'a ImageLayerInner),
20 : Delta(&'a DeltaLayerInner),
21 : }
22 :
23 : impl<'a> LayerRef<'a> {
24 1530 : fn iter(self, ctx: &'a RequestContext) -> LayerIterRef<'a> {
25 1530 : match self {
26 108 : Self::Image(x) => LayerIterRef::Image(x.iter(ctx)),
27 1422 : Self::Delta(x) => LayerIterRef::Delta(x.iter(ctx)),
28 : }
29 1530 : }
30 :
31 0 : fn layer_dbg_info(&self) -> String {
32 0 : match self {
33 0 : Self::Image(x) => x.layer_dbg_info(),
34 0 : Self::Delta(x) => x.layer_dbg_info(),
35 : }
36 0 : }
37 : }
38 :
39 : enum LayerIterRef<'a> {
40 : Image(ImageLayerIterator<'a>),
41 : Delta(DeltaLayerIterator<'a>),
42 : }
43 :
44 : impl LayerIterRef<'_> {
45 6215070 : async fn next(&mut self) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
46 6215070 : match self {
47 6213630 : Self::Delta(x) => x.next().await,
48 1440 : Self::Image(x) => x.next().await,
49 : }
50 6215070 : }
51 :
52 0 : fn layer_dbg_info(&self) -> String {
53 0 : match self {
54 0 : Self::Image(x) => x.layer_dbg_info(),
55 0 : Self::Delta(x) => x.layer_dbg_info(),
56 : }
57 0 : }
58 : }
59 :
60 : /// This type plays several roles at once
61 : /// 1. Unified iterator for image and delta layers.
62 : /// 2. `Ord` for use in [`MergeIterator::heap`] (for the k-merge).
63 : /// 3. Lazy creation of the real delta/image iterator.
64 : enum IteratorWrapper<'a> {
65 : NotLoaded {
66 : ctx: &'a RequestContext,
67 : first_key_lower_bound: (Key, Lsn),
68 : layer: LayerRef<'a>,
69 : },
70 : Loaded {
71 : iter: PeekableLayerIterRef<'a>,
72 : },
73 : }
74 :
75 : struct PeekableLayerIterRef<'a> {
76 : iter: LayerIterRef<'a>,
77 : peeked: Option<(Key, Lsn, Value)>, // None == end
78 : }
79 :
80 : impl<'a> PeekableLayerIterRef<'a> {
81 1530 : async fn create(mut iter: LayerIterRef<'a>) -> anyhow::Result<Self> {
82 1848 : let peeked = iter.next().await?;
83 1530 : Ok(Self { iter, peeked })
84 1530 : }
85 :
86 25351774 : fn peek(&self) -> &Option<(Key, Lsn, Value)> {
87 25351774 : &self.peeked
88 25351774 : }
89 :
90 6213540 : async fn next(&mut self) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
91 6213540 : let result = self.peeked.take();
92 6213540 : self.peeked = self.iter.next().await?;
93 6213540 : if let (Some((k1, l1, _)), Some((k2, l2, _))) = (&self.peeked, &result) {
94 6210480 : if (k1, l1) < (k2, l2) {
95 0 : bail!("iterator is not ordered: {}", self.iter.layer_dbg_info());
96 6210480 : }
97 3060 : }
98 6213540 : Ok(result)
99 6213540 : }
100 : }
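// A minimal synchronous sketch of the same "buffer one item ahead" pattern that
// `PeekableLayerIterRef` applies to the async layer iterators above. Illustrative only:
// the type and method names are hypothetical and not part of the pageserver code.
#[allow(dead_code)]
struct PeekAheadSketch<I: Iterator> {
    iter: I,
    peeked: Option<I::Item>, // None == end, same convention as `PeekableLayerIterRef`
}

#[allow(dead_code)]
impl<I: Iterator> PeekAheadSketch<I> {
    fn create(mut iter: I) -> Self {
        let peeked = iter.next();
        Self { iter, peeked }
    }

    fn peek(&self) -> &Option<I::Item> {
        &self.peeked
    }

    fn next(&mut self) -> Option<I::Item> {
        // Return the buffered item and refill the buffer from the underlying iterator.
        std::mem::replace(&mut self.peeked, self.iter.next())
    }
}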
101 :
102 : impl<'a> std::cmp::PartialEq for IteratorWrapper<'a> {
103 0 : fn eq(&self, other: &Self) -> bool {
104 0 : self.cmp(other) == Ordering::Equal
105 0 : }
106 : }
107 :
108 : impl<'a> std::cmp::Eq for IteratorWrapper<'a> {}
109 :
110 : impl<'a> std::cmp::PartialOrd for IteratorWrapper<'a> {
111 12687689 : fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
112 12687689 : Some(self.cmp(other))
113 12687689 : }
114 : }
115 :
116 : impl<'a> std::cmp::Ord for IteratorWrapper<'a> {
117 12687689 : fn cmp(&self, other: &Self) -> std::cmp::Ordering {
118 12687689 : use std::cmp::Ordering;
119 12687689 : let a = self.peek_next_key_lsn_value();
120 12687689 : let b = other.peek_next_key_lsn_value();
121 12687689 : match (a, b) {
122 9064277 : (Some((k1, l1, v1)), Some((k2, l2, v2))) => {
123 18128554 : fn map_value_to_num(val: &Option<&Value>) -> usize {
124 18103672 : match val {
125 9064277 : None => 0,
126 18102250 : Some(Value::Image(_)) => 1,
127 9064277 : Some(Value::WalRecord(_)) => 2,
128 9064277 : }
129 18128554 : }
130 9064277 : let order_1 = map_value_to_num(&v1);
131 9064277 : let order_2 = map_value_to_num(&v2);
132 9064277 : // When the key and LSN are the same, an unloaded iterator always sorts before a loaded one.
133 9064277 : // Note that we reverse the result at the end of the comparison, so it works with the max-heap.
134 9064277 : (k1, l1, order_1).cmp(&(k2, l2, order_2))
135 : }
136 3008496 : (Some(_), None) => Ordering::Less,
137 3827 : (None, Some(_)) => Ordering::Greater,
138 611089 : (None, None) => Ordering::Equal,
139 : }
140 12687689 : .reverse()
141 12687689 : }
142 : }
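// A minimal sketch of the tie-breaking above, using hypothetical tuple values rather than real
// keys/LSNs. At an equal (key, lsn), the value rank orders "not loaded" (0) before images (1)
// before WAL records (2); the final `.reverse()` turns the std max-heap into a min-heap on
// (key, lsn, rank), which is what yields images before deltas at the same key and LSN.
#[allow(dead_code)]
fn tie_break_sketch() {
    let not_loaded = (1u32, 10u64, 0u8); // rank 0: first-key lower bound of an unloaded layer
    let image = (1u32, 10u64, 1u8); // rank 1: Value::Image
    let wal_record = (1u32, 10u64, 2u8); // rank 2: Value::WalRecord
    assert!(not_loaded < image && image < wal_record);
    // After `.reverse()`, the max-heap pops `not_loaded` first, then `image`, then `wal_record`.
}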
143 :
144 : impl<'a> IteratorWrapper<'a> {
145 108 : pub fn create_from_image_layer(
146 108 : image_layer: &'a ImageLayerInner,
147 108 : ctx: &'a RequestContext,
148 108 : ) -> Self {
149 108 : Self::NotLoaded {
150 108 : layer: LayerRef::Image(image_layer),
151 108 : first_key_lower_bound: (image_layer.key_range().start, image_layer.lsn()),
152 108 : ctx,
153 108 : }
154 108 : }
155 :
156 1422 : pub fn create_from_delta_layer(
157 1422 : delta_layer: &'a DeltaLayerInner,
158 1422 : ctx: &'a RequestContext,
159 1422 : ) -> Self {
160 1422 : Self::NotLoaded {
161 1422 : layer: LayerRef::Delta(delta_layer),
162 1422 : first_key_lower_bound: (delta_layer.key_range().start, delta_layer.lsn_range().start),
163 1422 : ctx,
164 1422 : }
165 1422 : }
166 :
167 25375378 : fn peek_next_key_lsn_value(&self) -> Option<(&Key, Lsn, Option<&Value>)> {
168 25375378 : match self {
169 25350244 : Self::Loaded { iter } => iter
170 25350244 : .peek()
171 25350244 : .as_ref()
172 25350244 : .map(|(key, lsn, val)| (key, *lsn, Some(val))),
173 25134 : Self::NotLoaded {
174 25134 : first_key_lower_bound: (key, lsn),
175 25134 : ..
176 25134 : } => Some((key, *lsn, None)),
177 : }
178 25375378 : }
179 :
180 : // CORRECTNESS: this function must always take `&mut self`, never `&self`.
181 : //
182 : // The reason is that `impl Ord for Self` evaluates differently after this function
183 : // returns. We're called through a `PeekMut::deref_mut`, which causes heap repair when
184 : // the PeekMut gets returned. So, it's critical that we actually run through `PeekMut::deref_mut`
185 : // and not just `PeekMut::deref`.
186 : // If we don't take `&mut self`, the heap would not be repaired after the iterator is loaded.
187 1530 : async fn load(&mut self) -> anyhow::Result<()> {
188 1530 : assert!(!self.is_loaded());
189 1530 : let Self::NotLoaded {
190 1530 : ctx,
191 1530 : first_key_lower_bound,
192 1530 : layer,
193 1530 : } = self
194 : else {
195 0 : unreachable!()
196 : };
197 1530 : let iter = layer.iter(ctx);
198 1848 : let iter = PeekableLayerIterRef::create(iter).await?;
199 1530 : if let Some((k1, l1, _)) = iter.peek() {
200 1530 : let (k2, l2) = first_key_lower_bound;
201 1530 : if (k1, l1) < (k2, l2) {
202 0 : bail!(
203 0 : "layer key range did not include the first key in the layer: {}",
204 0 : layer.layer_dbg_info()
205 0 : );
206 1530 : }
207 0 : }
208 1530 : *self = Self::Loaded { iter };
209 1530 : Ok(())
210 1530 : }
211 :
212 6216600 : fn is_loaded(&self) -> bool {
213 6216600 : matches!(self, Self::Loaded { .. })
214 6216600 : }
215 :
216 : /// Correctness: the iterator must be loaded before use.
217 : ///
218 : /// Since this iterator wrapper is private to the merge iterator, users cannot misuse it.
219 : /// The public interfaces to use are [`crate::tenant::storage_layer::delta_layer::DeltaLayerIterator`] and
220 : /// [`crate::tenant::storage_layer::image_layer::ImageLayerIterator`].
221 6213540 : async fn next(&mut self) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
222 6213540 : let Self::Loaded { iter } = self else {
223 0 : panic!("must load the iterator before using")
224 : };
225 6213540 : iter.next().await
226 6213540 : }
227 : }
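// A minimal sketch of the `PeekMut` heap-repair behaviour that `IteratorWrapper::load` relies
// on, shown with a plain integer heap (the function name is hypothetical, not pageserver API).
// Mutating the top element through `PeekMut::deref_mut` re-sifts it when the guard is dropped,
// which is why `load` must take `&mut self` and be reached through `BinaryHeap::peek_mut`.
#[allow(dead_code)]
fn peek_mut_repair_sketch() {
    let mut heap = BinaryHeap::from(vec![3, 1, 2]);
    if let Some(mut top) = heap.peek_mut() {
        *top = 0; // the ordering key changes through `deref_mut`...
    } // ...and the heap invariant is repaired here, when the `PeekMut` guard is dropped
    assert_eq!(heap.peek(), Some(&2));
}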
228 :
229 : /// A merge iterator over delta/image layer iterators. When duplicated records are
230 : /// found, the iterator will not perform any deduplication, and the caller should handle
231 : /// these situations. Duplicated records can arise in several ways:
232 : ///
233 : /// * Two identical deltas for the same key at the same LSN.
234 : /// * Two identical images for the same key at the same LSN.
235 : /// * A delta and an image at the same LSN, where the image already has the delta applied.
236 : ///
237 : /// At the same key and LSN, the iterator will always yield the image before the delta; a usage sketch follows the `impl` block below.
238 : pub struct MergeIterator<'a> {
239 : heap: BinaryHeap<IteratorWrapper<'a>>,
240 : }
241 :
242 : impl<'a> MergeIterator<'a> {
243 186 : pub fn create(
244 186 : deltas: &[&'a DeltaLayerInner],
245 186 : images: &[&'a ImageLayerInner],
246 186 : ctx: &'a RequestContext,
247 186 : ) -> Self {
248 186 : let mut heap = Vec::with_capacity(images.len() + deltas.len());
249 294 : for image in images {
250 108 : heap.push(IteratorWrapper::create_from_image_layer(image, ctx));
251 108 : }
252 1608 : for delta in deltas {
253 1422 : heap.push(IteratorWrapper::create_from_delta_layer(delta, ctx));
254 1422 : }
255 186 : Self {
256 186 : heap: BinaryHeap::from(heap),
257 186 : }
258 186 : }
259 :
260 6212196 : pub async fn next(&mut self) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
261 6215256 : while let Some(mut iter) = self.heap.peek_mut() {
262 6215070 : if !iter.is_loaded() {
263 : // Once we load the iterator, we know its real first key-value pair.
264 : // We leave it in the heap and re-sort (via `continue`), because another, still-unloaded
265 : // layer may have a key in [potential_first_key, loaded_first_key).
266 1848 : iter.load().await?;
267 1530 : continue;
268 6213540 : }
269 6213540 : let Some(item) = iter.next().await? else {
270 : // If the iterator returns None, we pop this iterator. Actually, in the current implementation,
271 : // we order None > Some, and all the rest of the iterators should return None.
272 1530 : binary_heap::PeekMut::pop(iter);
273 1530 : continue;
274 : };
275 6212010 : return Ok(Some(item));
276 : }
277 186 : Ok(None)
278 6212196 : }
279 : }
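// A usage sketch for the merge iterator above. The caller, layer handles, and request context
// are hypothetical; `MergeIterator::create` and `next` are the methods defined in this file.
// Entries come out ordered by (key, lsn), with duplicates passed through as documented on the
// struct, so the caller is responsible for handling them.
#[allow(dead_code)]
async fn merge_iterator_usage_sketch(
    deltas: &[&DeltaLayerInner],
    images: &[&ImageLayerInner],
    ctx: &RequestContext,
) -> anyhow::Result<()> {
    let mut iter = MergeIterator::create(deltas, images, ctx);
    while let Some((key, lsn, value)) = iter.next().await? {
        // Consume the merged stream of (key, lsn, value) tuples in ascending (key, lsn) order.
        let _ = (key, lsn, value);
    }
    Ok(())
}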
280 :
281 : #[cfg(test)]
282 : mod tests {
283 : use super::*;
284 :
285 : use itertools::Itertools;
286 : use pageserver_api::key::Key;
287 : use utils::lsn::Lsn;
288 :
289 : use crate::{
290 : tenant::{
291 : harness::{TenantHarness, TIMELINE_ID},
292 : storage_layer::delta_layer::test::{produce_delta_layer, sort_delta, sort_delta_value},
293 : },
294 : walrecord::NeonWalRecord,
295 : DEFAULT_PG_VERSION,
296 : };
297 :
298 24 : async fn assert_merge_iter_equal(
299 24 : merge_iter: &mut MergeIterator<'_>,
300 24 : expect: &[(Key, Lsn, Value)],
301 24 : ) {
302 24 : let mut expect_iter = expect.iter();
303 : loop {
304 18240 : let o1 = merge_iter.next().await.unwrap();
305 18240 : let o2 = expect_iter.next();
306 18240 : assert_eq!(o1.is_some(), o2.is_some());
307 18240 : if o1.is_none() && o2.is_none() {
308 24 : break;
309 18216 : }
310 18216 : let (k1, l1, v1) = o1.unwrap();
311 18216 : let (k2, l2, v2) = o2.unwrap();
312 18216 : assert_eq!(&k1, k2);
313 18216 : assert_eq!(l1, *l2);
314 18216 : assert_eq!(&v1, v2);
315 : }
316 24 : }
317 :
318 : #[tokio::test]
319 6 : async fn merge_in_between() {
320 6 : use crate::repository::Value;
321 6 : use bytes::Bytes;
322 6 :
323 6 : let harness = TenantHarness::create("merge_iterator_merge_in_between")
324 6 : .await
325 6 : .unwrap();
326 24 : let (tenant, ctx) = harness.load().await;
327 6 :
328 6 : let tline = tenant
329 6 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
330 12 : .await
331 6 : .unwrap();
332 6 :
333 24 : fn get_key(id: u32) -> Key {
334 24 : let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
335 24 : key.field6 = id;
336 24 : key
337 24 : }
338 6 : let test_deltas1 = vec![
339 6 : (
340 6 : get_key(0),
341 6 : Lsn(0x10),
342 6 : Value::Image(Bytes::copy_from_slice(b"test")),
343 6 : ),
344 6 : (
345 6 : get_key(5),
346 6 : Lsn(0x10),
347 6 : Value::Image(Bytes::copy_from_slice(b"test")),
348 6 : ),
349 6 : ];
350 6 : let resident_layer_1 = produce_delta_layer(&tenant, &tline, test_deltas1.clone(), &ctx)
351 18 : .await
352 6 : .unwrap();
353 6 : let test_deltas2 = vec![
354 6 : (
355 6 : get_key(3),
356 6 : Lsn(0x10),
357 6 : Value::Image(Bytes::copy_from_slice(b"test")),
358 6 : ),
359 6 : (
360 6 : get_key(4),
361 6 : Lsn(0x10),
362 6 : Value::Image(Bytes::copy_from_slice(b"test")),
363 6 : ),
364 6 : ];
365 6 : let resident_layer_2 = produce_delta_layer(&tenant, &tline, test_deltas2.clone(), &ctx)
366 18 : .await
367 6 : .unwrap();
368 6 : let mut merge_iter = MergeIterator::create(
369 6 : &[
370 6 : resident_layer_2.get_as_delta(&ctx).await.unwrap(),
371 6 : resident_layer_1.get_as_delta(&ctx).await.unwrap(),
372 6 : ],
373 6 : &[],
374 6 : &ctx,
375 6 : );
376 6 : let mut expect = Vec::new();
377 6 : expect.extend(test_deltas1);
378 6 : expect.extend(test_deltas2);
379 6 : expect.sort_by(sort_delta);
380 12 : assert_merge_iter_equal(&mut merge_iter, &expect).await;
381 6 : }
382 :
383 : #[tokio::test]
384 6 : async fn delta_merge() {
385 6 : use crate::repository::Value;
386 6 : use bytes::Bytes;
387 6 :
388 6 : let harness = TenantHarness::create("merge_iterator_delta_merge")
389 6 : .await
390 6 : .unwrap();
391 23 : let (tenant, ctx) = harness.load().await;
392 6 :
393 6 : let tline = tenant
394 6 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
395 12 : .await
396 6 : .unwrap();
397 6 :
398 18000 : fn get_key(id: u32) -> Key {
399 18000 : let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
400 18000 : key.field6 = id;
401 18000 : key
402 18000 : }
403 6 : const N: usize = 1000;
404 6 : let test_deltas1 = (0..N)
405 6000 : .map(|idx| {
406 6000 : (
407 6000 : get_key(idx as u32 / 10),
408 6000 : Lsn(0x20 * ((idx as u64) % 10 + 1)),
409 6000 : Value::Image(Bytes::from(format!("img{idx:05}"))),
410 6000 : )
411 6000 : })
412 6 : .collect_vec();
413 6 : let resident_layer_1 = produce_delta_layer(&tenant, &tline, test_deltas1.clone(), &ctx)
414 24 : .await
415 6 : .unwrap();
416 6 : let test_deltas2 = (0..N)
417 6000 : .map(|idx| {
418 6000 : (
419 6000 : get_key(idx as u32 / 10),
420 6000 : Lsn(0x20 * ((idx as u64) % 10 + 1) + 0x10),
421 6000 : Value::Image(Bytes::from(format!("img{idx:05}"))),
422 6000 : )
423 6000 : })
424 6 : .collect_vec();
425 6 : let resident_layer_2 = produce_delta_layer(&tenant, &tline, test_deltas2.clone(), &ctx)
426 24 : .await
427 6 : .unwrap();
428 6 : let test_deltas3 = (0..N)
429 6000 : .map(|idx| {
430 6000 : (
431 6000 : get_key(idx as u32 / 10 + N as u32),
432 6000 : Lsn(0x10 * ((idx as u64) % 10 + 1)),
433 6000 : Value::Image(Bytes::from(format!("img{idx:05}"))),
434 6000 : )
435 6000 : })
436 6 : .collect_vec();
437 6 : let resident_layer_3 = produce_delta_layer(&tenant, &tline, test_deltas3.clone(), &ctx)
438 24 : .await
439 6 : .unwrap();
440 6 : let mut merge_iter = MergeIterator::create(
441 6 : &[
442 6 : resident_layer_1.get_as_delta(&ctx).await.unwrap(),
443 6 : resident_layer_2.get_as_delta(&ctx).await.unwrap(),
444 6 : resident_layer_3.get_as_delta(&ctx).await.unwrap(),
445 6 : ],
446 6 : &[],
447 6 : &ctx,
448 6 : );
449 6 : let mut expect = Vec::new();
450 6 : expect.extend(test_deltas1);
451 6 : expect.extend(test_deltas2);
452 6 : expect.extend(test_deltas3);
453 6 : expect.sort_by(sort_delta);
454 36 : assert_merge_iter_equal(&mut merge_iter, &expect).await;
455 6 :
456 6 : // TODO: test layers are loaded only when needed, reducing num of active iterators in k-merge
457 6 : }
458 :
459 : #[tokio::test]
460 6 : async fn delta_image_mixed_merge() {
461 6 : use crate::repository::Value;
462 6 : use bytes::Bytes;
463 6 :
464 6 : let harness = TenantHarness::create("merge_iterator_delta_image_mixed_merge")
465 6 : .await
466 6 : .unwrap();
467 24 : let (tenant, ctx) = harness.load().await;
468 6 :
469 6 : let tline = tenant
470 6 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
471 12 : .await
472 6 : .unwrap();
473 6 :
474 54 : fn get_key(id: u32) -> Key {
475 54 : let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
476 54 : key.field6 = id;
477 54 : key
478 54 : }
479 6 : // In this test case, we want to test if the iterator still works correctly with multiple copies
480 6 : // of a delta+image at the same LSN, for example, the following sequence a@10=+a, a@10=+a, a@10=ab, a@10=ab.
481 6 : // Duplicated deltas/images are possible for old tenants before the full L0 compaction file name fix.
482 6 : // An incomplete compaction could produce multiple exactly-the-same delta layers. Force image generation
483 6 : // could produce overlapping images. Apart from duplicated deltas/images, in the current storage implementation
484 6 : // one key-lsn could have a delta in the delta layer and one image in the image layer. The iterator should
485 6 : // correctly process these situations and return everything as-is, and the upper layer of the system
486 6 : // will handle duplicated LSNs.
487 6 : let test_deltas1 = vec![
488 6 : (
489 6 : get_key(0),
490 6 : Lsn(0x10),
491 6 : Value::WalRecord(NeonWalRecord::wal_init()),
492 6 : ),
493 6 : (
494 6 : get_key(0),
495 6 : Lsn(0x18),
496 6 : Value::WalRecord(NeonWalRecord::wal_append("a")),
497 6 : ),
498 6 : (
499 6 : get_key(5),
500 6 : Lsn(0x10),
501 6 : Value::WalRecord(NeonWalRecord::wal_init()),
502 6 : ),
503 6 : (
504 6 : get_key(5),
505 6 : Lsn(0x18),
506 6 : Value::WalRecord(NeonWalRecord::wal_append("b")),
507 6 : ),
508 6 : ];
509 6 : let resident_layer_1 = produce_delta_layer(&tenant, &tline, test_deltas1.clone(), &ctx)
510 18 : .await
511 6 : .unwrap();
512 6 : let mut test_deltas2 = test_deltas1.clone();
513 6 : test_deltas2.push((
514 6 : get_key(10),
515 6 : Lsn(0x20),
516 6 : Value::Image(Bytes::copy_from_slice(b"test")),
517 6 : ));
518 6 : let resident_layer_2 = produce_delta_layer(&tenant, &tline, test_deltas2.clone(), &ctx)
519 18 : .await
520 6 : .unwrap();
521 6 : let test_deltas3 = vec![
522 6 : (
523 6 : get_key(0),
524 6 : Lsn(0x10),
525 6 : Value::Image(Bytes::copy_from_slice(b"")),
526 6 : ),
527 6 : (
528 6 : get_key(5),
529 6 : Lsn(0x18),
530 6 : Value::Image(Bytes::copy_from_slice(b"b")),
531 6 : ),
532 6 : (
533 6 : get_key(15),
534 6 : Lsn(0x20),
535 6 : Value::Image(Bytes::copy_from_slice(b"test")),
536 6 : ),
537 6 : ];
538 6 : let resident_layer_3 = produce_delta_layer(&tenant, &tline, test_deltas3.clone(), &ctx)
539 18 : .await
540 6 : .unwrap();
541 6 : let mut test_deltas4 = test_deltas3.clone();
542 6 : test_deltas4.push((
543 6 : get_key(20),
544 6 : Lsn(0x20),
545 6 : Value::Image(Bytes::copy_from_slice(b"test")),
546 6 : ));
547 6 : let resident_layer_4 = produce_delta_layer(&tenant, &tline, test_deltas4.clone(), &ctx)
548 18 : .await
549 6 : .unwrap();
550 6 : let mut expect = Vec::new();
551 6 : expect.extend(test_deltas1);
552 6 : expect.extend(test_deltas2);
553 6 : expect.extend(test_deltas3);
554 6 : expect.extend(test_deltas4);
555 6 : expect.sort_by(sort_delta_value);
556 6 :
557 6 : // Test with different layer order for MergeIterator::create to ensure the order
558 6 : // is stable.
559 6 :
560 6 : let mut merge_iter = MergeIterator::create(
561 6 : &[
562 6 : resident_layer_4.get_as_delta(&ctx).await.unwrap(),
563 6 : resident_layer_1.get_as_delta(&ctx).await.unwrap(),
564 6 : resident_layer_3.get_as_delta(&ctx).await.unwrap(),
565 6 : resident_layer_2.get_as_delta(&ctx).await.unwrap(),
566 6 : ],
567 6 : &[],
568 6 : &ctx,
569 6 : );
570 24 : assert_merge_iter_equal(&mut merge_iter, &expect).await;
571 6 :
572 6 : let mut merge_iter = MergeIterator::create(
573 6 : &[
574 6 : resident_layer_1.get_as_delta(&ctx).await.unwrap(),
575 6 : resident_layer_4.get_as_delta(&ctx).await.unwrap(),
576 6 : resident_layer_3.get_as_delta(&ctx).await.unwrap(),
577 6 : resident_layer_2.get_as_delta(&ctx).await.unwrap(),
578 6 : ],
579 6 : &[],
580 6 : &ctx,
581 6 : );
582 12 : assert_merge_iter_equal(&mut merge_iter, &expect).await;
583 6 :
584 6 : is_send(merge_iter);
585 6 : }
586 :
587 6 : fn is_send(_: impl Send) {}
588 : }