Line data Source code
1 : use std::{
2 : cmp::Ordering,
3 : collections::{binary_heap, BinaryHeap},
4 : };
5 :
6 : use anyhow::bail;
7 : use pageserver_api::key::Key;
8 : use utils::lsn::Lsn;
9 :
10 : use crate::{context::RequestContext, repository::Value};
11 :
12 : use super::{
13 : delta_layer::{DeltaLayerInner, DeltaLayerIterator},
14 : image_layer::{ImageLayerInner, ImageLayerIterator},
15 : };
16 :
17 : #[derive(Clone, Copy)]
18 : enum LayerRef<'a> {
19 : Image(&'a ImageLayerInner),
20 : Delta(&'a DeltaLayerInner),
21 : }
22 :
23 : impl<'a> LayerRef<'a> {
24 514 : fn iter(self, ctx: &'a RequestContext) -> LayerIterRef<'a> {
25 514 : match self {
26 36 : Self::Image(x) => LayerIterRef::Image(x.iter(ctx)),
27 478 : Self::Delta(x) => LayerIterRef::Delta(x.iter(ctx)),
28 : }
29 514 : }
30 :
31 0 : fn layer_dbg_info(&self) -> String {
32 0 : match self {
33 0 : Self::Image(x) => x.layer_dbg_info(),
34 0 : Self::Delta(x) => x.layer_dbg_info(),
35 : }
36 0 : }
37 : }
38 :
39 : enum LayerIterRef<'a> {
40 : Image(ImageLayerIterator<'a>),
41 : Delta(DeltaLayerIterator<'a>),
42 : }
43 :
44 : impl LayerIterRef<'_> {
45 2072088 : async fn next(&mut self) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
46 2072088 : match self {
47 2071608 : Self::Delta(x) => x.next().await,
48 480 : Self::Image(x) => x.next().await,
49 : }
50 2072088 : }
51 :
52 0 : fn layer_dbg_info(&self) -> String {
53 0 : match self {
54 0 : Self::Image(x) => x.layer_dbg_info(),
55 0 : Self::Delta(x) => x.layer_dbg_info(),
56 : }
57 0 : }
58 : }
59 :
60 : /// This type plays several roles at once
61 : /// 1. Unified iterator for image and delta layers.
62 : /// 2. `Ord` for use in [`MergeIterator::heap`] (for the k-merge).
63 : /// 3. Lazy creation of the real delta/image iterator.
64 : enum IteratorWrapper<'a> {
65 : NotLoaded {
66 : ctx: &'a RequestContext,
67 : first_key_lower_bound: (Key, Lsn),
68 : layer: LayerRef<'a>,
69 : },
70 : Loaded {
71 : iter: PeekableLayerIterRef<'a>,
72 : },
73 : }
74 :
75 : struct PeekableLayerIterRef<'a> {
76 : iter: LayerIterRef<'a>,
77 : peeked: Option<(Key, Lsn, Value)>, // None == end
78 : }
79 :
80 : impl<'a> PeekableLayerIterRef<'a> {
81 514 : async fn create(mut iter: LayerIterRef<'a>) -> anyhow::Result<Self> {
82 643 : let peeked = iter.next().await?;
83 514 : Ok(Self { iter, peeked })
84 514 : }
85 :
86 8450072 : fn peek(&self) -> &Option<(Key, Lsn, Value)> {
87 8450072 : &self.peeked
88 8450072 : }
89 :
90 2071574 : async fn next(&mut self) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
91 2071574 : let result = self.peeked.take();
92 2071574 : self.peeked = self.iter.next().await?;
93 2071574 : if let (Some((k1, l1, _)), Some((k2, l2, _))) = (&self.peeked, &result) {
94 2070550 : if (k1, l1) < (k2, l2) {
95 0 : bail!("iterator is not ordered: {}", self.iter.layer_dbg_info());
96 2070550 : }
97 1024 : }
98 2071574 : Ok(result)
99 2071574 : }
100 : }
101 :
102 : impl std::cmp::PartialEq for IteratorWrapper<'_> {
103 0 : fn eq(&self, other: &Self) -> bool {
104 0 : self.cmp(other) == Ordering::Equal
105 0 : }
106 : }
107 :
108 : impl std::cmp::Eq for IteratorWrapper<'_> {}
109 :
110 : impl std::cmp::PartialOrd for IteratorWrapper<'_> {
111 4229020 : fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
112 4229020 : Some(self.cmp(other))
113 4229020 : }
114 : }
115 :
116 : impl std::cmp::Ord for IteratorWrapper<'_> {
117 4229020 : fn cmp(&self, other: &Self) -> std::cmp::Ordering {
118 : use std::cmp::Ordering;
119 4229020 : let a = self.peek_next_key_lsn_value();
120 4229020 : let b = other.peek_next_key_lsn_value();
121 4229020 : match (a, b) {
122 3021212 : (Some((k1, l1, v1)), Some((k2, l2, v2))) => {
123 6042424 : fn map_value_to_num(val: &Option<&Value>) -> usize {
124 6034026 : match val {
125 8398 : None => 0,
126 6033636 : Some(Value::Image(_)) => 1,
127 390 : Some(Value::WalRecord(_)) => 2,
128 : }
129 6042424 : }
130 3021212 : let order_1 = map_value_to_num(&v1);
131 3021212 : let order_2 = map_value_to_num(&v2);
132 3021212 : // When key_lsn are the same, the unloaded iter will always appear before the loaded one.
133 3021212 : // And note that we do a reverse at the end of the comparison, so it works with the max heap.
134 3021212 : (k1, l1, order_1).cmp(&(k2, l2, order_2))
135 : }
136 1002829 : (Some(_), None) => Ordering::Less,
137 1283 : (None, Some(_)) => Ordering::Greater,
138 203696 : (None, None) => Ordering::Equal,
139 : }
140 4229020 : .reverse()
141 4229020 : }
142 : }
143 :
144 : impl<'a> IteratorWrapper<'a> {
145 36 : pub fn create_from_image_layer(
146 36 : image_layer: &'a ImageLayerInner,
147 36 : ctx: &'a RequestContext,
148 36 : ) -> Self {
149 36 : Self::NotLoaded {
150 36 : layer: LayerRef::Image(image_layer),
151 36 : first_key_lower_bound: (image_layer.key_range().start, image_layer.lsn()),
152 36 : ctx,
153 36 : }
154 36 : }
155 :
156 478 : pub fn create_from_delta_layer(
157 478 : delta_layer: &'a DeltaLayerInner,
158 478 : ctx: &'a RequestContext,
159 478 : ) -> Self {
160 478 : Self::NotLoaded {
161 478 : layer: LayerRef::Delta(delta_layer),
162 478 : first_key_lower_bound: (delta_layer.key_range().start, delta_layer.lsn_range().start),
163 478 : ctx,
164 478 : }
165 478 : }
166 :
167 8458040 : fn peek_next_key_lsn_value(&self) -> Option<(&Key, Lsn, Option<&Value>)> {
168 8458040 : match self {
169 8449558 : Self::Loaded { iter } => iter
170 8449558 : .peek()
171 8449558 : .as_ref()
172 8449558 : .map(|(key, lsn, val)| (key, *lsn, Some(val))),
173 8482 : Self::NotLoaded {
174 8482 : first_key_lower_bound: (key, lsn),
175 8482 : ..
176 8482 : } => Some((key, *lsn, None)),
177 : }
178 8458040 : }
179 :
180 : // CORRECTNESS: this function must always take `&mut self`, never `&self`.
181 : //
182 : // The reason is that `impl Ord for Self` evaluates differently after this function
183 : // returns. We're called through a `PeekMut::deref_mut`, which causes heap repair when
184 : // the PeekMut gets returned. So, it's critical that we actually run through `PeekMut::deref_mut`
185 : // and not just `PeekMut::deref`
186 : // If we don't take `&mut self`
187 514 : async fn load(&mut self) -> anyhow::Result<()> {
188 514 : assert!(!self.is_loaded());
189 514 : let Self::NotLoaded {
190 514 : ctx,
191 514 : first_key_lower_bound,
192 514 : layer,
193 514 : } = self
194 : else {
195 0 : unreachable!()
196 : };
197 514 : let iter = layer.iter(ctx);
198 643 : let iter = PeekableLayerIterRef::create(iter).await?;
199 514 : if let Some((k1, l1, _)) = iter.peek() {
200 514 : let (k2, l2) = first_key_lower_bound;
201 514 : if (k1, l1) < (k2, l2) {
202 0 : bail!(
203 0 : "layer key range did not include the first key in the layer: {}",
204 0 : layer.layer_dbg_info()
205 0 : );
206 514 : }
207 0 : }
208 514 : *self = Self::Loaded { iter };
209 514 : Ok(())
210 514 : }
211 :
212 2072602 : fn is_loaded(&self) -> bool {
213 2072602 : matches!(self, Self::Loaded { .. })
214 2072602 : }
215 :
216 : /// Correctness: must load the iterator before using.
217 : ///
218 : /// Given this iterator wrapper is private to the merge iterator, users won't be able to mis-use it.
219 : /// The public interfaces to use are [`crate::tenant::storage_layer::delta_layer::DeltaLayerIterator`] and
220 : /// [`crate::tenant::storage_layer::image_layer::ImageLayerIterator`].
221 2071574 : async fn next(&mut self) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
222 2071574 : let Self::Loaded { iter } = self else {
223 0 : panic!("must load the iterator before using")
224 : };
225 2071574 : iter.next().await
226 2071574 : }
227 : }
228 :
229 : /// A merge iterator over delta/image layer iterators.
230 : ///
231 : /// When duplicated records are found, the iterator will not perform any
232 : /// deduplication, and the caller should handle these situation. By saying
233 : /// duplicated records, there are many possibilities:
234 : ///
235 : /// * Two same delta at the same LSN.
236 : /// * Two same image at the same LSN.
237 : /// * Delta/image at the same LSN where the image has already applied the delta.
238 : ///
239 : /// The iterator will always put the image before the delta.
240 : pub struct MergeIterator<'a> {
241 : heap: BinaryHeap<IteratorWrapper<'a>>,
242 : }
243 :
244 : impl<'a> MergeIterator<'a> {
245 66 : pub fn create(
246 66 : deltas: &[&'a DeltaLayerInner],
247 66 : images: &[&'a ImageLayerInner],
248 66 : ctx: &'a RequestContext,
249 66 : ) -> Self {
250 66 : let mut heap = Vec::with_capacity(images.len() + deltas.len());
251 102 : for image in images {
252 36 : heap.push(IteratorWrapper::create_from_image_layer(image, ctx));
253 36 : }
254 544 : for delta in deltas {
255 478 : heap.push(IteratorWrapper::create_from_delta_layer(delta, ctx));
256 478 : }
257 66 : Self {
258 66 : heap: BinaryHeap::from(heap),
259 66 : }
260 66 : }
261 :
262 2071126 : pub async fn next(&mut self) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
263 2072152 : while let Some(mut iter) = self.heap.peek_mut() {
264 2072088 : if !iter.is_loaded() {
265 : // Once we load the iterator, we can know the real first key-value pair in the iterator.
266 : // We put it back into the heap so that a potentially unloaded layer may have a key between
267 : // [potential_first_key, loaded_first_key).
268 643 : iter.load().await?;
269 514 : continue;
270 2071574 : }
271 2071574 : let Some(item) = iter.next().await? else {
272 : // If the iterator returns None, we pop this iterator. Actually, in the current implementation,
273 : // we order None > Some, and all the rest of the iterators should return None.
274 512 : binary_heap::PeekMut::pop(iter);
275 512 : continue;
276 : };
277 2071062 : return Ok(Some(item));
278 : }
279 64 : Ok(None)
280 2071126 : }
281 : }
282 :
283 : #[cfg(test)]
284 : mod tests {
285 : use super::*;
286 :
287 : use itertools::Itertools;
288 : use pageserver_api::key::Key;
289 : use utils::lsn::Lsn;
290 :
291 : use crate::{
292 : tenant::{
293 : harness::{TenantHarness, TIMELINE_ID},
294 : storage_layer::delta_layer::test::{produce_delta_layer, sort_delta, sort_delta_value},
295 : },
296 : walrecord::NeonWalRecord,
297 : DEFAULT_PG_VERSION,
298 : };
299 :
300 8 : async fn assert_merge_iter_equal(
301 8 : merge_iter: &mut MergeIterator<'_>,
302 8 : expect: &[(Key, Lsn, Value)],
303 8 : ) {
304 8 : let mut expect_iter = expect.iter();
305 : loop {
306 6080 : let o1 = merge_iter.next().await.unwrap();
307 6080 : let o2 = expect_iter.next();
308 6080 : assert_eq!(o1.is_some(), o2.is_some());
309 6080 : if o1.is_none() && o2.is_none() {
310 8 : break;
311 6072 : }
312 6072 : let (k1, l1, v1) = o1.unwrap();
313 6072 : let (k2, l2, v2) = o2.unwrap();
314 6072 : assert_eq!(&k1, k2);
315 6072 : assert_eq!(l1, *l2);
316 6072 : assert_eq!(&v1, v2);
317 : }
318 8 : }
319 :
320 : #[tokio::test]
321 2 : async fn merge_in_between() {
322 2 : use crate::repository::Value;
323 2 : use bytes::Bytes;
324 2 :
325 2 : let harness = TenantHarness::create("merge_iterator_merge_in_between")
326 2 : .await
327 2 : .unwrap();
328 7 : let (tenant, ctx) = harness.load().await;
329 2 :
330 2 : let tline = tenant
331 2 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
332 4 : .await
333 2 : .unwrap();
334 2 :
335 8 : fn get_key(id: u32) -> Key {
336 8 : let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
337 8 : key.field6 = id;
338 8 : key
339 8 : }
340 2 : let test_deltas1 = vec![
341 2 : (
342 2 : get_key(0),
343 2 : Lsn(0x10),
344 2 : Value::Image(Bytes::copy_from_slice(b"test")),
345 2 : ),
346 2 : (
347 2 : get_key(5),
348 2 : Lsn(0x10),
349 2 : Value::Image(Bytes::copy_from_slice(b"test")),
350 2 : ),
351 2 : ];
352 2 : let resident_layer_1 = produce_delta_layer(&tenant, &tline, test_deltas1.clone(), &ctx)
353 6 : .await
354 2 : .unwrap();
355 2 : let test_deltas2 = vec![
356 2 : (
357 2 : get_key(3),
358 2 : Lsn(0x10),
359 2 : Value::Image(Bytes::copy_from_slice(b"test")),
360 2 : ),
361 2 : (
362 2 : get_key(4),
363 2 : Lsn(0x10),
364 2 : Value::Image(Bytes::copy_from_slice(b"test")),
365 2 : ),
366 2 : ];
367 2 : let resident_layer_2 = produce_delta_layer(&tenant, &tline, test_deltas2.clone(), &ctx)
368 6 : .await
369 2 : .unwrap();
370 2 : let mut merge_iter = MergeIterator::create(
371 2 : &[
372 2 : resident_layer_2.get_as_delta(&ctx).await.unwrap(),
373 2 : resident_layer_1.get_as_delta(&ctx).await.unwrap(),
374 2 : ],
375 2 : &[],
376 2 : &ctx,
377 2 : );
378 2 : let mut expect = Vec::new();
379 2 : expect.extend(test_deltas1);
380 2 : expect.extend(test_deltas2);
381 2 : expect.sort_by(sort_delta);
382 4 : assert_merge_iter_equal(&mut merge_iter, &expect).await;
383 2 : }
384 :
385 : #[tokio::test]
386 2 : async fn delta_merge() {
387 2 : use crate::repository::Value;
388 2 : use bytes::Bytes;
389 2 :
390 2 : let harness = TenantHarness::create("merge_iterator_delta_merge")
391 2 : .await
392 2 : .unwrap();
393 8 : let (tenant, ctx) = harness.load().await;
394 2 :
395 2 : let tline = tenant
396 2 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
397 4 : .await
398 2 : .unwrap();
399 2 :
400 6000 : fn get_key(id: u32) -> Key {
401 6000 : let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
402 6000 : key.field6 = id;
403 6000 : key
404 6000 : }
405 2 : const N: usize = 1000;
406 2 : let test_deltas1 = (0..N)
407 2000 : .map(|idx| {
408 2000 : (
409 2000 : get_key(idx as u32 / 10),
410 2000 : Lsn(0x20 * ((idx as u64) % 10 + 1)),
411 2000 : Value::Image(Bytes::from(format!("img{idx:05}"))),
412 2000 : )
413 2000 : })
414 2 : .collect_vec();
415 2 : let resident_layer_1 = produce_delta_layer(&tenant, &tline, test_deltas1.clone(), &ctx)
416 8 : .await
417 2 : .unwrap();
418 2 : let test_deltas2 = (0..N)
419 2000 : .map(|idx| {
420 2000 : (
421 2000 : get_key(idx as u32 / 10),
422 2000 : Lsn(0x20 * ((idx as u64) % 10 + 1) + 0x10),
423 2000 : Value::Image(Bytes::from(format!("img{idx:05}"))),
424 2000 : )
425 2000 : })
426 2 : .collect_vec();
427 2 : let resident_layer_2 = produce_delta_layer(&tenant, &tline, test_deltas2.clone(), &ctx)
428 8 : .await
429 2 : .unwrap();
430 2 : let test_deltas3 = (0..N)
431 2000 : .map(|idx| {
432 2000 : (
433 2000 : get_key(idx as u32 / 10 + N as u32),
434 2000 : Lsn(0x10 * ((idx as u64) % 10 + 1)),
435 2000 : Value::Image(Bytes::from(format!("img{idx:05}"))),
436 2000 : )
437 2000 : })
438 2 : .collect_vec();
439 2 : let resident_layer_3 = produce_delta_layer(&tenant, &tline, test_deltas3.clone(), &ctx)
440 8 : .await
441 2 : .unwrap();
442 2 : let mut merge_iter = MergeIterator::create(
443 2 : &[
444 2 : resident_layer_1.get_as_delta(&ctx).await.unwrap(),
445 2 : resident_layer_2.get_as_delta(&ctx).await.unwrap(),
446 2 : resident_layer_3.get_as_delta(&ctx).await.unwrap(),
447 2 : ],
448 2 : &[],
449 2 : &ctx,
450 2 : );
451 2 : let mut expect = Vec::new();
452 2 : expect.extend(test_deltas1);
453 2 : expect.extend(test_deltas2);
454 2 : expect.extend(test_deltas3);
455 2 : expect.sort_by(sort_delta);
456 12 : assert_merge_iter_equal(&mut merge_iter, &expect).await;
457 2 :
458 2 : // TODO: test layers are loaded only when needed, reducing num of active iterators in k-merge
459 2 : }
460 :
461 : #[tokio::test]
462 2 : async fn delta_image_mixed_merge() {
463 2 : use crate::repository::Value;
464 2 : use bytes::Bytes;
465 2 :
466 2 : let harness = TenantHarness::create("merge_iterator_delta_image_mixed_merge")
467 2 : .await
468 2 : .unwrap();
469 8 : let (tenant, ctx) = harness.load().await;
470 2 :
471 2 : let tline = tenant
472 2 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
473 4 : .await
474 2 : .unwrap();
475 2 :
476 18 : fn get_key(id: u32) -> Key {
477 18 : let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
478 18 : key.field6 = id;
479 18 : key
480 18 : }
481 2 : // In this test case, we want to test if the iterator still works correctly with multiple copies
482 2 : // of a delta+image at the same LSN, for example, the following sequence a@10=+a, a@10=+a, a@10=ab, a@10=ab.
483 2 : // Duplicated deltas/images are possible for old tenants before the full L0 compaction file name fix.
484 2 : // An incomplete compaction could produce multiple exactly-the-same delta layers. Force image generation
485 2 : // could produce overlapping images. Apart from duplicated deltas/images, in the current storage implementation
486 2 : // one key-lsn could have a delta in the delta layer and one image in the image layer. The iterator should
487 2 : // correctly process these situations and return everything as-is, and the upper layer of the system
488 2 : // will handle duplicated LSNs.
489 2 : let test_deltas1 = vec![
490 2 : (
491 2 : get_key(0),
492 2 : Lsn(0x10),
493 2 : Value::WalRecord(NeonWalRecord::wal_init()),
494 2 : ),
495 2 : (
496 2 : get_key(0),
497 2 : Lsn(0x18),
498 2 : Value::WalRecord(NeonWalRecord::wal_append("a")),
499 2 : ),
500 2 : (
501 2 : get_key(5),
502 2 : Lsn(0x10),
503 2 : Value::WalRecord(NeonWalRecord::wal_init()),
504 2 : ),
505 2 : (
506 2 : get_key(5),
507 2 : Lsn(0x18),
508 2 : Value::WalRecord(NeonWalRecord::wal_append("b")),
509 2 : ),
510 2 : ];
511 2 : let resident_layer_1 = produce_delta_layer(&tenant, &tline, test_deltas1.clone(), &ctx)
512 6 : .await
513 2 : .unwrap();
514 2 : let mut test_deltas2 = test_deltas1.clone();
515 2 : test_deltas2.push((
516 2 : get_key(10),
517 2 : Lsn(0x20),
518 2 : Value::Image(Bytes::copy_from_slice(b"test")),
519 2 : ));
520 2 : let resident_layer_2 = produce_delta_layer(&tenant, &tline, test_deltas2.clone(), &ctx)
521 6 : .await
522 2 : .unwrap();
523 2 : let test_deltas3 = vec![
524 2 : (
525 2 : get_key(0),
526 2 : Lsn(0x10),
527 2 : Value::Image(Bytes::copy_from_slice(b"")),
528 2 : ),
529 2 : (
530 2 : get_key(5),
531 2 : Lsn(0x18),
532 2 : Value::Image(Bytes::copy_from_slice(b"b")),
533 2 : ),
534 2 : (
535 2 : get_key(15),
536 2 : Lsn(0x20),
537 2 : Value::Image(Bytes::copy_from_slice(b"test")),
538 2 : ),
539 2 : ];
540 2 : let resident_layer_3 = produce_delta_layer(&tenant, &tline, test_deltas3.clone(), &ctx)
541 6 : .await
542 2 : .unwrap();
543 2 : let mut test_deltas4 = test_deltas3.clone();
544 2 : test_deltas4.push((
545 2 : get_key(20),
546 2 : Lsn(0x20),
547 2 : Value::Image(Bytes::copy_from_slice(b"test")),
548 2 : ));
549 2 : let resident_layer_4 = produce_delta_layer(&tenant, &tline, test_deltas4.clone(), &ctx)
550 6 : .await
551 2 : .unwrap();
552 2 : let mut expect = Vec::new();
553 2 : expect.extend(test_deltas1);
554 2 : expect.extend(test_deltas2);
555 2 : expect.extend(test_deltas3);
556 2 : expect.extend(test_deltas4);
557 2 : expect.sort_by(sort_delta_value);
558 2 :
559 2 : // Test with different layer order for MergeIterator::create to ensure the order
560 2 : // is stable.
561 2 :
562 2 : let mut merge_iter = MergeIterator::create(
563 2 : &[
564 2 : resident_layer_4.get_as_delta(&ctx).await.unwrap(),
565 2 : resident_layer_1.get_as_delta(&ctx).await.unwrap(),
566 2 : resident_layer_3.get_as_delta(&ctx).await.unwrap(),
567 2 : resident_layer_2.get_as_delta(&ctx).await.unwrap(),
568 2 : ],
569 2 : &[],
570 2 : &ctx,
571 2 : );
572 8 : assert_merge_iter_equal(&mut merge_iter, &expect).await;
573 2 :
574 2 : let mut merge_iter = MergeIterator::create(
575 2 : &[
576 2 : resident_layer_1.get_as_delta(&ctx).await.unwrap(),
577 2 : resident_layer_4.get_as_delta(&ctx).await.unwrap(),
578 2 : resident_layer_3.get_as_delta(&ctx).await.unwrap(),
579 2 : resident_layer_2.get_as_delta(&ctx).await.unwrap(),
580 2 : ],
581 2 : &[],
582 2 : &ctx,
583 2 : );
584 4 : assert_merge_iter_equal(&mut merge_iter, &expect).await;
585 2 :
586 2 : is_send(merge_iter);
587 2 : }
588 :
589 2 : fn is_send(_: impl Send) {}
590 : }
|