Line data Source code
1 : use std::{
2 : cmp::Ordering,
3 : collections::{binary_heap, BinaryHeap},
4 : };
5 :
6 : use anyhow::bail;
7 : use pageserver_api::key::Key;
8 : use utils::lsn::Lsn;
9 :
10 : use crate::{context::RequestContext, repository::Value};
11 :
12 : use super::{
13 : delta_layer::{DeltaLayerInner, DeltaLayerIterator},
14 : image_layer::{ImageLayerInner, ImageLayerIterator},
15 : };
16 :
17 : #[derive(Clone, Copy)]
18 : enum LayerRef<'a> {
19 : Image(&'a ImageLayerInner),
20 : Delta(&'a DeltaLayerInner),
21 : }
22 :
23 : impl<'a> LayerRef<'a> {
24 1530 : fn iter(self, ctx: &'a RequestContext) -> LayerIterRef<'a> {
25 1530 : match self {
26 108 : Self::Image(x) => LayerIterRef::Image(x.iter(ctx)),
27 1422 : Self::Delta(x) => LayerIterRef::Delta(x.iter(ctx)),
28 : }
29 1530 : }
30 :
31 0 : fn layer_dbg_info(&self) -> String {
32 0 : match self {
33 0 : Self::Image(x) => x.layer_dbg_info(),
34 0 : Self::Delta(x) => x.layer_dbg_info(),
35 : }
36 0 : }
37 : }
38 :
39 : enum LayerIterRef<'a> {
40 : Image(ImageLayerIterator<'a>),
41 : Delta(DeltaLayerIterator<'a>),
42 : }
43 :
44 : impl LayerIterRef<'_> {
45 6215070 : async fn next(&mut self) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
46 6215070 : match self {
47 6213630 : Self::Delta(x) => x.next().await,
48 1440 : Self::Image(x) => x.next().await,
49 : }
50 6215070 : }
51 :
52 0 : fn layer_dbg_info(&self) -> String {
53 0 : match self {
54 0 : Self::Image(x) => x.layer_dbg_info(),
55 0 : Self::Delta(x) => x.layer_dbg_info(),
56 : }
57 0 : }
58 : }
59 :
60 : /// This type plays several roles at once
61 : /// 1. Unified iterator for image and delta layers.
62 : /// 2. `Ord` for use in [`MergeIterator::heap`] (for the k-merge).
63 : /// 3. Lazy creation of the real delta/image iterator.
64 : enum IteratorWrapper<'a> {
65 : NotLoaded {
66 : ctx: &'a RequestContext,
67 : first_key_lower_bound: (Key, Lsn),
68 : layer: LayerRef<'a>,
69 : },
70 : Loaded {
71 : iter: PeekableLayerIterRef<'a>,
72 : },
73 : }
74 :
75 : struct PeekableLayerIterRef<'a> {
76 : iter: LayerIterRef<'a>,
77 : peeked: Option<(Key, Lsn, Value)>, // None == end
78 : }
79 :
80 : impl<'a> PeekableLayerIterRef<'a> {
81 1530 : async fn create(mut iter: LayerIterRef<'a>) -> anyhow::Result<Self> {
82 1896 : let peeked = iter.next().await?;
83 1530 : Ok(Self { iter, peeked })
84 1530 : }
85 :
86 25349464 : fn peek(&self) -> &Option<(Key, Lsn, Value)> {
87 25349464 : &self.peeked
88 25349464 : }
89 :
90 6213540 : async fn next(&mut self) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
91 6213540 : let result = self.peeked.take();
92 6213540 : self.peeked = self.iter.next().await?;
93 6213540 : if let (Some((k1, l1, _)), Some((k2, l2, _))) = (&self.peeked, &result) {
94 6210480 : if (k1, l1) < (k2, l2) {
95 0 : bail!("iterator is not ordered: {}", self.iter.layer_dbg_info());
96 6210480 : }
97 3060 : }
98 6213540 : Ok(result)
99 6213540 : }
100 : }
101 :
102 : impl<'a> std::cmp::PartialEq for IteratorWrapper<'a> {
103 0 : fn eq(&self, other: &Self) -> bool {
104 0 : self.cmp(other) == Ordering::Equal
105 0 : }
106 : }
107 :
108 : impl<'a> std::cmp::Eq for IteratorWrapper<'a> {}
109 :
110 : impl<'a> std::cmp::PartialOrd for IteratorWrapper<'a> {
111 12686690 : fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
112 12686690 : Some(self.cmp(other))
113 12686690 : }
114 : }
115 :
116 : impl<'a> std::cmp::Ord for IteratorWrapper<'a> {
117 12686690 : fn cmp(&self, other: &Self) -> std::cmp::Ordering {
118 : use std::cmp::Ordering;
119 12686690 : let a = self.peek_next_key_lsn_value();
120 12686690 : let b = other.peek_next_key_lsn_value();
121 12686690 : match (a, b) {
122 9063302 : (Some((k1, l1, v1)), Some((k2, l2, v2))) => {
123 18126604 : fn map_value_to_num(val: &Option<&Value>) -> usize {
124 18101410 : match val {
125 25194 : None => 0,
126 18100240 : Some(Value::Image(_)) => 1,
127 1170 : Some(Value::WalRecord(_)) => 2,
128 : }
129 18126604 : }
130 9063302 : let order_1 = map_value_to_num(&v1);
131 9063302 : let order_2 = map_value_to_num(&v2);
132 9063302 : // When key_lsn are the same, the unloaded iter will always appear before the loaded one.
133 9063302 : // And note that we do a reverse at the end of the comparison, so it works with the max heap.
134 9063302 : (k1, l1, order_1).cmp(&(k2, l2, order_2))
135 : }
136 3008472 : (Some(_), None) => Ordering::Less,
137 3832 : (None, Some(_)) => Ordering::Greater,
138 611084 : (None, None) => Ordering::Equal,
139 : }
140 12686690 : .reverse()
141 12686690 : }
142 : }
143 :
144 : impl<'a> IteratorWrapper<'a> {
145 108 : pub fn create_from_image_layer(
146 108 : image_layer: &'a ImageLayerInner,
147 108 : ctx: &'a RequestContext,
148 108 : ) -> Self {
149 108 : Self::NotLoaded {
150 108 : layer: LayerRef::Image(image_layer),
151 108 : first_key_lower_bound: (image_layer.key_range().start, image_layer.lsn()),
152 108 : ctx,
153 108 : }
154 108 : }
155 :
156 1422 : pub fn create_from_delta_layer(
157 1422 : delta_layer: &'a DeltaLayerInner,
158 1422 : ctx: &'a RequestContext,
159 1422 : ) -> Self {
160 1422 : Self::NotLoaded {
161 1422 : layer: LayerRef::Delta(delta_layer),
162 1422 : first_key_lower_bound: (delta_layer.key_range().start, delta_layer.lsn_range().start),
163 1422 : ctx,
164 1422 : }
165 1422 : }
166 :
167 25373380 : fn peek_next_key_lsn_value(&self) -> Option<(&Key, Lsn, Option<&Value>)> {
168 25373380 : match self {
169 25347934 : Self::Loaded { iter } => iter
170 25347934 : .peek()
171 25347934 : .as_ref()
172 25347934 : .map(|(key, lsn, val)| (key, *lsn, Some(val))),
173 25446 : Self::NotLoaded {
174 25446 : first_key_lower_bound: (key, lsn),
175 25446 : ..
176 25446 : } => Some((key, *lsn, None)),
177 : }
178 25373380 : }
179 :
180 : // CORRECTNESS: this function must always take `&mut self`, never `&self`.
181 : //
182 : // The reason is that `impl Ord for Self` evaluates differently after this function
183 : // returns. We're called through a `PeekMut::deref_mut`, which causes heap repair when
184 : // the PeekMut gets returned. So, it's critical that we actually run through `PeekMut::deref_mut`
185 : // and not just `PeekMut::deref`
186 : // If we don't take `&mut self`
187 1530 : async fn load(&mut self) -> anyhow::Result<()> {
188 1530 : assert!(!self.is_loaded());
189 1530 : let Self::NotLoaded {
190 1530 : ctx,
191 1530 : first_key_lower_bound,
192 1530 : layer,
193 1530 : } = self
194 : else {
195 0 : unreachable!()
196 : };
197 1530 : let iter = layer.iter(ctx);
198 1896 : let iter = PeekableLayerIterRef::create(iter).await?;
199 1530 : if let Some((k1, l1, _)) = iter.peek() {
200 1530 : let (k2, l2) = first_key_lower_bound;
201 1530 : if (k1, l1) < (k2, l2) {
202 0 : bail!(
203 0 : "layer key range did not include the first key in the layer: {}",
204 0 : layer.layer_dbg_info()
205 0 : );
206 1530 : }
207 0 : }
208 1530 : *self = Self::Loaded { iter };
209 1530 : Ok(())
210 1530 : }
211 :
212 6216600 : fn is_loaded(&self) -> bool {
213 6216600 : matches!(self, Self::Loaded { .. })
214 6216600 : }
215 :
216 : /// Correctness: must load the iterator before using.
217 : ///
218 : /// Given this iterator wrapper is private to the merge iterator, users won't be able to mis-use it.
219 : /// The public interfaces to use are [`crate::tenant::storage_layer::delta_layer::DeltaLayerIterator`] and
220 : /// [`crate::tenant::storage_layer::image_layer::ImageLayerIterator`].
221 6213540 : async fn next(&mut self) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
222 6213540 : let Self::Loaded { iter } = self else {
223 0 : panic!("must load the iterator before using")
224 : };
225 6213540 : iter.next().await
226 6213540 : }
227 : }
228 :
229 : /// A merge iterator over delta/image layer iterators.
230 : ///
231 : /// When duplicated records are found, the iterator will not perform any
232 : /// deduplication, and the caller should handle these situation. By saying
233 : /// duplicated records, there are many possibilities:
234 : ///
235 : /// * Two same delta at the same LSN.
236 : /// * Two same image at the same LSN.
237 : /// * Delta/image at the same LSN where the image has already applied the delta.
238 : ///
239 : /// The iterator will always put the image before the delta.
240 : pub struct MergeIterator<'a> {
241 : heap: BinaryHeap<IteratorWrapper<'a>>,
242 : }
243 :
244 : impl<'a> MergeIterator<'a> {
245 186 : pub fn create(
246 186 : deltas: &[&'a DeltaLayerInner],
247 186 : images: &[&'a ImageLayerInner],
248 186 : ctx: &'a RequestContext,
249 186 : ) -> Self {
250 186 : let mut heap = Vec::with_capacity(images.len() + deltas.len());
251 294 : for image in images {
252 108 : heap.push(IteratorWrapper::create_from_image_layer(image, ctx));
253 108 : }
254 1608 : for delta in deltas {
255 1422 : heap.push(IteratorWrapper::create_from_delta_layer(delta, ctx));
256 1422 : }
257 186 : Self {
258 186 : heap: BinaryHeap::from(heap),
259 186 : }
260 186 : }
261 :
262 6212196 : pub async fn next(&mut self) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
263 6215256 : while let Some(mut iter) = self.heap.peek_mut() {
264 6215070 : if !iter.is_loaded() {
265 : // Once we load the iterator, we can know the real first key-value pair in the iterator.
266 : // We put it back into the heap so that a potentially unloaded layer may have a key between
267 : // [potential_first_key, loaded_first_key).
268 1896 : iter.load().await?;
269 1530 : continue;
270 6213540 : }
271 6213540 : let Some(item) = iter.next().await? else {
272 : // If the iterator returns None, we pop this iterator. Actually, in the current implementation,
273 : // we order None > Some, and all the rest of the iterators should return None.
274 1530 : binary_heap::PeekMut::pop(iter);
275 1530 : continue;
276 : };
277 6212010 : return Ok(Some(item));
278 : }
279 186 : Ok(None)
280 6212196 : }
281 : }
282 :
283 : #[cfg(test)]
284 : mod tests {
285 : use super::*;
286 :
287 : use itertools::Itertools;
288 : use pageserver_api::key::Key;
289 : use utils::lsn::Lsn;
290 :
291 : use crate::{
292 : tenant::{
293 : harness::{TenantHarness, TIMELINE_ID},
294 : storage_layer::delta_layer::test::{produce_delta_layer, sort_delta, sort_delta_value},
295 : },
296 : walrecord::NeonWalRecord,
297 : DEFAULT_PG_VERSION,
298 : };
299 :
300 24 : async fn assert_merge_iter_equal(
301 24 : merge_iter: &mut MergeIterator<'_>,
302 24 : expect: &[(Key, Lsn, Value)],
303 24 : ) {
304 24 : let mut expect_iter = expect.iter();
305 : loop {
306 18240 : let o1 = merge_iter.next().await.unwrap();
307 18240 : let o2 = expect_iter.next();
308 18240 : assert_eq!(o1.is_some(), o2.is_some());
309 18240 : if o1.is_none() && o2.is_none() {
310 24 : break;
311 18216 : }
312 18216 : let (k1, l1, v1) = o1.unwrap();
313 18216 : let (k2, l2, v2) = o2.unwrap();
314 18216 : assert_eq!(&k1, k2);
315 18216 : assert_eq!(l1, *l2);
316 18216 : assert_eq!(&v1, v2);
317 : }
318 24 : }
319 :
320 : #[tokio::test]
321 6 : async fn merge_in_between() {
322 6 : use crate::repository::Value;
323 6 : use bytes::Bytes;
324 6 :
325 6 : let harness = TenantHarness::create("merge_iterator_merge_in_between")
326 6 : .await
327 6 : .unwrap();
328 24 : let (tenant, ctx) = harness.load().await;
329 6 :
330 6 : let tline = tenant
331 6 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
332 12 : .await
333 6 : .unwrap();
334 6 :
335 24 : fn get_key(id: u32) -> Key {
336 24 : let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
337 24 : key.field6 = id;
338 24 : key
339 24 : }
340 6 : let test_deltas1 = vec![
341 6 : (
342 6 : get_key(0),
343 6 : Lsn(0x10),
344 6 : Value::Image(Bytes::copy_from_slice(b"test")),
345 6 : ),
346 6 : (
347 6 : get_key(5),
348 6 : Lsn(0x10),
349 6 : Value::Image(Bytes::copy_from_slice(b"test")),
350 6 : ),
351 6 : ];
352 6 : let resident_layer_1 = produce_delta_layer(&tenant, &tline, test_deltas1.clone(), &ctx)
353 18 : .await
354 6 : .unwrap();
355 6 : let test_deltas2 = vec![
356 6 : (
357 6 : get_key(3),
358 6 : Lsn(0x10),
359 6 : Value::Image(Bytes::copy_from_slice(b"test")),
360 6 : ),
361 6 : (
362 6 : get_key(4),
363 6 : Lsn(0x10),
364 6 : Value::Image(Bytes::copy_from_slice(b"test")),
365 6 : ),
366 6 : ];
367 6 : let resident_layer_2 = produce_delta_layer(&tenant, &tline, test_deltas2.clone(), &ctx)
368 18 : .await
369 6 : .unwrap();
370 6 : let mut merge_iter = MergeIterator::create(
371 6 : &[
372 6 : resident_layer_2.get_as_delta(&ctx).await.unwrap(),
373 6 : resident_layer_1.get_as_delta(&ctx).await.unwrap(),
374 6 : ],
375 6 : &[],
376 6 : &ctx,
377 6 : );
378 6 : let mut expect = Vec::new();
379 6 : expect.extend(test_deltas1);
380 6 : expect.extend(test_deltas2);
381 6 : expect.sort_by(sort_delta);
382 12 : assert_merge_iter_equal(&mut merge_iter, &expect).await;
383 6 : }
384 :
385 : #[tokio::test]
386 6 : async fn delta_merge() {
387 6 : use crate::repository::Value;
388 6 : use bytes::Bytes;
389 6 :
390 6 : let harness = TenantHarness::create("merge_iterator_delta_merge")
391 6 : .await
392 6 : .unwrap();
393 24 : let (tenant, ctx) = harness.load().await;
394 6 :
395 6 : let tline = tenant
396 6 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
397 12 : .await
398 6 : .unwrap();
399 6 :
400 18000 : fn get_key(id: u32) -> Key {
401 18000 : let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
402 18000 : key.field6 = id;
403 18000 : key
404 18000 : }
405 6 : const N: usize = 1000;
406 6 : let test_deltas1 = (0..N)
407 6000 : .map(|idx| {
408 6000 : (
409 6000 : get_key(idx as u32 / 10),
410 6000 : Lsn(0x20 * ((idx as u64) % 10 + 1)),
411 6000 : Value::Image(Bytes::from(format!("img{idx:05}"))),
412 6000 : )
413 6000 : })
414 6 : .collect_vec();
415 6 : let resident_layer_1 = produce_delta_layer(&tenant, &tline, test_deltas1.clone(), &ctx)
416 24 : .await
417 6 : .unwrap();
418 6 : let test_deltas2 = (0..N)
419 6000 : .map(|idx| {
420 6000 : (
421 6000 : get_key(idx as u32 / 10),
422 6000 : Lsn(0x20 * ((idx as u64) % 10 + 1) + 0x10),
423 6000 : Value::Image(Bytes::from(format!("img{idx:05}"))),
424 6000 : )
425 6000 : })
426 6 : .collect_vec();
427 6 : let resident_layer_2 = produce_delta_layer(&tenant, &tline, test_deltas2.clone(), &ctx)
428 24 : .await
429 6 : .unwrap();
430 6 : let test_deltas3 = (0..N)
431 6000 : .map(|idx| {
432 6000 : (
433 6000 : get_key(idx as u32 / 10 + N as u32),
434 6000 : Lsn(0x10 * ((idx as u64) % 10 + 1)),
435 6000 : Value::Image(Bytes::from(format!("img{idx:05}"))),
436 6000 : )
437 6000 : })
438 6 : .collect_vec();
439 6 : let resident_layer_3 = produce_delta_layer(&tenant, &tline, test_deltas3.clone(), &ctx)
440 24 : .await
441 6 : .unwrap();
442 6 : let mut merge_iter = MergeIterator::create(
443 6 : &[
444 6 : resident_layer_1.get_as_delta(&ctx).await.unwrap(),
445 6 : resident_layer_2.get_as_delta(&ctx).await.unwrap(),
446 6 : resident_layer_3.get_as_delta(&ctx).await.unwrap(),
447 6 : ],
448 6 : &[],
449 6 : &ctx,
450 6 : );
451 6 : let mut expect = Vec::new();
452 6 : expect.extend(test_deltas1);
453 6 : expect.extend(test_deltas2);
454 6 : expect.extend(test_deltas3);
455 6 : expect.sort_by(sort_delta);
456 36 : assert_merge_iter_equal(&mut merge_iter, &expect).await;
457 6 :
458 6 : // TODO: test layers are loaded only when needed, reducing num of active iterators in k-merge
459 6 : }
460 :
461 : #[tokio::test]
462 6 : async fn delta_image_mixed_merge() {
463 6 : use crate::repository::Value;
464 6 : use bytes::Bytes;
465 6 :
466 6 : let harness = TenantHarness::create("merge_iterator_delta_image_mixed_merge")
467 6 : .await
468 6 : .unwrap();
469 24 : let (tenant, ctx) = harness.load().await;
470 6 :
471 6 : let tline = tenant
472 6 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
473 12 : .await
474 6 : .unwrap();
475 6 :
476 54 : fn get_key(id: u32) -> Key {
477 54 : let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
478 54 : key.field6 = id;
479 54 : key
480 54 : }
481 6 : // In this test case, we want to test if the iterator still works correctly with multiple copies
482 6 : // of a delta+image at the same LSN, for example, the following sequence a@10=+a, a@10=+a, a@10=ab, a@10=ab.
483 6 : // Duplicated deltas/images are possible for old tenants before the full L0 compaction file name fix.
484 6 : // An incomplete compaction could produce multiple exactly-the-same delta layers. Force image generation
485 6 : // could produce overlapping images. Apart from duplicated deltas/images, in the current storage implementation
486 6 : // one key-lsn could have a delta in the delta layer and one image in the image layer. The iterator should
487 6 : // correctly process these situations and return everything as-is, and the upper layer of the system
488 6 : // will handle duplicated LSNs.
489 6 : let test_deltas1 = vec![
490 6 : (
491 6 : get_key(0),
492 6 : Lsn(0x10),
493 6 : Value::WalRecord(NeonWalRecord::wal_init()),
494 6 : ),
495 6 : (
496 6 : get_key(0),
497 6 : Lsn(0x18),
498 6 : Value::WalRecord(NeonWalRecord::wal_append("a")),
499 6 : ),
500 6 : (
501 6 : get_key(5),
502 6 : Lsn(0x10),
503 6 : Value::WalRecord(NeonWalRecord::wal_init()),
504 6 : ),
505 6 : (
506 6 : get_key(5),
507 6 : Lsn(0x18),
508 6 : Value::WalRecord(NeonWalRecord::wal_append("b")),
509 6 : ),
510 6 : ];
511 6 : let resident_layer_1 = produce_delta_layer(&tenant, &tline, test_deltas1.clone(), &ctx)
512 18 : .await
513 6 : .unwrap();
514 6 : let mut test_deltas2 = test_deltas1.clone();
515 6 : test_deltas2.push((
516 6 : get_key(10),
517 6 : Lsn(0x20),
518 6 : Value::Image(Bytes::copy_from_slice(b"test")),
519 6 : ));
520 6 : let resident_layer_2 = produce_delta_layer(&tenant, &tline, test_deltas2.clone(), &ctx)
521 18 : .await
522 6 : .unwrap();
523 6 : let test_deltas3 = vec![
524 6 : (
525 6 : get_key(0),
526 6 : Lsn(0x10),
527 6 : Value::Image(Bytes::copy_from_slice(b"")),
528 6 : ),
529 6 : (
530 6 : get_key(5),
531 6 : Lsn(0x18),
532 6 : Value::Image(Bytes::copy_from_slice(b"b")),
533 6 : ),
534 6 : (
535 6 : get_key(15),
536 6 : Lsn(0x20),
537 6 : Value::Image(Bytes::copy_from_slice(b"test")),
538 6 : ),
539 6 : ];
540 6 : let resident_layer_3 = produce_delta_layer(&tenant, &tline, test_deltas3.clone(), &ctx)
541 18 : .await
542 6 : .unwrap();
543 6 : let mut test_deltas4 = test_deltas3.clone();
544 6 : test_deltas4.push((
545 6 : get_key(20),
546 6 : Lsn(0x20),
547 6 : Value::Image(Bytes::copy_from_slice(b"test")),
548 6 : ));
549 6 : let resident_layer_4 = produce_delta_layer(&tenant, &tline, test_deltas4.clone(), &ctx)
550 18 : .await
551 6 : .unwrap();
552 6 : let mut expect = Vec::new();
553 6 : expect.extend(test_deltas1);
554 6 : expect.extend(test_deltas2);
555 6 : expect.extend(test_deltas3);
556 6 : expect.extend(test_deltas4);
557 6 : expect.sort_by(sort_delta_value);
558 6 :
559 6 : // Test with different layer order for MergeIterator::create to ensure the order
560 6 : // is stable.
561 6 :
562 6 : let mut merge_iter = MergeIterator::create(
563 6 : &[
564 6 : resident_layer_4.get_as_delta(&ctx).await.unwrap(),
565 6 : resident_layer_1.get_as_delta(&ctx).await.unwrap(),
566 6 : resident_layer_3.get_as_delta(&ctx).await.unwrap(),
567 6 : resident_layer_2.get_as_delta(&ctx).await.unwrap(),
568 6 : ],
569 6 : &[],
570 6 : &ctx,
571 6 : );
572 24 : assert_merge_iter_equal(&mut merge_iter, &expect).await;
573 6 :
574 6 : let mut merge_iter = MergeIterator::create(
575 6 : &[
576 6 : resident_layer_1.get_as_delta(&ctx).await.unwrap(),
577 6 : resident_layer_4.get_as_delta(&ctx).await.unwrap(),
578 6 : resident_layer_3.get_as_delta(&ctx).await.unwrap(),
579 6 : resident_layer_2.get_as_delta(&ctx).await.unwrap(),
580 6 : ],
581 6 : &[],
582 6 : &ctx,
583 6 : );
584 12 : assert_merge_iter_equal(&mut merge_iter, &expect).await;
585 6 :
586 6 : is_send(merge_iter);
587 6 : }
588 :
589 6 : fn is_send(_: impl Send) {}
590 : }
|